本文分享給大家的是基於asyncio 非同步協程框架實現收集B站直播彈幕收集系統的簡單設計,並附上源碼,有需要的小伙伴可以參考下
">
前言
雖然標題是全站,但目前只做了等級top 100 直播間的全天彈幕收集。
彈幕收集系統基於先前的B 站直播彈幕姬 Python 版修改而來。具體協議分析可以看上一篇文章。
直播彈幕協議是直接基於 TCP 協議,所以如果 B 站對類似我這種行為做反制措施,比較困難。應該有我不知道的技術手段來偵測類似我這種惡意行為。
我試過同時連接 100 個房間,和連接單一房間 100 次的實驗,都沒有問題。 >150 會被關閉連結。
直播間的選取
現在彈幕收集系統在選取直播間比較簡單,直接選取了等級 top100。
以後會修改這部分,改成定時去 http://live.bilibili.com/all 查看新開播的直播間,並動態新增任務。
非同步任務和彈幕儲存
收集系統仍舊使用了 asyncio 非同步協程框架,對於每個直播間都使用以下方法來加進 loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其實若將心跳任務 HeartbeatLoop 放入 connectorServer 中去啟動,程式碼看起來更優雅一些。但這麼做是因為我需要維護一個任務列表,後面會有描述。
在彈幕儲存上我花了些時間選擇。
資料庫儲存是一個同步 IO 的過程,Insert 的時候會阻塞彈幕收集的任務。雖然有 aiomysql 這種非同步介面,但設定資料庫太麻煩,我的設想是這個小系統能夠方便地部署。
最終我選擇使用自備的 sqlite3。但 sqlite3 無法做並行操作,故開了一個執行緒單獨進行資料庫儲存。在另一個線程中,100 * 2 個任務蒐集所有的彈幕、人數信息,並塞進隊列 commentq, numq 中。儲存執行緒每隔 10s 喚醒一次,將佇列中的資料寫進 sqlite3 中,並清空佇列。
在多執行緒和非同步的配合下,網路流量沒有被阻塞。
可能的連線失敗場景處理
彈幕協定是直接基於TCP,位元與位元直接關聯性較強,一旦解析錯誤,很容易就拋Exception (個人感覺,雖然TCP 是可靠傳輸,但B站伺服器本身發生錯誤也是有可能的)。所以有必要設計一個自動重連機制。
在asyncio 文件中提到,
Done means either that a result / exception are available, or that the future was cancelled.
#函數
正常回傳、
或是被cancel,都會退出目前任務。可以用 done() 來判斷。 每一個直播間對應兩個任務,解析任務是最容易掛的,但並不會影響心跳任務,所以必須找出並將對應心跳任務結束。
self.tasks[url] = [task1, task2]
在運行過程中,每隔10s 做一次檢查,
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]
結論
top100 平均一天 40M 彈幕數據。
收集的彈幕能做什麼?還沒想好,可能可以拿來做使用者行為分析 -_^
以上是基於asyncio非同步協程框架實現收集B站直播彈幕詳細介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!