Maison > développement back-end > Tutoriel Python > Exemples de code pour écrire des programmes non bloquants à l'aide du framework Twisted de Python

Exemples de code pour écrire des programmes non bloquants à l'aide du framework Twisted de Python

高洛峰
Libérer: 2017-02-03 16:33:30
original
1307 Les gens l'ont consulté

Regardons d'abord un morceau de code :

# ~*~ Twisted - A Python tale ~*~
 
from time import sleep
 
# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print "Start installation for", customer
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print "All done for", customer
 
# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)
 
developer_day(["Bill", "Elon", "Steve", "Mark"])
Copier après la connexion

Exécutez-le, le résultat est le suivant :

   
$ ./deferreds.py 1
Copier après la connexion
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds
Copier après la connexion

C'est une séquence de code d’exécution. Pour quatre consommateurs, l'installation prend 3 secondes pour une personne, donc pour quatre personnes, cela prend 12 secondes. Ce n'est pas très satisfaisant, alors jetez un œil au deuxième exemple utilisant des fils de discussion :

import threading
 
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print "Goodmorning from developer", id
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print "Bye from developer", id
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  [dev.start() for dev in devs]
  # We leave for the evening
  [dev.join() for dev in devs]
 
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])
Copier après la connexion

Exécutez-le :

 $ ./deferreds.py 2
Copier après la connexion
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds
Copier après la connexion

Cette fois, est un morceau de code exécuté en parallèle, à l'aide de 5 threads de travail. 15 consommateurs prenant 3 secondes chacun signifie un total de 45 secondes, mais utiliser 5 threads pour s'exécuter en parallèle ne prend qu'un total de 9 secondes. Ce code est un peu compliqué et une grande partie du code est utilisée pour gérer la concurrence plutôt que de se concentrer sur les algorithmes ou la logique métier. De plus, le résultat du programme semble très mitigé et difficile à lire. Même un code multithread simple est difficile à bien écrire, nous passons donc à Twisted :

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
 
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
 
# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
      def on_done():
        print "Callback: Finished installation for", customer
    print "Scheduling: Installation for", customer
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print "All done for", customer
  #
  # For each customer, we schedule these processes on the CRM
  # and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d
 
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
 
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()
Copier après la connexion


Résultats en cours d'exécution :

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds
Copier après la connexion


Cette fois, nous obtenons un code parfaitement exécuté et une sortie lisible sans utiliser de threads. Nous avons traité 15 consommateurs en parallèle, ce qui signifie que le temps d'exécution qui prenait initialement 45 secondes a été complété en 3 secondes. L'astuce est que nous remplaçons tous les appels bloquants à sleep() par les fonctions équivalentes task.deferLater() et de rappel dans Twisted. Comme le traitement s'effectue désormais ailleurs, nous pouvons servir 15 consommateurs simultanément sans aucune difficulté.
Les opérations de traitement mentionnées précédemment ont lieu ailleurs. Maintenant, pour expliquer, les opérations arithmétiques se produisent toujours dans le processeur, mais la vitesse de traitement du processeur est désormais très rapide par rapport aux opérations sur disque et réseau. Ainsi, transmettre des données au processeur ou envoyer des données du processeur vers la mémoire ou un autre processeur prend la plupart du temps. Nous utilisons des opérations non bloquantes pour gagner du temps dans ce domaine. Par exemple, task.deferLater() utilise une fonction de rappel qui sera activée lorsque les données auront été transférées.
Un autre point très important est les messages Goodmorning du développeur Twisted et Bye du développeur Twisted dans la sortie ! Ces deux informations ont déjà été imprimées au début de l'exécution du code. Si le code s’exécute si tôt à ce stade, quand notre application commence-t-elle réellement à s’exécuter ? La réponse est que pour une application Twisted (y compris Scrapy), elle est exécutée dans réacteur.run(). Avant d'appeler cette méthode, chaque chaîne différée pouvant être utilisée dans l'application doit être prête, puis la méthode réacteur.run() surveillera et activera la fonction de rappel.
Notez que l'une des règles principales de Reactor est que vous pouvez effectuer n'importe quelle opération à condition qu'elle soit suffisamment rapide et non bloquante.
D'accord maintenant, aucune partie du code n'est utilisée pour gérer plusieurs threads, mais ces fonctions de rappel semblent toujours un peu compliquées. Il peut être modifié comme ceci :

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print "Scheduling: Installation for", customer
  yield task.deferLater(reactor, 3, lambda: None)
  print "Callback: Finished installation for", customer
  print "All done for", customer
 
def twisted_developer_day(customers):
  ... same as previously but using inline_install() instead of schedule_install()
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
Copier après la connexion


Le résultat d'exécution est le même que l'exemple précédent. La fonction de ce code est la même que celle de l'exemple précédent, mais il semble plus concis et clair. Le générateur inlineCallbacks peut utiliser certains mécanismes Python pour suspendre ou reprendre l'exécution de la fonction inline_install(). La fonction inline_install() devient un objet différé et s'exécute en parallèle pour chaque consommateur. Chaque fois qu'un rendement se produit, l'opération sera suspendue sur l'instance inline_install() actuelle jusqu'à ce que l'objet Deferred du rendement soit terminé, puis reprise.
La seule question maintenant est : et si nous n’avions pas seulement 15 consommateurs, mais, disons, 10 000 consommateurs ? Ce code lancera 10 000 séquences d'exécution simultanées (telles que des requêtes HTTP, des écritures de base de données, etc.). Il n’y a peut-être rien de mal à faire cela, mais cela peut aussi conduire à divers échecs. Dans les applications avec d'énormes requêtes simultanées, telles que Scrapy, nous devons souvent limiter le nombre de simultanéités à un niveau acceptable. Dans l'exemple suivant, nous utilisons task.Cooperator() pour compléter une telle fonction. Scrapy utilise également le même mécanisme dans son pipeline d'articles pour limiter le nombre de concurrence (c'est-à-dire le paramètre CONCURRENT_ITEMS) :

@defer.inlineCallbacks
def inline_install(customer):
  ... same as above
 
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print "Bye from Twisted developer!"
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
 
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~
Copier après la connexion


Résultats en cours d'exécution :

$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds
Copier après la connexion


Comme vous pouvez le voir dans le résultat ci-dessus, il semble y avoir 5 emplacements de traitement consommateur lorsque le programme est en cours d'exécution. Sauf si un emplacement est libéré, le traitement de la prochaine demande du consommateur ne commencera pas. Dans cet exemple, le temps de traitement est de 3 secondes, il semble donc qu'il soit traité par lots de 5. La performance finale est la même que celle de l'utilisation de threads, mais cette fois il n'y a qu'un seul thread, et le code est plus simple et plus facile à écrire du code correct.

PS : deferToThread permet aux fonctions de synchronisation d'être non bloquantes
wisted defer.Deferred (de twisted.internet import defer) peut renvoyer un objet différé.

Remarque : deferToThread est implémenté en utilisant threads , une utilisation excessive n'est pas recommandée La fonction est traitée dans un autre thread, principalement utilisée pour les opérations de lecture de bases de données/fichiers

..
 
# 代码片段
 
  def dataReceived(self, data):
    now = int(time.time())
 
    for ftype, data in self.fpcodec.feed(data):
      if ftype == 'oob':
        self.msg('OOB:', repr(data))
      elif ftype == 0x81: # 对服务器请求的心跳应答(这个是解析 防疲劳驾驶仪,发给gps上位机的,然后上位机发给服务器的)
        self.msg('FP.PONG:', repr(data))
      else:
        self.msg('TODO:', (ftype, data))
      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
      d.addCallback(self._doResult, extra)
Copier après la connexion


L'exemple complet voici pour votre référence

# -*- coding: utf-8 -*-
 
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
 
import functools
import time
 
# 耗时操作 这是一个同步阻塞函数
def mySleep(timeout):
  time.sleep(timeout)
 
  # 返回值相当于加进了callback里
  return 3
 
def say(result):
  print "耗时操作结束了, 并把它返回的结果给我了", result
 
# 用functools.partial包装一下, 传递参数进去
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
 
print "你还没有结束我就执行了, 哈哈"
 
reactor.run()
Copier après la connexion

更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal