05 August 2008 @ 08:15 pm
Asynchronous programming using imvu.task II: Futures  
In my last post, I demonstrated a tiny little application that asynchronously opens a bunch of HTTP connections in order to recursively download all the assets linked to by a page.

There's just one problem: our program is going to reuse threads (imvu.task maintains a threadpool), but it has no upper limit on the number of connections it will open at the same time. Let's address that.

Instead of spinning off a new task for every web request, what we'll do is maintain a set of worker tasks to process incoming requests.

That looks like this:

from imvu.task import task
from imvu.task import Future, Queue
from imvu.task import Start, Return

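class Pipeliner(object):
    def __init__(self, jobCount=2):
        self.__queue = Queue()
        self.__workers = None          # populated by start()
        self.__jobCount = jobCount     # number of worker tasks, i.e. the concurrency cap

    @task
    def start(self):
        assert self.__workers is None
        # Spin up jobCount worker tasks; each one runs __work's loop.
        self.__workers = [(yield Start(self.__work())) for _ in range(self.__jobCount)]

    @task
    def schedule(self, workItem):
        assert self.__workers is not None

        # Pair the work item with a future so a worker can hand the result back.
        f = Future()
        self.__queue.put((f, workItem))

        # Block this task until a worker completes the future.
        result = yield f
        yield Return(result)

    @task
    def __work(self):
        while True:
            # Block until a (future, workItem) pair has been enqueued.
            future, workItem = yield self.__queue.get()
            try:
                result = yield workItem
                future.complete(result=result, error=None)
            except Exception, e:
                # Hand the failure to whoever is waiting on the future.
                future.complete(result=None, error=e)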

There are a bunch of things going on in this class. Let's look at them one by one:

First, we introduce a queue. imvu.task queues work like the queues you may have seen in other languages, except that a task can block and wait for something to be put into the queue. This is why Pipeliner.__work yields on self.__queue.get().
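
For readers without imvu.task handy, the blocking behaviour can be sketched with Python 3's standard library. This is only an analogy under stated assumptions: it uses OS threads and queue.Queue rather than imvu.task's tasks and Queue, and the consumer function and received list are names made up for the illustration.

```python
import queue
import threading

q = queue.Queue()
received = []

def consumer():
    # get() blocks this thread until something is put into the queue,
    # much like a task that yields on self.__queue.get().
    item = q.get()
    received.append(item)

t = threading.Thread(target=consumer)
t.start()          # the consumer is now (or soon will be) waiting on the empty queue
q.put("work item") # this wakes the consumer up
t.join()
```

In imvu.task the wait is written as a yield, so the waiting task does not need a thread of its own for the duration of the wait.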

Secondly, and more importantly, we introduce the concept of a Future. A Future is an object that represents a return value that may not have been computed just yet. Futures are occasionally called promises for this reason.
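
To make the idea concrete, here is a minimal sketch of what a future could look like, written against Python 3's standard library. SketchFuture and its wait() method are hypothetical names for illustration; this is not how imvu.task implements Future, only something with the same complete(result, error) shape.

```python
import threading

class SketchFuture(object):
    """Hypothetical stand-in for a future; not imvu.task's implementation."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None
        self._error = None

    def complete(self, result=None, error=None):
        # Record the outcome, then wake up everyone blocked in wait().
        self._result = result
        self._error = error
        self._done.set()

    def wait(self):
        # Block until complete() has been called, then either
        # re-raise the stored error or return the stored result.
        self._done.wait()
        if self._error is not None:
            raise self._error
        return self._result

f = SketchFuture()
# Complete the future from another thread a moment from now.
threading.Timer(0.01, f.complete, kwargs={"result": 42}).start()
value = f.wait()
```

The caller holds the future before the value exists and only blocks at the point where it actually needs the value.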

This class makes use of two logical threads of execution: a worker task that continuously pulls work items off a queue, performs the work, and provides results; and a second task that schedules work items, waits for the result to materialize, and returns it.

Pipeliner.schedule is our public interface: client code will use it to schedule a bit of work, and yield on it until the work is complete. Its operation is quite simple: it creates a future, places it on a queue alongside the work item, waits for the result, and returns it.

Pipeliner.__work is a simple consumer loop: wait until a work item has been enqueued, pop it off, do the work, and then call future.complete() to hand the result to the future, which automatically wakes up any tasks that were blocked on it. Notice that complete() takes two arguments: a result and an error. This way, exceptions raised by tasks can be propagated to the tasks that are waiting on them.
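
The same consumer loop, including the error propagation, can be sketched with Python 3 threads and the standard concurrent.futures.Future. Treat this as an analogy rather than a port: set_result() and set_exception() play the role of complete(result, error), the worker, schedule and fails functions are made up for the example, and the (None, None) sentinel used for shutdown is an addition that the Pipeliner above does not have.

```python
import queue
import threading
from concurrent.futures import Future

work_queue = queue.Queue()

def worker():
    while True:
        future, fn = work_queue.get()   # blocks until work arrives
        if fn is None:                  # sentinel: stop the loop
            break
        try:
            future.set_result(fn())
        except Exception as e:
            # Store the exception so the waiting side sees it.
            future.set_exception(e)

def schedule(fn):
    # Pair the work with a future and hand both to the worker.
    future = Future()
    work_queue.put((future, fn))
    return future

def fails():
    raise ValueError("download failed")

thread = threading.Thread(target=worker)
thread.start()

ok = schedule(lambda: 21 * 2)
bad = schedule(fails)

value = ok.result()        # blocks until the worker has set the result
try:
    bad.result()           # re-raises the ValueError from the worker
    caught = None
except ValueError as e:
    caught = str(e)

work_queue.put((None, None))
thread.join()
```

The exception is raised in the worker but surfaces in the code that asked for the result, which is exactly what the error argument to complete() buys us.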

Using this in our URL getter application is simplicity itself. Create it, start it (to get its workers fired up), and then use it!

pipeliner = Pipeliner(jobCount=2)
taskScheduler._call(pipeliner.start())

@task
def doLotsOfStuff(url):
    all_urls.append(url)

    content = yield pipeliner.schedule(getUrl(url))

    file(filenameForUrl(url), 'wt').write(content)

    urls_to_get = [
        u for u in extractUrls(content)
        if stripExtension(u) in extensions_we_want
    ]

    yield RunInParallel(
        doLotsOfStuff(u)
        for u in urls_to_get
        if u not in all_urls
    )
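
If you want to convince yourself that a fixed set of workers really caps the number of simultaneous jobs, a rough check along these lines works. It is a sketch in Python 3 using the standard ThreadPoolExecutor in place of Pipeliner, and fake_download is a hypothetical stand-in for getUrl that sleeps instead of touching the network.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
stats = {"active": 0, "peak": 0}

def fake_download(url):
    # Hypothetical stand-in for getUrl: track how many calls overlap.
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.01)
    with lock:
        stats["active"] -= 1
    return "content of " + url

urls = ["http://example.com/%d" % i for i in range(8)]

# Two workers, so at most two downloads can be in flight at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    contents = list(pool.map(fake_download, urls))
```

After this runs, stats["peak"] never exceeds the worker count, no matter how many URLs were queued.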


Next time, I'll go into a bit of the mechanics behind futures, and show how to use that information to simplify our Pipeliner some.