Python API Reference¶
Complete reference for aipartnerupflow's Python API: the classes and functions it exposes and how to use them.
For implementation details, see:
- Source code: src/aipartnerupflow/ (well-documented with docstrings)
- Test cases: tests/ (comprehensive examples of all features)
Table of Contents¶
- Overview
- TaskManager
- ExecutableTask
- BaseTask
- TaskTreeNode
- TaskRepository
- TaskExecutor
- TaskCreator
- Extension Registry
- Hooks
- Common Patterns
Overview¶
The core API consists of:
- TaskManager: Task orchestration and execution engine
- ExecutableTask: Interface for all task executors
- BaseTask: Recommended base class for custom executors
- TaskTreeNode: Task tree structure representation
- TaskRepository: Database operations for tasks
- TaskExecutor: Singleton for task execution management
- TaskCreator: Task tree creation from arrays
- ExtensionRegistry: Extension discovery and management
Quick Start Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    # Create database session
    db = create_session()

    # Create task manager
    task_manager = TaskManager(db)

    # Create a task
    task = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        inputs={"resource": "cpu"}
    )

    # Build and execute task tree
    task_tree = TaskTreeNode(task)
    await task_manager.distribute_task_tree(task_tree)

    # Get result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    print(f"Result: {result.result}")

if __name__ == "__main__":
    asyncio.run(main())
TaskManager¶
The main class for orchestrating and executing task trees.
Initialization¶
from aipartnerupflow import TaskManager, create_session

db = create_session()
task_manager = TaskManager(
    db,
    root_task_id=None,        # Optional: Root task ID for streaming
    pre_hooks=None,           # Optional: List of pre-execution hooks
    post_hooks=None,          # Optional: List of post-execution hooks
    executor_instances=None   # Optional: Shared executor instances dict
)
See: src/aipartnerupflow/core/execution/task_manager.py for full implementation details.
Main Methods¶
distribute_task_tree(task_tree, use_callback=True)¶
Execute a task tree with dependency management and priority scheduling.
result: TaskTreeNode = await task_manager.distribute_task_tree(
    task_tree,           # TaskTreeNode
    use_callback=True    # bool
)
See: tests/core/execution/test_task_manager.py for comprehensive examples.
distribute_task_tree_with_streaming(task_tree, use_callback=True)¶
Execute a task tree with real-time streaming for progress updates.
await task_manager.distribute_task_tree_with_streaming(
    task_tree,           # TaskTreeNode
    use_callback=True    # bool
)  # returns None
cancel_task(task_id, error_message=None)¶
Cancel a running task execution.
result: Dict[str, Any] = await task_manager.cancel_task(
    task_id,             # str
    error_message=None   # str | None
)
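For example (the task ID here is illustrative; the exact keys of the returned dict depend on the executor, see the source linked below):

result = await task_manager.cancel_task(
    task_id="task-123",
    error_message="Cancelled by user"
)
print(result)  # dict describing the cancellation outcome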
Properties¶
- task_repository (TaskRepository): Access to the task repository for database operations
- streaming_callbacks (StreamingCallbacks): Streaming callbacks instance
See: Source code in src/aipartnerupflow/core/execution/task_manager.py for all available methods and detailed documentation.
BaseTask¶
Recommended base class for creating custom executors. Provides automatic registration via decorator.
Usage¶
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class MyExecutor(BaseTask):
    id = "my_executor"
    name = "My Executor"
    description = "Does something useful"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return {"status": "completed", "result": "..."}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "param": {"type": "string", "description": "Parameter"}
            },
            "required": ["param"]
        }
See: tests/extensions/tools/test_tools_decorator.py and docs/guides/custom-tasks.md for examples.
ExecutableTask¶
Abstract base class for all task executors. Use BaseTask for simplicity, or ExecutableTask for more control.
Required Interface¶
- id (property): Unique identifier
- name (property): Display name
- description (property): Description
- execute(inputs): Main execution logic (async)
- get_input_schema(): Return JSON Schema for inputs
Optional Methods¶
- cancel(): Cancel task execution
See: src/aipartnerupflow/core/interfaces/executable_task.py for full interface definition.
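A minimal sketch of a direct ExecutableTask implementation, assuming the class can be imported from the package root like BaseTask and that id/name/description are plain properties (check the linked interface file for the exact signatures):

from typing import Any, Dict

from aipartnerupflow import ExecutableTask  # assumed export; see the interface file above

class RawExecutor(ExecutableTask):
    @property
    def id(self) -> str:
        return "raw_executor"

    @property
    def name(self) -> str:
        return "Raw Executor"

    @property
    def description(self) -> str:
        return "Implements ExecutableTask directly"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return {"status": "completed", "result": inputs}

    def get_input_schema(self) -> Dict[str, Any]:
        return {"type": "object", "properties": {}}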
TaskTreeNode¶
Represents a node in a task tree structure.
Main Methods¶
- add_child(child): Add a child node
- calculate_progress(): Calculate progress (0.0 to 1.0)
- calculate_status(): Calculate overall status
Properties¶
- task (TaskModel): The task model instance
- children (List[TaskTreeNode]): List of child nodes
See: src/aipartnerupflow/core/types.py for full implementation and tests/core/execution/test_task_manager.py for usage examples.
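For example, building a small tree from existing TaskModel instances (parent_task and child_task are assumed to come from create_task calls like those in the Quick Start):

root = TaskTreeNode(parent_task)
root.add_child(TaskTreeNode(child_task))

print(root.calculate_progress())  # 0.0 to 1.0, aggregated over the tree
print(root.calculate_status())    # overall status of root + children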
TaskRepository¶
Database operations for tasks.
Main Methods¶
- create_task(...): Create a new task
- get_task_by_id(task_id): Get task by ID
- get_root_task(task): Get root task
- build_task_tree(task): Build task tree from task
- update_task_status(task_id, status, ...): Update task status and related fields (error, result, progress, timestamps)
- update_task_inputs(task_id, inputs): Update task inputs
- update_task_dependencies(task_id, dependencies): Update task dependencies (with validation)
- update_task_name(task_id, name): Update task name
- update_task_priority(task_id, priority): Update task priority
- update_task_params(task_id, params): Update executor parameters
- update_task_schemas(task_id, schemas): Update validation schemas
- delete_task(task_id): Physically delete a task from the database
- get_all_children_recursive(task_id): Recursively get all child tasks (including grandchildren)
- find_dependent_tasks(task_id): Find all tasks that depend on a given task (reverse dependencies)
- list_tasks(...): List tasks with filters
See: src/aipartnerupflow/core/storage/sqlalchemy/task_repository.py for all methods and tests/core/storage/sqlalchemy/test_task_repository.py for examples.
Note on Task Updates:
- Critical fields (parent_id, user_id, dependencies) are validated strictly:
  - parent_id and user_id: Cannot be updated (always rejected)
  - dependencies: Can only be updated for pending tasks, with validation for references, circular dependencies, and executing dependents
- Other fields can be updated freely from any task status
- For API-level updates with validation, use the tasks.update JSON-RPC endpoint via TaskRoutes.handle_task_update()
Note on Task Deletion:
- delete_task() performs physical deletion (not soft-delete)
- For API-level deletion with validation, use the tasks.delete JSON-RPC endpoint via TaskRoutes.handle_task_delete()
- The API endpoint validates that all tasks (task + children) are pending and checks for dependencies before deletion
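A short sketch of typical repository usage, based on the method names above (the update_* calls are assumed to be awaitable like create_task; verify the exact signatures in the linked source):

repo = task_manager.task_repository

# Create, then read back
task = await repo.create_task(
    name="system_info_executor",
    user_id="user123",
    inputs={"resource": "cpu"}
)
same_task = await repo.get_task_by_id(task.id)

# Adjust fields before execution
await repo.update_task_priority(task.id, 5)
await repo.update_task_inputs(task.id, {"resource": "memory"})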
TaskCreator¶
Create task trees from task arrays.
Main Methods¶
- create_task_tree_from_array(tasks): Create a task tree from an array of task dictionaries
- create_task_copy(original_task, children=False): Create a copy of an existing task tree for re-execution. If children=True, also copy each direct child task with its dependencies (deduplication ensures tasks depending on multiple copied tasks are only copied once).
See: src/aipartnerupflow/core/execution/task_creator.py for implementation and tests/core/execution/test_task_creator.py for examples.
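A hedged sketch of both methods, assuming task_creator has already been constructed (see the linked source for how TaskCreator is instantiated) and that the task dictionaries follow the shape used in Pattern 5 below:

tasks = [
    {"id": "t1", "name": "fetch_data", "user_id": "user123"},
    {
        "id": "t2",
        "name": "process_data",
        "user_id": "user123",
        "parent_id": "t1",
        "dependencies": [{"id": "t1", "required": True}],
    },
]

# Build a TaskTreeNode tree from the array
tree = await task_creator.create_task_tree_from_array(tasks)

# Copy an existing task (and its direct children) for re-execution
copy = await task_creator.create_task_copy(original_task, children=True)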
TaskModel¶
Database model for tasks.
Main Fields¶
- id, parent_id, user_id, name, status, priority
- dependencies, inputs, params, result, error, schemas
- progress, has_children
- created_at, started_at, updated_at, completed_at
- original_task_id, has_copy
Methods¶
- to_dict(): Convert the model to a dictionary
See: src/aipartnerupflow/core/storage/sqlalchemy/models.py for full model definition.
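For example, serializing a task for logging (the dictionary keys are assumed to mirror the fields listed above):

task = await task_manager.task_repository.get_task_by_id(task_id)
payload = task.to_dict()
print(payload["status"], payload.get("result"))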
TaskStatus¶
Task status constants and utilities.
Constants¶
PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED
Methods¶
- is_terminal(status): Check if status is terminal
- is_active(status): Check if status is active
See: src/aipartnerupflow/core/storage/sqlalchemy/models.py for implementation.
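A small sketch (the import path is an assumption based on the file reference above):

from aipartnerupflow.core.storage.sqlalchemy.models import TaskStatus  # assumed path

if TaskStatus.is_terminal(task.status):
    print(f"Finished: {task.status}")      # completed, failed, or cancelled
elif TaskStatus.is_active(task.status):
    print(f"Still running: {task.status}")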
Utility Functions¶
Session Management¶
- create_session(): Create a new database session
- get_default_session(): Get the default database session
Extension Registry¶
- executor_register(): Decorator to register executors (recommended)
- register_pre_hook(hook): Register a pre-execution hook
- register_post_hook(hook): Register a post-execution hook
- get_registry(): Get the extension registry instance
See: src/aipartnerupflow/core/decorators.py and src/aipartnerupflow/core/extensions/registry.py for implementation.
Type Definitions¶
- TaskPreHook: Type alias for pre-execution hook functions
- TaskPostHook: Type alias for post-execution hook functions
See: src/aipartnerupflow/core/extensions/types.py for type definitions.
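A hedged sketch of hook registration; the hook parameter lists below are illustrative assumptions, and the authoritative signatures are the TaskPreHook/TaskPostHook aliases in the types file above:

from aipartnerupflow import register_pre_hook, register_post_hook  # assumed root exports, like executor_register

async def log_before(task, inputs):   # illustrative TaskPreHook shape
    print(f"starting {task.id} with {inputs}")

async def log_after(task, result):    # illustrative TaskPostHook shape
    print(f"finished {task.id}: {result}")

register_pre_hook(log_before)
register_post_hook(log_after)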
Error Handling¶
Common Exceptions¶
- ValueError: Invalid input parameters
- RuntimeError: Execution errors
- KeyError: Missing required fields
Error Response Format¶
Tasks that fail return a result dictionary of the shape shown below.
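This mirrors the return values used by the custom-executor examples later in this document (see Pattern 6); individual executors may add extra keys:

{
    "status": "failed",
    "error": "description of what went wrong",
    "error_type": "ValueError"  # exception class name or a category like "validation_error"
}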
Common Patterns¶
Pattern 1: Simple Task Execution¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Create task
    task = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        inputs={"resource": "cpu"}
    )

    # Build tree
    tree = TaskTreeNode(task)

    # Execute
    await task_manager.distribute_task_tree(tree)

    # Get result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    print(f"Status: {result.status}")
    print(f"Result: {result.result}")

if __name__ == "__main__":
    asyncio.run(main())
Pattern 2: Sequential Tasks (Dependencies)¶
# Create tasks with dependencies
task1 = await task_manager.task_repository.create_task(
    name="fetch_data",
    user_id="user123",
    priority=1
)

task2 = await task_manager.task_repository.create_task(
    name="process_data",
    user_id="user123",
    parent_id=task1.id,
    dependencies=[{"id": task1.id, "required": True}],  # Waits for task1
    priority=2,
    inputs={"data": []}  # Will be populated from task1 result
)

task3 = await task_manager.task_repository.create_task(
    name="save_results",
    user_id="user123",
    parent_id=task1.id,
    dependencies=[{"id": task2.id, "required": True}],  # Waits for task2
    priority=3
)

# Build tree
root = TaskTreeNode(task1)
root.add_child(TaskTreeNode(task2))
root.add_child(TaskTreeNode(task3))

# Execute (order: task1 → task2 → task3)
await task_manager.distribute_task_tree(root)
Pattern 3: Parallel Tasks¶
# Create root
root_task = await task_manager.task_repository.create_task(
    name="root",
    user_id="user123",
    priority=1
)

# Create parallel tasks (no dependencies between them)
task1 = await task_manager.task_repository.create_task(
    name="task1",
    user_id="user123",
    parent_id=root_task.id,
    priority=2
)

task2 = await task_manager.task_repository.create_task(
    name="task2",
    user_id="user123",
    parent_id=root_task.id,
    priority=2  # Same priority, no dependencies = parallel
)

task3 = await task_manager.task_repository.create_task(
    name="task3",
    user_id="user123",
    parent_id=root_task.id,
    priority=2
)

# Build tree
root = TaskTreeNode(root_task)
root.add_child(TaskTreeNode(task1))
root.add_child(TaskTreeNode(task2))
root.add_child(TaskTreeNode(task3))

# Execute (all three run in parallel)
await task_manager.distribute_task_tree(root)
Pattern 4: Error Handling¶
# Execute task tree
await task_manager.distribute_task_tree(task_tree)

# Check all tasks for errors
async def check_task_status(task_id):
    task = await task_manager.task_repository.get_task_by_id(task_id)
    if task.status == "failed":
        print(f"Task {task_id} failed: {task.error}")
        return False
    elif task.status == "completed":
        print(f"Task {task_id} completed: {task.result}")
        return True
    return None

# Check root task
root_status = await check_task_status(root_task.id)

# Check all children
for child in task_tree.children:
    await check_task_status(child.task.id)
Pattern 5: Using TaskExecutor¶
from aipartnerupflow.core.execution.task_executor import TaskExecutor

# Get singleton instance
executor = TaskExecutor()

# Execute tasks from definitions
tasks = [
    {
        "id": "task1",
        "name": "my_executor",
        "user_id": "user123",
        "inputs": {"key": "value"}
    },
    {
        "id": "task2",
        "name": "my_executor",
        "user_id": "user123",
        "parent_id": "task1",
        "dependencies": [{"id": "task1", "required": True}],
        "inputs": {"key": "value2"}
    }
]

# Execute
result = await executor.execute_tasks(
    tasks=tasks,
    root_task_id="root_123",
    use_streaming=False
)
print(f"Execution result: {result}")
Pattern 6: Custom Executor with Error Handling¶
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any

@executor_register()
class RobustExecutor(BaseTask):
    id = "robust_executor"
    name = "Robust Executor"
    description = "Executor with comprehensive error handling"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Validate inputs
            if not inputs.get("data"):
                return {
                    "status": "failed",
                    "error": "data is required",
                    "error_type": "validation_error"
                }

            # Process
            result = self._process(inputs["data"])
            return {
                "status": "completed",
                "result": result
            }
        except ValueError as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": "ValueError"
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": type(e).__name__
            }

    def _process(self, data):
        # Your processing logic
        return {"processed": data}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "data": {"type": "string", "description": "Data to process"}
            },
            "required": ["data"]
        }
A2A Protocol Integration¶
aipartnerupflow implements the A2A (Agent-to-Agent) Protocol standard, allowing seamless integration with other A2A-compatible agents and services.
Using A2A Client SDK¶
The A2A protocol provides an official client SDK for easy integration. Here's how to use it with aipartnerupflow:
Installation¶
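The examples below use the official A2A Python client SDK. It is published on PyPI as a2a-sdk at the time of writing (verify the current package name against the A2A documentation linked at the end of this page):

pip install a2a-sdk httpx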
Basic Example¶
from a2a.client import ClientFactory, ClientConfig
from a2a.types import Message, DataPart, Role, AgentCard
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
import httpx
import uuid
import asyncio

async def execute_task_via_a2a():
    # Create HTTP client
    httpx_client = httpx.AsyncClient(base_url="http://localhost:8000")

    # Create A2A client config
    config = ClientConfig(
        streaming=True,
        polling=False,
        httpx_client=httpx_client
    )

    # Create client factory
    factory = ClientFactory(config=config)

    # Fetch agent card
    card_response = await httpx_client.get(AGENT_CARD_WELL_KNOWN_PATH)
    agent_card = AgentCard(**card_response.json())

    # Create A2A client
    client = factory.create(card=agent_card)

    # Prepare task data
    task_data = {
        "id": "task-1",
        "name": "My Task",
        "user_id": "user123",
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {}
    }

    # Create A2A message
    data_part = DataPart(kind="data", data={"tasks": [task_data]})
    message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.user,
        parts=[data_part]
    )

    # Send message and process responses
    async for response in client.send_message(message):
        if isinstance(response, Message):
            for part in response.parts:
                if part.kind == "data" and isinstance(part.data, dict):
                    result = part.data
                    print(f"Status: {result.get('status')}")
                    print(f"Progress: {result.get('progress')}")

    await httpx_client.aclose()

# Run
asyncio.run(execute_task_via_a2a())
Push Notification Configuration¶
You can use push notifications to receive task execution updates via callback URL:
from a2a.types import Configuration, PushNotificationConfig

# Create push notification config
push_config = PushNotificationConfig(
    url="https://your-server.com/callback",
    headers={
        "Authorization": "Bearer your-token"
    }
)

# Create configuration
configuration = Configuration(
    push_notification_config=push_config
)

# Create message with configuration
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part],
    configuration=configuration
)

# Send message - server will use callback mode
async for response in client.send_message(message):
    # Initial response only
    print(f"Initial response: {response}")
    break
The server will send task status updates to your callback URL as the task executes.
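A minimal sketch of a receiver for those callbacks, using FastAPI purely for illustration; the endpoint path and payload handling here are assumptions, since the actual payload schema is defined by the A2A push-notification specification:

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/callback")  # must match PushNotificationConfig.url
async def receive_task_update(request: Request):
    payload = await request.json()  # task status update pushed by the server
    print("task update:", payload)
    return {"ok": True}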
Cancelling Tasks¶
You can cancel a running task using the A2A Protocol cancel method:
from a2a.client import ClientFactory, ClientConfig
from a2a.types import Message, DataPart, Role, AgentCard
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
import httpx
import uuid
import asyncio

async def cancel_task_via_a2a():
    # Create HTTP client
    httpx_client = httpx.AsyncClient(base_url="http://localhost:8000")

    # Create A2A client config
    config = ClientConfig(
        streaming=True,
        polling=False,
        httpx_client=httpx_client
    )

    # Create client factory
    factory = ClientFactory(config=config)

    # Fetch agent card
    card_response = await httpx_client.get(AGENT_CARD_WELL_KNOWN_PATH)
    agent_card = AgentCard(**card_response.json())

    # Create A2A client
    client = factory.create(card=agent_card)

    # Create cancel request
    # Task ID can be provided in multiple ways (priority order):
    # 1. task_id in RequestContext
    # 2. context_id in RequestContext
    # 3. metadata.task_id
    # 4. metadata.context_id
    cancel_message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.user,
        parts=[],  # Empty parts for cancel
        task_id="my-running-task",  # Task ID to cancel
        metadata={
            "error_message": "User requested cancellation"  # Optional custom message
        }
    )

    # Send cancel request and process responses
    async for response in client.send_message(cancel_message):
        if isinstance(response, Message):
            for part in response.parts:
                if part.kind == "data" and isinstance(part.data, dict):
                    result = part.data
                    status = result.get("status")
                    if status == "cancelled":
                        print(f"Task cancelled successfully: {result.get('message')}")
                        if "token_usage" in result:
                            print(f"Token usage: {result['token_usage']}")
                        if "result" in result:
                            print(f"Partial result: {result['result']}")
                    elif status == "failed":
                        print(f"Cancellation failed: {result.get('error', result.get('message'))}")

    await httpx_client.aclose()

# Run
asyncio.run(cancel_task_via_a2a())
Notes:
- The cancel() method sends a TaskStatusUpdateEvent through the EventQueue
- Task ID extraction follows priority: task_id > context_id > metadata.task_id > metadata.context_id
- If the task supports cancellation, the executor's cancel() method will be called
- Token usage and partial results are preserved if available
- The response includes protocol: "a2a" in the event data
Streaming Mode¶
Enable streaming mode to receive real-time progress updates:
# Create message with streaming enabled via metadata
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part],
    metadata={"stream": True}
)

# Send message - will receive multiple updates
async for response in client.send_message(message):
    if isinstance(response, Message):
        # Process streaming updates
        for part in response.parts:
            if part.kind == "data":
                update = part.data
                print(f"Update: {update}")
Task Tree Execution¶
Execute complex task trees with dependencies:
tasks = [
    {
        "id": "parent-task",
        "name": "Parent Task",
        "user_id": "user123",
        "dependencies": [
            {"id": "child-1", "required": True},
            {"id": "child-2", "required": True}
        ],
        "schemas": {
            "method": "aggregate_results_executor"
        },
        "inputs": {}
    },
    {
        "id": "child-1",
        "name": "Child Task 1",
        "parent_id": "parent-task",
        "user_id": "user123",
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {"resource": "cpu"}
    },
    {
        "id": "child-2",
        "name": "Child Task 2",
        "parent_id": "parent-task",
        "user_id": "user123",
        "dependencies": [{"id": "child-1", "required": True}],
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {"resource": "memory"}
    }
]

# Create message with task tree
data_part = DataPart(kind="data", data={"tasks": tasks})
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part]
)

# Execute task tree
async for response in client.send_message(message):
    # Process responses
    pass
Copy Before Execution¶
The A2A protocol supports copying existing tasks before execution. This is useful for preserving execution history while creating new execution instances.
Basic Copy Execution¶
To copy a task before execution, set copy_execution=True in the message metadata:
from a2a.types import Message, DataPart, Role
import uuid

# Create message with copy_execution metadata
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[DataPart(kind="data", data={})],  # Empty tasks array when copying
    metadata={
        "task_id": "existing-task-id",
        "copy_execution": True
    }
)

# Execute copied task
async for response in client.send_message(message):
    if isinstance(response, Message):
        for part in response.parts:
            if part.kind == "data" and isinstance(part.data, dict):
                result = part.data
                print(f"Copied task ID: {result.get('root_task_id')}")
                print(f"Original task ID: {result.get('metadata', {}).get('original_task_id')}")
Copy with Children¶
To also copy child tasks and their dependencies:
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[DataPart(kind="data", data={})],
    metadata={
        "task_id": "parent-task-id",
        "copy_execution": True,
        "copy_children": True  # Also copy children
    }
)
Complete Example¶
import asyncio
import uuid
from a2a.client import ClientFactory, ClientConfig
from a2a.types import Message, DataPart, Role, AgentCard
import httpx

async def copy_and_execute_task():
    # Setup A2A client
    httpx_client = httpx.AsyncClient(base_url="http://localhost:8000")
    config = ClientConfig(streaming=True, httpx_client=httpx_client)
    factory = ClientFactory(config=config)

    # Get agent card
    card_response = await httpx_client.get("/.well-known/agent-card")
    agent_card = AgentCard(**card_response.json())
    client = factory.create(card=agent_card)

    # Copy existing task and execute
    message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.user,
        parts=[DataPart(kind="data", data={})],
        metadata={
            "task_id": "my-existing-task-id",
            "copy_execution": True,
            "copy_children": True,
            "stream": True  # Enable streaming
        }
    )

    # Process responses
    async for response in client.send_message(message):
        if isinstance(response, Message):
            for part in response.parts:
                if part.kind == "data" and isinstance(part.data, dict):
                    data = part.data
                    if "metadata" in data and "original_task_id" in data["metadata"]:
                        print(f"Original: {data['metadata']['original_task_id']}")
                        print(f"Copied: {data.get('root_task_id')}")
                    print(f"Status: {data.get('status')}, Progress: {data.get('progress')}")

    await httpx_client.aclose()

asyncio.run(copy_and_execute_task())
Key Points:
- metadata.task_id: The ID of the existing task to copy
- metadata.copy_execution: Set to True to enable copy execution
- metadata.copy_children: Set to True to also copy child tasks (optional)
- The response includes original_task_id in metadata for reference
- The original task remains unchanged; a new copy is created and executed
A2A Protocol Documentation¶
For detailed information about the A2A Protocol, please refer to the official documentation:
- A2A Protocol Official Documentation: https://www.a2aprotocol.org/en/docs
- A2A Protocol Homepage: https://www.a2aprotocol.org