123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package download
- import (
- "context"
- "errors"
- "github.com/grafov/m3u8"
- "github.com/hashicorp/golang-lru/v2/expirable"
- "github.com/rs/zerolog/log"
- "io"
- "net/http"
- "net/url"
- "strings"
- "sync/atomic"
- "time"
- )
// HLSOptions carries the tunables for an HLS recorder.
type HLSOptions struct {
	// Timeout is used as the Timeout of the recorder's http.Client.
	Timeout time.Duration

	// MaxAttempts bounds how many times a playlist request is retried;
	// -1 disables the bound.
	MaxAttempts int
}
- type HLS struct {
- ctx context.Context
- ctxClose context.CancelFunc
- exit chan any
- options HLSOptions
- client *http.Client
- request *http.Request
- retryAttempts atomic.Uint32
- }
- func NewHLS(r func() (*http.Request, error), opts HLSOptions) (*HLS, error) {
- if r == nil {
- return nil, errors.New("illegal state: empty requester")
- }
- var (
- d = HLS{
- options: opts,
- client: &http.Client{
- Transport: func() *http.Transport {
- transport := http.DefaultTransport.(*http.Transport)
- transport.Proxy = http.ProxyFromEnvironment
- return transport
- }(),
- Timeout: opts.Timeout,
- },
- }
- err error
- )
- if d.request, err = r(); err != nil {
- return nil, err
- }
- d.ctx, d.ctxClose = context.WithCancel(context.Background())
- return &d, nil
- }
- func (d *HLS) Close() error {
- log.Info().Msg("request to close HLS recorder")
- if d.ctxClose != nil {
- d.ctxClose()
- }
- d.exit <- 0
- return nil
- }
- func (d *HLS) Start() chan []byte {
- d.exit = make(chan any)
- var (
- c = make(chan []byte)
- medialist *m3u8.MediaPlaylist
- segments = expirable.NewLRU[uint64, *m3u8.MediaSegment](64, nil, time.Minute)
- err error
- )
- go func() {
- defer close(c)
- for {
- select {
- case <-d.exit:
- c <- nil
- return
- default:
- if medialist, err = d.getMediaList(); err != nil {
- log.Debug().
- Err(err).
- Msg("could not retrieve m3u8 playlist")
- c <- nil
- return
- }
- if medialist.Closed {
- c <- nil
- return
- }
- duration := time.NewTicker(time.Duration(medialist.TargetDuration) * time.Second)
- for _, seq := range medialist.Segments {
- if seq == nil {
- continue
- }
- if _, ok := segments.Get(seq.SeqId); ok {
- continue
- }
- segments.Add(seq.SeqId, seq)
- var (
- uri = seq.URI
- request *http.Request
- response *http.Response
- buff []byte
- )
- // pre-handle relative segment URIs
- if !strings.HasPrefix(uri, "http") {
- if s, err := d.request.URL.Parse(uri); err != nil {
- log.Debug().Err(err).Msg("could not build segment abs segment URI")
- c <- nil
- return
- } else {
- if uri, err = url.QueryUnescape(s.String()); err != nil {
- log.Debug().Err(err).Msg("could not build segment abs segment URI")
- c <- nil
- return
- }
- }
- }
- request = d.request.Clone(d.ctx)
- request.Method = http.MethodGet
- if request.URL, err = request.URL.Parse(uri); err != nil {
- log.Debug().Err(err).Msg("could not parse segment URI")
- c <- nil
- return
- }
- if response, err = d.client.Do(request); err != nil {
- log.Debug().Err(err).Msg("could not retrieve segment data")
- c <- nil
- return
- }
- if buff, err = io.ReadAll(response.Body); err != nil {
- log.Debug().Err(err).Msg("could not read segment data")
- c <- nil
- return
- }
- _ = response.Body.Close()
- c <- buff
- }
- <-duration.C
- }
- }
- }()
- return c
- }
- func (d *HLS) requestMediaList(request *http.Request) (*http.Response, error) {
- var (
- response *http.Response
- err error
- )
- response, err = d.client.Do(request)
- if err != nil && errors.Is(err, http.ErrHandlerTimeout) {
- if d.retryAttempts.Load() == uint32(d.options.MaxAttempts) {
- return nil, err
- }
- select {
- case <-time.After(time.Second):
- }
- if d.options.MaxAttempts != -1 {
- d.retryAttempts.Add(1)
- }
- return d.requestMediaList(request)
- }
- if err != nil {
- return nil, err
- }
- d.retryAttempts.Store(0)
- return response, nil
- }
- func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
- var (
- request *http.Request
- response *http.Response
- playlist m3u8.Playlist
- t m3u8.ListType
- err error
- )
- request = d.request.Clone(d.ctx)
- if response, err = d.requestMediaList(request); err != nil {
- return nil, err
- }
- if playlist, t, err = m3u8.DecodeFrom(response.Body, false); err != nil {
- return nil, err
- }
- if m3u8.MEDIA != t {
- return nil, errors.New("invalid media type")
- }
- return playlist.(*m3u8.MediaPlaylist), nil
- }
|