Error Handling and Debugging
This guide covers how errors propagate through Marqov workflows, how to configure retries, and how to debug failures.
Error Categories
Marqov classifies errors into three categories:
| Category | Meaning | Examples |
|---|---|---|
| user | Problem with the script or input | Syntax errors, invalid QASM, unsupported gates, import errors |
| transient | Temporary failure, may succeed on retry | Rate limiting (429), backend timeout (502), network errors |
| system | Platform-level failure | Auth failures (401), internal server errors (500) |
These categories determine how errors are displayed in the dashboard and whether a “Try Again” button appears (transient errors only).
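The example status codes in the table can be mapped to categories with a small lookup. This is a minimal sketch, not Marqov's actual classifier: `classify_status`, `shows_try_again`, and the fallback to `system` for unlisted codes are assumptions made for illustration.

```python
# Hypothetical helper: maps the example HTTP codes from the table to categories.
CATEGORY_BY_STATUS = {
    429: "transient",  # rate limiting
    502: "transient",  # backend timeout
    401: "system",     # auth failure
    500: "system",     # internal server error
}


def classify_status(status_code: int) -> str:
    """Return the error category for an HTTP status code."""
    # Assumption: codes not listed in the table are treated as system errors.
    return CATEGORY_BY_STATUS.get(status_code, "system")


def shows_try_again(category: str) -> bool:
    """The dashboard offers "Try Again" for transient errors only."""
    return category == "transient"
```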
@task Retry Configuration
Configure retries on individual tasks using the retries and timeout parameters:
from marqov import task

@task(retries=3, timeout=3600)
async def call_quantum_backend(circuit_dict: dict, config: dict) -> dict:
    """Retries up to 3 times on failure. Times out after 1 hour."""
    from marqov import Circuit

    circuit = Circuit.from_dict(circuit_dict)
    # ... execute on quantum backend ...
    return {"counts": {"00": 500, "11": 500}}

How Temporal Retries Work
When a task fails, Temporal retries it with exponential backoff:
- Total attempts: retries + 1 (original + retries)
- Initial interval: 1 second
- Maximum interval: 60 seconds
- Backoff coefficient: 2.0
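The waits implied by these settings can be computed directly. This is a minimal sketch, assuming the interval is multiplied by the backoff coefficient after each failure and capped at the maximum interval; `retry_waits` is a hypothetical helper, not part of Marqov or Temporal.

```python
def retry_waits(retries: int, initial: float = 1.0,
                coefficient: float = 2.0, maximum: float = 60.0) -> list[float]:
    """Wait in seconds before each retry, capped at the maximum interval."""
    return [min(initial * coefficient ** i, maximum) for i in range(retries)]


print(retry_waits(3))  # [1.0, 2.0, 4.0]
print(retry_waits(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the default settings the cap only matters from the seventh retry onward, which is beyond the retry counts recommended in this guide.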
For a task with retries=3, the retry timeline looks like:
Attempt 1: immediate
(fail) -> wait 1s
Attempt 2: retry
(fail) -> wait 2s
Attempt 3: retry
(fail) -> wait 4s
Attempt 4: final retry
(fail) -> task fails permanently

Choosing Retry Counts
| Task Type | Recommended Retries | Recommended Timeout | Reason |
|---|---|---|---|
| Pure computation | 0-1 | 60s | Deterministic; retrying won’t help |
| Simulator (SV1, DM1) | 2-3 | 600s | Occasional API/network failures |
| QPU (IonQ, Rigetti) | 3 | 3600s | Long queue times, transient API errors |
| Data fetch / S3 | 2-3 | 120s | Network flakiness |
@task(retries=0, timeout=60)
def compute_energy(zi, iz, zz, xx, yy):
    """Pure math -- no retries needed."""
    return {"energy": sum_expectations(zi, iz, zz, xx, yy)}

@task(retries=3, timeout=3600)
async def measure_pauli(circuit_dict, pauli, config):
    """Quantum backend call -- retries for transient failures."""
    ...

Timeout Handling
The timeout parameter sets the maximum wall-clock time for a single task execution (one attempt, not total across retries). If a task exceeds its timeout, Temporal cancels it and either retries (if retries remain) or fails the task permanently.
@task(timeout=300)  # 5 minutes per attempt
async def medium_task():
    ...

@task(timeout=3600)  # 1 hour per attempt (for QPU queue times)
async def long_running_task():
    ...

The overall workflow also has a timeout set by the executor (default: 3600 seconds). This is a hard cap on total workflow execution time, regardless of individual task timeouts.
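Because the task timeout applies per attempt while the workflow timeout is a total, it can help to estimate a task's worst case before choosing values. The sketch below assumes every attempt runs to its full timeout and that the backoff waits described earlier count toward workflow time; `worst_case_seconds` is a hypothetical helper, not a Marqov API.

```python
def worst_case_seconds(retries: int, timeout: float,
                       initial: float = 1.0, coefficient: float = 2.0,
                       maximum: float = 60.0) -> float:
    """Upper bound on one task's wall-clock time if every attempt times out."""
    attempts = retries + 1
    # Backoff waits between attempts, capped at the maximum interval.
    waits = sum(min(initial * coefficient ** i, maximum) for i in range(retries))
    return attempts * timeout + waits


WORKFLOW_TIMEOUT = 3600  # executor default, in seconds

bound = worst_case_seconds(retries=3, timeout=3600)
print(bound)                     # 14407.0
print(bound > WORKFLOW_TIMEOUT)  # True
```

Under these assumptions, a task configured with retries=3 and timeout=3600 cannot use all of its retries within the default workflow timeout, so the workflow cap would be reached first.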
How Errors Appear in the Dashboard
Workflow-level failures
If any task fails permanently (exhausts all retries), the entire workflow fails. The dashboard shows:
- Job status: Failed
- Error message from the failed task
- Workflow execution graph with the failed task highlighted
- Task timeline showing which tasks completed and which failed
Task-level errors in the timeline
Each task in the workflow execution timeline includes:
- status: "completed" or "failed"
- started_at / completed_at: timestamps
- error: error message (if failed)
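If the timeline is available as a list of task entries, the failed ones can be picked out by their status. This is only a sketch: the list-of-dicts shape, the `name` field, and the sample values are assumptions for illustration, and only `status`, `started_at`, `completed_at`, and `error` come from the field list above.

```python
# Made-up sample data in an assumed shape; "name" is a hypothetical field.
timeline = [
    {"name": "measure_zi", "status": "completed",
     "started_at": "2024-01-01T00:00:00Z", "completed_at": "2024-01-01T00:00:05Z"},
    {"name": "measure_iz", "status": "failed",
     "started_at": "2024-01-01T00:00:00Z", "completed_at": "2024-01-01T00:00:07Z",
     "error": "Rate limited by backend (429)"},
]


def failed_tasks(entries: list[dict]) -> list[tuple[str, str]]:
    """Return (name, error) for every entry whose status is "failed"."""
    return [(e["name"], e.get("error", "")) for e in entries
            if e["status"] == "failed"]


for name, error in failed_tasks(timeline):
    print(f"{name}: {error}")
```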
Debugging with Temporal UI
The Temporal UI (typically at http://localhost:8080 for local development) provides detailed visibility into workflow execution.
Finding your workflow
- Open the Temporal UI
- Navigate to the default namespace
- Search by workflow ID (shown in the job detail page) or browse recent workflows
Inspecting execution
The workflow detail page shows:
- Event history: Every activity scheduled, started, completed, or failed
- Activity inputs/outputs: The serialized arguments and return values
- Retry attempts: Each retry with its error message
- Timing: Start time, end time, and duration for every activity
Common patterns to look for
All activities at one level started simultaneously: This confirms parallel execution is working. If activities start sequentially, check that your tasks have no unintended dependencies.
Activity retrying repeatedly: Check the error message in each retry. Common causes:
- Network timeout to quantum backend
- Rate limiting from cloud provider
- S3 permission errors
Workflow timed out: The overall workflow exceeded its timeout. Consider raising the workflow timeout, or lowering retry counts or per-task timeouts so that the slowest task's worst case fits within it.
Writing Defensive Tasks
Handle import errors inside tasks
Tasks execute in Temporal activity workers, which may have different dependencies than your local environment. Import heavy libraries inside the task body:
@task(retries=2, timeout=600)
async def run_on_braket(circuit_dict: dict, config: dict) -> dict:
    # Import inside the task -- not at module level
    from marqov import Circuit
    from marqov.executors.braket import BraketExecutor, BraketExecutorConfig

    circuit = Circuit.from_dict(circuit_dict)
    executor = BraketExecutor(BraketExecutorConfig(**config))
    result = await executor.execute(circuit, shots=1000)
    return {"counts": result.counts}

Return serializable results
Task results must be JSON-serializable because they pass through Temporal’s serialization layer:
@task
def good_task():
    return {"energy": -1.85, "counts": {"00": 500, "11": 500}}

@task
def bad_task():
    import numpy as np
    return np.array([1, 2, 3])  # NumPy arrays are not JSON-serializable

Convert NumPy arrays, complex objects, etc. to Python primitives before returning.
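One way to do this conversion is a small recursive helper applied to the result just before returning it. This is a sketch under stated assumptions: `to_jsonable` is a hypothetical helper, not part of Marqov, and representing complex numbers as a real/imag pair is one possible convention. It relies on the fact that NumPy arrays and NumPy scalars expose `tolist()`, so it does not need to import NumPy itself.

```python
import json


def to_jsonable(value):
    """Recursively convert a task result into JSON-serializable primitives."""
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, complex):
        # JSON has no complex type; store the real and imaginary parts.
        return {"real": float(value.real), "imag": float(value.imag)}
    if hasattr(value, "tolist"):
        # NumPy arrays and NumPy scalars provide tolist().
        return to_jsonable(value.tolist())
    return value


result = to_jsonable({"amplitude": 1 + 2j, "shots": (500, 500)})
print(json.dumps(result))
# {"amplitude": {"real": 1.0, "imag": 2.0}, "shots": [500, 500]}
```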
Provide clear error messages
When raising errors from tasks, include context that helps debugging:
@task(retries=2, timeout=600)
async def measure(circuit_dict: dict, pauli: str, config: dict) -> dict:
    if not config.get("device_arn"):
        raise ValueError(
            f"device_arn is required in config. Got keys: {list(config.keys())}"
        )
    ...

Example: Full Error-Resilient Workflow
from marqov import task, workflow, Circuit

@task(retries=3, timeout=3600)
async def measure_pauli(circuit_dict: dict, pauli: str, config: dict) -> dict:
    """Quantum measurement with retries for backend flakiness."""
    from marqov import Circuit
    from marqov.executors.braket import BraketExecutor, BraketExecutorConfig

    circuit = Circuit.from_dict(circuit_dict)
    executor = BraketExecutor(BraketExecutorConfig(**config))
    result = await executor.execute(circuit, shots=1000)
    return {
        "pauli": pauli,
        # compute_expectation is not defined in this example; it is assumed
        # to be a helper you provide that turns counts into an expectation value.
        "expectation": compute_expectation(result.counts, pauli),
    }

@task(retries=0, timeout=60)
def compute_energy(zi: dict, iz: dict) -> dict:
    """Pure computation -- no retries, short timeout."""
    energy = zi["expectation"] * 0.3435 + iz["expectation"] * -0.4347
    return {"energy": energy}

@workflow(name="Resilient-VQE")
def vqe_step(theta: float, executor_config: dict):
    circuit = Circuit().rx(theta, 0).cnot(0, 1)
    cd = circuit.to_dict()
    zi = measure_pauli(cd, "ZI", executor_config)  # 3 retries
    iz = measure_pauli(cd, "IZ", executor_config)  # 3 retries
    return compute_energy(zi, iz)  # No retries

async def main(client, params):
    dispatch = vqe_step(
        theta=params.get("theta", 0.5),
        executor_config=params,
    )
    result = await dispatch.run(client)
    return {
        "result": result,
        "_summary": {
            "Energy": f"{result['energy']:.4f} Ha",
        },
    }