Frequently Asked Questions (FAQ)¶
Common questions and answers about aipartnerupflow.
Table of Contents¶
Getting Started¶
Q: How do I install aipartnerupflow?¶
A: Use pip:
# Minimal installation (core only)
pip install aipartnerupflow
# Full installation (all features)
pip install aipartnerupflow[all]
# Specific features
pip install aipartnerupflow[crewai] # LLM support
pip install aipartnerupflow[cli] # CLI tools
pip install aipartnerupflow[a2a] # A2A Protocol server
Q: Do I need to set up a database?¶
A: No! DuckDB is used by default and requires no setup. It just works out of the box.
If you want to use PostgreSQL:
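Set a database connection URL via an environment variable. The variable name below is an assumption modeled on the project's other AIPARTNERUPFLOW_* settings; check the configuration reference for the exact name your version expects:
# Hypothetical example; the variable name is an assumption, adjust it to your configuration docs
export AIPARTNERUPFLOW_DATABASE_URL=postgresql://user:password@localhost:5432/aipartnerupflow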
Q: What Python version do I need?¶
A: Python 3.10 or higher (3.12+ recommended).
Q: Where should I start?¶
A:
1. Quick Start Guide - Get running in 10 minutes
2. First Steps Tutorial - Complete beginner tutorial
3. Core Concepts - Understand the fundamentals
Tasks and Executors¶
Q: What's the difference between a Task and an Executor?¶
A:
- Executor: The code that does the work (reusable template)
- Task: An instance of work to be done (specific execution)
Analogy:
- Executor = A recipe (reusable)
- Task = A specific meal made from the recipe (one-time)
Q: How do I create a custom executor?¶
A: Use BaseTask with @executor_register():
from aipartnerupflow import BaseTask, executor_register
from typing import Dict, Any
@executor_register()
class MyExecutor(BaseTask):
    id = "my_executor"
    name = "My Executor"
    description = "Does something"

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return {"status": "completed", "result": "..."}

    def get_input_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {},
            "required": []
        }
See the Custom Tasks Guide for details.
Q: How do I use my custom executor?¶
A:
1. Import it (this registers it automatically).
2. Use it when creating tasks (see the sketch below).
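A minimal sketch of both steps, assuming the MyExecutor class above lives in a module called my_executors (a hypothetical name) and that tasks are created through task_manager.task_repository.create_task() as elsewhere in this FAQ:
import my_executors  # hypothetical module containing MyExecutor; importing triggers @executor_register()

task = await task_manager.task_repository.create_task(
    name="my_executor",  # must match the executor's id
    user_id="user123",
    inputs={}
)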
Q: My executor isn't being found. What's wrong?¶
A: Common issues:
- Not imported: Make sure you import the executor class somewhere in your application.
- Wrong name: The task name field must match the executor id.
- Not registered: Make sure the class is decorated with @executor_register().
Q: Can I use built-in executors?¶
A: Yes! Built-in executors are automatically available:
Core Executors (always available):
- system_info_executor - Get system information (CPU, memory, disk)
- command_executor - Execute shell commands (requires security config)
- aggregate_results_executor - Aggregate results from multiple tasks

Remote Execution Executors:
- rest_executor - HTTP/REST API calls (requires pip install aipartnerupflow[http])
- ssh_executor - Remote command execution via SSH (requires pip install aipartnerupflow[ssh])
- grpc_executor - gRPC service calls (requires pip install aipartnerupflow[grpc])
- websocket_executor - Bidirectional WebSocket communication
- apflow_api_executor - Call other aipartnerupflow API instances
- mcp_executor - Model Context Protocol executor (stdio: no dependencies, HTTP: requires [a2a])

MCP Server:
- MCP Server exposes aipartnerupflow task orchestration as MCP tools and resources
- Start with: AIPARTNERUPFLOW_API_PROTOCOL=mcp python -m aipartnerupflow.api.main
- Provides 8 MCP tools: execute_task, create_task, get_task, update_task, delete_task, list_tasks, get_task_status, cancel_task
- Provides 2 MCP resources: task://{task_id}, tasks://
- Supports both HTTP and stdio transport modes

Container Executors:
- docker_executor - Containerized command execution (requires pip install aipartnerupflow[docker])

AI Executors (optional):
- crewai_executor - LLM-based agents (requires pip install aipartnerupflow[crewai])
- batch_crewai_executor - Batch execution of multiple crews (requires pip install aipartnerupflow[crewai])

Generation Executors:
- generate_executor - Generate task tree JSON arrays from natural language requirements using LLM (requires pip install openai or pip install anthropic)
Just use them by name:
task = await task_manager.task_repository.create_task(
name="system_info_executor",
user_id="user123",
inputs={"resource": "cpu"}
)
For more details on all executors, see the Custom Tasks Guide.
Task Orchestration¶
Q: What's the difference between parent_id and dependencies?¶
A: This is a critical distinction!
- parent_id: Organizational only (like folders) - does NOT affect execution order
- dependencies: Controls execution order - determines when tasks run
Example:
# Task B is a child of Task A (organizational)
# But Task B depends on Task C (execution order)
task_a = create_task(name="task_a")
task_c = create_task(name="task_c")
task_b = create_task(
name="task_b",
parent_id=task_a.id, # Organizational
dependencies=[{"id": task_c.id, "required": True}] # Execution order
)
# Execution: C runs first, then B (regardless of parent-child)
See Task Orchestration Guide for details.
Q: How do I make tasks run in parallel?¶
A: Don't add dependencies between them:
# Task 1 (no dependencies)
task1 = create_task(name="task1", ...)
# Task 2 (no dependencies on task1)
task2 = create_task(name="task2", ...)
# Both run in parallel!
Q: How do I make tasks run sequentially?¶
A: Add dependencies:
# Task 1
task1 = create_task(name="task1", ...)
# Task 2 depends on Task 1
task2 = create_task(
name="task2",
dependencies=[{"id": task1.id, "required": True}],
...
)
# Execution order: Task 1 → Task 2
Q: How do priorities work?¶
A: Lower numbers = higher priority = execute first:
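For example (a sketch; whether create_task takes a priority argument directly is an assumption, so check the Task Orchestration Guide for the exact parameter name):
# Assumption: a `priority` parameter exists on create_task; lower values run first
urgent = create_task(name="urgent_task", priority=1, ...)
later = create_task(name="later_task", priority=10, ...)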
Important: Dependencies take precedence over priorities!
Q: My task is stuck in "pending" status. Why?¶
A: Common causes:
- Dependencies not satisfied: Check if dependency tasks are completed
- Executor not found: Verify task name matches executor ID
- Task not executed: Make sure you called distribute_task_tree()
- Parent task failed: Check parent task status
Debug:
task = await task_manager.task_repository.get_task_by_id(task_id)
print(f"Status: {task.status}")
print(f"Error: {task.error}")
print(f"Dependencies: {task.dependencies}")
Troubleshooting¶
Q: Task executor not found error¶
Error: Task executor not found: executor_id
Solutions:
1. For built-in executors: Make sure you've imported the extension (see the sketch below this list).
2. For custom executors:
   - Make sure you used @executor_register()
   - Import the executor class in your main script
   - Verify the name field matches the executor id
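The exact import depends on which extension provides your executor; the path below is a hypothetical illustration based on the extensions package used elsewhere in this FAQ:
# Hypothetical: importing an extension module registers its executors as a side effect.
# Adjust the module name to the extension you actually installed.
from aipartnerupflow.extensions import crewai  # noqa: F401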
Q: Database connection error¶
Error: Database connection issues
Solutions:
- DuckDB (default): No setup needed! It just works.
- PostgreSQL: Set the database connection environment variable (see the PostgreSQL note under Getting Started).
Q: Import error¶
Error: ModuleNotFoundError: No module named 'aipartnerupflow'
Solution:
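Install the package, using the same commands shown earlier in this FAQ:
pip install aipartnerupflow
# or, with all optional features
pip install aipartnerupflow[all]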
Q: Task stays in "in_progress" forever¶
Problem: Task never completes
Solutions:
1. Check if executor is hanging (infinite loop, waiting for resource)
2. Verify executor supports cancellation
3. Check for deadlocks in dependencies
4. Review executor implementation
Debug:
# Check task status
task = await task_manager.task_repository.get_task_by_id(task_id)
print(f"Status: {task.status}")
print(f"Error: {task.error}")
# Try cancelling
if task.status == "in_progress":
    await task_manager.cancel_task(task_id, "Manual cancellation")
Q: Dependency not satisfied¶
Problem: Task waiting for dependency that never completes
Solutions:
1. Verify dependency task ID is correct
2. Check if dependency task failed
3. Ensure dependency task is in the same tree
4. Check dependency task status
Debug:
# Check dependency status
dependency = await task_manager.task_repository.get_task_by_id(dependency_id)
print(f"Dependency status: {dependency.status}")
print(f"Dependency error: {dependency.error}")
Q: Priority not working¶
Problem: Tasks not executing in expected order
Solutions:
1. Verify priority values (lower = higher priority; see the debug sketch below)
2. Check if dependencies override priority (they do!)
3. Ensure tasks are at the same level in the tree
4. Remember: dependencies take precedence!
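A small debug sketch, assuming tasks expose a priority attribute alongside the status, error, and dependencies fields used in the other debug snippets (the attribute name is an assumption):
# Inspect the fields that actually drive scheduling
task = await task_manager.task_repository.get_task_by_id(task_id)
print(f"Priority: {task.priority}")          # assumed attribute name
print(f"Dependencies: {task.dependencies}")  # dependencies run first, regardless of priority
print(f"Status: {task.status}")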
Advanced Topics¶
Q: How do I use CrewAI (LLM tasks)?¶
A:
1. Install the CrewAI extra: pip install aipartnerupflow[crewai]
2. Create and register a crew:
from aipartnerupflow.extensions.crewai import CrewManager
from aipartnerupflow.core.extensions import get_registry

crew = CrewManager(
    id="my_crew",
    name="My Crew",
    agents=[{"role": "Analyst", "goal": "Analyze data"}],
    tasks=[{"description": "Analyze: {text}", "agent": "Analyst"}]
)
get_registry().register(crew)
3. Use it:
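A minimal sketch, assuming a registered crew is invoked like other executors, i.e. by creating a task whose name matches the crew id (this mirrors the create_task pattern used elsewhere in this FAQ):
task = await task_manager.task_repository.create_task(
    name="my_crew",  # matches the CrewManager id above
    user_id="user123",
    inputs={"text": "Quarterly sales figures"}
)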
Note: Requires LLM API key (OpenAI, Anthropic, etc.)
Q: How do I set LLM API keys?¶
A:
Option 1: Environment variable
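For example, the standard variables for OpenAI and Anthropic are (set whichever matches your provider):
export OPENAI_API_KEY=sk-...
# or
export ANTHROPIC_API_KEY=sk-ant-...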
Option 2: Request header (for API server)
Option 3: Configuration
Q: How do I handle task failures?¶
A:
1. Check the task status after execution.
2. Handle errors inside the executor.
3. Use optional dependencies for fallback (see the sketch after this list).
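A minimal sketch of these three patterns, assuming failures surface through the status and error fields shown in the debug snippets above, and assuming a dependency entry with "required": False is treated as optional (inferred from the "required": True examples earlier; do_work is a hypothetical helper):
# 1. Check status after execution
task = await task_manager.task_repository.get_task_by_id(task_id)
if task.status == "failed":
    print(f"Task failed: {task.error}")

# 2. Handle errors inside a custom executor's execute() method (body shown in isolation)
async def execute(self, inputs):
    try:
        result = await do_work(inputs)  # hypothetical helper
        return {"status": "completed", "result": result}
    except Exception as exc:
        return {"status": "failed", "error": str(exc)}

# 3. Optional dependency: the fallback task can still run if its dependency fails
fallback = create_task(
    name="fallback_task",
    dependencies=[{"id": task_id, "required": False}],  # assumption: False marks it optional
)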
Q: Can I cancel a running task?¶
A: Yes, if the executor supports cancellation:
result = await task_manager.cancel_task(
task_id="task_123",
error_message="User requested cancellation"
)
Note: Not all executors support cancellation. Check executor.cancelable property.
Q: How do I get real-time updates?¶
A: Use streaming execution:
# Execute with streaming
await task_manager.distribute_task_tree_with_streaming(
task_tree,
use_callback=True
)
Or use the API server with SSE (Server-Sent Events).
Q: How do I use hooks?¶
A:
Pre-execution hook:
from aipartnerupflow import register_pre_hook
@register_pre_hook
async def validate_inputs(task):
    """Validate inputs before execution"""
    if task.inputs and "url" in task.inputs:
        url = task.inputs["url"]
        if not url.startswith(("http://", "https://")):
            task.inputs["url"] = f"https://{url}"
Post-execution hook:
from aipartnerupflow import register_post_hook
@register_post_hook
async def log_results(task, inputs, result):
    """Log results after execution"""
    print(f"Task {task.id} completed:")
    print(f"  Result: {result}")
Q: How do I extend TaskModel?¶
A:
from aipartnerupflow.core.storage.sqlalchemy.models import TaskModel
from aipartnerupflow import set_task_model_class
from sqlalchemy import Column, String
class CustomTaskModel(TaskModel):
    """Custom TaskModel with additional fields"""
    __tablename__ = "apflow_tasks"
    project_id = Column(String(255), nullable=True, index=True)

# Set before creating tasks
set_task_model_class(CustomTaskModel)
# Now tasks can have project_id
task = await task_manager.task_repository.create_task(
name="my_task",
user_id="user123",
project_id="proj-123", # Custom field
inputs={...}
)
Still Have Questions?¶
- Documentation Index - Browse all documentation
- Quick Start Guide - Get started quickly
- Examples - See practical examples
- GitHub Issues - Report bugs
- GitHub Discussions - Ask questions
Found an issue? Report it on GitHub