Source code for ddsim.unitarysimulator

"""Backend for DDSIM Unitary Simulator."""

import logging
import time
import uuid
import warnings
from math import log2, sqrt
from typing import List, Union

import numpy as np
from qiskit import QiskitError, QuantumCircuit
from qiskit.compiler import assemble
from qiskit.providers import BackendV1, Options
from qiskit.providers.models import QasmBackendConfiguration
from qiskit.qobj import PulseQobj, QasmQobj, QasmQobjExperiment, Qobj
from qiskit.result import Result
from qiskit.utils.multiprocessing import local_hardware_info

from mqt.ddsim import ConstructionMode, UnitarySimulator, __version__, get_matrix
from mqt.ddsim.error import DDSIMError
from mqt.ddsim.job import DDSIMJob

# Module-level logger, named after this module; used by the backend's
# validation step to report when a requested shot count is overridden.
logger = logging.getLogger(__name__)


class UnitarySimulatorBackend(BackendV1):
    """Decision diagram-based unitary simulator."""

    def __init__(self, configuration=None, provider=None, **fields):
        """Create the backend, building a default configuration if none is given.

        Args:
            configuration: Optional backend configuration. When falsy, a
                ``QasmBackendConfiguration`` is built from the defaults below.
            provider: Provider forwarded unchanged to ``BackendV1``.
            **fields: Extra keyword arguments forwarded to ``BackendV1``.
        """
        # The dense result is a 2**n x 2**n complex matrix (16 bytes per
        # entry), so the qubit limit is derived from the reported memory and
        # capped at 24.
        # NOTE(review): "memory" is presumably in GiB given the 1024**3
        # factor -- confirm against local_hardware_info().
        available_bytes = local_hardware_info()["memory"] * (1024**3)
        max_qubits = min(24, int(log2(sqrt(available_bytes / 16))))

        conf = {
            "backend_name": "unitary_simulator",
            "backend_version": __version__,
            "n_qubits": max_qubits,
            "url": "https://github.com/cda-tum/ddsim",
            "simulator": True,
            "local": True,
            "conditional": False,
            "open_pulse": False,
            "memory": False,
            "max_shots": 1000000000,
            "coupling_map": None,
            "description": "MQT DDSIM C++ Unitary Simulator",
            "basis_gates": [
                "gphase",
                "id",
                "u0",
                "u1",
                "u2",
                "u3",
                "cu3",
                "x",
                "cx",
                "ccx",
                "mcx_gray",
                "mcx_recursive",
                "mcx_vchain",
                "y",
                "cy",
                "z",
                "cz",
                "h",
                "ch",
                "s",
                "sdg",
                "t",
                "tdg",
                "rx",
                "crx",
                "mcrx",
                "ry",
                "cry",
                "mcry",
                "rz",
                "crz",
                "mcrz",
                "p",
                "cp",
                "cu1",
                "mcphase",
                "sx",
                "csx",
                "sxdg",
                "swap",
                "cswap",
                "iswap",
                "dcx",
                "ecr",
                "rxx",
                "ryy",
                "rzz",
                "rzx",
                "xx_minus_yy",
                "xx_plus_yy",
            ],
            "gates": [],
        }
        super().__init__(
            configuration=(configuration or QasmBackendConfiguration.from_dict(conf)),
            provider=provider,
            **fields
        )

    @classmethod
    def _default_options(cls):
        """Return the default run options: one shot, recursive construction, no parameter binds."""
        return Options(shots=1, mode="recursive", parameter_binds=None)

    def run(self, quantum_circuits: Union[QuantumCircuit, List[QuantumCircuit]], **options):
        """Assemble the given circuit(s) and submit them as a job.

        Args:
            quantum_circuits: A single circuit or a list of circuits.
            **options: Run options. Keys that are not attributes of
                ``self.options`` trigger a ``UserWarning`` and are not passed
                to ``assemble``.

        Returns:
            The submitted ``DDSIMJob``.

        Raises:
            QiskitError: If a ``QasmQobj`` or ``PulseQobj`` is passed instead
                of circuits.
        """
        if isinstance(quantum_circuits, (QasmQobj, PulseQobj)):
            msg = "QasmQobj and PulseQobj are not supported."
            raise QiskitError(msg)

        if not isinstance(quantum_circuits, list):
            quantum_circuits = [quantum_circuits]

        # Only options the backend declares are handed to assemble().
        out_options = {}
        for key, value in options.items():
            if hasattr(self.options, key):
                out_options[key] = value
            else:
                warnings.warn(f"Option {key} is not used by this backend", UserWarning, stacklevel=2)

        circuit_qobj = assemble(quantum_circuits, self, **out_options)
        job_id = str(uuid.uuid4())

        # The job receives the unfiltered options on purpose: run_experiment
        # reads keys such as "seed" that are not declared in the default
        # options and would otherwise be dropped.
        local_job = DDSIMJob(self, job_id, self._run_job, circuit_qobj, **options)
        local_job.submit()
        return local_job

    def _run_job(self, job_id, qobj_instance: Qobj, **options):
        """Validate the qobj, run every experiment in it and wrap the outcome in a ``Result``.

        Args:
            job_id: Identifier stored in the result.
            qobj_instance: Assembled qobj whose experiments are simulated.
            **options: Options forwarded to ``run_experiment``.

        Returns:
            A ``Result`` built from the per-experiment result dictionaries.
        """
        self._validate(qobj_instance)

        start = time.time()
        result_list = [self.run_experiment(qobj_exp, **options) for qobj_exp in qobj_instance.experiments]
        end = time.time()

        configuration = self.configuration()
        result = {
            "backend_name": configuration.backend_name,
            "backend_version": configuration.backend_version,
            "qobj_id": qobj_instance.qobj_id,
            "job_id": job_id,
            "results": result_list,
            "status": "COMPLETED",
            "success": True,
            "time_taken": (end - start),
            "header": qobj_instance.header.to_dict(),
        }
        return Result.from_dict(result)

    def run_experiment(self, qobj_experiment: QasmQobjExperiment, **options):
        """Construct the unitary of a single experiment and return its result dictionary.

        Args:
            qobj_experiment: The experiment to simulate.
            **options: Recognized keys are ``seed`` (default ``-1``) and
                ``mode`` (``"recursive"`` or ``"sequential"``). When ``mode``
                is not given per call, the backend-level option
                ``self.options.mode`` is used, falling back to
                ``"recursive"``.

        Returns:
            A dictionary with the experiment name, status, timing, header and
            a ``data`` entry holding the unitary and construction statistics.

        Raises:
            DDSIMError: If ``mode`` is neither ``"recursive"`` nor
                ``"sequential"``.
        """
        start = time.time()
        seed = options.get("seed", -1)

        # Per-call kwargs take precedence; otherwise honor a mode configured
        # on the backend (e.g. via set_options) instead of ignoring it.
        mode = options.get("mode", getattr(self.options, "mode", "recursive"))

        if mode == "sequential":
            construction_mode = ConstructionMode.sequential
        elif mode == "recursive":
            construction_mode = ConstructionMode.recursive
        else:
            msg = (
                f"Construction mode {mode} not supported by DDSIM unitary simulator. Available modes are "
                "'recursive' and 'sequential'"
            )
            raise DDSIMError(msg)

        sim = UnitarySimulator(qobj_experiment, seed=seed, mode=construction_mode)
        sim.construct()

        # Extract the resulting matrix from the final DD into a dense array.
        dimension = 2**qobj_experiment.header.n_qubits
        unitary = np.zeros((dimension, dimension), dtype=complex)
        get_matrix(sim, unitary)

        data = {
            "unitary": unitary,
            "construction_time": sim.get_construction_time(),
            "max_dd_nodes": sim.get_max_node_count(),
            "dd_nodes": sim.get_final_node_count(),
        }
        end = time.time()

        return {
            "name": qobj_experiment.header.name,
            "shots": 1,
            "data": data,
            "status": "DONE",
            "success": True,
            "time_taken": (end - start),
            "header": qobj_experiment.header.to_dict(),
        }

    def _validate(self, qobj):
        """Perform semantic checks on the qobj that cannot be done via schemas.

        Some of these may later move to backend schemas. The checks are:

        1. The number of qubits must not exceed the backend's configured
           maximum.
        2. Any shot count other than 1 (global or per experiment) is reset to
           1 in place and reported at info level.
        3. No experiment may contain a ``measure`` or ``reset`` instruction,
           at any position.

        Args:
            qobj: The assembled qobj to check; its shot settings may be
                modified in place.

        Raises:
            DDSIMError: If the qubit limit is exceeded or an unsupported
                instruction is found.
        """
        n_qubits = qobj.config.n_qubits
        max_qubits = self.configuration().n_qubits
        if n_qubits > max_qubits:
            msg = f"Number of qubits {n_qubits} is greater than maximum ({max_qubits}) for '{self.name()}'."
            raise DDSIMError(msg)

        if hasattr(qobj.config, "shots") and qobj.config.shots != 1:
            logger.info('"%s" only supports 1 shot. Setting shots=1.', self.name())
            qobj.config.shots = 1

        for experiment in qobj.experiments:
            name = experiment.header.name
            if getattr(experiment.config, "shots", 1) != 1:
                logger.info('"%s" only supports 1 shot. Setting shots=1 for circuit "%s".', self.name(), name)
                experiment.config.shots = 1

            for operation in experiment.instructions:
                if operation.name in ["measure", "reset"]:
                    msg = f"Unsupported '{self.name()}' instruction '{operation.name}' in circuit '{name}'."
                    raise DDSIMError(msg)