Hello World - 5-Minute Quick Start¶
Get started with apflow in under 5 minutes with this minimal example.
Installation¶
Install apflow with pip (assuming it is published under the same name as the CLI): `pip install apflow`
Your First Task (Python)¶
Create a file `hello.py`:

```python
from apflow import executor_register, TaskBuilder, execute_tasks
from apflow.extensions import BaseTask

# Define a simple executor
@executor_register()
class HelloWorld(BaseTask):
    id = "hello_world"

    async def execute(self, inputs: dict) -> dict:
        name = inputs.get("name", "World")
        return {"message": f"Hello, {name}!"}

# Create and execute a task
if __name__ == "__main__":
    task = TaskBuilder("greet_alice", "hello_world")\
        .with_inputs({"name": "Alice"})\
        .build()
    result = execute_tasks([task])
    print(result["message"])  # Output: Hello, Alice!
```
Run it:

```bash
python hello.py
```

Output:

```
Hello, Alice!
```
That's it! You've created and executed your first apflow task.
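Note that `execute()` is declared with `async def`; apflow awaits it for you. To see how such a coroutine method behaves on its own, independent of apflow, plain `asyncio` is enough. This is a standalone sketch, not something you need in your apflow code:

```python
import asyncio

# Standalone version of the executor above, without apflow or BaseTask,
# just to show how an async execute() method is driven to completion.
class HelloWorld:
    async def execute(self, inputs: dict) -> dict:
        name = inputs.get("name", "World")
        return {"message": f"Hello, {name}!"}

result = asyncio.run(HelloWorld().execute({"name": "Alice"}))
print(result["message"])  # Hello, Alice!
```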
Using the CLI¶
You can also use apflow via CLI without writing code:
1. Create a task module¶
`tasks.py`:

```python
from apflow import executor_register
from apflow.extensions import BaseTask

@executor_register()
class Greeter(BaseTask):
    id = "greeter"

    async def execute(self, inputs: dict) -> dict:
        return {"message": f"Hello, {inputs['name']}!"}
```
2. Execute via CLI¶
```bash
# Make sure tasks.py is importable (same directory or on PYTHONPATH)
apflow run greeter --inputs '{"name": "Bob"}'
```
Output:
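The `--inputs` flag takes a JSON string. How apflow parses it internally is not shown here, but the general pattern can be sketched with the standard library (argument names are hypothetical, not apflow's actual CLI code):

```python
import argparse
import json

# Hypothetical re-creation of the flag parsing, for illustration only.
parser = argparse.ArgumentParser(prog="apflow")
parser.add_argument("executor_id")
parser.add_argument("--inputs", type=json.loads, default={})

args = parser.parse_args(["greeter", "--inputs", '{"name": "Bob"}'])
print(args.executor_id)  # greeter
print(args.inputs)       # {'name': 'Bob'}
```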
What's Happening?¶
- `@executor_register()` - Registers your task executor with apflow
- `BaseTask` - Base class providing the `execute()` interface
- `TaskBuilder` - Fluent API for creating tasks
- `execute_tasks()` - Executes one or more tasks and returns results
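If you are curious how a decorator like `@executor_register()` can make a class discoverable by id, the underlying pattern is a decorator-backed registry. Here is an illustrative sketch of that pattern, written from scratch and not apflow's internals:

```python
# A minimal decorator-backed registry, sketched from scratch.
REGISTRY: dict = {}

def executor_register():
    def decorator(cls):
        REGISTRY[cls.id] = cls  # remember the class under its declared id
        return cls              # hand the class back unchanged
    return decorator

@executor_register()
class HelloWorld:
    id = "hello_world"

# The framework can now look executors up by id at execution time.
print(REGISTRY["hello_world"].__name__)  # HelloWorld
```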
Built-in Executors¶
Don't want to write custom code? Use built-in executors:
REST API Call¶
```python
from apflow import TaskBuilder, execute_tasks

task = TaskBuilder("fetch_data", "rest_executor")\
    .with_inputs({
        "url": "https://api.github.com/users/octocat",
        "method": "GET"
    })\
    .build()

result = execute_tasks([task])
print(result["json"]["login"])  # Output: octocat
```
Execute SSH Command¶
```python
task = TaskBuilder("check_disk", "ssh_executor")\
    .with_inputs({
        "host": "example.com",
        "username": "admin",
        "key_file": "~/.ssh/id_rsa",
        "command": "df -h"
    })\
    .build()

result = execute_tasks([task])
print(result["stdout"])
```
Task Dependencies¶
Create workflows with dependencies:
```python
from apflow import TaskBuilder, execute_tasks

# Task 1: Fetch user data
fetch_task = TaskBuilder("fetch_user", "rest_executor")\
    .with_inputs({
        "url": "https://api.example.com/user",
        "method": "GET"
    })\
    .build()

# Task 2: Process data (depends on Task 1)
process_task = TaskBuilder("process_data", "my_processor")\
    .with_inputs({"user_id": "123"})\
    .with_dependencies([fetch_task.id])\
    .build()

# Execute: fetch_task runs first, then process_task
results = execute_tasks([fetch_task, process_task])
```
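A dependency-aware runner has to order tasks so that each one runs after everything it depends on, which is a topological sort. How apflow schedules tasks internally is not shown here, but the ordering idea can be sketched with the standard library's `graphlib`:

```python
from graphlib import TopologicalSorter

# Map each task name to the names of the tasks it depends on.
dependencies = {
    "fetch_user": [],
    "process_data": ["fetch_user"],
}

# static_order() yields tasks with every dependency before its dependents.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['fetch_user', 'process_data']
```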
Next Steps¶
- Executor Selection Guide - Choose the right executor for your use case
- Complete Quick Start - Comprehensive tutorial with all features
- Task Dependencies - Learn about complex workflows
Common Questions¶
Where is my data stored?¶
By default, apflow uses DuckDB (embedded database) stored at .data/apflow.duckdb in your project directory.
How do I see task status?¶
Can I use PostgreSQL instead of DuckDB?¶
Yes! Set the database URL:
```bash
export APFLOW_DATABASE_URL="postgresql://user:pass@localhost/apflow"
apflow db migrate  # Run migrations
```
How do I deploy to production?¶
For production deployments, see:

- Production Deployment
- Distributed Mode (multi-node orchestration)
Get Help¶
- Documentation: https://docs.apflow.dev
- GitHub Issues: Report bugs or request features
- CLI Help: `apflow --help`