Real-World Examples¶
Complete, runnable examples for common use cases, showing how to use aipartnerupflow in production scenarios.
Table of Contents¶
- Data Processing Pipeline
- API Integration Workflow
- Batch Processing with Dependencies
- Error Handling and Retry
- Multi-Step Workflow
Data Processing Pipeline¶
Scenario¶
Process data from multiple sources, transform it, and save results.
Complete Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Step 1: Fetch data from multiple sources
    fetch_api = await task_manager.task_repository.create_task(
        name="rest_executor",  # Use built-in REST executor
        user_id="user123",
        priority=1,
        inputs={
            "url": "https://api.example.com/data",
            "method": "GET"
        }
    )

    fetch_db = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        priority=1,
        inputs={
            "command": "psql -c 'SELECT * FROM users LIMIT 100'"
        }
    )

    # Step 2: Process data (depends on both fetches)
    process_task = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=fetch_api.id,
        dependencies=[
            {"id": fetch_api.id, "required": True},
            {"id": fetch_db.id, "required": True}
        ],
        priority=2,
        inputs={
            "command": "python process_data.py"
        }
    )

    # Step 3: Save results
    save_task = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=fetch_api.id,
        dependencies=[{"id": process_task.id, "required": True}],
        priority=3,
        inputs={
            "command": "python save_results.py"
        }
    )

    # Build tree
    root = TaskTreeNode(fetch_api)
    root.add_child(TaskTreeNode(fetch_db))
    root.add_child(TaskTreeNode(process_task))
    root.add_child(TaskTreeNode(save_task))

    # Execute
    await task_manager.distribute_task_tree(root)

    # Check results
    final_result = await task_manager.task_repository.get_task_by_id(save_task.id)
    print(f"Pipeline completed: {final_result.status}")

if __name__ == "__main__":
    asyncio.run(main())
Execution Flow¶

fetch_api ──┐
            ├──→ process_task ──→ save_task
fetch_db  ──┘
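To verify each stage after the run, you can re-fetch every task from the repository. A minimal sketch, continuing inside main() from the example above and using only the get_task_by_id call and status attribute already shown:

# Re-fetch each pipeline task and report its status
for stage in (fetch_api, fetch_db, process_task, save_task):
    refreshed = await task_manager.task_repository.get_task_by_id(stage.id)
    print(f"{refreshed.id}: {refreshed.status}")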
API Integration Workflow¶
Scenario¶
Call multiple APIs, aggregate results, and send notifications.
Complete Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Call multiple APIs in parallel
    api1 = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        priority=1,
        inputs={
            "url": "https://api.service1.com/data",
            "method": "GET"
        }
    )

    api2 = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        priority=1,
        inputs={
            "url": "https://api.service2.com/data",
            "method": "GET"
        }
    )

    api3 = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        priority=1,
        inputs={
            "url": "https://api.service3.com/data",
            "method": "GET"
        }
    )

    # Aggregate results
    aggregate = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=api1.id,
        dependencies=[
            {"id": api1.id, "required": True},
            {"id": api2.id, "required": True},
            {"id": api3.id, "required": True}
        ],
        priority=2,
        inputs={
            "command": "python aggregate.py"
        }
    )

    # Send notification
    notify = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        parent_id=api1.id,
        dependencies=[{"id": aggregate.id, "required": True}],
        priority=3,
        inputs={
            "url": "https://api.notification.com/send",
            "method": "POST",
            "body": {"message": "Processing complete"}
        }
    )

    # Build tree
    root = TaskTreeNode(api1)
    root.add_child(TaskTreeNode(api2))
    root.add_child(TaskTreeNode(api3))
    root.add_child(TaskTreeNode(aggregate))
    root.add_child(TaskTreeNode(notify))

    # Execute
    await task_manager.distribute_task_tree(root)
    print("API integration workflow completed!")

if __name__ == "__main__":
    asyncio.run(main())
Execution Flow¶

api1 ──┐
api2 ──├──→ aggregate ──→ notify
api3 ──┘
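The three API tasks differ only in their URL, so you may prefer to create them in a loop. A sketch using the same create_task call as the example; api_tasks[0] then takes the role of api1 as the tree root:

# Build the parallel API-call tasks from a list of URLs
urls = [
    "https://api.service1.com/data",
    "https://api.service2.com/data",
    "https://api.service3.com/data",
]
api_tasks = []
for url in urls:
    task = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        priority=1,
        inputs={"url": url, "method": "GET"}
    )
    api_tasks.append(task)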
Batch Processing with Dependencies¶
Scenario¶
Process a batch of items where each item depends on the previous one.
Complete Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Create batch of items
    items = ["item1", "item2", "item3", "item4", "item5"]
    tasks = []

    # Create first task (no dependencies)
    first_task = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        priority=1,
        inputs={"command": f"process {items[0]}"}
    )
    tasks.append(first_task)

    # Create remaining tasks (each depends on the previous one)
    for i in range(1, len(items)):
        task = await task_manager.task_repository.create_task(
            name="command_executor",
            user_id="user123",
            parent_id=first_task.id,
            dependencies=[{"id": tasks[i-1].id, "required": True}],
            priority=1 + i,
            inputs={"command": f"process {items[i]}"}
        )
        tasks.append(task)

    # Build tree (the dependencies above enforce the sequential order)
    root = TaskTreeNode(tasks[0])
    for i in range(1, len(tasks)):
        root.add_child(TaskTreeNode(tasks[i]))

    # Execute
    await task_manager.distribute_task_tree(root)

    # Check final task
    final_result = await task_manager.task_repository.get_task_by_id(tasks[-1].id)
    print(f"Batch processing completed: {final_result.status}")

if __name__ == "__main__":
    asyncio.run(main())
Execution Flow¶

item1 ──→ item2 ──→ item3 ──→ item4 ──→ item5
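For large batches a single chain grows quickly, which works against the tree-size guidance under Best Practices below. One option is to split the batch into chunks and run one small chain per chunk. A sketch continuing inside main(), using only the calls from the example (the chunk size of 5 is an arbitrary choice):

def chunks(seq, size):
    # Yield consecutive slices of the batch
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

for batch in chunks(items, 5):
    first = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        priority=1,
        inputs={"command": f"process {batch[0]}"}
    )
    chunk_tasks = [first]
    for item in batch[1:]:
        task = await task_manager.task_repository.create_task(
            name="command_executor",
            user_id="user123",
            parent_id=first.id,
            dependencies=[{"id": chunk_tasks[-1].id, "required": True}],
            priority=1,
            inputs={"command": f"process {item}"}
        )
        chunk_tasks.append(task)

    # One small tree per chunk, built exactly as in the example above
    root = TaskTreeNode(first)
    for task in chunk_tasks[1:]:
        root.add_child(TaskTreeNode(task))
    await task_manager.distribute_task_tree(root)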
Error Handling and Retry¶
Scenario¶
Handle failures gracefully with fallback tasks.
Complete Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Primary task (may fail)
    primary = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        priority=1,
        inputs={
            "url": "https://unreliable-api.com/data",
            "method": "GET"
        }
    )

    # Fallback task (runs even if primary fails)
    fallback = await task_manager.task_repository.create_task(
        name="http_request_executor",
        user_id="user123",
        parent_id=primary.id,
        dependencies=[{"id": primary.id, "required": False}],  # Optional dependency
        priority=2,
        inputs={
            "url": "https://backup-api.com/data",
            "method": "GET"
        }
    )

    # Final task (works with either primary or fallback)
    final = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=primary.id,
        dependencies=[
            {"id": primary.id, "required": False},   # Optional
            {"id": fallback.id, "required": False}   # Optional
        ],
        priority=3,
        inputs={"command": "python process_result.py"}
    )

    # Build tree
    root = TaskTreeNode(primary)
    root.add_child(TaskTreeNode(fallback))
    root.add_child(TaskTreeNode(final))

    # Execute
    await task_manager.distribute_task_tree(root)

    # Check results
    primary_result = await task_manager.task_repository.get_task_by_id(primary.id)
    fallback_result = await task_manager.task_repository.get_task_by_id(fallback.id)
    final_result = await task_manager.task_repository.get_task_by_id(final.id)
    print(f"Primary: {primary_result.status}")
    print(f"Fallback: {fallback_result.status}")
    print(f"Final: {final_result.status}")

if __name__ == "__main__":
    asyncio.run(main())
Execution Flow¶

primary ──→ fallback ──→ final
(all dependencies are optional, so fallback and final run even if primary fails)
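After the run you usually want to know which source actually delivered data. A minimal sketch continuing from the example; note that "failed" is the only status string shown elsewhere on this page, so treating any other status as success is an assumption:

# Pick whichever source did not fail (non-"failed" == usable is an assumption)
if primary_result.status != "failed":
    source = primary_result
elif fallback_result.status != "failed":
    source = fallback_result
else:
    raise RuntimeError("Both primary and fallback failed")
print(f"Using data from task {source.id}")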
Multi-Step Workflow¶
Scenario¶
Complex workflow with multiple stages and parallel processing.
Complete Example¶
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Stage 1: Data Collection (parallel)
    collect1 = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        priority=1,
        inputs={"resource": "cpu"}
    )

    collect2 = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        priority=1,
        inputs={"resource": "memory"}
    )

    collect3 = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        priority=1,
        inputs={"resource": "disk"}
    )

    # Stage 2: Processing (depends on collection)
    process1 = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect1.id,
        dependencies=[{"id": collect1.id, "required": True}],
        priority=2,
        inputs={"command": "python process_cpu.py"}
    )

    process2 = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect2.id,
        dependencies=[{"id": collect2.id, "required": True}],
        priority=2,
        inputs={"command": "python process_memory.py"}
    )

    process3 = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect3.id,
        dependencies=[{"id": collect3.id, "required": True}],
        priority=2,
        inputs={"command": "python process_disk.py"}
    )

    # Stage 3: Aggregation (depends on all processing)
    aggregate = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect1.id,
        dependencies=[
            {"id": process1.id, "required": True},
            {"id": process2.id, "required": True},
            {"id": process3.id, "required": True}
        ],
        priority=3,
        inputs={"command": "python aggregate.py"}
    )

    # Stage 4: Finalization (depends on aggregation)
    finalize = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect1.id,
        dependencies=[{"id": aggregate.id, "required": True}],
        priority=4,
        inputs={"command": "python finalize.py"}
    )

    # Build tree
    root = TaskTreeNode(collect1)
    root.add_child(TaskTreeNode(collect2))
    root.add_child(TaskTreeNode(collect3))
    root.add_child(TaskTreeNode(process1))
    root.add_child(TaskTreeNode(process2))
    root.add_child(TaskTreeNode(process3))
    root.add_child(TaskTreeNode(aggregate))
    root.add_child(TaskTreeNode(finalize))

    # Execute
    await task_manager.distribute_task_tree(root)

    # Check final result
    final_result = await task_manager.task_repository.get_task_by_id(finalize.id)
    print(f"Multi-step workflow completed: {final_result.status}")

if __name__ == "__main__":
    asyncio.run(main())
Execution Flow¶
Collect1 ──→ Process1 ──┐
Collect2 ──→ Process2 ──├──→ Aggregate ──→ Finalize
Collect3 ──→ Process3 ──┘
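The collect/process pairs differ only in the resource name, so the first two stages can also be generated in a loop. A sketch with the same calls as the example; the aggregate task then depends on every entry in process_tasks:

# Build the collect -> process pairs from a resource list
resources = ["cpu", "memory", "disk"]
collect_tasks = []
process_tasks = []
for resource in resources:
    collect = await task_manager.task_repository.create_task(
        name="system_info_executor",
        user_id="user123",
        priority=1,
        inputs={"resource": resource}
    )
    process = await task_manager.task_repository.create_task(
        name="command_executor",
        user_id="user123",
        parent_id=collect.id,
        dependencies=[{"id": collect.id, "required": True}],
        priority=2,
        inputs={"command": f"python process_{resource}.py"}
    )
    collect_tasks.append(collect)
    process_tasks.append(process)

# dependencies for the aggregate task:
# [{"id": t.id, "required": True} for t in process_tasks]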
Best Practices¶
1. Use Meaningful Task Names¶
The name field selects the executor that runs the task, so give your custom executors descriptive names:
# Good
name="fetch_user_data"
name="process_payment"
name="send_notification"
# Bad
name="task1"
name="task2"
2. Set Appropriate Priorities¶
# Critical tasks first
priority=0 # Highest priority
# Normal tasks
priority=2 # Default
# Background tasks
priority=3 # Lower priority
3. Handle Errors Gracefully¶
Mark a dependency as optional ({"id": ..., "required": False}) when downstream tasks should still run after an upstream failure, and inspect task.status and task.error afterwards. The Error Handling and Retry example above shows the full pattern.
4. Keep Trees Manageable¶
- Keep trees 3-5 levels deep
- Limit each tree to 10-20 tasks
- Split complex workflows into sub-trees (see the sketch below)
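A sketch of the sub-tree idea, reusing the tasks from the Multi-Step Workflow example. This assumes distribute_task_tree returns only after its tree has finished, which is what the examples above imply:

# Run two small trees back to back instead of one deep tree
collection_root = TaskTreeNode(collect1)
collection_root.add_child(TaskTreeNode(collect2))
collection_root.add_child(TaskTreeNode(collect3))
await task_manager.distribute_task_tree(collection_root)  # stage 1

processing_root = TaskTreeNode(process1)
processing_root.add_child(TaskTreeNode(process2))
processing_root.add_child(TaskTreeNode(process3))
await task_manager.distribute_task_tree(processing_root)  # stage 2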
5. Monitor Task Status¶
# Check task status after execution
task = await task_manager.task_repository.get_task_by_id(task_id)
if task.status == "failed":
    print(f"Error: {task.error}")
Next Steps¶
- Task Orchestration Guide - Learn more about orchestration patterns
- Best Practices - Design patterns and optimization
- Custom Tasks - Create your own executors