Home  >  Article  >  Backend Development  >  How to complete the tail command with Python

How to complete the tail command with Python

WBOY
WBOYforward
2023-05-08 21:04:151651browse

1. First Edition - Reading real-time data from the end of the file

The main idea is: open the file, move the pointer to the end of the file, then output the data if there is data, and sleep for a period of time if there is no data. .

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # 从文件结尾处开始seek
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

After that, just make the following call:

python xxx.py filename

2. Second version--implement tail -f

tail -fBy default, the last 10 lines of data are read first, and then the real-time data is read from the end of the file. For small files, you can read all the file contents first and output the last 10 lines. However, the performance of reading the full text and then getting the last 10 lines is not high, and the boundary conditions for rolling back 10 lines are also very complicated. Let’s look at the implementation of reading the full text first and then getting the last 10 lines:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)

if __name__ == '__main__':
    filename: str = sys.argv[0]
    Tail(filename)()

You can see The implementation is very simple. Compared with the first version, there is only one more read_last_line function. Next, we have to solve the performance problem. When the file is very large, this logic will not work, especially for some Log files are often several gigabytes in size, and if they are all read out, the memory will burst. In the Linux system, there is no interface that can specify the pointer to jump to the last 10 lines. You can only use the following method to simulate the output of the last 10 lines:

  • First the cursor jumps to the latest character, saves the current cursor, and then estimates the character length of a row of data, preferably more. Here I process it based on the length of 1024 characters per row

  • Then use the seek method to jump to the characters of seek(-1024 * 10, 2). This is our estimated content within the last 10 lines

  • Then judge the content. If the length of the jumped characters is less than 10 * 1024, it proves that the entire file does not have 10 lines, and the original read_last_line method is used.

  • If the character length of the jump is equal to 1024 * 10, use the newline character to calculate how many lines the character length has been fetched. If the number of lines is greater than 10, only the last 10 lines will be output. If only 4 lines have been read, continue reading. 6*1024, until 10 lines are read

After the above steps, the data of the last 10 lines have been calculated and can be printed out, and you can enter the append data, but at this time The content of the file may have changed, and our cursor has also changed. At this time, the cursor must be jumped back to the cursor just saved to prevent missing data or repeated printing.

After the analysis is completed, you can start restarting Constructed the read_last_line function.

import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        with open(self.file_name) as f:
            self.read_last_line(f, n)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # 使用print都会每次都打印新的一行
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 跳转游标到最后
        file.seek(0, 2)
        # 获取当前结尾的游标位置
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            # 跳转到我们预估的字符位置
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3. The third edition - elegantly reading the output log file

It can be found that the logical performance of real-time reading is still very good Poor, if the file is read once per second, the real-time performance will be too slow. If the interval is reduced, the processor will occupy too much. The best performance situation is if the file can be updated before printing the file, then the performance can be improved. It is guaranteed. Fortunately, inotify in Linux provides such a function. In addition, a feature of the log file is that it will be logrotated. If the log is logrotated, then we need to reopen the file. And further read the data, this situation can also be used inotify, when inotify gets the event that the file is reopened, we reopen the file and read again.

import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent):  # 定制化事件处理类,注意继承
    """
    执行inotify event的封装
    """
    f: 'open()'
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

        # 获取文件
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # 只监控路径,这样就能知道文件是否移动
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """统一的输出方法"""
        for line in self.f.readlines():
            self.output(line)

    def process_IN_MODIFY(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
        if event.pathname == self.filename:
            # 检测到文件被移动重新打开文件
            self.f.close()
            self.f = open(self.filename)
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        self.f = open(self.filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # 创建WatchManager对象
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # 实例化我们定制化后的事件处理类, 采用**dict传参数
        wm.add_watch('/tmp', multi_event)  # 添加监控的目录,及事件
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # 在notifier实例化时传入,notifier会自动执行
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        """通过inotify的with管理打开文件"""
        with self.inotify_event_handle as i:
            # 先读取指定的行数
            self.read_last_line(i.f, n)
            # 启用inotify的监听
            self.notifier.loop()

    def read_last_line(self, file, n):
        read_len: int = self.len_line * n

        # 获取当前结尾的游标位置
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
                file.seek(0)
                last_line_list: List[str] = file.read().split('\n')[-n:]
                # 重新获取游标位置
                now_tell: int = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len)
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # 如果获取的行数大于要求的行数,则获取前n行的行数
                last_line_list: List[str] = read_str.split('\n')[-n:]
                break
            else:
                # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per: int = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # 重置游标,确保接下来打印的数据不重复
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

You can see that the file was opened with inotify instead of open (at this time, the my_init method will be called for initialization). After opening, we still run the code that opened the original n lines, and then handed it over to inotify to run. . Before inotify runs, we mount the re-open file method and print file method in the events corresponding to inotifyy. After that, when inotify runs, the corresponding methods will be executed according to the corresponding events.

The above is the detailed content of How to complete the tail command with Python. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete