
The @task Decorator

The @task decorator marks a function as a unit of work in a Marqov workflow. Tasks are the building blocks that get distributed, parallelized, and retried by the Temporal execution engine.

Basic Usage

```python
from marqov import task

@task
def add(x, y):
    return x + y
```

When called outside a workflow, a task executes normally and returns its result:

```python
result = add(1, 2)  # Returns 3
```

When called inside a @workflow function, a task does not execute immediately. Instead, it returns a TaskProxy object that records the call in a dependency graph. The actual execution happens later when the workflow is dispatched to Temporal.
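To make the deferred-call behavior concrete, here is a toy sketch of the pattern; the `FakeProxy` class, `fake_task` decorator, and `_in_workflow` flag are illustrative stand-ins, not Marqov's real internals:

```python
# Toy sketch of deferred task calls -- NOT Marqov's real internals.
_in_workflow = False  # flipped by a (hypothetical) workflow runner

class FakeProxy:
    """Placeholder that records a deferred call and its dependencies."""
    def __init__(self, fn, args):
        self.fn = fn
        self.args = args
        # Proxy arguments become dependency edges in the graph
        self.deps = [a for a in args if isinstance(a, FakeProxy)]

def fake_task(fn):
    def wrapper(*args):
        if _in_workflow:
            return FakeProxy(fn, args)  # record the call, don't run it
        return fn(*args)                # outside a workflow: run immediately
    return wrapper

@fake_task
def add(x, y):
    return x + y

direct = add(1, 2)          # outside a workflow: executes, returns 3
_in_workflow = True
deferred = add(1, 2)        # "inside" a workflow: returns a FakeProxy
chained = add(deferred, 3)  # proxy argument -> dependency edge
```

The real decorator does more (serialization, registration with the transport graph), but the call-site behavior is the same: execute now outside a workflow, record for later inside one.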

Parameters

```python
@task(
    name="measure-pauli",  # Display name (default: function name)
    executor="local",      # Executor backend: "local" or "braket"
    retries=3,             # Retry attempts on failure (default: 0)
    timeout=600,           # Max execution time in seconds (default: 300)
)
async def measure(circuit_dict, pauli, config):
    ...
```
| Parameter  | Type    | Default       | Description                                       |
|------------|---------|---------------|---------------------------------------------------|
| `name`     | `str`   | Function name | Display name shown in Temporal UI and dashboard   |
| `executor` | `str`   | `"local"`     | Which executor runs this task                     |
| `retries`  | `int`   | `0`           | Number of retry attempts on failure               |
| `timeout`  | `float` | `300.0`       | Maximum execution time in seconds                 |

Bare decorator

You can use @task without parentheses to accept all defaults:

```python
@task
def add(x, y):
    return x + y
```

Or with explicit parameters:

```python
@task(retries=2, timeout=60)
def add(x, y):
    return x + y
```

Automatic Parallelization

Tasks that do not depend on each other’s results run in parallel automatically. The workflow engine detects dependencies by tracking which TaskProxy values are passed as arguments to other tasks.
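One way to picture this scheduling is as level assignment over the dependency graph: a task with no dependencies sits at level 0, and every other task sits one level above its deepest dependency. The sketch below is a hypothetical illustration of that rule, not Marqov's actual scheduler:

```python
# Hypothetical sketch: assigning execution "levels" from dependency edges.
# Tasks on the same level share no dependencies and can run in parallel.
def assign_levels(deps):
    """deps maps each node to the list of nodes it depends on."""
    levels = {}

    def level(node):
        if node not in levels:
            ds = deps.get(node, [])
            levels[node] = 0 if not ds else 1 + max(level(d) for d in ds)
        return levels[node]

    for node in deps:
        level(node)
    return levels

# Five independent measurements feeding one combiner, as in the VQE
# example later in this page:
deps = {
    "zi": [], "iz": [], "zz": [], "xx": [], "yy": [],
    "energy": ["zi", "iz", "zz", "xx", "yy"],
}
levels = assign_levels(deps)
# All five measurements land on level 0; the combiner lands on level 1.
```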

Sequential pattern (has dependencies)

```python
@task
def step_a():
    return 1

@task
def step_b(a_result):
    return a_result + 1

@workflow
def sequential():
    a = step_a()   # Level 0
    b = step_b(a)  # Level 1 -- depends on a, runs AFTER a
    return b
```

Execution order: step_a finishes, then step_b runs.

Parallel pattern (independent tasks)

```python
@task(retries=3, timeout=3600)
async def measure_pauli(circuit_dict, pauli, config):
    # ... run circuit on quantum backend ...
    return {"pauli": pauli, "expectation": 0.42}

@task
def compute_energy(zi, iz, zz, xx, yy):
    # ... combine Pauli expectations ...
    return {"energy": -1.85}

@workflow
def vqe_step(theta, executor_config):
    circuit = build_ansatz(theta)
    circuit_dict = circuit.to_dict()

    # These 5 tasks share NO dependencies -- they run IN PARALLEL
    zi = measure_pauli(circuit_dict, "ZI", executor_config)
    iz = measure_pauli(circuit_dict, "IZ", executor_config)
    zz = measure_pauli(circuit_dict, "ZZ", executor_config)
    xx = measure_pauli(circuit_dict, "XX", executor_config)
    yy = measure_pauli(circuit_dict, "YY", executor_config)

    # This task depends on all 5 -- runs AFTER all complete
    return compute_energy(zi, iz, zz, xx, yy)
```

Execution order:

  • Level 0: All five measure_pauli tasks run concurrently
  • Level 1: compute_energy runs after all five complete

You can verify this in the Temporal UI — all five activities start at the same time.

TaskProxy Behavior

When a task is called inside a @workflow, it returns a TaskProxy instead of the real result. The proxy:

  1. Registers a TaskNode in the workflow’s transport graph
  2. Records which other proxies were passed as arguments (creating dependency edges)
  3. Cannot be inspected for its value — it is a placeholder, not the actual result

This means you cannot branch on a task’s return value inside a workflow:

```python
@workflow
def bad_example():
    result = my_task()
    if result > 0:  # ERROR: result is a TaskProxy, not a number
        ...
```

All branching logic must happen inside a task, not between tasks in a workflow.
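One way to restructure the bad example above is to push the comparison into a second task, which receives the resolved value at execution time. In this sketch the identity decorators stand in for Marqov's @task and @workflow so the snippet is self-contained, and `my_task`/`branch_on` are hypothetical names:

```python
# Sketch of moving branching inside a task. The identity decorators below
# stand in for marqov's @task/@workflow to keep the example self-contained.
def task(fn):
    return fn

def workflow(fn):
    return fn

@task
def my_task():
    return 1

@task
def branch_on(result):
    # Branching is fine HERE: inside a task, the real value is available.
    return "positive" if result > 0 else "non-positive"

@workflow
def good_example():
    result = my_task()        # a TaskProxy inside a real workflow
    return branch_on(result)  # proxy passed along; comparison runs in the task
```

Under real Marqov decorators, `branch_on` would receive the resolved integer once `my_task` completes, so the `if` executes against an actual value.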

Serialization Rules

Task arguments must be transportable across process boundaries. They are serialized via cloudpickle for the function reference and JSON for the arguments.

Safe argument types:

  • Primitives: int, float, str, bool, None
  • Collections: list, tuple, dict (with serializable contents)
  • TaskProxy objects (automatically resolved to dependency references)

Avoid passing:

  • Open file handles, database connections, sockets
  • Lambda functions or closures as arguments
  • Large objects (prefer serializing to dict first)
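Since plain arguments travel as JSON, a quick round-trip check catches unsafe values before dispatch. The helper below is an illustrative sketch using the standard `json` module (it does not model TaskProxy resolution):

```python
import json

def is_transportable(value):
    """Rough check: can this argument round-trip through JSON?"""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

ok = is_transportable({"shots": 1000, "paulis": ["ZI", "IZ"]})  # plain data
bad = is_transportable(lambda x: x + 1)                         # a closure
```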

For quantum circuits, serialize with circuit.to_dict() and reconstruct inside the task with Circuit.from_dict():

```python
@task
async def run_on_braket(circuit_dict: dict, config: dict) -> dict:
    from marqov import Circuit
    circuit = Circuit.from_dict(circuit_dict)
    braket_circuit = circuit.to_braket()
    # ... execute ...
```

Async vs Sync Tasks

Tasks can be either sync or async. Use async for I/O-bound work (API calls, quantum backend execution):

```python
@task(retries=3, timeout=3600)
async def measure(circuit_dict, config):
    # Async -- good for network I/O
    result = await executor.execute(circuit, shots=1000)
    return result.counts

@task
def compute(counts):
    # Sync -- good for pure computation
    return sum(counts.values())
```

Retry Behavior

When retries is set, Temporal automatically retries the task on failure with exponential backoff:

  • Initial retry interval: 1 second
  • Maximum retry interval: 60 seconds
  • Backoff coefficient: 2.0
  • Total attempts: retries + 1 (the original attempt plus retries)
```python
@task(retries=3, timeout=3600)
async def call_quantum_backend(circuit_dict, config):
    """Retries up to 3 times (4 total attempts) with exponential backoff."""
    ...
```
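Given those defaults, the wait before each retry follows directly: each interval is the initial interval times the backoff coefficient raised to the retry count, capped at the maximum. A small sketch (jitter, which Temporal may add, is not modeled):

```python
def backoff_schedule(retries, initial=1.0, maximum=60.0, coefficient=2.0):
    """Seconds to wait before each retry, per the defaults listed above."""
    return [min(maximum, initial * coefficient**k) for k in range(retries)]

schedule = backoff_schedule(3)  # waits of 1s, 2s, 4s before attempts 2, 3, 4
```

With many retries the schedule grows until it hits the 60-second cap and stays there.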

See Error Handling for more details on retry configuration and debugging failures.
