The @task Decorator
The @task decorator marks a function as a unit of work in a Marqov workflow. Tasks are the building blocks that get distributed, parallelized, and retried by the Temporal execution engine.
Basic Usage
from marqov import task

@task
def add(x, y):
    return x + y

When called outside a workflow, a task executes normally and returns its result:

result = add(1, 2)  # Returns 3

When called inside a @workflow function, a task does not execute immediately. Instead, it returns a TaskProxy object that records the call in a dependency graph. The actual execution happens later when the workflow is dispatched to Temporal.
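The two calling modes described above can be illustrated with a toy decorator. This is a minimal sketch using only the standard library, not Marqov's implementation: `toy_task`, `ToyProxy`, `recording`, and the module-level `_graph` are hypothetical names, and the real `TaskProxy` and graph structures may look quite different.

```python
from contextlib import contextmanager
from functools import wraps

_graph = None  # hypothetical: holds recorded calls while "inside a workflow"


class ToyProxy:
    """Placeholder for a result that has not been computed yet."""

    def __init__(self, name, args):
        self.name = name
        self.args = args


@contextmanager
def recording():
    """Stand-in for the period during which a workflow function is being traced."""
    global _graph
    _graph = []
    try:
        yield _graph
    finally:
        _graph = None


def toy_task(fn):
    @wraps(fn)
    def wrapper(*args):
        if _graph is None:
            return fn(*args)             # outside a workflow: run immediately
        proxy = ToyProxy(fn.__name__, args)
        _graph.append(proxy)             # inside: record the call, defer execution
        return proxy
    return wrapper


@toy_task
def add(x, y):
    return x + y


direct = add(1, 2)                       # executes right away

with recording() as graph:
    deferred = add(1, 2)                 # returns a ToyProxy; add() has not run
```

Here `direct` holds the computed sum, while `deferred` is only a record of the call that a scheduler could execute later.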
Parameters
@task(
    name="measure-pauli",  # Display name (default: function name)
    executor="local",      # Executor backend: "local" or "braket"
    retries=3,             # Retry attempts on failure (default: 0)
    timeout=600,           # Max execution time in seconds (default: 300)
)
async def measure(circuit_dict, pauli, config):
    ...

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Display name shown in Temporal UI and dashboard |
| executor | str | "local" | Which executor runs this task |
| retries | int | 0 | Number of retry attempts on failure |
| timeout | float | 300.0 | Maximum execution time in seconds |
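To make the defaults in the table concrete, the following sketch shows one way a decorator could accept these four parameters and still work with or without parentheses. It is an illustration only: `toy_task` and `TaskOptions` are hypothetical names, and nothing here is taken from Marqov's source.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class TaskOptions:
    name: str
    executor: str = "local"
    retries: int = 0
    timeout: float = 300.0


def toy_task(fn: Optional[Callable] = None, *, name: Optional[str] = None,
             executor: str = "local", retries: int = 0, timeout: float = 300.0):
    """Hypothetical decorator usable as @toy_task or @toy_task(...)."""

    def decorate(func: Callable) -> Callable:
        # Attach the resolved options; the name falls back to the function name.
        func.options = TaskOptions(name or func.__name__, executor, retries, timeout)
        return func

    # Bare use passes the function directly; parameterized use passes only keywords.
    return decorate(fn) if fn is not None else decorate


@toy_task
def add(x, y):
    return x + y


@toy_task(name="measure-pauli", retries=3, timeout=600)
def measure(circuit_dict, pauli, config):
    return {"pauli": pauli}
```

Making every option keyword-only is a design choice in this sketch: it lets the decorator tell the bare form (one positional callable) apart from the parameterized form without ambiguity.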
Bare decorator
You can use @task without parentheses for defaults:
@task
def add(x, y):
    return x + y

Or with explicit parameters:

@task(retries=2, timeout=60)
def add(x, y):
    return x + y

Automatic Parallelization
Tasks that do not depend on each other’s results run in parallel automatically. The workflow engine detects dependencies by tracking which TaskProxy values are passed as arguments to other tasks.
Sequential pattern (has dependencies)
@task
def step_a():
    return 1

@task
def step_b(a_result):
    return a_result + 1

@workflow
def sequential():
    a = step_a()   # Level 0
    b = step_b(a)  # Level 1 -- depends on a, runs AFTER a
    return b

Execution order: step_a finishes, then step_b runs.
Parallel pattern (independent tasks)
@task(retries=3, timeout=3600)
async def measure_pauli(circuit_dict, pauli, config):
    # ... run circuit on quantum backend ...
    return {"pauli": pauli, "expectation": 0.42}

@task
def compute_energy(zi, iz, zz, xx, yy):
    # ... combine Pauli expectations ...
    return {"energy": -1.85}

@workflow
def vqe_step(theta, executor_config):
    circuit = build_ansatz(theta)
    circuit_dict = circuit.to_dict()

    # These 5 tasks share NO dependencies -- they run IN PARALLEL
    zi = measure_pauli(circuit_dict, "ZI", executor_config)
    iz = measure_pauli(circuit_dict, "IZ", executor_config)
    zz = measure_pauli(circuit_dict, "ZZ", executor_config)
    xx = measure_pauli(circuit_dict, "XX", executor_config)
    yy = measure_pauli(circuit_dict, "YY", executor_config)

    # This task depends on all 5 -- runs AFTER all complete
    return compute_energy(zi, iz, zz, xx, yy)

Execution order:
- Level 0: All five measure_pauli tasks run concurrently
- Level 1: compute_energy runs after all five complete
You can verify this in the Temporal UI — all five activities start at the same time.
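The level assignment above can be reproduced with a small dependency-graph calculation. The sketch below assumes an acyclic graph given as a plain dict from task label to the labels it depends on; `execution_levels` and the label format are hypothetical and only mirror the idea, not the engine's actual scheduler.

```python
def execution_levels(deps):
    """Map each task to its level: 0 if it has no dependencies,
    otherwise one more than the deepest task it depends on."""
    levels = {}

    def level_of(task):
        if task not in levels:
            parents = deps[task]
            levels[task] = 1 + max(map(level_of, parents)) if parents else 0
        return levels[task]

    for task in deps:
        level_of(task)
    return levels


# Dependency graph for the vqe_step example: five independent measurements,
# then one task that consumes all of them.
paulis = ["ZI", "IZ", "ZZ", "XX", "YY"]
deps = {f"measure_pauli[{p}]": [] for p in paulis}
deps["compute_energy"] = list(deps)  # depends on all five measurements

levels = execution_levels(deps)

# Group tasks by level; tasks sharing a level have no ordering between them.
by_level = {}
for task, level in levels.items():
    by_level.setdefault(level, []).append(task)
```

Tasks that land on the same level do not depend on each other, which is what allows them to be dispatched concurrently.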
TaskProxy Behavior
When a task is called inside a @workflow, it returns a TaskProxy instead of the real result. The proxy:
- Registers a TaskNode in the workflow’s transport graph
- Records which other proxies were passed as arguments (creating dependency edges)
- Cannot be inspected for its value: it is a placeholder, not the actual result
This means you cannot branch on a task’s return value inside a workflow:
@workflow
def bad_example():
    result = my_task()
    if result > 0:  # ERROR: result is a TaskProxy, not a number
        ...

All branching logic must happen inside a task, not between tasks in a workflow.
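The reason the comparison cannot work is easiest to see with a stand-in placeholder. In this sketch, `ToyProxy` is a hypothetical class with no value and no comparison operators; how the real TaskProxy reacts to `>` is not stated here and may differ. The function `positive_or_zero` plays the role of a task body, where the real value is available and branching is safe.

```python
class ToyProxy:
    """Hypothetical placeholder: knows which task produced it, holds no value."""

    def __init__(self, task_name):
        self.task_name = task_name


def positive_or_zero(value):
    # Branching lives inside the function that receives the real value.
    return value if value > 0 else 0


proxy = ToyProxy("my_task")

try:
    proxy > 0            # no __gt__ defined, so Python raises TypeError
    branched = True
except TypeError:
    branched = False

# Once the real value exists (5 here), the branch inside the function works.
clamped = positive_or_zero(5)
```

The same restructuring applies to a workflow: pass the upstream result into a downstream task and put the `if` there.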
Serialization Rules
Task arguments must be transportable across process boundaries. The function reference is serialized with cloudpickle, and the arguments are serialized as JSON.
Safe argument types:
- Primitives: int, float, str, bool, None
- Collections: list, tuple, dict (with serializable contents)
- TaskProxy objects (automatically resolved to dependency references)
Avoid passing:
- Open file handles, database connections, sockets
- Lambda functions or closures as arguments
- Large objects (prefer serializing to dict first)
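A quick way to test whether a set of arguments fits these rules is to try encoding them with the standard `json` module before dispatching. This is a sketch under the assumption that plain `json.dumps` approximates the argument encoding; `check_json_safe` is a hypothetical helper, not part of Marqov.

```python
import json


def check_json_safe(args):
    """Return (ok, detail) for a list of candidate task arguments."""
    try:
        json.dumps(args)
    except (TypeError, ValueError) as exc:
        return False, str(exc)
    return True, ""


# Primitives and nested collections encode cleanly.
ok_plain, _ = check_json_safe([1, 2.5, "ZI", None, {"shots": 1000}])

# A lambda is rejected by the JSON encoder.
ok_lambda, reason = check_json_safe([lambda x: x])

# Standard JSON has no tuple type, so a tuple decodes as a list.
round_tripped = json.loads(json.dumps((1, 2)))
```

The last line is worth keeping in mind: with standard JSON, a task that receives a round-tripped tuple sees a list, so code inside the task should not rely on tuple-specific behavior unless the library is known to restore it.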
For quantum circuits, serialize with circuit.to_dict() and reconstruct inside the task with Circuit.from_dict():
@task
async def run_on_braket(circuit_dict: dict, config: dict) -> dict:
    from marqov import Circuit

    circuit = Circuit.from_dict(circuit_dict)
    braket_circuit = circuit.to_braket()
    # ... execute ...

Async vs Sync Tasks
Tasks can be either sync or async. Use async for I/O-bound work (API calls, quantum backend execution):
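@task(retries=3, timeout=3600)
async def measure(circuit_dict, config):
    # Async -- good for network I/O
    from marqov import Circuit

    circuit = Circuit.from_dict(circuit_dict)
    # `executor` is assumed to be built from `config`; its construction is not shown here
    result = await executor.execute(circuit, shots=1000)
    return result.counts

@task
def compute(counts):
    # Sync -- good for pure computation
    return sum(counts.values())

Retry Behavior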
When retries is set, Temporal automatically retries the task on failure with exponential backoff:
- Initial retry interval: 1 second
- Maximum retry interval: 60 seconds
- Backoff coefficient: 2.0
- Total attempts: retries + 1 (the original attempt plus retries)
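The backoff parameters above translate into a concrete wait schedule. The sketch below computes the delay before each retry as the initial interval multiplied by the coefficient once per previous retry, capped at the maximum; `retry_delays` is a hypothetical helper for illustration, and any jitter the engine may apply is ignored.

```python
def retry_delays(retries, initial=1.0, coefficient=2.0, maximum=60.0):
    """Delay in seconds before each retry: initial * coefficient**n, capped at maximum."""
    return [min(initial * coefficient ** n, maximum) for n in range(retries)]


delays = retry_delays(3)   # [1.0, 2.0, 4.0]
capped = retry_delays(8)   # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
attempts = 3 + 1           # original attempt plus three retries
```

With `retries=3`, the task waits roughly 1, 2, and 4 seconds between its four attempts; the 60-second cap only comes into play from the seventh retry onward.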
@task(retries=3, timeout=3600)
async def call_quantum_backend(circuit_dict, config):
    """Retries up to 3 times (4 total attempts) with exponential backoff."""
    ...

See Error Handling for more details on retry configuration and debugging failures.