> 백엔드 개발 > 파이썬 튜토리얼 > Python에서 비동기적으로 원격 서버에 로그를 보내는 방법

Python에서 비동기적으로 원격 서버에 로그를 보내는 방법

WBOY
풀어 주다: 2023-05-11 10:31:05
앞으로
1333명이 탐색했습니다.

StreamHandler 및 FileHandler

먼저 cmd 및 파일로 출력할 간단한 코드 세트를 작성해 보겠습니다.

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")
로그인 후 복사

먼저 로거를 초기화하고 로그 수준을 DEBUG로 설정한 다음 cmd_handler 및 file_handler를 초기화하고 마지막으로 추가 로거에 넣고 스크립트를 실행하면 cmd에 출력됩니다

[2020-09-23 10:45:56] [DEBUG] 오늘 날씨가 좋습니다 에 출력됩니다 현재 디렉터리의 debug.log 파일에 HTTPHandler를 추가합니다. 기록 시 원격 서버에 로그를 보내려면 Python 표준 라이브러리인 login.handler에 많은 핸들러가 정의되어 있습니다. 일부는 직접 사용할 수 있고, 토네이도를 로컬에서 사용하여 로그 수신을 위한 인터페이스를 작성하고, 수신된 모든 매개변수를 인쇄할 수 있습니다[2020-09-23 10:45:56] [DEBUG] 今天天气不错且会写入到当前目录下的debug.log文件中

添加HTTPHandler

如果想要在记录时将日志发送到远程服务器上,可以添加一个 HTTPHandler , 在python标准库logging.handler中,已经为我们定义好了很多handler,有些我们可以直接用,本地使用tornado写一个接收 日志的接口,将接收到的参数全都打印出来

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
&#39;funcName&#39;: [b &#39;<module>&#39;],
&#39;created&#39;: [b &#39;1600831054.8881223&#39;],
&#39;msecs&#39;: [b &#39;888.1223201751709&#39;],
&#39;relativeCreated&#39;: [b &#39;22.99976348876953&#39;],
&#39;thread&#39;: [b &#39;14876&#39;],
&#39;threadName&#39;: [b &#39;MainThread&#39;],
&#39;processName&#39;: [b &#39;MainProcess&#39;],
&#39;process&#39;: [b &#39;8648&#39;],
&#39;message&#39;: [b
&#39;\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;],
&#39;asctime&#39;: [b &#39;2020-09-23 11:17:34&#39;]
}
로그인 후 복사

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于

[2020-09-23 10:45:56][DEBUG] 今天天气不错

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    &#39;&#39;&#39;
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   &#39;&#39;&#39;
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={&#39;log&#39;: msg}, headers=headers,
timeout=1)
로그인 후 복사

정보가 많다고 할 수 있지만 우리가 원하는 것은 아닙니다. 우리는

[2020-09-23 10:45:56][DEBUG] The Weather is good today와 유사한 로그를 원합니다.

logging.handlers.HTTPHandler는 단순히 모든 로그 정보를 서버가 콘텐츠를 정리하는 방법은 서버에서 수행하므로 두 가지 방법이 있는데, 하나는 서버 코드를 변경하고 전달된 로그 정보를 기반으로 로그 내용을 재구성하는 것입니다. . 클래스를 사용하여 전송할 때 로그 내용을 서버에 다시 형식화할 수 있습니다.

두 번째 방법을 사용하는 이유는 이 방법이 더 유연하기 때문입니다. 서버는 녹음에만 사용되며 어떤 콘텐츠를 보낼지는 클라이언트가 결정해야 합니다.

클래스를 재정의해야 합니다.logging.handlers.HTTPHandler 클래스를 참조하고 httpHandler 클래스를 다시 작성할 수 있습니다.

각 로그 클래스는 로그를 기록할 때 실제로 이 내보내기 메서드를 재정의해야 합니다. 실행 방법:

{&#39;log&#39;: [b&#39;[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99&#39;]}
로그인 후 복사

위 코드에는 전송된 매개변수 msg = self.format(record)을 정의하는 줄이 있습니다. 이 코드 줄은 해당 내용이 로그에 설정된 형식에 따라 반환된다는 의미입니다. 물체.

그런 다음 요청 라이브러리를 통해 콘텐츠를 보냅니다. get 또는 post 방법을 사용하더라도 서버는 정상적으로 로그를 수신할 수 있습니다

async def post(self):
  print(self.getParam(&#39;log&#39;))
  await asyncio.sleep(5)
  self.write({"msg": &#39;ok&#39;})
로그인 후 복사

바이트 유형을 get으로 변환합니다.

[2020-09- 23 11: 43:50] [디버그] 오늘 날씨가 좋다

원격 로그를 비동기적으로 전송

이제 문제를 생각해 보겠습니다. 로그가 원격 서버로 전송될 때 원격 서버에서 매우 느리게 처리하면 문제가 발생합니다. 시간이 많이 소모됩니다. 일정 시간이 지나면 로그 기록 속도가 느려집니다. 서버 로그 처리 클래스를 수정하고 5초 동안 일시 중지하여 긴 처리 프로세스를 시뮬레이션합니다.

logger.debug("今天天气不错")
logger.debug("是风和日丽的")
로그인 후 복사

이때 인쇄해 보겠습니다. 위 로그:

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{&#39;log&#39;: msg},
로그인 후 복사

얻은 출력은 다음과 같습니다.


[2020-09-23 11:47:33] [DEBUG] 오늘 날씨가 좋습니다

[2020-09-23 11:47:38 ] [DEBUG] 날씨가 맑고 맑습니다

시간 간격도 5초인 것을 확인했습니다.

이제 문제가 발생합니다. 원래는 단순한 로그였지만 이제는 전체 스크립트를 끌어내리는 부담이 되었기 때문에 원격 로그 작성을 비동기적으로 처리해야 합니다.

1 멀티 스레드 처리 사용

먼저 생각해야 할 것은 멀티 스레드를 사용하여 로그 전송 방법을 실행하는 것입니다.

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中
로그인 후 복사

이 방법은 차단하지 않는다는 주요 목적을 달성할 수 있지만 매번 로그가 인쇄되면 스레드를 열어야 하는데 이는 리소스 낭비이기도 합니다.

2 스레드 풀을 사용하여

처리할 수도 있습니다. Python의 Concurrent.futures에는 스레드 풀과 프로세스 풀인 ThreadPoolExecutor 및 ProcessPoolExecutor 클래스가 있습니다. 이들은 초기화 중에 먼저 여러 스레드를 정의한 다음입니다. 이 스레드가 해당 기능을 처리하도록 하여 매번 새 스레드를 생성할 필요가 없도록 하세요

스레드 풀의 기본 사용법:

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = &#39;&&#39;
    else:
      sep = &#39;?&#39;
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={&#39;log&#39;: msg},
headers=headers, timeout=6)
로그인 후 복사

스레드 풀에 n개의 스레드가 있는 경우 숫자가 제출된 작업 수가 n보다 크면 중복된 작업이 대기열에 배치됩니다.

위의 내보내기 기능을 다시 수정하세요

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = &#39;&&#39;
      else:
        sep = &#39;?&#39;
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={&#39;log&#39;: msg}) as resp:
          print(await resp.text())
로그인 후 복사

여기서 스레드 풀을 하나만 초기화하는 이유는 무엇입니까? 이렇게 하면 스레드가 여러 개 있는 경우 고급 대기열의 로그가 먼저 전송된다는 것을 보장할 수 있습니다. 풀에서는 반드시 주문이 보장되는 것은 아닙니다.

3 비동기 aiohttp 라이브러리를 사용하여 요청 보내기

위 CustomHandler 클래스의 내보내기 메서드는 요청 자체를 차단하고 실행 중이며 스크립트가 중단되는 것은 바로 요청입니다. . 오랜 시간이 지나면 차단 요청 라이브러리를 비동기식 aiohttp로 대체하여 get 및 post 메서드를 실행하고 CustomHandler

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
&#39;CustomHandler.emit&#39; was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
로그인 후 복사

에서 내보내기 메서드를 다시 작성할 수 있습니다. 이때 코드 실행이 중단됩니다.

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
로그인 후 복사
로그인 후 복사
🎜 서버가 로그 전송 요청을 받지 못했습니다. 🎜이유는 Emit 메소드에 async with session.post 함수를 사용하기 때문에 async로 수정된 함수에서 실행해야 하므로 Emit 함수를 수정하고 async로 수정하기 때문입니다. 함수와 반환값은 코루틴 개체입니다. 코루틴 개체를 실행하려면 wait를 사용해야 하지만, 스크립트 어디에서도 wait 방출()이 호출되지 않으므로 충돌 정보에 코루틴 'CustomHandler.emit'이 대기된 적이 없다고 표시됩니다. 🎜

既然emit方法返回的是一个coroutine对象,那么我们将它放一个loop中执行

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
로그인 후 복사
로그인 후 복사

执行依然报错:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一个coroutine,但是传进来的对象不是。
这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

解决办法是有的,我们使用 asyncio.get_event_loop() 获取一个事件循环对象, 我们可以在这个对象上注册很多协程对象,这样当执行事件循环的时候,就是去执行注册在该事件循环上的协程,

我们通过一个小例子来看一下:

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()
로그인 후 복사

我们使用 loop = asyncio.get_event_loop() 创建了一个事件循环对象loop, 并且在loop上创建了两个task, 并且给task1添加了一个回调函数,在task1它执行结束以后,将loop停掉。
注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

上面得到的输出为:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()
로그인 후 복사

上面的代码将loop.run_forever() 注释掉,换成time.sleep(5) 停5秒, 这时脚本不会有任何输出,在停了5秒 以后就中止了,
回到之前的日志发送远程服务器的代码,我们可以使用aiohttp封装一个发送数据的函数, 然后在emit中将 这个函数注册到全局的事件循环对象loop中,最后再执行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()
로그인 후 복사

这时脚本就可以正常的异步执行了:

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 来代替,目的都是将协程对象注册到事件循环中。

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

위 내용은 Python에서 비동기적으로 원격 서버에 로그를 보내는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 이슈
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿