// Package download provides an HLS (HTTP Live Streaming) media playlist
// downloader that streams segment payloads over a channel.
package download

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync/atomic"
	"time"

	"github.com/grafov/m3u8"
	"github.com/hashicorp/golang-lru/v2/expirable"
	"github.com/rs/zerolog/log"
)

const (
	// retryDelay is the pause between two attempts of a timed-out playlist
	// request.
	retryDelay = time.Second

	// fallbackRefreshInterval is the playlist refresh interval used when the
	// playlist does not advertise a positive target duration.
	fallbackRefreshInterval = time.Second
)

// HLSOptions configures an HLS downloader.
type HLSOptions struct {
	// Timeout is passed to http.Client.Timeout and therefore bounds every
	// playlist and segment request.
	Timeout time.Duration

	// MaxAttempts is the number of times a timed-out playlist request is
	// retried. Zero disables retries; a negative value (conventionally -1)
	// retries without limit.
	MaxAttempts int
}

// HLS polls a media playlist and downloads the segments it references.
// Create it with NewHLS, consume it through Start and stop it with Close.
type HLS struct {
	// ctx is attached to every outgoing request; ctxClose cancels it.
	ctx      context.Context
	ctxClose context.CancelFunc

	options HLSOptions
	client  *http.Client

	// request is the playlist request template. It is cloned for each
	// playlist refresh and for each segment download.
	request *http.Request

	// retryAttempts counts consecutive timed-out playlist requests.
	retryAttempts atomic.Uint32
}

// NewHLS builds a downloader around the playlist request produced by r.
//
// It returns an error when r is nil, when r fails, or when r yields no
// request. The HTTP client uses its own copy of http.DefaultTransport so that
// the process-wide default transport is left untouched.
func NewHLS(r func() (*http.Request, error), opts HLSOptions) (*HLS, error) {
	if r == nil {
		return nil, errors.New("illegal state: empty requester")
	}

	request, err := r()
	if err != nil {
		return nil, err
	}
	if request == nil {
		return nil, errors.New("illegal state: empty request")
	}

	// http.DefaultTransport may have been replaced by a wrapper that is not
	// an *http.Transport; in that case it is used as is.
	transport := http.DefaultTransport
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		clone := base.Clone()
		clone.Proxy = http.ProxyFromEnvironment
		transport = clone
	}

	d := &HLS{
		options: opts,
		client: &http.Client{
			Transport: transport,
			Timeout:   opts.Timeout,
		},
		request: request,
	}
	d.ctx, d.ctxClose = context.WithCancel(context.Background())

	return d, nil
}

// Close asks the downloader to stop by cancelling its context, which aborts
// in-flight requests and wakes the download loop. It never blocks, may be
// called more than once, and always returns nil.
func (d *HLS) Close() error {
	log.Info().Msg("request to close HLS recorder")

	if d.ctxClose != nil {
		d.ctxClose()
	}

	return nil
}

// Start launches the download loop in a new goroutine and returns the channel
// on which segment payloads are delivered in playlist order.
//
// When the loop ends (Close was called, the playlist is marked closed, or a
// request failed) a nil slice is sent and the channel is closed. The nil send
// blocks until it is received, so callers should keep receiving until they
// see nil or the channel is closed.
func (d *HLS) Start() chan []byte {
	var (
		c = make(chan []byte)
		// seen remembers recently handled segment sequence ids (at most 64
		// entries, one minute each) so that a segment still listed by a
		// refreshed playlist is not downloaded twice.
		seen = expirable.NewLRU[uint64, *m3u8.MediaSegment](64, nil, time.Minute)
	)

	go func() {
		// Deferred calls run last-in first-out: the nil sentinel is sent
		// first, then the channel is closed.
		defer close(c)
		defer func() { c <- nil }()

		for d.ctx.Err() == nil {
			if !d.poll(c, seen) {
				return
			}
		}
	}()

	return c
}

// poll performs one refresh cycle: it fetches the playlist, downloads every
// segment not seen before, forwards the payloads to c and then waits for the
// remainder of the refresh interval. It reports whether the loop should go on.
func (d *HLS) poll(c chan<- []byte, seen *expirable.LRU[uint64, *m3u8.MediaSegment]) bool {
	medialist, err := d.getMediaList()
	if err != nil {
		log.Debug().
			Err(err).
			Msg("could not retrieve m3u8 playlist")
		return false
	}
	if medialist.Closed {
		return false
	}

	// The timer starts before the downloads so that the time spent
	// downloading counts towards the refresh interval.
	refresh := time.NewTimer(refreshInterval(float64(medialist.TargetDuration)))
	defer refresh.Stop()

	for _, seq := range medialist.Segments {
		// An empty URI would resolve to the playlist URL itself.
		if seq == nil || strings.TrimSpace(seq.URI) == "" {
			continue
		}
		if _, ok := seen.Get(seq.SeqId); ok {
			continue
		}
		seen.Add(seq.SeqId, seq)

		buff, err := d.fetchSegment(seq.URI)
		if err != nil {
			return false
		}

		select {
		case c <- buff:
		case <-d.ctx.Done():
			return false
		}
	}

	select {
	case <-refresh.C:
		return true
	case <-d.ctx.Done():
		return false
	}
}

// fetchSegment downloads the segment referenced by uri, which may be absolute
// or relative to the playlist URL, and returns its body. Failures are logged
// here at debug level, so callers only need to stop on error.
func (d *HLS) fetchSegment(uri string) ([]byte, error) {
	// URL.Parse resolves uri against the playlist URL and leaves absolute
	// URIs as they are. The result is used verbatim: unescaping it would
	// alter percent-encoded characters and '+' signs in the URL.
	target, err := d.request.URL.Parse(uri)
	if err != nil {
		log.Debug().Err(err).Msg("could not parse segment URI")
		return nil, err
	}

	request := d.request.Clone(d.ctx)
	request.Method = http.MethodGet
	if target.Host != request.URL.Host {
		// The clone carries the playlist's Host value, which would be sent
		// as the Host header to a segment served from another host.
		request.Host = target.Host
	}
	request.URL = target

	response, err := d.client.Do(request)
	if err != nil {
		log.Debug().Err(err).Msg("could not retrieve segment data")
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		// The body of a non-2xx response is not segment data.
		err = fmt.Errorf("unexpected segment response status %q", response.Status)
		log.Debug().Err(err).Msg("could not retrieve segment data")
		return nil, err
	}

	buff, err := io.ReadAll(response.Body)
	if err != nil {
		log.Debug().Err(err).Msg("could not read segment data")
		return nil, err
	}

	return buff, nil
}

// requestMediaList sends request and retries it after retryDelay for as long
// as it fails with a timeout and the retry budget (HLSOptions.MaxAttempts)
// allows. The consecutive-timeout counter is reset on success. Waiting
// between attempts stops early when the request's context is cancelled.
func (d *HLS) requestMediaList(request *http.Request) (*http.Response, error) {
	limit := d.options.MaxAttempts

	for {
		response, err := d.client.Do(request)
		if err == nil {
			d.retryAttempts.Store(0)
			return response, nil
		}
		if !isTimeout(err) {
			return nil, err
		}
		if limit >= 0 {
			if d.retryAttempts.Load() >= uint32(limit) {
				return nil, err
			}
			d.retryAttempts.Add(1)
		}
		if !sleepContext(request.Context(), retryDelay) {
			return nil, request.Context().Err()
		}
	}
}

// getMediaList downloads and decodes the playlist. It returns an error when
// the request fails, the response status is not 2xx, the body cannot be
// decoded, or the playlist is not a media playlist.
func (d *HLS) getMediaList() (*m3u8.MediaPlaylist, error) {
	response, err := d.requestMediaList(d.request.Clone(d.ctx))
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("unexpected playlist response status %q", response.Status)
	}

	playlist, t, err := m3u8.DecodeFrom(response.Body, false)
	if err != nil {
		return nil, err
	}
	if t != m3u8.MEDIA {
		return nil, errors.New("invalid media type")
	}

	media, ok := playlist.(*m3u8.MediaPlaylist)
	if !ok {
		return nil, errors.New("invalid media type")
	}

	return media, nil
}

// isTimeout reports whether err is a *url.Error (the error type returned by
// http.Client.Do) whose Timeout method reports true.
func isTimeout(err error) bool {
	var uerr *url.Error
	return errors.As(err, &uerr) && uerr.Timeout()
}

// sleepContext waits for delay to elapse and reports true, or reports false
// as soon as ctx is done.
func sleepContext(ctx context.Context, delay time.Duration) bool {
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// refreshInterval converts a playlist target duration expressed in seconds
// into a time.Duration, substituting fallbackRefreshInterval when the result
// is not positive.
func refreshInterval(targetSeconds float64) time.Duration {
	interval := time.Duration(targetSeconds * float64(time.Second))
	if interval <= 0 {
		return fallbackRefreshInterval
	}
	return interval
}