Während des Projektanforderungstestprozesses müssen Sie einige Anwendungsfallanforderungen an den Nginx-Server senden und dann das entsprechende Nginx-Protokoll überprüfen, um festzustellen, ob charakteristischer Inhalt zu ermitteln ist ob die Aufgabe erfolgreich ausgeführt wurde. Um die Effizienz zu verbessern, muss dieser Prozess automatisiert werden.
Python 3.6.5
Codedesign und -implementierung
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @CreateTime: 2021/06/26 9:05 @Author : shouke """ import time import threading import subprocess from collections import deque def collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = "tail -0f /usr/local/openresty/nginx/logs/access.log" while task_status != "req_log_got": with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = "" outs, errs = "", "" try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print("获取nginx日志超时,正在重试") proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print("获取nginx日志超时,再次超时,停止重试") break finally: for line in outs.split(" "): flag = ""client_ip":"10.118.0.77"" # 特征 if flag in line: # 查找包含特征内容的日志 log_for_req += line if task_status == "req_finished": nginx_log_queue.append(log_for_req) task_status = "req_log_got" def run_tasks(task_list): """ 运行任务 :param task_list 任务列表 """ global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 执行任务前,让收集日志线程先做好准备 print("正在执行任务:%s" % task.get("name")) # 执行Nginx任务请求 # ... task_status = "req_finished" time_to_wait = 0.1 while task_status != "req_log_got": # 请求触发的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 获取到用例请求触发的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = "req_ready" # 解析日志 # do something here # ... else: print("存储请求日志的队列为空") # do something here # ... if __name__ == "__main__": nginx_log_queue = deque() is_tasks_compete = False # 所有任务是否执行完成 task_status = "req_ready" # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态 print("###########################任务开始###########################") tast_list = [{"name":"test_task", "other":"..."}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print("###########################任务完成###########################")
Hinweis:
# 🎜🎜#1. Warum werden die oben genannten Codes nicht in einem Schritt und direkttail -0f /usr/local/openresty/nginx/logs/access.log | ausgefüllt? Dies liegt daran, dass das Nginx-Protokoll auf diese Weise nicht abgerufen werden kann. <p><code>tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
呢?这是因为这样做无法获取到Nginx的日志
2、实践时发现,第一次执行proc.communicate(timeout=2)
获取日志时,总是无法获取,会超时,需要二次获取,并且timeout
设置太小时(实践时尝试过设置为1秒
2 In der Praxis wurde festgestellt, dass proc.communicate(timeout=2)
zum ersten Mal ausgeführt wird Es dauert immer, bis das Protokoll abgerufen werden kann. Es kommt zu einer Zeitüberschreitung und eine zweite Erfassung ist erforderlich. Wenn der timeout
zu klein eingestellt ist (in der Praxis habe ich versucht, ihn auf 1 Sekunde
Das obige ist der detaillierte Inhalt vonSo erhalten Sie das Nginx-Protokoll entsprechend der Aufgabenanforderung in Python in Echtzeit. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!