After going through the same (fantastic, I must say) course on coroutines by Beazley, I asked myself the very same question - how could one adjust the code to work with the native coroutines introduced in Python 3.5?
It turns out it can be done with relatively small changes to the code. I will assume the readers are familiar with the course material, and will take the pyos4.py version as a base - the first Scheduler version that supports "system calls".
TIP: A full runnable example can be found in Appendix A at the end.
Objective
The goal is to turn the following coroutine code:
def foo():
    mytid = yield GetTid()  # a "system call"
    for i in xrange(3):
        print "I'm foo", mytid
        yield  # a "trap"
... into a native coroutine, and still use it just as before:
async def foo():
    mytid = await GetTid()  # a "system call"
    for i in range(3):
        print("I'm foo", mytid)
        await ???  # a "trap" (will explain the missing bit later)
We want to run it without asyncio, as we already have our own event loop that drives the entire process - it's the Scheduler class.
Awaitable objects
Native coroutines do not work right off the bat. The following code results in an error:
async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

sched = Scheduler()
sched.new(foo())
sched.mainloop()

Traceback (most recent call last):
  ...
    mytid = await GetTid()
TypeError: object GetTid can't be used in 'await' expression
PEP 492 explains what kind of objects can be awaited on. One of the options is "an object with an __await__ method returning an iterator".
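As a quick check of that rule, the sketch below uses collections.abc.Awaitable from the standard library to test whether an object qualifies. The Plain and WithAwait classes are made up purely for illustration.

```python
from collections.abc import Awaitable


class Plain:
    """No __await__ method, so it cannot be used in an await expression."""


class WithAwait:
    """Awaitable: __await__ is a generator function, so calling it returns an iterator."""

    def __await__(self):
        yield "suspended"
        return "done"


async def coro():
    return await WithAwait()


print(isinstance(Plain(), Awaitable))      # False
print(isinstance(WithAwait(), Awaitable))  # True

c = coro()
print(isinstance(c, Awaitable))            # True: coroutine objects are awaitable too
c.close()                                  # avoid the "never awaited" warning
```

The isinstance() check only looks for the presence of __await__; it does not verify that the method really returns an iterator.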
Just like yield from, if you are familiar with it, await acts as a tunnel between the object awaited on and the outermost code that drives the coroutine (usually an event loop). This is best demonstrated with an example:
class Awaitable:
    def __await__(self):
        value = yield 1
        print("Awaitable received:", value)
        value = yield 2
        print("Awaitable received:", value)
        value = yield 3
        print("Awaitable received:", value)
        return 42

async def foo():
    print("foo start")
    result = await Awaitable()
    print("foo received result:", result)
    print("foo end")
Driving the foo() coroutine interactively produces the following:
>>> f_coro = foo() # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
Whatever gets sent into f_coro is channeled down into the Awaitable instance. Similarly, whatever Awaitable.__await__() produces is bubbled up to the topmost code that sends the values in.
The entire process is transparent to the f_coro coroutine, which is not directly involved and does not see values being passed up and down. However, when Awaitable's iterator is exhausted, its return value becomes the result of the await expression (42 in our case), and that's where f_coro is finally resumed.
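The interactive session ends with a StopIteration, which is also how the driving code learns the coroutine's own return value. Here is a minimal sketch of a driver that collects both directions of traffic; the names Echo and drive are hypothetical helpers, not part of the course code.

```python
class Echo:
    def __await__(self):
        received = []
        for n in (1, 2, 3):
            received.append((yield n))   # yield n upwards, store what gets sent down
        return received


async def foo():
    return await Echo()


def drive(coro, values):
    """Send each value into coro; return (yielded values, coroutine result)."""
    yielded = []
    try:
        yielded.append(coro.send(None))  # the first send must be None
        for value in values:
            yielded.append(coro.send(value))
    except StopIteration as stop:
        return yielded, stop.value       # the coroutine's return value
    coro.close()                         # ran out of values before it finished
    return yielded, None


print(drive(foo(), ["one", "two", "three"]))
# ([1, 2, 3], ['one', 'two', 'three'])
```

The Scheduler in the course material does essentially the same thing, except that it decides what to send next based on what was yielded.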
Note that await expressions in coroutines can also be chained. A coroutine can await another coroutine that awaits another coroutine... until the entire chain ends with a yield somewhere down the road.
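To illustrate such a chain, here is a small sketch with three coroutines stacked on top of a single awaitable. All names (Ping, inner, middle, outer) are invented for this example.

```python
class Ping:
    def __await__(self):
        reply = yield "ping"       # the only real suspension point in the chain
        return reply.upper()


async def inner():
    return await Ping()


async def middle():
    return "middle got " + await inner()


async def outer():
    return "outer got: " + await middle()


coro = outer()
first = coro.send(None)            # runs through outer -> middle -> inner -> Ping
print(first)                       # ping

try:
    coro.send("pong")              # lands directly at the yield inside Ping
except StopIteration as stop:
    result = stop.value
print(result)                      # outer got: middle got PONG
```

The driving code only ever talks to the outermost coroutine object, yet the value it sends arrives at the innermost yield.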
Sending values into the coroutine itself
How can this knowledge help us? Well, in the course material a coroutine can yield a SystemCall instance. The scheduler understands these and lets the system call handle the requested operation.
In order for a coroutine to bring a SystemCall up to the scheduler, a SystemCall instance can simply yield itself, and it will be channeled up to the scheduler as described in the previous section.
The first required change is therefore to add this logic to the base SystemCall class:
class SystemCall:
    ...
    def __await__(self):
        yield self
With the SystemCall instances made awaitable, the following now actually runs:
async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()
Output:
I'm foo None
Task 1 terminated
Great, it does not crash anymore!
However, the coroutine did not receive the task ID, and got None instead. This is because the value set by the system call's handle() method and sent by the Task.run() method:
# in Task.run()
self.target.send(self.sendval)
... ended up in the SystemCall.__await__() method. If we want to bring the value into the coroutine, the system call must return it, so that it becomes the value of the await expression in the coroutine.
class SystemCall:
    ...
    def __await__(self):
        return (yield self)
Running the same code with the modified SystemCall produces the desired output:
I'm foo 1
Task 1 terminated
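The difference between the two __await__ variants can also be seen in isolation, without the scheduler. The sketch below drives a coroutine by hand, roughly the way Task.run() does; YieldOnly, YieldAndReturn, ask and result_of are hypothetical names used only here.

```python
class YieldOnly:
    def __await__(self):
        yield self            # the sent value is discarded here


class YieldAndReturn:
    def __await__(self):
        return (yield self)   # the sent value becomes the await result


async def ask(call):
    return await call


def result_of(call, sendval):
    """Drive ask(call) by hand, sending sendval once after the first suspension."""
    coro = ask(call)
    coro.send(None)           # runs up to the yield inside __await__
    try:
        coro.send(sendval)    # what the scheduler would send on the next run
    except StopIteration as stop:
        return stop.value


print(result_of(YieldOnly(), 1))        # None
print(result_of(YieldAndReturn(), 1))   # 1
```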
Running the coroutines concurrently
We still need a way to suspend a coroutine, i.e. an equivalent of a system "trap". In the course material, this is done with a plain yield inside a coroutine, but an attempt to use a plain await is actually a syntax error:
async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await  # SyntaxError here
Fortunately, the workaround is easy. Since we already have working system calls, we can add a dummy no-op system call whose only job is to suspend the coroutine and immediately re-schedule it:
class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None  # setting sendval is optional
        self.sched.schedule(self.task)
Setting a sendval on the task is optional, as this system call is not expected to produce any meaningful value, but we opt to make this explicit.
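One possible alternative, not taken from the course material: the scheduler's main loop re-schedules a task whenever it yields something that is not a SystemCall, so a bare yield wrapped with types.coroutine should also behave as a trap. The sketch below assumes that behaviour and replaces the real Scheduler with a tiny stand-in loop; trap, worker and round_robin are hypothetical names.

```python
import types
from collections import deque


@types.coroutine
def trap():
    yield                      # bubbles None up to whoever drives the coroutine


async def worker(name, count, log):
    for i in range(count):
        log.append((name, i))
        await trap()


def round_robin(coros):
    """Stand-in for Scheduler.mainloop(): re-queue each coroutine after it yields."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration:
            continue           # finished, do not re-queue
        ready.append(coro)


log = []
round_robin([worker("foo", 2, log), worker("bar", 3, log)])
print(log)
# [('foo', 0), ('bar', 0), ('foo', 1), ('bar', 1), ('bar', 2)]
```

The YieldControl system call keeps everything inside the SystemCall hierarchy, which is arguably easier to follow; the decorator version is merely shorter.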
We now have everything in place to run a multitasking operating system!
async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await YieldControl()

async def bar():
    mytid = await GetTid()
    for i in range(5):
        print("I'm bar", mytid)
        await YieldControl()

sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
Output:
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
I'm foo 1
I'm bar 2
Task 1 terminated
I'm bar 2
I'm bar 2
Task 2 terminated
Footnotes
The Scheduler code is completely unchanged.
It. Just. Works.
This shows the beauty of the original design where the scheduler and the tasks that run in it are not coupled to each other, and we were able to change the coroutine implementation without the Scheduler knowing about it. Even the Task class that wraps the coroutines did not have to change.
Trampolining is not needed.
In the pyos8.py version of the system, a concept of a trampoline is implemented. It allows the coroutines to delegate a part of their work to another coroutine with the help of the scheduler (the scheduler calls the sub-coroutine on behalf of the parent coroutine and sends the former's result into the parent).
This mechanism is not needed, since await (and its older companion, yield from) already makes such chaining possible, as explained at the beginning.
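A rough sketch of what this means in practice: a parent coroutine awaits a sub-coroutine directly and receives its result, while the driving loop only ever sees the parent. The loop below is a simplified stand-in for the Scheduler, and Trap, add, parent, other and round_robin are hypothetical names.

```python
from collections import deque


class Trap:
    def __await__(self):
        return (yield self)


async def add(x, y, log):
    log.append("add suspends")
    await Trap()                    # the sub-coroutine may suspend as well
    return x + y


async def parent(log):
    total = await add(2, 3, log)    # no trampoline: the result arrives directly
    log.append("parent got %d" % total)


async def other(log):
    log.append("other runs")
    await Trap()


def round_robin(coros):
    """Simplified stand-in for Scheduler.mainloop()."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration:
            continue                # finished, do not re-queue
        ready.append(coro)


log = []
round_robin([parent(log), other(log)])
print(log)
# ['add suspends', 'other runs', 'parent got 5']
```

Note that other() gets to run while add() is suspended, even though the loop knows nothing about add() itself.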
Appendix A - a full runnable example (requires Python 3.5+)
example_full.py
from queue import Queue
# ------------------------------------------------------------
# === Tasks ===
# ------------------------------------------------------------
class Task:
    taskid = 0

    def __init__(self, target):
        Task.taskid += 1
        self.tid = Task.taskid  # Task ID
        self.target = target    # Target coroutine
        self.sendval = None     # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)

# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
    def __init__(self):
        self.ready = Queue()
        self.taskmap = {}

    def new(self, target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self, task):
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self, task):
        self.ready.put(task)

    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result, SystemCall):
                    result.task = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)

# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall:
    def handle(self):
        pass

    def __await__(self):
        return (yield self)

# Return a task's ID number
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)

class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None  # setting sendval is optional