Skip to content

Commit

Permalink
feat: bili danmu
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Jan 12, 2025
1 parent 7034069 commit 18c0bcb
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ replace github.com/synctv-org/vendors => ./vendors

require (
github.com/Boostport/mjml-go v0.15.0
github.com/andybalholm/brotli v1.1.1
github.com/caarlos0/env/v9 v9.0.0
github.com/cavaliergopher/grab/v3 v3.0.1
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
cloud.google.com/go/compute/metadata v0.6.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bytedance/sonic v1.12.6 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions internal/model/movie.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type MovieBase struct {
Type string `json:"type"`
ParentID EmptyNullString `gorm:"type:char(32)" json:"parentId"`
MoreSources []*MoreSource `gorm:"serializer:fastjson;type:text" json:"moreSources,omitempty"`
Danmu string `gorm:"type:varchar(8192)" json:"danmu"`
StreamDanmu string `gorm:"type:varchar(8192)" json:"streamDanmu"`
Live bool `json:"live"`
Proxy bool `json:"proxy"`
RtmpSource bool `json:"rtmpSource"`
Expand Down
8 changes: 5 additions & 3 deletions internal/vendor/bilibili.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ func LoadBilibiliClient(name string) BilibiliInterface {
return bilibiliLocalClient
}

var (
bilibiliLocalClient BilibiliInterface
)
var bilibiliLocalClient BilibiliInterface

func init() {
bilibiliLocalClient = bilibiliService.NewBilibiliService(nil)
Expand Down Expand Up @@ -50,6 +48,10 @@ func newGrpcBilibili(client bilibili.BilibiliClient) BilibiliInterface {
}
}

func (g *grpcBilibili) GetLiveDanmuInfo(ctx context.Context, in *bilibili.GetLiveDanmuInfoReq) (*bilibili.GetLiveDanmuInfoResp, error) {
return g.client.GetLiveDanmuInfo(ctx, in)
}

func (g *grpcBilibili) NewQRCode(ctx context.Context, in *bilibili.Empty) (*bilibili.NewQRCodeResp, error) {
return g.client.NewQRCode(ctx, in)
}
Expand Down
57 changes: 57 additions & 0 deletions server/handlers/danmu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package handlers

import (
"context"
"net/http"

"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/synctv-org/synctv/internal/op"
"github.com/synctv-org/synctv/server/handlers/vendors"
"github.com/synctv-org/synctv/server/model"
)

func StreamDanmu(ctx *gin.Context) {
log := ctx.MustGet("log").(*log.Entry)

room := ctx.MustGet("room").(*op.RoomEntry).Value()
// user := ctx.MustGet("user").(*op.UserEntry).Value()

m, err := room.GetMovieByID(ctx.Param("movieId"))
if err != nil {
log.Errorf("get movie by id error: %v", err)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err))
return
}

v, err := vendors.NewVendorService(room, m)
if err != nil {
log.Errorf("new vendor service error: %v", err)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err))
return
}

danmu, ok := v.(vendors.VendorDanmuService)
if !ok {
log.Errorf("vendor %s not support danmu", m.VendorInfo.Vendor)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorStringResp("vendor not support danmu"))
return
}

c, cancel := context.WithCancel(ctx.Request.Context())
defer cancel()

err = danmu.StreamDanmu(c, func(danmu string) error {
ctx.SSEvent("danmu", danmu)
if err := ctx.Err(); err != nil {
return err
}
ctx.Writer.Flush()
return nil
})
if err != nil {
log.Errorf("stream danmu error: %v", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewAPIErrorResp(err))
return
}
}
2 changes: 2 additions & 0 deletions server/handlers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ func initMovie(movie *gin.RouterGroup, needAuthMovie *gin.RouterGroup) {

live.GET("/hls/data/:roomId/:movieId/:dataId", ServeHlsLive)
}

needAuthMovie.GET("/danmu/:movieId", StreamDanmu)
}

func initUser(user *gin.RouterGroup, needAuthUser *gin.RouterGroup) {
Expand Down
4 changes: 4 additions & 0 deletions server/handlers/vendors/vendorBilibili/bilibili.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User,
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"

movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

Expand Down Expand Up @@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op.
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"

movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

Expand Down
4 changes: 4 additions & 0 deletions server/handlers/vendors/vendorbilibili/bilibili.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User,
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"

movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

Expand Down Expand Up @@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op.
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"

movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

Expand Down
242 changes: 242 additions & 0 deletions server/handlers/vendors/vendorbilibili/danmu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package vendorbilibili

import (
"bytes"
"compress/zlib"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/andybalholm/brotli"
"github.com/gorilla/websocket"
json "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
"github.com/synctv-org/synctv/internal/vendor"
"github.com/synctv-org/synctv/utils"
"github.com/synctv-org/vendors/api/bilibili"
)

type command uint32

const (
CMD_HEARTBEAT command = 2
CMD_HEARTBEAT_REPLY command = 3
CMD_NORMAL command = 5
CMD_AUTH command = 7
CMD_AUTH_REPLY command = 8
)

type header struct {
TotalSize uint32
HeaderLen uint16
Version uint16
Command command
Sequence uint32
}

var headerLen = binary.Size(header{})

func (h *header) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, 0, headerLen))
err := binary.Write(buf, binary.BigEndian, h)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (h *header) Unmarshal(data []byte) error {
return binary.Read(bytes.NewReader(data), binary.BigEndian, h)
}

func newHeader(size uint32, command command, sequence uint32) header {
h := header{
TotalSize: uint32(headerLen) + size,
HeaderLen: uint16(headerLen),
Command: command,
Sequence: sequence,
}
switch command {
case CMD_HEARTBEAT, CMD_AUTH:
h.Version = 1
}
return h
}

type verifyHello struct {
UID int64 `json:"uid"`
RoomID uint64 `json:"roomid,omitempty"`
ProtoVer int `json:"protover,omitempty"`
Platform string `json:"platform,omitempty"`
Type int `json:"type,omitempty"`
Key string `json:"key,omitempty"`
}

func newVerifyHello(roomID uint64, key string) *verifyHello {
return &verifyHello{
RoomID: roomID,
ProtoVer: 3,
Platform: "web",
Type: 2,
Key: key,
}
}

func writeVerifyHello(conn *websocket.Conn, hello *verifyHello) error {
msg, err := json.Marshal(hello)
if err != nil {
return err
}
header := newHeader(uint32(len(msg)), CMD_AUTH, 1)
headerBytes, err := header.Marshal()
if err != nil {
return err
}
return conn.WriteMessage(websocket.BinaryMessage, append(headerBytes, msg...))
}

func writeHeartbeat(conn *websocket.Conn, sequence uint32) error {
header := newHeader(0, CMD_HEARTBEAT, sequence)
headerBytes, err := header.Marshal()
if err != nil {
return err
}
return conn.WriteMessage(websocket.BinaryMessage, headerBytes)
}

type replyCmd struct {
Cmd string `json:"cmd"`
}

func (v *BilibiliVendorService) StreamDanmu(ctx context.Context, handler func(danmu string) error) error {
resp, err := vendor.LoadBilibiliClient("").GetLiveDanmuInfo(ctx, &bilibili.GetLiveDanmuInfoReq{
RoomID: v.movie.VendorInfo.Bilibili.Cid,
})
if err != nil {
return err
}
if len(resp.HostList) == 0 {
return errors.New("no host list")
}
wssHost := resp.HostList[0].Host
wssPort := resp.HostList[0].WssPort

conn, _, err := websocket.
DefaultDialer.
DialContext(
ctx,
fmt.Sprintf("wss://%s:%d/sub", wssHost, wssPort),
http.Header{
"User-Agent": []string{utils.UA},
"Origin": []string{"https://live.bilibili.com"},
},
)
if err != nil {
return err
}
defer conn.Close()

err = writeVerifyHello(
conn,
newVerifyHello(
v.movie.VendorInfo.Bilibili.Cid,
resp.Token,
),
)
if err != nil {
return err
}

_, _, err = conn.ReadMessage()
if err != nil {
return err
}

go func() {
ticker := time.NewTicker(time.Second * 20)
defer ticker.Stop()
sequence := uint32(1)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sequence++
err = writeHeartbeat(conn, sequence)
if err != nil {
log.Errorf("write heartbeat error: %v", err)
}
}
}
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
_, message, err := conn.ReadMessage()
if err != nil {
return err
}
header := header{}
err = header.Unmarshal(message[:headerLen])
if err != nil {
return err
}
switch header.Command {
case CMD_HEARTBEAT_REPLY:
continue
}
data := message[headerLen:]
switch header.Version {
case 2:
// zlib
zlibReader, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return err
}
defer zlibReader.Close()
data, err = io.ReadAll(zlibReader)
if err != nil {
return err
}
case 3:
// brotli
brotliReader := brotli.NewReader(bytes.NewReader(data))
data, err = io.ReadAll(brotliReader)
if err != nil {
return err
}
data = data[headerLen:]
}
reply := replyCmd{}
err = json.Unmarshal(data, &reply)
if err != nil {
return err
}
switch reply.Cmd {
case "DANMU_MSG":
danmu := danmuMsg{}
err = json.Unmarshal(data, &danmu)
if err != nil {
return err
}
content, ok := danmu.Info[1].(string)
if !ok {
return errors.New("content is not string")
}
handler(content)
case "DM_INTERACTION":
}
}
}
}

type danmuMsg struct {
Info []any `json:"info"`
}
Loading

0 comments on commit 18c0bcb

Please sign in to comment.