navidrome/core/stream/media_streamer.go
Deluan faa4acf583 refactor(stream): make TranscodingThrottle an internal component of mediaStreamer
The throttle is an implementation detail of mediaStreamer, not a
dependency that needs to be injected. Remove it from the constructor
and wire providers; create it internally via a singleton. This
simplifies the API and keeps Release() unexported.
2026-03-25 08:27:23 -04:00

270 lines
8.5 KiB
Go

package stream
import (
"context"
"fmt"
"io"
"mime"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/core/ffmpeg"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/utils/cache"
"github.com/navidrome/navidrome/utils/req"
)
type MediaStreamer interface {
NewStream(ctx context.Context, mf *model.MediaFile, req Request) (*Stream, error)
}
type TranscodingCache cache.FileCache
func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer {
return &mediaStreamer{ds: ds, transcoder: t, cache: cache}
}
type mediaStreamer struct {
ds model.DataStore
transcoder ffmpeg.FFmpeg
cache cache.FileCache
}
type streamJob struct {
ms *mediaStreamer
mf *model.MediaFile
filePath string
format string
bitRate int
sampleRate int
bitDepth int
channels int
offset int
}
func (j *streamJob) Key() string {
return fmt.Sprintf("%s.%s.%d.%d.%d.%d.%s.%d", j.mf.ID, j.mf.UpdatedAt.Format(time.RFC3339Nano), j.bitRate, j.sampleRate, j.bitDepth, j.channels, j.format, j.offset)
}
// NewStream creates a Stream for the given MediaFile and Request. It handles both raw streaming (no transcoding)
// and transcoded streaming based on the requested format and bitrate. It also logs detailed information about
// the streaming request and whether the transcoding result was served from cache or not.
func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req Request) (*Stream, error) {
var format string
var bitRate int
var cached bool
defer func() {
log.Info(ctx, "Streaming file", "title", mf.Title, "artist", mf.Artist, "format", format, "cached", cached,
"bitRate", bitRate, "sampleRate", req.SampleRate, "bitDepth", req.BitDepth, "channels", req.Channels,
"user", userName(ctx), "transcoding", format != "raw",
"originalFormat", mf.Suffix, "originalBitRate", mf.BitRate)
}()
format = req.Format
bitRate = req.BitRate
if format == "" || format == "raw" {
format = "raw"
bitRate = 0
}
s := &Stream{ctx: ctx, mf: mf, format: format, bitRate: bitRate}
filePath := mf.AbsolutePath()
if format == "raw" {
log.Debug(ctx, "Streaming RAW file", "id", mf.ID, "path", filePath,
"requestBitrate", req.BitRate, "requestFormat", req.Format, "requestOffset", req.Offset,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
"selectedBitrate", bitRate, "selectedFormat", format)
f, err := os.Open(filePath)
if err != nil {
return nil, err
}
s.ReadCloser = f
s.Seeker = f
s.format = mf.Suffix
return s, nil
}
// Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss)
if err := getTranscodingThrottle().Acquire(ctx); err != nil {
return nil, err
}
job := &streamJob{
ms: ms,
mf: mf,
filePath: filePath,
format: format,
bitRate: bitRate,
sampleRate: req.SampleRate,
bitDepth: req.BitDepth,
channels: req.Channels,
offset: req.Offset,
}
r, err := ms.cache.Get(ctx, job)
if err != nil {
getTranscodingThrottle().Release()
log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err)
return nil, err
}
cached = r.Cached
if cached {
// Cache hit — no ffmpeg process running, release the slot immediately
getTranscodingThrottle().Release()
s.ReadCloser = r
} else {
// Cache miss — slot released when stream is closed
s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release}
}
s.Seeker = r.Seeker
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath,
"requestBitrate", req.BitRate, "requestFormat", req.Format, "requestOffset", req.Offset,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
"selectedBitrate", bitRate, "selectedFormat", format, "cached", cached, "seekable", s.Seekable())
return s, nil
}
type Stream struct {
ctx context.Context
mf *model.MediaFile
bitRate int
format string
io.ReadCloser
io.Seeker
}
func (s *Stream) Seekable() bool { return s.Seeker != nil }
func (s *Stream) Duration() float32 { return s.mf.Duration }
func (s *Stream) ContentType() string { return mime.TypeByExtension("." + s.format) }
func (s *Stream) Name() string { return s.mf.Title + "." + s.format }
func (s *Stream) ModTime() time.Time { return s.mf.UpdatedAt }
func (s *Stream) EstimatedContentLength() int {
return int(s.mf.Duration * float32(s.bitRate) / 8 * 1024)
}
// Serve writes the stream to the HTTP response. For seekable streams it uses http.ServeContent
// (supporting range requests). For non-seekable streams it writes directly and logs any errors.
// Returns the number of bytes written and an error only when io.Copy fails with 0 bytes written
// (meaning the HTTP 200 status has not been flushed yet and the caller can still send an error response).
// Empty output (0 bytes, no error) is logged but not treated as an error.
func (s *Stream) Serve(ctx context.Context, w http.ResponseWriter, r *http.Request) (int64, error) {
if s.Seekable() {
http.ServeContent(w, r, s.Name(), s.ModTime(), s)
return -1, nil
}
w.Header().Set("Accept-Ranges", "none")
w.Header().Set("Content-Type", s.ContentType())
if req.Params(r).BoolOr("estimateContentLength", false) {
length := strconv.Itoa(s.EstimatedContentLength())
log.Trace(ctx, "Estimated content-length", "contentLength", length)
w.Header().Set("Content-Length", length)
}
if r.Method == http.MethodHead {
go func() { _, _ = io.Copy(io.Discard, s) }()
return 0, nil
}
id := s.mf.ID
c, err := io.Copy(w, s)
if err != nil {
log.Error(ctx, "Error sending transcoded file", "id", id, err)
if c == 0 {
w.Header().Del("Content-Length")
return 0, fmt.Errorf("sending transcoded file: %w", err)
}
return c, nil
}
if c == 0 {
log.Error(ctx, "Transcoding returned empty output, ffmpeg may have failed. "+
"Check that ffmpeg supports the requested codec. Enable Trace logging for ffmpeg stderr details",
"id", id, "format", s.ContentType())
} else {
log.Trace(ctx, "Success sending transcoded file", "id", id, "size", c)
}
return c, nil
}
// NewStream creates a non-seekable Stream from the given components.
func NewStream(mf *model.MediaFile, format string, bitRate int, r io.ReadCloser) *Stream {
return &Stream{
ctx: context.Background(),
mf: mf,
format: format,
bitRate: bitRate,
ReadCloser: r,
}
}
var (
onceTranscodingCache sync.Once
instanceTranscodingCache TranscodingCache
)
func GetTranscodingCache() TranscodingCache {
onceTranscodingCache.Do(func() {
instanceTranscodingCache = NewTranscodingCache()
})
return instanceTranscodingCache
}
func NewTranscodingCache() TranscodingCache {
return cache.NewFileCache("Transcoding", conf.Server.TranscodingCacheSize,
consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems,
func(ctx context.Context, arg cache.Item) (io.Reader, error) {
job := arg.(*streamJob)
command := LookupTranscodeCommand(ctx, job.ms.ds, job.format)
if command == "" {
log.Error(ctx, "No transcoding command available", "format", job.format)
return nil, os.ErrInvalid
}
// Choose the appropriate context based on EnableTranscodingCancellation configuration.
// This is where we decide whether transcoding processes should be cancellable or not.
var transcodingCtx context.Context
if conf.Server.EnableTranscodingCancellation {
// Use the request context directly, allowing cancellation when client disconnects
transcodingCtx = ctx
} else {
// Use background context with request values preserved.
// This prevents cancellation but maintains request metadata (user, client, etc.)
transcodingCtx = request.AddValues(context.Background(), ctx)
}
out, err := job.ms.transcoder.Transcode(transcodingCtx, ffmpeg.TranscodeOptions{
Command: command,
Format: job.format,
FilePath: job.filePath,
BitRate: job.bitRate,
SampleRate: job.sampleRate,
BitDepth: job.bitDepth,
Channels: job.channels,
Offset: job.offset,
})
if err != nil {
log.Error(ctx, "Error starting transcoder", "id", job.mf.ID, err)
return nil, os.ErrInvalid
}
return out, nil
})
}
// userName extracts the username from the context for logging purposes.
func userName(ctx context.Context) string {
if user, ok := request.UserFrom(ctx); !ok {
return "UNKNOWN"
} else {
return user.UserName
}
}