この記事では、中国の IT ベテラン向けに作成されたシンプルなデータベースを紹介します。これは、私たちが使用しているデータベースほど強力ではありませんが、学ぶ価値があります。特定の環境で使用でき、より柔軟で便利です。
データベースの名前は WawaDB で、Python で実装されています。これは、Python が非常に強力であることを示しています。
はじめに
ログの要件は一般的に次のとおりです:
変更ではなく追加のみ、書き込みは時系列順に書き込まれます。
大量の書き込み、少量の読み取り、およびクエリは一般にデータをクエリします。
MongoDB MongoDB の固定コレクションはこの要件を十分に満たしていますが、MongoDB は比較的大量のメモリを占有するため、モグラ塚を大騒ぎしているような気分になります。
WawaDB のアイデアは、1,000 個のログが書き込まれるたびに、現在の時刻とログ ファイルのオフセットをインデックス ファイルに記録することです。
次に、時間ごとにログをクエリする場合は、まずインデックスをメモリにロードし、二分法を使用して時点のオフセットを見つけてから、ログ ファイルを開いて指定された場所をシークします。これにより、データがユーザーに提供されます。ログ ファイル全体を走査することなく、ニーズをすばやく見つけて読み取ることができます。
パフォーマンス
Core 2 P8400、2.26GHZ、2Gメモリ、32ビットwin7
書き込みテスト:
1分間に10,000個のデータの書き込みをシミュレート、合計5時間のデータを書き込み、300万個のデータを挿入、各データは 54 文字で、所要時間は 2 分 51 秒です
読み取りテスト: 指定された期間内に特定の部分文字列を含むログを読み取ります
データ範囲走査データ量結果数値時間 (秒)
5時間 300万 604 6.6
2時間 120万 225 2.7
1時間 600,096 1.3
30分 300,044 0.6
インデックス
で、おそらくログに記録された時間をインデックスするだけです。実装, 二分探索は明らかに B ツリーほど効率的ではありませんが、一般的には一桁劣ることはなく、実装は非常に簡単です。
これはスパースインデックスであるため、すべてのログにそのオフセットを記録するインデックスがあるわけではないため、データを読み取るときは、読み取りミスを防ぐためにさらにデータを前方に読み取り、実際に必要なデータを読み取るまで待つ必要があります。ユーザー。
以下に示すように、たとえば、ユーザーが 25 から 43 までのログを読みたい場合、二分法を使用して 25 を見つけ、30 が位置する点を見つけます。
インデックス: 0 10 20 30 40 50 Log : |......|.......|..........|.......|....... ...|>>>a = [0 , 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>> >bisect.bisect_left(a, 43)>> >5>>>a[5]>>50
したがって、少し先に進んで 20 からログを読み始める必要があります (21 を読み取った後)。 、22、23、24は25 Smallより大きいため、25、26、27、...を読み取った後、ユーザーに戻ります
40(前のスケールの50)を読み取った後、それはです。現在のデータが 43 より大きいかどうかを判断する必要があります。43 より大きい場合 (全開間隔でデータを返す)、読み取りを停止する必要があります。
全体として、私たちは大きなファイルのほんの一部を操作して、ユーザーが必要とするデータを取得しただけです。
バッファ
ログを書き込む際の大量のディスク書き込みを減らすために、追加ログのインデックス作成時のバッファは 10k に設定されます。システムのデフォルトは 4k です。
同様に、ログの読み取り効率を向上させるために、読み取りバッファーも 10k に設定されており、ログのサイズに応じて適切に調整する必要があります。
インデックスの読み取りと書き込みは行バッファーに設定されており、不完全なインデックス行が読み取られるのを防ぐために、すべての完全な行をディスクにフラッシュする必要があります (実際、行バッファーを設定しても、行バッファーの半分は-描画された行は引き続き読み取ることができます)。
お問い合わせ
何ですか? SQL をサポートするには、問題を起こすのはやめてください。100 行のコードで SQL をサポートできるでしょうか?
これで、システムが指定された時間範囲内でデータ行を走査すると、ユーザーのラムダ条件が満たされた場合にのみクエリがユーザーに返されます。
もちろん、これはユーザーが必要としない大量のデータを読み取ることになり、各行をラムダ式で計算する必要がありますが、それは仕方がありません、シンプルさは美しいです。
以前は、クエリが必要な条件、ログ時刻、ログファイルのオフセットをインデックスに記録し、インデックスから条件を満たすオフセットを見つけて、各データをログ ファイル内で 1 回検索され、1 回読み取られます。これには、読み取られるデータ量が少ないという利点が 1 つだけありますが、欠点が 2 つあります:
インデックス ファイルが非常に大きく、メモリにロードするのが不便です
毎回最初にシークする必要があります。バッファは使用されていないようで、非常に遅く、データのセグメントを連続的に読み取ってラムダフィルタリングを使用するよりも4〜5倍遅くなります
書き込み
データであり、各ログ行の先頭はタイムスタンプです。
マルチスレッド
クエリデータは複数のスレッドで同時にクエリできるため、複数の並列読み取りが競合することはありません。
書き込みに関しては、単なる追加操作ではありますが、複数のスレッドでファイルを追加しても安全かどうかは確認されていないため、キューと書き込み専用のスレッドを使用することをお勧めします。
鍵
鍵はありません。
並べ替え
デフォルトでは、クエリされたデータは時系列順に並べられます。他の並べ替えが必要な場合は、メモリに取得した後、Python のsorted 関数を使用して任意に並べ替えることができます。
100行を超えるデータベースコード
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 1024*10 default_read_buffer_size = 1024*10 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # 索引如果没有flush的话,可能读到有半行的数据 pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if __name__ == '__main__': test()