aipartnerupflow Architecture Design Document¶
Core Positioning¶
The core of aipartnerupflow is task orchestration and execution specifications
This is a task orchestration framework library that provides: 1. Task Orchestration Specifications: Task tree construction, dependency management, priority scheduling 2. Task Execution Specifications: Unified execution interface supporting multiple task types 3. Supporting Features: Storage, unified external API interfaces
aipartnerupflow is a reusable framework library.
Architecture Layers¶
┌─────────────────────────────────────────────────────────────┐
│ Unified External API Interface Layer │
│ - A2A Protocol Server (HTTP/SSE/WebSocket) [a2a] │
│ (A2A Protocol is the standard protocol for agent communication)
│ - CLI Tools [cli] │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Task Orchestration Specification Layer (CORE) │
│ - TaskManager: Task tree orchestration, dependency │
│ management, priority scheduling │
│ - ExecutableTask: Task execution specification interface │
└─────────────────────────────────────────────────────────────┘
Protocol Standard¶
A2A (Agent-to-Agent) Protocol is the standard protocol adopted by aipartnerupflow for agent communication.
Why A2A Protocol? - Mature Standard: A2A Protocol is a mature, production-ready specification designed specifically for AI Agent systems - Complete Features: Provides streaming execution, task management, agent capability descriptions, and multiple transport methods - Well-Designed Abstraction: RequestContext encapsulates all request information, EventQueue unifies streaming updates - Protocol-Agnostic: Can be implemented over different transport layers (HTTP REST, SSE, WebSocket)
A2A Protocol Components: - AgentExecutor: Interface for executing agent tasks - RequestContext: Encapsulates method, parameters, metadata, and message content - EventQueue: Unified interface for streaming updates and real-time progress notifications - AgentCard / AgentSkill: Agent capability description mechanism ↓ ┌─────────────────────────────────────────────────────────────┐ │ Task Execution Layer │ │ - CrewManager [crewai]: CrewAI (LLM) task execution │ │ - BatchManager [batch]: Batch task orchestration │ │ - Custom Tasks: Traditional external service calls, │ │ automated task services, etc. │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Supporting Features Layer │ │ - Storage: Task state persistence (DuckDB/PostgreSQL) │ │ - Streaming: Real-time progress updates │ └─────────────────────────────────────────────────────────────┘
## Module Organization
### Core (pip install aipartnerupflow)
**Pure task orchestration framework - NO CrewAI dependency**
interfaces/ # Core interfaces ├── executable_task.py # ExecutableTask interface ├── base/ # Base class implementations │ └── base_task.py # BaseTask base class └── storage.py # TaskStorage interface
storage/ # Storage implementation ├── factory.py # create_storage() function ├── sqlalchemy/ # SQLAlchemy implementation └── dialects/ # Database dialects (DuckDB/PostgreSQL)
utils/ # Utility functions
extensions/crewai/ ├── init.py ├── crew_manager.py # CrewManager - CrewAI wrapper ├── batch_manager.py # BatchManager - batch execution of multiple crews └── types.py # CrewManagerState, BatchState**Installation**: `pip install aipartnerupflow[crewai]`
**Includes**:
- CrewManager for LLM-based agent crews
- BatchManager for atomic batch execution of multiple crews
- Related type definitions
**Note**: BatchManager is part of [crewai] because it's specifically designed for batching multiple CrewAI crews together.
#### [http_executor] - HTTP/Remote API Task Execution (Future)
**Installation**: `pip install aipartnerupflow[http]` (future)
**Purpose**: Execute tasks by calling remote HTTP/HTTPS APIs.
**Features**:
- HTTP/HTTPS request execution
- Retry logic with exponential backoff
- Authentication support (API keys, OAuth, etc.)
- Request/response validation
- Timeout handling
**Use Case**: Tasks that need to call external REST APIs, webhooks, or remote services.
**Note**: This is a future feature. CrewAI is the first task execution implementation.
**Note**: For examples and learning templates, see the test cases in `tests/integration/` and `tests/extensions/`. Test cases serve as comprehensive examples demonstrating real-world usage patterns.
#### [a2a] - A2A Protocol Server
Includes: - Task orchestration specifications (TaskManager) - Core interfaces (ExecutableTask, BaseTask, TaskStorage) - Storage (DuckDB default) - NO CrewAI dependency
Use case: Users who want pure orchestration framework with custom task implementations.
With CrewAI Support¶
Adds: - CrewManager for LLM-based tasks via CrewAI - BatchManager for atomic batch execution of multiple crews
Use case: Users who want LLM agent capabilities and/or batch execution of multiple crews.
Full Installation¶
Includes: Core + crewai + batch + api + cli + examples + postgres
Core Components¶
1. Task Orchestration Specifications (Core)¶
TaskManager (execution/task_manager.py)¶
- Core Responsibility: Task orchestration and execution specifications
- Features:
- Task tree construction and management (TaskTreeNode)
- Dependency resolution and execution order control
- Priority scheduling
- Task state management (pending, in_progress, completed, failed)
- Task lifecycle management (create, execute, complete, failure handling)
ExecutableTask (interfaces/executable_task.py)¶
- Responsibility: Define task execution specification interface
- Implementations:
CrewManager[crewai]: LLM-based tasks (via CrewAI)- Custom tasks: Traditional external service calls, automated task services, etc.
2. Task Execution Layer¶
CrewManager (extensions/crewai/crew_manager.py) [crewai]¶
- Responsibility: CrewAI (LLM) task execution - ExecutableTask implementation
- Features:
- Wraps CrewAI Crew functionality
- Supports LLM-based agent collaboration
- Implements ExecutableTask interface
- Type: Task executor (one of many possible implementations)
BatchManager (extensions/crewai/batch_manager.py) [crewai]¶
- Responsibility: Batch task orchestration for multiple crews (simple merging)
- Features:
- Atomic execution of multiple crews
- Result merging
- Not an ExecutableTask (it's a container, not a task)
- Location: Part of [crewai] because it's specifically for batching CrewAI crews together
HTTPExecutor (extensions/http_executor/http_executor.py) [http] (Future)¶
- Responsibility: Remote HTTP/API call task execution - ExecutableTask implementation
- Features:
- HTTP/HTTPS request execution
- Retry logic, timeout handling
- Authentication support
- Type: Task executor (future implementation)
- Use Case: Tasks that call external REST APIs or remote services
Custom Tasks (Core)¶
- Types:
- User-defined implementations of ExecutableTask
- Can be any task type (database operations, file processing, etc.)
- Implementation: Inherit from
ExecutableTaskorBaseTask - Location: Users create these in their own codebase
Built-in Executors (Features)¶
- CrewManager [crewai]: LLM-based tasks via CrewAI
- HTTPExecutor [http] (Future): Remote API calls via HTTP
- Future executors: Shell executor, database executor, queue executor, etc.
- Location:
extensions/directory
3. Supporting Features¶
Storage (storage/)¶
- Responsibility: Task state persistence
- Implementations:
- DuckDB (default, embedded, zero-config)
- PostgreSQL (optional, production environment)
- Features:
- Task creation, querying, updating
- Task tree state management
TaskModel Design and A2A Protocol Integration:
The storage layer uses TaskModel (SQLAlchemy model) to persist task definitions and execution results. A key design decision is the separation between TaskModel (task definition) and A2A Protocol's Task (execution instance).
Conceptual Separation:
| Component | Nature | Purpose | Lifecycle |
|---|---|---|---|
| TaskModel | Task definition (static) | Task orchestration, dependency management, task tree structure | Persistent (can have multiple executions) |
| A2A Protocol Task | Execution instance (dynamic) | LLM message context management, execution tracking | Single execution lifecycle |
Key Understanding:
A2A Protocol's Task is primarily designed for LLM message context management. It represents an execution instance with: - Task.history: LLM conversation history - Task.artifacts: LLM-generated results - Task.status.message: LLM response messages
TaskModel, on the other hand, focuses on task orchestration: - Task definition metadata (name, dependencies, priority, schemas) - Task tree structure (parent_id, children) - Latest execution results (extracted from A2A Task.artifacts) - Execution status and progress
Mapping Relationship:
TaskModel (Task Definition) A2A Protocol Task (Execution Instance)
───────────────────────────────────────────────────────────────────────────
TaskModel.id → Task.context_id (task definition ID = context ID)
TaskModel.status → TaskStatus.state (status mapping)
TaskModel.result → Task.artifacts (execution result)
TaskModel.error → TaskStatus.message (error message)
TaskModel.user_id → Task.metadata["user_id"] (optional user identifier)
Task.id → Execution instance ID (A2A Protocol internal, auto-generated)
Task.history → Not stored in TaskModel (LLM conversation history, execution-level)
Design Decisions:
- TaskModel does NOT store execution-level fields:
- ❌
context_id: Execution-level concept (TaskModel.id serves this purpose) - ❌
artifacts: Execution instance field (extracted to TaskModel.result) - ❌
history: Execution instance field (LLM conversation history, managed by A2A Protocol) - ❌
metadata: Execution instance field (orchestration info stored in TaskModel fields) -
❌
kind: A2A Protocol-specific field -
One-to-Many Relationship:
- One
TaskModelcan have multipleTaskexecution instances - Each execution creates a new A2A Protocol
Taskwith a unique execution instance ID -
TaskModel.id(task definition ID) maps toTask.context_id -
Table Name:
- Default table name:
apflow_tasks(prefixed to distinguish from A2A Protocol'staskstable) - Configurable via
AIPARTNERUPFLOW_TASK_TABLE_NAMEenvironment variable -
See Configuration for details
-
Storage Bridge:
- Uses A2A SDK's
InMemoryTaskStorefor A2A Protocol task execution instances - TaskModel persistence is handled by
TaskRepository(separate from A2A TaskStore) - Converts between
TaskModel(task definition) and A2A ProtocolTask(execution instance) - Updates
TaskModelwith execution results from A2A ProtocolTask
API (api/) [a2a]¶
- Responsibility: Unified external API service layer supporting multiple network protocols
- Current Implementation: A2A Protocol Server (
api/a2a/) - Protocol Standard: A2A (Agent-to-Agent) Protocol
- Transport Layers: HTTP, SSE, WebSocket (all implementing A2A Protocol)
- Structure: A2A implementation organized in
api/a2a/subdirectory for better code organization - Future Extensions: May include additional protocols (e.g., REST API in
api/rest/) for direct HTTP access
Streaming (execution/streaming_callbacks.py)¶
- Responsibility: Real-time progress updates
- Features: Real-time task execution state updates
Task Type Support¶
1. LLM Tasks (CrewAI) [crewai]¶
# Requires: pip install aipartnerupflow[crewai]
from aipartnerupflow.extensions.crewai import CrewManager
crew = CrewManager(
agents=[{"role": "Analyst", "goal": "Analyze data"}],
tasks=[{"description": "Analyze input", "agent": "Analyst"}]
)
result = await crew.execute(inputs={...})
2. Traditional External Service Calls¶
from aipartnerupflow.core.interfaces.plugin import ExecutableTask
class APICallTask(ExecutableTask):
async def execute(self, inputs):
# Call external API
response = await http_client.post(url, data=inputs)
return response.json()
3. Automated Task Services¶
class ScheduledTask(ExecutableTask):
async def execute(self, inputs):
# Execute scheduled task logic
return {"status": "completed"}
Task Orchestration Patterns¶
Simple Batch Orchestration (BatchManager) [crewai]¶
- Multiple crews execute sequentially, results are merged
- Atomic operation: failure of any crew causes entire batch to fail
- Part of [crewai] extra (designed for batching CrewAI crews)
Complex Task Tree Orchestration (TaskManager) [core]¶
- Supports dependencies
- Supports priorities
- Supports hierarchical task tree structure
- Automatic scheduling and execution
- No external dependencies
Design Principles¶
- Clear Core: Task orchestration and execution specifications are the core
- Pure Core: Core has no external LLM/AI dependencies (CrewAI optional)
- Unified Interface: All task types unified through ExecutableTask interface
- Executor Pattern: Different executors (CrewAI, HTTP, etc.) are separate features
- Flexible Orchestration: Supports simple batch to complex task tree
- Optional Storage: Provides persistence but configurable
- Unified API: Provides unified external interface using A2A Protocol (standard protocol)
- Modular Installation: Users install only what they need
- Learning Resources: Test cases in
tests/integration/andtests/extensions/serve as comprehensive examples