yulinfeng000/blive

控制台报错

Closed this issue · 15 comments

await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/client_ws.py", line 156, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 688, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 601, in _send_frame
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
Job "BLiver.heartbeat (trigger: interval[0:00:30], next run at: 2022-02-18 00:14:06 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
  File "/usr/local/lib/python3.8/dist-packages/blive/framework.py", line 147, in heartbeat
    await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/client_ws.py", line 156, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 688, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 601, in _send_frame
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
Job "BLiver.heartbeat (trigger: interval[0:00:30], next run at: 2022-02-18 00:14:36 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
  File "/usr/local/lib/python3.8/dist-packages/blive/framework.py", line 147, in heartbeat
    await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/client_ws.py", line 156, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 688, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 601, in _send_frame
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
Job "BLiver.heartbeat (trigger: interval[0:00:30], next run at: 2022-02-18 00:15:06 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
  File "/usr/local/lib/python3.8/dist-packages/blive/framework.py", line 147, in heartbeat
    await self._ws.send_bytes(packman.pack(heartbeat(), Operation.HEARTBEAT))
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/client_ws.py", line 156, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 688, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/usr/local/lib/python3.8/dist-packages/aiohttp/http_websocket.py", line 601, in _send_frame
    raise ConnectionResetError("Cannot write to closing transport")
ConnectionResetError: Cannot write to closing transport
^CTraceback (most recent call last):
  File "main.py", line 44, in <module>
    app.run()
  File "/usr/local/lib/python3.8/dist-packages/blive/framework.py", line 176, in run
    loop.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)

可能是由于直播间一段时间没有弹幕,导致超时

可能B站因为直播间长时间没有弹幕关闭ws连接了

请问能提供复现的条件吗

使用的app.py,是一个小主播,除了直播的时候没有弹幕。
我使用screen挂在后台全天监测保存弹幕,在非直播时间报错,弹幕姬停止运行并不在重启
可以增加timeout时间,或者通过try不显示报错并自动重连?

好的,这个需求我收到了,会跟进的,如果你有什么想法也欢迎提pr

已增加断线重连机制,如果没有其他的问题的话我会在一周后关闭这个issue

请教一下,我如果想同时监听好几个直播间弹幕的话,把app.py封装成class,类似这样可以吗?

from blive import BLiver, Events, BLiverCtx
from blive.msg import (
    DanMuMsg,
    HotRankChangeV2Msg,
    InteractWordMsg,
    SendGiftMsg,
    SuperChatMsg,
)


class bullet:
    app = BLiver(255)
    
    def __init__(self, uid):
        self.uid = uid
        self.app = BLiver(uid)
        print(uid)

    @app.on(Events.DANMU_MSG)
    async def listen(ctx: BLiverCtx):
        danmu = DanMuMsg(ctx.body)
        print(
            f"\n{danmu.sender['name']}({danmu.sender['medal']['medal_name']}:{danmu.sender['medal']['medal_level']}): \"{danmu.content}\"\n"
        )
    
    
    @app.on(Events.INTERACT_WORD)
    async def listen_join(ctx: BLiverCtx):
        join = InteractWordMsg(ctx.body)
        print(
            "欢迎",
            f"{join.user['name']} ({join.user['medal']['medal_name']}:{join.user['medal']['medal_level']})",
            "进入直播间",
        )
    
    
    @app.on(Events.SUPER_CHAT_MESSAGE)
    async def listen_sc(ctx: BLiverCtx):
        msg = SuperChatMsg(ctx.body)
        print(msg.sender)
        print(msg.content)
        print(msg.start_time)
        print(msg.time)
        print(msg.price)
    
    
    @app.on(Events.SEND_GIFT)
    async def listen_gift(ctx: BLiverCtx):
        msg = SendGiftMsg(ctx.body)
        print(f"{msg.sender['name']} 送出 {msg.gift['gift_name']}")
    
    
    @app.on(Events.HOT_RANK_CHANGED_V2)
    async def hot(ctx: BLiverCtx):
        msg = HotRankChangeV2Msg(ctx.body)
        print(
            f"恭喜 {ctx.bliver.uname} 在 {msg.area_name} 区 的 {msg.rank_desc} 榜单中获得第 {msg.rank} 名"
        )
    
    
x = bullet(5169315)
x.app.run()

但是这样似乎无法正常运行,可能是在初始化bliver时就已经开始监听了,无法在__init__中修改,有什么好的办法吗?

谢谢~

给一个例子吧,其实有设计了这样的功能但是没有写在README里,如果你用着不错我会考虑写进README,例子里实现了同时监听2个直播间的弹幕,更多直播间可以类推,如果遇到性能问题建议安装uvloop库替换python的原生事件循环库

import asyncio
from blive import BLiver,Events,BLiverCtx
from blive.msg import DanMuMsg


# 定义弹幕事件handler
async def listen(ctx: BLiverCtx):
    danmu = DanMuMsg(ctx.body)
    print(
        f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
    )


async def main():
    # 两个直播间
    ke = BLiver(605)
    azi = BLiver(510)

    # 注册handler
    ke.register_handler(Events.DANMU_MSG, listen)
    azi.register_handler(Events.DANMU_MSG,listen)

    # 以异步task的形式运行
    task1 = ke.run_as_task()
    task2 = azi.run_as_task()

    # await 两个任务
    await asyncio.gather(*[task1,task2])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

试了一下,感觉不错,建议加入到README
我还在尝试监听1、2号直播间的同时,加入监听3号直播间,或不影响1号直播间弹幕的同时,关闭2号直播间监听,但是我只能关闭重启,会导致其他直播间弹幕丢失,有什么好办法吗?

以run_as_task的形式运行后,一个BLiver应用的生命周期都可以以 异步任务( asyncio.Task ) 的形式管理了。关于这方面请学习asyncio库和python异步io编程相关的知识,原谅我很难在issue里三言两语讲清楚,再给一个例子以供参考,例子实现了在运行中停止一个BLiver实例和启动一个BLiver实例。

我的想法是本项目应该尽可能简单(simple),灵活而强大。而不是臃肿和复杂,并且提供asyncio的Task对象很容易和其他asyncio生态融合( fastapi,sanic ... ),你可以写出任何形式的代码,所以本项目不会提供更高级的封装类。

def main():
    async def stop_bliver(task):
         # 睡10秒后停止azi直播间
        await asyncio.sleep(10)
        print("停止一个直播间")
        task.cancel()

    async def stop_bliver_38():
        await asyncio.sleep(10)
        print("停止azi直播间")
        for task in asyncio.all_tasks():
            if task.get_name() == "azi":
                task.cancel()
    
    async def new_bliver():
        # 15秒后创建一个新直播间监听
        await asyncio.sleep(15)
        print("创建一个新直播间监听")
        dys = BLiver(7777)
        dys.run_as_task()
        


    loop = asyncio.get_event_loop()
    # 两个直播间
    ke = BLiver(605)
    azi = BLiver(510)

    # 注册handler
    ke.register_handler(Events.DANMU_MSG, listen)
    azi.register_handler(Events.DANMU_MSG, listen)

    
    task1 = ke.run_as_task()
    task2 = azi.run_as_task()

    # asyncio.Task.set_name 方法只在python3.8以上才有!!!想用名字管理task建议升级到python3.8
    # task1.set_name("ke")
    # task2.set_name("azi")

    loop.create_task(stop_bliver(task1))
    # loop.create_task(stop_bliver_38())
    loop.create_task(new_bliver())
    
    #只要run_forever,也可以不await,因为await是为了达到程序表现像同步代码的效果
    loop.run_forever() 


if __name__ == "__main__":
    main()

和fastapi整合的例子

# main.py

from fastapi import FastAPI
from blive import BLiver,Events
from blive.msg import DanMuMsg

app = FastAPI()

BLIVER_POOL = {}

def create_bliver(roomid):
    # 定义弹幕事件handler
    async def listen(ctx):
        danmu = DanMuMsg(ctx.body)
        print(
            f'\n{danmu.sender.name} ({danmu.sender.medal.medal_name}:{danmu.sender.medal.medal_level}): "{danmu.content}"\n'
        )
    b = BLiver(roomid)
    b.register_handler(Events.DANMU_MSG,listen)
    return b


@app.get("/create")
async def create_new_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if not room:
        b = create_bliver(roomid)
        BLIVER_POOL[roomid] = b.run_as_task()
    return {"msg":"创建一个新直播间弹幕监听成功"}


@app.get("/del")
async def rm_bliver(roomid:int):
    room = BLIVER_POOL.get(roomid,None)
    if room:
        room.cancel()
    return {"msg":"移除直播间弹幕监听成功"}


@app.get("/show")
async def show():
    return list(BLIVER_POOL.keys())

实现了请求 GET http://ip:port/create?roomid=xxx 创建新监听, 请求 GET http://ip:port/del?roomid=xxx 移除监听 和 GET http://ip:port/show 展示当前所有监听的房间

和fastapi的这个脚本,连续开好几天之后,似乎create会失效,目前不知道原因

新开个issue吧,顺便附上log之类的(如果有的话),感觉很难复现

之前说的,create失效的原因找到了
我直接复制的readme中与fastapi结合部分的代码,其中,del部分没有从BLIVER_POOL中pop,再次开启监听时,程序会认为已开启,导致开启失败。

我简单修改了一下

# 创建直播间弹幕监听
@app.get("/create")
async def create_new_bliver(roomid: int):
    room = BLIVER_POOL.get(roomid, None)
    if room == None:
        b = create_bliver(roomid)
        BLIVER_POOL[roomid] = b.run_as_task() 
        log("INFO", f"开启直播间监听: {roomid}")
        return {"msg": f"创建直播间 {roomid} 弹幕监听成功", "status": True}
    return {"msg": f"直播间 {roomid} 已在监听中", "status": False}

# 删除直播间弹幕监听
@app.get("/del")
async def rm_bliver(roomid: int):
    global BLIVER_POOL
    room = BLIVER_POOL.get(roomid, None)
    if room != None:
        room.cancel()
        BLIVER_POOL.pop(roomid)
        return {"msg": f"删除直播间 {roomid} 弹幕监听成功", "status": True}
    return {"msg": f"直播间 {roomid} 不在弹幕监听中", "status": False}

# 已在监听中的直播间列表
@app.get("/show")
async def show():
    return list(BLIVER_POOL.keys())

# 查询直播间弹幕监听
@app.get("/query")
async def query(roomid: int):
    room = BLIVER_POOL.get(roomid, None)
    if room != None:
        return {"msg": f"直播间 {roomid} 弹幕正在监听中", "status": True}
    return {"msg": f"直播间 {roomid} 弹幕不在监听中", "status": False}

之前说的,create失效的原因找到了 我直接复制的readme中与fastapi结合部分的代码,其中,del部分没有从BLIVER_POOL中pop,再次开启监听时,程序会认为已开启,导致开启失败。

我简单修改了一下

# 创建直播间弹幕监听
@app.get("/create")
async def create_new_bliver(roomid: int):
    room = BLIVER_POOL.get(roomid, None)
    if room == None:
        b = create_bliver(roomid)
        BLIVER_POOL[roomid] = b.run_as_task() 
        log("INFO", f"开启直播间监听: {roomid}")
        return {"msg": f"创建直播间 {roomid} 弹幕监听成功", "status": True}
    return {"msg": f"直播间 {roomid} 已在监听中", "status": False}

# 删除直播间弹幕监听
@app.get("/del")
async def rm_bliver(roomid: int):
    global BLIVER_POOL
    room = BLIVER_POOL.get(roomid, None)
    if room != None:
        room.cancel()
        BLIVER_POOL.pop(roomid)
        return {"msg": f"删除直播间 {roomid} 弹幕监听成功", "status": True}
    return {"msg": f"直播间 {roomid} 不在弹幕监听中", "status": False}

# 已在监听中的直播间列表
@app.get("/show")
async def show():
    return list(BLIVER_POOL.keys())

# 查询直播间弹幕监听
@app.get("/query")
async def query(roomid: int):
    room = BLIVER_POOL.get(roomid, None)
    if room != None:
        return {"msg": f"直播间 {roomid} 弹幕正在监听中", "status": True}
    return {"msg": f"直播间 {roomid} 弹幕不在监听中", "status": False}

updated, thank you