Skip to content

Actor with proxy

This example shows how to define an actor by creating a class that subclasses ThreadingActor and defines attributes and methods that can be accessed through a proxy. This makes the actor almost as easy to use as a regular class, with the only difference being that the method calls and attribute accesses are asynchronous and return futures.

This actor does not override the on_receive() method, but it is entirely possible to combine the two approaches and use both regular message passing to the on_receive() method and proxy-based access to methods and attributes on the same actor.

Example

examples/proxy.py
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "pykka",
# ]
# ///

import threading
import time
from typing import cast

from pykka import ActorProxy, ThreadingActor
from pykka.typing import ActorMemberMixin, proxy_field, proxy_method


def log(msg: str) -> None:
    thread_name = threading.current_thread().name
    print(f"{thread_name}: {msg}")


class FooActor(ThreadingActor):
    field = "this is the value of FooActor.field"

    def bar(self) -> None:
        log("this was printed by FooActor.bar()")

    def baz(self) -> str:
        time.sleep(0.5)  # Block a bit to make it realistic
        return "this was returned by FooActor.baz() after a delay"


class FooProxy(ActorMemberMixin, ActorProxy[FooActor]):
    field = proxy_field(FooActor.field)
    bar = proxy_method(FooActor.bar)
    baz = proxy_method(FooActor.baz)


if __name__ == "__main__":
    # Start the actor and get a proxy to it
    proxy = cast("FooProxy", FooActor.start().proxy())

    # Method with side effect
    log("calling FooActor.bar() ...")
    proxy.bar()

    # Method with return value
    log("calling FooActor.baz() ...")
    # Does not block, returns a future:
    result = proxy.baz()
    log("printing result ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Field reading
    log("reading FooActor.field ...")
    # Does not block, returns a future:
    result = proxy.field
    log("printing result ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Field writing
    log("writing FooActor.field ...")
    # Assignment does not block:
    proxy.field = "new value"  # type: ignore[assignment]
    # Does not block, returns a future:
    result = proxy.field
    log("printing new field value ... (blocking)")
    # Blocks until ready:
    log(result.get())

    # Stop the actor
    proxy.stop()

Output

$ uv run examples/proxy.py
MainThread: calling AnActor.foo() ...
MainThread: calling AnActor.bar() ...
AnActor-1 (_actor_loop): this was printed by AnActor.foo()
MainThread: printing result ... (blocking)
MainThread: this was returned by AnActor.bar() after a delay
MainThread: reading AnActor.field ...
MainThread: printing result ... (blocking)
MainThread: this is the value of AnActor.field
MainThread: writing AnActor.field ...
MainThread: printing new field value ... (blocking)
MainThread: new value