| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702 | package platformimport (	"errors"	"fmt"	"git.beejay.kim/WatchDog/ward/internal/tools"	"git.beejay.kim/WatchDog/ward/internal/ws"	"git.beejay.kim/WatchDog/ward/model"	"git.beejay.kim/WatchDog/ward/platform/afreeca"	"github.com/kelindar/bitmap"	"github.com/rs/zerolog/log"	"github.com/spf13/cast"	"github.com/tidwall/gjson"	"google.golang.org/protobuf/proto"	"google.golang.org/protobuf/types/known/timestamppb"	"io"	"net/http"	"net/url"	"strings"	"time")const (	afreecaHeaderReferer     = "https://play.afreecatv.com"	afreecaMessageBufferSize = 256	afreecaStateAuthenticated uint32 = iota	afreecaStateQuickView)type Afreeca struct {	Username string `yaml:"username"`	Password string `yaml:"password"`	host string	rest *http.Client	info *afreeca.Channel	// credentials	cookies []*http.Cookie	uuid    string	state           bitmap.Bitmap	bridgeConnector ws.ConnectionOptions	chatConnector   ws.ConnectionOptions}func (p *Afreeca) Init(bid string) error {	var err error	if p.host = strings.TrimSpace(bid); p.host == "" {		return errors.New("broadcaster ID cannot be empty")	}	// Generate UUID	p.uuid = afreeca.GenerateUID()	// Initiate HTTP Client	p.rest = &http.Client{		Transport: func() *http.Transport {			t := http.DefaultTransport.(*http.Transport)			t.Proxy = http.ProxyFromEnvironment			return t		}(),		Timeout: time.Second * 30,	}	// Retrieve and validate channel data	if p.info, err = p.getChannelData(); err != nil {		return err	}	if p.info.Result != 1 {		return errors.New("could not fetch channel data")	}	if p.info.PasswordRequired {		return errors.New("password is required")	}	p.bridgeConnector = ws.ConnectionOptions{		MessageListener: make(chan []byte, afreecaMessageBufferSize),		PingPeriod:      time.Second * 20,		PingMessage:     afreeca.NewBridgeMessage(afreeca.BridgeCommandKeepAlive).MustMarshall(),		Headers: http.Header{			"Sec-Websocket-Protocol": []string{"bridge"},			"Origin":                 []string{afreecaHeaderReferer},		},	}	p.chatConnector = ws.ConnectionOptions{		MessageListener: make(chan []byte, afreecaMessageBufferSize),		PingPeriod:      time.Minute,		PingMessage:     afreeca.NewChatPingMessage(),		Headers: http.Header{			"Sec-Websocket-Protocol": []string{"chat"},			"Origin":                 []string{afreecaHeaderReferer},		},	}	return nil}func (p *Afreeca) push(ch chan proto.Message, m proto.Message) error {	if ch == nil {		return errors.New("could not push message to the closed channel")	}	if m == nil {		return errors.New("could not push empty message")	}	select {	case ch <- m:	}	return nil}func (p *Afreeca) Connect(ch chan proto.Message) error {	if p.info == nil {		return errors.New("illegal state: empty channel data")	}	var (		bridge *ws.Connection		chat   *ws.Connection		err    error	)	// Open bridge connection	if bridge, err = ws.NewConnection("wss://bridge.afreecatv.com/Websocket", p.bridgeConnector); err != nil {		return err	}	if err = p.onBridgeOpened(bridge); err != nil {		return err	}	for {		select {		case buf := <-p.bridgeConnector.MessageListener:			m := afreeca.BridgeMessage{}			if err = m.UnmarshalJSON(buf); err != nil {				log.Error().					Str("raw", string(buf)).					Err(err).					Msg("could not Unmarshall bridge message")				return err			}			switch m.Command {			case afreeca.BridgeCommandLogin:				if err = p.onBridgeAuthenticated(bridge, m.Data); err != nil {					return err				}			case afreeca.BridgeCommandCertTicket:				if err = p.onCertTicketReceived(bridge, m.Data); err != nil {					return err				}				if chat, err = ws.NewConnection(p.info.ChatServerUrl(), p.chatConnector); err != nil {					return err				}				if err = p.onChatOpened(chat); err != nil {					return err				}			case afreeca.BridgeCommandUserCount:				t := model.Online{					At:           timestamppb.New(time.Now()),					Platform:     p.Id(),					Broadcaster:  p.host,					Raw:          buf,					DevicePc:     cast.ToUint64(m.Data["uiJoinChUser"]),					DeviceMobile: cast.ToUint64(m.Data["uiMbUser"]),				}				t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown				if err = p.push(ch, &t); err != nil {					return err				}			case afreeca.BridgeCommandUserCountExtended:				t := model.Online{					At:           timestamppb.New(time.Now()),					Platform:     p.Id(),					Broadcaster:  p.host,					Raw:          buf,					DevicePc:     cast.ToUint64(m.Data["uiJoinChPCUser"]),					DeviceMobile: cast.ToUint64(m.Data["uiJoinChMBUser"]),				}				t.Total = t.DeviceMobile + t.DevicePc + t.DeviceUnknown				if err = p.push(ch, &t); err != nil {					return err				}			case afreeca.BridgeCommandClosed:				return p.onBridgeClosed(bridge, m.Data)			default:				log.Debug().					Str("command", string(m.Command)).					Any("bridge", m.Data).					Send()			}		case buf := <-p.chatConnector.MessageListener:			m := afreeca.ChatMessage{}			if err = m.Unmarshall(buf); err != nil {				log.Error().					Err(err).					Str("raw", string(buf)).					Msg("could not Unmarshall chat message")				continue			}			if m.Is(afreeca.Kind.Authorize) {				if err = p.onChatAuthenticated(chat); err != nil {					return err				}			} else if m.Is(afreeca.Kind.Authenticate) {				if err = chat.Send(afreeca.NewChatRosterRequestMessage()); err != nil {					return err				}			} else if m.Is(afreeca.Kind.Chat) {				d := m.Data()				if err = p.push(ch, &model.Message{					At:          timestamppb.New(time.Now()),					Platform:    p.Id(),					Broadcaster: p.host,					Raw:         buf,					User: &model.User{						Id:   afreeca.NormalizeUserID(d[1]),						Name: d[5],					},					Text: d[0],				}); err != nil {					return err				}			} else if m.Is(afreeca.Kind.ChatEmoticon) {				d := m.Data()				if err = p.push(ch, &model.Message{					At:          timestamppb.New(time.Now()),					Platform:    p.Id(),					Broadcaster: p.host,					Raw:         buf,					User: &model.User{						Id:   afreeca.NormalizeUserID(d[5]),						Name: d[6],					},					Text: fmt.Sprintf("%s:%d",						cast.ToString(d[2]),						cast.ToInt(d[3]),					),					Sticker: true,				}); err != nil {					return err				}			} else if m.Is(afreeca.Kind.Donation) {				d := m.Data()				if err = p.push(ch, &model.Donation{					At:          timestamppb.New(time.Now()),					Platform:    p.Id(),					Broadcaster: p.host,					Raw:         buf,					User: &model.User{						Id:   afreeca.NormalizeUserID(d[1]),						Name: d[2],					},					Amount: cast.ToUint64(d[3]),				}); err != nil {					return err				}			} else if m.Is(afreeca.Kind.Roster.List) {				d := m.Data()				if cast.ToInt(d[0]) != 1 {					continue				}				d = d[1:]				for i := 0; i < len(d); i++ {					id := afreeca.NormalizeUserID(d[i])					pass := 2					if !strings.Contains(d[i+pass], `|`) {						pass = 3					}					i += pass					if err = p.push(ch, &model.RosterChange{						At:          timestamppb.New(time.Now()),						Platform:    p.Id(),						Broadcaster: p.host,						Raw:         buf,						User: &model.User{							Id: id,						},						Operation: model.RosterChange_OP_JOINED,					}); err != nil {						return err					}				}			} else if m.Is(afreeca.Kind.Roster.Change) {				d := m.Data()				op := model.RosterChange_OP_JOINED				if cast.ToInt(d[0]) == -1 {					op = model.RosterChange_OP_LEFT				}				if err = p.push(ch, &model.RosterChange{					At:          timestamppb.New(time.Now()),					Platform:    p.Id(),					Broadcaster: p.host,					Raw:         buf,					User: &model.User{						Id: afreeca.NormalizeUserID(d[1]),					},					Operation: op,				}); err != nil {					return err				}			} else {				log.Debug().					Bytes("kind", m.Kind()).					Any("chat", m.Data()).					Send()			}		}	}}func (p *Afreeca) Id() string {	return "afreeca"}func (p *Afreeca) Host() string {	return p.host}func (p *Afreeca) makeRequest(meth string, url string, params url.Values) (*http.Request, error) {	var (		request *http.Request		err     error	)	if http.MethodPost == meth {		var body io.Reader		if params != nil {			body = strings.NewReader(params.Encode())		}		request, err = http.NewRequest(meth, url, body)	} else {		if params != nil {			url = fmt.Sprintf("%s?%s", url, params.Encode())		}		request, err = http.NewRequest(meth, url, nil)	}	if err != nil {		return nil, err	}	request.Header.Set("Referer", afreecaHeaderReferer)	if http.MethodPost == meth && params != nil {		request.Header.Set("Content-Type", "application/x-www-form-urlencoded")	}	for i := range p.cookies {		request.AddCookie(p.cookies[i])	}	return request, nil}func (p *Afreeca) login() error {	if p.Username == "" || p.Password == "" {		return errors.New("username or password is empty")	}	var (		request  *http.Request		response *http.Response		buf []byte		err error	)	if request, err = p.makeRequest(http.MethodPost,		"https://login.afreecatv.com/app/LoginAction.php",		url.Values{			"szWork":        []string{"login"},			"szType":        []string{"json"},			"szUid":         []string{p.Username},			"szPassword":    []string{p.Password},			"isSaveId":      []string{"true"},			"isSavePw":      []string{"false"},			"isSaveJoin":    []string{"false"},			"isLoginRetain": []string{"Y"},		}); err != nil {		return err	}	if response, err = p.rest.Do(request); err != nil {		return err	}	//goland:noinspection ALL	defer response.Body.Close()	if buf, err = tools.DecodeHttpResponse(response); err != nil {		return err	}	if gjson.GetBytes(buf, "RESULT").Int() != 1 {		return errors.New("login failed")	}	p.cookies = response.Cookies()	p.state.Set(afreecaStateAuthenticated)	return nil}func (p *Afreeca) getHLSKey(preset afreeca.Preset) (string, error) {	var (		request  *http.Request		response *http.Response		buf []byte		err error	)	if request, err = p.makeRequest(http.MethodPost,		"https://live.afreecatv.com/afreeca/player_live_api.php",		url.Values{			"bid":         []string{p.host},			"bno":         []string{cast.ToString(p.info.Id)},			"from_api":    []string{"0"},			"mode":        []string{"landing"},			"player_type": []string{"html5"},			"pwd":         []string{""},			"quality":     []string{preset.Name},			"stream_type": []string{"common"},			"type":        []string{"aid"},		}); err != nil {	}	if response, err = p.rest.Do(request); err != nil {		return "", err	}	//goland:noinspection ALL	defer response.Body.Close()	if buf, err = tools.DecodeHttpResponse(response); err != nil {		return "", err	}	if gjson.GetBytes(buf, "CHANNEL.RESULT").Int() != 1 {		err = errors.New("could not fetch HLS key")		log.Debug().			Str("json", string(buf)).			Err(err).			Send()		return "", err	}	if aid := gjson.GetBytes(buf, "CHANNEL.AID").String(); aid == "" {		err = errors.New("could not fetch HLS key")		log.Debug().			Str("json", string(buf)).			Err(err).			Send()		return "", err	} else {		return aid, nil	}}func (p *Afreeca) HLSStream() (*http.Request, error) {	var (		preset *afreeca.Preset		aid    string		uri    string		params = url.Values{			"return_type": []string{"gs_cdn_pc_web"},		}		request  *http.Request		response *http.Response		buf []byte		err error	)	if preset = p.info.GetPreset("original"); preset == nil {		return nil, errors.New("could not find feasible preset")	}	if aid, err = p.getHLSKey(*preset); err != nil {		return nil, err	}	params.Add("broad_key",		fmt.Sprintf("%d-common-%s-hls", p.info.Id, preset.Name))	if request, err = p.makeRequest(http.MethodGet,		fmt.Sprintf("%s/broad_stream_assign.html", p.info.RMD),		params); err != nil {		return nil, err	}	if response, err = p.rest.Do(request); err != nil {		return nil, err	}	//goland:noinspection ALL	defer response.Body.Close()	if buf, err = tools.DecodeHttpResponse(response); err != nil {		return nil, err	}	if gjson.GetBytes(buf, "result").Int() != 1 {		err = errors.New("could not fetch HLS url")		log.Debug().			Str("json", string(buf)).			Err(err).			Send()		return nil, err	}	if uri = gjson.GetBytes(buf, "view_url").String(); uri == "" {		err = errors.New("could not fetch HLS url")		log.Debug().			Str("json", string(buf)).			Err(err).			Send()		return nil, err	}	return p.makeRequest(http.MethodGet, uri,		url.Values{			"aid": []string{aid},		})}func (p *Afreeca) getChannelData() (*afreeca.Channel, error) {	var (		request  *http.Request		response *http.Response		ch  afreeca.Channel		buf []byte		err error	)	if request, err = p.makeRequest(http.MethodPost,		fmt.Sprintf("https://live.afreecatv.com/afreeca/player_live_api.php?bjid=%s", p.host),		url.Values{			"bid":         []string{p.host},			"type":        []string{"live"},			"pwd":         []string{},			"player_type": []string{"html5"},			"stream_type": []string{"common"},			"quality":     []string{"HD"},			"mode":        []string{"landing"},			"from_api":    []string{"0"},			"is_revive":   []string{"false"},		}); err != nil {		return nil, err	}	if response, err = p.rest.Do(request); err != nil {		return nil, err	}	//goland:noinspection ALL	defer response.Body.Close()	if buf, err = tools.DecodeHttpResponse(response); err != nil {		return nil, err	}	if err = ch.Unmarshall(buf); err != nil {		return nil, err	}	if !p.state.Contains(afreecaStateAuthenticated) &&		(ch.IsLoginRequired() || ch.IsGeoRestricted() || ch.AudienceGrade > 0) {		log.Debug().Msg("broadcast is login required / geo restricted / has audience grade restrictions; attempt to login")		if err = p.login(); err != nil {			return nil, err		}		log.Debug().Msg("successfully logged in")		return p.getChannelData()	}	if p.getCookieValue("_au") == "" {		if response, err = p.rest.Get(			fmt.Sprintf("https://play.afreecatv.com/%s/%d", p.host, ch.Id)); err != nil {			return nil, err		}		p.cookies = response.Cookies()	}	return &ch, nil}func (p *Afreeca) getCookieValue(k string) string {	for i := range p.cookies {		if p.cookies[i].Name == k {			return p.cookies[i].Value		}	}	return ""}func (p *Afreeca) onBridgeOpened(c *ws.Connection) error {	m := afreeca.NewBridgeInitGatewayMessage(p.host, p.uuid, *p.info, p.getCookieValue)	return c.Send(m.MustMarshall())}func (p *Afreeca) onBridgeClosed(_ *ws.Connection, data map[string]any) error {	var (		s   = cast.ToString(data["pcEndingMsg"])		err error	)	if s, err = url.QueryUnescape(s); err == nil {		log.Debug().Msg(s)	}	return errorClosed}func (p *Afreeca) onBridgeAuthenticated(_ *ws.Connection, data map[string]any) error {	// find if we have QuickView	if cast.ToUint(data["iMode"]) == 1 {		p.state.Set(afreecaStateQuickView)	} else {		p.state.Remove(afreecaStateQuickView)	}	return nil}func (p *Afreeca) onCertTicketReceived(c *ws.Connection, data map[string]any) error {	m := afreeca.NewBridgeInitBroadcastMessage(p.uuid, *p.info, p.getCookieValue)	m.AddArgument("append_data", data["pcAppendDat"])	m.AddArgument("gw_ticket", data["pcTicket"])	return c.Send(m.MustMarshall())}func (p *Afreeca) onChatOpened(c *ws.Connection) error {	m := afreeca.NewChatAuthorizeMessage(		"",		p.state.Contains(afreecaStateQuickView),	)	return c.Send(m.Build())}func (p *Afreeca) onChatAuthenticated(c *ws.Connection) error {	joinlog := afreeca.BridgeMessageValue{}	joinlog.AddTuple("log", func() afreeca.BridgeMessageValueParams {		params := afreeca.BridgeMessageValueParams{}		params.Add("set_bps", p.info.Bitrate)		params.Add("view_bps", 500)		params.Add("quality", "sd")		params.Add("uuid", p.uuid)		params.Add("geo_cc", p.info.GeoName)		params.Add("geo_rc", p.info.GeoCode)		params.Add("acpt_lang", p.info.AcceptLanguage)		params.Add("svc_lang", p.info.ServiceLanguage)		params.Add("subscribe", 0)		params.Add("lowlatency", 0)		params.Add("mode", "landing")		return params	}())	joinlog.AddTuple("pwd", "")	joinlog.AddTuple("auth_info", "NULL")	joinlog.AddTuple("pver", 2)	joinlog.AddTuple("access_system", "html5")	m := afreeca.NewChatMessage(afreeca.Kind.Authenticate,		cast.ToString(p.info.ChatId),		p.info.FanToken,		"0",		"",		joinlog.String(),		"")	return c.Send(m.Build())}
 |