La colonne
Tutoriel vidéo Python décrit la méthode d'envoi asynchrone de journaux à un serveur distant en Python.
La façon la plus courante d'utiliser les journaux en python est de générer des journaux dans la console et les fichiers. Le module de journalisation fournit également les classes correspondantes. à utiliser, mais parfois nous pouvons avoir certains besoins, comme devoir envoyer des journaux à l'extrémité distante ou écrire directement dans la base de données
# -*- coding: utf-8 -*-""" ------------------------------------------------- File Name: loger Description : Author : yangyanxing date: 2020/9/23 ------------------------------------------------- """import loggingimport sysimport os# 初始化loggerlogger = logging.getLogger("yyx") logger.setLevel(logging.DEBUG)# 设置日志格式fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')# 添加cmd handlercmd_handler = logging.StreamHandler(sys.stdout) cmd_handler.setLevel(logging.DEBUG) cmd_handler.setFormatter(fmt)# 添加文件的handlerlogpath = os.path.join(os.getcwd(), 'debug.log') file_handler = logging.FileHandler(logpath) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(fmt)# 将cmd和file handler添加到logger中logger.addHandler(cmd_handler) logger.addHandler(file_handler) logger.debug("今天天气不错")复制代码
sera imprimé dans cmd et écrit dans le fichier debug.log dans le répertoire actuel [2020-09-23 10:45:56] [DEBUG] 今天天气不错
. Dans la bibliothèque standard python logging.handler, de nombreux gestionnaires ont été définis pour nous, dont certains peuvent être utilisés directement. Utilisez tornado localement. imprimer tous les paramètres reçus HTTPHandler
# 添加一个httphandlerimport logging.handlers http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get') http_handler.setLevel(logging.DEBUG) http_handler.setFormatter(fmt) logger.addHandler(http_handler) logger.debug("今天天气不错")复制代码
{ 'name': [b 'yyx'], 'msg': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'], 'args': [b '()'], 'levelname': [b 'DEBUG'], 'levelno': [b '10'], 'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'], 'filename': [b 'loger.py'], 'module': [b 'loger'], 'exc_info': [b 'None'], 'exc_text': [b 'None'], 'stack_info': [b 'None'], 'lineno': [b '41'], 'funcName': [b '<module>'], 'created': [b '1600831054.8881223'], 'msecs': [b '888.1223201751709'], 'relativeCreated': [b '22.99976348876953'], 'thread': [b '14876'], 'threadName': [b 'MainThread'], 'processName': [b 'MainProcess'], 'process': [b '8648'], 'message': [b '\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'], 'asctime': [b '2020-09-23 11:17:34'] }复制代码
[2020-09-23 10:45:56] [DEBUG] 今天天气不错
envoie simplement toutes les informations du journal au serveur Quant à la façon dont le serveur organise le contenu, cela dépend du service. nous pouvons avoir deux méthodes. La première consiste à modifier le code du serveur et à réorganiser le contenu du journal en fonction des informations transmises. La seconde consiste à réécrire une classe afin qu'elle soit réorganisée lors de l'envoi du contenu du journal. .logging.handlers.HTTPHandler
cette classe et réécrire une classe httpHandler logging.handlers.HTTPHandler
class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method def emit(self, record): ''' :param record: :return: ''' msg = self.format(record) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) requests.get(url, timeout=1) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)复制代码
msg = self.format(record)
[2020-09-23 11:43:50] [DEBUG] 今天天气不错
3. 🎜>
Nous examinons maintenant un problème. Lorsque le journal est envoyé au serveur distant, si le serveur distant le traite très lentement, cela prendra un certain temps, puis l'enregistrement du journal ralentiraModifiez la classe de traitement des journaux du serveur, laissez-la faire une pause de 5 secondes pour simuler un long processus de traitement
async def post(self): print(self.getParam('log')) await asyncio.sleep(5) self.write({"msg": 'ok'})复制代码
À ce moment, nous imprimons le journal ci-dessus
logger.debug("今天天气不错") logger.debug("是风和日丽的")复制代码
et le résultat obtenu est
[2020-09-23 11:47:33] [DEBUG] 今天天气不错 [2020-09-23 11:47:38] [DEBUG] 是风和日丽的复制代码
Nous avons remarqué que leur intervalle de temps est également de 5 secondes.
3.1 Utiliser le multi-threading
La première chose à penser est d'utiliser plusieurs threads pour exécuter la méthode d'envoi de journaux
def emit(self, record): msg = self.format(record) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) t = threading.Thread(target=requests.get, args=(url,)) t.start() else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } t = threading.Thread(target=requests.post, args=(self.url,), kwargs={"data":{'log': msg}, "headers":headers}) t.start()复制代码
3.2 Utiliser des pools de threads pour traiter
Il existe des classes ThreadPoolExecutor et ProcessPoolExecutor dans concurrent.futures de python, qui sont des pools de threads et des pools de processus. Ils sont utilisés en premier. lors de l'initialisation. Définissez plusieurs threads, puis laissez ces threads gérer les fonctions correspondantes, afin que vous n'ayez pas besoin de créer de nouveaux threads à chaque fois
Utilisation de base du pool de threads
exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程exector.submit(fn, args, kwargs) # 将函数submit到线程池中复制代码
S'il y a y a-t-il n threads dans le pool de threads, lorsque le nombre de tâches soumises est supérieur à n, les tâches en excès seront placées dans la file d'attente.
Modifiez à nouveau la fonction d'émission ci-dessus
exector = ThreadPoolExecutor(max_workers=1)def emit(self, record): msg = self.format(record) timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) exector.submit(requests.get, url, timeout=6) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } exector.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)复制代码
3.3 Utilisez la bibliothèque asynchrone aiohttp pour envoyer des requêtes
La méthode submit de la classe CustomHandler ci-dessus utilise request.post pour envoyer des journaux. Les requêtes elles-mêmes sont bloquées et exécutées, c'est pourquoi son existence. rend le script bloqué pendant une longue période, nous pouvons donc remplacer la bibliothèque de requêtes de blocage par aiohttp asynchrone pour exécuter les méthodes get et post, et réécrire la méthode d'émission dans un CustomHandler
class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method async def emit(self, record): msg = self.format(record) timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if (self.url.find("?") >= 0): sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg})) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(self.url) as resp: print(await resp.text()) else: headers = { "Content-type": "application/x-www-form-urlencoded", "Content-length": str(len(msg)) } async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: async with session.post(self.url, data={'log': msg}) as resp: print(await resp.text())复制代码
C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited self.emit(record) RuntimeWarning: Enable tracemalloc to get the object allocation traceback复制代码
服务端也没有收到发送日志的请求。
究其原因是由于emit方法中使用async with session.post
函数,它需要在一个使用async 修饰的函数里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个coroutine
对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息中显示coroutine 'CustomHandler.emit' was never awaited
.
既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行
async def main(): await logger.debug("今天天气不错") await logger.debug("是风和日丽的") loop = asyncio.get_event_loop() loop.run_until_complete(main())复制代码
执行依然报错
raise TypeError('An asyncio.Future, a coroutine or an awaitable is '复制代码
意思是需要的是一个coroutine,但是传进来的对象不是。
这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方.
解决办法是有的,我们使用 asyncio.get_event_loop()
获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程, 我们通过一个小例子来看一下
import asyncio async def test(n): while n > 0: await asyncio.sleep(1) print("test {}".format(n)) n -= 1 return n async def test2(n): while n >0: await asyncio.sleep(1) print("test2 {}".format(n)) n -= 1def stoploop(task): print("执行结束, task n is {}".format(task.result())) loop.stop() loop = asyncio.get_event_loop() task = loop.create_task(test(5)) task2 = loop.create_task(test2(3)) task.add_done_callback(stoploop) task2 = loop.create_task(test2(3)) loop.run_forever()复制代码
我们使用loop = asyncio.get_event_loop()
创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉.
注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上,然后调用该循环的run_forever()
函数,从而使该循环上的协程对象得以正常的执行.
上面得到的输出为
test 5 test2 3 test 4 test2 2 test 3 test2 1 test 2 test 1 执行结束, task n is 0复制代码
可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了.
如果不执行loop.run_forever()
函数,则注册在它上面的协程也不会执行
loop = asyncio.get_event_loop() task = loop.create_task(test(5)) task.add_done_callback(stoploop) task2 = loop.create_task(test2(3)) time.sleep(5)# loop.run_forever()复制代码
上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒以后就中止了.
回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever() .
loop = asyncio.get_event_loop()class CustomHandler(logging.Handler): def __init__(self, host, uri, method="POST"): logging.Handler.__init__(self) self.url = "%s/%s" % (host, uri) method = method.upper() if method not in ["GET", "POST"]: raise ValueError("method must be GET or POST") self.method = method # 使用aiohttp封装发送数据函数 async def submit(self, data): timeout = aiohttp.ClientTimeout(total=6) if self.method == "GET": if self.url.find("?") >= 0: sep = '&' else: sep = '?' url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data})) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as resp: print(await resp.text()) else: headers = { "Content-type": "application/x-www-form-urlencoded", } async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: async with session.post(self.url, data={'log': data}) as resp: print(await resp.text()) return True def emit(self, record): msg = self.format(record) loop.create_task(self.submit(msg))# 添加一个httphandlerhttp_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get') http_handler.setLevel(logging.DEBUG) http_handler.setFormatter(fmt) logger.addHandler(http_handler) logger.debug("今天天气不错") logger.debug("是风和日丽的") loop.run_forever()复制代码
这时脚本就可以正常的异步执行了.
loop.create_task(self.submit(msg))
也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop)
来代替,目的都是将协程对象注册到事件循环中.
但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用loop.stop()
方法. 可以注册到某个task的回调中.
相关免费学习推荐:python视频教程
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!