프로젝트 요구사항 테스트 프로세스 중에 일부 사용 사례 요청을 Nginx 서버에 보낸 다음 해당 Nginx 로그를 확인하여 작업이 성공적으로 실행되었는지 확인하는 특징적인 콘텐츠가 있는지 확인해야 합니다. 효율성을 높이려면 이 프로세스를 자동화해야 합니다.
Python 3.6.5
코드 설계 및 구현
#!/usr/bin/env python # -*- coding:utf-8 -*- """ @CreateTime: 2021/06/26 9:05 @Author : shouke """ import time import threading import subprocess from collections import deque def collect_nginx_log(): global nginx_log_queue global is_tasks_compete global task_status args = "tail -0f /usr/local/openresty/nginx/logs/access.log" while task_status != "req_log_got": with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc: log_for_req = "" outs, errs = "", "" try: outs, errs = proc.communicate(timeout=2) except subprocess.TimeoutExpired: print("获取nginx日志超时,正在重试") proc.kill() try: outs, errs = proc.communicate(timeout=5) except subprocess.TimeoutExpired: print("获取nginx日志超时,再次超时,停止重试") break finally: for line in outs.split(" "): flag = ""client_ip":"10.118.0.77"" # 特征 if flag in line: # 查找包含特征内容的日志 log_for_req += line if task_status == "req_finished": nginx_log_queue.append(log_for_req) task_status = "req_log_got" def run_tasks(task_list): """ 运行任务 :param task_list 任务列表 """ global nginx_log_queue global is_tasks_compete global task_status for task in task_list: thread = threading.Thread(target=collect_nginx_log, name="collect_nginx_log") thread.start() time.sleep(1) # 执行任务前,让收集日志线程先做好准备 print("正在执行任务:%s" % task.get("name")) # 执行Nginx任务请求 # ... task_status = "req_finished" time_to_wait = 0.1 while task_status != "req_log_got": # 请求触发的nginx日志收集未完成 time.sleep(time_to_wait) time_to_wait += 0.01 else:# 获取到用例请求触发的nginx日志 if nginx_log_queue: nginx_log = nginx_log_queue.popleft() task_status = "req_ready" # 解析日志 # do something here # ... else: print("存储请求日志的队列为空") # do something here # ... if __name__ == "__main__": nginx_log_queue = deque() is_tasks_compete = False # 所有任务是否执行完成 task_status = "req_ready" # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态 print("###########################任务开始###########################") tast_list = [{"name":"test_task", "other":"..."}] run_tasks(tast_list) is_tasks_compete = True current_active_thread_num = len(threading.enumerate()) while current_active_thread_num != 1: time.sleep(2) current_active_thread_num = len(threading.enumerate()) print("###########################任务完成###########################")
참고:
1. 위 코드가 한 단계로 직접 tail -0f /usr/local/openresty/ 완료되지 않는 이유는 무엇입니까? nginx/logs /access.log | grep "기능 콘텐츠"
? 이렇게 하면 Nginx 로그를 얻을 수 없기 때문입니다tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
呢?这是因为这样做无法获取到Nginx的日志
2、实践时发现,第一次执行proc.communicate(timeout=2)
获取日志时,总是无法获取,会超时,需要二次获取,并且timeout
设置太小时(实践时尝试过设置为1秒
proc.communicate(timeout=2)
를 처음 실행하면, 항상 획득이 불가능하고 시간 초과됩니다. 두 번째 획득이 필요하며 timeout
설정이 너무 작습니다(실제로는 1초
로 설정해 보았습니다.) ), 이로 인해 두 번째 실행 중에 Nginx 로그를 얻지 못하는 결과도 발생합니다. 🎜위 내용은 Python에서 작업 요청에 해당하는 Nginx 로그를 실시간으로 가져오는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!