Python API Reference¶
Decorator Registration: override Parameter¶
All extension and tool registration decorators accept an override parameter:
override: bool = False
If True, always force-override any previous registration for the same name or ID.
If False and the name/ID already exists, the new registration is skipped.
Example:
from apflow.core.extensions.decorators import executor_register
@executor_register(override=True)
class MyExecutor(BaseTask):
...
This forcibly replaces any previous registration for MyExecutor.
Tool Registration Example:
from apflow.core.tools.decorators import tool_register
@tool_register(name="custom_tool", override=True)
class CustomTool(BaseTool):
...
If a tool with the same name is already registered, setting override=True replaces it with the new registration.
Wherever override appears in a decorator signature, it has the same meaning: with override=True, any existing registration for the same name or ID is forcibly replaced.
Looking for a quick syntax lookup? See the API Quick Reference for concise code patterns and usage. This document provides detailed explanations and examples.
Complete reference for apflow's Python API. This document lists all available APIs and how to use them.
For implementation details, see:
- Source code: src/apflow/ (well-documented with docstrings)
- Test cases: tests/ (comprehensive examples of all features)
Table of Contents¶
- Overview
- TaskManager
- TaskBuilder
- ExecutableTask
- BaseTask
- TaskTreeNode
- TaskRepository
- TaskExecutor
- TaskCreator
- TaskModel
- TaskStatus
- Utility Functions
- Extension Registry
- Hooks
- Error Handling
- Common Patterns
- A2A Protocol Integration
Overview¶
The core API consists of:
- TaskManager: Task orchestration and execution engine
- TaskBuilder: Fluent API for creating and executing tasks
- ExecutableTask: Interface for all task executors
- BaseTask: Recommended base class for custom executors
- TaskTreeNode: Task tree structure representation
- TaskRepository: Database operations for tasks
- TaskExecutor: Singleton for task execution management
- TaskCreator: Task tree creation from arrays
- ExtensionRegistry: Extension discovery and management
Understanding Lifecycles:
For a complete understanding of task execution flow, database session management, and hook context lifecycle, see Task Tree Execution Lifecycle. This is essential reading for:
- Implementing hooks that access the database
- Understanding session scope and transaction boundaries
- Debugging execution issues
- Ensuring proper resource cleanup
Quick Start Example¶
import asyncio
from apflow import TaskManager, TaskTreeNode, create_session

async def main():
    # Create database session
    db = create_session()

    # Create task manager
    task_manager = TaskManager(db)

    # Create a task
    task = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        inputs={"resource": "cpu"}
    )

    # Build and execute task tree
    task_tree = TaskTreeNode(task)
    await task_manager.distribute_task_tree(task_tree)

    # Get result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    print(f"Result: {result.result}")

if __name__ == "__main__":
    asyncio.run(main())
TaskManager¶
The main class for orchestrating and executing task trees.
Initialization¶
from apflow import TaskManager, create_session

db = create_session()
task_manager = TaskManager(
    db,
    root_task_id=None,        # Optional: Root task ID for streaming
    pre_hooks=None,           # Optional: List of pre-execution hooks
    post_hooks=None,          # Optional: List of post-execution hooks
    executor_instances=None   # Optional: Shared executor instances dict
)
See: src/apflow/core/execution/task_manager.py for full implementation details.
Main Methods¶
distribute_task_tree(task_tree, use_callback=True)¶
Execute a task tree with dependency management and priority scheduling.
result = await task_manager.distribute_task_tree(
    task_tree: TaskTreeNode,
    use_callback: bool = True
) -> TaskTreeNode
See: tests/core/execution/test_task_manager.py for comprehensive examples.
distribute_task_tree_with_streaming(task_tree, use_callback=True)¶
Execute a task tree with real-time streaming for progress updates.
await task_manager.distribute_task_tree_with_streaming(
    task_tree: TaskTreeNode,
    use_callback: bool = True
) -> None
cancel_task(task_id, error_message=None)¶
Cancel a running task execution.
result = await task_manager.cancel_task(
    task_id: str,
    error_message: str | None = None
) -> Dict[str, Any]
Properties¶
- task_repository (TaskRepository): Access to task repository for database operations
- streaming_callbacks (StreamingCallbacks): Streaming callbacks instance
See: Source code in src/apflow/core/execution/task_manager.py for all available methods and detailed documentation.
TaskBuilder¶
Fluent API for creating and executing tasks with method chaining.
Initialization¶
from apflow.core.builders import TaskBuilder
builder = TaskBuilder(
    task_manager: TaskManager,
    executor_id: str,
    name: str | None = None,
    user_id: str | None = None,
    # ... other parameters
)
Method Chaining¶
result = await (
    TaskBuilder(task_manager, "rest_executor")
    .with_name("fetch_data")
    .with_user("user_123")
    .with_input("url", "https://api.example.com")
    .with_input("method", "GET")
    .depends_on("auth_task_id")
    .execute()
)
Available Methods¶
- with_name(name: str) - Set task name
- with_user(user_id: str) - Set user ID
- with_parent(parent_id: str) - Set parent task ID
- with_priority(priority: int) - Set task priority (default: 2)
- with_inputs(inputs: Dict[str, Any]) - Set all input parameters
- with_input(key: str, value: Any) - Set single input parameter
- with_params(params: Dict[str, Any]) - Set task parameters
- with_schemas(schemas: Dict[str, Any]) - Set task schemas
- with_dependencies(dependencies: Sequence[Dict[str, Any]]) - Set task dependencies
- depends_on(*task_ids: str) - Add dependencies by task IDs
- copy_of(original_task_id: str) - Create copy of existing task
- enable_streaming(context: Any | None = None) - Enable streaming execution
- enable_demo_mode(sleep_scale: float | None = None) - Enable demo mode
- execute() - Execute the task and return result
See: Source code in src/apflow/core/builders.py for complete implementation.
BaseTask¶
Recommended base class for creating custom executors. Provides automatic registration via decorator.
Usage¶
from apflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class MyExecutor(BaseTask):
    id = "my_executor"
    name = "My Executor"
    description = "Does something useful"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return {"status": "completed", "result": "..."}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "param": {"type": "string", "description": "Parameter"}
            },
            "required": ["param"]
        }
See: tests/extensions/tools/test_tools_decorator.py and docs/guides/custom-tasks.md for examples.
ExecutableTask¶
Abstract base class for all task executors. Use BaseTask for simplicity, or ExecutableTask for more control.
Required Interface¶
- id (property): Unique identifier
- name (property): Display name
- description (property): Description
- execute(inputs): Main execution logic (async)
- get_input_schema(): Return JSON Schema for inputs
Optional Methods¶
- cancel(): Cancel task execution
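A direct implementation might look like the following (a minimal sketch; the import path is inferred from the "See" reference below, and the property-based declaration style is an assumption):

from typing import Any, Dict
from apflow.core.interfaces.executable_task import ExecutableTask  # path assumed from the "See" note below

class LowLevelExecutor(ExecutableTask):
    @property
    def id(self) -> str:
        return "low_level_executor"

    @property
    def name(self) -> str:
        return "Low Level Executor"

    @property
    def description(self) -> str:
        return "Implements ExecutableTask directly for full control"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Main execution logic
        return {"status": "completed", "result": inputs}

    def get_input_schema(self) -> Dict[str, Any]:
        return {"type": "object", "properties": {}, "required": []}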
See: src/apflow/core/interfaces/executable_task.py for full interface definition.
TaskTreeNode¶
Represents a node in a task tree structure.
Main Methods¶
- add_child(child): Add a child node
- calculate_progress(): Calculate progress (0.0 to 1.0)
- calculate_status(): Calculate overall status
Properties¶
- task (TaskModel): The task model instance
- children (List[TaskTreeNode]): List of child nodes
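For example (a short sketch; parent_task and child_task stand in for tasks created via the repository, as in the Quick Start):

root = TaskTreeNode(parent_task)
root.add_child(TaskTreeNode(child_task))

print(len(root.children))          # 1
print(root.calculate_progress())   # aggregate progress, 0.0 to 1.0
print(root.calculate_status())     # overall status derived from all nodes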
See: src/apflow/core/types.py for full implementation and tests/core/execution/test_task_manager.py for usage examples.
TaskRepository¶
Database operations for tasks.
Main Methods¶
- create_task(...): Create a new task
- get_task_by_id(task_id): Get task by ID
- get_root_task(task): Get root task
- build_task_tree(task): Build task tree from task
- update_task(task_id, **kwargs): Update task fields
- delete_task(task_id): Physically delete a task from the database
- get_all_children_recursive(task_id): Recursively get all child tasks (including grandchildren)
- list_tasks(...): List tasks with filters
See: src/apflow/core/storage/sqlalchemy/task_repository.py for all methods and tests/core/storage/sqlalchemy/test_task_repository.py for examples.
Note on Task Updates:
- Critical fields (parent_id, user_id, dependencies) are validated strictly:
  - parent_id and user_id: Cannot be updated (always rejected)
  - dependencies: Can only be updated for pending tasks, with validation for references, circular dependencies, and executing dependents
- Other fields can be updated freely from any task status
- For API-level updates with validation, use the tasks.update JSON-RPC endpoint via TaskRoutes.handle_task_update()
Note on Task Deletion:
- delete_task() performs physical deletion (not soft-delete)
- For API-level deletion with validation, use the tasks.delete JSON-RPC endpoint via TaskRoutes.handle_task_delete()
- The API endpoint validates that all tasks (task + children) are pending and checks for dependencies before deletion
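Putting the methods above together (a usage sketch; field names such as priority follow the TaskModel fields listed below):

repo = task_manager.task_repository

# Create a task, update a non-critical field, then inspect the tree
task = await repo.create_task(name="my_executor", user_id="user123")
await repo.update_task(task.id, priority=5)

fetched = await repo.get_task_by_id(task.id)
tree = await repo.build_task_tree(fetched)
children = await repo.get_all_children_recursive(task.id)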
TaskCreator¶
Create and manage task trees, links, copies, and archives.
Main Methods¶
- create_task_tree_from_array(tasks): Create a task tree from an array of task dictionaries.
- from_link(original_task, ...): Create a new task as a reference (link) to an existing task. Returns a linked task or tree. Useful for deduplication and sharing results.
- from_copy(original_task, ...): Create a deep copy of an existing task or task tree. Supports copying children, dependencies, and selective subtree copying. Returns a new task or tree with new IDs.
- from_archive(original_task, ...): Create a read-only archive of an existing task or tree. Snapshots are immutable and preserve the state at the time of creation.
- from_mixed(original_task, ...): Create a new tree mixing links and copies, e.g., copy some tasks and link others for advanced workflows.
Example Usage¶
from apflow.core.execution import TaskCreator
# Assume db is a SQLAlchemy session
creator = TaskCreator(db)
# 1. Create a task tree from an array
tasks = [
    {"id": "task_1", "name": "Task 1", "user_id": "user_123"},
    {"id": "task_2", "name": "Task 2", "user_id": "user_123", "parent_id": "task_1"}
]
tree = await creator.create_task_tree_from_array(tasks)
# 2. Create a linked task (reference)
linked = await creator.from_link(original_task, user_id="user_123", parent_id=None)
# 3. Create a deep copy (optionally with children)
copied = await creator.from_copy(original_task, user_id="user_123", _recursive=True)
# 4. Create an archive (frozen, read-only)
archive = await creator.from_archive(original_task, user_id="user_123", _recursive=True)
# 5. Mixed: copy some, link others
mixed = await creator.from_mixed(original_task, user_id="user_123", link_task_ids=[...])
See: src/apflow/core/execution/task_creator.py for implementation and tests/core/execution/test_task_creator_origin_types.py for examples.
TaskModel¶
Database model for tasks.
Main Fields¶
- id, parent_id, task_tree_id, user_id, name, status, priority
- dependencies, inputs, params, result, error, schemas
- progress, has_children
- created_at, started_at, updated_at, completed_at
- original_task_id, origin_type, has_references
Methods¶
to_dict(): Convert model to dictionary
See: src/apflow/core/storage/sqlalchemy/models.py for full model definition.
TaskStatus¶
Task status constants and utilities.
Constants¶
PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED
Methods¶
- is_terminal(status): Check if status is terminal
- is_active(status): Check if status is active
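For instance (a sketch assuming these helpers are exposed on the TaskStatus class at the import path given below):

from apflow.core.storage.sqlalchemy.models import TaskStatus  # import path assumed from the "See" note below

task = await task_manager.task_repository.get_task_by_id(task_id)

if TaskStatus.is_terminal(task.status):
    print("Task finished (completed, failed, or cancelled)")
elif TaskStatus.is_active(task.status):
    print("Task is still pending or in progress")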
See: src/apflow/core/storage/sqlalchemy/models.py for implementation.
Utility Functions¶
Session Management¶
- create_pooled_session(): Create a pooled database session context manager (recommended)
- create_session(): Create a new raw database session
- get_default_session(): Deprecated. Use create_pooled_session() instead.
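A usage sketch (assuming create_pooled_session() can be imported from the top-level apflow package like create_session, and yields a session via a standard context manager):

from apflow import TaskManager, create_pooled_session  # import location assumed

# Pooled session: scoped lifetime, returned to the pool on exit
with create_pooled_session() as db:
    task_manager = TaskManager(db)
    # ... create and execute tasks as in the Quick Start ...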
Extension Registry¶
- executor_register(): Decorator to register executors (recommended)
- register_pre_hook(hook): Register pre-execution hook
- register_post_hook(hook): Register post-execution hook
- register_task_tree_hook(event, hook): Register task tree lifecycle hook
- get_registry(): Get extension registry instance
- get_available_executors(): Get list of available executors based on APFLOW_EXTENSIONS configuration
Getting Available Executors¶
The get_available_executors() function provides discovery of available executor types. This is useful for validating task schemas, generating UI options, or restricting executor access.
Function Signature:
def get_available_executors() -> dict[str, Any]:
    """
    Get list of available executors based on APFLOW_EXTENSIONS configuration.

    Returns:
        Dictionary with:
        - executors: List of available executor metadata
        - count: Number of available executors
        - restricted: Boolean indicating if access is restricted
        - allowed_ids: List of allowed executor IDs (if restricted)
    """
Basic Usage:
from apflow.api.extensions import get_available_executors

# Get all available executors
result = get_available_executors()

print(f"Available executors: {result['count']}")
print(f"Restricted: {result['restricted']}")

for executor in result['executors']:
    print(f"  - {executor['id']}: {executor['name']}")
    print(f"    Extension: {executor['extension']}")
    print(f"    Description: {executor['description']}")
Response Structure:
{
    "executors": [
        {
            "id": "system_info_executor",
            "name": "System Info Executor",
            "extension": "stdio",
            "description": "Retrieve system information like CPU, memory, disk usage"
        },
        {
            "id": "command_executor",
            "name": "Command Executor",
            "extension": "stdio",
            "description": "Execute shell commands on the local system"
        },
        # ... more executors
    ],
    "count": 2,
    "restricted": False,
}
With APFLOW_EXTENSIONS Restriction: When APFLOW_EXTENSIONS=stdio,http is set, only executors from those extensions are returned:
result = get_available_executors()
# result['restricted'] == True
# result['allowed_ids'] == ['system_info_executor', 'command_executor', 'rest_executor']
Use Cases:
- Validate task schemas against available executors before execution (see the sketch below)
- Generate API/UI responses showing which executors users can access
- Enforce security restrictions by limiting executor availability
- Debug executor availability issues
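A minimal validation sketch for the first use case (is_executor_allowed is an illustrative helper, not part of the apflow API):

from apflow.api.extensions import get_available_executors

def is_executor_allowed(executor_id: str) -> bool:
    """Return True if the executor is available under the current configuration."""
    result = get_available_executors()
    available = {e["id"] for e in result["executors"]}
    return executor_id in available

if not is_executor_allowed("system_info_executor"):
    raise ValueError("Executor not available under current APFLOW_EXTENSIONS")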
Hook Database Access¶
- get_hook_repository(): Get TaskRepository instance in hook context (recommended)
- get_hook_session(): Get database session in hook context
Hooks can access the database using the same session as TaskManager:
from apflow import register_pre_hook, get_hook_repository
@register_pre_hook
async def my_hook(task):
    # Get repository from hook context
    repo = get_hook_repository()
    if repo:
        # Modify task fields
        await repo.update_task(task.id, priority=10)
        # Query other tasks
        pending = await repo.get_tasks_by_status("pending")
See: src/apflow/core/decorators.py and src/apflow/core/extensions/registry.py for implementation.
Type Definitions¶
- TaskPreHook: Type alias for pre-execution hook functions
- TaskPostHook: Type alias for post-execution hook functions
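These aliases can annotate hooks explicitly (a sketch, assuming the single-argument hook signature shown in the Hook Database Access example above):

from apflow.core.extensions.types import TaskPreHook

async def audit_hook(task) -> None:
    # Log each task before it executes
    print(f"About to execute: {task.id}")

hook: TaskPreHook = audit_hook  # a type-checker verifies the signature matches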
See: src/apflow/core/extensions/types.py for type definitions.
Error Handling¶
Common Exceptions¶
- ValueError: Invalid input parameters
- RuntimeError: Execution errors
- KeyError: Missing required fields
Error Response Format¶
Tasks that fail return a result dictionary describing the failure, as in the sketch below.
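A representative shape, based on the RobustExecutor example in Pattern 6 below (the exact keys depend on the executor):

{
    "status": "failed",
    "error": "human-readable error message",
    "error_type": "ValueError"  # exception class name or a validation category
}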
Common Patterns¶
Pattern 1: Simple Task Execution¶
import asyncio
from apflow import TaskManager, TaskTreeNode, create_session
async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Create task
    task = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        inputs={"resource": "cpu"}
    )

    # Build tree
    tree = TaskTreeNode(task)

    # Execute
    await task_manager.distribute_task_tree(tree)

    # Get result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    print(f"Status: {result.status}")
    print(f"Result: {result.result}")

if __name__ == "__main__":
    asyncio.run(main())
Pattern 2: Sequential Tasks (Dependencies)¶
# Create tasks with dependencies
task1 = await task_manager.task_repository.create_task(
    name="fetch_data",
    user_id="user123",
    priority=1
)

task2 = await task_manager.task_repository.create_task(
    name="process_data",
    user_id="user123",
    parent_id=task1.id,
    dependencies=[{"id": task1.id, "required": True}],  # Waits for task1
    priority=2,
    inputs={"data": []}  # Will be populated from task1 result
)

task3 = await task_manager.task_repository.create_task(
    name="save_results",
    user_id="user123",
    parent_id=task1.id,
    dependencies=[{"id": task2.id, "required": True}],  # Waits for task2
    priority=3
)
# Build tree
root = TaskTreeNode(task1)
root.add_child(TaskTreeNode(task2))
root.add_child(TaskTreeNode(task3))
# Execute (order: task1 → task2 → task3)
await task_manager.distribute_task_tree(root)
Pattern 3: Parallel Tasks¶
# Create root
root_task = await task_manager.task_repository.create_task(
    name="root",
    user_id="user123",
    priority=1
)

# Create parallel tasks (no dependencies between them)
task1 = await task_manager.task_repository.create_task(
    name="task1",
    user_id="user123",
    parent_id=root_task.id,
    priority=2
)

task2 = await task_manager.task_repository.create_task(
    name="task2",
    user_id="user123",
    parent_id=root_task.id,
    priority=2  # Same priority, no dependencies = parallel
)

task3 = await task_manager.task_repository.create_task(
    name="task3",
    user_id="user123",
    parent_id=root_task.id,
    priority=2
)
# Build tree
root = TaskTreeNode(root_task)
root.add_child(TaskTreeNode(task1))
root.add_child(TaskTreeNode(task2))
root.add_child(TaskTreeNode(task3))
# Execute (all three run in parallel)
await task_manager.distribute_task_tree(root)
Pattern 4: Error Handling¶
# Execute task tree
await task_manager.distribute_task_tree(task_tree)
# Check all tasks for errors
async def check_task_status(task_id):
    task = await task_manager.task_repository.get_task_by_id(task_id)
    if task.status == "failed":
        print(f"Task {task_id} failed: {task.error}")
        return False
    elif task.status == "completed":
        print(f"Task {task_id} completed: {task.result}")
        return True
    return None

# Check root task
root_status = await check_task_status(root_task.id)

# Check all children
for child in task_tree.children:
    await check_task_status(child.task.id)
Pattern 5: Using TaskExecutor¶
from apflow.core.execution.task_executor import TaskExecutor
# Get singleton instance
executor = TaskExecutor()
# Execute tasks from definitions
tasks = [
    {
        "id": "task1",
        "name": "my_executor",
        "user_id": "user123",
        "inputs": {"key": "value"}
    },
    {
        "id": "task2",
        "name": "my_executor",
        "user_id": "user123",
        "parent_id": "task1",
        "dependencies": [{"id": "task1", "required": True}],
        "inputs": {"key": "value2"}
    }
]

# Execute
result = await executor.execute_tasks(
    tasks=tasks,
    root_task_id="root_123",
    use_streaming=False
)
print(f"Execution result: {result}")
Pattern 6: Custom Executor with Error Handling¶
from apflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class RobustExecutor(BaseTask):
    id = "robust_executor"
    name = "Robust Executor"
    description = "Executor with comprehensive error handling"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Validate inputs
            if not inputs.get("data"):
                return {
                    "status": "failed",
                    "error": "data is required",
                    "error_type": "validation_error"
                }

            # Process
            result = self._process(inputs["data"])
            return {
                "status": "completed",
                "result": result
            }
        except ValueError as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": "ValueError"
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": type(e).__name__
            }

    def _process(self, data):
        # Your processing logic
        return {"processed": data}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "data": {"type": "string", "description": "Data to process"}
            },
            "required": ["data"]
        }
A2A Protocol Integration¶
apflow implements the A2A (Agent-to-Agent) Protocol standard, allowing seamless integration with other A2A-compatible agents and services.
Using A2A Client SDK¶
The A2A protocol provides an official client SDK for easy integration. Here's how to use it with apflow:
Installation¶
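Install the official A2A client SDK from PyPI (the package name a2a-sdk is an assumption; see the A2A Protocol documentation linked at the end of this section for the current name):

pip install a2a-sdk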
Basic Example¶
from a2a.client import ClientFactory, ClientConfig
from a2a.types import Message, DataPart, Role, AgentCard
import httpx
import uuid
import asyncio
async def execute_task_via_a2a():
    # Create HTTP client
    httpx_client = httpx.AsyncClient(base_url="http://localhost:8000")

    # Create A2A client config
    config = ClientConfig(
        streaming=True,
        polling=False,
        httpx_client=httpx_client
    )

    # Create client factory
    factory = ClientFactory(config=config)

    # Fetch agent card
    from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
    card_response = await httpx_client.get(AGENT_CARD_WELL_KNOWN_PATH)
    agent_card = AgentCard(**card_response.json())

    # Create A2A client
    client = factory.create(card=agent_card)

    # Prepare task data
    task_data = {
        "id": "task-1",
        "name": "My Task",
        "user_id": "user123",
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {}
    }

    # Create A2A message
    data_part = DataPart(kind="data", data={"tasks": [task_data]})
    message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.user,
        parts=[data_part]
    )

    # Send message and process responses
    async for response in client.send_message(message):
        if isinstance(response, Message):
            for part in response.parts:
                if part.kind == "data" and isinstance(part.data, dict):
                    result = part.data
                    print(f"Status: {result.get('status')}")
                    print(f"Progress: {result.get('progress')}")

    await httpx_client.aclose()

# Run
asyncio.run(execute_task_via_a2a())
Push Notification Configuration¶
You can use push notifications to receive task execution updates via callback URL:
from a2a.types import Configuration, PushNotificationConfig
# Create push notification config
push_config = PushNotificationConfig(
    url="https://your-server.com/callback",
    headers={
        "Authorization": "Bearer your-token"
    }
)

# Create configuration
configuration = Configuration(
    push_notification_config=push_config
)

# Create message with configuration
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part],
    configuration=configuration
)

# Send message - server will use callback mode
async for response in client.send_message(message):
    # Initial response only
    print(f"Initial response: {response}")
    break
The server will send task status updates to your callback URL as the task executes.
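On the receiving side, the callback endpoint only needs to accept the POSTed updates. A hypothetical receiver sketched with FastAPI (the payload shape is illustrative, not a documented apflow contract):

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/callback")
async def receive_task_update(request: Request):
    # apflow pushes task status updates to this URL as execution progresses
    payload = await request.json()
    print(f"Task update received: {payload}")
    return {"ok": True}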
Cancelling Tasks¶
You can cancel a running task using the A2A Protocol cancel method:
from a2a.client import ClientFactory, ClientConfig
from a2a.types import Message, DataPart, Role, AgentCard, RequestContext
import httpx
import uuid
import asyncio
async def cancel_task_via_a2a():
    # Create HTTP client
    httpx_client = httpx.AsyncClient(base_url="http://localhost:8000")

    # Create A2A client config
    config = ClientConfig(
        streaming=True,
        polling=False,
        httpx_client=httpx_client
    )

    # Create client factory
    factory = ClientFactory(config=config)

    # Fetch agent card
    from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
    card_response = await httpx_client.get(AGENT_CARD_WELL_KNOWN_PATH)
    agent_card = AgentCard(**card_response.json())

    # Create A2A client
    client = factory.create(card=agent_card)

    # Create cancel request
    # Task ID can be provided in multiple ways (priority order):
    # 1. task_id in RequestContext
    # 2. context_id in RequestContext
    # 3. metadata.task_id
    # 4. metadata.context_id
    cancel_message = Message(
        message_id=str(uuid.uuid4()),
        role=Role.user,
        parts=[],  # Empty parts for cancel
        task_id="my-running-task",  # Task ID to cancel
        metadata={
            "error_message": "User requested cancellation"  # Optional custom message
        }
    )

    # Send cancel request and process responses
    async for response in client.send_message(cancel_message):
        if isinstance(response, Message):
            for part in response.parts:
                if part.kind == "data" and isinstance(part.data, dict):
                    result = part.data
                    status = result.get("status")
                    if status == "cancelled":
                        print(f"Task cancelled successfully: {result.get('message')}")
                        if "token_usage" in result:
                            print(f"Token usage: {result['token_usage']}")
                        if "result" in result:
                            print(f"Partial result: {result['result']}")
                    elif status == "failed":
                        print(f"Cancellation failed: {result.get('error', result.get('message'))}")

    await httpx_client.aclose()

# Run
asyncio.run(cancel_task_via_a2a())
Notes:
- The cancel() method sends a TaskStatusUpdateEvent through the EventQueue
- Task ID extraction follows priority: task_id > context_id > metadata.task_id > metadata.context_id
- If the task supports cancellation, the executor's cancel() method will be called
- Token usage and partial results are preserved if available
- The response includes protocol: "a2a" in the event data
Streaming Mode¶
Enable streaming mode to receive real-time progress updates:
# Create message with streaming enabled via metadata
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part],
    metadata={"stream": True}
)

# Send message - will receive multiple updates
async for response in client.send_message(message):
    if isinstance(response, Message):
        # Process streaming updates
        for part in response.parts:
            if part.kind == "data":
                update = part.data
                print(f"Update: {update}")
Task Tree Execution¶
Execute complex task trees with dependencies:
tasks = [
    {
        "id": "parent-task",
        "name": "Parent Task",
        "user_id": "user123",
        "dependencies": [
            {"id": "child-1", "required": True},
            {"id": "child-2", "required": True}
        ],
        "schemas": {
            "method": "aggregate_results_executor"
        },
        "inputs": {}
    },
    {
        "id": "child-1",
        "name": "Child Task 1",
        "parent_id": "parent-task",
        "user_id": "user123",
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {"resource": "cpu"}
    },
    {
        "id": "child-2",
        "name": "Child Task 2",
        "parent_id": "parent-task",
        "user_id": "user123",
        "dependencies": [{"id": "child-1", "required": True}],
        "schemas": {
            "method": "system_info_executor"
        },
        "inputs": {"resource": "memory"}
    }
]

# Create message with task tree
data_part = DataPart(kind="data", data={"tasks": tasks})
message = Message(
    message_id=str(uuid.uuid4()),
    role=Role.user,
    parts=[data_part]
)

# Execute task tree
async for response in client.send_message(message):
    # Process responses
    pass
A2A Protocol Documentation¶
For detailed information about the A2A Protocol, please refer to the official documentation:
- A2A Protocol Official Documentation: https://www.a2aprotocol.org/en/docs
- A2A Protocol Homepage: https://www.a2aprotocol.org