Python怎麼使用ClickHouse

WBOY
發布: 2023-05-17 08:19:28
轉載
2897 人瀏覽過

    ClickHouse是近年來備受關注的開源列式資料庫(DBMS),主要用於資料線上分析(OLAP)領域,於2016年開源。目前國內社區火熱,各大廠紛紛跟進大規模使用。

    • 今日頭條,內部用ClickHouse來做用戶行為分析,內部一共幾千個ClickHouse節點,單集群最大1200節點,總數據量幾十PB,日增原始數據300TB左右。

    • 騰訊內部用ClickHouse做遊戲資料分析,並且為此建立了一整套監控維繫體系。

    • 從2018年7月開始試用,攜程內部已經將80%的業務遷移到ClickHouse資料庫上。每天資料增量十多億,近百萬次查詢請求。

    • 快手內部也在使用ClickHouse,儲存總量約10PB, 每天新增200TB, 90%查詢小於3S。

    在國外,Yandex內部有數百節點做使用者點擊行為分析,CloudFlare、Spotify等頭部公司也正在使用。

    ClickHouse最初是為了開發YandexMetrica,這是世界第二大Web分析平台。多年來一直作為該系統的核心組件被該系統持續使用。

    1. 關於ClickHouse使用實踐

    首先,我們回顧一些基礎概念:

    • OLTP#:是傳統的關係型資料庫,主要操作增刪改查,強調交易一致性,如銀行體系、電商系統。

    • OLAP:是倉庫型資料庫,主要是讀取數據,做複雜數據分析,專注於技術決策支持,提供直覺簡單的結果。

    1.1. ClickHouse 應用於資料倉儲場景

    ClickHouse做為列式資料庫,列式資料庫更適合OLAP場景,OLAP場景的關鍵特徵:

    • 絕大多數是讀取請求

    • 資料以相當大的批次(> 1000行)更新,而不是單行更新;或根本沒有更新。

    • 已新增至資料庫的資料不能修改。

    • 對於讀取,從資料庫中提取相當多的行,但只提取列的一小部分。

    • 寬表,即每個表包含大量的列

    • #查詢相對較少(通常每台伺服器每秒查詢數百次或更少)

    • 對於簡單查詢,允許延遲大約50毫秒

    • 列中的資料相對較小:數字和短字符字串(例如,每個URL 60個位元組)

    • 處理單一查詢時需要高吞吐量(每台伺服器每秒可達數十億行)

    • 交易不是必須的

    • 對資料一致性要求低

    • 每個查詢都有一個大表。除了他以外,其他的都很小。

    • 查詢結果明顯小於來源資料。換句話說,資料經過過濾或聚合,因此結果適合於單一伺服器的RAM中

    #1.2. 客戶端工具DBeaver

    ##Clickhouse客戶端工具為dbeaver,官網為https://dbeaver.io/。

    • dbeaver是免費和開源(GPL)為開發人員和資料庫管理員通用資料庫工具。 [百度百科]

    • 該專案的核心目標是提高易用性,因此我們專門設計和開發了一個資料庫管理工具。免費、跨平台、基於開源框架和允許各種擴展寫作(插件)。

    • 它支援任何具有一個JDBC驅動程式資料庫。

    • 它可以處理任何的外部資料來源。

    透過操作介面選單中「資料庫」建立配置新連接,如下圖所示,選擇並下載ClickHouse驅動(預設不含驅動)。

    Python怎麼使用ClickHouse

    DBeaver配置是基於Jdbc方式,一般預設URL和連接埠如下:

    jdbc:clickhouse://192.168.17.61:8123
    登入後複製

    如下圖所示。

    在是用DBeaver連接Clickhouse做查詢時,有時會出現連接或查詢逾時的情況,這個時候可以在連接的參數中加入設定socket_timeout參數來解決問題。

    jdbc:clickhouse://{host}:{port}[/{database}]?socket_timeout=600000
    登入後複製

    Python怎麼使用ClickHouse

    1.3. 大數據應用實作

    • 環境簡單說明:

    • 硬體資源有限,僅16G內存,交易資料為億級。

    本應用程式是某交易大數據,主要包括交易主表、相關客戶資訊、物料資訊、歷史價格、優惠及積分資訊等,其中主交易表為自關聯樹狀表結構。

    為了分析客戶交易行為,在有限資源的條件下,按日和交易點抽取、匯集交易明細為交易記錄,如下圖所示。

    Python怎麼使用ClickHouse

    其中,在ClickHouse上,交易数据结构由60个列(字段)组成,截取部分如下所示:

    Python怎麼使用ClickHouse

    针对频繁出现“would use 10.20 GiB , maximum: 9.31 GiB”等内存不足的情况,基于ClickHouse的SQL,编写了提取聚合数据集SQL语句,如下所示。

    Python怎麼使用ClickHouse

    大约60s返回结果,如下所示:

    Python怎麼使用ClickHouse

    2. Python使用ClickHouse实践

    2.1. ClickHouse第三方Python驱动clickhouse_driver

    ClickHouse没有提供官方Python接口驱动,常用第三方驱动接口为clickhouse_driver,可以使用pip方式安装,如下所示:

    pip install clickhouse_driver Collecting clickhouse_driver Downloading https://files.pythonhosted.org/packages/88/59/c570218bfca84bd0ece896c0f9ac0bf1e11543f3c01d8409f5e4f801f992/clickhouse_driver-0.2.1-cp36-cp36m-win_amd64.whl (173kB) 100% |████████████████████████████████| 174kB 27kB/s Collecting tzlocal<3.0 (from clickhouse_driver) Downloading https://files.pythonhosted.org/packages/5d/94/d47b0fd5988e6b7059de05720a646a2930920fff247a826f61674d436ba4/tzlocal-2.1-py2.py3-none-any.whl Requirement already satisfied: pytz in d:\python\python36\lib\site-packages (from clickhouse_driver) (2020.4) Installing collected packages: tzlocal, clickhouse-driver Successfully installed clickhouse-driver-0.2.1 tzlocal-2.1
    登入後複製

    使用的client api不能用了,报错如下:

    File "clickhouse_driver\varint.pyx", line 62, in clickhouse_driver.varint.read_varint

    File "clickhouse_driver\bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one

    File "clickhouse_driver\bufferedreader.pyx", line 240, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer

    EOFError: Unexpected EOF while reading bytes

    Python驱动使用ClickHouse端口9000

    ClickHouse服务器和客户端之间的通信有两种协议:http(端口8123)和本机(端口9000)。DBeaver驱动配置使用jdbc驱动方式,端口为8123。

    ClickHouse接口返回数据类型为元组,也可以返回Pandas的DataFrame,本文代码使用的为返回DataFrame。

    collection = self.client.query_dataframe(self.query_sql)
    登入後複製

    2.2. 实践程序代码

    由于我本机最初资源为8G内存(现扩到16G),以及实际可操作性,分批次取数据保存到多个文件中,每个文件大约为1G。

    # -*- coding: utf-8 -*- ''' Created on 2021年3月1日 @author: xiaoyw ''' import pandas as pd import json import numpy as np import datetime from clickhouse_driver import Client #from clickhouse_driver import connect # 基于Clickhouse数据库基础数据对象类 class DB_Obj(object): ''' 192.168.17.61:9000 ebd_all_b04.card_tbl_trade_m_orc ''' def __init__(self, db_name): self.db_name = db_name host='192.168.17.61' #服务器地址 port ='9000' #'8123' #端口 user='***' #用户名 password='***' #密码 database=db_name #数据库 send_receive_timeout = 25 #超时时间 self.client = Client(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout) #self.conn = connect(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout) def setPriceTable(self,df): self.pricetable = df def get_trade(self,df_trade,filename): print('Trade join price!') df_trade = pd.merge(left=df_trade,right=self.pricetable[['occurday','DIM_DATE','END_DATE','V_0','V_92','V_95','ZDE_0','ZDE_92', 'ZDE_95']],how="left",on=['occurday']) df_trade.to_csv(filename,mode='a',encoding='utf-8',index=False) def get_datas(self,query_sql): n = 0 # 累计处理卡客户数据 k = 0 # 取每次DataFrame数据量 batch = 100000 #100000 # 分批次处理 i = 0 # 文件标题顺序累加 flag=True # 数据处理解释标志 filename = 'card_trade_all_{}.csv' while flag: self.query_sql = query_sql.format(n, n+batch) print('query started') collection = self.client.query_dataframe(self.query_sql) print('return query result') df_trade = collection #pd.DataFrame(collection) i=i+1 k = len(df_trade) if k > 0: self.get_trade(df_trade, filename.format(i)) n = n + batch if k == 0: flag=False print('Completed ' + str(k) + 'trade details!') print('Usercard count ' + str(n) ) return n # 价格变动数据集 class Price_Table(object): def __init__(self, cityname, startdate): self.cityname = cityname self.startdate = startdate self.filename = 'price20210531.csv' def get_price(self): df_price = pd.read_csv(self.filename) ...... self.price_table=self.price_table.append(data_dict, ignore_index=True) print('generate price table!') class CardTradeDB(object): def __init__(self,db_obj): self.db_obj = db_obj def insertDatasByCSV(self,filename): # 存在数据混合类型 df = pd.read_csv(filename,low_memory=False) # 获取交易记录 def getTradeDatasByID(self,ID_list=None): # 字符串过长,需要使用''' query_sql = '''select C.carduser_id,C.org_id,C.cardasn,C.occurday as ...... limit {},{}) group by C.carduser_id,C.org_id,C.cardasn,C.occurday order by C.carduser_id,C.occurday''' n = self.db_obj.get_datas(query_sql) return n if __name__ == '__main__': PTable = Price_Table('湖北','2015-12-01') PTable.get_price() db_obj = DB_Obj('ebd_all_b04') db_obj.setPriceTable(PTable.price_table) CTD = CardTradeDB(db_obj) df = CTD.getTradeDatasByID()
    登入後複製

    返回本地文件为:

    Python怎麼使用ClickHouse

    3. 小结一下

    ClickHouse运用于OLAP场景时,拥有出色的查询速度,但需要具备大内存支持。Python第三方clickhouse-driver 驱动基本满足数据处理需求,如果能返回Pandas DataFrame最好。

    ClickHouse和Pandas聚合都是非常快的,ClickHouse聚合函数也较为丰富(例如文中anyLast(x)返回最后遇到的值),如果能通过SQL聚合的,还是在ClickHouse中完成比较理想,把更小的结果集反馈给Python进行机器学习。

    操作ClickHouse删除指定数据

    def info_del2(i): client = click_client(host='地址', port=端口, user='用户名', password='密码', database='数据库') sql_detail='alter table SS_GOODS_ORDER_ALL delete where order_id='+str(i)+';' try: client.execute(sql_detail) except Exception as e: print(e,'删除商品数据失败')
    登入後複製

    在进行数据删除的时候,python操作clickhou和mysql的方式不太一样,这里不能使用以往常用的%s然后添加数据的方式,必须完整的编辑一条语句,如同上面方法所写的一样,传进去的参数统一使用str类型

    以上是Python怎麼使用ClickHouse的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    相關標籤:
    來源:yisu.com
    本網站聲明
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
    最新下載
    更多>
    網站特效
    網站源碼
    網站素材
    前端模板
    關於我們 免責聲明 Sitemap
    PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!