The @workflow Decorator
The @workflow decorator marks a function as a workflow — a composition of tasks that forms a directed acyclic graph (DAG) for execution on Temporal.
Basic Usage
from marqov import task, workflow

@task
def add(x, y):
    return x + y

@workflow
def compute():
    a = add(1, 2)  # Returns TaskProxy, not 3
    b = add(3, 4)  # Returns TaskProxy, not 7
    c = add(a, b)  # Depends on a and b
    return c

When you call a @workflow function, it does not execute the tasks. Instead it:
- Creates a new TransportGraph
- Runs the function body in "graph building mode", where task calls record nodes and edges
- Returns a WorkflowDispatch object that holds the graph
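
The three steps above can be pictured with a small self-contained sketch. It is not marqov's implementation: ToyGraph, ToyProxy, toy_task, and toy_workflow are hypothetical names invented here, and the real TransportGraph and TaskProxy may work differently. The sketch only illustrates the general idea of recording calls instead of executing them.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class ToyGraph:
    """Hypothetical stand-in for a TransportGraph."""
    nodes: dict = field(default_factory=dict)  # node_id -> function name
    edges: list = field(default_factory=list)  # (upstream_id, downstream_id)


@dataclass(frozen=True)
class ToyProxy:
    """Placeholder handed back instead of a real value while building."""
    node_id: str


_active_graph = None       # set only while a workflow body is running
_ids = itertools.count()   # simple source of unique node IDs


def toy_task(func):
    def wrapper(*args):
        if _active_graph is None:
            # Assumption of this toy: outside a workflow, just run the function.
            return func(*args)
        node_id = f"n{next(_ids)}"
        _active_graph.nodes[node_id] = func.__name__
        for arg in args:
            if isinstance(arg, ToyProxy):
                # A proxy argument means this node depends on an earlier one.
                _active_graph.edges.append((arg.node_id, node_id))
        return ToyProxy(node_id)
    return wrapper


def toy_workflow(func):
    def wrapper(*args):
        global _active_graph
        _active_graph = ToyGraph()   # step 1: fresh graph
        try:
            func(*args)              # step 2: body records nodes and edges
            return _active_graph     # step 3: hand back the recorded graph
        finally:
            _active_graph = None
    return wrapper


@toy_task
def add(x, y):
    return x + y


@toy_workflow
def compute():
    a = add(1, 2)
    b = add(3, 4)
    return add(a, b)


graph = compute()
print(graph.nodes)
print(graph.edges)
```

In this toy version, calling compute() never adds any numbers. It only records three add nodes and the two edges that feed the final call.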
Parameters
@workflow(name="VQE-H2")
def vqe_step(theta):
    ...

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Display name for the workflow in Temporal UI |
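
For readers curious how one decorator can accept an optional name and also work without parentheses, here is a minimal sketch of the usual Python pattern. The toy_workflow function and its display_name attribute are hypothetical and are not part of marqov. How @workflow is actually implemented is not stated on this page.

```python
import functools


def toy_workflow(func=None, *, name=None):
    """Hypothetical decorator usable as @toy_workflow or @toy_workflow(name=...)."""
    def decorate(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)
        # Fall back to the function's own name when no display name is given.
        wrapper.display_name = name if name is not None else f.__name__
        return wrapper

    if func is not None:
        # Bare form: Python passes the decorated function directly.
        return decorate(func)
    # Called form: return the real decorator to be applied next.
    return decorate


@toy_workflow(name="VQE-H2")
def vqe_step(theta):
    return theta


@toy_workflow
def my_workflow():
    return None


print(vqe_step.display_name)     # VQE-H2
print(my_workflow.display_name)  # my_workflow
```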
You can use the bare decorator for the default name:
@workflow
def my_workflow():
    ...

The WorkflowDispatch Object
Calling a workflow function returns a WorkflowDispatch:
dispatch = compute()
# WorkflowDispatch(compute, nodes=3, parallel_groups=2)

dispatch.run(client)
Execute the workflow on Temporal and wait for the result.
from temporalio.client import Client

# The await calls below must run inside an async function (for example, one started with asyncio.run).
client = await Client.connect("localhost:7233")
dispatch = compute()
result = await dispatch.run(client)
print(result)  # The final output value

Parameters:
- client: Temporal client connection
- task_queue: Temporal task queue name (default: "marqov-workflows")
Returns: The workflow result (deserialized from JSON). If the workflow returns enriched metadata, run() unwraps it and returns only the result field.
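
The unwrapping behavior can be sketched as a small helper. This is an illustration under stated assumptions, not marqov's code: unwrap_result is a hypothetical name, and the sketch assumes the enriched result is a dict holding a result key next to a _workflow_metadata key.

```python
def unwrap_result(raw):
    """Return only the payload when the result carries workflow metadata.

    Assumed layout (not confirmed by the library):
    {"result": ..., "_workflow_metadata": {...}}
    """
    if isinstance(raw, dict) and "_workflow_metadata" in raw and "result" in raw:
        return raw["result"]
    # Anything else is treated as a plain result and passed through unchanged.
    return raw


enriched = {"result": {"energy": 0.84}, "_workflow_metadata": {"task_timeline": []}}
plain = {"energy": 0.84}

print(unwrap_result(enriched))  # {'energy': 0.84}
print(unwrap_result(plain))     # {'energy': 0.84}
```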
dispatch.run_with_ids(client)
Execute the workflow and return the result along with tracking IDs. This is what the platform executor uses internally.
dispatch = vqe_step(0.5, executor_config)
raw_result, workflow_id, run_id = await dispatch.run_with_ids(client)

Returns: A tuple of (result_dict, workflow_id, run_id).
The result_dict includes the full enriched result with _workflow_metadata (execution graph, task timeline, timing info) rather than unwrapping it like run() does.
dispatch.dispatch(client)
Submit the workflow to Temporal without waiting for the result. Useful for fire-and-forget patterns.
workflow_id = await dispatch.dispatch(client)
print(f"Submitted: {workflow_id}")

# Check result later
handle = client.get_workflow_handle(workflow_id)
result = await handle.result()

Returns: The workflow ID string (e.g., "compute-a1b2c3d4").
dispatch.visualize()
Generate a DOT-format graph of the workflow for visualization with Graphviz.
dispatch = vqe_step(0.5, {})
dot = dispatch.visualize()
print(dot)

Output:

digraph lattice {
  rankdir=TB;
  "a1b2c3d4" [label="measure_pauli"];
  "e5f6g7h8" [label="measure_pauli"];
  "i9j0k1l2" [label="compute_energy" style=filled fillcolor=lightblue];
  "a1b2c3d4" -> "i9j0k1l2";
  "e5f6g7h8" -> "i9j0k1l2";
}

Output nodes (the workflow return value) are highlighted in light blue.
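
To show how DOT text of this shape can be produced from a node table and an edge list, here is a minimal sketch. The to_dot function and its arguments are hypothetical and written for illustration. They are not visualize() itself, whose internals are not described on this page.

```python
def to_dot(nodes, edges, outputs, graph_name="lattice"):
    """Render a DAG as DOT text.

    nodes:   dict mapping node ID -> label
    edges:   iterable of (source_id, target_id) pairs
    outputs: set of node IDs to highlight as output nodes
    """
    lines = [f"digraph {graph_name} {{", "  rankdir=TB;"]
    for node_id, label in nodes.items():
        attrs = f'label="{label}"'
        if node_id in outputs:
            attrs += " style=filled fillcolor=lightblue"
        lines.append(f'  "{node_id}" [{attrs}];')
    for src, dst in edges:
        lines.append(f'  "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)


nodes = {
    "a1b2c3d4": "measure_pauli",
    "e5f6g7h8": "measure_pauli",
    "i9j0k1l2": "compute_energy",
}
edges = [("a1b2c3d4", "i9j0k1l2"), ("e5f6g7h8", "i9j0k1l2")]
dot = to_dot(nodes, edges, outputs={"i9j0k1l2"})
print(dot)
```

DOT text saved to a file such as workflow.dot can be rendered with the Graphviz command line, for example `dot -Tpng workflow.dot -o workflow.png`.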
dispatch.get_parallel_groups()
Get the execution levels — groups of tasks that run in parallel at each step.
dispatch = vqe_step(0.5, {})
groups = dispatch.get_parallel_groups()
for i, group in enumerate(groups):
    node_names = [dispatch.graph.nodes[nid].func_name for nid in group]
    print(f"Level {i}: {node_names}")

Output:

Level 0: ['measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli']
Level 1: ['compute_energy']

Returns: list[list[str]], where each inner list contains the node IDs that execute concurrently.
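
Execution levels like these can be derived from a DAG by repeatedly peeling off the nodes whose dependencies are all satisfied (a level-by-level variant of Kahn's topological sort). The sketch below is one way to do that. The parallel_groups function is hypothetical, and whether get_parallel_groups() uses this exact algorithm is an assumption.

```python
def parallel_groups(node_ids, edges):
    """Group node IDs into levels; each level depends only on earlier levels."""
    indegree = {nid: 0 for nid in node_ids}
    children = {nid: [] for nid in node_ids}
    for src, dst in edges:
        indegree[dst] += 1
        children[src].append(dst)

    # Level 0 holds every node with no incoming edges.
    level = [nid for nid in node_ids if indegree[nid] == 0]
    groups = []
    while level:
        groups.append(level)
        next_level = []
        for nid in level:
            for child in children[nid]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    # All of this child's dependencies are now scheduled.
                    next_level.append(child)
        level = next_level

    if sum(len(group) for group in groups) != len(indegree):
        raise ValueError("graph contains a cycle")
    return groups


groups = parallel_groups(["a", "b", "c"], [("a", "c"), ("b", "c")])
print(groups)  # [['a', 'b'], ['c']]
```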
Complete Example
from marqov import task, workflow, Circuit

@task(retries=3, timeout=3600)
async def measure(circuit_dict: dict, pauli: str, config: dict) -> dict:
    circuit = Circuit.from_dict(circuit_dict)
    # ... execute on quantum backend ...
    return {"pauli": pauli, "expectation": 0.42}

@task(timeout=60)
def compute_energy(zi: dict, iz: dict) -> dict:
    energy = zi["expectation"] + iz["expectation"]
    return {"energy": energy}

@workflow(name="VQE-Step")
def vqe_step(theta: float, executor_config: dict):
    circuit = Circuit().rx(theta, 0).cnot(0, 1)
    cd = circuit.to_dict()
    zi = measure(cd, "ZI", executor_config)  # Parallel
    iz = measure(cd, "IZ", executor_config)  # Parallel
    return compute_energy(zi, iz)            # After both complete

async def main(client, params):
    dispatch = vqe_step(params["theta"], params)
    # Inspect the graph before running
    print(dispatch.visualize())
    print(f"Parallel groups: {len(dispatch.get_parallel_groups())}")
    result = await dispatch.run(client)
    return {"result": result}

Workflow Return Values
A workflow can return:
- A single TaskProxy: becomes the sole output node
- A list/tuple of TaskProxy objects: all become output nodes, and the result is a dict keyed by node ID
- A dict with TaskProxy values: the proxy values become output nodes
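
The three cases can be read as one normalization step that turns whatever the workflow returned into a list of output node IDs. The following is a rough sketch of that idea only. ToyProxy and collect_output_ids are hypothetical names, and skipping non-proxy values is an assumption made here, not documented behavior.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyProxy:
    """Hypothetical stand-in for a TaskProxy; only carries a node ID."""
    node_id: str


def collect_output_ids(returned):
    """List the node IDs of every proxy found in a workflow's return value."""
    if isinstance(returned, ToyProxy):
        return [returned.node_id]                      # single proxy
    if isinstance(returned, (list, tuple)):
        return [item.node_id for item in returned
                if isinstance(item, ToyProxy)]         # list/tuple of proxies
    if isinstance(returned, dict):
        return [value.node_id for value in returned.values()
                if isinstance(value, ToyProxy)]        # dict with proxy values
    return []                                          # nothing to mark as output


a, b = ToyProxy("n0"), ToyProxy("n1")
print(collect_output_ids(a))                   # ['n0']
print(collect_output_ids([a, b]))              # ['n0', 'n1']
print(collect_output_ids({"zi": a, "iz": b}))  # ['n0', 'n1']
```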
@workflow
def multi_output():
    a = task_a()
    b = task_b()
    return [a, b]  # Both are output nodes

Deprecated Aliases
The @lattice decorator is a deprecated alias for @workflow. It will be removed in v0.3.0:
# Old (deprecated)
from marqov import lattice

@lattice
def my_workflow():
    ...

# New (preferred)
from marqov import workflow

@workflow
def my_workflow():
    ...