Source code for ddsim.hybridqasmsimulator

"""Backend for DDSIM Hybrid Schrodinger-Feynman Simulator."""

import logging
import time
import uuid
import warnings
from math import log2
from typing import List, Union

from qiskit import QiskitError, QuantumCircuit
from qiskit.compiler import assemble
from qiskit.providers import BackendV1, Options
from qiskit.providers.models import BackendConfiguration, BackendStatus
from qiskit.qobj import PulseQobj, QasmQobj, QasmQobjExperiment, Qobj
from qiskit.result import Result
from qiskit.utils.multiprocessing import local_hardware_info

from mqt.ddsim import HybridCircuitSimulator, HybridMode, __version__
from mqt.ddsim.error import DDSIMError
from mqt.ddsim.job import DDSIMJob

logger = logging.getLogger(__name__)


[docs]class HybridQasmSimulatorBackend(BackendV1): """Python interface to MQT DDSIM Hybrid Schrodinger-Feynman Simulator""" SHOW_STATE_VECTOR = False @classmethod def _default_options(cls) -> Options: return Options( shots=None, parameter_binds=None, simulator_seed=None, mode="amplitude", nthreads=local_hardware_info()["cpus"], ) def __init__(self, configuration=None, provider=None): conf = { "backend_name": "hybrid_qasm_simulator", "backend_version": __version__, "url": "https://github.com/cda-tum/ddsim", "simulator": True, "local": True, "description": "MQT DDSIM Hybrid Schrodinger-Feynman C++ simulator", "basis_gates": [ "gphase", "id", "u0", "u1", "u2", "u3", "cu3", "x", "cx", "y", "cy", "z", "cz", "h", "ch", "s", "sdg", "t", "tdg", "rx", "crx", "ry", "cry", "rz", "crz", "p", "cp", "cu1", "sx", "csx", "sxdg", # 'swap', 'cswap', 'iswap', "snapshot", ], "memory": False, "n_qubits": 128, "coupling_map": None, "conditional": False, "max_shots": 1000000000, "open_pulse": False, "gates": [], } super().__init__(configuration=configuration or BackendConfiguration.from_dict(conf), provider=provider)
[docs] def run(self, quantum_circuits: Union[QuantumCircuit, List[QuantumCircuit]], **options): if isinstance(quantum_circuits, (QasmQobj, PulseQobj)): msg = "QasmQobj and PulseQobj are not supported." raise QiskitError(msg) if not isinstance(quantum_circuits, list): quantum_circuits = [quantum_circuits] out_options = {} for key in options: if not hasattr(self.options, key): warnings.warn("Option %s is not used by this backend" % key, UserWarning, stacklevel=2) else: out_options[key] = options[key] circuit_qobj = assemble(quantum_circuits, self, **out_options) job_id = str(uuid.uuid4()) local_job = DDSIMJob(self, job_id, self._run_job, circuit_qobj, **options) local_job.submit() return local_job
def _run_job(self, job_id, qobj_instance: Qobj, **options): self._validate(qobj_instance) start = time.time() result_list = [self.run_experiment(qobj_exp, **options) for qobj_exp in qobj_instance.experiments] end = time.time() result = { "backend_name": self.configuration().backend_name, "backend_version": self.configuration().backend_version, "qobj_id": qobj_instance.qobj_id, "job_id": job_id, "results": result_list, "status": "COMPLETED", "success": True, "time_taken": (end - start), "header": qobj_instance.header.to_dict(), } return Result.from_dict(result)
[docs] def run_experiment(self, qobj_experiment: QasmQobjExperiment, **options): start_time = time.time() seed = options.get("seed", -1) mode = options.get("mode", "amplitude") nthreads = int(options.get("nthreads", local_hardware_info()["cpus"])) if mode == "amplitude": hybrid_mode = HybridMode.amplitude max_qubits = int(log2(local_hardware_info()["memory"] * (1024**3) / 16)) algorithm_qubits = qobj_experiment.header.n_qubits if algorithm_qubits > max_qubits: msg = "Not enough memory available to simulate the circuit even on a single thread" raise DDSIMError(msg) qubit_diff = max_qubits - algorithm_qubits nthreads = int(min(2**qubit_diff, nthreads)) elif mode == "dd": hybrid_mode = HybridMode.DD else: msg = f"Simulation mode{mode} not supported by hybrid simulator. Available modes are 'amplitude' and 'dd'." raise DDSIMError(msg) sim = HybridCircuitSimulator(qobj_experiment, seed=seed, mode=hybrid_mode, nthreads=nthreads) shots = options.get("shots", 1024) if self.SHOW_STATE_VECTOR and shots > 0: logger.info( "Statevector can only be shown if shots == 0 when using the amplitude hybrid simulation mode. Setting shots=0." ) shots = 0 counts = sim.simulate(shots) end_time = time.time() counts_hex = {hex(int(result, 2)): count for result, count in counts.items()} result = { "header": qobj_experiment.header.to_dict(), "name": qobj_experiment.header.name, "status": "DONE", "time_taken": end_time - start_time, "seed": seed, "mode": mode, "nthreads": nthreads, "shots": shots, "data": {"counts": counts_hex}, "success": True, } if self.SHOW_STATE_VECTOR: if sim.get_mode() == HybridMode.DD: result["data"]["statevector"] = sim.get_vector() else: result["data"]["statevector"] = sim.get_final_amplitudes() return result
def _validate(self, _quantum_circuit): return
[docs] def status(self): """Return backend status. Returns: BackendStatus: the status of the backend. """ return BackendStatus( backend_name=self.name(), backend_version=self.configuration().backend_version, operational=True, pending_jobs=0, status_msg="", )