Examples
The examples/ directory in Pykka’s Git repo includes some runnable examples of Pykka usage.
Plain actor
#!/usr/bin/env python3
import pykka
# Sentinel message: PlainActor.on_receive() compares incoming messages to this
# object by identity and, on a match, returns its stored messages.
GetMessages = object()
class PlainActor(pykka.ThreadingActor):
    """Actor that records every message it receives.

    Receiving the ``GetMessages`` sentinel returns the list of messages
    stored so far instead of recording the sentinel.
    """

    def __init__(self):
        super().__init__()
        self.stored_messages = []

    def on_receive(self, message):
        """Store ``message``, or return the stored ones on ``GetMessages``."""
        if message is not GetMessages:
            self.stored_messages.append(message)
            return None
        return self.stored_messages
if __name__ == "__main__":
    # Start the actor, send it two messages, then ask for what it stored.
    plain_actor = PlainActor.start()
    for payload in (
        {"no": "Norway", "se": "Sweden"},
        {"a": 3, "b": 4, "c": 5},
    ):
        plain_actor.tell(payload)
    print(plain_actor.ask(GetMessages))
    plain_actor.stop()
Output:
[{'no': 'Norway', 'se': 'Sweden'}, {'a': 3, 'b': 4, 'c': 5}]
Actor with proxy
#!/usr/bin/env python3
import threading
import time
import pykka
class AnActor(pykka.ThreadingActor):
    """Actor exposing a field, a side-effect method, and a slow method."""

    field = "this is the value of AnActor.field"

    def proc(self):
        """Log a line; returns nothing."""
        log("this was printed by AnActor.proc()")

    def func(self):
        """Sleep for half a second, then return a string."""
        delay_seconds = 0.5  # Block a bit to make it realistic
        time.sleep(delay_seconds)
        return "this was returned by AnActor.func() after a delay"
def log(msg):
    """Print ``msg`` prefixed with the name of the calling thread."""
    caller = threading.current_thread()
    print(f"{caller.name}: {msg}")
if __name__ == "__main__":
    proxy = AnActor.start().proxy()
    for _ in range(3):
        # Method with side effect
        log("calling AnActor.proc() ...")
        proxy.proc()

        # Method with return value
        log("calling AnActor.func() ...")
        func_future = proxy.func()  # Returns a future right away
        log("printing result ... (blocking)")
        log(func_future.get())  # Waits here until the value is ready

        # Field reading
        log("reading AnActor.field ...")
        field_future = proxy.field  # Returns a future right away
        log("printing result ... (blocking)")
        log(field_future.get())  # Waits here until the value is ready

        # Field writing
        log("writing AnActor.field ...")
        proxy.field = "new value"  # Assignment returns right away
        updated_future = proxy.field  # Returns a future right away
        log("printing new field value ... (blocking)")
        log(updated_future.get())  # Waits here until the value is ready
    proxy.stop()
Output:
MainThread: calling AnActor.proc() ...
MainThread: calling AnActor.func() ...
MainThread: printing result ... (blocking)
AnActor-1: this was printed by AnActor.proc()
MainThread: this was returned by AnActor.func() after a delay
MainThread: reading AnActor.field ...
MainThread: printing result ... (blocking)
MainThread: this is the value of AnActor.field
MainThread: writing AnActor.field ...
MainThread: printing new field value ... (blocking)
MainThread: new value
MainThread: calling AnActor.proc() ...
MainThread: calling AnActor.func() ...
MainThread: printing result ... (blocking)
AnActor-1: this was printed by AnActor.proc()
MainThread: this was returned by AnActor.func() after a delay
MainThread: reading AnActor.field ...
MainThread: printing result ... (blocking)
MainThread: new value
MainThread: writing AnActor.field ...
MainThread: printing new field value ... (blocking)
MainThread: new value
MainThread: calling AnActor.proc() ...
MainThread: calling AnActor.func() ...
AnActor-1: this was printed by AnActor.proc()
MainThread: printing result ... (blocking)
MainThread: this was returned by AnActor.func() after a delay
MainThread: reading AnActor.field ...
MainThread: printing result ... (blocking)
MainThread: new value
MainThread: writing AnActor.field ...
MainThread: printing new field value ... (blocking)
MainThread: new value
Multiple cooperating actors
#!/usr/bin/env python3
import pykka
class Adder(pykka.ThreadingActor):
    """Actor that increments numbers on request."""

    def add_one(self, i):
        """Print the incoming value and return it plus one."""
        print(f"{self} is increasing {i}")
        incremented = i + 1
        return incremented
class Bookkeeper(pykka.ThreadingActor):
    """Actor that counts upwards by delegating each increment to ``adder``."""

    def __init__(self, adder):
        super().__init__()
        self.adder = adder

    def count_to(self, target):
        """Ask ``self.adder`` for increments, starting at 0, until ``target``."""
        current = 0
        while current < target:
            pending = self.adder.add_one(current)
            # Wait for the adder's answer before continuing.
            current = pending.get()
            print(f"{self} got {current} back")
if __name__ == "__main__":
    adder_proxy = Adder.start().proxy()
    bookkeeper_proxy = Bookkeeper.start(adder_proxy).proxy()
    counting_done = bookkeeper_proxy.count_to(10)
    counting_done.get()  # Wait for the count to finish
    pykka.ActorRegistry.stop_all()
Output:
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 0
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 1 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 1
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 2 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 2
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 3 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 3
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 4 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 4
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 5 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 5
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 6 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 6
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 7 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 7
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 8 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 8
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 9 back
Adder (urn:uuid:f50029eb-7cea-4ab9-98bf-a5bf65af8b8f) is increasing 9
Bookkeeper (urn:uuid:4f2d4e78-7a33-4c4f-86ac-7c415a7205f4) got 10 back
Pool of actors sharing work
#!/usr/bin/env python3
"""Resolve a bunch of IP addresses using a pool of resolver actors.
Based on example contributed by Kristian Klette <klette@klette.us>.
Either run without arguments:
./resolver.py
Or specify pool size and IPs to resolve:
./resolver.py 3 193.35.52.{1,2,3,4,5,6,7,8,9}
"""
import pprint
import socket
import sys
import pykka
class Resolver(pykka.ThreadingActor):
    """Actor that does reverse DNS lookups."""

    def resolve(self, ip):
        """Return the host name for ``ip``, or ``None`` if the lookup fails."""
        try:
            hostname, _aliases, _addresses = socket.gethostbyaddr(ip)
            print(f"Finished resolving {ip}")
            return hostname
        except Exception:
            print(f"Failed resolving {ip}")
            return None
def run(pool_size, *ips):
    """Resolve ``ips`` using ``pool_size`` resolver actors and print the result.

    The IPs are handed out to the resolvers round-robin. The printed value
    is a list of ``(ip, host)`` pairs in the same order as ``ips``.

    ``pool_size`` must be at least 1 when any IPs are given; otherwise the
    round-robin index computation raises ``ZeroDivisionError``.

    All actors in the registry are stopped before returning, also when an
    error occurs along the way.
    """
    try:
        # Start resolvers
        resolvers = [Resolver.start().proxy() for _ in range(pool_size)]

        # Distribute work by mapping IPs to resolvers (not blocking)
        hosts = [
            resolvers[i % len(resolvers)].resolve(ip)
            for i, ip in enumerate(ips)
        ]

        # Gather results (blocking)
        ip_to_host = zip(ips, pykka.get_all(hosts))
        pprint.pprint(list(ip_to_host))
    finally:
        # Clean up, even on failure, so no started actor is left running.
        pykka.ActorRegistry.stop_all()
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) >= 2:
        # First argument is the pool size, the rest are IPs to resolve.
        size_arg, *ip_args = cli_args
        run(int(size_arg), *ip_args)
    else:
        # No usable arguments: resolve a default range with ten resolvers.
        default_ips = [f"193.35.52.{i}" for i in range(1, 50)]
        run(10, *default_ips)
Mopidy music server
Pykka was originally created back in 2011 as a formalization of concurrency patterns that emerged in the Mopidy music server. The original Pykka source code wasn’t extracted from Mopidy, but it built and improved on the concepts from Mopidy. Mopidy was later ported to build on Pykka instead of its own concurrency abstractions.
Mopidy still uses Pykka extensively to keep independent parts, like the MPD and HTTP frontend servers or the Spotify and Google Music integrations, running independently. Every one of Mopidy’s more than 100 extensions has at least one Pykka actor. By running each extension as an independent actor, Mopidy attempts to isolate errors and bugs in one extension, to reduce the effect on the rest of the system.
You can browse the Mopidy source code to find many real life examples of Pykka usage.