from __future__ import annotations
import logging
import threading
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Literal,
Optional,
TypeVar,
Union,
overload,
)
if TYPE_CHECKING:
from pykka import Actor, ActorRef, Future
# Names exported by ``from <this module> import *``.
__all__ = ["ActorRegistry"]

# Logger named "pykka"; the registry emits debug messages through it when
# actor references are registered and unregistered.
logger = logging.getLogger("pykka")

# Type variable bound to ``Actor``. It lets ``ActorRegistry.get_by_class()``
# tie the class passed in to the ``ActorRef[A]`` type of the returned refs.
A = TypeVar("A", bound="Actor")
class ActorRegistry:
    """Registry which provides easy access to all running actors.

    Contains global state, but should be thread-safe.

    The registry is never instantiated: all state lives on the class itself
    as a list of actor references, and every read or write of that list is
    done while holding a class-level re-entrant lock.
    """

    # References to the registered actors, in registration order.
    _actor_refs: ClassVar[list[ActorRef[Any]]] = []
    # Guards every access to ``_actor_refs``.
    _actor_refs_lock: ClassVar[threading.RLock] = threading.RLock()

    @classmethod
    def broadcast(
        cls,
        message: Any,
        target_class: Union[str, type[Actor], None] = None,
    ) -> None:
        """Broadcast ``message`` to all actors of the specified ``target_class``.

        If no ``target_class`` is specified, the message is broadcast to all
        actors.

        The set of recipients is a snapshot taken before the first message
        is sent; the registry lock is not held while sending.

        :param message: the message to send
        :type message: any

        :param target_class: optional actor class to broadcast the message to
        :type target_class: class or class name
        """
        if isinstance(target_class, str):
            targets = cls.get_by_class_name(target_class)
        elif target_class is not None:
            targets = cls.get_by_class(target_class)
        else:
            targets = cls.get_all()
        for ref in targets:
            ref.tell(message)

    @classmethod
    def get_all(cls) -> list[ActorRef[Any]]:
        """Get all running actors.

        :returns: list of :class:`pykka.ActorRef`, as a copy that the caller
            may freely mutate or iterate without holding the registry lock
        """
        with cls._actor_refs_lock:
            return cls._actor_refs[:]

    @classmethod
    def get_by_class(
        cls,
        actor_class: type[A],
    ) -> list[ActorRef[A]]:
        """Get all running actors of the given class or a subclass.

        :param actor_class: actor class, or any superclass of the actor
        :type actor_class: class

        :returns: list of :class:`pykka.ActorRef`
        """
        with cls._actor_refs_lock:
            return [
                ref
                for ref in cls._actor_refs
                if issubclass(ref.actor_class, actor_class)
            ]

    @classmethod
    def get_by_class_name(
        cls,
        actor_class_name: str,
    ) -> list[ActorRef[Any]]:
        """Get all running actors of the given class name.

        Only the actor's own class name is compared; unlike
        :meth:`get_by_class`, superclass names do not match.

        :param actor_class_name: actor class name
        :type actor_class_name: string

        :returns: list of :class:`pykka.ActorRef`
        """
        with cls._actor_refs_lock:
            return [
                ref
                for ref in cls._actor_refs
                if ref.actor_class.__name__ == actor_class_name
            ]

    @classmethod
    def get_by_urn(
        cls,
        actor_urn: str,
    ) -> Optional[ActorRef[Any]]:
        """Get an actor by its universally unique URN.

        :param actor_urn: actor URN
        :type actor_urn: string

        :returns: :class:`pykka.ActorRef` or :class:`None` if not found
        """
        with cls._actor_refs_lock:
            # Stop scanning at the first match instead of collecting them all.
            return next(
                (ref for ref in cls._actor_refs if ref.actor_urn == actor_urn),
                None,
            )

    @classmethod
    def register(
        cls,
        actor_ref: ActorRef[Any],
    ) -> None:
        """Register an :class:`ActorRef` in the registry.

        This is done automatically when an actor is started, e.g. by calling
        :meth:`Actor.start() <pykka.Actor.start>`.

        :param actor_ref: reference to the actor to register
        :type actor_ref: :class:`pykka.ActorRef`
        """
        with cls._actor_refs_lock:
            cls._actor_refs.append(actor_ref)
        # Lazy %-formatting: the ref is only rendered if DEBUG is enabled.
        logger.debug("Registered %s", actor_ref)

    @overload
    @classmethod
    def stop_all(
        cls,
        *,
        block: Literal[True],
        timeout: Optional[float] = ...,
    ) -> list[bool]:
        ...

    @overload
    @classmethod
    def stop_all(
        cls,
        *,
        block: Literal[False],
        timeout: Optional[float] = ...,
    ) -> list[Future[bool]]:
        ...

    @overload
    @classmethod
    def stop_all(
        cls,
        *,
        block: bool = True,
        timeout: Optional[float] = None,
    ) -> Union[list[bool], list[Future[bool]]]:
        ...

    @classmethod
    def stop_all(
        cls,
        *,
        block: bool = True,
        timeout: Optional[float] = None,
    ) -> Union[list[bool], list[Future[bool]]]:
        """Stop all running actors.

        ``block`` and ``timeout`` work as for
        :meth:`ActorRef.stop() <pykka.ActorRef.stop>`.

        If ``block`` is :class:`True`, the actors are guaranteed to be stopped
        in the reverse of the order they were started in. This is helpful if
        you have simple dependencies in between your actors, where it is
        sufficient to shut down actors in a LIFO manner: last started, first
        stopped.

        If you have more complex dependencies in between your actors, you
        should take care to shut them down in the required order yourself, e.g.
        by stopping dependees from a dependency's
        :meth:`on_stop() <pykka.Actor.on_stop>` method.

        :returns: If not blocking, a list with a future for each stop action.
            If blocking, a list of return values from
            :meth:`pykka.ActorRef.stop`.
        """
        # Iterate over a snapshot (``get_all()`` returns a copy) so that the
        # registry list may change while the actors are being stopped.
        return [
            ref.stop(block=block, timeout=timeout) for ref in reversed(cls.get_all())
        ]

    @classmethod
    def unregister(
        cls,
        actor_ref: ActorRef[A],
    ) -> None:
        """Remove an :class:`ActorRef <pykka.ActorRef>` from the registry.

        This is done automatically when an actor is stopped, e.g. by calling
        :meth:`Actor.stop() <pykka.Actor.stop>`.

        Unregistering a reference that is not in the registry is not an
        error; it is only noted in the debug log.

        :param actor_ref: reference to the actor to unregister
        :type actor_ref: :class:`pykka.ActorRef`
        """
        with cls._actor_refs_lock:
            # A single ``remove`` replaces the ``in`` check plus ``remove``,
            # which scanned the list twice.
            try:
                cls._actor_refs.remove(actor_ref)
            except ValueError:
                removed = False
            else:
                removed = True
        if removed:
            logger.debug("Unregistered %s", actor_ref)
        else:
            logger.debug("Unregistered %s (not found in registry)", actor_ref)