Python SDK Reference
Package: marqov (v0.2.0-dev)
Install: pip install marqov
Optional extras: pip install marqov[qiskit], marqov[cirq], marqov[pennylane], marqov[openqasm]
marqov.circuits
Circuit
Backend-agnostic quantum circuit with a fluent API. All gate methods return self for chaining.
from marqov import Circuit
circuit = Circuit().h(0).cnot(0, 1)Constructor:
Circuit() -> CircuitCreates an empty circuit.
Properties:
| Property | Type | Description |
|---|---|---|
num_qubits | int | Number of qubits in the circuit |
Single-qubit gates:
| Method | Signature | Description |
|---|---|---|
h(qubit) | (int) -> Circuit | Hadamard gate |
x(qubit) | (int) -> Circuit | Pauli-X (NOT) gate |
y(qubit) | (int) -> Circuit | Pauli-Y gate |
z(qubit) | (int) -> Circuit | Pauli-Z gate |
s(qubit) | (int) -> Circuit | S (phase) gate |
t(qubit) | (int) -> Circuit | T gate |
Rotation gates (angles in radians):
| Method | Signature | Description |
|---|---|---|
rx(angle, qubit) | (float, int) -> Circuit | Rx rotation |
ry(angle, qubit) | (float, int) -> Circuit | Ry rotation |
rz(angle, qubit) | (float, int) -> Circuit | Rz rotation |
Two-qubit gates:
| Method | Signature | Description |
|---|---|---|
cnot(control, target) | (int, int) -> Circuit | CNOT (CX) gate |
cx(control, target) | (int, int) -> Circuit | Alias for cnot |
cz(control, target) | (int, int) -> Circuit | CZ gate |
swap(qubit0, qubit1) | (int, int) -> Circuit | SWAP gate |
Backend conversion:
| Method | Signature | Description |
|---|---|---|
to_braket() | () -> braket.circuits.Circuit | Convert to Amazon Braket circuit |
to_qiskit() | () -> qiskit.QuantumCircuit | Convert to IBM Qiskit circuit |
to_cirq() | () -> cirq.Circuit | Convert to Google Cirq circuit |
to_pyquil() | () -> pyquil.Program | Convert to Rigetti PyQuil program |
to_openqasm(version=2) | (int) -> str | Export as OpenQASM string (version 2 or 3). Requires qiskit. |
Import methods (class methods):
| Method | Signature | Description |
|---|---|---|
Circuit.from_braket(circuit) | (BraketCircuit) -> Circuit | Import from Braket circuit |
Circuit.from_qiskit(circuit) | (QuantumCircuit) -> Circuit | Import from Qiskit circuit. Unsupported gates are auto-decomposed. Requires qiskit. |
Circuit.from_cirq(circuit) | (cirq.Circuit) -> Circuit | Import from Cirq circuit. Only LineQubit circuits accepted. Requires cirq. |
Circuit.from_pennylane(tape) | (QuantumScript) -> Circuit | Import from PennyLane tape. Only integer wires accepted. Requires pennylane. |
Circuit.from_openqasm(qasm_string) | (str) -> Circuit | Import from OpenQASM 2.0 or 3.0 string. Auto-detects version. Requires qiskit. |
Serialization:
| Method | Signature | Description |
|---|---|---|
to_dict() | () -> dict | Serialize to dictionary (for Temporal transport) |
Circuit.from_dict(data) | (dict) -> Circuit | Reconstruct from dictionary |
Simulation:
| Method | Signature | Description |
|---|---|---|
simulate() | () -> qf.State | Run on local QuantumFlow simulator |
bell_state
def bell_state() -> CircuitCreate a Bell state circuit (|00> + |11>)/sqrt(2). Equivalent to Circuit().h(0).cnot(0, 1).
ghz_state
def ghz_state(num_qubits: int) -> CircuitCreate a GHZ state circuit for num_qubits qubits. Applies H to qubit 0, then CNOT chain.
marqov.device
MarqovDevice
Unified interface for quantum backend execution. Accepts any supported circuit type and automatically converts to the target backend’s native format.
from marqov import MarqovDevice, get_device
device = get_device({"backend": "sv1", "device_arn": "...", "s3_bucket": "..."})
counts = device.run(circuit, shots=1000)Constructor:
MarqovDevice(backend: str, params: dict) -> MarqovDevice| Parameter | Type | Description |
|---|---|---|
backend | str | Backend identifier (e.g., sv1, local, ionq-aria-1) |
params | dict | Execution parameters (provider-specific) |
Properties:
| Property | Type | Description |
|---|---|---|
backend_name | str | Backend identifier string |
is_simulator | bool | True if targeting a simulator backend |
Methods:
run
def run(self, circuit, shots: int = 1000) -> dict[str, int]Execute a circuit and return measurement counts.
| Parameter | Type | Description |
|---|---|---|
circuit | Circuit | str | BraketCircuit | QuantumCircuit | cirq.Circuit | QuantumScript | Any supported circuit type |
shots | int | Number of measurement shots (default: 1000) |
Returns: Dictionary mapping bitstring outcomes to counts (e.g., {"00": 512, "11": 488}).
Raises: TypeError if circuit type is not supported. ValueError if S3 configuration is missing for AWS devices.
Supported circuit types for auto-conversion:
marqov.Circuit(native)str(OpenQASM)braket.circuits.Circuitqiskit.QuantumCircuitcirq.Circuitpennylane.tape.QuantumScript
get_device
def get_device(params: dict) -> MarqovDeviceFactory function to create a MarqovDevice from execution parameters.
| Parameter | Type | Description |
|---|---|---|
params | dict | Must contain backend key. Additional keys depend on provider. |
Required params keys by provider:
| Provider | Required Keys |
|---|---|
Local (local, marqov-sim) | backend |
| AWS Braket | backend, device_arn, s3_bucket or s3_destination_folder |
| Azure Quantum | backend, azure_subscription_id, azure_resource_group, azure_workspace_name |
Raises: ValueError if backend is missing.
marqov.executors
BaseExecutor (abstract)
Abstract base class for all quantum executors.
class BaseExecutor(ABC):
async def execute(self, circuit: Circuit, shots: int = 1000, **kwargs) -> ExecutionResult: ...
async def cancel(self, job_id: str) -> bool: ...
@property
def name(self) -> str: ...ExecutionResult
Dataclass holding results from circuit execution.
@dataclass
class ExecutionResult:
counts: dict[str, int]
backend: str
execution_time_ms: float
shots: int = 0
raw_result: Any = None
metadata: dict[str, Any] = field(default_factory=dict)| Attribute | Type | Description |
|---|---|---|
counts | dict[str, int] | Measurement outcome counts |
backend | str | Backend name |
execution_time_ms | float | Execution time in milliseconds |
shots | int | Number of shots |
raw_result | Any | Backend-specific raw result |
metadata | dict | Additional execution metadata |
Properties:
| Property | Type | Description |
|---|---|---|
probabilities | dict[str, float] | Measurement probabilities computed from counts |
LocalExecutor
Execute circuits on the QuantumFlow local state vector simulator. No cloud credentials required.
from marqov import LocalExecutor
executor = LocalExecutor()
result = await executor.execute(circuit, shots=1000)Constructor:
LocalExecutor(config: LocalExecutorConfig | None = None)LocalExecutorConfig fields:
| Field | Type | Default | Description |
|---|---|---|---|
seed | int | None | None | Random seed for reproducibility |
BraketExecutor
Execute circuits on AWS Braket devices (simulators and QPUs).
from marqov.executors import BraketExecutor, BraketExecutorConfig
config = BraketExecutorConfig(
device_arn="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
s3_bucket="amazon-braket-my-bucket",
)
executor = BraketExecutor(config)
result = await executor.execute(circuit, shots=1000)BraketExecutorConfig fields:
| Field | Type | Default | Description |
|---|---|---|---|
device_arn | str | (required) | Braket device ARN |
s3_bucket | str | (required) | S3 bucket for task results |
s3_prefix | str | "marqov" | S3 object prefix |
aws_profile | str | None | None | AWS profile name |
aws_region | str | None | None | AWS region (inferred from ARN if omitted) |
poll_interval_seconds | float | 1.0 | Polling interval for task completion |
timeout_seconds | float | None | None | Max wait time (None = no timeout) |
Additional methods:
| Method | Signature | Description |
|---|---|---|
cancel(job_id) | async (str) -> bool | Cancel a running Braket task by ARN |
get_device_status() | async () -> str | Get device status (ONLINE, OFFLINE, RETIRED) |
is_device_available() | async () -> bool | Check if device is ONLINE |
get_queue_depth() | async () -> dict[str, int] | Get queue depth info |
AzureQuantumExecutor
Execute circuits on Azure Quantum backends (Quantinuum, PASQAL, IonQ, Rigetti).
AzureQuantumExecutorConfig fields:
| Field | Type | Default | Description |
|---|---|---|---|
subscription_id | str | (required) | Azure subscription ID |
resource_group | str | (required) | Azure resource group |
workspace_name | str | (required) | Azure Quantum workspace name |
location | str | (required) | Azure region |
target | str | (required) | Target device name |
framework | str | "qiskit" | Framework to use (qiskit or cirq) |
timeout_seconds | float | 300.0 | Max wait time |
poll_interval_seconds | float | 2.0 | Polling interval |
ExecutorFactory
Factory for creating executors based on provider configuration.
from marqov.executors import ExecutorFactory
executor = ExecutorFactory.create_executor("sv1", {
"provider": "AWS Braket",
"device_arn": "arn:aws:braket:::device/quantum-simulator/amazon/sv1",
"s3_bucket": "my-bucket",
})Class methods:
| Method | Signature | Description |
|---|---|---|
create_executor(backend_slug, backend_config) | (str, dict) -> BaseExecutor | Create executor for given backend |
get_supported_providers() | () -> list[str] | List supported providers |
is_provider_supported(provider) | (str) -> bool | Check if provider is supported |
Supported providers: AWS Braket, Azure Quantum, Quantum Brilliance, Local. Coming soon: IBM Quantum, IonQ Direct.
marqov.workflows
task
Decorator to mark a function as a task (unit of work in a workflow).
from marqov import task
@task
def add(x, y):
return x + y
# With options:
@task(executor="braket", retries=3, timeout=600)
async def measure(circuit):
return await executor.run(circuit)Parameters (when used with options):
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | Function name | Display name for the task |
executor | str | "local" | Executor to use (local, braket) |
retries | int | 0 | Retry attempts on failure |
timeout | float | 300.0 | Maximum execution time in seconds |
Behavior:
- Outside a
@workflow: executes normally and returns the result. - Inside a
@workflow: returns aTaskProxy, registering the call in the transport graph for deferred execution.
workflow
Decorator to mark a function as a workflow (composition of tasks).
from marqov import task, workflow
@workflow
def vqe_step(theta):
circuit = build(theta)
z0 = measure(circuit, "ZI") # runs in parallel
z1 = measure(circuit, "IZ") # runs in parallel
return compute(z0, z1)
dispatch = vqe_step(0.5)
result = await dispatch.run(client)Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | Function name | Display name for the workflow |
Returns: WorkflowDispatch object.
WorkflowDispatch
Handle for dispatching a workflow to Temporal.
dispatch = my_workflow(args)Methods:
| Method | Signature | Description |
|---|---|---|
visualize() | () -> str | Return DOT format graph visualization |
get_parallel_groups() | () -> list[list[str]] | Get groups of tasks that can run in parallel |
run(client, task_queue="marqov-workflows") | async (Any, str) -> Any | Execute on Temporal and wait for result |
run_with_ids(client, task_queue="marqov-workflows") | async (Any, str) -> tuple[Any, str, str] | Execute and return (result, workflow_id, run_id) |
dispatch(client, task_queue="marqov-workflows") | async (Any, str) -> str | Submit without waiting; returns workflow ID |
Attributes:
| Attribute | Type | Description |
|---|---|---|
graph | TransportGraph | The task dependency graph |
name | str | Workflow name |
TemporalConfig
Configuration for Temporal connection.
from marqov import TemporalConfig
config = TemporalConfig(
host="localhost",
port=7233,
namespace="default",
task_queue="marqov-workflows",
)
print(config.address) # "localhost:7233"| Attribute | Type | Default | Description |
|---|---|---|---|
host | str | "localhost" | Temporal server host |
port | int | 7233 | Temporal server port |
namespace | str | "default" | Temporal namespace |
task_queue | str | "marqov-workflows" | Default task queue name |
Properties:
| Property | Type | Description |
|---|---|---|
address | str | "{host}:{port}" |
create_worker
async def create_worker(client, task_queue: str = "marqov-workflows") -> WorkerCreate a Temporal worker that processes Marqov workflows. The worker registers the JobWorkflow and execute_task activity.
TaskConfig
Configuration for an individual task.
| Attribute | Type | Default | Description |
|---|---|---|---|
name | str | — | Task display name |
executor | str | "local" | Executor identifier |
retries | int | 0 | Retry attempts |
timeout_seconds | float | 300.0 | Maximum execution time |
Deprecated Aliases
The following are deprecated and will be removed in v0.3.0:
| Deprecated | Replacement |
|---|---|
@electron | @task |
@lattice | @workflow |
LatticeDispatch | WorkflowDispatch |
LatticeWorkflow | JobWorkflow |
ElectronProxy | TaskProxy |
execute_electron | execute_task |