on Sep 1st, 2008Managing asynchronous operations with python generators (part3)
Part 1: Introduction to generators in Python
Part 2: Indepth generator usage in Python (part2)
Part 3: Managing asynchronous operations with python generators (part3)
You can download all the code for this part here: http://totmacher.eu/upload/generators.tar.gz
I’m sorry, I lied to you guys - this part will not contain a real world example because I felt there were to many new concepts introduced. In this part we’ll go through something called asynchronous programming and how to create a scheduler to keep track of all the tasks we’re performing.
We’ll start of with the Task-base class, it wraps a generator exposing it’s own .next()-method and a .suspended()-method that is used by our scheduler to decide if we should run the task or not. A task can basically be anything, but more often then not it’s some type of operation that involves a delay we can’t control when it’s complete - such as a network call, disk access or something similar.
The directory structure for all of these examples is the following:
/ /demo __init__.py tasks.py scheduler.py example_N.py
demo/tasks.py:
class Task(object): def __init__(self, generator = None): self.generator = generator def suspended(self): return False def next(self): return self.generator.next()
No doubt a fairly simple class, three methods spanning just one line each. Either download the entire code for these examples or write it yourself and put it in the demo/tasks.py file.
Lets put together our scheduler, the scheduler isn’t a class - it’s just a module with two variables and two functions, it to is fairly simple and spans just about twenty five lines:
demo/scheduler.py:
queue = [] def add(task): global queue queue.append(task) def run(): global queue stack = queue queue = [] while len(stack) > 0: task = stack.pop(0) try: if not task.suspended(): task.next() except StopIteration: continue queue.append(task) if len(stack) == 0 and len(queue) > 0: stack = queue queue = []
One global list named queue, and two functions: add() that just appends an element on the queue and run() that runs through the queue until there are no more generators left in it. Let’s go through the run() function line by line:
- stack = queue - We store the queue in a temporary local variable
- queue = [] - Set queue to an empty list
- task = stack.pop(0) - Pop the first element of the stack
- if not task.suspended(): - If the task isn’t suspended continue
- task.next() - Call .next() on the task which in turn forwards the call to the generator the task wraps
- except StopIteration: - If we get a StopIteration exception from the executing task/generator we should just continue with the next one (this will push the task that was responsible for the exception out of the queue)
- queue.append(task) - Add the task in question to the queue again
- if len(stack) == 0 and len(queue) > 0: - If the stack is empty and the queue isn’t
- stack = queue - Put the queue in the stack
- queue = [] - And clear the queue
Let’s put this together in a simple example so we can see that our scheduler works as it should:
example_1.py
from demo import tasks, scheduler
def echo(word):
while True:
print word
yield
scheduler.add(
tasks.Task(
echo("Hello")))
scheduler.add(
tasks.Task(
echo("World!")))
scheduler.run()
If we run this from the the terminal with python example_1.py we will get Hello and World! looped over our screen for eternity:
... Hello World! Hello World! Hello World! Hello World! Hello World! ...
Maybe not so exiting, but if you look closely you will see that Hello and World! alternate between each other, meaning that when the generator that prints Hello yields the first time the scheduler will take control passing execution to the generator printing World!, when that has printed out its message the stack is empty so the scheduler fills the stack with the queue again and beings from the top, and it all begins again.
Let’s add a new function called sleeper() to the mix, here you have example_2.py:
from time import sleep
from demo import tasks, scheduler
def echo(word):
while True:
print word
yield
def sleeper(seconds):
while True:
sleep(seconds)
yield
scheduler.add(
tasks.Task(
echo("Hello")))
scheduler.add(
tasks.Task(
sleeper(1)))
scheduler.add(
tasks.Task(
echo("World!")))
scheduler.add(
tasks.Task(
sleeper(1)))
scheduler.run()
You should be able to figure out what happens when we run this example from the terminal, Hello will print - and then yield and the scheduler passes execution to the first sleeper, pausing for one second and then passing it to World! printing that which yields and then execution gets passed to our second sleeper generator that pauses execution for one second, this is mainly so you can see that they actually take turn instead of some mindless spaming from an infinite repeating loop without pauses.
Let’s make use of that .suspended()-method on the Task class shall we? Let’s create a generator that gets called every second time the scheduler asks for it instead of every iteration, here’s the code - add it to the end of demo/tasks.py:
class EvenTask(Task): def __init__(self, *args, **kwargs): super(self.__class__, self).__init__(*args, **kwargs) self.counter = 0 def suspended(self): self.counter += 1 return self.counter % 2 != 0
If you don’t understand the super(self.__class__, self).__init__(*args, **kwargs)-line it’s how you call the parent classes .__init__()-method in Python, .suspended() increases the counter with +1 each time it’s called but only returns false when we’re at an even number, allowing the task to be executed every other time.
Here’s the next example code, example_3.py
from time import sleep
from demo import tasks, scheduler
def echo(word):
while True:
print word
yield
def sleeper(seconds):
while True:
sleep(seconds)
yield
scheduler.add(
tasks.Task(
echo("Hello")))
scheduler.add(
tasks.Task(
sleeper(1)))
scheduler.add(
tasks.EvenTask(
echo("World!")))
scheduler.add(
tasks.Task(
sleeper(1)))
scheduler.run()
It’s identical to example_2.py except that the echo(”World!”)-task now is of the type EvenTask instead of Task, if you run this code you will get Hello Hello World! printed, because the World!-task will skip every other time.
... Hello Hello World! Hello Hello World! Hello Hello World! ...
So, I’ll take one last example before calling it for the day - if you remember the line “except StopIteration:” from the scheduler.run()-function we can use that to make a generator drop out of the scheduler’s run()-loop, i present to you the marvelous example_4.py:
from time import sleep
from demo import tasks, scheduler
def echo(word):
while True:
print word
yield
def sleeper(seconds):
while True:
sleep(seconds)
yield
def echo_once(word):
while True:
print word
yield
raise StopIteration
scheduler.add(
tasks.Task(
echo("Hello")))
scheduler.add(
tasks.Task(
echo_once("World!")))
scheduler.add(
tasks.Task(
sleeper(1)))
scheduler.run()
Running this from the terminal will yield (no pun intended) you something like this, only printing “World!” once:
Hello World! Hello Hello Hello ...
In the next part, number four I will show you a real world example with asynchronous network i/o, i just wanted to introduce the concept of a scheduler before jumping into a more advanced example.
[…] Part 1: Introduction to generators in Python Part 2: Indepth generator usage in Python (part2) Part 3: Managing asynchronous operations with python generators (part3) […]
[…] Part 1: Introduction to generators in Python Part 2: Indepth generator usage in Python (part2) Part 3: Managing asynchronous operations with python generators (part3) […]
Loving this series, keep it up! Can’t wait for the next part about async networking. Maybe mention Twisted?
You explained the complex (function) generator in a simple way. I tried others’ but your explanation was the best. If you have time please explain other complex parts of python too. I have bookmarked this blog and will come back to visit it again. Hopefully I will see more about python.
Thanks Cheers.