Śledząc portale i blogi o treści programistycznej, trudno nie zauważyć
wzmożonego ostatnio zainteresowania "nowymi"
asynchronicznymi
frameworkami
napisanymi w Pythonie. Osobiście staram się jednak nie podniecać każdą
nowością i aby się przekonać do biblioteki, potrzebuję czegoś więcej niż
10 linijek kodu, wyświetlających Hello world. Okazało się, że nie
ja
jedyny.
Od jakiegoś czasu czytam o twisted i w miarę możliwości staram się napisać trochę działającego kodu. Niestety, oprócz tutoriala w którym opisane zostały podstawy, do dyspozycji jest jeszcze tylko dokumentacja API.
Ponieważ sam dużo czasu spędziłem na gromadzeniu informacji, postaram się w miarę przystępnie opisać podstawy twisted.
Reaktor
Ponieważ twisted jest asynchronicznym frameworkiem, posiada własny obiekt odpowiedzialny za zarządzanie główną pętlą programu i wywoływanie obsługi zdarzeń:
>>> from twisted.internet import reactor
Oprócz podstawowej wersji, można wybrać spośród wielu innych implementacji.
Aby uruchomić reactor, należy wywołać na nim metodę run().
>>> reactor.run()
Deferred
Podstawowym "obiektem zadaniowym" jest w twisted Deferred. Dzięki niemu można zakolejkować i powiązać w łańcuch dowolną ilość zadań.
>>> from twisted.internet.defer import Deferred
Programując asynchronicznie używamy jednego procesu (przynajmniej na początku), który wykonuje kolejne zadania nadsyłane przez reaktor. Każde z tych zadań dostaje na swoje wykonanie tyle czasu ile potrzebuje, dlatego długo wykonujące się funkcje mogą skutecznie zamrozić aplikacje. Jeśli jednak opóźnienie spowodowane jest oczekiwaniem na jakieś zdarzenie, a nie przez czasochłonne obliczenia, problem można rozwiązać tworząc łańcuch małych porcji instrukcji.
Callback
Callback traktować można jako filtr z efektami ubocznymi. W najprostszym przypadku będzie to funkcja zwracająca przyjmowany argument:
>>> transparent_callback = lambda arg: arg
Filtry (ze względu na brak lepszej nazwy, tak będę nazywał te obiekty) łączyć można w łańcuchy wywołań. Wynik działania każdego z nich przekazywany jest jako argument wywołania kolejnego:
>>> from twisted.internet.defer import Deferred >>> >>> def double(x): ... return x * 2 ... >>> def inc(x): ... return x + 1 ... >>> d = Deferred() >>> d.addCallback(double) <Deferred at 0x860be4c> >>> d.addCallback(inc) <Deferred at 0x860be4c> >>> >>> d.callback(3) >>> d.result 7
Errback
Ponieważ działanie filtru może zakończyć się błędem, obiekt typu Deferred
pozwala na zdefiniowanie oddzielnej listy instrukcji do ich obsługi:
>>> def error_handler(error): ... print error ... >>> def fail(data): ... raise Exception('Fail!') ... >>> d = Deferred() >>> d.addCallback(fail).addErrback(error_handler) <Deferred at 0x863bd0c> >>> >>> d.callback(None) [Failure instance: Traceback: <type 'exceptions.Exception'>: Fail! /usr/lib/python2.6/site-packages/IPython/iplib.py:2257:runcode <ipython console>:1:<module> /usr/lib/python2.6/site-packages/twisted/internet/defer.py:243:callback /usr/lib/python2.6/site-packages/twisted/internet/defer.py:312:_startRunCallbacks --- <exception caught here> --- /usr/lib/python2.6/site-packages/twisted/internet/defer.py:328:_runCallbacks <ipython console>:2:fail ]
DeferredList
Oprócz grupowania filtrów, utworzyć można obiekt czekający na zakończenie
ustalonego zbioru instancji Deferred.
>>> from twisted.internet import defer >>> >>> def printer(data): ... print data ... >>> d1 = defer.Deferred() >>> d1.addCallback(lambda x: x * 2).addCallback(lambda x: x + 1) <Deferred at 0x93efd6c> >>> d2 = defer.Deferred() >>> d2.addCallback(lambda x: x / 2).addCallback(lambda x: x - 1) <Deferred at 0x93efdcc> >>> >>> dl = defer.DeferredList([d1, d2]) >>> dl.addCallback(printer) <DeferredList at 0x93f468c> >>> >>> d1.callback(2) >>> d2.callback(4) [(True, 5), (True, 1)]
Przykłady
Będąc na tym etapie znajomości twisted zastanawiałem się, jaki sens ma
używanie Deferred skoro wystarczy użyć zwykłego time.sleep() aby
skutecznie zablokować na jakiś czas aplikację.
>>> import time >>> >>> def check(deferred, data): ... start_time = time.time() ... d.callback(data) ... print 'work time:', time.time() - start_time ... >>> def sleeper(sleep_time): ... time.sleep(sleep_time) ... return sleep_time ... >>> d = defer.Deferred() >>> d.addCallback(sleeper).addCallback(sleeper) <Deferred at 0x943c28c> >>> check(d, 2) work time: 4.00436711311
Gdybym użył procesów i dostosował funkcję check, zamiast 4 sekund wszystko
trwałoby niewiele ponad 2. Po co więc programować asynchronicznie? Najlepiej
wytłumaczę to chyba kolejnym fragmentem kodu z wykorzystaniem funkcji
getPage:
>>> import time >>> from twisted.web.client import getPage >>> from twisted.internet import defer >>> from twisted.internet import reactor >>> >>> URLS = ['http://distrowatch.com/', 'http://www.vim.org/'] >>> >>> def pull_page(page): ... print 'done' ... >>> >>> def end_work(result): ... reactor.stop() ... >>> >>> deferreds = [getPage(url) for url in URLS] >>> >>> for d in deferreds: ... d.addCallback(pull_page) ... <Deferred at 0x90c79ec> <Deferred at 0x90c7c8c> >>> >>> dl = defer.DeferredList(deferreds) >>> dl.addCallback(end_work) <DeferredList at 0x90c7fcc> >>> >>> start_time = time.time() >>> reactor.run() done done >>> print 'work time:', time.time() - start_time ('work time:', 1.927487850189209)
Aby pobrać dwie strony, używam getPage które zwraca obiekt Deferred. Do
każdego obiektu z listy deferreds podpinam własny callback który wyświetli
napis done. Dzięki temu wiem jak szybko pobrana została każda ze stron. Na
koniec tworzę DeferredList, który po zakończeniu wszystkich zadań wyłączy
reaktor. Czas działania skryptu - 2 sekundy.
A co jeśli zwiększę ilości stron do pobrania z 2 do 40? Gdybym napisał to w pętli przy użyciu urllib.urlopen, operacja trwałaby 20 razy dłużej. W przypadku twisted nie jest to takie oczywiste:
>>> URLS = URLS * 20 ... >>> print 'work time:', time.time() - start_time ('work time:', 24.748102903366089) >>> 24.748102903366089 / 1.927487850189209 12.839563632495389