Skip to content

Commit a9afac3

Browse files
committed
更新弹幕处理工具
1 parent 4f12cc2 commit a9afac3

File tree

13 files changed

+1545
-421
lines changed

13 files changed

+1545
-421
lines changed

utils/bilibili_live/blivedm/LICENSE

Lines changed: 0 additions & 21 deletions
This file was deleted.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
from .models import *
2+
__version__ = '1.1.0-dev'
3+
34
from .handlers import *
4-
from .client import *
5+
from .clients import *
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# -*- coding: utf-8 -*-
2+
from .web import *
3+
from .open_live import *
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
# -*- coding: utf-8 -*-
2+
import asyncio
3+
import datetime
4+
import hashlib
5+
import hmac
6+
import json
7+
import logging
8+
import uuid
9+
from typing import *
10+
11+
import aiohttp
12+
13+
from . import ws_base
14+
15+
__all__ = (
16+
'OpenLiveClient',
17+
)
18+
19+
logger = logging.getLogger('blivedm')
20+
21+
START_URL = 'https://live-open.biliapi.com/v2/app/start'
22+
HEARTBEAT_URL = 'https://live-open.biliapi.com/v2/app/heartbeat'
23+
END_URL = 'https://live-open.biliapi.com/v2/app/end'
24+
25+
26+
class OpenLiveClient(ws_base.WebSocketClientBase):
27+
"""
28+
开放平台客户端
29+
30+
文档参考:https://open-live.bilibili.com/document/
31+
32+
:param access_key_id: 在开放平台申请的access_key_id
33+
:param access_key_secret: 在开放平台申请的access_key_secret
34+
:param app_id: 在开放平台创建的项目ID
35+
:param room_owner_auth_code: 主播身份码
36+
:param session: cookie、连接池
37+
:param heartbeat_interval: 发送连接心跳包的间隔时间(秒)
38+
:param game_heartbeat_interval: 发送项目心跳包的间隔时间(秒)
39+
"""
40+
41+
def __init__(
42+
self,
43+
access_key_id: str,
44+
access_key_secret: str,
45+
app_id: int,
46+
room_owner_auth_code: str,
47+
*,
48+
session: Optional[aiohttp.ClientSession] = None,
49+
heartbeat_interval=30,
50+
game_heartbeat_interval=20,
51+
):
52+
super().__init__(session, heartbeat_interval)
53+
54+
self._access_key_id = access_key_id
55+
self._access_key_secret = access_key_secret
56+
self._app_id = app_id
57+
self._room_owner_auth_code = room_owner_auth_code
58+
self._game_heartbeat_interval = game_heartbeat_interval
59+
60+
# 在调用init_room后初始化的字段
61+
self._room_owner_uid: Optional[int] = None
62+
"""主播用户ID"""
63+
self._room_owner_open_id: Optional[str] = None
64+
"""主播Open ID"""
65+
self._host_server_url_list: Optional[List[str]] = []
66+
"""弹幕服务器URL列表"""
67+
self._auth_body: Optional[str] = None
68+
"""连接弹幕服务器用的认证包内容"""
69+
self._game_id: Optional[str] = None
70+
"""项目场次ID"""
71+
72+
# 在运行时初始化的字段
73+
self._game_heartbeat_timer_handle: Optional[asyncio.TimerHandle] = None
74+
"""发项目心跳包定时器的handle"""
75+
76+
@property
77+
def room_owner_uid(self) -> Optional[int]:
78+
"""
79+
主播用户ID,调用init_room后初始化
80+
"""
81+
return self._room_owner_uid
82+
83+
@property
84+
def room_owner_open_id(self) -> Optional[str]:
85+
"""
86+
主播Open ID,调用init_room后初始化
87+
"""
88+
return self._room_owner_open_id
89+
90+
@property
91+
def room_owner_auth_code(self):
92+
"""
93+
主播身份码
94+
"""
95+
return self._room_owner_auth_code
96+
97+
@property
98+
def app_id(self):
99+
"""
100+
在开放平台创建的项目ID
101+
"""
102+
return self._app_id
103+
104+
@property
105+
def game_id(self) -> Optional[str]:
106+
"""
107+
项目场次ID,调用init_room后初始化
108+
"""
109+
return self._game_id
110+
111+
async def close(self):
112+
"""
113+
释放本客户端的资源,调用后本客户端将不可用
114+
"""
115+
if self.is_running:
116+
logger.warning('room=%s is calling close(), but client is running', self.room_id)
117+
118+
if self._game_heartbeat_timer_handle is not None:
119+
self._game_heartbeat_timer_handle.cancel()
120+
self._game_heartbeat_timer_handle = None
121+
await self._end_game()
122+
123+
await super().close()
124+
125+
def _request_open_live(self, url, body: dict):
126+
body_bytes = json.dumps(body).encode('utf-8')
127+
headers = {
128+
'x-bili-accesskeyid': self._access_key_id,
129+
'x-bili-content-md5': hashlib.md5(body_bytes).hexdigest(),
130+
'x-bili-signature-method': 'HMAC-SHA256',
131+
'x-bili-signature-nonce': uuid.uuid4().hex,
132+
'x-bili-signature-version': '1.0',
133+
'x-bili-timestamp': str(int(datetime.datetime.now().timestamp())),
134+
}
135+
136+
str_to_sign = '\n'.join(
137+
f'{key}:{value}'
138+
for key, value in headers.items()
139+
)
140+
signature = hmac.new(
141+
self._access_key_secret.encode('utf-8'), str_to_sign.encode('utf-8'), hashlib.sha256
142+
).hexdigest()
143+
headers['Authorization'] = signature
144+
145+
headers['Content-Type'] = 'application/json'
146+
headers['Accept'] = 'application/json'
147+
return self._session.post(url, headers=headers, data=body_bytes)
148+
149+
async def init_room(self):
150+
"""
151+
开启项目,并初始化连接房间需要的字段
152+
153+
:return: 是否成功
154+
"""
155+
if not await self._start_game():
156+
return False
157+
158+
if self._game_id != '' and self._game_heartbeat_timer_handle is None:
159+
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
160+
self._game_heartbeat_interval, self._on_send_game_heartbeat
161+
)
162+
return True
163+
164+
async def _start_game(self):
165+
try:
166+
async with self._request_open_live(
167+
START_URL,
168+
{'code': self._room_owner_auth_code, 'app_id': self._app_id}
169+
) as res:
170+
if res.status != 200:
171+
logger.warning('_start_game() failed, status=%d, reason=%s', res.status, res.reason)
172+
return False
173+
data = await res.json()
174+
if data['code'] != 0:
175+
logger.warning('_start_game() failed, code=%d, message=%s, request_id=%s',
176+
data['code'], data['message'], data['request_id'])
177+
return False
178+
if not self._parse_start_game(data['data']):
179+
return False
180+
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
181+
logger.exception('_start_game() failed:')
182+
return False
183+
return True
184+
185+
def _parse_start_game(self, data):
186+
self._game_id = data['game_info']['game_id']
187+
websocket_info = data['websocket_info']
188+
self._auth_body = websocket_info['auth_body']
189+
self._host_server_url_list = websocket_info['wss_link']
190+
anchor_info = data['anchor_info']
191+
self._room_id = anchor_info['room_id']
192+
self._room_owner_uid = anchor_info['uid']
193+
self._room_owner_open_id = anchor_info['open_id']
194+
return True
195+
196+
async def _end_game(self):
197+
"""
198+
关闭项目。建议关闭客户端时保证调用到这个函数(close会调用),否则可能短时间内无法重复连接同一个房间
199+
"""
200+
if self._game_id in (None, ''):
201+
return True
202+
203+
try:
204+
async with self._request_open_live(
205+
END_URL,
206+
{'app_id': self._app_id, 'game_id': self._game_id}
207+
) as res:
208+
if res.status != 200:
209+
logger.warning('room=%d _end_game() failed, status=%d, reason=%s',
210+
self._room_id, res.status, res.reason)
211+
return False
212+
data = await res.json()
213+
code = data['code']
214+
if code != 0:
215+
if code in (7000, 7003):
216+
# 项目已经关闭了也算成功
217+
return True
218+
219+
logger.warning('room=%d _end_game() failed, code=%d, message=%s, request_id=%s',
220+
self._room_id, code, data['message'], data['request_id'])
221+
return False
222+
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
223+
logger.exception('room=%d _end_game() failed:', self._room_id)
224+
return False
225+
return True
226+
227+
def _on_send_game_heartbeat(self):
228+
"""
229+
定时发送项目心跳包的回调
230+
"""
231+
self._game_heartbeat_timer_handle = asyncio.get_running_loop().call_later(
232+
self._game_heartbeat_interval, self._on_send_game_heartbeat
233+
)
234+
asyncio.create_task(self._send_game_heartbeat())
235+
236+
async def _send_game_heartbeat(self):
237+
"""
238+
发送项目心跳包
239+
"""
240+
if self._game_id in (None, ''):
241+
logger.warning('game=%d _send_game_heartbeat() failed, game_id not found', self._game_id)
242+
return False
243+
244+
try:
245+
# 保存一下,防止await之后game_id改变
246+
game_id = self._game_id
247+
async with self._request_open_live(
248+
HEARTBEAT_URL,
249+
{'game_id': game_id}
250+
) as res:
251+
if res.status != 200:
252+
logger.warning('room=%d _send_game_heartbeat() failed, status=%d, reason=%s',
253+
self._room_id, res.status, res.reason)
254+
return False
255+
data = await res.json()
256+
code = data['code']
257+
if code != 0:
258+
logger.warning('room=%d _send_game_heartbeat() failed, code=%d, message=%s, request_id=%s',
259+
self._room_id, code, data['message'], data['request_id'])
260+
261+
if code == 7003 and self._game_id == game_id:
262+
# 项目异常关闭,可能是心跳超时,需要重新开启项目
263+
self._need_init_room = True
264+
if self._websocket is not None and not self._websocket.closed:
265+
await self._websocket.close()
266+
267+
return False
268+
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
269+
logger.exception('room=%d _send_game_heartbeat() failed:', self._room_id)
270+
return False
271+
return True
272+
273+
async def _on_before_ws_connect(self, retry_count):
274+
"""
275+
在每次建立连接之前调用,可以用来初始化房间
276+
"""
277+
# 重连次数太多则重新init_room,保险
278+
reinit_period = max(3, len(self._host_server_url_list or ()))
279+
if retry_count > 0 and retry_count % reinit_period == 0:
280+
self._need_init_room = True
281+
await super()._on_before_ws_connect(retry_count)
282+
283+
def _get_ws_url(self, retry_count) -> str:
284+
"""
285+
返回WebSocket连接的URL,可以在这里做故障转移和负载均衡
286+
"""
287+
return self._host_server_url_list[retry_count % len(self._host_server_url_list)]
288+
289+
async def _send_auth(self):
290+
"""
291+
发送认证包
292+
"""
293+
await self._websocket.send_bytes(self._make_packet(self._auth_body, ws_base.Operation.AUTH))
294+
295+
def _handle_command(self, command: dict):
296+
cmd = command.get('cmd', '')
297+
if cmd == 'LIVE_OPEN_PLATFORM_INTERACTION_END' and command['data']['game_id'] == self._game_id:
298+
# 服务器主动停止推送,可能是心跳超时,需要重新开启项目
299+
logger.warning('room=%d game end by server, game_id=%s', self._room_id, self._game_id)
300+
301+
self._need_init_room = True
302+
if self._websocket is not None and not self._websocket.closed:
303+
asyncio.create_task(self._websocket.close())
304+
return
305+
306+
super()._handle_command(command)

0 commit comments

Comments
 (0)