
The @workflow Decorator

The @workflow decorator marks a function as a workflow — a composition of tasks that forms a directed acyclic graph (DAG) for execution on Temporal.

Basic Usage

```python
from marqov import task, workflow

@task
def add(x, y):
    return x + y

@workflow
def compute():
    a = add(1, 2)  # Returns TaskProxy, not 3
    b = add(3, 4)  # Returns TaskProxy, not 7
    c = add(a, b)  # Depends on a and b
    return c
```

When you call a @workflow function, it does not execute the tasks. Instead it:

  1. Creates a new TransportGraph
  2. Runs the function body in “graph building mode” — task calls record nodes and edges
  3. Returns a WorkflowDispatch object that holds the graph
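The mechanics of graph-building mode can be illustrated with a minimal sketch. Note this is illustrative only — the class and function names below are stand-ins, not marqov's actual `TransportGraph` or `TaskProxy` internals:

```python
# Minimal sketch of graph-building mode (illustrative only; marqov's
# real TransportGraph and TaskProxy are more elaborate).
class TaskProxy:
    def __init__(self, node_id):
        self.node_id = node_id

class TransportGraph:
    def __init__(self):
        self.nodes = {}   # node_id -> task name
        self.edges = []   # (parent_id, child_id) dependency pairs

def make_task(graph, name):
    """Wrap a task so calling it records a node instead of running."""
    def call(*args):
        node_id = f"{name}-{len(graph.nodes)}"
        graph.nodes[node_id] = name
        for arg in args:
            if isinstance(arg, TaskProxy):  # a dependency, not a literal value
                graph.edges.append((arg.node_id, node_id))
        return TaskProxy(node_id)
    return call

graph = TransportGraph()
add = make_task(graph, "add")
a = add(1, 2)   # records a node, returns a proxy
b = add(3, 4)   # records a second, independent node
c = add(a, b)   # records edges from a and b to this node
```

The key point is that no addition ever happens here — each call only grows the graph, which is why `a` is a proxy and not `3`.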

Parameters

```python
@workflow(name="VQE-H2")
def vqe_step(theta):
    ...
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | Function name | Display name for the workflow in the Temporal UI |

You can use the bare decorator for the default name:

```python
@workflow
def my_workflow():
    ...
```

The WorkflowDispatch Object

Calling a workflow function returns a WorkflowDispatch:

```python
dispatch = compute()  # WorkflowDispatch(compute, nodes=3, parallel_groups=2)
```

dispatch.run(client)

Execute the workflow on Temporal and wait for the result.

```python
from temporalio.client import Client

client = await Client.connect("localhost:7233")
dispatch = compute()
result = await dispatch.run(client)
print(result)  # The final output value
```

Parameters:

  • client — Temporal client connection
  • task_queue — Temporal task queue name (default: "marqov-workflows")

Returns: The workflow result (deserialized from JSON). If the workflow returns enriched metadata, run() unwraps it and returns only the result field.
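The unwrapping behavior can be sketched as follows. This is an assumption based on the documented `result` and `_workflow_metadata` fields, not marqov's actual implementation:

```python
# Sketch of the unwrapping run() performs on an enriched payload
# (illustrative; based on the documented `result` / `_workflow_metadata` fields).
def unwrap(payload):
    if isinstance(payload, dict) and "_workflow_metadata" in payload:
        return payload["result"]   # enriched: keep only the result field
    return payload                 # plain result: pass through unchanged

enriched = {"result": {"energy": -1.137}, "_workflow_metadata": {"timing": "..."}}
plain = {"energy": -1.137}
```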

dispatch.run_with_ids(client)

Execute the workflow and return the result along with tracking IDs. This is what the platform executor uses internally.

```python
dispatch = vqe_step(0.5, executor_config)
raw_result, workflow_id, run_id = await dispatch.run_with_ids(client)
```

Returns: A tuple of (result_dict, workflow_id, run_id).

The result_dict includes the full enriched result with _workflow_metadata (execution graph, task timeline, timing info) rather than unwrapping it like run() does.

dispatch.dispatch(client)

Submit the workflow to Temporal without waiting for the result. Useful for fire-and-forget patterns.

```python
workflow_id = await dispatch.dispatch(client)
print(f"Submitted: {workflow_id}")

# Check result later
handle = client.get_workflow_handle(workflow_id)
result = await handle.result()
```

Returns: The workflow ID string (e.g., "compute-a1b2c3d4").
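The documented format suggests the ID is the workflow name plus a short random hex suffix. A hypothetical generator (an assumption from the example ID, not marqov's actual code) might look like:

```python
import uuid

# Hypothetical sketch of generating an ID like "compute-a1b2c3d4"
# (inferred from the documented format, not marqov's actual logic).
def make_workflow_id(workflow_name: str) -> str:
    return f"{workflow_name}-{uuid.uuid4().hex[:8]}"

wf_id = make_workflow_id("compute")
```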

dispatch.visualize()

Generate a DOT-format graph of the workflow for visualization with Graphviz.

```python
dispatch = vqe_step(0.5, {})
dot = dispatch.visualize()
print(dot)
```

Output:

```
digraph lattice {
  rankdir=TB;
  "a1b2c3d4" [label="measure_pauli"];
  "e5f6g7h8" [label="measure_pauli"];
  "i9j0k1l2" [label="compute_energy" style=filled fillcolor=lightblue];
  "a1b2c3d4" -> "i9j0k1l2";
  "e5f6g7h8" -> "i9j0k1l2";
}
```

Output nodes (the workflow return value) are highlighted in light blue.

dispatch.get_parallel_groups()

Get the execution levels — groups of tasks that run in parallel at each step.

```python
dispatch = vqe_step(0.5, {})
groups = dispatch.get_parallel_groups()
for i, group in enumerate(groups):
    node_names = [dispatch.graph.nodes[nid].func_name for nid in group]
    print(f"Level {i}: {node_names}")
```

Output:

```
Level 0: ['measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli']
Level 1: ['compute_energy']
```

Returns: list[list[str]] — each inner list contains node IDs that execute concurrently.
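Parallel groups correspond to topological levels of the DAG: each level contains the nodes whose dependencies all sit in earlier levels. A generic sketch of this computation (plain Python with Kahn-style level peeling, independent of marqov) is:

```python
from collections import defaultdict

# Generic level computation for a DAG given as (parent, child) edges.
# Each level holds nodes whose dependencies are all in earlier levels.
def parallel_levels(nodes, edges):
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for parent, child in edges:
        indegree[child] += 1
        children[parent].append(child)
    levels = []
    current = [n for n in nodes if indegree[n] == 0]
    while current:
        levels.append(current)
        nxt = []
        for n in current:
            for c in children[n]:
                indegree[c] -= 1
                if indegree[c] == 0:
                    nxt.append(c)
        current = nxt
    return levels

# The compute() example: a and b are independent, c depends on both.
levels = parallel_levels(["a", "b", "c"], [("a", "c"), ("b", "c")])
```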

Complete Example

```python
from marqov import task, workflow, Circuit

@task(retries=3, timeout=3600)
async def measure(circuit_dict: dict, pauli: str, config: dict) -> dict:
    circuit = Circuit.from_dict(circuit_dict)
    # ... execute on quantum backend ...
    return {"pauli": pauli, "expectation": 0.42}

@task(timeout=60)
def compute_energy(zi: dict, iz: dict) -> dict:
    energy = zi["expectation"] + iz["expectation"]
    return {"energy": energy}

@workflow(name="VQE-Step")
def vqe_step(theta: float, executor_config: dict):
    circuit = Circuit().rx(theta, 0).cnot(0, 1)
    cd = circuit.to_dict()
    zi = measure(cd, "ZI", executor_config)  # Parallel
    iz = measure(cd, "IZ", executor_config)  # Parallel
    return compute_energy(zi, iz)  # After both complete

async def main(client, params):
    dispatch = vqe_step(params["theta"], params)

    # Inspect the graph before running
    print(dispatch.visualize())
    print(f"Parallel groups: {len(dispatch.get_parallel_groups())}")

    result = await dispatch.run(client)
    return {"result": result}
```

Workflow Return Values

A workflow can return:

  • A single TaskProxy — becomes the sole output node
  • A list/tuple of TaskProxy objects — all become output nodes, result is a dict keyed by node ID
  • A dict with TaskProxy values — the proxy values become output nodes
```python
@workflow
def multi_output():
    a = task_a()
    b = task_b()
    return [a, b]  # Both are output nodes
```
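How the three return shapes might be normalized into output nodes can be sketched as follows. This is illustrative only — `TaskProxy` here is a minimal stand-in for marqov's proxy type, and the helper name is hypothetical:

```python
# Sketch of normalizing a workflow return value into output node IDs
# (illustrative; TaskProxy is a stand-in for marqov's proxy type).
class TaskProxy:
    def __init__(self, node_id):
        self.node_id = node_id

def output_node_ids(retval):
    if isinstance(retval, TaskProxy):       # single proxy: sole output node
        return [retval.node_id]
    if isinstance(retval, (list, tuple)):   # every proxy becomes an output
        return [p.node_id for p in retval]
    if isinstance(retval, dict):            # proxy values become outputs
        return [p.node_id for p in retval.values()]
    raise TypeError("workflow must return TaskProxy values")

ids = output_node_ids([TaskProxy("n1"), TaskProxy("n2")])
```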

Deprecated Aliases

The @lattice decorator is a deprecated alias for @workflow. It will be removed in v0.3.0:

```python
# Old (deprecated)
from marqov import lattice

@lattice
def my_workflow():
    ...

# New (preferred)
from marqov import workflow

@workflow
def my_workflow():
    ...
```