1. Reaktor
Der Kern von Twisted ist Reaktor, und Reaktor ist zwangsläufig synchron/asynchron, blockierend/nicht blockierend. In Daves konzeptioneller Einführung im ersten Kapitel gibt es eine kleine Grenze zwischen synchron/asynchron Fuzzy, bezüglich Synchronisation/Asynchronität, Blockierung/Nichtblockierung siehe Zhihu-Diskussion. Was Proactor (Aktivator) und Reactor (Reaktor) betrifft, finden Sie hier einen empfohlenen Blog mit einer ausführlicheren Einführung.
Was Netzwerk-IO im Reaktormodus betrifft, sollte es sich um synchrones IO und nicht um asynchrones IO handeln. Der in Daves erstem Kapitel erwähnte Kern der Asynchronität besteht darin, die Kontrolle über die Aufgabe explizit aufzugeben, anstatt vom Betriebssystem willkürlich gestoppt zu werden. Der Programmierer muss die Aufgabe in einer Reihenfolge organisieren, die sie in abwechselnden kleinen Schritten abschließt. Wenn also eine der Aufgaben die Ausgabe einer anderen Aufgabe verwendet, muss die abhängige Aufgabe (d. h. die Aufgabe, die die Ausgabe empfängt) so konzipiert sein, dass sie eine Reihe von Bits oder Fragmenten und nicht alle auf einmal empfängt.
Die explizite und proaktive Aufgabe der Kontrolle über eine Aufgabe ähnelt in gewisser Weise der Denkweise von Coroutinen. Reactor kann als Planer von Coroutinen angesehen werden. Der Reaktor ist eine Ereignisschleife, an der wir interessiert sind (z. B. dass der Socket lesbar/beschreibbar ist) und Prozessoren (z. B. die Ausführung von Lese- und Schreibvorgängen) beim Reaktor registriert werden Nach Abschluss der Prozessorausführung entspricht dies dem Hängenbleiben (Ertrag) der Coroutine, der Rückkehr zur Ereignisschleife des Reaktors, dem Warten auf das nächste Ereignis und dem Zurückrufen. Der Reaktor selbst verfügt über einen synchronen Ereignis-Demultiplexer, der durch Select/Epoll und andere Mechanismen implementiert werden kann. Natürlich basiert die Ereignisauslösung des Twisted-Reaktors nicht unbedingt auf IO, sondern kann auch durch andere Mechanismen wie Timer ausgelöst werden.
Twisted Reactor erfordert nicht, dass wir Ereignisse und Rückruffunktionen aktiv registrieren, sondern wird durch Polymorphismus implementiert (Erbung bestimmter Klassen, Implementierung der betreffenden Ereignisschnittstelle und anschließende Übergabe an Twisted Reactor). In Bezug auf Twisted Reactor sind einige Dinge zu beachten:
Twisted.internet.reactor ist ein Singleton-Modus und jedes Programm kann nur einen Reaktor haben.
Versuchen Sie, den Vorgang in der Reactor-Callback-Funktion so schnell wie möglich abzuschließen und nicht blockieren. Aufgaben und Reaktoren sind im Wesentlichen Single-Threaded-Callback-Code und verdrehter Code, der in einem bestimmten Callback-Funktionsblock ausgeführt wird es sei denn, „reactor.stop()“ wird explizit verwendet, aber im Allgemeinen bedeutet der Aufruf von „reactor.stop()“, dass die Anwendung beendet wird.
# 示例一 twisted底层API的使用 from twisted.internet import reacto from twisted.internet import main from twisted.internet.interfaces import IReadDescriptor import socket class MySocket(IReadDescriptor): def __init__(self, address): # 连接服务器 self.address = address self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(address) self.sock.setblocking(0) # tell the Twisted reactor to monitor this socket for reading reactor.addReader(self) # 接口: 告诉reactor 监听的套接字描述符 def fileno(self): try: return self.sock.fileno() except socket.error: return -1 # 接口: 在连接断开时的回调 def connectionLost(self, reason): self.sock.close() reactor.removeReader(self) # 当应用程序需要终止时 调用: # reactor.stop() # 接口: 当套接字描述符有数据可读时 def doRead(self): bytes = '' # 尽可能多的读取数据 while True: try: bytesread = self.sock.recv(1024) if not bytesread: break else: bytes += bytesread except socket.error, e: if e.args[0] == errno.EWOULDBLOCK: break return main.CONNECTION_LOST if not bytes: return main.CONNECTION_DONE else: # 在这里解析协议并处理数据 print bytes
Der Socket ist nicht blockierend, die Bedeutung des Reaktors geht verloren
Wir stellen die vom Reaktor benötigte Schnittstelle bereit, indem wir IReadDescriptor erben.
Fügen Sie den Socket zu „reactor.addReader“ hinzu Die Klasse wird dem Listening-Objekt des Reaktors hinzugefügt.
main.CONNECTION_LOST ist ein vordefinierter Wert von Twisted. Durch diese Werte können wir den nächsten Rückruf bis zu einem gewissen Grad steuern (ähnlich wie bei der Simulation eines Ereignisses).
Aber die MySocket-Klasse oben ist nicht gut genug. Die Hauptnachteile sind:
Wir müssen die Daten selbst lesen, anstatt dass das Framework sie für uns liest und Ausnahmen behandelt.
Netzwerk-IO und Datenverarbeitung werden miteinander vermischt und nicht getrennt
Drei .Twisted-Abstraktionen
Transporte: Netzwerkverbindungsschicht, nur für Netzwerkverbindung und Lesen/Schreiben verantwortlich Bytedaten
Protokolle: Protokollschicht, bedient geschäftsbezogene Netzwerkprotokolle, konvertiert den Bytestrom in von der Anwendung benötigte Daten
Protokollfabriken: Protokollfabrik, verantwortlich für die Erstellung von Protokollen, jede Netzwerkverbindung verfügt über ein Protokollobjekt (da Es ist notwendig, den Protokollanalysestatus zu speichern)
Diese Konzepte von Twisted sind dem Ranch-Netzwerk-Framework in Erlang sehr ähnlich. Das Ranch-Framework abstrahiert auch die Konzepte von Transporten und Protokollen, wenn eine neue Netzwerkverbindung besteht Erstellt Transporte und Protokolle, bei denen Protokolle vom Benutzer beim Starten der Ranch übergeben werden. Es handelt sich um ein Modul, das das Verhalten von ranch_protocol implementiert. Wenn Protokolle initialisiert werden, werden die der Verbindung entsprechenden Transporte empfangen, sodass wir Byte-Stream-Daten verarbeiten können in Protokollen, gemäß unserem Das Protokoll analysiert und verarbeitet die Daten. Gleichzeitig können Daten über Transporte gesendet werden (Ranch hat die Bytestream-Daten bereits für Sie gelesen).
Ähnlich wie Ranch erstellt Twisted auch Protokolle und leitet den Transport weiter, wenn eine neue Verbindung eintrifft. Wir müssen den Bytestream nur in der dataReceived(self, data)-Schnittstelle verarbeiten . Nur Daten. Derzeit kann Twisted als wirklich asynchron in Netzwerk-E/A angesehen werden. Es hilft uns, mit Netzwerk-E/A und möglichen Ausnahmen umzugehen, und trennt Netzwerk-E/A und Datenverarbeitung, indem es sie in Transporte und Protokolle abstrahiert, was die Effizienz des Programms verbessert und Robustheit.
# 示例二 twisted抽象的使用 from twisted.internet import reactor from twisted.internet.protocol import Protocol, ClientFactory class MyProtocol(Protocol): # 接口: Protocols初始化时调用,并传入Transports # 另外 twisted会自动将Protocols的factory对象成员设为ProtocolsFactory实例的引用 # 如此就可以通过factory来与MyProtocolFactory交互 def makeConnection(self,trans): print 'make connection: get transport: ', trans print 'my factory is: ', self.factory # 接口: 有数据到达 def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getPeer()) # 接口: 连接断开 def connectionLost(self, reason): # 连接断开的处理 class MyProtocolFactory(ClientFactory): # 接口: 通过protocol类成员指出需要创建的Protocols protocol = PoetryProtocol # tell base class what proto to build def __init__(self, address): self.poetry_count = poetry_count self.poems = {} # task num -> poem # 接口: 在创建Protocols的回调 def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) # 在这里对proto做一些初始化.... return proto # 接口: 连接Server失败时的回调 def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() def main(address): factory = MyClientFactory(address) host, port = address # 连接服务端时传入ProtocolsFactory reactor.connectTCP(host, port, factory) reactor.run()
四. twisted Deferred
twisted Deferred对象用于解决这样的问题:有时候我们需要在ProtocolsFactory中嵌入自己的回调,以便Protocols中发生某个事件(如所有Protocols都处理完成)时,回调我们指定的函数(如TaskFinished)。如果我们自己来实现回调,需要处理几个问题:
如何区分回调的正确返回和错误返回?(我们在使用异步调用时,要尤其注意错误返回的重要性)
如果我们的正确返回和错误返回都需要执行一个公共函数(如关闭连接)呢?
如果保证该回调只被调用一次?
Deferred对象便用于解决这种问题,它提供两个回调链,分别对应于正确返回和错误返回,在正确返回或错误返回时,它会依次调用对应链中的函数,并且保证回调的唯一性。
d = Deferred() # 添加正确回调和错误回调 d.addCallbacks(your_ok_callback, your_err_callback) # 添加公共回调函数 d.addBoth(your_common_callback) # 正确返回 将依次调用 your_ok_callback(Res) -> common_callback(Res) d.callback(Res) # 错误返回 将依次调用 your_err_callback(Err) -> common_callback(Err) d.errback(Err) # 注意,对同一个Defered对象,只能返回一次,尝试多次返回将会报错
twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。
有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。
Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。
五.综合示例
下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8 #xiaorui.cc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread import os,sys from twisted.python import threadable; threadable.init(1) deferred =deferToThread.__get__ import time def todoprint_(result): print result def running(): "Prints a few dots on stdout while the reactor is running." # sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running) @deferred def sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '\nend sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我这里先做别的事情" print de print "go go end"
更多剖析Python的Twisted框架的核心特性相关文章请关注PHP中文网!