From 18c0bcb8aab5ec533091e5c55116144cea17071a Mon Sep 17 00:00:00 2001 From: zijiren233 Date: Sun, 12 Jan 2025 14:56:00 +0800 Subject: [PATCH] feat: bili danmu --- go.mod | 2 +- internal/model/movie.go | 2 + internal/vendor/bilibili.go | 8 +- server/handlers/danmu.go | 57 +++++ server/handlers/init.go | 2 + .../vendors/vendorBilibili/bilibili.go | 4 + .../vendors/vendorbilibili/bilibili.go | 4 + .../handlers/vendors/vendorbilibili/danmu.go | 242 ++++++++++++++++++ server/handlers/vendors/vendors.go | 4 + synctv-web | 2 +- vendors | 2 +- 11 files changed, 323 insertions(+), 6 deletions(-) create mode 100644 server/handlers/danmu.go create mode 100644 server/handlers/vendors/vendorbilibili/danmu.go diff --git a/go.mod b/go.mod index 732ef760..d0787bf1 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ replace github.com/synctv-org/vendors => ./vendors require ( github.com/Boostport/mjml-go v0.15.0 + github.com/andybalholm/brotli v1.1.1 github.com/caarlos0/env/v9 v9.0.0 github.com/cavaliergopher/grab/v3 v3.0.1 github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 @@ -62,7 +63,6 @@ require ( cloud.google.com/go/compute/metadata v0.6.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/BurntSushi/toml v1.3.2 // indirect - github.com/andybalholm/brotli v1.1.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/bytedance/sonic v1.12.6 // indirect github.com/bytedance/sonic/loader v0.2.1 // indirect diff --git a/internal/model/movie.go b/internal/model/movie.go index 447121df..5ff0685a 100644 --- a/internal/model/movie.go +++ b/internal/model/movie.go @@ -74,6 +74,8 @@ type MovieBase struct { Type string `json:"type"` ParentID EmptyNullString `gorm:"type:char(32)" json:"parentId"` MoreSources []*MoreSource `gorm:"serializer:fastjson;type:text" json:"moreSources,omitempty"` + Danmu string `gorm:"type:varchar(8192)" json:"danmu"` + StreamDanmu string `gorm:"type:varchar(8192)" json:"streamDanmu"` Live bool `json:"live"` Proxy bool `json:"proxy"` RtmpSource bool `json:"rtmpSource"` diff --git a/internal/vendor/bilibili.go b/internal/vendor/bilibili.go index c96230cc..3f5aab52 100644 --- a/internal/vendor/bilibili.go +++ b/internal/vendor/bilibili.go @@ -19,9 +19,7 @@ func LoadBilibiliClient(name string) BilibiliInterface { return bilibiliLocalClient } -var ( - bilibiliLocalClient BilibiliInterface -) +var bilibiliLocalClient BilibiliInterface func init() { bilibiliLocalClient = bilibiliService.NewBilibiliService(nil) @@ -50,6 +48,10 @@ func newGrpcBilibili(client bilibili.BilibiliClient) BilibiliInterface { } } +func (g *grpcBilibili) GetLiveDanmuInfo(ctx context.Context, in *bilibili.GetLiveDanmuInfoReq) (*bilibili.GetLiveDanmuInfoResp, error) { + return g.client.GetLiveDanmuInfo(ctx, in) +} + func (g *grpcBilibili) NewQRCode(ctx context.Context, in *bilibili.Empty) (*bilibili.NewQRCodeResp, error) { return g.client.NewQRCode(ctx, in) } diff --git a/server/handlers/danmu.go b/server/handlers/danmu.go new file mode 100644 index 00000000..8d1ca896 --- /dev/null +++ b/server/handlers/danmu.go @@ -0,0 +1,57 @@ +package handlers + +import ( + "context" + "net/http" + + "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" + "github.com/synctv-org/synctv/internal/op" + "github.com/synctv-org/synctv/server/handlers/vendors" + "github.com/synctv-org/synctv/server/model" +) + +func StreamDanmu(ctx *gin.Context) { + log := ctx.MustGet("log").(*log.Entry) + + room := ctx.MustGet("room").(*op.RoomEntry).Value() + // user := ctx.MustGet("user").(*op.UserEntry).Value() + + m, err := room.GetMovieByID(ctx.Param("movieId")) + if err != nil { + log.Errorf("get movie by id error: %v", err) + ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err)) + return + } + + v, err := vendors.NewVendorService(room, m) + if err != nil { + log.Errorf("new vendor service error: %v", err) + ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err)) + return + } + + danmu, ok := v.(vendors.VendorDanmuService) + if !ok { + log.Errorf("vendor %s not support danmu", m.VendorInfo.Vendor) + ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorStringResp("vendor not support danmu")) + return + } + + c, cancel := context.WithCancel(ctx.Request.Context()) + defer cancel() + + err = danmu.StreamDanmu(c, func(danmu string) error { + ctx.SSEvent("danmu", danmu) + if err := ctx.Err(); err != nil { + return err + } + ctx.Writer.Flush() + return nil + }) + if err != nil { + log.Errorf("stream danmu error: %v", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewAPIErrorResp(err)) + return + } +} diff --git a/server/handlers/init.go b/server/handlers/init.go index f7c547cf..cf9c883f 100644 --- a/server/handlers/init.go +++ b/server/handlers/init.go @@ -231,6 +231,8 @@ func initMovie(movie *gin.RouterGroup, needAuthMovie *gin.RouterGroup) { live.GET("/hls/data/:roomId/:movieId/:dataId", ServeHlsLive) } + + needAuthMovie.GET("/danmu/:movieId", StreamDanmu) } func initUser(user *gin.RouterGroup, needAuthUser *gin.RouterGroup) { diff --git a/server/handlers/vendors/vendorBilibili/bilibili.go b/server/handlers/vendors/vendorBilibili/bilibili.go index c3dcd54d..abc0c78b 100644 --- a/server/handlers/vendors/vendorBilibili/bilibili.go +++ b/server/handlers/vendors/vendorBilibili/bilibili.go @@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User, if movie.MovieBase.Live { movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) movie.MovieBase.Type = "m3u8" + + movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) return movie, nil } @@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op. if movie.MovieBase.Live { movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) movie.MovieBase.Type = "m3u8" + + movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) return movie, nil } diff --git a/server/handlers/vendors/vendorbilibili/bilibili.go b/server/handlers/vendors/vendorbilibili/bilibili.go index c3dcd54d..abc0c78b 100644 --- a/server/handlers/vendors/vendorbilibili/bilibili.go +++ b/server/handlers/vendors/vendorbilibili/bilibili.go @@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User, if movie.MovieBase.Live { movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) movie.MovieBase.Type = "m3u8" + + movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) return movie, nil } @@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op. if movie.MovieBase.Live { movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) movie.MovieBase.Type = "m3u8" + + movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID) return movie, nil } diff --git a/server/handlers/vendors/vendorbilibili/danmu.go b/server/handlers/vendors/vendorbilibili/danmu.go new file mode 100644 index 00000000..a6e6b91e --- /dev/null +++ b/server/handlers/vendors/vendorbilibili/danmu.go @@ -0,0 +1,242 @@ +package vendorbilibili + +import ( + "bytes" + "compress/zlib" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/andybalholm/brotli" + "github.com/gorilla/websocket" + json "github.com/json-iterator/go" + log "github.com/sirupsen/logrus" + "github.com/synctv-org/synctv/internal/vendor" + "github.com/synctv-org/synctv/utils" + "github.com/synctv-org/vendors/api/bilibili" +) + +type command uint32 + +const ( + CMD_HEARTBEAT command = 2 + CMD_HEARTBEAT_REPLY command = 3 + CMD_NORMAL command = 5 + CMD_AUTH command = 7 + CMD_AUTH_REPLY command = 8 +) + +type header struct { + TotalSize uint32 + HeaderLen uint16 + Version uint16 + Command command + Sequence uint32 +} + +var headerLen = binary.Size(header{}) + +func (h *header) Marshal() ([]byte, error) { + buf := bytes.NewBuffer(make([]byte, 0, headerLen)) + err := binary.Write(buf, binary.BigEndian, h) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (h *header) Unmarshal(data []byte) error { + return binary.Read(bytes.NewReader(data), binary.BigEndian, h) +} + +func newHeader(size uint32, command command, sequence uint32) header { + h := header{ + TotalSize: uint32(headerLen) + size, + HeaderLen: uint16(headerLen), + Command: command, + Sequence: sequence, + } + switch command { + case CMD_HEARTBEAT, CMD_AUTH: + h.Version = 1 + } + return h +} + +type verifyHello struct { + UID int64 `json:"uid"` + RoomID uint64 `json:"roomid,omitempty"` + ProtoVer int `json:"protover,omitempty"` + Platform string `json:"platform,omitempty"` + Type int `json:"type,omitempty"` + Key string `json:"key,omitempty"` +} + +func newVerifyHello(roomID uint64, key string) *verifyHello { + return &verifyHello{ + RoomID: roomID, + ProtoVer: 3, + Platform: "web", + Type: 2, + Key: key, + } +} + +func writeVerifyHello(conn *websocket.Conn, hello *verifyHello) error { + msg, err := json.Marshal(hello) + if err != nil { + return err + } + header := newHeader(uint32(len(msg)), CMD_AUTH, 1) + headerBytes, err := header.Marshal() + if err != nil { + return err + } + return conn.WriteMessage(websocket.BinaryMessage, append(headerBytes, msg...)) +} + +func writeHeartbeat(conn *websocket.Conn, sequence uint32) error { + header := newHeader(0, CMD_HEARTBEAT, sequence) + headerBytes, err := header.Marshal() + if err != nil { + return err + } + return conn.WriteMessage(websocket.BinaryMessage, headerBytes) +} + +type replyCmd struct { + Cmd string `json:"cmd"` +} + +func (v *BilibiliVendorService) StreamDanmu(ctx context.Context, handler func(danmu string) error) error { + resp, err := vendor.LoadBilibiliClient("").GetLiveDanmuInfo(ctx, &bilibili.GetLiveDanmuInfoReq{ + RoomID: v.movie.VendorInfo.Bilibili.Cid, + }) + if err != nil { + return err + } + if len(resp.HostList) == 0 { + return errors.New("no host list") + } + wssHost := resp.HostList[0].Host + wssPort := resp.HostList[0].WssPort + + conn, _, err := websocket. + DefaultDialer. + DialContext( + ctx, + fmt.Sprintf("wss://%s:%d/sub", wssHost, wssPort), + http.Header{ + "User-Agent": []string{utils.UA}, + "Origin": []string{"https://live.bilibili.com"}, + }, + ) + if err != nil { + return err + } + defer conn.Close() + + err = writeVerifyHello( + conn, + newVerifyHello( + v.movie.VendorInfo.Bilibili.Cid, + resp.Token, + ), + ) + if err != nil { + return err + } + + _, _, err = conn.ReadMessage() + if err != nil { + return err + } + + go func() { + ticker := time.NewTicker(time.Second * 20) + defer ticker.Stop() + sequence := uint32(1) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sequence++ + err = writeHeartbeat(conn, sequence) + if err != nil { + log.Errorf("write heartbeat error: %v", err) + } + } + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + _, message, err := conn.ReadMessage() + if err != nil { + return err + } + header := header{} + err = header.Unmarshal(message[:headerLen]) + if err != nil { + return err + } + switch header.Command { + case CMD_HEARTBEAT_REPLY: + continue + } + data := message[headerLen:] + switch header.Version { + case 2: + // zlib + zlibReader, err := zlib.NewReader(bytes.NewReader(data)) + if err != nil { + return err + } + defer zlibReader.Close() + data, err = io.ReadAll(zlibReader) + if err != nil { + return err + } + case 3: + // brotli + brotliReader := brotli.NewReader(bytes.NewReader(data)) + data, err = io.ReadAll(brotliReader) + if err != nil { + return err + } + data = data[headerLen:] + } + reply := replyCmd{} + err = json.Unmarshal(data, &reply) + if err != nil { + return err + } + switch reply.Cmd { + case "DANMU_MSG": + danmu := danmuMsg{} + err = json.Unmarshal(data, &danmu) + if err != nil { + return err + } + content, ok := danmu.Info[1].(string) + if !ok { + return errors.New("content is not string") + } + handler(content) + case "DM_INTERACTION": + } + } + } +} + +type danmuMsg struct { + Info []any `json:"info"` +} diff --git a/server/handlers/vendors/vendors.go b/server/handlers/vendors/vendors.go index c0db42d4..1e5fc979 100644 --- a/server/handlers/vendors/vendors.go +++ b/server/handlers/vendors/vendors.go @@ -38,6 +38,10 @@ type VendorService interface { GenMovieInfo(ctx context.Context, reqUser *op.User, userAgent, userToken string) (*dbModel.Movie, error) } +type VendorDanmuService interface { + StreamDanmu(ctx context.Context, handler func(danmu string) error) error +} + func NewVendorService(room *op.Room, movie *op.Movie) (VendorService, error) { switch movie.VendorInfo.Vendor { case dbModel.VendorBilibili: diff --git a/synctv-web b/synctv-web index f7376302..06afd9cf 160000 --- a/synctv-web +++ b/synctv-web @@ -1 +1 @@ -Subproject commit f737630257f56b6c703853a36d626786fe2a6bd9 +Subproject commit 06afd9cf4a261f8395d09f7859db9ec40abc67c7 diff --git a/vendors b/vendors index e31a7149..54947d82 160000 --- a/vendors +++ b/vendors @@ -1 +1 @@ -Subproject commit e31a7149e750fc145c2559b7023082dd16718726 +Subproject commit 54947d824dd71a2bc3aff1ba572470cd31261dec