Source code for mqt.ddsim.job

from __future__ import annotations

import functools
from concurrent import futures
from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence, Union

from qiskit.providers import JobError, JobStatus, JobV1

if TYPE_CHECKING:
    from qiskit import QuantumCircuit
    from qiskit.circuit import Parameter
    from qiskit.circuit.parameterexpression import ParameterValueType
    from qiskit.providers import BackendV2
    from qiskit.result import Result

    Parameters = Union[Mapping[Parameter, ParameterValueType], Sequence[ParameterValueType]]


def requires_submit(func: Callable[..., Any]) -> Callable[..., Any]:
    """Guard a job method so that it can only run after the job was submitted.

    The returned wrapper inspects the ``_future`` attribute of the object the
    method is called on. While that attribute is ``None`` the call is rejected
    with a ``JobError``; otherwise the call is forwarded to ``func`` unchanged.

    Args:
        func (callable): the method to protect.

    Returns:
        callable: the guarded method, carrying the metadata of ``func``.
    """

    @functools.wraps(func)
    def _guarded(self: DDSIMJob, *args: Any, **kwargs: Any) -> Any:  # noqa: ANN401
        # Happy path first: a future exists, so simply delegate.
        if self._future is not None:
            return func(self, *args, **kwargs)

        msg = "Job not submitted yet!. You have to .submit() first!"
        raise JobError(msg)

    return _guarded
class DDSIMJob(JobV1):  # type: ignore[misc]
    """Job handle that runs a simulation callable asynchronously.

    The work itself is delegated to a ``concurrent.futures`` executor; this
    class stores the call arguments, starts the call on :meth:`submit` and
    exposes the resulting future through the Qiskit job interface.

    Attributes:
        _executor (futures.Executor): executor to handle asynchronous jobs.
            It is a class attribute, so every ``DDSIMJob`` instance shares it,
            and it owns a single worker thread, so submitted jobs are executed
            one after another.
    """

    _executor = futures.ThreadPoolExecutor(max_workers=1)

    def __init__(
        self,
        backend: BackendV2,
        job_id: str,
        fn: Callable[..., Result],
        experiments: Sequence[QuantumCircuit],
        parameter_values: Sequence[Parameters] | None,
        **args: Any,
    ) -> None:
        """Store everything needed to run the job later.

        Nothing is executed here; call :meth:`submit` to start the job.

        Args:
            backend: backend this job belongs to (forwarded to ``JobV1``).
            job_id: identifier of the job (forwarded to ``JobV1``).
            fn: callable doing the actual work. It is invoked as
                ``fn(job_id, experiments, parameter_values, **args)``.
            experiments: circuits handed to ``fn``.
            parameter_values: parameter bindings handed to ``fn``, or ``None``.
            **args: additional keyword arguments handed to ``fn`` unchanged.
        """
        super().__init__(backend, job_id)
        self._fn = fn
        self._experiments = experiments
        self._parameter_values = parameter_values
        self._args = args
        # Stays ``None`` until submit() is called; ``requires_submit`` relies on that.
        self._future: futures.Future[Result] | None = None

    def submit(self) -> None:
        """Submit the job to the backend for execution.

        Schedules the stored callable on the shared executor and keeps the
        returned future for later queries.

        Raises:
            JobError: if trying to re-submit the job.
        """
        if self._future is not None:
            msg = "Job was already submitted!"
            raise JobError(msg)

        self._future = self._executor.submit(
            self._fn,
            self._job_id,
            self._experiments,
            self._parameter_values,
            **self._args,
        )

    @requires_submit
    def result(self, timeout: float | None = None) -> Result:  # pylint: disable=arguments-differ
        """Get job result.

        The behavior is the same as the underlying concurrent Future objects,
        https://docs.python.org/3/library/concurrent.futures.html#future-objects.

        Args:
            timeout (float): number of seconds to wait for results. ``None``
                (the default) waits without a time limit.

        Returns:
            qiskit.Result: Result object

        Raises:
            JobError: if the job has not been submitted yet.
            concurrent.futures.TimeoutError: if timeout occurred.
            concurrent.futures.CancelledError: if job cancelled before completed.
        """
        assert self._future is not None  # guaranteed by requires_submit; narrows the type
        return self._future.result(timeout=timeout)

    @requires_submit
    def cancel(self) -> bool:
        """Attempt to cancel the job.

        Returns:
            bool: whatever ``Future.cancel()`` reports, i.e. ``False`` if the
            call is currently being executed or has finished running and
            cannot be cancelled, ``True`` otherwise.

        Raises:
            JobError: if the job has not been submitted yet.
        """
        assert self._future is not None  # guaranteed by requires_submit; narrows the type
        return self._future.cancel()

    @requires_submit
    def status(self) -> JobStatus:
        """Gets the status of the job by querying the Python's future.

        Returns:
            JobStatus: The current JobStatus

        Raises:
            JobError: if the job has not been submitted yet.
        """
        assert self._future is not None  # guaranteed by requires_submit; narrows the type
        future = self._future

        # The order is important here.
        if future.running():
            return JobStatus.RUNNING

        if not future.done():
            # Note: There is an undocumented Future state: PENDING, that seems to show up when
            # the job is enqueued, waiting for someone to pick it up. We need to deal with this
            # state but there's no public API for it, so we are assuming that if the job is
            # neither running nor done, it is PENDING, ergo INITIALIZING for us.
            return JobStatus.INITIALIZING

        # From here on the future is in a terminal state (cancelled or finished) and can no
        # longer change, so the queries below cannot be invalidated by a concurrent cancel().
        # Checking cancelled() *before* done() would leave a window in which a pending job gets
        # cancelled in between, making exception() raise CancelledError instead of returning.
        if future.cancelled():
            return JobStatus.CANCELLED
        return JobStatus.DONE if future.exception() is None else JobStatus.ERROR

    def backend(self) -> BackendV2 | None:
        """Return the instance of the backend used for this job."""
        return self._backend