-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathget_danmu.py
73 lines (58 loc) · 2.03 KB
/
get_danmu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# 代码源自:https://github.com/xfgryujk/blivedm
import asyncio
import http.cookies
import os
import random
import time
from typing import *

import aiohttp
import blivedm
import blivedm.models.web as web_models
# Live room IDs; take the value from the live room's URL.
TEST_ROOM_IDS = [
    21482458,  # Datawhale
    25933362,  # Epsilon Luoo
]

# Put the value of the SESSDATA cookie field of a logged-in account here.
# Leaving it empty still allows connecting, but the usernames of received
# danmaku will be masked and the UID will be 0.
SESSDATA = ''

# Shared HTTP session; None until init_session() assigns it.
session: Optional[aiohttp.ClientSession] = None
async def main():
    """Create the shared session, listen to every configured room, then close the session."""
    init_session()
    try:
        await run_multi_clients()
    finally:
        # Close the module-level session whether the clients finished or raised.
        await session.close()
def init_session():
    """Create the module-level aiohttp session and load the SESSDATA cookie into its jar."""
    global session

    # Build a cookie named SESSDATA, scoped to the bilibili.com domain.
    login_cookies = http.cookies.SimpleCookie()
    login_cookies['SESSDATA'] = SESSDATA
    sessdata_morsel = login_cookies['SESSDATA']
    sessdata_morsel['domain'] = 'bilibili.com'

    session = aiohttp.ClientSession()
    session.cookie_jar.update_cookies(login_cookies)
async def run_multi_clients():
    """
    Demonstrate listening to several live rooms at the same time.

    One client is created per entry in TEST_ROOM_IDS. All clients share the
    module-level session and a single MyHandler instance.
    """
    room_clients = []
    for room_id in TEST_ROOM_IDS:
        room_clients.append(blivedm.BLiveClient(room_id, session=session))

    shared_handler = MyHandler()
    for room_client in room_clients:
        room_client.set_handler(shared_handler)
        room_client.start()

    try:
        # Wait on every client's join() concurrently.
        pending_joins = [room_client.join() for room_client in room_clients]
        await asyncio.gather(*pending_joins)
    finally:
        # Stop and close every client whether the joins completed or raised.
        pending_closes = [room_client.stop_and_close() for room_client in room_clients]
        await asyncio.gather(*pending_closes)
class MyHandler(blivedm.BaseHandler):
    """Handler that appends each received danmaku to a per-day Markdown checklist file."""

    # Directory holding the daily files; created on demand by _on_danmaku.
    _OUTPUT_DIR = "./bilibili/danmu"

    # Room ID -> tag written into each line; rooms not listed here are tagged "Other".
    # NOTE(review): "Datawhle" looks like a typo for "Datawhale". It is kept
    # byte-for-byte so the output format does not change; confirm before fixing.
    _ROOM_TAGS = {
        21482458: 'Datawhle',
        25933362: 'Luoo',
    }

    def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
        """
        Append one checklist line for `message` to today's danmaku file.

        The line has the form ``- [ ] <msg> => [<tag>]: <uname>``, where the tag
        is looked up from ``client.room_id``. The file name carries the local
        date, so a new file is started each day.

        :param client: the client that received the message; only ``room_id`` is read
        :param message: the danmaku; ``msg`` and ``uname`` are written out
        """
        # Opening in append mode creates the file but not its parent directory,
        # so make sure the directory exists first.
        os.makedirs(self._OUTPUT_DIR, exist_ok=True)

        tag = self._ROOM_TAGS.get(client.room_id, 'Other')
        today = time.strftime('%Y-%m-%d', time.localtime())
        with open(f"{self._OUTPUT_DIR}/danmu_{today}.md", "a", encoding="utf-8") as f:
            print(f'- [ ] {message.msg} => [{tag}]: {message.uname}', file=f)
# Run the async entry point only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())