
Parallel Task Execution

This tutorial explains how Marqov automatically detects independent tasks and runs them in parallel. You will learn how the dependency graph is built, how execution levels are determined, and how to structure your workflows to maximize parallelism.

By the end, you will understand:

  • How Marqov traces task calls to build a dependency graph (the TransportGraph)
  • How execution levels are computed via topological sort
  • How to write workflows that maximize parallel execution
  • How to visualize the workflow graph


The Key Idea

When you call a @task function inside a @workflow, it does not execute immediately. Instead, it returns a TaskProxy — a placeholder that records the call and its inputs in a directed acyclic graph (DAG) called the TransportGraph.

Marqov inspects the inputs to each task call. If an input is a TaskProxy (the output of another task), that creates a dependency edge. If two tasks share no proxy inputs from each other, they are independent and can run in parallel.
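The recording mechanism can be pictured with a minimal sketch. The decorator internals below are illustrative only, not Marqov's actual implementation; only the names `task` and `TaskProxy` come from the library:

```python
import itertools

_ids = itertools.count()

class TaskProxy:
    """Placeholder returned by a traced task call; records the call, not a result."""
    def __init__(self, func_name, args):
        self.node_id = next(_ids)
        self.func_name = func_name
        self.args = args

def task(func):
    """Illustrative decorator: instead of running func, record a graph node."""
    def traced(*args):
        return TaskProxy(func.__name__, args)
    return traced

@task
def add(x, y):
    return x + y

a = add(1, 2)           # plain inputs -> no dependencies
c = add(a, add(3, 4))   # TaskProxy inputs -> depends on both adds
print(type(a).__name__, a.func_name)
```

Nothing runs when `add` is called; each call just appends a node, and the proxies threaded through later calls are what encode the graph's edges.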

A Simple Example

Consider this workflow:

from marqov import task, workflow

@task
def add(x, y):
    return x + y

@workflow
def compute():
    a = add(1, 2)   # No dependencies -- Level 0
    b = add(3, 4)   # No dependencies -- Level 0
    c = add(a, b)   # Depends on a and b -- Level 1
    return c

When you call compute(), Marqov traces the function and builds this graph:

add(1,2) ----+
             +--> add(a, b)
add(3,4) ----+

Tasks add(1,2) and add(3,4) have no task dependencies (their inputs are plain integers, not TaskProxy objects), so they are placed at Level 0 and run in parallel. Task add(a, b) takes two TaskProxy inputs, so it depends on both and is placed at Level 1.

How Dependencies Are Detected

The dependency detection happens in extract_dependencies(). When a @task is called inside a @workflow, Marqov examines every positional and keyword argument:

  • If an argument is a TaskProxy, a dependency edge is created from that proxy’s task to the current task.
  • If an argument is a list or tuple containing TaskProxy objects, each one creates a dependency edge.
  • Plain values (integers, strings, dicts) create no dependencies.
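These rules amount to a small recursive scan over the arguments. Here is a hypothetical stand-in for `extract_dependencies`, assuming a `TaskProxy` with a `node_id` attribute; it is a sketch of the logic described above, not Marqov's source:

```python
class TaskProxy:
    """Minimal stand-in: only the node_id matters for dependency extraction."""
    def __init__(self, node_id):
        self.node_id = node_id

def extract_dependencies(args, kwargs):
    """Collect node IDs of every TaskProxy found in a call's arguments."""
    deps = set()
    for value in list(args) + list(kwargs.values()):
        if isinstance(value, TaskProxy):
            deps.add(value.node_id)
        elif isinstance(value, (list, tuple)):
            # Containers are searched for proxies, one level deep
            deps.update(v.node_id for v in value if isinstance(v, TaskProxy))
        # Plain values (ints, strings, dicts) create no dependencies
    return deps

a, b = TaskProxy("a"), TaskProxy("b")
print(sorted(extract_dependencies((a, [b, 42], "plain"), {"k": 7})))
```

Each ID returned becomes a dependency edge into the task being called.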

This is why you can pass the same task output to multiple downstream tasks and they will all depend on it:

@workflow
def fan_out():
    data = prepare()              # Level 0
    r1 = process_a(data)          # Level 1 (depends on prepare)
    r2 = process_b(data)          # Level 1 (depends on prepare)
    r3 = process_c(data)          # Level 1 (depends on prepare)
    return combine(r1, r2, r3)    # Level 2 (depends on all three)

This produces 3-way parallelism at Level 1.

How Execution Levels Work

Marqov computes execution levels using a topological sort. The algorithm is straightforward:

  1. Find all nodes with no unresolved dependencies. These form Level 0.
  2. Remove Level 0 nodes from the graph.
  3. Find all nodes whose dependencies are now fully resolved. These form Level 1.
  4. Repeat until the graph is empty.

If a cycle is detected (task A depends on B, B depends on A), Marqov raises a ValueError.
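The peel-off loop, including the cycle check, can be sketched in a few lines. This is an illustration of the algorithm described above, not Marqov's code; the input maps each node ID to the set of node IDs it depends on:

```python
def parallel_groups(deps):
    """Compute execution levels from a node_id -> {dependency node_ids} map."""
    remaining = {n: set(d) for n, d in deps.items()}
    levels = []
    while remaining:
        # Nodes with no unresolved dependencies form the next level
        ready = [n for n, d in remaining.items() if not d]
        if not ready:
            # Nothing is ready but nodes remain: the graph has a cycle
            raise ValueError("Cycle detected in workflow graph")
        levels.append(sorted(ready))
        for n in ready:
            del remaining[n]
        # Mark the finished nodes as resolved for everyone downstream
        for d in remaining.values():
            d.difference_update(ready)
    return levels

graph = {"node_a": set(), "node_b": set(), "node_c": {"node_a", "node_b"}}
print(parallel_groups(graph))  # [['node_a', 'node_b'], ['node_c']]
```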

The result is a list of lists, where each inner list contains node IDs that can execute concurrently:

dispatch = compute()
levels = dispatch.get_parallel_groups()
# levels = [["node_a", "node_b"], ["node_c"]]

Tasks within the same level run in parallel as concurrent Temporal activities. Levels execute in strict order — Level 1 does not start until every task in Level 0 has completed.
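Conceptually, level-by-level execution looks like the following sketch. It uses a local thread pool purely for illustration; Marqov actually dispatches each level's tasks as Temporal activities, and `run_task` here is a hypothetical callable:

```python
from concurrent.futures import ThreadPoolExecutor

def run_levels(levels, run_task):
    """Run each level's tasks concurrently; a level is a barrier for the next."""
    results = {}
    with ThreadPoolExecutor() as pool:
        for level in levels:
            futures = {nid: pool.submit(run_task, nid) for nid in level}
            # Wait for every task in this level before starting the next one
            results.update({nid: f.result() for nid, f in futures.items()})
    return results

out = run_levels([["a", "b"], ["c"]], lambda nid: nid.upper())
print(out)  # {'a': 'A', 'b': 'B', 'c': 'C'}
```

The barrier between levels is what guarantees that every input a task needs has been produced before it starts.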

A Real-World Example: VQE

The VQE H2 benchmark demonstrates this pattern with 5 parallel Pauli measurements:

@workflow(name="VQE-H2-SV1")
def vqe_step(theta, executor_config):
    circuit = build_ansatz(theta)
    circuit_dict = circuit.to_dict()

    # 5 independent measurements -- all depend only on build_ansatz
    zi = measure_pauli(circuit_dict, "ZI", executor_config)
    iz = measure_pauli(circuit_dict, "IZ", executor_config)
    zz = measure_pauli(circuit_dict, "ZZ", executor_config)
    xx = measure_pauli(circuit_dict, "XX", executor_config)
    yy = measure_pauli(circuit_dict, "YY", executor_config)

    # Energy computation depends on all 5 measurements
    return compute_energy(zi, iz, zz, xx, yy)

Execution levels:

Level 0: build_ansatz      (1 task)
Level 1: measure_pauli x5  (5 tasks, parallel)
Level 2: compute_energy    (1 task)

On SV1, each measure_pauli call takes about 2 seconds. Sequential execution would take 10 seconds for measurements alone. With 5-way parallelism, the measurement level completes in about 2 seconds — a roughly 43% reduction in total wall time.

Visualizing the Graph

Every WorkflowDispatch object can produce a DOT-format graph visualization:

dispatch = vqe_step(0.5, {})
print(dispatch.visualize())

This outputs:

digraph lattice {
  rankdir=TB;
  "a1b2c3d4" [label="build_ansatz"];
  "e5f6g7h8" [label="measure_pauli"];
  "i9j0k1l2" [label="measure_pauli"];
  "m3n4o5p6" [label="measure_pauli"];
  "q7r8s9t0" [label="measure_pauli"];
  "u1v2w3x4" [label="measure_pauli"];
  "y5z6a7b8" [label="compute_energy" style=filled fillcolor=lightblue];
  "a1b2c3d4" -> "e5f6g7h8";
  "a1b2c3d4" -> "i9j0k1l2";
  ...
}

You can render this with any Graphviz tool to see the workflow structure. Output nodes (the final results) are highlighted in light blue.

You can also inspect the parallel groups directly:

dispatch = vqe_step(0.5, {})
groups = dispatch.get_parallel_groups()
for i, group in enumerate(groups):
    node_names = [dispatch.graph.nodes[nid].func_name for nid in group]
    print(f"Level {i}: {node_names} ({len(group)} parallel)")

Output:

Level 0: ['build_ansatz'] (1 parallel)
Level 1: ['measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli', 'measure_pauli'] (5 parallel)
Level 2: ['compute_energy'] (1 parallel)

Sequential vs. Parallel: Before and After

Here is a concrete comparison. Consider a workflow with 3 independent tasks, each taking 2 seconds:

Sequential execution (no parallelism)

task_a: |████████|                     2s
task_b:           |████████|           2s
task_c:                     |████████| 2s
Total: 6s

Parallel execution (Marqov default)

task_a: |████████| 2s
task_b: |████████| 2s
task_c: |████████| 2s
Total: 2s

Marqov achieves this automatically. You do not need to write any threading or async code. The dependency graph determines what can run in parallel, and Temporal executes independent activities concurrently.
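You can estimate the benefit directly from the parallel groups: sequential wall time is the sum of all task durations, while level-parallel wall time is the sum of each level's longest task. The helper below is a sketch under the assumption that per-task durations are known; the numbers match the 3-task example above:

```python
def wall_times(levels, duration):
    """Estimate sequential vs. level-parallel wall time from task durations."""
    # Sequential: every task runs back to back
    sequential = sum(duration[n] for level in levels for n in level)
    # Parallel: each level takes as long as its slowest task
    parallel = sum(max(duration[n] for n in level) for level in levels)
    return sequential, parallel

durations = {"task_a": 2, "task_b": 2, "task_c": 2}
levels = [["task_a", "task_b", "task_c"]]  # all independent -> one level
print(wall_times(levels, durations))  # (6, 2)
```

The same arithmetic explains the VQE numbers: five 2-second measurements cost 10 seconds sequentially but only 2 seconds as a single level.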

Patterns for Maximizing Parallelism

Fan-out / Fan-in

The most common pattern. One task produces data, multiple tasks process it independently, then one task combines the results:

@workflow
def map_reduce():
    data = prepare()
    results = [process(data, i) for i in range(10)]  # 10-way parallel
    return aggregate(*results)

Diamond

Two independent preparation tasks run in parallel, a merge task combines their outputs, and a final task consumes the merged result:

@workflow
def diamond():
    a = prep_a()        # Level 0
    b = prep_b()        # Level 0 (parallel with prep_a)
    c = merge(a, b)     # Level 1
    return finalize(c)  # Level 2

Independent pipelines

Two completely independent chains that run in parallel end-to-end:

@workflow
def dual_pipeline():
    x1 = step_a1()          # Level 0
    x2 = step_a2(x1)        # Level 1
    y1 = step_b1()          # Level 0 (parallel with step_a1)
    y2 = step_b2(y1)        # Level 1 (parallel with step_a2)
    return combine(x2, y2)  # Level 2

What to Avoid

Hidden sequential dependencies

If you pass the output of one task to the next, you create a chain:

@workflow
def sequential():
    a = step1()
    b = step2(a)  # Depends on step1
    c = step3(b)  # Depends on step2
    return c      # All sequential, no parallelism

This is not wrong — it just does not benefit from parallelism. If the tasks genuinely depend on each other, this is the correct structure.

Unnecessary dependencies

Avoid passing task outputs you do not actually need:

# Bad: step2 receives 'a' but does not use it, yet it creates a dependency
@workflow
def accidental_dependency():
    a = step1()
    b = step2(a)  # step2 now depends on step1
    c = step3()   # step3 is independent -- runs parallel with step2
    return combine(b, c)

# Better: if step2 does not need step1's output, do not pass it
@workflow
def clean_parallel():
    a = step1()   # Level 0
    b = step2()   # Level 0 (parallel with step1!)
    c = step3()   # Level 0 (parallel with both!)
    return combine(a, b, c)
