python - 如何使用pykafka consumer进行数据处理并保存?
怪我咯
怪我咯 2017-04-18 10:34:31
0
1
1310

使用本地kafka bin/kafka-console-producer.sh --broker-list kafkaIP:port --topic topicName
创建命令行生产数据,然后打开python

from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.x.x:9092")
topic = client.topics['wr_test']
consumer = topic.get_balanced_consumer(consumer_group='test-consumer-group',auto_commit_enable=True,zookeeper_connect='192.168.x.x:2121')

然后自己编写了简单的一套处理函数,从外部引用。将数据处理后存入elasticsearch 或者 数据库
比如
for msg in consumer:

if msg is not None:
    外部引入的处理函数(msg.value)
    

在python命令行
for msg in consumer:

print msg.offset, msg.value

这时候使用生产者敲入一些数据,在消费端就会就会立即打印出来
但是写成py文件之后,每次运行只会处理最近的生产的一次内容,在生产者中再进行输入一些内容,py文件就不会再进行数据处理了。
所以向问下如何编写能运行后能一直对消费者数据进行处理的函数?要注意哪些地方?

另外,get_balanced_consumer的方法,是连接zookeeper消费
使用topic.get_simple_consumer是直接消费kafka,使用这种方式就提示No handler for...的错误

还有一个疑问,就是实际生产环境日志产生量很快,应该如何编写一个多线程处理方法?

怪我咯
怪我咯

走同样的路,发现不同的人生

全員に返信(1)
左手右手慢动作

他の人のブログで代替ソリューションを見ました
http://www.cnblogs.com/castle...
コンシューマからの msg.value をリストに読み取り、リストからデータを読み取ります データ処理を実行し、処理が終了したら、取得したデータをリストに取り出します。 try: ... を除く :... continue も使用してください

いいねを押す +0
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート