# Source code for pykka._threading

from __future__ import annotations

import queue
import sys
import threading
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, Optional, TypeVar

from pykka import Actor, Future, Timeout

if TYPE_CHECKING:
    from pykka._actor import ActorInbox
    from pykka._envelope import Envelope
    from pykka._types import OptExcInfo

# Names exported by ``from pykka._threading import *``.
__all__ = ["ThreadingActor", "ThreadingFuture"]


# Type parameter used by ThreadingFuture when subclassing Future[T].
T = TypeVar("T")


class ThreadingFutureResult(NamedTuple):
    """Result record that :class:`ThreadingFuture` puts on its internal queue.

    Both fields default to ``None``; :meth:`ThreadingFuture.set` fills in
    ``value`` and :meth:`ThreadingFuture.set_exception` fills in ``exc_info``.
    """

    # Value handed to ThreadingFuture.set(); left as None when an exception is stored.
    value: Optional[Any] = None
    # Exception info handed to ThreadingFuture.set_exception(); None for a plain value.
    exc_info: Optional[OptExcInfo] = None


class ThreadingFuture(Future[T]):
    """Implementation of :class:`Future` for use with regular Python threads.

    The future is implemented using a :class:`queue.Queue`.

    The future does *not* make a copy of the object which is :meth:`set()
    <pykka.Future.set>` on it. It is the setter's responsibility to only pass
    immutable objects or make a copy of the object before setting it on the
    future.

    .. versionchanged:: 0.14
        Previously, the encapsulated value was a copy made with
        :func:`copy.deepcopy`, unless the encapsulated value was a future, in
        which case the original future was encapsulated.
    """

    def __init__(self) -> None:
        super().__init__()
        # Single-slot queue that carries the result from the setter to the getter.
        self._queue: queue.Queue[ThreadingFutureResult] = queue.Queue(maxsize=1)
        # Result taken off the queue by the first successful get(); cached so
        # that later get() calls do not have to wait on the (now empty) queue.
        self._result: Optional[ThreadingFutureResult] = None

    def get(
        self,
        *,
        timeout: Optional[float] = None,
    ) -> Any:
        """Return the stored value, or re-raise the stored exception.

        :param timeout: seconds to wait for a result, or ``None`` to wait
            without a time limit.
        :raise: :exc:`Timeout` if no result arrived within ``timeout`` seconds.
        :raise: the stored exception, if the result was set with
            :meth:`set_exception`.
        :return: the value passed to :meth:`set`.
        """
        # Give the base class the first chance to produce the value; it
        # signals "not handled here" by raising NotImplementedError.
        try:
            return super().get(timeout=timeout)
        except NotImplementedError:
            pass

        if self._result is None:
            # Only the queue read is guarded: a stored exception that happens
            # to be queue.Empty must be re-raised as-is below, not be mistaken
            # for a timeout.
            try:
                self._result = self._queue.get(True, timeout)
            except queue.Empty:
                raise Timeout(f"{timeout} seconds") from None

        if self._result.exc_info is not None:
            (exc_type, exc_value, exc_traceback) = self._result.exc_info
            assert exc_type is not None
            if exc_value is None:
                # Only the type was recorded; build an instance to raise.
                exc_value = exc_type()
            if exc_value.__traceback__ is not exc_traceback:
                raise exc_value.with_traceback(exc_traceback)
            raise exc_value

        return self._result.value

    def set(
        self,
        value: Optional[Any] = None,
    ) -> None:
        """Store ``value`` as the result of the future.

        :param value: the object to hand to callers of :meth:`get`; it is
            stored as-is, without copying.
        :raise: :exc:`queue.Full` if a result is already waiting in the queue.
        """
        self._queue.put(ThreadingFutureResult(value=value), block=False)

    def set_exception(
        self,
        exc_info: Optional[OptExcInfo] = None,
    ) -> None:
        """Store an exception as the result of the future.

        :param exc_info: a three-item ``(type, value, traceback)`` tuple; when
            ``None``, :func:`sys.exc_info` is used.
        :raise: :exc:`queue.Full` if a result is already waiting in the queue.
        """
        assert exc_info is None or len(exc_info) == 3
        if exc_info is None:
            exc_info = sys.exc_info()
        # Non-blocking, like set(): a blocking put on the single-slot queue
        # would hang the calling thread forever if a result is already queued.
        self._queue.put(ThreadingFutureResult(exc_info=exc_info), block=False)
class ThreadingActor(Actor):
    """:class:`Actor` implementation that runs on a regular Python thread."""

    use_daemon_thread: ClassVar[bool] = False
    """
    A boolean value indicating whether this actor is executed on a thread that
    is a daemon thread (:class:`True`) or not (:class:`False`). This must be
    set before :meth:`pykka.Actor.start` is called, otherwise
    :exc:`RuntimeError` is raised.

    The entire Python program exits when no alive non-daemon threads are left.
    This means that an actor running on a daemon thread may be interrupted at
    any time, and there is no guarantee that cleanup will be done or that
    :meth:`pykka.Actor.on_stop` will be called.

    Actors do not inherit the daemon flag from the actor that made it. It
    always has to be set explicitly for the actor to run on a daemonic thread.
    """

    @staticmethod
    def _create_actor_inbox() -> ActorInbox:
        """Return a new, unbounded :class:`queue.Queue` to serve as the inbox."""
        mailbox: queue.Queue[Envelope[Any]] = queue.Queue()
        return mailbox

    @staticmethod
    def _create_future() -> Future[Any]:
        """Return a new :class:`ThreadingFuture`."""
        future: ThreadingFuture[Any] = ThreadingFuture()
        return future

    def _start_actor_loop(self) -> None:
        """Start a thread that runs ``self._actor_loop``.

        The thread's daemon flag is taken from :attr:`use_daemon_thread`, and
        "Thread" in its default name is replaced with the actor class name.
        """
        worker = threading.Thread(
            target=self._actor_loop,
            daemon=self.use_daemon_thread,
        )
        actor_class_name = type(self).__name__
        worker.name = worker.name.replace("Thread", actor_class_name)
        worker.start()