Skip to content

Actors

pykka.Actor

Bases: ABC

An actor is an execution unit that executes concurrently with other actors.

To create an actor:

  1. subclass one of the Actor implementations:

  2. implement your methods, including __init__(), as usual,

  3. call Actor.start() on your actor class, passing the method any arguments for your constructor.

To stop an actor, call Actor.stop() or ActorRef.stop().

Example
import pykka

class MyActor(pykka.ThreadingActor):
    def __init__(self, my_arg=None):
        super().__init__()
        ... # My optional init code with access to start() arguments

    def on_start(self):
        ... # My optional setup code in same context as on_receive()

    def on_stop(self):
        ... # My optional cleanup code in same context as on_receive()

    def on_failure(self, exception_type, exception_value, traceback):
        ... # My optional cleanup code in same context as on_receive()

    def on_receive(self, message):
        ... # My optional message handling code for a plain actor

    def a_method(self, ...):
        ... # My regular method to be used through an ActorProxy

my_actor_ref = MyActor.start(my_arg=...)
my_actor_ref.stop()
Source code in src/pykka/_actor.py
class Actor(abc.ABC):
    """An actor is an execution unit that executes concurrently with other actors.

    To create an actor:

    1.  subclass one of the [`Actor`][pykka.Actor] implementations:

        - [`ThreadingActor`][pykka.ThreadingActor]

    2.  implement your methods, including `__init__()`, as usual,
    3.  call [`Actor.start()`][pykka.Actor.start] on your actor class,
        passing the method any arguments for your constructor.

    To stop an actor, call [`Actor.stop()`][pykka.Actor.stop] or
    [`ActorRef.stop()`][pykka.ActorRef.stop].

    Example:
        ```py
        import pykka

        class MyActor(pykka.ThreadingActor):
            def __init__(self, my_arg=None):
                super().__init__()
                ... # My optional init code with access to start() arguments

            def on_start(self):
                ... # My optional setup code in same context as on_receive()

            def on_stop(self):
                ... # My optional cleanup code in same context as on_receive()

            def on_failure(self, exception_type, exception_value, traceback):
                ... # My optional cleanup code in same context as on_receive()

            def on_receive(self, message):
                ... # My optional message handling code for a plain actor

            def a_method(self, ...):
                ... # My regular method to be used through an ActorProxy

        my_actor_ref = MyActor.start(my_arg=...)
        my_actor_ref.stop()
        ```

    """

    @classmethod
    def start(
        cls: type[A],
        *args: Any,
        **kwargs: Any,
    ) -> ActorRef[A]:
        """Start an actor.

        Starting an actor also registers it in the
        [`ActorRegistry`][pykka.ActorRegistry].

        Any arguments passed to [`start()`][pykka.Actor.start] will be passed on
        to the class constructor.

        Behind the scenes, the following is happening when you call
        [`start()`][pykka.Actor.start]:

        1.  The actor is created:

            1.  [`actor_urn`][pykka.Actor.actor_urn] is initialized with the
                assigned URN.
            2.  [`actor_inbox`][pykka.Actor.actor_inbox] is initialized with a
                new actor inbox.
            3.  [`actor_ref`][pykka.Actor.actor_ref] is initialized with a
                [`pykka.ActorRef`][pykka.ActorRef] object for safely
                communicating with the actor.
            4.  At this point, your `__init__()` code can run.

        2.  The actor is registered in [`pykka.ActorRegistry`][pykka.ActorRegistry].

        3.  The actor's receive loop is started.

        Raises:
            AssertionError: if the actor's `__init__()` override did not call
                `Actor.__init__()`, so no actor ref has been created.

        Returns:
            A ref which can be used to access the actor in a safe manner.

        """
        obj = cls(*args, **kwargs)
        # Inspect the private attribute instead of the ``actor_ref`` property:
        # the property raises AttributeError when Actor.__init__() never ran,
        # which would hide the helpful message below. An explicit raise is used
        # so the check also survives ``python -O``.
        if getattr(obj, "_actor_ref", None) is None:
            msg = (
                "Actor.__init__() have not been called. "
                "Did you forget to call super() in your override?"
            )
            raise AssertionError(msg)
        ActorRegistry.register(obj.actor_ref)
        logger.debug(f"Starting {obj}")
        obj._start_actor_loop()
        return obj.actor_ref

    @staticmethod
    @abc.abstractmethod
    def _create_actor_inbox() -> ActorInbox:
        """Create an inbox for the actor.

        Internal method for implementors of new actor types.
        """
        msg = "Use a subclass of Actor"
        raise NotImplementedError(msg)

    @staticmethod
    @abc.abstractmethod
    def _create_future() -> Future[Any]:
        """Create a future for the actor.

        Internal method for implementors of new actor types.
        """
        msg = "Use a subclass of Actor"
        raise NotImplementedError(msg)

    @abc.abstractmethod
    def _start_actor_loop(self) -> None:
        """Create and start the actor's event loop.

        Internal method for implementors of new actor types.
        """
        msg = "Use a subclass of Actor"
        raise NotImplementedError(msg)

    actor_urn: str
    """
    The actor URN string is a universally unique identifier for the actor.

    It may be used for looking up a specific actor using
    [`ActorRegistry.get_by_urn()`][pykka.ActorRegistry.get_by_urn].
    """

    actor_inbox: ActorInbox
    """
    The actor's inbox.

    Use [`ActorRef.tell()`][pykka.ActorRef.tell],
    [`ActorRef.ask()`][pykka.ActorRef.ask], and friends to put messages in the
    inbox.
    """

    _actor_ref: ActorRef[Any]

    @property
    def actor_ref(self: A) -> ActorRef[A]:
        """The actor's [`ActorRef`][pykka.ActorRef] instance."""
        # This property only exists to improve the typing of the ActorRef.
        return self._actor_ref

    actor_stopped: threading.Event
    """
    A [`threading.Event`][threading.Event] representing whether or not the actor
    should continue processing messages.

    Use [`stop()`][pykka.Actor.stop] to change it.
    """

    def __init__(
        self,
        *_args: Any,
        **_kwargs: Any,
    ) -> None:
        """Create actor.

        You are free to override `__init__()`, but you must call your
        superclass' `__init__()` to ensure that the fields
        [`actor_urn`][pykka.Actor.actor_urn],
        [`actor_inbox`][pykka.Actor.actor_inbox], and
        [`actor_ref`][pykka.Actor.actor_ref] are initialized.

        You can use `super()`:

            super().__init__()

        Or call your superclass directly:

            pykka.ThreadingActor.__init__(self)

        `__init__()` is called before the actor is started and registered
        in the [`ActorRegistry`][pykka.ActorRegistry].
        """
        self.actor_urn = uuid.uuid4().urn
        self.actor_inbox = self._create_actor_inbox()
        self.actor_stopped = threading.Event()

        # Created last: ActorRef copies the URN, inbox and stopped event from
        # the actor, so those attributes must already be set.
        self._actor_ref = ActorRef(self)

    def __str__(self) -> str:
        """Return the actor's class name followed by its URN in parentheses."""
        return f"{self.__class__.__name__} ({self.actor_urn})"

    def stop(self) -> None:
        """Stop the actor.

        It's equivalent to calling
        [`ActorRef.stop(block=False)`][pykka.ActorRef.stop].
        """
        self.actor_ref.tell(messages._ActorStop())  # noqa: SLF001

    def _stop(self) -> None:
        """Stop the actor immediately without processing the rest of the inbox."""
        ActorRegistry.unregister(self.actor_ref)
        self.actor_stopped.set()
        logger.debug(f"Stopped {self}")
        try:
            self.on_stop()
        except Exception:  # noqa: BLE001
            # A failing on_stop() hook is reported like any other unhandled
            # failure in the actor.
            self._handle_failure(*sys.exc_info())

    def _actor_loop(self) -> None:
        """Run the actor's core loop.

        This is the method that will be executed by the thread or greenlet.
        """
        self._actor_loop_setup()
        self._actor_loop_running()
        self._actor_loop_teardown()

    def _actor_loop_setup(self) -> None:
        """Run the `on_start()` hook, treating any exception as a failure."""
        try:
            self.on_start()
        except Exception:  # noqa: BLE001
            self._handle_failure(*sys.exc_info())

    def _actor_loop_running(self) -> None:
        """Process envelopes from the inbox until the actor is flagged as stopped."""
        while not self.actor_stopped.is_set():
            envelope = self.actor_inbox.get()
            try:
                response = self._handle_receive(envelope.message)
                if envelope.reply_to is not None:
                    envelope.reply_to.set(response)
            except Exception:  # noqa: BLE001
                if envelope.reply_to is not None:
                    # A caller is waiting on a future: hand the exception to
                    # the caller instead of failing the actor.
                    logger.info(
                        f"Exception returned from {self} to caller:",
                        exc_info=sys.exc_info(),
                    )
                    envelope.reply_to.set_exception()
                else:
                    # Nobody to report to: log, unregister and stop, then give
                    # the on_failure() hook a chance to clean up.
                    self._handle_failure(*sys.exc_info())
                    try:
                        self.on_failure(*sys.exc_info())
                    except Exception:  # noqa: BLE001
                        self._handle_failure(*sys.exc_info())
            except BaseException:  # noqa: BLE001
                # Exceptions outside the Exception hierarchy stop this actor
                # and every other registered actor.
                exception_value = sys.exc_info()[1]
                logger.debug(f"{exception_value!r} in {self}. Stopping all actors.")
                self._stop()
                ActorRegistry.stop_all()

    def _actor_loop_teardown(self) -> None:
        """Drain the inbox, replying to any envelopes that expect an answer.

        Pending stop requests are answered with `None`; every other pending
        request is answered with an `ActorDeadError`.
        """
        while not self.actor_inbox.empty():
            envelope = self.actor_inbox.get()
            if envelope.reply_to is not None:
                if isinstance(envelope.message, messages._ActorStop):  # noqa: SLF001
                    envelope.reply_to.set(None)
                else:
                    envelope.reply_to.set_exception(
                        exc_info=(
                            ActorDeadError,
                            ActorDeadError(
                                f"{self.actor_ref} stopped before handling the message"
                            ),
                            None,
                        )
                    )

    def on_start(self) -> None:  # noqa: B027
        """Run code at the beginning of the actor's life.

        Hook for doing any setup that should be done *after* the actor is
        started, but *before* it starts processing messages.

        /// note | Threading runtime
        For [`ThreadingActor`][pykka.ThreadingActor], this method is
        executed in the actor's own thread, while `__init__()` is executed
        in the thread that created the actor.
        ///

        If an exception is raised by this method the stack trace will be
        logged, and the actor will stop.
        """

    def on_stop(self) -> None:  # noqa: B027
        """Run code at the end of the actor's life.

        Hook for doing any cleanup that should be done *after* the actor has
        processed the last message, and *before* the actor stops.

        This hook is *not* called when the actor stops because of an unhandled
        exception. In that case, the [`on_failure()`][pykka.Actor.on_failure]
        hook is called instead.

        /// note | Threading runtime
        For [`ThreadingActor`][pykka.ThreadingActor] this method is executed
        in the actor's own thread, immediately before the thread exits.
        ///

        If an exception is raised by this method the stack trace will be
        logged, and the actor will stop.
        """

    def _handle_failure(
        self,
        exception_type: type[BaseException] | None,
        exception_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Log an unexpected failure, then unregister and stop the actor.

        The arguments are the three items of a `sys.exc_info()` tuple.
        """
        logger.error(
            f"Unhandled exception in {self}:",
            exc_info=(exception_type, exception_value, traceback),  # type: ignore[arg-type]
        )
        ActorRegistry.unregister(self.actor_ref)
        self.actor_stopped.set()

    def on_failure(  # noqa: B027
        self,
        exception_type: type[BaseException] | None,
        exception_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Run code when an unhandled exception is raised.

        Hook for doing any cleanup *after* an unhandled exception is raised,
        and *before* the actor stops.

        /// note | Threading runtime
        For [`ThreadingActor`][pykka.ThreadingActor] this method is executed in
        the actor's own thread, immediately before the thread exits.
        ///

        The method's arguments are the relevant information from
        [`sys.exc_info()`][sys.exc_info].

        If an exception is raised by this method the stack trace will be
        logged, and the actor will stop.
        """

    def _handle_receive(self, message: Any) -> Any:
        """Handle messages sent to the actor.

        Stop requests and proxy messages (call, attribute get, attribute set)
        are handled here; anything else is passed to `on_receive()`.
        """
        if isinstance(message, messages._ActorStop):  # noqa: SLF001
            return self._stop()
        if isinstance(message, messages.ProxyCall):
            callee = get_attr_directly(self, message.attr_path)
            return callee(*message.args, **message.kwargs)
        if isinstance(message, messages.ProxyGetAttr):
            attr = get_attr_directly(self, message.attr_path)
            return attr
        if isinstance(message, messages.ProxySetAttr):
            # The last path element names the attribute to set; everything
            # before it locates the object that owns the attribute.
            parent_attr = get_attr_directly(self, message.attr_path[:-1])
            attr_name = message.attr_path[-1]
            return setattr(parent_attr, attr_name, message.value)
        return self.on_receive(message)

    def on_receive(self, message: Any) -> Any:
        """May be implemented for the actor to handle regular non-proxy messages.

        The default implementation logs a warning about the unexpected message.

        Args:
            message: the message to handle

        Returns:
            anything that should be sent as a reply to the sender

        """
        logger.warning(f"Unexpected message received by {self}: {message}")

actor_inbox instance-attribute

actor_inbox: ActorInbox = _create_actor_inbox()

The actor's inbox.

Use ActorRef.tell(), ActorRef.ask(), and friends to put messages in the inbox.

actor_ref property

actor_ref: ActorRef[A]

The actor's ActorRef instance.

actor_stopped instance-attribute

actor_stopped: Event = Event()

A threading.Event representing whether or not the actor should continue processing messages.

Use stop() to change it.

actor_urn instance-attribute

actor_urn: str = urn

The actor URN string is a universally unique identifier for the actor.

It may be used for looking up a specific actor using ActorRegistry.get_by_urn().

__init__

__init__(*_args: Any, **_kwargs: Any) -> None

Create actor.

You are free to override __init__(), but you must call your superclass' __init__() to ensure that the fields actor_urn, actor_inbox, and actor_ref are initialized.

You can use super():

super().__init__()

Or call your superclass directly:

pykka.ThreadingActor.__init__(self)

__init__() is called before the actor is started and registered in the ActorRegistry.

Source code in src/pykka/_actor.py
def __init__(
    self,
    *_args: Any,
    **_kwargs: Any,
) -> None:
    """Initialize the actor's identity, inbox, stop flag, and ref.

    Subclasses may override `__init__()`, but the override must invoke the
    superclass' `__init__()` so that
    [`actor_urn`][pykka.Actor.actor_urn],
    [`actor_inbox`][pykka.Actor.actor_inbox], and
    [`actor_ref`][pykka.Actor.actor_ref] get set up.

    Either go through `super()`:

        super().__init__()

    Or name your superclass explicitly:

        pykka.ThreadingActor.__init__(self)

    `__init__()` runs before the actor is started and before it is
    registered in the [`ActorRegistry`][pykka.ActorRegistry].
    """
    urn = uuid.uuid4().urn
    inbox = self._create_actor_inbox()
    stopped_flag = threading.Event()

    self.actor_urn = urn
    self.actor_inbox = inbox
    self.actor_stopped = stopped_flag

    # The ref is built from the attributes assigned above, so it comes last.
    ref = ActorRef(self)
    self._actor_ref = ref

on_failure

on_failure(
    exception_type: type[BaseException] | None,
    exception_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Run code when an unhandled exception is raised.

Hook for doing any cleanup after an unhandled exception is raised, and before the actor stops.

Threading runtime

For ThreadingActor this method is executed in the actor's own thread, immediately before the thread exits.

The method's arguments are the relevant information from sys.exc_info().

If an exception is raised by this method the stack trace will be logged, and the actor will stop.

Source code in src/pykka/_actor.py
def on_failure(  # noqa: B027
    self,
    exception_type: type[BaseException] | None,
    exception_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Clean up after an unhandled exception, before the actor stops.

    Override this hook to release resources once an unhandled exception has
    been raised in the actor. The default implementation does nothing.

    /// note | Threading runtime
    With [`ThreadingActor`][pykka.ThreadingActor], this hook runs in the
    actor's own thread, right before that thread exits.
    ///

    The three arguments carry the relevant values from
    [`sys.exc_info()`][sys.exc_info].

    An exception raised by this hook is logged with its stack trace, and
    the actor stops.
    """

on_receive

on_receive(message: Any) -> Any

May be implemented for the actor to handle regular non-proxy messages.

Parameters:

  • message (Any) –

    the message to handle

Returns:

  • Any

    anything that should be sent as a reply to the sender

Source code in src/pykka/_actor.py
def on_receive(self, message: Any) -> Any:
    """Handle a regular, non-proxy message; override in subclasses as needed.

    The default implementation only logs a warning about the unexpected
    message.

    Args:
        message: the message to handle

    Returns:
        the value to send back to the sender as a reply

    """
    warning_text = f"Unexpected message received by {self}: {message}"
    logger.warning(warning_text)

on_start

on_start() -> None

Run code at the beginning of the actor's life.

Hook for doing any setup that should be done after the actor is started, but before it starts processing messages.

Threading runtime

For ThreadingActor, this method is executed in the actor's own thread, while __init__() is executed in the thread that created the actor.

If an exception is raised by this method the stack trace will be logged, and the actor will stop.

Source code in src/pykka/_actor.py
def on_start(self) -> None:  # noqa: B027
    """Set up the actor before it handles its first message.

    Override this hook for initialization that must happen *after* the
    actor has been started but *before* any message is processed. The
    default implementation does nothing.

    /// note | Threading runtime
    With [`ThreadingActor`][pykka.ThreadingActor], this hook runs in the
    actor's own thread, whereas `__init__()` runs in the thread that
    created the actor.
    ///

    An exception raised by this hook is logged with its stack trace, and
    the actor stops.
    """

on_stop

on_stop() -> None

Run code at the end of the actor's life.

Hook for doing any cleanup that should be done after the actor has processed the last message, and before the actor stops.

This hook is not called when the actor stops because of an unhandled exception. In that case, the on_failure() hook is called instead.

Threading runtime

For ThreadingActor this method is executed in the actor's own thread, immediately before the thread exits.

If an exception is raised by this method the stack trace will be logged, and the actor will stop.

Source code in src/pykka/_actor.py
def on_stop(self) -> None:  # noqa: B027
    """Clean up at the end of the actor's life.

    Override this hook for teardown that must happen *after* the actor has
    processed its last message and *before* the actor stops. The default
    implementation does nothing.

    When the actor stops because of an unhandled exception, this hook is
    *not* invoked; the [`on_failure()`][pykka.Actor.on_failure] hook is
    called in its place.

    /// note | Threading runtime
    With [`ThreadingActor`][pykka.ThreadingActor], this hook runs in the
    actor's own thread, right before that thread exits.
    ///

    An exception raised by this hook is logged with its stack trace, and
    the actor stops.
    """

start classmethod

start(*args: Any, **kwargs: Any) -> ActorRef[A]

Start an actor.

Starting an actor also registers it in the ActorRegistry.

Any arguments passed to start() will be passed on to the class constructor.

Behind the scenes, the following is happening when you call start():

  1. The actor is created:

    1. actor_urn is initialized with the assigned URN.
    2. actor_inbox is initialized with a new actor inbox.
    3. actor_ref is initialized with a pykka.ActorRef object for safely communicating with the actor.
    4. At this point, your __init__() code can run.
  2. The actor is registered in pykka.ActorRegistry.

  3. The actor's receive loop is started.

Returns:

  • ActorRef[A]

    A ref which can be used to access the actor in a safe manner.

Source code in src/pykka/_actor.py
@classmethod
def start(
    cls: type[A],
    *args: Any,
    **kwargs: Any,
) -> ActorRef[A]:
    """Start an actor.

    Starting an actor also registers it in the
    [`ActorRegistry`][pykka.ActorRegistry].

    Any arguments passed to [`start()`][pykka.Actor.start] will be passed on
    to the class constructor.

    Behind the scenes, the following is happening when you call
    [`start()`][pykka.Actor.start]:

    1.  The actor is created:

        1.  [`actor_urn`][pykka.Actor.actor_urn] is initialized with the
            assigned URN.
        2.  [`actor_inbox`][pykka.Actor.actor_inbox] is initialized with a
            new actor inbox.
        3.  [`actor_ref`][pykka.Actor.actor_ref] is initialized with a
            [`pykka.ActorRef`][pykka.ActorRef] object for safely
            communicating with the actor.
        4.  At this point, your `__init__()` code can run.

    2.  The actor is registered in [`pykka.ActorRegistry`][pykka.ActorRegistry].

    3.  The actor's receive loop is started.

    Raises:
        AssertionError: if the actor's `__init__()` override did not call
            `Actor.__init__()`, so no actor ref has been created.

    Returns:
        A ref which can be used to access the actor in a safe manner.

    """
    obj = cls(*args, **kwargs)
    # Inspect the private attribute instead of the ``actor_ref`` property:
    # reading the property on an actor whose __init__() skipped
    # Actor.__init__() fails with AttributeError before the helpful message
    # below can be shown. An explicit raise also survives ``python -O``.
    if getattr(obj, "_actor_ref", None) is None:
        msg = (
            "Actor.__init__() have not been called. "
            "Did you forget to call super() in your override?"
        )
        raise AssertionError(msg)
    ActorRegistry.register(obj.actor_ref)
    logger.debug(f"Starting {obj}")
    obj._start_actor_loop()
    return obj.actor_ref

stop

stop() -> None

Stop the actor.

It's equivalent to calling ActorRef.stop(block=False).

Source code in src/pykka/_actor.py
def stop(self) -> None:
    """Ask the actor to stop by sending it a stop message.

    This has the same effect as calling
    [`ActorRef.stop(block=False)`][pykka.ActorRef.stop].
    """
    own_ref = self.actor_ref
    own_ref.tell(messages._ActorStop())  # noqa: SLF001

pykka.ActorRef

Bases: Generic[A]

Reference to a running actor which may safely be passed around.

ActorRef instances are returned by Actor.start() and the lookup methods in ActorRegistry. You should never need to create ActorRef instances yourself.

Source code in src/pykka/_ref.py
class ActorRef(Generic[A]):
    """Reference to a running actor which may safely be passed around.

    [`ActorRef`][pykka.ActorRef] instances are returned by
    [`Actor.start()`][pykka.Actor.start] and the lookup methods in
    [`ActorRegistry`][pykka.ActorRegistry]. You should never need to create
    [`ActorRef`][pykka.ActorRef] instances yourself.
    """

    actor_class: type[A]
    """The class of the referenced actor."""

    actor_urn: str
    """See [`Actor.actor_urn`][pykka.Actor.actor_urn]."""

    actor_inbox: ActorInbox
    """See [`Actor.actor_inbox`][pykka.Actor.actor_inbox]."""

    actor_stopped: Event
    """See [`Actor.actor_stopped`][pykka.Actor.actor_stopped]."""

    def __init__(
        self,
        actor: A,
    ) -> None:
        """Create a reference to `actor`.

        Args:
            actor: the actor to reference; its URN, inbox and stopped event
                must already be set.

        """
        # Only a weak reference to the actor object itself is kept.
        self._actor_weakref = weakref.ref(actor)
        self.actor_class = actor.__class__
        # The URN, inbox and stopped event are the same objects the actor uses.
        self.actor_urn = actor.actor_urn
        self.actor_inbox = actor.actor_inbox
        self.actor_stopped = actor.actor_stopped

    def __repr__(self) -> str:
        """Return `<ActorRef for ...>` wrapping the `str()` form of the ref."""
        return f"<ActorRef for {self}>"

    def __str__(self) -> str:
        """Return the actor's class name followed by its URN in parentheses."""
        return f"{self.actor_class.__name__} ({self.actor_urn})"

    def is_alive(self) -> bool:
        """Check if actor is alive.

        This is based on the actor's stopped flag. The actor is not guaranteed
        to be alive and responding even though
        [`is_alive()`][pykka.ActorRef.is_alive] returns `True`.

        Returns:
            `True` if actor is alive, `False` otherwise.

        """
        return not self.actor_stopped.is_set()

    def tell(
        self,
        message: Any,
    ) -> None:
        """Send message to actor without waiting for any response.

        Will generally not block, but if the underlying queue is full it will
        block until a free slot is available.

        Args:
            message: message to send

        Raises:
            ActorDeadError: if actor is not available

        """
        if not self.is_alive():
            msg = f"{self} not found"
            raise ActorDeadError(msg)
        # No reply_to future: the sender does not expect an answer.
        self.actor_inbox.put(Envelope(message))

    @overload
    def ask(
        self,
        message: Any,
        *,
        block: Literal[False],
        timeout: float | None = None,
    ) -> Future[Any]: ...

    @overload
    def ask(
        self,
        message: Any,
        *,
        block: Literal[True],
        timeout: float | None = None,
    ) -> Any: ...

    @overload
    def ask(
        self,
        message: Any,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> Any | Future[Any]: ...

    def ask(
        self,
        message: Any,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> Any | Future[Any]:
        """Send message to actor and wait for the reply.

        The message can be of any type.
        If `block` is `False`, it will immediately return a
        [`Future`][pykka.Future] instead of blocking.

        If `block` is `True`, and `timeout` is `None`, as default, the method
        will block until it gets a reply, potentially forever. If `timeout` is
        an integer or float, the method will wait for a reply for `timeout`
        seconds, and then raise [`pykka.Timeout`][pykka.Timeout].

        If the actor is not alive, the message is not put in the inbox; the
        future is given an [`ActorDeadError`][pykka.ActorDeadError] instead.

        Args:
            message: message to send
            block: whether to block while waiting for a reply
            timeout: seconds to wait before timeout if blocking

        Raises:
            Timeout: if timeout is reached if blocking
            Exception: any exception returned by the receiving actor if blocking

        Returns:
            a future if not blocking, or a response if blocking

        """
        future = self.actor_class._create_future()  # noqa: SLF001

        # The raise/except pair makes ActorDeadError the active exception when
        # set_exception() is called without arguments; presumably the future
        # then records the exception currently being handled -- confirm
        # against Future.set_exception().
        try:
            if not self.is_alive():
                msg = f"{self} not found"
                raise ActorDeadError(msg)  # noqa: TRY301
        except ActorDeadError:
            future.set_exception()
        else:
            self.actor_inbox.put(Envelope(message, reply_to=future))

        if block:
            return future.get(timeout=timeout)

        return future

    @overload
    def stop(
        self,
        *,
        block: Literal[True],
        timeout: float | None = None,
    ) -> bool: ...

    @overload
    def stop(
        self,
        *,
        block: Literal[False],
        timeout: float | None = None,
    ) -> Future[bool]: ...

    @overload
    def stop(
        self,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> bool | Future[bool]: ...

    def stop(
        self,
        *,
        block: bool = True,
        timeout: float | None = None,
    ) -> bool | Future[bool]:
        """Send a message to the actor, asking it to stop.

        Returns `True` if actor is stopped or was being stopped at the
        time of the call. `False` if actor was already dead. If
        `block` is `False`, it returns a future wrapping the result.

        Messages sent to the actor before the actor is asked to stop will
        be processed normally before it stops.

        Messages sent to the actor after the actor is asked to stop will
        be replied to with [`ActorDeadError`][pykka.ActorDeadError] after it
        stops.

        The actor may not be restarted.

        `block` and `timeout` work as for [`ask()`][pykka.ActorRef.ask].

        Returns:
            a future if not blocking, or a boolean result if blocking

        """
        ask_future = self.ask(_ActorStop(), block=False)

        def _stop_result_converter(timeout: float | None) -> bool:
            # Translate the outcome of the stop request into a boolean:
            # ActorDeadError means the actor was already dead.
            try:
                ask_future.get(timeout=timeout)
            except ActorDeadError:
                return False
            else:
                return True

        # A second future of the same concrete type as the one ask() returned;
        # presumably its get() delegates to the hook set below -- confirm
        # against Future.set_get_hook().
        converted_future = ask_future.__class__()
        converted_future.set_get_hook(_stop_result_converter)
        converted_future = cast("Future[bool]", converted_future)

        if block:
            return converted_future.get(timeout=timeout)

        return converted_future

    def proxy(self: ActorRef[A]) -> ActorProxy[A]:
        """Wrap the [`ActorRef`][pykka.ActorRef] in an [`ActorProxy`][pykka.ActorProxy].

        Using this method like this:

            proxy = AnActor.start().proxy()

        is analogous to:

            proxy = ActorProxy(AnActor.start())

        Raises:
            ActorDeadError: if actor is not available

        Returns:
            a proxy object wrapping the actor reference.

        """
        return ActorProxy(actor_ref=self)

actor_class instance-attribute

actor_class: type[A] = __class__

The class of the referenced actor.

actor_inbox instance-attribute

actor_inbox: ActorInbox = actor_inbox

actor_stopped instance-attribute

actor_stopped: Event = actor_stopped

actor_urn instance-attribute

actor_urn: str = actor_urn

ask

ask(
    message: Any,
    *,
    block: Literal[False],
    timeout: float | None = None,
) -> Future[Any]
ask(
    message: Any,
    *,
    block: Literal[True],
    timeout: float | None = None,
) -> Any
ask(
    message: Any,
    *,
    block: bool = True,
    timeout: float | None = None,
) -> Any | Future[Any]
ask(
    message: Any,
    *,
    block: bool = True,
    timeout: float | None = None,
) -> Any | Future[Any]

Send message to actor and wait for the reply.

The message can be of any type. If block is False, it will immediately return a Future instead of blocking.

If block is True, and timeout is None, as default, the method will block until it gets a reply, potentially forever. If timeout is an integer or float, the method will wait for a reply for timeout seconds, and then raise pykka.Timeout.

Parameters:

  • message (Any) –

    message to send

  • block (bool, default: True ) –

    whether to block while waiting for a reply

  • timeout (float | None, default: None ) –

    seconds to wait before timeout if blocking

Raises:

  • Timeout

    if timeout is reached if blocking

  • Exception

    any exception returned by the receiving actor if blocking

Returns:

  • Any | Future[Any]

    a future if not blocking, or a response if blocking

Source code in src/pykka/_ref.py
def ask(
    self,
    message: Any,
    *,
    block: bool = True,
    timeout: float | None = None,
) -> Any | Future[Any]:
    """Deliver a message to the actor and obtain its reply.

    Any object may be used as the message. With `block=False` the call
    returns a [`Future`][pykka.Future] right away rather than waiting.

    With `block=True` and the default `timeout=None`, the call waits for
    the reply for as long as it takes, possibly forever. When `timeout` is
    an integer or float, the call waits at most that many seconds and then
    raises [`pykka.Timeout`][pykka.Timeout].

    Args:
        message: the message to deliver
        block: wait for the reply instead of returning a future
        timeout: maximum number of seconds to wait when blocking

    Raises:
        Timeout: when blocking and the timeout expires
        Exception: when blocking, any exception the receiving actor
            replied with

    Returns:
        the reply when blocking, otherwise a future for the reply

    """
    reply_future = self.actor_class._create_future()  # noqa: SLF001

    # The error is raised and caught on purpose so that the future is
    # populated from inside an exception handler.
    # NOTE(review): set_exception() is called without arguments here, so it
    # presumably picks up the exception currently being handled -- confirm
    # against Future.set_exception().
    try:
        if not self.is_alive():
            msg = f"{self} not found"
            raise ActorDeadError(msg)  # noqa: TRY301
    except ActorDeadError:
        reply_future.set_exception()
    else:
        # Only a live actor gets the message; the future travels with it so
        # the receiver can reply.
        self.actor_inbox.put(Envelope(message, reply_to=reply_future))

    return reply_future.get(timeout=timeout) if block else reply_future

is_alive

is_alive() -> bool

Check if actor is alive.

This is based on the actor's stopped flag. The actor is not guaranteed to be alive and responding even though is_alive() returns True.

Returns:

  • bool

    True if actor is alive, False otherwise.

Source code in src/pykka/_ref.py
def is_alive(self) -> bool:
    """Report whether the actor is considered alive.

    The answer is derived solely from the actor's stopped flag, so a
    `True` result from [`is_alive()`][pykka.ActorRef.is_alive] does not
    guarantee that the actor is actually running and responding.

    Returns:
        `False` once the stopped flag is set, `True` before that.

    """
    has_stopped = self.actor_stopped.is_set()
    return not has_stopped

proxy

proxy() -> ActorProxy[A]

Wrap the ActorRef in an ActorProxy.

Using this method like this:

proxy = AnActor.start().proxy()

is analogous to:

proxy = ActorProxy(AnActor.start())

Raises:

  • ActorDeadError

    if actor is not available

Returns:

  • ActorProxy[A]

    a proxy object wrapping the actor reference.

Source code in src/pykka/_ref.py
def proxy(self: ActorRef[A]) -> ActorProxy[A]:
    """Return an [`ActorProxy`][pykka.ActorProxy] built around this [`ActorRef`][pykka.ActorRef].

    Calling the method like so:

        proxy = AnActor.start().proxy()

    has the same effect as:

        proxy = ActorProxy(AnActor.start())

    Raises:
        ActorDeadError: if actor is not available

    Returns:
        a proxy object that wraps this actor reference.

    """
    wrapped = ActorProxy(actor_ref=self)
    return wrapped

stop

stop(
    *, block: Literal[True], timeout: float | None = None
) -> bool
stop(
    *, block: Literal[False], timeout: float | None = None
) -> Future[bool]
stop(
    *, block: bool = True, timeout: float | None = None
) -> bool | Future[bool]
stop(
    *, block: bool = True, timeout: float | None = None
) -> bool | Future[bool]

Send a message to the actor, asking it to stop.

Returns True if actor is stopped or was being stopped at the time of the call. False if actor was already dead. If block is False, it returns a future wrapping the result.

Messages sent to the actor before the actor is asked to stop will be processed normally before it stops.

Messages sent to the actor after the actor is asked to stop will be replied to with ActorDeadError after it stops.

The actor may not be restarted.

block and timeout work as for ask().

Returns:

  • bool | Future[bool]

    a future if not blocking, or a boolean result if blocking

Source code in src/pykka/_ref.py
def stop(
    self,
    *,
    block: bool = True,
    timeout: float | None = None,
) -> bool | Future[bool]:
    """Ask the actor to stop by sending it a stop message.

    The result is `True` if the actor is stopped or was in the process of
    stopping when the call was made, and `False` if the actor was already
    dead. With `block=False` the result is delivered wrapped in a future.

    Messages that reached the actor before the stop request are processed
    normally before the actor stops.

    Messages that reach the actor after the stop request are answered
    with [`ActorDeadError`][pykka.ActorDeadError] once the actor has
    stopped.

    A stopped actor may not be restarted.

    `block` and `timeout` behave as they do for
    [`ask()`][pykka.ActorRef.ask].

    Returns:
        a boolean result when blocking, otherwise a future for it

    """
    # Always ask without blocking; blocking is applied to the converted
    # future below.
    stop_reply = self.ask(_ActorStop(), block=False)

    def _reply_to_bool(timeout: float | None) -> bool:
        # A dead-actor reply maps to False, any other reply maps to True.
        try:
            stop_reply.get(timeout=timeout)
        except ActorDeadError:
            return False
        return True

    # A second future of the same class as the ask future exposes the
    # boolean result through its get hook.
    bool_future = stop_reply.__class__()
    bool_future.set_get_hook(_reply_to_bool)
    bool_future = cast("Future[bool]", bool_future)

    return bool_future.get(timeout=timeout) if block else bool_future

tell

tell(message: Any) -> None

Send message to actor without waiting for any response.

Will generally not block, but if the underlying queue is full it will block until a free slot is available.

Parameters:

  • message (Any) –

    message to send

Raises:

  • ActorDeadError

    if actor is not available

Source code in src/pykka/_ref.py
def tell(
    self,
    message: Any,
) -> None:
    """Deliver a message to the actor without expecting a response.

    The call normally returns without blocking. If the underlying queue is
    full, it blocks until a slot becomes free.

    Args:
        message: the message to deliver

    Raises:
        ActorDeadError: if actor is not available

    """
    if self.is_alive():
        # Fire-and-forget: the envelope carries no reply future.
        self.actor_inbox.put(Envelope(message))
        return

    msg = f"{self} not found"
    raise ActorDeadError(msg)