def requires_submit(func: Callable[..., Any]) -> Callable[..., Any]:
    """Guard a job method so that it only runs once the job has been submitted.

    The returned wrapper looks at ``self._future`` on the instance it is invoked
    on. While that attribute is still ``None`` the wrapped method is not called
    and a ``JobError`` is raised instead.

    Args:
        func (callable): method to be guarded.

    Returns:
        callable: wrapper that forwards every argument to ``func`` unchanged and
        returns its result.
    """

    @functools.wraps(func)
    def _guarded(self: DDSIMJob, *call_args: Any, **call_kwargs: Any) -> Any:  # noqa: ANN401
        # A future is only present after submission, so it doubles as the
        # "was submitted" flag.
        if self._future is not None:
            return func(self, *call_args, **call_kwargs)

        msg = "Job not submitted yet!. You have to .submit() first!"
        raise JobError(msg)

    return _guarded
class DDSIMJob(JobV1):  # type: ignore[misc]
    """Job that runs a simulation callable asynchronously on a thread pool.

    The job stores the callable and its inputs at construction time and only
    hands them to the executor when :meth:`submit` is called. All later queries
    (result, status, cancellation) are delegated to the resulting
    ``concurrent.futures.Future``.

    Attributes:
        _executor (futures.Executor): executor to handle asynchronous jobs. It is
            a class attribute, so it is shared by every job, and it is created
            with a single worker thread.
    """

    _executor = futures.ThreadPoolExecutor(max_workers=1)

    def __init__(
        self,
        backend: BackendV2,
        job_id: str,
        fn: Callable[..., Result],
        experiments: Sequence[QuantumCircuit],
        parameter_values: Sequence[Parameters] | None,
        # The annotation of ``**args`` describes each keyword *value*, not the
        # mapping that collects them, hence ``Any`` rather than ``dict[str, Any]``.
        **args: Any,  # noqa: ANN401
    ) -> None:
        """Store everything needed to run the job later; nothing is executed here.

        Args:
            backend: backend this job belongs to (forwarded to the base class).
            job_id: identifier of the job (forwarded to the base class).
            fn: callable that is run by :meth:`submit`. It is called as
                ``fn(job_id, experiments, parameter_values, **args)``.
            experiments: circuits handed to ``fn``.
            parameter_values: parameter values handed to ``fn``, or ``None``.
            **args: additional keyword options handed to ``fn`` unchanged.
        """
        super().__init__(backend, job_id)
        self._fn = fn
        self._experiments = experiments
        self._parameter_values = parameter_values
        self._args = args
        # ``None`` means "not submitted yet"; ``requires_submit`` relies on this.
        self._future: futures.Future[Result] | None = None

    def submit(self) -> None:
        """Submit the job to the backend for execution.

        Raises:
            JobError: if trying to re-submit the job.
        """
        if self._future is not None:
            msg = "Job was already submitted!"
            raise JobError(msg)

        self._future = self._executor.submit(
            self._fn,
            self._job_id,
            self._experiments,
            self._parameter_values,
            **self._args,
        )

    @requires_submit
    def result(self, timeout: float | None = None) -> Result:  # pylint: disable=arguments-differ
        """Get job result.

        The behavior is the same as the underlying concurrent Future objects,
        https://docs.python.org/3/library/concurrent.futures.html#future-objects.

        Args:
            timeout (float): number of seconds to wait for results. ``None``
                (the default) waits without a time limit.

        Returns:
            qiskit.Result: Result object

        Raises:
            JobError: if the job has not been submitted yet.
            concurrent.futures.TimeoutError: if timeout occurred.
            concurrent.futures.CancelledError: if job cancelled before completed.
            Exception: any exception raised by the job's callable is re-raised
                by the future.
        """
        # Only narrows the Optional type for static checkers; the decorator has
        # already rejected the unsubmitted case.
        assert self._future is not None
        return self._future.result(timeout=timeout)

    @requires_submit
    def cancel(self) -> bool:
        """Attempt to cancel the job.

        Returns:
            bool: the value returned by ``Future.cancel()``: ``False`` if the call
            is currently being executed or has finished running and cannot be
            cancelled, ``True`` otherwise.

        Raises:
            JobError: if the job has not been submitted yet.
        """
        # Only narrows the Optional type for static checkers.
        assert self._future is not None
        return self._future.cancel()

    @requires_submit
    def status(self) -> JobStatus:
        """Gets the status of the job by querying the Python's future.

        Returns:
            JobStatus: The current JobStatus. ``RUNNING`` and ``CANCELLED`` map
            directly to the future's state; a finished future is ``DONE`` when it
            holds no exception and ``ERROR`` otherwise; anything else is reported
            as ``INITIALIZING``.

        Raises:
            JobError: if the job has not been submitted yet.
        """
        # Only narrows the Optional type for static checkers.
        assert self._future is not None

        # The order is important here: a cancelled future also reports done(),
        # and exception() must only be called on a future that finished normally
        # so that it neither blocks nor raises CancelledError.
        if self._future.running():
            return JobStatus.RUNNING
        if self._future.cancelled():
            return JobStatus.CANCELLED
        if self._future.done():
            return JobStatus.DONE if self._future.exception() is None else JobStatus.ERROR

        # Note: There is an undocumented Future state: PENDING, that seems to show up when
        # the job is enqueued, waiting for someone to pick it up. We need to deal with this
        # state but there's no public API for it, so we are assuming that if the job is not
        # in any of the previous states, is PENDING, ergo INITIALIZING for us.
        return JobStatus.INITIALIZING

    def backend(self) -> BackendV2 | None:
        """Return the instance of the backend used for this job.

        Unlike the other accessors this does not require the job to be submitted.
        """
        # NOTE(review): ``_backend`` is not assigned in this class; presumably it
        # is set by ``JobV1.__init__`` - confirm against the base class.
        return self._backend