Actor with proxy
This example shows how to define an actor by creating a class that subclasses
ThreadingActor and defines attributes and methods that
can be accessed through a proxy. This makes the actor almost as easy to use as a
regular class. The only difference is that method calls and attribute accesses
are asynchronous and return futures.
The actor in this example does not override the on_receive() method, but the two
approaches can be combined: the same actor can handle regular messages in
on_receive() and expose its methods and attributes through a proxy.
Example
examples/proxy.py
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#   "pykka",
# ]
# ///

import threading
import time
from typing import cast

from pykka import ActorProxy, ThreadingActor
from pykka.typing import ActorMemberMixin, proxy_field, proxy_method


def log(msg: str) -> None:
    thread_name = threading.current_thread().name
    print(f"{thread_name}: {msg}")


class FooActor(ThreadingActor):
    field = "this is the value of FooActor.field"

    def bar(self) -> None:
        log("this was printed by FooActor.bar()")

    def baz(self) -> str:
        time.sleep(0.5)  # Block a bit to make it realistic
        return "this was returned by FooActor.baz() after a delay"


class FooProxy(ActorMemberMixin, ActorProxy[FooActor]):
    field = proxy_field(FooActor.field)
    bar = proxy_method(FooActor.bar)
    baz = proxy_method(FooActor.baz)


if __name__ == "__main__":
    # Start the actor and get a proxy to it
    proxy = cast("FooProxy", FooActor.start().proxy())

    # Method with side effect
    log("calling FooActor.bar() ...")
    proxy.bar()

    # Method with return value
    log("calling FooActor.baz() ...")
    # Does not block, returns a future:
    result = proxy.baz()
    log("printing result ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Field reading
    log("reading FooActor.field ...")
    # Does not block, returns a future:
    result = proxy.field
    log("printing result ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Field writing
    log("writing FooActor.field ...")
    # Assignment does not block:
    proxy.field = "new value"  # type: ignore[assignment]
    # Does not block, returns a future:
    result = proxy.field
    log("printing new field value ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Stop the actor
    proxy.stop()
Output
$ uv run examples/proxy.py
MainThread: calling FooActor.bar() ...
MainThread: calling FooActor.baz() ...
FooActor-1 (_actor_loop): this was printed by FooActor.bar()
MainThread: printing result ... (blocking)
MainThread: this was returned by FooActor.baz() after a delay
MainThread: reading FooActor.field ...
MainThread: printing result ... (blocking)
MainThread: this is the value of FooActor.field
MainThread: writing FooActor.field ...
MainThread: printing new field value ... (blocking)
MainThread: new value
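The output shows proxy calls returning immediately, with the caller blocking only when it asks the future for its value. As a rough mental model, and explicitly not Pykka's actual implementation, the following standard-library sketch forwards method calls to a single worker thread and hands back a future for each call. Greeter, ToyProxy, and demo are hypothetical names, and the futures here come from concurrent.futures, so the value is read with result() rather than Pykka's get().

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class Greeter:
    """Plain class standing in for an actor's methods."""

    def greet(self, name: str) -> str:
        return f"hello, {name}"


class ToyProxy:
    """Illustration only; this is not how Pykka's ActorProxy is built."""

    def __init__(self, target: Any) -> None:
        self._target = target
        # A single worker thread, so calls run one at a time, in order.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __getattr__(self, name: str) -> Callable[..., Future]:
        # Only reached for names not found on ToyProxy itself.
        method = getattr(self._target, name)

        def call(*args: Any, **kwargs: Any) -> Future:
            # Schedule the call on the worker and return at once.
            return self._executor.submit(method, *args, **kwargs)

        return call

    def stop(self) -> None:
        self._executor.shutdown(wait=True)


def demo() -> str:
    proxy = ToyProxy(Greeter())
    try:
        future = proxy.greet("world")  # does not block
        return future.result()  # blocks until greet() has run
    finally:
        proxy.stop()
```

The single worker mirrors the idea that an actor handles one call at a time, which is why the caller never needs a lock around the target object.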