In diesem Artikel wird eine einfache Datenbank vorgestellt, die für chinesische IT-Veteranen geschrieben wurde. Sie ist nicht so leistungsfähig wie die von uns verwendete Datenbank, aber es lohnt sich, daraus zu lernen. Es kann in bestimmten Umgebungen verwendet werden und ist flexibler und bequemer.
Der Name der Datenbank ist WawaDB, die in Python implementiert ist. Dies zeigt, dass Python sehr mächtig ist!
Einführung
Die Anforderungen für die Protokollierung lauten im Allgemeinen wie folgt:
Nur anhängen, nicht ändern, in chronologischer Reihenfolge schreiben; Bei einer kleinen Lesemenge fragt die Abfrage im Allgemeinen die Daten eines Zeitraums ab.
Die feste Sammlung von MongoDB kann diese Anforderung sehr gut erfüllen, aber MongoDB belegt relativ viel Speicher, was das Gefühl vermittelt, eine zu erstellen Aufregung aus einem Maulwurfshügel.
Die Idee von WawaDB besteht darin, jedes Mal, wenn 1000 Protokolle geschrieben werden, die aktuelle Zeit und den Offset der Protokolldatei in einer Indexdatei aufzuzeichnen.
Wenn Sie dann das Protokoll nach Zeit abfragen, laden Sie zuerst den Index in den Speicher, ermitteln Sie mit der Dichotomiemethode den Versatz des Zeitpunkts, öffnen Sie dann die Protokolldatei und suchen Sie nach dem angegebenen Speicherort Die vom Benutzer benötigten Daten können schnell lokalisiert und gelesen werden, ohne die gesamte Protokolldatei zu durchlaufen.
Leistung
Core 2 P8400, 2,26 GHz, 2G Speicher, 32 Bit Win7
Schreibtest:
Simulieren Sie das Schreiben von 10.000 Daten in 1 Minute Es wurden insgesamt 5 Stunden Daten geschrieben, 3 Millionen Datenelemente eingefügt, jedes Datenelement bestand aus 54 Zeichen und es dauerte 2 Minuten und 51 Sekunden
Lesetest: Lesen Sie bestimmte Protokolle, die eine bestimmte Teilzeichenfolge enthalten, im Zeitraum
Datenbereichsdurchlauf, Datenvolumen, Ergebniszählzeit (Sekunden)
5 Stunden 3 Millionen 604 6,6
2 Stunden 1,2 Millionen 225 2,7
1 Stunde 600.000 96 1,3
30 Minuten 300.000 44 0,6
Index
Indizieren Sie nur die im Protokoll aufgezeichnete Zeit, als In der Einleitung heißt es grob: In Bezug auf die Implementierung des Index ist die binäre Suche definitiv nicht so effizient wie B Tree, aber im Allgemeinen unterscheidet sie sich nicht um eine Größenordnung und die Implementierung ist sehr einfach.
Da es sich um einen spärlichen Index handelt, verfügt nicht jedes Protokoll über einen Index zum Aufzeichnen seines Offsets. Wenn Sie also Daten lesen, müssen Sie einige weitere Daten vorwärts lesen, um Lesefehler zu vermeiden, und warten, bis Sie gelesen haben, was Sie wirklich lesen Die Daten werden dann tatsächlich an den Benutzer zurückgegeben.
Wie unten gezeigt, möchte der Benutzer beispielsweise die Protokolle von 25 bis 43 lesen, die Dichotomiemethode verwenden, um 25 zu finden, und den Punkt finden, an dem sich 30 befindet,
Index : 0 10 20 30 40 50 Protokoll: |..........|..........|..........|....... ...|.......... |>>>>a = [0, 10, 20, 30, 40, 50]>>>>bisect.bisect_left(a, 35)>>>> 3>>>>a[3]>>>>30>>>>bisect_left(a, 43)>>>5>>>a[5]>>50
Also müssen wir Gehen Sie ein wenig vorwärts und beginnen Sie mit dem Lesen des Protokolls ab 20 (das Häkchen vor 30). 21, 22, 23 und 24 werden gelesen und weggeworfen, weil sie kleiner als 25 sind. Nach dem Lesen von 25, 26, 27,..., Sie werden an den Benutzer zurückgegeben
Bis 40 lesen (der vorherige Tick von 50) Dann muss festgestellt werden, ob die aktuellen Daten größer als 43 sind. Wenn sie größer als 43 sind (Rückgabedaten im vollständig geöffneten Zustand). Bereich), ist es notwendig, mit dem Lesen aufzuhören.
Insgesamt haben wir nur einen kleinen Teil der großen Datei bearbeitet, um die vom Benutzer gewünschten Daten zu erhalten.
Puffer
Um eine große Anzahl von Festplattenschreibvorgängen beim Schreiben von Protokollen zu reduzieren, wird der Puffer beim Anhängen des Protokolls auf 10 KB eingestellt. Der Systemstandard sollte 4 KB betragen.
Um die Effizienz beim Lesen von Protokollen zu verbessern, ist der Lesepuffer ebenfalls auf 10 KB eingestellt und muss entsprechend der Größe Ihres Protokolls angepasst werden.
Das Lesen und Schreiben des Index ist auf einen Zeilenpuffer eingestellt, und jede vollständige Zeile muss auf die Festplatte geleert werden, um zu verhindern, dass unvollständige Indexzeilen gelesen werden (tatsächlich hat die Praxis dies auch bei einer Zeile bewiesen). Puffer gesetzt, halbgelesene Zeilen können weiterhin gelesen werden) OK).
Abfrage
Was? Um SQL zu unterstützen, machen Sie keine Probleme mehr. Wie können 100 Codezeilen SQL unterstützen?
Jetzt wird die Abfrage direkt in einem Lambda-Ausdruck übergeben. Wenn das System die Datenzeilen innerhalb des angegebenen Zeitbereichs durchläuft, wird sie nur dann an den Benutzer zurückgegeben, wenn die Lambada-Bedingungen des Benutzers erfüllt sind.
Natürlich werden dadurch viele Daten gelesen, die der Benutzer nicht benötigt, und jede Zeile muss mit einem Lambda-Ausdruck berechnet werden, aber das geht nicht, Einfachheit ist schön.
In der Vergangenheit habe ich eine Bedingung, die abgefragt werden musste, die Protokollzeit und den Offset der Protokolldatei im Index aufgezeichnet, damit ich den Offset finden konnte, der die Bedingungen aus dem Index erfüllt, und dann jedes Stück davon Daten würden wie folgt aussehen: Einmal in der Protokolldatei suchen. Dies hat nur einen Vorteil: Die gelesene Datenmenge ist geringer, es gibt jedoch zwei Nachteile:
Die Indexdatei ist sehr groß und lässt sich nur schwer in den Speicher laden
Alle Wenn es gelesen wird, muss es zuerst gelesen werden. Es scheint, dass der Puffer nicht verwendet wird. Es ist sehr langsam, vier- oder fünfmal langsamer als das kontinuierliche Lesen eines Datensegments und das Filtern mit Lambda
Schreiben
Wie ich bereits sagte, hängen Sie die Daten nur an, ändern Sie sie nicht, und am Anfang jeder Protokollzeile steht ein Zeitstempel.
Multithreading
Abfragedaten können von mehreren Threads gleichzeitig abgefragt werden. Bei jeder Abfrage wird ein neuer Protokolldateideskriptor geöffnet mehr parallel. Die Lesevorgänge werden nicht kämpfen.
Obwohl es sich beim Schreiben nur um einen Anhängevorgang handelt, sind wir nicht sicher, ob es sicher ist, die Datei mit mehreren Threads anzuhängen. Daher wird empfohlen, zum Schreiben eine Warteschlange und einen dedizierten Thread zu verwenden.
Sperren
Es gibt keine Sperren.
Sortieren
Standardmäßig sind die abgefragten Daten in chronologischer Reihenfolge angeordnet. Wenn Sie eine andere Sortierung benötigen, können Sie sie nach dem Speichern in den Speicher sortieren es, wie auch immer Sie wollen.
Mehr als 100 Zeilen Datenbankcode
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 1024*10 default_read_buffer_size = 1024*10 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # 索引如果没有flush的话,可能读到有半行的数据 pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if __name__ == '__main__': test()