
Python SDK Reference

Package: marqov (v0.2.0-dev)

Install: pip install marqov

Optional extras: pip install marqov[qiskit], marqov[cirq], marqov[pennylane], marqov[openqasm]


marqov.circuits

Circuit

Backend-agnostic quantum circuit with a fluent API. All gate methods return self for chaining.

    from marqov import Circuit

    circuit = Circuit().h(0).cnot(0, 1)
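
To illustrate what "return self for chaining" means in practice, here is a minimal stand-alone sketch. MiniCircuit is a hypothetical toy class, not the marqov implementation: it only records gate names, and it assumes the qubit count is inferred from the highest qubit index used.

```python
class MiniCircuit:
    """Toy stand-in for a fluent circuit builder (hypothetical, not marqov.Circuit)."""

    def __init__(self):
        self.gates = []  # (gate_name, qubits) tuples in application order

    def _add(self, name, *qubits):
        self.gates.append((name, qubits))
        return self  # returning self is what enables method chaining

    def h(self, qubit):
        return self._add("h", qubit)

    def cnot(self, control, target):
        return self._add("cnot", control, target)

    @property
    def num_qubits(self):
        # Assumption: the qubit count is the highest index used, plus one.
        return 1 + max((q for _, qs in self.gates for q in qs), default=-1)


circuit = MiniCircuit().h(0).cnot(0, 1)
print(circuit.gates)       # [('h', (0,)), ('cnot', (0, 1))]
print(circuit.num_qubits)  # 2
```

Because every gate method returns the same object, a chain such as h(0).cnot(0, 1) builds the circuit in a single expression.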

Constructor:

Circuit() -> Circuit

Creates an empty circuit.

Properties:

| Property | Type | Description |
| --- | --- | --- |
| num_qubits | int | Number of qubits in the circuit |

Single-qubit gates:

| Method | Signature | Description |
| --- | --- | --- |
| h(qubit) | (int) -> Circuit | Hadamard gate |
| x(qubit) | (int) -> Circuit | Pauli-X (NOT) gate |
| y(qubit) | (int) -> Circuit | Pauli-Y gate |
| z(qubit) | (int) -> Circuit | Pauli-Z gate |
| s(qubit) | (int) -> Circuit | S (phase) gate |
| t(qubit) | (int) -> Circuit | T gate |

Rotation gates (angles in radians):

| Method | Signature | Description |
| --- | --- | --- |
| rx(angle, qubit) | (float, int) -> Circuit | Rx rotation |
| ry(angle, qubit) | (float, int) -> Circuit | Ry rotation |
| rz(angle, qubit) | (float, int) -> Circuit | Rz rotation |
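
The rotation methods take the angle first and the qubit second, with the angle in radians. As a reminder of what such a rotation does, the sketch below builds the textbook Rx matrix in plain Python. It assumes marqov follows the standard convention Rx(angle) = exp(-i * angle * X / 2); the rx_matrix helper is illustrative and not part of the SDK.

```python
import math


def rx_matrix(angle: float) -> list[list[complex]]:
    """Textbook Rx(angle) = exp(-i * angle * X / 2) as a 2x2 nested list."""
    c = math.cos(angle / 2)
    s = math.sin(angle / 2)
    return [[complex(c, 0), complex(0, -s)],
            [complex(0, -s), complex(c, 0)]]


# Angles are radians: convert from degrees explicitly if needed.
half_turn = math.radians(180)
m = rx_matrix(half_turn)

# Rx(pi) equals -i * X, so it maps |0> to |1> up to a global phase.
prob_one = abs(m[1][0]) ** 2
print(round(prob_one, 6))  # 1.0
```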

Two-qubit gates:

| Method | Signature | Description |
| --- | --- | --- |
| cnot(control, target) | (int, int) -> Circuit | CNOT (CX) gate |
| cx(control, target) | (int, int) -> Circuit | Alias for cnot |
| cz(control, target) | (int, int) -> Circuit | CZ gate |
| swap(qubit0, qubit1) | (int, int) -> Circuit | SWAP gate |

Backend conversion:

| Method | Signature | Description |
| --- | --- | --- |
| to_braket() | () -> braket.circuits.Circuit | Convert to Amazon Braket circuit |
| to_qiskit() | () -> qiskit.QuantumCircuit | Convert to IBM Qiskit circuit |
| to_cirq() | () -> cirq.Circuit | Convert to Google Cirq circuit |
| to_pyquil() | () -> pyquil.Program | Convert to Rigetti PyQuil program |
| to_openqasm(version=2) | (int) -> str | Export as OpenQASM string (version 2 or 3). Requires qiskit. |

Import methods (class methods):

| Method | Signature | Description |
| --- | --- | --- |
| Circuit.from_braket(circuit) | (BraketCircuit) -> Circuit | Import from Braket circuit |
| Circuit.from_qiskit(circuit) | (QuantumCircuit) -> Circuit | Import from Qiskit circuit. Unsupported gates are auto-decomposed. Requires qiskit. |
| Circuit.from_cirq(circuit) | (cirq.Circuit) -> Circuit | Import from Cirq circuit. Only LineQubit circuits accepted. Requires cirq. |
| Circuit.from_pennylane(tape) | (QuantumScript) -> Circuit | Import from PennyLane tape. Only integer wires accepted. Requires pennylane. |
| Circuit.from_openqasm(qasm_string) | (str) -> Circuit | Import from OpenQASM 2.0 or 3.0 string. Auto-detects version. Requires qiskit. |

Serialization:

| Method | Signature | Description |
| --- | --- | --- |
| to_dict() | () -> dict | Serialize to dictionary (for Temporal transport) |
| Circuit.from_dict(data) | (dict) -> Circuit | Reconstruct from dictionary |
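
Since to_dict() is described as the format used for Temporal transport, the dictionary presumably has to survive JSON-style serialization. The sketch below shows such a round trip with a hypothetical schema (a list of gate records). The GateList class and the "gates" key are assumptions for illustration; the actual keys produced by marqov are not documented here and may differ.

```python
import json
from dataclasses import dataclass, field


@dataclass
class GateList:
    """Hypothetical stand-in for a serializable circuit (not marqov.Circuit)."""
    gates: list = field(default_factory=list)  # e.g. [["h", [0]], ["cnot", [0, 1]]]

    def to_dict(self) -> dict:
        # Use only JSON-friendly types (str, int, list, dict).
        return {"gates": [[name, list(qubits)] for name, qubits in self.gates]}

    @classmethod
    def from_dict(cls, data: dict) -> "GateList":
        return cls(gates=[[name, list(qubits)] for name, qubits in data["gates"]])


original = GateList(gates=[["h", [0]], ["cnot", [0, 1]]])
payload = json.dumps(original.to_dict())           # what would travel over the wire
restored = GateList.from_dict(json.loads(payload))
print(restored == original)  # True
```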

Simulation:

| Method | Signature | Description |
| --- | --- | --- |
| simulate() | () -> qf.State | Run on local QuantumFlow simulator |

bell_state

def bell_state() -> Circuit

Create a circuit that prepares the Bell state (|00> + |11>)/sqrt(2). Equivalent to Circuit().h(0).cnot(0, 1).

ghz_state

def ghz_state(num_qubits: int) -> Circuit

Create a GHZ state circuit on num_qubits qubits. Applies H to qubit 0, followed by a chain of CNOT gates.
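
The construction can be sketched in plain Python as a gate list plus a tiny state-vector check. This is an illustration, not the marqov implementation: it assumes the CNOT chain links neighbouring qubits (0 to 1, 1 to 2, and so on), and the ghz_gates and simulate helpers are hypothetical names.

```python
import math


def ghz_gates(num_qubits: int) -> list[tuple]:
    """H on qubit 0, then CNOT(q, q + 1) for each neighbouring pair (assumed chain)."""
    if num_qubits < 1:
        raise ValueError("num_qubits must be at least 1")
    return [("h", 0)] + [("cnot", q, q + 1) for q in range(num_qubits - 1)]


def simulate(gates: list[tuple], num_qubits: int) -> dict[str, float]:
    """Apply H/CNOT gates to |0...0>; keys are bitstrings with qubit 0 leftmost."""
    state = {"0" * num_qubits: 1.0}
    for gate in gates:
        new_state: dict[str, float] = {}
        if gate[0] == "h":
            q = gate[1]
            for bits, amp in state.items():
                for b in "01":
                    # H|1> picks up a minus sign on the |1> component.
                    sign = -1.0 if (bits[q] == "1" and b == "1") else 1.0
                    key = bits[:q] + b + bits[q + 1:]
                    new_state[key] = new_state.get(key, 0.0) + sign * amp / math.sqrt(2)
        elif gate[0] == "cnot":
            c, t = gate[1], gate[2]
            for bits, amp in state.items():
                if bits[c] == "1":  # flip the target only when the control is 1
                    flipped = "1" if bits[t] == "0" else "0"
                    bits = bits[:t] + flipped + bits[t + 1:]
                new_state[bits] = new_state.get(bits, 0.0) + amp
        state = new_state
    return {k: v for k, v in state.items() if abs(v) > 1e-12}


print(ghz_gates(3))                       # [('h', 0), ('cnot', 0, 1), ('cnot', 1, 2)]
print(sorted(simulate(ghz_gates(3), 3)))  # ['000', '111']
```

Only the all-zeros and all-ones basis states survive, each with amplitude 1/sqrt(2), which is the defining property of a GHZ state.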


marqov.device

MarqovDevice

Unified interface for quantum backend execution. Accepts any supported circuit type and automatically converts to the target backend’s native format.

    from marqov import MarqovDevice, get_device

    device = get_device({"backend": "sv1", "device_arn": "...", "s3_bucket": "..."})
    counts = device.run(circuit, shots=1000)

Constructor:

MarqovDevice(backend: str, params: dict) -> MarqovDevice

| Parameter | Type | Description |
| --- | --- | --- |
| backend | str | Backend identifier (e.g., sv1, local, ionq-aria-1) |
| params | dict | Execution parameters (provider-specific) |

Properties:

| Property | Type | Description |
| --- | --- | --- |
| backend_name | str | Backend identifier string |
| is_simulator | bool | True if targeting a simulator backend |

Methods:

run

def run(self, circuit, shots: int = 1000) -> dict[str, int]

Execute a circuit and return measurement counts.

| Parameter | Type | Description |
| --- | --- | --- |
| circuit | Circuit \| str \| BraketCircuit \| QuantumCircuit \| cirq.Circuit \| QuantumScript | Any supported circuit type |
| shots | int | Number of measurement shots (default: 1000) |

Returns: Dictionary mapping bitstring outcomes to counts (e.g., {"00": 512, "11": 488}).

Raises: TypeError if circuit type is not supported. ValueError if S3 configuration is missing for AWS devices.

Supported circuit types for auto-conversion:

  • marqov.Circuit (native)
  • str (OpenQASM)
  • braket.circuits.Circuit
  • qiskit.QuantumCircuit
  • cirq.Circuit
  • pennylane.tape.QuantumScript

get_device

def get_device(params: dict) -> MarqovDevice

Factory function to create a MarqovDevice from execution parameters.

| Parameter | Type | Description |
| --- | --- | --- |
| params | dict | Must contain backend key. Additional keys depend on provider. |

Required params keys by provider:

| Provider | Required Keys |
| --- | --- |
| Local (local, marqov-sim) | backend |
| AWS Braket | backend, device_arn, and either s3_bucket or s3_destination_folder |
| Azure Quantum | backend, azure_subscription_id, azure_resource_group, azure_workspace_name |

Raises: ValueError if backend is missing.
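
A quick way to check a params dictionary before handing it to get_device is sketched below. The key names come from the table above; the provider labels ("local", "aws", "azure") and the missing_keys helper are hypothetical, and the sketch does not reproduce how marqov maps a backend name to a provider.

```python
# Each entry is a group of alternatives: at least one key per group must be present.
REQUIRED_KEYS = {
    "local": [("backend",)],
    "aws": [("backend",), ("device_arn",), ("s3_bucket", "s3_destination_folder")],
    "azure": [
        ("backend",),
        ("azure_subscription_id",),
        ("azure_resource_group",),
        ("azure_workspace_name",),
    ],
}


def missing_keys(provider: str, params: dict) -> list[str]:
    """Return a description of each required key group absent from params."""
    missing = []
    for alternatives in REQUIRED_KEYS[provider]:
        if not any(key in params for key in alternatives):
            missing.append(" or ".join(alternatives))
    return missing


print(missing_keys("aws", {"backend": "sv1", "device_arn": "..."}))
# ['s3_bucket or s3_destination_folder']
print(missing_keys("local", {"backend": "local"}))
# []
```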


marqov.executors

BaseExecutor (abstract)

Abstract base class for all quantum executors.

    class BaseExecutor(ABC):
        async def execute(self, circuit: Circuit, shots: int = 1000, **kwargs) -> ExecutionResult: ...
        async def cancel(self, job_id: str) -> bool: ...
        @property
        def name(self) -> str: ...

ExecutionResult

Dataclass holding results from circuit execution.

    @dataclass
    class ExecutionResult:
        counts: dict[str, int]
        backend: str
        execution_time_ms: float
        shots: int = 0
        raw_result: Any = None
        metadata: dict[str, Any] = field(default_factory=dict)

| Attribute | Type | Description |
| --- | --- | --- |
| counts | dict[str, int] | Measurement outcome counts |
| backend | str | Backend name |
| execution_time_ms | float | Execution time in milliseconds |
| shots | int | Number of shots |
| raw_result | Any | Backend-specific raw result |
| metadata | dict | Additional execution metadata |

Properties:

| Property | Type | Description |
| --- | --- | --- |
| probabilities | dict[str, float] | Measurement probabilities computed from counts |
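
Computing probabilities from counts amounts to dividing each count by the total. The sketch below uses a hypothetical CountsResult stand-in rather than the real ExecutionResult, and it assumes normalization by the sum of the counts; whether marqov divides by that sum or by the shots field is not stated here.

```python
from dataclasses import dataclass


@dataclass
class CountsResult:
    """Hypothetical stand-in holding only measurement counts."""
    counts: dict[str, int]

    @property
    def probabilities(self) -> dict[str, float]:
        total = sum(self.counts.values())
        if total == 0:
            return {}  # no measurements, nothing to normalize
        return {bits: n / total for bits, n in self.counts.items()}


result = CountsResult(counts={"00": 512, "11": 488})
print(result.probabilities)  # {'00': 0.512, '11': 0.488}
```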

LocalExecutor

Execute circuits on the QuantumFlow local state vector simulator. No cloud credentials required.

    from marqov import LocalExecutor

    executor = LocalExecutor()
    result = await executor.execute(circuit, shots=1000)

Constructor:

LocalExecutor(config: LocalExecutorConfig | None = None)

LocalExecutorConfig fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| seed | int \| None | None | Random seed for reproducibility |

BraketExecutor

Execute circuits on AWS Braket devices (simulators and QPUs).

    from marqov.executors import BraketExecutor, BraketExecutorConfig

    config = BraketExecutorConfig(
        device_arn="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
        s3_bucket="amazon-braket-my-bucket",
    )
    executor = BraketExecutor(config)
    result = await executor.execute(circuit, shots=1000)

BraketExecutorConfig fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| device_arn | str | (required) | Braket device ARN |
| s3_bucket | str | (required) | S3 bucket for task results |
| s3_prefix | str | "marqov" | S3 object prefix |
| aws_profile | str \| None | None | AWS profile name |
| aws_region | str \| None | None | AWS region (inferred from ARN if omitted) |
| poll_interval_seconds | float | 1.0 | Polling interval for task completion |
| timeout_seconds | float \| None | None | Max wait time (None = no timeout) |
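
The interplay of poll_interval_seconds and timeout_seconds can be pictured as a polling loop like the one below. This is a generic asyncio sketch under stated assumptions, not the BraketExecutor code: wait_until_done and fake_status are hypothetical names, and the real executor may check task status differently.

```python
import asyncio
import time


async def wait_until_done(is_done, poll_interval_seconds=1.0, timeout_seconds=None):
    """Poll is_done() until it returns True; raise TimeoutError once the deadline passes."""
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while not is_done():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        await asyncio.sleep(poll_interval_seconds)
    return True


polls = {"n": 0}


def fake_status() -> bool:
    """Pretend the task completes on the third status check."""
    polls["n"] += 1
    return polls["n"] >= 3


asyncio.run(wait_until_done(fake_status, poll_interval_seconds=0.01))
print(polls["n"])  # 3
```

With timeout_seconds=None the loop waits indefinitely, which matches the "None = no timeout" default in the table.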

Additional methods:

| Method | Signature | Description |
| --- | --- | --- |
| cancel(job_id) | async (str) -> bool | Cancel a running Braket task by ARN |
| get_device_status() | async () -> str | Get device status (ONLINE, OFFLINE, RETIRED) |
| is_device_available() | async () -> bool | Check if device is ONLINE |
| get_queue_depth() | async () -> dict[str, int] | Get queue depth info |

AzureQuantumExecutor

Execute circuits on Azure Quantum backends (Quantinuum, PASQAL, IonQ, Rigetti).

AzureQuantumExecutorConfig fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| subscription_id | str | (required) | Azure subscription ID |
| resource_group | str | (required) | Azure resource group |
| workspace_name | str | (required) | Azure Quantum workspace name |
| location | str | (required) | Azure region |
| target | str | (required) | Target device name |
| framework | str | "qiskit" | Framework to use (qiskit or cirq) |
| timeout_seconds | float | 300.0 | Max wait time |
| poll_interval_seconds | float | 2.0 | Polling interval |

ExecutorFactory

Factory for creating executors based on provider configuration.

    from marqov.executors import ExecutorFactory

    executor = ExecutorFactory.create_executor("sv1", {
        "provider": "AWS Braket",
        "device_arn": "arn:aws:braket:::device/quantum-simulator/amazon/sv1",
        "s3_bucket": "my-bucket",
    })

Class methods:

| Method | Signature | Description |
| --- | --- | --- |
| create_executor(backend_slug, backend_config) | (str, dict) -> BaseExecutor | Create executor for given backend |
| get_supported_providers() | () -> list[str] | List supported providers |
| is_provider_supported(provider) | (str) -> bool | Check if provider is supported |

Supported providers: AWS Braket, Azure Quantum, Quantum Brilliance, Local. Coming soon: IBM Quantum, IonQ Direct.


marqov.workflows

task

Decorator to mark a function as a task (unit of work in a workflow).

    from marqov import task

    @task
    def add(x, y):
        return x + y

    # With options:
    @task(executor="braket", retries=3, timeout=600)
    async def measure(circuit):
        return await executor.run(circuit)

Parameters (when used with options):

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str \| None | Function name | Display name for the task |
| executor | str | "local" | Executor to use (local, braket) |
| retries | int | 0 | Retry attempts on failure |
| timeout | float | 300.0 | Maximum execution time in seconds |

Behavior:

  • Outside a @workflow: executes normally and returns the result.
  • Inside a @workflow: returns a TaskProxy, registering the call in the transport graph for deferred execution.
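
The two behaviors can be modeled with a decorator that checks whether a recording context is active. The sketch below is a simplified illustration, not marqov's implementation: toy_task, CallProxy, trace, and the context variable are all hypothetical names, and the real transport graph tracks more than a flat list of calls.

```python
import contextvars
import functools

# Holds the list of recorded calls while a workflow is being traced, else None.
_recording = contextvars.ContextVar("recording", default=None)


class CallProxy:
    """Placeholder for a result that does not exist yet (stand-in for TaskProxy)."""

    def __init__(self, name, args):
        self.name, self.args = name, args


def toy_task(fn):
    @functools.wraps(fn)
    def wrapper(*args):
        calls = _recording.get()
        if calls is None:
            return fn(*args)          # outside a workflow: run immediately
        proxy = CallProxy(fn.__name__, args)
        calls.append(proxy)           # inside a workflow: record, do not run
        return proxy
    return wrapper


def trace(workflow_fn, *args):
    """Run workflow_fn in recording mode and return the recorded calls."""
    calls = []
    token = _recording.set(calls)
    try:
        workflow_fn(*args)
    finally:
        _recording.reset(token)
    return calls


@toy_task
def add(x, y):
    return x + y


def pipeline(a):
    first = add(a, 1)
    return add(first, 2)  # depends on `first`, which is still a proxy


print(add(1, 2))                             # 3
print([c.name for c in trace(pipeline, 5)])  # ['add', 'add']
```

Passing a proxy as an argument to a later call is what lets a graph builder discover dependencies between tasks without executing them.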

workflow

Decorator to mark a function as a workflow (composition of tasks).

    from marqov import task, workflow

    @workflow
    def vqe_step(theta):
        circuit = build(theta)
        z0 = measure(circuit, "ZI")  # runs in parallel
        z1 = measure(circuit, "IZ")  # runs in parallel
        return compute(z0, z1)

    dispatch = vqe_step(0.5)
    result = await dispatch.run(client)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str \| None | Function name | Display name for the workflow |

Returns: WorkflowDispatch object.

WorkflowDispatch

Handle for dispatching a workflow to Temporal.

dispatch = my_workflow(args)

Methods:

| Method | Signature | Description |
| --- | --- | --- |
| visualize() | () -> str | Return DOT format graph visualization |
| get_parallel_groups() | () -> list[list[str]] | Get groups of tasks that can run in parallel |
| run(client, task_queue="marqov-workflows") | async (Any, str) -> Any | Execute on Temporal and wait for result |
| run_with_ids(client, task_queue="marqov-workflows") | async (Any, str) -> tuple[Any, str, str] | Execute and return (result, workflow_id, run_id) |
| dispatch(client, task_queue="marqov-workflows") | async (Any, str) -> str | Submit without waiting; returns workflow ID |
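
One common way to derive groups of tasks that can run in parallel is to peel a dependency graph into levels, where each level contains the tasks whose dependencies are already satisfied. The sketch below shows that idea on a plain dictionary. It is an assumption about how such grouping could work, not the get_parallel_groups implementation, and the task names are made up to mirror the vqe_step example.

```python
def parallel_groups(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into levels; tasks within a level have no unmet dependencies."""
    remaining = {task: set(d) for task, d in deps.items()}
    groups = []
    while remaining:
        ready = sorted(t for t, d in remaining.items() if not d)
        if not ready:
            raise ValueError("dependency cycle detected")
        groups.append(ready)
        for t in ready:
            del remaining[t]
        for d in remaining.values():
            d.difference_update(ready)  # these dependencies are now satisfied
    return groups


deps = {
    "build": [],
    "measure_zi": ["build"],
    "measure_iz": ["build"],
    "compute": ["measure_zi", "measure_iz"],
}
print(parallel_groups(deps))
# [['build'], ['measure_iz', 'measure_zi'], ['compute']]
```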

Attributes:

| Attribute | Type | Description |
| --- | --- | --- |
| graph | TransportGraph | The task dependency graph |
| name | str | Workflow name |

TemporalConfig

Configuration for Temporal connection.

    from marqov import TemporalConfig

    config = TemporalConfig(
        host="localhost",
        port=7233,
        namespace="default",
        task_queue="marqov-workflows",
    )
    print(config.address)  # "localhost:7233"

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "localhost" | Temporal server host |
| port | int | 7233 | Temporal server port |
| namespace | str | "default" | Temporal namespace |
| task_queue | str | "marqov-workflows" | Default task queue name |

Properties:

| Property | Type | Description |
| --- | --- | --- |
| address | str | "{host}:{port}" |
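
A derived property like address is typically a one-line formatter over the stored fields. The sketch below uses a hypothetical ConnectionConfig dataclass with the same defaults as the table above; it is not the TemporalConfig source.

```python
from dataclasses import dataclass


@dataclass
class ConnectionConfig:
    """Hypothetical stand-in mirroring the host and port defaults."""
    host: str = "localhost"
    port: int = 7233

    @property
    def address(self) -> str:
        # Combine host and port into the "host:port" form.
        return f"{self.host}:{self.port}"


print(ConnectionConfig().address)                                     # localhost:7233
print(ConnectionConfig(host="temporal.internal", port=7000).address)  # temporal.internal:7000
```

A "host:port" string is the form of target address that the Temporal Python client accepts when connecting.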

create_worker

async def create_worker(client, task_queue: str = "marqov-workflows") -> Worker

Create a Temporal worker that processes Marqov workflows. The worker registers the JobWorkflow workflow and the execute_task activity.

TaskConfig

Configuration for an individual task.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | Task display name |
| executor | str | "local" | Executor identifier |
| retries | int | 0 | Retry attempts |
| timeout_seconds | float | 300.0 | Maximum execution time |

Deprecated Aliases

The following are deprecated and will be removed in v0.3.0:

| Deprecated | Replacement |
| --- | --- |
| @electron | @task |
| @lattice | @workflow |
| LatticeDispatch | WorkflowDispatch |
| LatticeWorkflow | JobWorkflow |
| ElectronProxy | TaskProxy |
| execute_electron | execute_task |