이 기사에서는 중국 IT 베테랑을 위해 작성된 간단한 데이터베이스를 소개합니다. 우리가 사용하는 데이터베이스만큼 강력하지는 않지만 배울 가치가 있습니다. 특정 환경에서 사용할 수 있으며 더 유연하고 편리합니다.
데이터베이스 이름은 Python으로 구현된 WawaDB입니다. 이것은 파이썬이 매우 강력하다는 것을 보여줍니다!
소개
로깅 요구 사항은 일반적으로 다음과 같습니다.
첨부만 하고 수정하지 말고 시간순으로 작성하세요.
대량 작성하세요. 적은 양의 읽기, 쿼리는 일반적으로 특정 기간의 데이터를 쿼리합니다.
MongoDB의 고정 컬렉션은 이러한 요구를 매우 잘 충족할 수 있지만 MongoDB는 상대적으로 많은 양의 메모리를 차지하므로 약간 만드는 것처럼 느껴집니다. 두더지 언덕의 소란.
WawaDB의 아이디어는 1000개의 로그가 기록될 때마다 현재 시간과 로그 파일의 오프셋을 인덱스 파일에 기록하는 것입니다.
그러면 로그를 시간별로 쿼리할 때 먼저 인덱스를 메모리에 로드하고 이분법을 사용하여 시점의 오프셋을 찾은 다음 로그 파일을 열고 지정된 위치를 탐색하면 됩니다. 사용자 요구에 따라 전체 로그 파일을 탐색하지 않고도 신속하게 데이터를 찾아 읽을 수 있습니다.
성능
Core 2 P8400, 2.26GHZ, 2G 메모리, 32비트 win7
쓰기 테스트:
1분에 10,000개의 데이터 쓰기를 시뮬레이션합니다. , 총 5시간의 데이터 작성, 300만개의 데이터 삽입, 각 데이터가 54자, 2분 51초 소요
읽기 테스트 : 해당 기간에 특정 하위 문자열이 포함된 지정된 로그 읽기
데이터 범위 순회 데이터 볼륨 결과 계산 시간(초)
5시간 300만 604 6.6
2 시간 120만 225 2.7
1시간 600,000 96 1.3
30분 300,000 44 0.6
색인
로그에 기록된 시간만 색인화합니다. 소개에서는 대략적으로 인덱스 구현과 관련하여 이진 검색이 B 트리만큼 효율적이지는 않지만 일반적으로 크기가 다르지 않으며 구현이 매우 간단하다고 말합니다.
희소 인덱스이기 때문에 모든 로그에 오프셋을 기록하는 인덱스가 있는 것은 아니므로 데이터를 읽을 때 읽기 누락을 방지하기 위해 앞으로 더 많은 데이터를 읽어야 하며 실제로 읽은 내용을 읽을 때까지 기다려야 합니다. 그러면 데이터가 실제로 사용자에게 반환됩니다.
아래와 같이 예를 들어 사용자가 25부터 43까지의 로그를 읽고 싶다면 이분법을 사용하여 25를 찾고, 30이 위치한 지점을 찾으면
Index : 0 10 20 30 40 50 로그: |..........|..........|..........|....... ...|.......... |>>>>a = [0, 10, 20, 30, 40, 50]>>>>bisect.bisect_left(a, 35)>>>> 3>>>>a[3]>>>>30>>>>bisect.bisect_left(a, 43)>>>5>>>a[5]>>50
그래서 우리는 조금 앞으로 가서 20(30 이전 틱)부터 로그를 읽기 시작하면 21, 22, 23, 24는 25보다 작기 때문에 읽고 버려집니다. 25, 26, 27,...을 읽은 후, 사용자에게 반환됩니다
40(이전 틱의 50)까지 읽은 다음 현재 데이터가 43보다 큰지 확인해야 합니다. 43보다 큰 경우(완전 개방 상태에서 데이터 반환) 범위) 읽기를 중지해야 합니다.
전체적으로 대용량 파일 중 극히 일부분만 운영하여 사용자가 원하는 데이터를 얻을 수 있었습니다.
버퍼
로그 작성 시 대량의 디스크 쓰기를 줄이기 위해 로그 추가 시 버퍼는 10k로 설정됩니다. 시스템 기본값은 4k입니다.
마찬가지로 로그 읽기의 효율성을 높이기 위해 읽기 버퍼도 10k로 설정되어 있으며, 이 역시 로그 크기에 따라 적절히 조정해야 합니다.
인덱스 읽기 및 쓰기는 행 버퍼로 설정되며 불완전한 인덱스 행을 읽는 것을 방지하기 위해 모든 전체 행을 디스크로 플러시해야 합니다. 버퍼 세트, 절반만 읽은 행은 계속 읽을 수 있음) OK).
쿼리
뭐? SQL을 지원하려면 문제를 일으키지 마세요. 어떻게 100줄의 코드가 SQL을 지원할 수 있습니까?
이제 쿼리는 람다 식으로 직접 전달됩니다. 시스템이 지정된 시간 범위 내에서 데이터 행을 순회할 때 사용자의 람바다 조건이 충족되는 경우에만 사용자에게 반환됩니다.
물론 이렇게 하면 사용자에게 필요하지 않은 많은 데이터를 읽어야 하고, 각 줄은 람다 식으로 계산해야 하지만 그럴 리가 없고 단순함이 아름답다.
예전에는 인덱스에서 조건에 맞는 오프셋을 찾을 수 있도록 쿼리해야 하는 조건과 로그 시간, 로그 파일 오프셋을 인덱스에 기록해 두었고, 데이터는 로그 파일에서 검색됩니다. 한 번 읽으십시오. 이는 읽는 데이터의 양이 적다는 장점 하나만 있지만 두 가지 단점이 있습니다.
인덱스 파일이 너무 커서 메모리에 로딩하기 불편함
매번 읽을 때 먼저 읽어야 하며 버퍼를 사용하지 않는 것 같습니다. 매우 느립니다. 데이터 세그먼트를 연속적으로 읽고 람다로 필터링하는 것보다 4~5배 느립니다
쓰기
앞서 말했듯이 데이터를 수정하지 않고 추가만 하며, 각 로그 줄의 앞 부분은 타임스탬프입니다.
멀티스레딩
쿼리 데이터는 동시에 여러 스레드에서 쿼리할 수 있으므로 각 쿼리는 새로운 로그 파일 설명자를 엽니다. 더 많은 병렬 읽기가 발생하지 않습니다.
쓰기의 경우 단순한 추가 작업이지만 여러 스레드가 파일을 추가하는 것이 안전한지 확신할 수 없으므로 쓰기에는 큐와 전용 스레드를 사용하는 것이 좋습니다.
자물쇠
자물쇠가 없습니다.
정렬
기본적으로 쿼리된 데이터는 시간순으로 정렬됩니다. 다른 정렬이 필요한 경우 Python의 정렬 기능을 사용하여 메모리에 넣은 후 정렬할 수 있습니다. 원하는 대로 하세요.
100줄 이상의 데이터베이스 코드
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 1024*10 default_read_buffer_size = 1024*10 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # 索引如果没有flush的话,可能读到有半行的数据 pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if __name__ == '__main__': test()