Andy Friesen (thespeedbump) wrote,

Introduction to asynchronous programming using imvu.task

It is my distinct pleasure to announce that IMVU has open sourced its asynchronous event library, imvu.task. You can find it on our SourceForge.net page.

The best way to summarize what imvu.task does is to say that it provides an efficient way to do useful work while waiting for things. This may seem a bit simplistic, but if you think about it, any application that deals with the web (whether it runs on the desktop, in a browser or whatever else) is tasked with maintaining responsiveness in the face of a potentially very slow, unreliable Internet.

The easiest way to illustrate this is with an example. Suppose we want to download an HTML page, plus all of the images, CSS, and JavaScript it directly links to.

A naïve implementation might look like this:
import sys
import urllib2
import re

extensions_we_want = ['gif', 'png', 'jpg', 'jpeg', 'css', 'js', 'html']

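def stripExtension(s):
    # Despite the name, this returns the extension (the text after the
    # last '.'), or None if there isn't one.
    m = re.match(r'.*\.([a-zA-Z0-9]+)$', s)
    if m:
        return m.group(1)

def extractUrls(content):
    # Match src= as well as href=, so images and scripts are found too.
    return re.findall(r'''(?:href|src)=['"]([^'"]*)['"]''', content)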

def filenameForUrl(url):
    return url[url.rfind('/') + 1:]

try:
    (url,) = sys.argv[1:]
except ValueError:
    print 'Syntax: %s url_to_get' % sys.argv[0]
    sys.exit()

urls_to_get = [url]

index = 0
while index < len(urls_to_get):
    url = urls_to_get[index]
    index += 1

    print >> sys.stderr, 'Fetching', url
    content = urllib2.urlopen(url).read()
    # Write in binary mode so image data is not mangled by newline translation.
    open(filenameForUrl(url), 'wb').write(content)

    urls_to_get.extend(
        url for url in extractUrls(content)
        if stripExtension(url) in extensions_we_want
        and url not in urls_to_get
    )
There is a fair amount of boilerplate involved, but it's nothing too terrifying.

The problem, however, is that our application is spending the entire time blocked on I/O. If we sent out multiple requests in parallel, it would go much faster. Let's try that:
import re
import sys
import threading
import urllib2

extensions_we_want = ['gif', 'png', 'jpg', 'jpeg', 'css', 'js', 'html']

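def stripExtension(s):
    # Despite the name, this returns the extension (the text after the
    # last '.'), or None if there isn't one.
    m = re.match(r'.*\.([a-zA-Z0-9]+)$', s)
    if m:
        return m.group(1)

def extractUrls(content):
    # Match src= as well as href=, so images and scripts are found too.
    return re.findall(r'''(?:href|src)=['"]([^'"]*)['"]''', content)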

def filenameForUrl(url):
    return url[url.rfind('/') + 1:]

try:
    (url,) = sys.argv[1:]
except ValueError:
    print 'Syntax: %s url_to_get' % sys.argv[0]
    sys.exit()

all_urls = []
threads = []

def getUrl(url, onComplete):
    def work():
        print >> sys.stderr, 'Fetching', url
        content = urllib2.urlopen(url).read()
        onComplete(content)

    t = threading.Thread(target=work)
    threads.append(t)
    t.start()

def doLotsOfStuff(url):
    def complete(content):
        # Write in binary mode so image data is not mangled by newline translation.
        open(filenameForUrl(url), 'wb').write(content)
        urls_to_get = [
            u for u in extractUrls(content)
            if stripExtension(u) in extensions_we_want
        ]

        for u in urls_to_get:
            if u not in all_urls:
                all_urls.append(u)
                doLotsOfStuff(u)

    getUrl(url, complete)

doLotsOfStuff(url)

while threads:
    t = threads.pop()
    t.join()
This version runs substantially faster because it can request many resources at the same time. Awesome! There are just two small problems:
  1. This version is much more complicated: it needs callback chains and threads.
  2. Since it uses threads, and since I whipped this up in about a half-hour, I'm pretty sure it has at least one crippling race condition.
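
One plausible culprit is the pair of lines that test "u not in all_urls" and then call all_urls.append(u). Those are two separate steps, so two threads can both pass the check for the same URL before either records it, and the URL gets fetched twice. Here is a minimal sketch of one way to close that gap. It is written for Python 3 rather than the Python 2 of the listing above, and claim() and demo() are hypothetical helpers, not part of the original program:

```python
import threading

seen = set()                  # URLs that some thread has already claimed
seen_lock = threading.Lock()  # guards every read and write of `seen`

def claim(url):
    """Return True for exactly one caller per URL (hypothetical helper)."""
    with seen_lock:           # the check and the insert happen as one step
        if url in seen:
            return False
        seen.add(url)
        return True

def demo(url, n_threads=8):
    """Race n_threads threads for the same URL; return how many won."""
    results = [None] * n_threads   # one slot per thread, so no shared writes

    def worker(i):
        results[i] = claim(url)

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results.count(True)
```

Calling demo('http://example.com/style.css') races eight threads for one URL and returns how many of them won the claim. With the lock in place, only one thread can ever win.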


Let's look at the same program restructured to use imvu.task:
import re
import sys
import time
import urllib2

from imvu.task import EventPump, TaskScheduler
from imvu.task import RunInParallel
from imvu.task import task, threadtask

extensions_we_want = ['gif', 'png', 'jpg', 'jpeg', 'css', 'js', 'html']
all_urls = []

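def stripExtension(s):
    # Despite the name, this returns the extension (the text after the
    # last '.'), or None if there isn't one.
    m = re.match(r'.*\.([a-zA-Z0-9]+)$', s)
    if m:
        return m.group(1)

def extractUrls(content):
    # Match src= as well as href=, so images and scripts are found too.
    return re.findall(r'''(?:href|src)=['"]([^'"]*)['"]''', content)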

def filenameForUrl(url):
    return url[url.rfind('/') + 1:]

@threadtask
def getUrl(url):
    print 'Fetching', url
    return urllib2.urlopen(url).read()

@task
def doLotsOfStuff(url):
    all_urls.append(url)

    content = yield getUrl(url)

    # Write in binary mode so image data is not mangled by newline translation.
    open(filenameForUrl(url), 'wb').write(content)

    urls_to_get = [
        u for u in extractUrls(content)
        if stripExtension(u) in extensions_we_want
    ]

    yield RunInParallel(
        doLotsOfStuff(u)
        for u in urls_to_get
        if u not in all_urls
    )

try:
    (url,) = sys.argv[1:]
except ValueError:
    print 'Syntax: %s url_to_get' % sys.argv[0]
    sys.exit()

eventPump = EventPump()
taskScheduler = TaskScheduler(eventPump, time.time)

taskScheduler._call(doLotsOfStuff(url))
There are a bunch of interesting things going on in this version:

For starters, we introduce the concept of a "task." A task is essentially a function that always runs on the main thread, but is allowed to block on other tasks.
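
If that sounds abstract, here is a minimal sketch of the idea using plain Python generators. To be clear, this is not how imvu.task is implemented; run() and sum_to() are hypothetical names, and the code targets Python 3. A task yields another task to block on it, and a small loop on a single thread resumes the caller with whatever the child returned:

```python
import types

def run(task):
    """Drive a generator-based task to completion on the calling thread."""
    stack = [task]    # stack[-1] is running; everything below is blocked on it
    value = None      # what the next resumed task receives from its yield
    while stack:
        try:
            child = stack[-1].send(value)
        except StopIteration as done:
            stack.pop()           # task finished, so its caller is unblocked
            value = done.value    # and receives the task's return value
            continue
        if not isinstance(child, types.GeneratorType):
            raise TypeError('in this sketch, tasks may only yield other tasks')
        stack.append(child)       # the caller is now blocked on the child
        value = None              # a fresh generator must be started with None
    return value

def sum_to(n):
    """A task that blocks on a child task to compute 0 + 1 + ... + n."""
    if n == 0:
        return 0
    rest = yield sum_to(n - 1)    # suspend here until the child finishes
    return n + rest
```

Because the loop keeps its own stack of suspended tasks, a chain like run(sum_to(5000)) does not touch Python's recursion limit. Each task is just a paused generator waiting for its child.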

Secondly, we have a "threadtask." Threadtasks are actually not tasks (we are probably going to rename them). Threadtasks run off the main thread, but (and here's the wonderful part) tasks can be blocked on the result of a threadtask.

At runtime, our doLotsOfStuff task calls getUrl() and "yields" (or blocks) on its result. This is essentially syntactic sugar for dispatching a job to a thread pool and suspending the calling task until that job completes. When the result comes back, doLotsOfStuff receives it and (on the main thread!) proceeds to parse it.
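
The dispatch-and-suspend step can be sketched with the thread pool from Python 3's concurrent.futures module. This is an illustration under stated assumptions, not imvu.task's actual machinery: threadjob, run_task, slow_upper and shout are all made-up names, and the sleep stands in for a slow network read.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def threadjob(fn):
    """Make fn(*args) submit itself to the pool and return a Future."""
    def submit(*args):
        return pool.submit(fn, *args)
    return submit

@threadjob
def slow_upper(text):
    time.sleep(0.05)    # stand-in for a blocking network read
    return text.upper(), threading.current_thread().name

def shout(text):
    # This body runs on the caller's thread. The yield suspends it until
    # the pool job has finished, then hands back the job's return value.
    result, worker = yield slow_upper(text)
    return result, worker, threading.current_thread().name

def run_task(task):
    """Resume task with each Future's result until the task returns."""
    value = None
    while True:
        try:
            future = task.send(value)
        except StopIteration as done:
            return done.value
        # Simplification: wait right here. A real scheduler would run
        # other tasks in the meantime instead of blocking.
        value = future.result()
```

Running run_task(shout('hello')) returns the upper-cased text along with two thread names: the pool thread that did the slow work, and the calling thread that resumed the task afterwards.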

That being done, something else interesting happens: we use imvu.task.RunInParallel to spin off many tasks at the same time. This is where it starts to get awesome. Tasks always run on the main thread, so TaskScheduler will pick the first task off the queue and run it. In our case, though, the task blocks on a threadtask right away. Since that call will take some time to complete, the task is suspended, and another is picked off the work queue. In this way, our program is asynchronously spinning off many simultaneous web requests.

Notice that we also yield on the result of RunInParallel. This means that doLotsOfStuff() will not finish running until all of its sub-requests also finish. This lets us join all of our parallel-running work-items with a simple taskScheduler._call.
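
The scheduling and joining behavior described above can be sketched as a small loop that supports both kinds of waiting: on a thread-pool job, and on a whole batch of child tasks. Parallel, run, slow_read, fetch, crawl and demo are hypothetical stand-ins (the real RunInParallel and TaskScheduler may work quite differently), and the code is Python 3 using only the standard library. The barrier in slow_read is there purely to prove a point: the jobs can only finish if all of them are in flight at the same time.

```python
import queue
import threading
from concurrent.futures import Future, ThreadPoolExecutor

class Parallel:
    """Yield one of these to start child tasks and wait for all of them."""
    def __init__(self, tasks):
        self.tasks = list(tasks)

def run(root):
    """Run root and everything it spawns; return once all tasks are done."""
    ready = queue.Queue()   # (task, value to resume it with); thread-safe
    joins = {}              # child task -> record shared with its siblings
    ready.put((root, None))
    live = 1                # tasks started but not yet finished
    while live:
        task, value = ready.get()        # sleeps until some task is runnable
        if isinstance(value, Future):
            value = value.result()       # job is done; re-raises its error
        try:
            request = task.send(value)   # task bodies only ever run here
        except StopIteration:
            live -= 1
            join = joins.pop(task, None)
            if join is not None:
                join['left'] -= 1
                if join['left'] == 0:    # last child done: wake the parent
                    ready.put((join['parent'], None))
            continue
        if isinstance(request, Future):
            # Suspend the task; a pool thread re-queues it when the job ends.
            request.add_done_callback(lambda f, t=task: ready.put((t, f)))
        elif isinstance(request, Parallel) and request.tasks:
            join = {'parent': task, 'left': len(request.tasks)}
            for child in request.tasks:
                joins[child] = join
                ready.put((child, None))
                live += 1
        else:
            ready.put((task, None))      # nothing to wait for; resume at once

def slow_read(name, barrier):
    barrier.wait(timeout=5)   # fails unless every job is running concurrently
    return name.upper()

def fetch(pool, name, barrier, results):
    body = yield pool.submit(slow_read, name, barrier)
    results.append(body)      # back on the thread that called run()

def crawl(pool, names, barrier, results):
    yield Parallel(fetch(pool, n, barrier, results) for n in names)
    results.append('joined')  # reached only after every child has finished

def demo():
    names = ['a.css', 'b.js', 'c.png', 'd.gif']
    barrier = threading.Barrier(len(names))
    results = []
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        run(crawl(pool, names, barrier, results))
    return results
```

demo() returns the four upper-cased names in whatever order the jobs completed, followed by 'joined'. The parent's final line never runs before its children are done, which is the same property that lets a single call on the root task wait for the whole crawl.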

That's all for now. Next time I will talk about how to use "futures" to implement request pipelining.

Part II: Futures
Tags: code, imvu