Custom Tasks Guide¶
Learn how to create your own custom executors (tasks) in aipartnerupflow. This guide will walk you through everything from simple tasks to advanced patterns.
What You'll Learn¶
- ✅ How to create custom executors
- ✅ How to register and use them
- ✅ Input validation with JSON Schema
- ✅ Error handling best practices
- ✅ Common patterns and examples
- ✅ Testing your custom tasks
Table of Contents¶
- Quick Start
- Understanding Executors
- Creating Your First Executor
- Required Components
- Input Schema
- Error Handling
- Common Patterns
- Advanced Features
- Best Practices
- Testing
Quick Start¶
The fastest way to create a custom executor:
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class MyFirstExecutor(BaseTask):
"""A simple custom executor"""
id = "my_first_executor"
name = "My First Executor"
description = "Does something useful"
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the task"""
result = f"Processed: {inputs.get('data', 'no data')}"
return {"status": "completed", "result": result}
def get_input_schema(self) -> Dict[str, Any]:
"""Define input parameters"""
return {
"type": "object",
"properties": {
"data": {"type": "string", "description": "Input data"}
},
"required": ["data"]
}
That's it! Just import it and use it:
# Import to register
from my_module import MyFirstExecutor
# Use it
task = await task_manager.task_repository.create_task(
name="my_first_executor", # Must match id
user_id="user123",
inputs={"data": "Hello!"}
)
Understanding Executors¶
What is an Executor?¶
An executor is a piece of code that performs a specific task. Think of it as a function that:
- Takes inputs (parameters)
- Does some work
- Returns a result
Examples:
- An executor that fetches data from an API
- An executor that processes files
- An executor that sends emails
- An executor that runs AI models
Executor vs Task¶
Executor: The code that does the work (reusable)
Task: An instance of work to be done (specific execution)
Analogy:
- Executor = A recipe (reusable template)
- Task = A specific meal made from the recipe (one-time execution)
BaseTask vs ExecutableTask¶
BaseTask: Recommended base class (simpler, includes registration)
from aipartnerupflow import BaseTask, executor_register
@executor_register()
class MyTask(BaseTask):
id = "my_task"
# ...
ExecutableTask: Lower-level interface (more control)
from aipartnerupflow import ExecutableTask
class MyTask(ExecutableTask):
@property
def id(self) -> str:
return "my_task"
# ...
Recommendation: Use BaseTask with @executor_register() - it's simpler!
Creating Your First Executor¶
Let's create a complete, working example step by step.
Step 1: Create the Executor Class¶
Create a file greeting_executor.py:
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class GreetingExecutor(BaseTask):
"""Creates personalized greetings"""
id = "greeting_executor"
name = "Greeting Executor"
description = "Creates personalized greeting messages"
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Execute greeting creation"""
name = inputs.get("name", "Guest")
language = inputs.get("language", "en")
greetings = {
    "en": f"Hello, {name}!",
    "es": f"¡Hola, {name}!",
    "fr": f"Bonjour, {name}!",
    "zh": f"你好，{name}！"
}
return {
"greeting": greetings.get(language, greetings["en"]),
"name": name,
"language": language
}
def get_input_schema(self) -> Dict[str, Any]:
"""Define input parameters"""
return {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the person to greet"
},
"language": {
"type": "string",
"enum": ["en", "es", "fr", "zh"],
"description": "Language for the greeting",
"default": "en"
}
},
"required": ["name"]
}
Step 2: Use Your Executor¶
Create a file use_greeting.py:
import asyncio
from aipartnerupflow import TaskManager, TaskTreeNode, create_session
# Import to register the executor
from greeting_executor import GreetingExecutor
async def main():
db = create_session()
task_manager = TaskManager(db)
# Create task using your executor
task = await task_manager.task_repository.create_task(
name="greeting_executor", # Must match executor id
user_id="user123",
inputs={
"name": "Alice",
"language": "en"
}
)
# Execute
task_tree = TaskTreeNode(task)
await task_manager.distribute_task_tree(task_tree)
# Get result
result = await task_manager.task_repository.get_task_by_id(task.id)
print(f"Greeting: {result.result['greeting']}")
if __name__ == "__main__":
asyncio.run(main())
Step 3: Run It¶
python use_greeting.py
Expected Output:
Greeting: Hello, Alice!
Congratulations! You just created and used your first custom executor! 🎉
Required Components¶
Every executor must have these components:
1. Unique ID¶
Purpose: Identifies the executor (used when creating tasks)
Best Practices:
- Use lowercase with underscores
- Be descriptive: fetch_user_data, not task1
- Keep it consistent: don't change the ID after deployment
2. Display Name¶
Purpose: Human-readable name
3. Description¶
Purpose: Explains what the executor does
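Taken together, the first three components are plain class attributes. A minimal sketch (the class and attribute values are illustrative; in practice this would be a BaseTask subclass decorated with @executor_register()):

```python
# Illustrative only: in aipartnerupflow these attributes live on a
# BaseTask subclass decorated with @executor_register().
class FetchUserData:
    # 1. Unique ID: lowercase with underscores, stable after deployment
    id = "fetch_user_data"
    # 2. Display name: human-readable
    name = "Fetch User Data"
    # 3. Description: explains what the executor does
    description = "Fetches profile data for a given user ID"
```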
4. Execute Method¶
Purpose: The actual work happens here
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute the task
Args:
inputs: Input parameters (from task.inputs)
Returns:
Execution result dictionary
"""
# Your logic here
return {"status": "completed", "result": "..."}
Key Points:
- Must be async
- Receives an inputs dictionary
- Returns a dictionary
- Can raise exceptions (they will be caught by TaskManager)
5. Input Schema¶
Purpose: Defines what inputs are expected (for validation)
def get_input_schema(self) -> Dict[str, Any]:
"""
Return JSON Schema for input parameters
Returns:
JSON Schema dictionary
"""
return {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "Parameter description"
}
},
"required": ["param1"]
}
Input Schema¶
Input schemas use JSON Schema format to define and validate inputs.
Basic Schema¶
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"}
},
"required": ["name"]
}
Common Field Types¶
String¶
Integer¶
Boolean¶
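The simple field types all follow the same pattern. For example (property names are illustrative):

```python
# String, integer, and boolean properties in JSON Schema form
simple_properties = {
    "name": {
        "type": "string",
        "description": "A text value",
        "minLength": 1
    },
    "count": {
        "type": "integer",
        "description": "A whole number",
        "minimum": 0
    },
    "verbose": {
        "type": "boolean",
        "description": "A true/false flag",
        "default": False
    }
}
```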
Array¶
"items": {
"type": "array",
"items": {"type": "string"},
"description": "List of items",
"minItems": 1
}
Object¶
"config": {
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"type": "string"}
},
"description": "Configuration object"
}
Enum (Limited Choices)¶
"status": {
"type": "string",
"enum": ["pending", "active", "completed"],
"description": "Task status",
"default": "pending"
}
Default Values¶
Provide defaults for optional parameters:
"timeout": {
"type": "integer",
"description": "Timeout in seconds",
"default": 30 # Used if not provided
}
Required Fields¶
Specify which fields are required:
{
"type": "object",
"properties": {
"name": {"type": "string"},
"email": {"type": "string"}
},
"required": ["name", "email"] # Both required
}
Complete Schema Example¶
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "API endpoint URL",
"format": "uri"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE"],
"description": "HTTP method",
"default": "GET"
},
"headers": {
"type": "object",
"description": "HTTP headers",
"additionalProperties": {"type": "string"}
},
"timeout": {
"type": "integer",
"description": "Timeout in seconds",
"minimum": 1,
"maximum": 300,
"default": 30
},
"retry": {
"type": "boolean",
"description": "Whether to retry on failure",
"default": false
}
},
"required": ["url"]
}
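To make the validation step concrete, here is a minimal sketch of checking required fields and filling in defaults. It is for illustration only, not a full JSON Schema validator; a library such as jsonschema would also cover types, ranges, and formats:

```python
def apply_schema_basics(schema, inputs):
    """Check required fields and fill in defaults.

    A minimal sketch, not a full JSON Schema validator.
    """
    # Reject inputs that omit any required field
    missing = [f for f in schema.get("required", []) if f not in inputs]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    # Fill in schema defaults for absent optional fields
    merged = dict(inputs)
    for field, spec in schema.get("properties", {}).items():
        if field not in merged and "default" in spec:
            merged[field] = spec["default"]
    return merged

schema = {
    "type": "object",
    "properties": {
        "url": {"type": "string"},
        "method": {"type": "string", "default": "GET"},
        "timeout": {"type": "integer", "default": 30},
    },
    "required": ["url"],
}
```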
Error Handling¶
Returning Errors (Recommended)¶
Return error information in the result:
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
try:
result = perform_operation(inputs)
return {"status": "completed", "result": result}
except ValueError as e:
return {
"status": "failed",
"error": str(e),
"error_type": "validation_error"
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"error_type": "execution_error"
}
Benefits:
- More control over the error format
- Can include additional context
- The task status will be "failed"
Raising Exceptions¶
You can also raise exceptions (TaskManager will catch them):
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
if not inputs.get("required_param"):
raise ValueError("required_param is required")
# Continue with execution
return {"status": "completed", "result": "..."}
Note: TaskManager will catch exceptions and mark the task as "failed".
Best Practices¶
- Validate early: check inputs at the start
- Return meaningful errors: include the error type and message
- Handle specific exceptions: catch specific errors, not just Exception
- Include context: add relevant information to error messages
Example:
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
url = inputs.get("url")
if not url:
return {
"status": "failed",
"error": "URL is required",
"error_type": "validation_error",
"field": "url"
}
if not isinstance(url, str):
return {
"status": "failed",
"error": "URL must be a string",
"error_type": "type_error",
"field": "url",
"received_type": type(url).__name__
}
# Continue with execution
try:
result = await fetch_url(url)
return {"status": "completed", "result": result}
except TimeoutError:
return {
"status": "failed",
"error": f"Request to {url} timed out",
"error_type": "timeout_error"
}
Common Patterns¶
Pattern 1: HTTP API Call¶
import aiohttp
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class APICallExecutor(BaseTask):
"""Calls an external HTTP API"""
id = "api_call_executor"
name = "API Call Executor"
description = "Calls an external HTTP API"
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
url = inputs.get("url")
method = inputs.get("method", "GET")
headers = inputs.get("headers", {})
timeout = inputs.get("timeout", 30)
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method,
url,
headers=headers,
timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
data = await response.json() if response.content_type == "application/json" else await response.text()
return {
"status": "completed",
"status_code": response.status,
"data": data
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"error_type": type(e).__name__
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"url": {"type": "string", "description": "API URL"},
"method": {"type": "string", "enum": ["GET", "POST"], "default": "GET"},
"headers": {"type": "object"},
"timeout": {"type": "integer", "default": 30}
},
"required": ["url"]
}
Pattern 2: Data Processing¶
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class DataProcessor(BaseTask):
"""Processes data"""
id = "data_processor"
name = "Data Processor"
description = "Processes data with various operations"
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
data = inputs.get("data", [])
operation = inputs.get("operation", "sum")
if not isinstance(data, list):
return {
"status": "failed",
"error": "Data must be a list",
"error_type": "validation_error"
}
if operation == "sum":
result = sum(data)
elif operation == "average":
result = sum(data) / len(data) if data else 0
elif operation == "max":
result = max(data) if data else None
elif operation == "min":
result = min(data) if data else None
else:
return {
"status": "failed",
"error": f"Unknown operation: {operation}",
"error_type": "validation_error"
}
return {
"status": "completed",
"operation": operation,
"result": result,
"input_count": len(data)
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {"type": "number"},
"description": "Array of numbers"
},
"operation": {
"type": "string",
"enum": ["sum", "average", "max", "min"],
"default": "sum"
}
},
"required": ["data"]
}
Pattern 3: File Operations¶
import aiofiles
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class FileReader(BaseTask):
"""Reads files"""
id = "file_reader"
name = "File Reader"
description = "Reads content from files"
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
file_path = inputs.get("file_path")
if not file_path:
return {
"status": "failed",
"error": "file_path is required",
"error_type": "validation_error"
}
try:
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
return {
"status": "completed",
"file_path": file_path,
"content": content,
"size": len(content)
}
except FileNotFoundError:
return {
"status": "failed",
"error": f"File not found: {file_path}",
"error_type": "file_not_found"
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"error_type": type(e).__name__
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to file"
}
},
"required": ["file_path"]
}
Pattern 4: Database Query¶
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class DatabaseQuery(BaseTask):
"""Executes database queries"""
id = "db_query"
name = "Database Query"
description = "Executes database queries"
def __init__(self):
super().__init__()
# Initialize database connection
# self.db = create_db_connection()
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
query = inputs.get("query")
params = inputs.get("params", {})
if not query:
return {
"status": "failed",
"error": "query is required",
"error_type": "validation_error"
}
try:
# Execute query
# result = await self.db.fetch(query, params)
result = [] # Placeholder
return {
"status": "completed",
"rows": result,
"count": len(result)
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"error_type": "database_error"
}
def get_input_schema(self) -> Dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL query"},
"params": {"type": "object", "description": "Query parameters"}
},
"required": ["query"]
}
Advanced Features¶
Cancellation Support¶
Implement cancellation for long-running tasks:
class CancellableTask(BaseTask):
cancelable: bool = True # Mark as cancellable
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
self._cancelled = False
for i in range(100):
# Check for cancellation
if self._cancelled:
return {
"status": "cancelled",
"message": "Task was cancelled",
"progress": i
}
# Do work
await asyncio.sleep(0.1)
return {"status": "completed", "result": "done"}
async def cancel(self) -> Dict[str, Any]:
"""Cancel task execution"""
self._cancelled = True
return {
"status": "cancelled",
"message": "Cancellation requested"
}
Note: Not all executors need cancellation. Only implement if your task can be safely cancelled.
Accessing Task Context¶
Access task information through the executor:
class ContextAwareTask(BaseTask):
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
# Task context is available through TaskManager
# You can access it via hooks or by storing task reference
# Example: Access task ID (if available)
# task_id = getattr(self, '_task_id', None)
return {"status": "completed"}
Best Practices¶
1. Keep Tasks Focused¶
Good:
Bad:
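A sketch of the contrast (class names are illustrative; in practice these would be BaseTask subclasses):

```python
# Good: one executor, one responsibility
class FetchWeather:
    id = "fetch_weather"
    description = "Fetches the current weather for a city"

# Bad: one executor doing several unrelated things
class FetchWeatherAndSendEmailAndResizeImages:
    id = "do_everything"
    description = "Fetches weather, sends emails, resizes images, ..."
```

Focused executors are easier to test, reuse, and compose into task trees.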
2. Validate Inputs Early¶
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
# Validate at the start
url = inputs.get("url")
if not url:
return {"status": "failed", "error": "URL is required"}
if not isinstance(url, str):
return {"status": "failed", "error": "URL must be a string"}
# Continue with execution
3. Use Async Properly¶
Good:
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
return {"status": "completed", "data": data}
Bad:
async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
import requests
response = requests.get(url) # Blocking!
return {"status": "completed", "data": response.json()}
4. Document Your Tasks¶
class MyCustomTask(BaseTask):
"""
Custom task that performs specific operations.
This task processes input data and returns processed results.
It supports various processing modes and configurations.
Example:
task = create_task(
name="my_custom_task",
inputs={"data": [1, 2, 3], "mode": "sum"}
)
"""
5. Return Consistent Results¶
# Good: Consistent format
return {
"status": "completed",
"result": result,
"metadata": {...}
}
# Bad: Inconsistent format
return result # Sometimes just the result
return {"data": result} # Sometimes wrapped
Testing¶
Unit Testing¶
Test your executor in isolation:
import pytest
from my_executors import GreetingExecutor
@pytest.mark.asyncio
async def test_greeting_executor():
executor = GreetingExecutor()
# Test with valid inputs
result = await executor.execute({
"name": "Alice",
"language": "en"
})
assert result["status"] == "completed"
assert "Hello, Alice!" in result["greeting"]
# Test with default language
result = await executor.execute({"name": "Bob"})
assert result["language"] == "en"
# Test with an invalid language (falls back to English)
result = await executor.execute({
"name": "Charlie",
"language": "invalid"
})
assert result["greeting"] == "Hello, Charlie!"
Integration Testing¶
Test with TaskManager:
import pytest
from aipartnerupflow import TaskManager, TaskTreeNode, create_session
from my_executors import GreetingExecutor
@pytest.mark.asyncio
async def test_executor_integration():
# Import to register
from my_executors import GreetingExecutor
db = create_session()
task_manager = TaskManager(db)
# Create and execute task
task = await task_manager.task_repository.create_task(
name="greeting_executor",
user_id="test_user",
inputs={"name": "Test User", "language": "en"}
)
task_tree = TaskTreeNode(task)
await task_manager.distribute_task_tree(task_tree)
# Verify result
result = await task_manager.task_repository.get_task_by_id(task.id)
assert result.status == "completed"
assert "Test User" in result.result["greeting"]
Next Steps¶
- Task Orchestration Guide - Learn how to orchestrate multiple tasks
- Basic Examples - More practical examples
- Best Practices Guide - Advanced techniques
- API Reference - Complete API documentation
Need help? Check the FAQ or Quick Start Guide