How does Python obtain the Nginx log corresponding to the task request in real time?
During requirement testing for a project, test-case requests are sent to an Nginx server, and the corresponding Nginx log is then checked for characteristic content to determine whether each task executed successfully. To improve efficiency, this process needs to be automated.
Test environment: Python 3.6.5
Code design and implementation
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
@CreateTime: 2021/06/26 9:05
@Author : shouke
"""

import time
import threading
import subprocess
from collections import deque


def collect_nginx_log():
    """Collect the access-log lines produced while a task request is running."""
    global nginx_log_queue
    global is_tasks_complete
    global task_status

    args = "tail -0f /usr/local/openresty/nginx/logs/access.log"
    while task_status != "req_log_got":
        with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              shell=True, universal_newlines=True) as proc:
            log_for_req = ""
            outs, errs = "", ""
            try:
                # tail -f never exits by itself, so this call is expected to time out
                outs, errs = proc.communicate(timeout=2)
            except subprocess.TimeoutExpired:
                print("Timed out reading the Nginx log, retrying")
                proc.kill()
                try:
                    # After kill(), communicate() returns the output captured so far
                    outs, errs = proc.communicate(timeout=5)
                except subprocess.TimeoutExpired:
                    print("Timed out reading the Nginx log again, giving up")
                    break
            finally:
                flag = '"client_ip":"10.118.0.77"'  # characteristic content
                for line in outs.split("\n"):
                    if flag in line:  # keep only log lines containing the characteristic content
                        log_for_req += line + "\n"

                if task_status == "req_finished":
                    nginx_log_queue.append(log_for_req)
                    task_status = "req_log_got"


def run_tasks(task_list):
    """
    Run the tasks.
    :param task_list: list of tasks
    """
    global nginx_log_queue
    global is_tasks_complete
    global task_status

    for task in task_list:
        thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log")
        thread.start()
        time.sleep(1)  # give the log-collecting thread time to get ready before running the task

        print("Running task: %s" % task.get("name"))

        # Send the task request to Nginx
        # ...

        task_status = "req_finished"
        time_to_wait = 0.1
        while task_status != "req_log_got":  # log for this request has not been collected yet
            time.sleep(time_to_wait)
            time_to_wait += 0.01
        else:  # the Nginx log triggered by the test-case request has been collected
            if nginx_log_queue:
                nginx_log = nginx_log_queue.popleft()
                task_status = "req_ready"
                # Parse the log
                # do something here
                # ...
            else:
                print("The queue that stores request logs is empty")
                # do something here
                # ...


if __name__ == "__main__":
    nginx_log_queue = deque()
    is_tasks_complete = False  # whether all tasks have finished
    # State of the task currently being executed: req_ready, req_finished, req_log_got
    task_status = "req_ready"

    print("###########################Tasks started###########################")
    task_list = [{"name": "test_task", "other": "..."}]
    run_tasks(task_list)
    is_tasks_complete = True

    # Wait until only the main thread is left
    current_active_thread_num = len(threading.enumerate())
    while current_active_thread_num != 1:
        time.sleep(2)
        current_active_thread_num = len(threading.enumerate())
    print("###########################Tasks finished###########################")
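The handshake between the two threads in the listing (req_ready, then req_finished, then req_log_got) can also be written with threading.Event and queue.Queue instead of polled global variables. The following is only a minimal sketch of that idea, not the author's implementation. The callables send_request and read_new_lines are hypothetical placeholders: the first stands for "send the test-case request to Nginx", the second for "return the log lines that appeared since the last call".

```python
import queue
import threading
import time


def collect_matching_lines(read_new_lines, flag, request_done, results, poll=0.05):
    """Collector thread body: gather lines containing `flag` until the request is done."""
    matched = []
    while True:
        # Sample the event BEFORE reading, so one final read always happens
        # after the request has been reported as finished.
        finished = request_done.is_set()
        matched.extend(line for line in read_new_lines() if flag in line)
        if finished:
            break
        time.sleep(poll)
    results.put(matched)


def run_task(send_request, read_new_lines, flag, timeout=10):
    """Start the collector, send the request, then wait for the collected lines."""
    request_done = threading.Event()   # plays the role of task_status == "req_finished"
    results = queue.Queue()            # plays the role of nginx_log_queue
    collector = threading.Thread(
        target=collect_matching_lines,
        args=(read_new_lines, flag, request_done, results),
        name="collect_nginx_log",
    )
    collector.start()
    send_request()                     # hypothetical: issue the request to Nginx
    request_done.set()
    matched = results.get(timeout=timeout)  # blocks instead of polling for "req_log_got"
    collector.join()
    return matched
```

Blocking on results.get() with a timeout also means the caller gets a queue.Empty exception, rather than waiting forever, if the collector never delivers a result.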
Note:
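1. Why does collect_nginx_log not do everything in one step by running tail -0f /usr/local/openresty/nginx/logs/access.log | grep "characteristic content" directly? Because in practice the Nginx log could not be obtained that way. A likely contributing cause is that grep buffers its output in blocks when writing to a pipe rather than a terminal, so matched lines may still be sitting in its buffer when the process is killed.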
2. In practice, the first call to proc.communicate(timeout=2) never returns the log: communicate() waits for the process to exit, and tail -f does not exit on its own, so the call always times out and a second call is needed after killing the process. If the timeout is set too small (I tried 1 second), the second call also fails to return the Nginx log.
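Both notes can be illustrated with a small sketch: let the command run for a fixed time, kill it, collect what it printed, and do the filtering in Python rather than in a piped grep. This is an illustration under assumptions, not the author's code. The helper names capture_output_for and filter_lines are made up for this example, and the command is passed as an argument list instead of a shell string.

```python
import subprocess


def capture_output_for(cmd, seconds):
    """Run `cmd` for about `seconds`, then kill it and return what it printed."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=True) as proc:
        try:
            # Returns normally only if the process exits within `seconds`.
            outs, _ = proc.communicate(timeout=seconds)
        except subprocess.TimeoutExpired:
            # Expected for a command that never exits (such as tail -f):
            # stop it, then collect the output captured while it was running.
            proc.kill()
            outs, _ = proc.communicate()
    return outs


def filter_lines(text, flag):
    """Return the lines of `text` that contain the characteristic string `flag`."""
    return [line for line in text.splitlines() if flag in line]
```

With the log path from this article, the call would look like capture_output_for(["tail", "-n", "0", "-f", "/usr/local/openresty/nginx/logs/access.log"], 2), followed by filter_lines(outs, '"client_ip":"10.118.0.77"'). The value of seconds has to be long enough for Nginx to write the log line for the request, which matches the observation that a 1-second timeout was too short.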