/blive

简单的B站直播弹幕websocket监听处理框架

Primary LanguagePythonMIT LicenseMIT

B 站弹幕监听框架

特点

  • 简单,只需房间号即可监听
  • 异步,io 不阻塞,及时获取消息

B 站直播弹幕 websocket 协议分析

PROTOCOL 分析

快速开始

  1. 安装

    pip install blive

  2. 创建 app

    from blive import  BLiver
    
    app = BLiver(123) #123为房间号
  3. 创建处理器

    from blive import  BLiver, Events, BLiverCtx
    
    app = BLiver(123)
    
    # 标记该方法监听弹幕消息,更多消息类型请参考 Events 类源代码
    @app.on(Events.DANMU_MSG)
    async def listen_danmu(ctx: BLiverCtx):
        danmu = DanMuMsg(ctx.body) #ctx.body 套上相应的消息操作类即可得到消息的基本内容,也可直接操作 ctx.body
        print(danmu.content)
        print(danmu.sender)
        print(danmu.timestamp)
  4. 运行

    from blive import  BLiver, Events, BLiverCtx
    
    app = BLiver(123)
    
    @app.on(Events.DANMU_MSG)
    async def listen_danmu(ctx: BLiverCtx):
        danmu = DanMuMsg(ctx.body)
        print(danmu.content)
        print(danmu.sender)
        print(danmu.timestamp)
    
    app.run() # 运行app!

同时监听多个直播间

import asyncio
from blive import BLiver, Events, BLiverCtx
from blive.msg import DanMuMsg


# 定义弹幕事件handler
async def listen(ctx: BLiverCtx):
   danmu = DanMuMsg(ctx.body)
   print(
      f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
   )


async def main():
   # 两个直播间
   ke = BLiver(605)
   azi = BLiver(510)

   # 注册handler
   ke.register_handler(Events.DANMU_MSG, listen)
   azi.register_handler(Events.DANMU_MSG, listen)

   # 以异步task的形式运行
   task1 = ke.run_as_task()
   task2 = azi.run_as_task()

   # await 两个任务
   await asyncio.gather(*[task1, task2])


if __name__ == "__main__":
   loop = asyncio.get_event_loop()
   loop.run_until_complete(main()) 

作为协议解析工具在其他地方使用(伪代码)

from blive.core import BWS_MsgPackage

packman = BWS_MsgPackage() # 实例化一个消息包处理器

while True:
   data = ws.receive() # 当收到消息时
   msg = packman.unpack(data) # 使用packman解析消息,返回一个形如 [(header,body), (header,body), ... ] 数组
   print(msg)

与 fastapi (其他asyncio生态框架) 配合使用

from fastapi import FastAPI
from blive import BLiver,Events
from blive.msg import DanMuMsg

app = FastAPI()

BLIVER_POOL = {}


# 定义弹幕事件handler
async def handler(ctx):
   danmu = DanMuMsg(ctx.body)
   print(
      f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
   )

def create_bliver(roomid):
    b = BLiver(roomid)
    b.register_handler(Events.DANMU_MSG,handler)
    return b


@app.get("/create")
async def create_new_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if not room:
        b = create_bliver(roomid)
        BLIVER_POOL[roomid] = b.run_as_task() # 启动监听
    return {"msg":"创建一个新直播间弹幕监听成功"}


@app.get("/del")
async def rm_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if room:
        room.cancel()
    return {"msg":"移除直播间弹幕监听成功"}


@app.get("/show")
async def show():
    return list(BLIVER_POOL.keys())

全局异常处理

全局异常处理分为两个共享级别,分别为类级别和实例级别,在类上注册的异常处理为所有类实例共享,实例级别的异常处理只有实例自身拥有

app = BLiver(510)

@app.catch(ZeroDivisionError)
def err_handler(e, app: BLiver):
    print(f"{app.uname} catch ZeroDivisionError", e)
   
@app.on(Events.DANMU_MSG)
async def danmu_handler(ctx):
   1 / 0 # will raise ZeroDivisionError

azi.run()

项目简介

  • blive 文件夹为框架代码

    • core.py 为B站ws直播聊天室协议包处理的核心代码

    • framework.py 为框架代码

    • msg.py 为消息操作类代码

  • example/app.py 以框架形式运行例子

  • example/multi_room.py 同时监听多个直播间的实现

  • example/with_fastapi.py 与fastapi 配合使用的例子

  • example/error_handler.py 错误处理的例子

TODO

  • 更多的消息操作类(欢迎各位提pr)
  • 尝试加入中间件架构(目前感觉需求不大)