Twisted - wprowadzenie: reactor i deferred

Śledząc portale i blogi o treści programistycznej, trudno nie zauważyć wzmożonego ostatnio zainteresowania "nowymi" asynchronicznymi frameworkami napisanymi w Pythonie. Osobiście staram się jednak nie podniecać każdą nowością i aby się przekonać do biblioteki, potrzebuję czegoś więcej niż 10 linijek kodu, wyświetlających Hello world. Okazało się, że nie ja jedyny.

Od jakiegoś czasu czytam o twisted i w miarę możliwości staram się napisać trochę działającego kodu. Niestety, oprócz tutoriala w którym opisane zostały podstawy, do dyspozycji jest jeszcze tylko dokumentacja API.

Ponieważ sam dużo czasu spędziłem na gromadzeniu informacji, postaram się w miarę przystępnie opisać podstawy twisted.

Reaktor

Ponieważ twisted jest asynchronicznym frameworkiem, posiada własny obiekt odpowiedzialny za zarządzanie główną pętlą programu i wywoływanie obsługi zdarzeń:

>>> from twisted.internet import reactor

Oprócz podstawowej wersji, można wybrać spośród wielu innych implementacji.

Aby uruchomić reactor, należy wywołać na nim metodę run().

>>> reactor.run()

Deferred

Podstawowym "obiektem zadaniowym" jest w twisted Deferred. Dzięki niemu można zakolejkować i powiązać w łańcuch dowolną ilość zadań.

>>> from twisted.internet.defer import Deferred

Programując asynchronicznie używamy jednego procesu (przynajmniej na początku), który wykonuje kolejne zadania nadsyłane przez reaktor. Każde z tych zadań dostaje na swoje wykonanie tyle czasu ile potrzebuje, dlatego długo wykonujące się funkcje mogą skutecznie zamrozić aplikacje. Jeśli jednak opóźnienie spowodowane jest oczekiwaniem na jakieś zdarzenie, a nie przez czasochłonne obliczenia, problem można rozwiązać tworząc łańcuch małych porcji instrukcji.

Callback

Callback traktować można jako filtr z efektami ubocznymi. W najprostszym przypadku będzie to funkcja zwracająca przyjmowany argument:

>>> transparent_callback = lambda arg: arg

Filtry (ze względu na brak lepszej nazwy, tak będę nazywał te obiekty) łączyć można w łańcuchy wywołań. Wynik działania każdego z nich przekazywany jest jako argument wywołania kolejnego:

>>> from twisted.internet.defer import Deferred
>>>
>>> def double(x):
 ...    return x * 2
 ...
>>> def inc(x):
 ...    return x + 1
 ...
>>> d = Deferred()
>>> d.addCallback(double)
<Deferred at 0x860be4c>
>>> d.addCallback(inc)
<Deferred at 0x860be4c>
>>>
>>> d.callback(3)
>>> d.result
7

Errback

Ponieważ działanie filtru może zakończyć się błędem, obiekt typu Deferred pozwala na zdefiniowanie oddzielnej listy instrukcji do ich obsługi:

>>> def error_handler(error):
 ...    print error
 ...
>>> def fail(data):
 ...    raise Exception('Fail!')
 ...
>>> d = Deferred()
>>> d.addCallback(fail).addErrback(error_handler)
<Deferred at 0x863bd0c>
>>>
>>> d.callback(None)
[Failure instance: Traceback: <type 'exceptions.Exception'>: Fail!
/usr/lib/python2.6/site-packages/IPython/iplib.py:2257:runcode
<ipython console>:1:<module>
/usr/lib/python2.6/site-packages/twisted/internet/defer.py:243:callback
/usr/lib/python2.6/site-packages/twisted/internet/defer.py:312:_startRunCallbacks
--- <exception caught here> ---
/usr/lib/python2.6/site-packages/twisted/internet/defer.py:328:_runCallbacks
<ipython console>:2:fail
]

DeferredList

Oprócz grupowania filtrów, utworzyć można obiekt czekający na zakończenie ustalonego zbioru instancji Deferred.

>>> from twisted.internet import defer 
>>> 
>>> def printer(data):
...     print data
... 
>>> d1 = defer.Deferred()
>>> d1.addCallback(lambda x: x * 2).addCallback(lambda x: x + 1)
<Deferred at 0x93efd6c>
>>> d2 = defer.Deferred()
>>> d2.addCallback(lambda x: x / 2).addCallback(lambda x: x - 1)
<Deferred at 0x93efdcc>
>>>
>>> dl = defer.DeferredList([d1, d2])
>>> dl.addCallback(printer)
<DeferredList at 0x93f468c>
>>>
>>> d1.callback(2)
>>> d2.callback(4)
[(True, 5), (True, 1)]

Przykłady

Będąc na tym etapie znajomości twisted zastanawiałem się, jaki sens ma używanie Deferred skoro wystarczy użyć zwykłego time.sleep() aby skutecznie zablokować na jakiś czas aplikację.

>>> import time
>>>
>>> def check(deferred, data):
...     start_time = time.time()
...     d.callback(data)
...     print 'work time:', time.time() - start_time
...     
>>> def sleeper(sleep_time):
...     time.sleep(sleep_time)
...     return sleep_time
... 
>>> d = defer.Deferred()
>>> d.addCallback(sleeper).addCallback(sleeper)
<Deferred at 0x943c28c>
>>> check(d, 2)
work time: 4.00436711311

Gdybym użył procesów i dostosował funkcję check, zamiast 4 sekund wszystko trwałoby niewiele ponad 2. Po co więc programować asynchronicznie? Najlepiej wytłumaczę to chyba kolejnym fragmentem kodu z wykorzystaniem funkcji getPage:

>>> import time
>>> from twisted.web.client import getPage
>>> from twisted.internet import defer
>>> from twisted.internet import reactor
>>> 
>>> URLS = ['http://distrowatch.com/', 'http://www.vim.org/']
>>> 
>>> def pull_page(page):
...     print 'done'
... 
>>> 
>>> def end_work(result):
...     reactor.stop()
... 
>>> 
>>> deferreds = [getPage(url) for url in URLS]
>>> 
>>> for d in deferreds:
...     d.addCallback(pull_page)
... 
<Deferred at 0x90c79ec>
<Deferred at 0x90c7c8c>
>>> 
>>> dl = defer.DeferredList(deferreds)
>>> dl.addCallback(end_work)
<DeferredList at 0x90c7fcc>
>>> 
>>> start_time = time.time()
>>> reactor.run()
done
done
>>> print 'work time:', time.time() - start_time
('work time:', 1.927487850189209)

Aby pobrać dwie strony, używam getPage które zwraca obiekt Deferred. Do każdego obiektu z listy deferreds podpinam własny callback który wyświetli napis done. Dzięki temu wiem jak szybko pobrana została każda ze stron. Na koniec tworzę DeferredList, który po zakończeniu wszystkich zadań wyłączy reaktor. Czas działania skryptu - 2 sekundy.

A co jeśli zwiększę ilości stron do pobrania z 2 do 40? Gdybym napisał to w pętli przy użyciu urllib.urlopen, operacja trwałaby 20 razy dłużej. W przypadku twisted nie jest to takie oczywiste:

>>> URLS = URLS * 20
...
>>> print 'work time:', time.time() - start_time
('work time:', 24.748102903366089)
>>> 24.748102903366089 / 1.927487850189209
12.839563632495389