package platform import ( "errors" "fmt" "git.beejay.kim/WatchDog/ward/internal/tools" "git.beejay.kim/WatchDog/ward/internal/ws" "git.beejay.kim/WatchDog/ward/model" "git.beejay.kim/WatchDog/ward/platform/afreeca" "github.com/kelindar/bitmap" "github.com/rs/zerolog/log" "github.com/spf13/cast" "github.com/tidwall/gjson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "io" "net/http" "net/url" "strings" "time" ) const ( afreecaHeaderReferer = "https://play.afreecatv.com" afreecaMessageBufferSize = 256 afreecaStateAuthenticated uint32 = iota afreecaStateQuickView ) type Afreeca struct { ForceAuth bool `yaml:"force-auth"` Username string `yaml:"username"` Password string `yaml:"password"` host string rest *http.Client info *afreeca.Channel // credentials cookies []*http.Cookie uuid string state bitmap.Bitmap bridgeConnector ws.ConnectionOptions chatConnector ws.ConnectionOptions } func (p *Afreeca) Init(bid string) error { var err error if p.host = strings.TrimSpace(bid); p.host == "" { return errors.New("broadcaster ID cannot be empty") } // Generate UUID p.uuid = afreeca.GenerateUID() // Initiate HTTP Client p.rest = &http.Client{ Transport: func() *http.Transport { t := http.DefaultTransport.(*http.Transport) t.Proxy = http.ProxyFromEnvironment return t }(), Timeout: time.Second * 30, } // Authorize if forced if p.ForceAuth { if err = p.login(); err != nil { return err } } // Retrieve and validate channel data if p.info, err = p.getChannelData(); err != nil { return err } if p.info.Result != 1 { return errors.New("could not fetch channel data") } if p.info.PasswordRequired { return errors.New("password is required") } p.bridgeConnector = ws.ConnectionOptions{ MessageListener: make(chan []byte, afreecaMessageBufferSize), PingPeriod: time.Second * 20, PingMessage: afreeca.NewBridgeMessage(afreeca.BridgeCommandKeepAlive).MustMarshall(), Headers: http.Header{ "Sec-Websocket-Protocol": []string{"bridge"}, "Origin": []string{afreecaHeaderReferer}, }, } p.chatConnector = ws.ConnectionOptions{ MessageListener: make(chan []byte, afreecaMessageBufferSize), PingPeriod: time.Minute, PingMessage: afreeca.NewChatPingMessage(), Headers: http.Header{ "Sec-Websocket-Protocol": []string{"chat"}, "Origin": []string{afreecaHeaderReferer}, }, } return nil } func (p *Afreeca) push(ch chan proto.Message, m proto.Message) error { if ch == nil { return errors.New("could not push message to the closed channel") } if m == nil { return errors.New("could not push empty message") } select { case ch <- m: } return nil } func (p *Afreeca) Connect(ch chan proto.Message) error { if p.info == nil { return errors.New("illegal state: empty channel data") } var ( bridge *ws.Connection chat *ws.Connection err error ) // Open bridge connection if bridge, err = ws.NewConnection("wss://bridge.afreecatv.com/Websocket", p.bridgeConnector); err != nil { return err } if err = p.onBridgeOpened(bridge); err != nil { return err } for { select { case buf := <-p.bridgeConnector.MessageListener: m := afreeca.BridgeMessage{} if err = m.UnmarshalJSON(buf); err != nil { log.Error(). Str("raw", string(buf)). Err(err). Msg("could not Unmarshall bridge message") return err } switch m.Command { case afreeca.BridgeCommandLogin: if err = p.onBridgeAuthenticated(bridge, m.Data); err != nil { return err } case afreeca.BridgeCommandCertTicket: if err = p.onCertTicketReceived(bridge, m.Data); err != nil { return err } if chat, err = ws.NewConnection(p.info.ChatServerUrl(), p.chatConnector); err != nil { return err } if err = p.onChatOpened(chat); err != nil { return err } case afreeca.BridgeCommandUserCount: t := model.Online{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, DevicePc: cast.ToUint64(m.Data["uiJoinChUser"]), DeviceMobile: cast.ToUint64(m.Data["uiMbUser"]), } t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown if err = p.push(ch, &t); err != nil { return err } case afreeca.BridgeCommandUserCountExtended: //t := model.Online{ // At: timestamppb.New(time.Now()), // Platform: p.Id(), // Broadcaster: p.host, // Raw: buf, // DevicePc: cast.ToUint64(m.Data["uiJoinChPCUser"]), // DeviceMobile: cast.ToUint64(m.Data["uiJoinChMBUser"]), //} // //t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown //if err = p.push(ch, &t); err != nil { // return err //} continue case afreeca.BridgeCommandClosed: return p.onBridgeClosed(bridge, m.Data) default: log.Debug(). Str("command", string(m.Command)). Any("bridge", m.Data). Send() } case buf := <-p.chatConnector.MessageListener: m := afreeca.ChatMessage{} if err = m.Unmarshall(buf); err != nil { log.Error(). Err(err). Str("raw", string(buf)). Msg("could not Unmarshall chat message") continue } if m.Is(afreeca.Kind.Authorize) { if err = p.onChatAuthenticated(chat); err != nil { return err } } else if m.Is(afreeca.Kind.Authenticate) { if err = chat.Send(afreeca.NewChatRosterRequestMessage()); err != nil { return err } } else if m.Is(afreeca.Kind.Chat) { d := m.Data() if err = p.push(ch, &model.Message{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, User: &model.User{ Id: afreeca.NormalizeUserID(d[1]), Name: d[5], }, Text: d[0], }); err != nil { return err } } else if m.Is(afreeca.Kind.ChatEmoticon) { d := m.Data() if err = p.push(ch, &model.Message{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, User: &model.User{ Id: afreeca.NormalizeUserID(d[5]), Name: d[6], }, Text: fmt.Sprintf("%s:%d", cast.ToString(d[2]), cast.ToInt(d[3]), ), Sticker: true, }); err != nil { return err } } else if m.Is(afreeca.Kind.Donation) { d := m.Data() if err = p.push(ch, &model.Donation{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, User: &model.User{ Id: afreeca.NormalizeUserID(d[1]), Name: d[2], }, Amount: cast.ToUint64(d[3]), }); err != nil { return err } } else if m.Is(afreeca.Kind.Roster.List) { d := m.Data() if cast.ToInt(d[0]) != 1 { continue } d = d[1:] for i := 0; i < len(d); i++ { id := afreeca.NormalizeUserID(d[i]) pass := 2 if !strings.Contains(d[i+pass], `|`) { pass = 3 } i += pass if err = p.push(ch, &model.RosterChange{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, User: &model.User{ Id: id, }, Operation: model.RosterChange_OP_JOINED, }); err != nil { return err } } } else if m.Is(afreeca.Kind.Roster.Change) { d := m.Data() op := model.RosterChange_OP_JOINED if cast.ToInt(d[0]) == -1 { op = model.RosterChange_OP_LEFT } if err = p.push(ch, &model.RosterChange{ At: timestamppb.New(time.Now()), Platform: p.Id(), Broadcaster: p.host, Raw: buf, User: &model.User{ Id: afreeca.NormalizeUserID(d[1]), }, Operation: op, }); err != nil { return err } } else { log.Debug(). Bytes("kind", m.Kind()). Any("chat", m.Data()). Send() } } } } func (p *Afreeca) Id() string { return "afreeca" } func (p *Afreeca) Host() string { return p.host } func (p *Afreeca) makeRequest(meth string, url string, params url.Values) (*http.Request, error) { var ( request *http.Request err error ) if http.MethodPost == meth { var body io.Reader if params != nil { body = strings.NewReader(params.Encode()) } request, err = http.NewRequest(meth, url, body) } else { if params != nil { url = fmt.Sprintf("%s?%s", url, params.Encode()) } request, err = http.NewRequest(meth, url, nil) } if err != nil { return nil, err } request.Header.Set("Referer", afreecaHeaderReferer) if http.MethodPost == meth && params != nil { request.Header.Set("Content-Type", "application/x-www-form-urlencoded") } for i := range p.cookies { request.AddCookie(p.cookies[i]) } return request, nil } func (p *Afreeca) login() error { if p.Username == "" || p.Password == "" { return errors.New("username or password is empty") } var ( request *http.Request response *http.Response buf []byte err error ) if request, err = p.makeRequest(http.MethodPost, "https://login.afreecatv.com/app/LoginAction.php", url.Values{ "szWork": []string{"login"}, "szType": []string{"json"}, "szUid": []string{p.Username}, "szPassword": []string{p.Password}, "isSaveId": []string{"true"}, "isSavePw": []string{"false"}, "isSaveJoin": []string{"false"}, "isLoginRetain": []string{"Y"}, }); err != nil { return err } if response, err = p.rest.Do(request); err != nil { return err } //goland:noinspection ALL defer response.Body.Close() if buf, err = tools.DecodeHttpResponse(response); err != nil { return err } if gjson.GetBytes(buf, "RESULT").Int() != 1 { return errors.New("login failed") } p.cookies = response.Cookies() p.state.Set(afreecaStateAuthenticated) return nil } func (p *Afreeca) getHLSKey(preset afreeca.Preset) (string, error) { var ( request *http.Request response *http.Response buf []byte err error ) if request, err = p.makeRequest(http.MethodPost, "https://live.afreecatv.com/afreeca/player_live_api.php", url.Values{ "bid": []string{p.host}, "bno": []string{cast.ToString(p.info.Id)}, "from_api": []string{"0"}, "mode": []string{"landing"}, "player_type": []string{"html5"}, "pwd": []string{""}, "quality": []string{preset.Name}, "stream_type": []string{"common"}, "type": []string{"aid"}, }); err != nil { } if response, err = p.rest.Do(request); err != nil { return "", err } //goland:noinspection ALL defer response.Body.Close() if buf, err = tools.DecodeHttpResponse(response); err != nil { return "", err } if gjson.GetBytes(buf, "CHANNEL.RESULT").Int() != 1 { err = errors.New("could not fetch HLS key") log.Debug(). Str("json", string(buf)). Err(err). Send() return "", err } if aid := gjson.GetBytes(buf, "CHANNEL.AID").String(); aid == "" { err = errors.New("could not fetch HLS key") log.Debug(). Str("json", string(buf)). Err(err). Send() return "", err } else { return aid, nil } } func (p *Afreeca) HLSStream() (*http.Request, error) { var ( preset *afreeca.Preset aid string uri string params = url.Values{ "return_type": []string{"gs_cdn_pc_web"}, } request *http.Request response *http.Response buf []byte err error ) if preset = p.info.GetPreset("original"); preset == nil { return nil, errors.New("could not find feasible preset") } if aid, err = p.getHLSKey(*preset); err != nil { return nil, err } params.Add("broad_key", fmt.Sprintf("%d-common-%s-hls", p.info.Id, preset.Name)) if request, err = p.makeRequest(http.MethodGet, fmt.Sprintf("%s/broad_stream_assign.html", p.info.RMD), params); err != nil { return nil, err } if response, err = p.rest.Do(request); err != nil { return nil, err } //goland:noinspection ALL defer response.Body.Close() if buf, err = tools.DecodeHttpResponse(response); err != nil { return nil, err } if gjson.GetBytes(buf, "result").Int() != 1 { err = errors.New("could not fetch HLS url") log.Debug(). Str("json", string(buf)). Err(err). Send() return nil, err } if uri = gjson.GetBytes(buf, "view_url").String(); uri == "" { err = errors.New("could not fetch HLS url") log.Debug(). Str("json", string(buf)). Err(err). Send() return nil, err } return p.makeRequest(http.MethodGet, uri, url.Values{ "aid": []string{aid}, }) } func (p *Afreeca) getChannelData() (*afreeca.Channel, error) { var ( request *http.Request response *http.Response ch afreeca.Channel buf []byte err error ) if request, err = p.makeRequest(http.MethodPost, fmt.Sprintf("https://live.afreecatv.com/afreeca/player_live_api.php?bjid=%s", p.host), url.Values{ "bid": []string{p.host}, "type": []string{"live"}, "pwd": []string{}, "player_type": []string{"html5"}, "stream_type": []string{"common"}, "quality": []string{"HD"}, "mode": []string{"landing"}, "from_api": []string{"0"}, "is_revive": []string{"false"}, }); err != nil { return nil, err } if response, err = p.rest.Do(request); err != nil { return nil, err } //goland:noinspection ALL defer response.Body.Close() if buf, err = tools.DecodeHttpResponse(response); err != nil { return nil, err } if err = ch.Unmarshall(buf); err != nil { return nil, err } if !p.state.Contains(afreecaStateAuthenticated) && (ch.IsLoginRequired() || ch.IsGeoRestricted() || ch.AudienceGrade > 0) { log.Debug().Msg("broadcast is login required / geo restricted / has audience grade restrictions; attempt to login") if err = p.login(); err != nil { return nil, err } log.Debug().Msg("successfully logged in") return p.getChannelData() } if p.getCookieValue("_au") == "" { if response, err = p.rest.Get( fmt.Sprintf("https://play.afreecatv.com/%s/%d", p.host, ch.Id)); err != nil { return nil, err } p.cookies = response.Cookies() } return &ch, nil } func (p *Afreeca) getCookieValue(k string) string { for i := range p.cookies { if p.cookies[i].Name == k { return p.cookies[i].Value } } return "" } func (p *Afreeca) onBridgeOpened(c *ws.Connection) error { m := afreeca.NewBridgeInitGatewayMessage(p.host, p.uuid, *p.info, p.getCookieValue) return c.Send(m.MustMarshall()) } func (p *Afreeca) onBridgeClosed(_ *ws.Connection, data map[string]any) error { var ( s = cast.ToString(data["pcEndingMsg"]) err error ) if s, err = url.QueryUnescape(s); err == nil { log.Debug().Msg(s) } return errorClosed } func (p *Afreeca) onBridgeAuthenticated(_ *ws.Connection, data map[string]any) error { // find if we have QuickView if cast.ToUint(data["iMode"]) == 1 { p.state.Set(afreecaStateQuickView) } else { p.state.Remove(afreecaStateQuickView) } return nil } func (p *Afreeca) onCertTicketReceived(c *ws.Connection, data map[string]any) error { m := afreeca.NewBridgeInitBroadcastMessage(p.uuid, *p.info, p.getCookieValue) m.AddArgument("append_data", data["pcAppendDat"]) m.AddArgument("gw_ticket", data["pcTicket"]) return c.Send(m.MustMarshall()) } func (p *Afreeca) onChatOpened(c *ws.Connection) error { m := afreeca.NewChatAuthorizeMessage( "", p.state.Contains(afreecaStateQuickView), ) return c.Send(m.Build()) } func (p *Afreeca) onChatAuthenticated(c *ws.Connection) error { joinlog := afreeca.BridgeMessageValue{} joinlog.AddTuple("log", func() afreeca.BridgeMessageValueParams { params := afreeca.BridgeMessageValueParams{} params.Add("set_bps", p.info.Bitrate) params.Add("view_bps", 500) params.Add("quality", "sd") params.Add("uuid", p.uuid) params.Add("geo_cc", p.info.GeoName) params.Add("geo_rc", p.info.GeoCode) params.Add("acpt_lang", p.info.AcceptLanguage) params.Add("svc_lang", p.info.ServiceLanguage) params.Add("subscribe", 0) params.Add("lowlatency", 0) params.Add("mode", "landing") return params }()) joinlog.AddTuple("pwd", "") joinlog.AddTuple("auth_info", "NULL") joinlog.AddTuple("pver", 2) joinlog.AddTuple("access_system", "html5") m := afreeca.NewChatMessage(afreeca.Kind.Authenticate, cast.ToString(p.info.ChatId), p.info.FanToken, "0", "", joinlog.String(), "") return c.Send(m.Build()) }