Szenenbeschreibung:
现有许多行日志文本,按天压缩成一个个TB级的gzip文件。
使用流对每个压缩文件的数据段进行传输然后解压,对解压出的文本分词并索引
以后查到这个词时,定位到这个词所在的文件和段,再用流传输并解压
(实际上是想利用已有的压缩文件构造一个类似ES的搜索引擎)
Das Problem besteht nun darin, dass die empfangenen Daten aufgrund unvollständiger Informationen nicht dekomprimiert werden können, da es sich nicht um eine vollständig komprimierte Datei, sondern um Blockbinärdaten handelt.
Jetzt möchte ich eine solche Funktion implementieren: Zuerst die empfangenen Stream-Daten dekomprimieren und als vollständige Daten wiederherstellen (die ursprünglichen Protokolldaten werden durch Zeilenumbrüche getrennt, es wäre gut, den Text vor der Komprimierung aller Stream-Daten und den Offset davon abzurufen die entsprechende Datei) und berücksichtigen dann, dass Prozesse wie Übertragung und Speicherung Datenfehler verursachen können. Dekomprimieren Sie daher für jeden Datenstrom so viele Daten wie möglich, falls Fehler auftreten.
Ein Teil des relevanten Codes lautet wie folgt: (geändert von https://stackoverflow.com/que...)
import zlib
import traceback
CHUNKSIZE=30
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
f = open('test.py.gz','rb')
buffer = f.read(CHUNKSIZE)
i = 0
while buffer :
i += 1
try:
#skip two chunk
if i < 3 or i > 4:
outstr = d.decompress(buffer)
print('*'*10 + outstr + '#'*10)
except Exception, e:
print(traceback.print_exc())
finally:
buffer = f.read(CHUNKSIZE)
outstr = d.flush()
print(outstr)
f.close()
Wenn i>=3, wird jedes Mal in der Schleife ein Fehler gemeldet.
Meine Schlussfolgerung ist, dass die nachfolgenden Daten nicht dekomprimiert werden können, wenn der Stream diskontinuierlich ist (überspringt, um einen Teil der Daten zu empfangen).
Frage 1: Wie können wir jeden Teil der empfangenen Daten korrekt dekomprimieren? (Da es sich möglicherweise um den Algorithmus und die Datenstruktur der GZIP-Komprimierung handelt, schaue ich mir den entsprechenden Code an. Wenn das Problem gelöst werden kann, kann das Anhängen eines bestimmten Chucks im Übertragungsheader oder eines Chucks vor und nach den Daten, die dekomprimiert werden müssen, gelöst werden , es kann in Betracht gezogen werden)
Frage 2:
Wenn Sie nicht jeden Teil der empfangenen Daten korrekt dekomprimieren können, wie können Sie dann so viele Daten wie möglich dekomprimieren?
我觉得可以做一个出错重新续传的功能,传输前备份当前这一段数据流,你得判断出当前传输的这一段数据流是否传输完整了。这就要求传送端和接收端之间的传输协议是你能改动的,出现错误就立刻反馈fail给传输端,从刚才这段重新续传,没有错误就反馈OK,继续传输下一段。这样就能保证数据的完整性。如果文件太大,可以在内存中备份多些数据段,做些细节性的判断。
不太確定你描述的問題,不過在stackoverflow 有些問答或許有幫助。
How can I decompress a gzip stream with zlib?
Python decompressing gzip chunk-by-chunk