本文介紹的是以為中國的IT資深人士寫的一個簡單的資料庫,沒有我們使用的資料庫那麼強大,但是值得大家借鏡。可以用在特定環境中,更靈活方便。
資料庫的名字叫WawaDB,是用python實現的。由此可見python是灰常強大啊!
簡介
記錄日誌的需求一般是這樣的:
只追加,不修改,寫入按時間順序寫入;
大量寫,少量讀,查詢一般查詢一個時間段的資料;
MongoMongoDB的固定集合很好的滿足了這個需求,但是MongoDB佔內存比較大,有點兒火穿蚊子,小題大做的感覺。
WawaDB的想法是每寫入1000個日誌,在一個索引檔案裡記錄下當前的時間和日誌檔案的偏移量。
然後按時間詢問日誌時,先把索引加載到內存中,用二分法查出時間點的偏移量,再打開日誌文件seek到指定位置,這樣就能很快定位用戶需要的數據並讀取,而不需要遍歷整個日誌檔。
性能
Core 2 P8400,2.26GHZ,2G內存,32 bit win7
寫入測試:
模擬1分鐘寫入10000條數據,共寫入5小時的數據,插入5個小時每個資料54個字符,用時2分51秒
讀取測試:讀取指定時間段內包含某個子字串的日誌
資料範圍遍歷資料量結果數用時(秒)
5小時300萬604 6.6
2小時120萬225 2.7
1小時60萬96 1.3
30分鐘30萬44 0.6
化的索引30萬44 0.6
實現,二分查找肯定沒B Tree效率高,但一般情況下也差不了一個數量級,而且實現特別簡單。 因為是稀疏索引,並不是每個日誌都有索引記錄它的偏移量,所以讀取數據時要往前多讀一些數據,防止漏讀,等讀到真正所需的數據時才真正給用戶回傳資料。 如下圖,例如用戶要讀25到43的日誌,用二分法找25,找到的是30所在的點,索引:0 10 20 10 20 10 :|....... ..|.........|.........|.........|.........|>>>a = [0 , 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>> >5>>>a[5]>>>50所以我們要往前倒一些,從20(30的前一個刻度)開始讀取日誌,21,22,23,24讀取後因為比25小,所以扔掉, 讀到25,26,27,...後返回給用戶讀取到40(50的前一個刻度)後就要判斷當前數據是否大於43了,如果大於43(返回全開區間的數據),就要停止讀了。 整體下來我們只操作了大檔案的很少一部分就得到了使用者想要的資料。 緩衝區為了減少寫入日誌時大量的磁碟寫,索引在append日誌時,把buffer設定成了10k,系統預設應該是4k。 同理,為了提高讀取日誌的效率,讀取的buffer也設定了10k,也需要根據你日誌的大小做適當調整。 索引的讀寫設定成了行buffer,每滿一行都要flush到磁碟上,防止讀到不完整的索引行(其實實作證明,設定了行buffer,還是能讀到半拉的行)。 查詢啥?要支援SQL,別鬧了,100行程式碼怎麼支援SQL呀。 現在查詢是直接傳入一個lambada表達式,系統遍歷指定時間範圍內的資料行時,滿足使用者的lambada條件才會傳回給使用者。 🎜🎜當然這樣會多讀取很多用戶不需要的數據,而且每行都要進行lambda表達式的運算,不過沒辦法,簡單就是美呀。 🎜
以前我是把一個需要查詢的條件和日誌時間,日誌檔案偏移量都記錄在索引裡,這樣從索引裡查找出符合條件的偏移量,然後每條資料都如日誌檔案裡seek一次, read一次。這樣好處只有一個,就是讀取的資料量少了,但缺點有兩個:
索引檔特別大,不方便載入到記憶體中
每次讀取都要先seek,貌似緩衝區用不上,特別慢,比連續讀一個段的數據,並用lambda過濾慢四五倍
寫入
前面說過了,只append,不修改數據,而且每行日誌最前面是時間戳。
多執行緒
查詢數據,可以多執行緒同時查詢,每次查詢都會開啟一個新的日誌檔案的描述符,所以並行的多個讀取不會打架。
寫入的話,雖然只是append操作,但不確認多執行緒對檔案進行append操作是否安全,所以建議用一個佇列,一個專用執行緒進行寫入。
鎖
沒有任何鎖。
排序
預設查詢出來的資料是按時間正序排列,如需其它排序,可取到內存後用python的sorted函數排序,想怎麼排就怎麼排。
100多行的資料庫程式碼
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 1024*10 default_read_buffer_size = 1024*10 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # 索引如果没有flush的话,可能读到有半行的数据 pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if __name__ == '__main__': test()