Python で tail コマンドを完了する方法

WBOY
リリース: 2023-05-08 21:04:15
転載
1699 人が閲覧しました

1. 初版 - ファイルの末尾からリアルタイム データを読み取る

主なアイデアは、ファイルを開き、ファイルの末尾にポインタを移動し、データがあれば出力するというものです。 .

import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: f.seek(0, 2) # 从文件结尾处开始seek while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
ログイン後にコピー

その後、次の呼び出しを行うだけです:

python xxx.py filename

2. 2 番目のバージョン -- tail -f# の実装

##tail -fデフォルトでは、データの最後の 10 行が最初に読み取られ、次にリアルタイム データが読み取られます。ファイルの最後から読み込みます。小さいファイルの場合は、最初にファイルの内容をすべて読み取って、最後の 10 行を出力することもできます。ただし、全文を読み取ってから最後の 10 行を取得するパフォーマンスは高くなく、限界があります。 10 行をロールバックする条件も非常に複雑です。最初に全文を読み取ってから最後の 10 行を取得する実装を見てみましょう:

import time import sys from typing import Callable, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval def __call__(self): with open(self.file_name) as f: self.read_last_line(f) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, f): last_lines = f.readlines()[-10:] for line in last_lines: self.output(line) if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
ログイン後にコピー

実装が非常にシンプルであることがわかります。このバージョンでは、

read_last_line 関数が 1 つだけあります。次に、パフォーマンスの問題を解決する必要があります。ファイルが非常に大きい場合、特に一部のログ ファイルのサイズが数ギガバイトになる場合、このロジックは機能しません。 Linux システムでは、最後の 10 行にジャンプするためのポインタを指定できるインターフェイスがありません。最後の 10 行の出力をシミュレートするには、次のメソッドを使用するしかありません。 10 行:

  • 最初にカーソルが最新の文字にジャンプし、現在のカーソルを保存してから、データ行の文字長 (できればそれ以上) を推定します。ここでは、次の値に基づいて処理します。 1 行あたり 1024 文字の長さ

  • 次に、seek メソッドを使用して、seek(-1024 * 10, 2) の文字にジャンプします。これは、最後の 10 行内の推定コンテンツです。

  • 次に内容を判断し、ジャンプした文字の長さが 10 * 1024 未満であれば、ファイル全体が 10 行に満たないことが証明され、元の

    read_last_lineメソッドを使用します。

  • ジャンプの文字長が 1024 * 10 に等しい場合、改行文字を使用して、文字長が何行取得されたかを計算します。行数が 10 より大きい場合、最後の 10 行のみが出力されます。4 行しか読み取られていない場合は、読み続けます。6*1024、10 行が読み取られるまで

上記の手順を実行すると、最後の 10 行のデータが計算され、印刷できるようになり、追加データを入力できるようになりますが、この時点でファイルの内容が変更されている可能性があり、カーソルも変更されています。今回は、データの欠落や印刷の繰り返しを防ぐために、カーソルを保存したばかりのカーソルに戻す必要があります。

分析が完了したら、再開を開始できます。

read_last_line関数を構築しました。

import time import sys from typing import Callable, List, NoReturn class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line def __call__(self, n: int = 10): with open(self.file_name) as f: self.read_last_line(f, n) while True: line: str = f.readline() if line: self.output(line) # 使用print都会每次都打印新的一行 else: time.sleep(self.interval) def read_last_line(self, file, n): read_len: int = self.len_line * n # 跳转游标到最后 file.seek(0, 2) # 获取当前结尾的游标位置 now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标 last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break # 跳转到我们预估的字符位置 file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
ログイン後にコピー

3. 第 3 版 - 出力ログ ファイルのエレガントな読み取り

ファイルが 1 回あたり 1 回読み取られる場合、リアルタイム読み取りの論理パフォーマンスは依然として非常に優れていることがわかります。 2 番目に、リアルタイムのパフォーマンスが遅すぎます。間隔を短くすると、プロセッサが過剰に占有されます。ファイルを印刷する前にファイルを更新できれば、パフォーマンスが向上するのが最良のパフォーマンス状況です。幸いなことに、Linux の

inotifyにはそのような機能が用意されています。さらに、ログ ファイルの機能として、ログ ファイルがログローテーションされることもあります。ログがログローテーションされている場合は、ファイルを再度開く必要があります。データを読み取ります。この状況はinotifyにも使用できます。inotifyがファイルが再度開かれたというイベントを取得すると、ファイルを再度開き、再度読み取ります。

import os import sys from typing import Callable, List, NoReturn import pyinotify multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 监控多个事件 class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件处理类,注意继承 """ 执行inotify event的封装 """ f: 'open()' filename: str path: str wm: 'pyinotify.WatchManager' output: Callable def my_init(self, **kargs): """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化""" # 获取文件 filename: str = kargs.pop('filename') if not os.path.exists(filename): raise RuntimeError('Not Found filename') if '/' not in filename: filename = os.getcwd() + '/' + filename index = filename.rfind('/') if index == len(filename) - 1 or index == -1: raise RuntimeError('Not a legal path') self.f = None self.filename = filename self.output: Callable = kargs.pop('output') self.wm = kargs.pop('wm') # 只监控路径,这样就能知道文件是否移动 self.path = filename[:index] self.wm.add_watch(self.path, multi_event) def read_line(self): """统一的输出方法""" for line in self.f.readlines(): self.output(line) def process_IN_MODIFY(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取""" if event.pathname == self.filename: self.read_line() def process_IN_MOVE_SELF(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取""" if event.pathname == self.filename: # 检测到文件被移动重新打开文件 self.f.close() self.f = open(self.filename) self.read_line() def __enter__(self) -> 'InotifyEventHandler': self.f = open(self.filename) return self def __exit__(self, exc_type, exc_val, exc_tb): self.f.close() class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line wm = pyinotify.WatchManager() # 创建WatchManager对象 inotify_event_handler = InotifyEventHandler( **dict(filename=file_name, wm=wm, output=output) ) # 实例化我们定制化后的事件处理类, 采用**dict传参数 wm.add_watch('/tmp', multi_event) # 添加监控的目录,及事件 self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier实例化时传入,notifier会自动执行 self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler def __call__(self, n: int = 10): """通过inotify的with管理打开文件""" with self.inotify_event_handle as i: # 先读取指定的行数 self.read_last_line(i.f, n) # 启用inotify的监听 self.notifier.loop() def read_last_line(self, file, n): read_len: int = self.len_line * n # 获取当前结尾的游标位置 file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来 file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] # 重新获取游标位置 now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: # 如果获取的行数大于要求的行数,则获取前n行的行数 last_line_list: List[str] = read_str.split('\n')[-n:] break else: # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取 if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n for line in last_line_list: self.output(line + '\n') # 重置游标,确保接下来打印的数据不重复 file.seek(now_tell) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
ログイン後にコピー
ファイルが open ではなく inotify で開かれたことがわかります (このとき、初期化のために my_init メソッドが呼び出されます)。開いた後も、元の n 行を開いたコードを実行し、それを inotify に渡します。 inotify を実行する前に、inotifyy に対応するイベントにファイルの再オープン メソッドとファイルの印刷メソッドをマウントします。その後、inotify が実行されると、対応するイベントに従って、対応するメソッドが実行されます。

以上がPython で tail コマンドを完了する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!