Source code for ddsim.pathqasmsimulator

"""Backend for DDSIM Task-Based Simulator."""

import logging
import pathlib
import time
import uuid
import warnings
from typing import List, Union

from qiskit import QiskitError, QuantumCircuit
from qiskit.compiler import assemble
from qiskit.providers import BackendV1, Options
from qiskit.providers.models import BackendConfiguration, BackendStatus
from qiskit.qobj import PulseQobj, QasmQobj, QasmQobjExperiment, Qobj
from qiskit.result import Result

from mqt.ddsim import PathCircuitSimulator, PathSimulatorConfiguration, PathSimulatorMode, __version__
from mqt.ddsim.job import DDSIMJob

logger = logging.getLogger(__name__)


[docs]def read_tensor_network_file(filename): import numpy as np import pandas as pd import quimb.tensor as qtn df = pd.read_json(filename) tensors = [] for i in range(len(df["tensors"])): tens_data = [complex(real, imag) for [real, imag] in df["tensors"][i][3]] tens_shape = df["tensors"][i][2] data = np.array(tens_data).reshape(tens_shape) inds = df["tensors"][i][1] tags = df["tensors"][i][0] tensors.append(qtn.Tensor(data, inds, tags, left_inds=inds[: len(inds) // 2])) return tensors
[docs]def create_tensor_network(qc): import quimb.tensor as qtn import sparse from mqt.ddsim import dump_tensor_network if isinstance(qc, QuantumCircuit): filename = qc.name + "_" + str(qc.num_qubits) + ".tensor" nqubits = qc.num_qubits else: filename = "tensor.tensor" nqubits = qc.header.n_qubits dump_tensor_network(qc, filename) tensors = read_tensor_network_file(filename) pathlib.Path(filename).unlink() # add the zero state tensor |0...0> at the beginning shape = [2] * nqubits data = sparse.COO(coords=[[0]] * nqubits, data=1, shape=shape, sorted=True, has_duplicates=False) inds = ["q" + str(i) + "_0" for i in range(nqubits)] tags = ["Q" + str(i) for i in range(nqubits)] tensors.insert(0, qtn.Tensor(data=data, inds=inds, tags=tags)) # using the following lines instead would allow much greater flexibility, # but is not supported for DD-based simulation at the moment # add the zero state tensor |0>...|0> at the beginning # shape = [2] # data = np.zeros(2) # data[0] = 1.0 # for i in range(qc.num_qubits): # inds = ['q'+str(i)+'_0'] # tags = ['Q'+str(i)] # tensors.insert(0, qtn.Tensor(data=data.reshape(shape), inds=inds, tags=tags)) return qtn.TensorNetwork(tensors)
[docs]def get_simulation_path( qc, max_time: int = 60, max_repeats: int = 1024, parallel_runs: int = 1, dump_path: bool = True, plot_ring: bool = False, ): import cotengra as ctg from opt_einsum.paths import linear_to_ssa tn = create_tensor_network(qc) opt = ctg.HyperOptimizer( max_time=max_time, max_repeats=max_repeats, progbar=True, parallel=parallel_runs, minimize="flops" ) info = tn.contract(all, get="path-info", optimize=opt) path = linear_to_ssa(info.path) if dump_path: filename = qc.name + "_" + str(qc.num_qubits) + ".path" if isinstance(qc, QuantumCircuit) else "simulation.path" with pathlib.Path(filename).open("w") as file: file.write(str(path)) if plot_ring: fig = opt.get_tree().plot_ring(return_fig=True) fig.savefig("simulation_ring.svg", bbox_inches="tight") return path
[docs]class PathQasmSimulatorBackend(BackendV1): """Python interface to MQT DDSIM Simulation Path Framework""" SHOW_STATE_VECTOR = False @classmethod def _default_options(cls) -> Options: return Options( shots=None, parameter_binds=None, simulator_seed=None, pathsim_configuration=PathSimulatorConfiguration(), mode=None, bracket_size=None, alternating_start=None, gate_cost=None, seed=None, cotengra_max_time=60, cotengra_max_repeats=1024, cotengra_plot_ring=False, cotengra_dump_path=True, ) def __init__(self, configuration=None, provider=None): conf = { "backend_name": "path_sim_qasm_simulator", "backend_version": __version__, "url": "https://github.com/cda-tum/ddsim", "simulator": True, "local": True, "description": "MQT DDSIM C++ simulation path framework", "basis_gates": [ "gphase", "id", "u0", "u1", "u2", "u3", "cu3", "x", "cx", "ccx", "mcx_gray", "mcx_recursive", "mcx_vchain", "mcx", "y", "cy", "z", "cz", "h", "ch", "s", "sdg", "t", "tdg", "rx", "crx", "mcrx", "ry", "cry", "mcry", "rz", "crz", "mcrz", "p", "cp", "cu1", "mcphase", "sx", "csx", "sxdg", "swap", "cswap", "iswap", "dcx", "ecr", "rxx", "ryy", "rzz", "rzx", "xx_minus_yy", "xx_plus_yy", "snapshot", ], "memory": False, "n_qubits": 128, "coupling_map": None, "conditional": False, "max_shots": 1000000000, "open_pulse": False, "gates": [], } super().__init__(configuration=configuration or BackendConfiguration.from_dict(conf), provider=provider)
[docs] def run(self, quantum_circuits: Union[QuantumCircuit, List[QuantumCircuit]], **options): if isinstance(quantum_circuits, (QasmQobj, PulseQobj)): msg = "QasmQobj and PulseQobj are not supported." raise QiskitError(msg) if not isinstance(quantum_circuits, list): quantum_circuits = [quantum_circuits] out_options = {} for key in options: if not hasattr(self.options, key): warnings.warn("Option %s is not used by this backend" % key, UserWarning, stacklevel=2) else: out_options[key] = options[key] circuit_qobj = assemble(quantum_circuits, self, **out_options) job_id = str(uuid.uuid4()) local_job = DDSIMJob(self, job_id, self._run_job, circuit_qobj, **options) local_job.submit() return local_job
def _run_job(self, job_id, qobj_instance: Qobj, **options): self._validate(qobj_instance) start = time.time() result_list = [self.run_experiment(qobj_exp, **options) for qobj_exp in qobj_instance.experiments] end = time.time() result = { "backend_name": self.configuration().backend_name, "backend_version": self.configuration().backend_version, "qobj_id": qobj_instance.qobj_id, "job_id": job_id, "results": result_list, "status": "COMPLETED", "success": True, "time_taken": (end - start), "header": qobj_instance.header.to_dict(), } return Result.from_dict(result)
[docs] def run_experiment(self, qobj_experiment: QasmQobjExperiment, **options): start_time = time.time() pathsim_configuration = options.get("pathsim_configuration", PathSimulatorConfiguration()) mode = options.get("mode") if mode is not None: pathsim_configuration.mode = PathSimulatorMode(mode) bracket_size = options.get("bracket_size") if bracket_size is not None: pathsim_configuration.bracket_size = bracket_size alternating_start = options.get("alternating_start") if alternating_start is not None: pathsim_configuration.alternating_start = alternating_start gate_cost = options.get("gate_cost") if gate_cost is not None: pathsim_configuration.gate_cost = gate_cost seed = options.get("seed") if seed is not None: pathsim_configuration.seed = seed sim = PathCircuitSimulator(qobj_experiment, config=pathsim_configuration) # determine the contraction path using cotengra in case this is requested if pathsim_configuration.mode == PathSimulatorMode.cotengra: max_time = options.get("cotengra_max_time", 60) max_repeats = options.get("cotengra_max_repeats", 1024) dump_path = options.get("cotengra_dump_path", False) plot_ring = options.get("cotengra_plot_ring", False) path = get_simulation_path( qobj_experiment, max_time=max_time, max_repeats=max_repeats, dump_path=dump_path, plot_ring=plot_ring ) sim.set_simulation_path(path, False) shots = options.get("shots", 1024) setup_time = time.time() counts = sim.simulate(shots) end_time = time.time() counts_hex = {hex(int(result, 2)): count for result, count in counts.items()} result = { "header": qobj_experiment.header.to_dict(), "name": qobj_experiment.header.name, "status": "DONE", "time_taken": end_time - start_time, "time_setup": setup_time - start_time, "time_sim": end_time - setup_time, "config": pathsim_configuration, "shots": shots, "data": {"counts": counts_hex}, "success": True, } if self.SHOW_STATE_VECTOR: result["data"]["statevector"] = sim.get_vector() return result
def _validate(self, _quantum_circuit): return
[docs] def status(self): """Return backend status. Returns: BackendStatus: the status of the backend. """ return BackendStatus( backend_name=self.name(), backend_version=self.configuration().backend_version, operational=True, pending_jobs=0, status_msg="", )