diff --git a/conf/configuration.go b/conf/configuration.go index fce5e0b2f..97cacbbe5 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -58,6 +58,7 @@ type configOptions struct { PlaylistsPath string SmartPlaylistRefreshDelay time.Duration AutoTranscodeDownload bool + MaxConcurrentTranscodes int DefaultDownsamplingFormat string Search searchOptions `json:",omitzero"` SimilarSongsMatchThreshold int @@ -113,35 +114,37 @@ type configOptions struct { Agents string // DevFlags. These are used to enable/disable debugging and incomplete features - DevLogLevels map[string]string `json:",omitempty"` - DevLogSourceLine bool - DevEnableProfiler bool - DevAutoCreateAdminPassword string - DevAutoLoginUsername string - DevActivityPanel bool - DevActivityPanelUpdateRate time.Duration - DevSidebarPlaylists bool - DevShowArtistPage bool - DevUIShowConfig bool - DevNewEventStream bool - DevOffsetOptimize int - DevArtworkMaxRequests int - DevArtworkThrottleBacklogLimit int - DevArtworkThrottleBacklogTimeout time.Duration - DevArtistInfoTimeToLive time.Duration - DevAlbumInfoTimeToLive time.Duration - DevExternalScanner bool - DevScannerThreads uint - DevSelectiveWatcher bool - DevInsightsInitialDelay time.Duration - DevEnablePlayerInsights bool - DevEnablePluginsInsights bool - DevPluginCompilationTimeout time.Duration - DevExternalArtistFetchMultiplier float64 - DevOptimizeDB bool - DevPreserveUnicodeInExternalCalls bool - DevEnableMediaFileProbe bool - DevJpegCoverArt bool + DevLogLevels map[string]string `json:",omitempty"` + DevLogSourceLine bool + DevEnableProfiler bool + DevAutoCreateAdminPassword string + DevAutoLoginUsername string + DevActivityPanel bool + DevActivityPanelUpdateRate time.Duration + DevSidebarPlaylists bool + DevShowArtistPage bool + DevUIShowConfig bool + DevNewEventStream bool + DevOffsetOptimize int + DevArtworkMaxRequests int + DevArtworkThrottleBacklogLimit int + DevArtworkThrottleBacklogTimeout time.Duration + DevTranscodeThrottleBacklogLimit int + DevTranscodeThrottleBacklogTimeout time.Duration + DevArtistInfoTimeToLive time.Duration + DevAlbumInfoTimeToLive time.Duration + DevExternalScanner bool + DevScannerThreads uint + DevSelectiveWatcher bool + DevInsightsInitialDelay time.Duration + DevEnablePlayerInsights bool + DevEnablePluginsInsights bool + DevPluginCompilationTimeout time.Duration + DevExternalArtistFetchMultiplier float64 + DevOptimizeDB bool + DevPreserveUnicodeInExternalCalls bool + DevEnableMediaFileProbe bool + DevJpegCoverArt bool } type scannerOptions struct { @@ -703,6 +706,7 @@ func setViperDefaults() { viper.SetDefault("enablem3uexternalalbumart", false) viper.SetDefault("enablemediafilecoverart", true) viper.SetDefault("autotranscodedownload", false) + viper.SetDefault("maxconcurrenttranscodes", max(4, runtime.NumCPU())) viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat) viper.SetDefault("search.fullstring", false) viper.SetDefault("search.backend", "fts") @@ -813,6 +817,8 @@ func setViperDefaults() { viper.SetDefault("devartworkmaxrequests", max(4, runtime.NumCPU())) viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit) viper.SetDefault("devartworkthrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout) + viper.SetDefault("devtranscodethrottlebackloglimit", consts.RequestThrottleBacklogLimit) + viper.SetDefault("devtranscodethrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout) viper.SetDefault("devartistinfotimetolive", consts.ArtistInfoTimeToLive) viper.SetDefault("devalbuminfotimetolive", consts.AlbumInfoTimeToLive) viper.SetDefault("devexternalscanner", true) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index de03b4d2f..1396aa493 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -91,6 +91,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req return s, nil } + // Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss) + if err := getTranscodingThrottle().Acquire(ctx); err != nil { + return nil, err + } + job := &streamJob{ ms: ms, mf: mf, @@ -104,12 +109,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } r, err := ms.cache.Get(ctx, job) if err != nil { + getTranscodingThrottle().Release() log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) return nil, err } cached = r.Cached - - s.ReadCloser = r + if cached { + // Cache hit — no ffmpeg process running, release the slot immediately + getTranscodingThrottle().Release() + s.ReadCloser = r + } else { + // Cache miss — slot released when stream is closed + s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release} + } s.Seeker = r.Seeker log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath, diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index 1bc21e239..9cd987a57 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -31,6 +31,7 @@ var _ = Describe("MediaStreamer", func() { }) testCache := stream.NewTranscodingCache() Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) + conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) }) AfterEach(func() { @@ -72,4 +73,20 @@ var _ = Describe("MediaStreamer", func() { Expect(s.Seekable()).To(BeTrue()) }) }) + + Context("NewStream with throttle", func() { + var mf *model.MediaFile + BeforeEach(func() { + var err error + mf, err = ds.MediaFile(ctx).Get("123") + Expect(err).ToNot(HaveOccurred()) + }) + + It("does not throttle raw/direct-play requests even when throttle is saturated", func() { + s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + _ = s.Close() + }) + }) }) diff --git a/core/stream/throttle.go b/core/stream/throttle.go new file mode 100644 index 000000000..8b28bba63 --- /dev/null +++ b/core/stream/throttle.go @@ -0,0 +1,103 @@ +package stream + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/log" + "github.com/navidrome/navidrome/utils/singleton" + "golang.org/x/sync/semaphore" +) + +// ErrTranscodingBusy is returned when the transcoding throttle rejects a request +// because the concurrency limit and backlog are both full. +var ErrTranscodingBusy = errors.New("too many concurrent transcodes") + +// transcodingThrottle limits the number of concurrent transcoding operations. +type transcodingThrottle struct { + sem *semaphore.Weighted + backlog atomic.Int64 + maxBacklog int64 + timeout time.Duration + disabled bool +} + +func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle { + if maxConcurrent <= 0 { + return &transcodingThrottle{disabled: true} + } + return &transcodingThrottle{ + sem: semaphore.NewWeighted(int64(maxConcurrent)), + maxBacklog: int64(maxBacklog), + timeout: timeout, + } +} + +// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires. +func (t *transcodingThrottle) Acquire(ctx context.Context) error { + if t.disabled { + return nil + } + + // Fast path: try to acquire without touching the backlog counter + if t.sem.TryAcquire(1) { + return nil + } + + // Slow path: semaphore is full, enter backlog queue + // Increment-then-check-then-rollback to avoid TOCTOU race + current := t.backlog.Add(1) + if current > t.maxBacklog { + t.backlog.Add(-1) + log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1) + return ErrTranscodingBusy + } + + log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + err := t.sem.Acquire(ctx, 1) + t.backlog.Add(-1) + if err != nil { + log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") + return ErrTranscodingBusy + } + return nil +} + +// Release frees a transcoding slot. +func (t *transcodingThrottle) Release() { + if t.disabled { + return + } + t.sem.Release(1) +} + +// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close. +type releaseOnClose struct { + io.ReadCloser + release func() + once sync.Once +} + +func (r *releaseOnClose) Close() error { + err := r.ReadCloser.Close() + r.once.Do(r.release) + return err +} + +// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration. +func getTranscodingThrottle() *transcodingThrottle { + return singleton.GetInstance(func() *transcodingThrottle { + return newTranscodingThrottle( + conf.Server.MaxConcurrentTranscodes, + conf.Server.DevTranscodeThrottleBacklogLimit, + conf.Server.DevTranscodeThrottleBacklogTimeout, + ) + }) +} diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go new file mode 100644 index 000000000..b8b073d08 --- /dev/null +++ b/core/stream/throttle_test.go @@ -0,0 +1,109 @@ +package stream + +import ( + "context" + "errors" + "io" + "strings" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("TranscodingThrottle", func() { + Describe("Acquire/Release", func() { + It("allows up to maxConcurrent acquires", func() { + t := newTranscodingThrottle(2, 10, time.Second) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + // Third should block, so test it doesn't return immediately + ctx, cancel := context.WithTimeout(GinkgoT().Context(), 50*time.Millisecond) + defer cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("releases a slot and allows new acquire", func() { + t := newTranscodingThrottle(1, 10, time.Second) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + t.Release() + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + }) + + It("returns ErrTranscodingBusy when backlog limit is reached", func() { + t := newTranscodingThrottle(1, 2, 5*time.Second) + // Fill the slot + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + + // Fill the backlog (2 waiters) — they block in goroutines + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Go(func() { _ = t.Acquire(GinkgoT().Context()) }) + } + // Wait until both goroutines are in the backlog + Eventually(func() int64 { return t.backlog.Load() }).Should(BeNumerically(">=", 2)) + + // Third waiter should be rejected immediately (backlog full) + err := t.Acquire(GinkgoT().Context()) + Expect(err).To(MatchError(ErrTranscodingBusy)) + + // Clean up: release all + t.Release() + t.Release() + t.Release() + wg.Wait() + }) + + It("returns ErrTranscodingBusy when timeout expires", func() { + t := newTranscodingThrottle(1, 10, 50*time.Millisecond) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(MatchError(ErrTranscodingBusy)) + }) + + It("respects context cancellation", func() { + t := newTranscodingThrottle(1, 10, 5*time.Second) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + ctx, cancel := context.WithCancel(GinkgoT().Context()) + cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("is disabled when maxConcurrent is 0", func() { + t := newTranscodingThrottle(0, 10, time.Second) + for i := 0; i < 100; i++ { + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + } + }) + }) +}) + +var _ = Describe("releaseOnClose", func() { + It("calls release exactly once on Close", func() { + var count int + rc := &releaseOnClose{ + ReadCloser: io.NopCloser(strings.NewReader("data")), + release: func() { count++ }, + } + Expect(rc.Close()).To(Succeed()) + Expect(rc.Close()).To(Succeed()) // double close + Expect(count).To(Equal(1)) + }) + + It("propagates close error from underlying ReadCloser", func() { + rc := &releaseOnClose{ + ReadCloser: &failCloser{}, + release: func() {}, + } + err := rc.Close() + Expect(err).To(MatchError("close failed")) + }) +}) + +// failCloser is a ReadCloser whose Close always returns an error +type failCloser struct{ io.Reader } + +func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF } +func (f *failCloser) Close() error { return errors.New("close failed") } diff --git a/server/subsonic/stream.go b/server/subsonic/stream.go index b49af2b24..9905c14de 100644 --- a/server/subsonic/stream.go +++ b/server/subsonic/stream.go @@ -1,12 +1,14 @@ package subsonic import ( + "errors" "fmt" "net/http" "strconv" "strings" "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/request" @@ -31,22 +33,25 @@ func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Su } streamReq := api.transcodeDecision.ResolveRequest(ctx, mf, format, maxBitRate, timeOffset) - stream, err := api.streamer.NewStream(ctx, mf, streamReq) + s, err := api.streamer.NewStream(ctx, mf, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later") + } return nil, err } // Make sure the stream will be closed at the end, to avoid leakage defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", id, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", id, "file", s.Name(), err) } }() w.Header().Set("X-Content-Type-Options", "nosniff") - w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32)) + w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(s.Duration()), 'G', -1, 32)) - _, err = stream.Serve(ctx, w, r) + _, err = s.Serve(ctx, w, r) return nil, err } @@ -100,22 +105,25 @@ func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses. switch v := entity.(type) { case *model.MediaFile: streamReq := api.transcodeDecision.ResolveRequest(ctx, v, format, maxBitRate, 0) - stream, err := api.streamer.NewStream(ctx, v, streamReq) + s, err := api.streamer.NewStream(ctx, v, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later") + } return nil, err } // Make sure the stream will be closed at the end, to avoid leakage defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", id, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", id, "file", s.Name(), err) } }() - disposition := fmt.Sprintf("attachment; filename=\"%s\"", stream.Name()) + disposition := fmt.Sprintf("attachment; filename=\"%s\"", s.Name()) w.Header().Set("Content-Disposition", disposition) - _, err = stream.Serve(ctx, w, r) + _, err = s.Serve(ctx, w, r) return nil, err case *model.Album: setHeaders(v.Name) diff --git a/server/subsonic/transcode.go b/server/subsonic/transcode.go index 4e494b324..eb50fc251 100644 --- a/server/subsonic/transcode.go +++ b/server/subsonic/transcode.go @@ -379,8 +379,13 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (* } // Create stream - stream, err := api.streamer.NewStream(ctx, mf, streamReq) + s, err := api.streamer.NewStream(ctx, mf, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + log.Warn(ctx, "Transcoding throttle full", "mediaID", mediaID) + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) + return nil, nil + } log.Error(ctx, "Error creating stream", "mediaID", mediaID, err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) return nil, nil @@ -388,14 +393,14 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (* // Make sure the stream will be closed at the end defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", mediaID, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", mediaID, "file", s.Name(), err) } }() w.Header().Set("X-Content-Type-Options", "nosniff") - n, err := stream.Serve(ctx, w, r) + n, err := s.Serve(ctx, w, r) if err != nil || n == 0 { http.Error(w, "Internal Server Error", http.StatusInternalServerError) } diff --git a/server/subsonic/transcode_test.go b/server/subsonic/transcode_test.go index 15ba168d7..e95692098 100644 --- a/server/subsonic/transcode_test.go +++ b/server/subsonic/transcode_test.go @@ -7,6 +7,8 @@ import ( "net/http" "net/http/httptest" + "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/conf/configtest" "github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/tests" @@ -322,6 +324,47 @@ var _ = Describe("Transcode endpoints", func() { Expect(fakeStreamer.captured.Channels).To(Equal(2)) Expect(fakeStreamer.captured.Offset).To(Equal(10)) }) + + It("returns 503 when transcoding throttle is full", func() { + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + mockTD.resolvedReq = stream.Request{Format: "mp3", BitRate: 128} + + r := newGetRequest("mediaId=song-1", "mediaType=song", "transcodeParams=valid-token") + resp, err := router.GetTranscodeStream(w, r) + Expect(err).ToNot(HaveOccurred()) + Expect(resp).To(BeNil()) + Expect(w.Code).To(Equal(http.StatusServiceUnavailable)) + }) + }) + + Describe("Stream - throttle", func() { + It("returns error when transcoding throttle is full", func() { + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + + r := newGetRequest("id=song-1") + _, err := router.Stream(w, r) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes")) + }) + }) + + Describe("Download - throttle", func() { + It("returns error when transcoding throttle is full", func() { + DeferCleanup(configtest.SetupConfig()) + conf.Server.EnableDownloads = true + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + + r := newGetRequest("id=song-1") + _, err := router.Download(w, r) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes")) + }) }) Describe("bpsToKbps", func() { @@ -422,3 +465,10 @@ func (f *fakeMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, req f.captured = &req return nil, errStreamCaptured } + +// busyMediaStreamer always returns ErrTranscodingBusy from NewStream +type busyMediaStreamer struct{} + +func (b *busyMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, _ stream.Request) (*stream.Stream, error) { + return nil, stream.ErrTranscodingBusy +}