From 58f7959204481574feb350e5c261c11450d724e9 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 07:55:44 -0400 Subject: [PATCH 01/11] feat(stream): add TranscodingThrottle with tests --- core/stream/throttle.go | 92 ++++++++++++++++++++++++++++ core/stream/throttle_test.go | 114 +++++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 core/stream/throttle.go create mode 100644 core/stream/throttle_test.go diff --git a/core/stream/throttle.go b/core/stream/throttle.go new file mode 100644 index 000000000..acf219dc8 --- /dev/null +++ b/core/stream/throttle.go @@ -0,0 +1,92 @@ +package stream + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/navidrome/navidrome/log" + "golang.org/x/sync/semaphore" +) + +// ErrTranscodingBusy is returned when the transcoding throttle rejects a request +// because the concurrency limit and backlog are both full. +var ErrTranscodingBusy = errors.New("too many concurrent transcodes") + +// TranscodingThrottle limits the number of concurrent transcoding operations. +type TranscodingThrottle struct { + sem *semaphore.Weighted + backlog atomic.Int64 + maxBacklog int64 + timeout time.Duration + disabled bool +} + +// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous +// transcodes with maxBacklog queued waiters and the given timeout. +// If maxConcurrent is 0, throttling is disabled. +func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle { + if maxConcurrent <= 0 { + return &TranscodingThrottle{disabled: true} + } + return &TranscodingThrottle{ + sem: semaphore.NewWeighted(int64(maxConcurrent)), + maxBacklog: int64(maxBacklog), + timeout: timeout, + } +} + +// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires. +func (t *TranscodingThrottle) Acquire(ctx context.Context) error { + if t.disabled { + return nil + } + + // Increment-then-check-then-rollback to avoid TOCTOU race + current := t.backlog.Add(1) + if current > t.maxBacklog { + t.backlog.Add(-1) + log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1) + return ErrTranscodingBusy + } + + if !t.sem.TryAcquire(1) { + log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + err := t.sem.Acquire(ctx, 1) + t.backlog.Add(-1) + if err != nil { + log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") + return ErrTranscodingBusy + } + return nil + } + + t.backlog.Add(-1) + return nil +} + +// Release frees a transcoding slot. +func (t *TranscodingThrottle) Release() { + if t.disabled { + return + } + t.sem.Release(1) +} + +// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close. +type releaseOnClose struct { + io.ReadCloser + release func() + once sync.Once +} + +func (r *releaseOnClose) Close() error { + err := r.ReadCloser.Close() + r.once.Do(r.release) + return err +} diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go new file mode 100644 index 000000000..1c8c7a40d --- /dev/null +++ b/core/stream/throttle_test.go @@ -0,0 +1,114 @@ +package stream + +import ( + "context" + "errors" + "io" + "strings" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("TranscodingThrottle", func() { + Describe("Acquire/Release", func() { + It("allows up to maxConcurrent acquires", func() { + t := NewTranscodingThrottle(2, 10, time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(context.Background())).To(Succeed()) + // Third should block, so test it doesn't return immediately + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("releases a slot and allows new acquire", func() { + t := NewTranscodingThrottle(1, 10, time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + t.Release() + Expect(t.Acquire(context.Background())).To(Succeed()) + }) + + It("returns ErrTranscodingBusy when backlog limit is reached", func() { + t := NewTranscodingThrottle(1, 2, 5*time.Second) + // Fill the slot + Expect(t.Acquire(context.Background())).To(Succeed()) + + // Fill the backlog (2 waiters) — they block in goroutines + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = t.Acquire(context.Background()) + }() + } + // Give goroutines time to enter backlog + time.Sleep(50 * time.Millisecond) + + // Third waiter should be rejected immediately (backlog full) + err := t.Acquire(context.Background()) + Expect(err).To(MatchError(ErrTranscodingBusy)) + + // Clean up: release all + t.Release() + t.Release() + t.Release() + wg.Wait() + }) + + It("returns ErrTranscodingBusy when timeout expires", func() { + t := NewTranscodingThrottle(1, 10, 50*time.Millisecond) + Expect(t.Acquire(context.Background())).To(Succeed()) + err := t.Acquire(context.Background()) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("respects context cancellation", func() { + t := NewTranscodingThrottle(1, 10, 5*time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("is disabled when maxConcurrent is 0", func() { + t := NewTranscodingThrottle(0, 10, time.Second) + for i := 0; i < 100; i++ { + Expect(t.Acquire(context.Background())).To(Succeed()) + } + }) + }) +}) + +var _ = Describe("releaseOnClose", func() { + It("calls release exactly once on Close", func() { + var count int + rc := &releaseOnClose{ + ReadCloser: io.NopCloser(strings.NewReader("data")), + release: func() { count++ }, + } + Expect(rc.Close()).To(Succeed()) + Expect(rc.Close()).To(Succeed()) // double close + Expect(count).To(Equal(1)) + }) + + It("propagates close error from underlying ReadCloser", func() { + rc := &releaseOnClose{ + ReadCloser: &failCloser{}, + release: func() {}, + } + err := rc.Close() + Expect(err).To(MatchError("close failed")) + }) +}) + +// failCloser is a ReadCloser whose Close always returns an error +type failCloser struct{ io.Reader } + +func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF } +func (f *failCloser) Close() error { return errors.New("close failed") } From 9219a8c2a4c77becae30499b5f06c21454889cbc Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 07:58:41 -0400 Subject: [PATCH 02/11] feat(conf): add MaxConcurrentTranscodes and dev throttle config options --- conf/configuration.go | 64 +++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/conf/configuration.go b/conf/configuration.go index 5f74d6db0..441a5bcf4 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -58,6 +58,7 @@ type configOptions struct { PlaylistsPath string SmartPlaylistRefreshDelay time.Duration AutoTranscodeDownload bool + MaxConcurrentTranscodes int DefaultDownsamplingFormat string Search searchOptions `json:",omitzero"` SimilarSongsMatchThreshold int @@ -113,35 +114,37 @@ type configOptions struct { Agents string // DevFlags. These are used to enable/disable debugging and incomplete features - DevLogLevels map[string]string `json:",omitempty"` - DevLogSourceLine bool - DevEnableProfiler bool - DevAutoCreateAdminPassword string - DevAutoLoginUsername string - DevActivityPanel bool - DevActivityPanelUpdateRate time.Duration - DevSidebarPlaylists bool - DevShowArtistPage bool - DevUIShowConfig bool - DevNewEventStream bool - DevOffsetOptimize int - DevArtworkMaxRequests int - DevArtworkThrottleBacklogLimit int - DevArtworkThrottleBacklogTimeout time.Duration - DevArtistInfoTimeToLive time.Duration - DevAlbumInfoTimeToLive time.Duration - DevExternalScanner bool - DevScannerThreads uint - DevSelectiveWatcher bool - DevInsightsInitialDelay time.Duration - DevEnablePlayerInsights bool - DevEnablePluginsInsights bool - DevPluginCompilationTimeout time.Duration - DevExternalArtistFetchMultiplier float64 - DevOptimizeDB bool - DevPreserveUnicodeInExternalCalls bool - DevEnableMediaFileProbe bool - DevJpegCoverArt bool + DevLogLevels map[string]string `json:",omitempty"` + DevLogSourceLine bool + DevEnableProfiler bool + DevAutoCreateAdminPassword string + DevAutoLoginUsername string + DevActivityPanel bool + DevActivityPanelUpdateRate time.Duration + DevSidebarPlaylists bool + DevShowArtistPage bool + DevUIShowConfig bool + DevNewEventStream bool + DevOffsetOptimize int + DevArtworkMaxRequests int + DevArtworkThrottleBacklogLimit int + DevArtworkThrottleBacklogTimeout time.Duration + DevTranscodeThrottleBacklogLimit int + DevTranscodeThrottleBacklogTimeout time.Duration + DevArtistInfoTimeToLive time.Duration + DevAlbumInfoTimeToLive time.Duration + DevExternalScanner bool + DevScannerThreads uint + DevSelectiveWatcher bool + DevInsightsInitialDelay time.Duration + DevEnablePlayerInsights bool + DevEnablePluginsInsights bool + DevPluginCompilationTimeout time.Duration + DevExternalArtistFetchMultiplier float64 + DevOptimizeDB bool + DevPreserveUnicodeInExternalCalls bool + DevEnableMediaFileProbe bool + DevJpegCoverArt bool } type scannerOptions struct { @@ -663,6 +666,7 @@ func setViperDefaults() { viper.SetDefault("enablem3uexternalalbumart", false) viper.SetDefault("enablemediafilecoverart", true) viper.SetDefault("autotranscodedownload", false) + viper.SetDefault("maxconcurrenttranscodes", 4) viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat) viper.SetDefault("search.fullstring", false) viper.SetDefault("search.backend", "fts") @@ -770,6 +774,8 @@ func setViperDefaults() { viper.SetDefault("devartworkmaxrequests", max(4, runtime.NumCPU())) viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit) viper.SetDefault("devartworkthrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout) + viper.SetDefault("devtranscodethrottlebackloglimit", consts.RequestThrottleBacklogLimit) + viper.SetDefault("devtranscodethrottlebacklogtimeout", consts.RequestThrottleBacklogTimeout) viper.SetDefault("devartistinfotimetolive", consts.ArtistInfoTimeToLive) viper.SetDefault("devalbuminfotimetolive", consts.AlbumInfoTimeToLive) viper.SetDefault("devexternalscanner", true) From b018d7b634ea60392d9a15ef3b35b7aa0c59e892 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 07:58:44 -0400 Subject: [PATCH 03/11] feat(stream): wire TranscodingThrottle into MediaStreamer --- cmd/wire_gen.go | 6 ++++-- core/stream/media_streamer.go | 5 +++-- core/stream/media_streamer_test.go | 3 ++- core/stream/throttle.go | 10 ++++++++++ core/wire_providers.go | 1 + 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 5b9fd648f..93ff02463 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -96,7 +96,8 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) + transcodingThrottle := stream.GetTranscodingThrottle() + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) players := core.NewPlayers(dataStore) @@ -124,7 +125,8 @@ func CreatePublicRouter() *public.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) + transcodingThrottle := stream.GetTranscodingThrottle() + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index de03b4d2f..84fa84321 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -27,14 +27,15 @@ type MediaStreamer interface { type TranscodingCache cache.FileCache -func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer { - return &mediaStreamer{ds: ds, transcoder: t, cache: cache} +func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache, throttle *TranscodingThrottle) MediaStreamer { + return &mediaStreamer{ds: ds, transcoder: t, cache: cache, throttle: throttle} } type mediaStreamer struct { ds model.DataStore transcoder ffmpeg.FFmpeg cache cache.FileCache + throttle *TranscodingThrottle } type streamJob struct { diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index 1bc21e239..c7c7268fd 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "time" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf/configtest" @@ -31,7 +32,7 @@ var _ = Describe("MediaStreamer", func() { }) testCache := stream.NewTranscodingCache() Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) + streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache, stream.NewTranscodingThrottle(0, 100, time.Minute)) }) AfterEach(func() { _ = os.RemoveAll(conf.Server.CacheFolder) diff --git a/core/stream/throttle.go b/core/stream/throttle.go index acf219dc8..d9386e7b9 100644 --- a/core/stream/throttle.go +++ b/core/stream/throttle.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/log" "golang.org/x/sync/semaphore" ) @@ -90,3 +91,12 @@ func (r *releaseOnClose) Close() error { r.once.Do(r.release) return err } + +// GetTranscodingThrottle creates a TranscodingThrottle from the current configuration. +func GetTranscodingThrottle() *TranscodingThrottle { + return NewTranscodingThrottle( + conf.Server.MaxConcurrentTranscodes, + conf.Server.DevTranscodeThrottleBacklogLimit, + conf.Server.DevTranscodeThrottleBacklogTimeout, + ) +} diff --git a/core/wire_providers.go b/core/wire_providers.go index 276d9556a..e53b789be 100644 --- a/core/wire_providers.go +++ b/core/wire_providers.go @@ -16,6 +16,7 @@ import ( var Set = wire.NewSet( stream.NewMediaStreamer, stream.GetTranscodingCache, + stream.GetTranscodingThrottle, NewArchiver, NewPlayers, NewShare, From 4d14ad812cc2536032a468939039b08caeb041b1 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:03:21 -0400 Subject: [PATCH 04/11] feat(stream): integrate TranscodingThrottle into NewStream --- core/stream/media_streamer.go | 16 +++++++++++-- core/stream/media_streamer_test.go | 37 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index 84fa84321..da605fb8e 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -92,6 +92,11 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req return s, nil } + // Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss) + if err := ms.throttle.Acquire(ctx); err != nil { + return nil, err + } + job := &streamJob{ ms: ms, mf: mf, @@ -105,12 +110,19 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } r, err := ms.cache.Get(ctx, job) if err != nil { + ms.throttle.Release() log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) return nil, err } cached = r.Cached - - s.ReadCloser = r + if cached { + // Cache hit — no ffmpeg process running, release the slot immediately + ms.throttle.Release() + s.ReadCloser = r + } else { + // Cache miss — slot released when stream is closed + s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release} + } s.Seeker = r.Seeker log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", filePath, diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index c7c7268fd..a5380324b 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -73,4 +73,41 @@ var _ = Describe("MediaStreamer", func() { Expect(s.Seekable()).To(BeTrue()) }) }) + + Context("NewStream with throttle", func() { + var mf *model.MediaFile + BeforeEach(func() { + var err error + mf, err = ds.MediaFile(ctx).Get("123") + Expect(err).ToNot(HaveOccurred()) + }) + + It("returns ErrTranscodingBusy when throttle rejects", func() { + throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) + Expect(throttle.Acquire(context.Background())).To(Succeed()) + + testCache := stream.NewTranscodingCache() + Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) + throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) + + _, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64}) + Expect(err).To(MatchError(stream.ErrTranscodingBusy)) + throttle.Release() + }) + + It("does not throttle raw/direct-play requests", func() { + throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) + Expect(throttle.Acquire(context.Background())).To(Succeed()) + + testCache := stream.NewTranscodingCache() + Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) + throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) + + s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + _ = s.Close() + throttle.Release() + }) + }) }) From 87e02a3c2ded2d20312edede26ba4352d7cf7179 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:05:41 -0400 Subject: [PATCH 05/11] feat(subsonic): handle ErrTranscodingBusy in Stream and Download endpoints --- server/subsonic/stream.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/server/subsonic/stream.go b/server/subsonic/stream.go index b49af2b24..9905c14de 100644 --- a/server/subsonic/stream.go +++ b/server/subsonic/stream.go @@ -1,12 +1,14 @@ package subsonic import ( + "errors" "fmt" "net/http" "strconv" "strings" "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/request" @@ -31,22 +33,25 @@ func (api *Router) Stream(w http.ResponseWriter, r *http.Request) (*responses.Su } streamReq := api.transcodeDecision.ResolveRequest(ctx, mf, format, maxBitRate, timeOffset) - stream, err := api.streamer.NewStream(ctx, mf, streamReq) + s, err := api.streamer.NewStream(ctx, mf, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later") + } return nil, err } // Make sure the stream will be closed at the end, to avoid leakage defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", id, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", id, "file", s.Name(), err) } }() w.Header().Set("X-Content-Type-Options", "nosniff") - w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(stream.Duration()), 'G', -1, 32)) + w.Header().Set("X-Content-Duration", strconv.FormatFloat(float64(s.Duration()), 'G', -1, 32)) - _, err = stream.Serve(ctx, w, r) + _, err = s.Serve(ctx, w, r) return nil, err } @@ -100,22 +105,25 @@ func (api *Router) Download(w http.ResponseWriter, r *http.Request) (*responses. switch v := entity.(type) { case *model.MediaFile: streamReq := api.transcodeDecision.ResolveRequest(ctx, v, format, maxBitRate, 0) - stream, err := api.streamer.NewStream(ctx, v, streamReq) + s, err := api.streamer.NewStream(ctx, v, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + return nil, newError(responses.ErrorGeneric, "too many concurrent transcodes, try again later") + } return nil, err } // Make sure the stream will be closed at the end, to avoid leakage defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", id, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", id, "file", s.Name(), err) } }() - disposition := fmt.Sprintf("attachment; filename=\"%s\"", stream.Name()) + disposition := fmt.Sprintf("attachment; filename=\"%s\"", s.Name()) w.Header().Set("Content-Disposition", disposition) - _, err = stream.Serve(ctx, w, r) + _, err = s.Serve(ctx, w, r) return nil, err case *model.Album: setHeaders(v.Name) From 803d96bf8c85cd35191cf9cc2cb2288f41c066b6 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:05:45 -0400 Subject: [PATCH 06/11] feat(subsonic): handle ErrTranscodingBusy in GetTranscodeStream with HTTP 503 --- server/subsonic/transcode.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/server/subsonic/transcode.go b/server/subsonic/transcode.go index 4e494b324..eb50fc251 100644 --- a/server/subsonic/transcode.go +++ b/server/subsonic/transcode.go @@ -379,8 +379,13 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (* } // Create stream - stream, err := api.streamer.NewStream(ctx, mf, streamReq) + s, err := api.streamer.NewStream(ctx, mf, streamReq) if err != nil { + if errors.Is(err, stream.ErrTranscodingBusy) { + log.Warn(ctx, "Transcoding throttle full", "mediaID", mediaID) + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) + return nil, nil + } log.Error(ctx, "Error creating stream", "mediaID", mediaID, err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) return nil, nil @@ -388,14 +393,14 @@ func (api *Router) GetTranscodeStream(w http.ResponseWriter, r *http.Request) (* // Make sure the stream will be closed at the end defer func() { - if err := stream.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { - log.Error("Error closing stream", "id", mediaID, "file", stream.Name(), err) + if err := s.Close(); err != nil && log.IsGreaterOrEqualTo(log.LevelDebug) { + log.Error("Error closing stream", "id", mediaID, "file", s.Name(), err) } }() w.Header().Set("X-Content-Type-Options", "nosniff") - n, err := stream.Serve(ctx, w, r) + n, err := s.Serve(ctx, w, r) if err != nil || n == 0 { http.Error(w, "Internal Server Error", http.StatusInternalServerError) } From 179ba1e6002ae84cf933410588a8d9b342e2563f Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:08:55 -0400 Subject: [PATCH 07/11] test(subsonic): add handler tests for ErrTranscodingBusy in all endpoints --- server/subsonic/transcode_test.go | 50 +++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/server/subsonic/transcode_test.go b/server/subsonic/transcode_test.go index 15ba168d7..e95692098 100644 --- a/server/subsonic/transcode_test.go +++ b/server/subsonic/transcode_test.go @@ -7,6 +7,8 @@ import ( "net/http" "net/http/httptest" + "github.com/navidrome/navidrome/conf" + "github.com/navidrome/navidrome/conf/configtest" "github.com/navidrome/navidrome/core/stream" "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/tests" @@ -322,6 +324,47 @@ var _ = Describe("Transcode endpoints", func() { Expect(fakeStreamer.captured.Channels).To(Equal(2)) Expect(fakeStreamer.captured.Offset).To(Equal(10)) }) + + It("returns 503 when transcoding throttle is full", func() { + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + mockTD.resolvedReq = stream.Request{Format: "mp3", BitRate: 128} + + r := newGetRequest("mediaId=song-1", "mediaType=song", "transcodeParams=valid-token") + resp, err := router.GetTranscodeStream(w, r) + Expect(err).ToNot(HaveOccurred()) + Expect(resp).To(BeNil()) + Expect(w.Code).To(Equal(http.StatusServiceUnavailable)) + }) + }) + + Describe("Stream - throttle", func() { + It("returns error when transcoding throttle is full", func() { + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + + r := newGetRequest("id=song-1") + _, err := router.Stream(w, r) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes")) + }) + }) + + Describe("Download - throttle", func() { + It("returns error when transcoding throttle is full", func() { + DeferCleanup(configtest.SetupConfig()) + conf.Server.EnableDownloads = true + busyStreamer := &busyMediaStreamer{} + router = New(ds, nil, busyStreamer, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, mockTD) + mockMFRepo.SetData(model.MediaFiles{{ID: "song-1"}}) + + r := newGetRequest("id=song-1") + _, err := router.Download(w, r) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("too many concurrent transcodes")) + }) }) Describe("bpsToKbps", func() { @@ -422,3 +465,10 @@ func (f *fakeMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, req f.captured = &req return nil, errStreamCaptured } + +// busyMediaStreamer always returns ErrTranscodingBusy from NewStream +type busyMediaStreamer struct{} + +func (b *busyMediaStreamer) NewStream(_ context.Context, _ *model.MediaFile, _ stream.Request) (*stream.Stream, error) { + return nil, stream.ErrTranscodingBusy +} From c7865ff15932a091bc80cdf96d9499d71842ce31 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:17:20 -0400 Subject: [PATCH 08/11] fix(stream): make TranscodingThrottle a singleton and optimize hot path - Use sync.Once to ensure a single shared throttle instance across all wire-injected consumers (subsonic + public routers) - Optimize Acquire() fast path: TryAcquire before touching the atomic backlog counter, avoiding unnecessary atomic ops when slots are free refactor(stream): use singleton package for TranscodingThrottle --- core/stream/throttle.go | 42 ++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/stream/throttle.go b/core/stream/throttle.go index d9386e7b9..b28a75479 100644 --- a/core/stream/throttle.go +++ b/core/stream/throttle.go @@ -10,6 +10,7 @@ import ( "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/log" + "github.com/navidrome/navidrome/utils/singleton" "golang.org/x/sync/semaphore" ) @@ -46,6 +47,12 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { return nil } + // Fast path: try to acquire without touching the backlog counter + if t.sem.TryAcquire(1) { + return nil + } + + // Slow path: semaphore is full, enter backlog queue // Increment-then-check-then-rollback to avoid TOCTOU race current := t.backlog.Add(1) if current > t.maxBacklog { @@ -54,20 +61,15 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { return ErrTranscodingBusy } - if !t.sem.TryAcquire(1) { - log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - err := t.sem.Acquire(ctx, 1) - t.backlog.Add(-1) - if err != nil { - log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") - return ErrTranscodingBusy - } - return nil - } - + log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + err := t.sem.Acquire(ctx, 1) t.backlog.Add(-1) + if err != nil { + log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") + return ErrTranscodingBusy + } return nil } @@ -92,11 +94,13 @@ func (r *releaseOnClose) Close() error { return err } -// GetTranscodingThrottle creates a TranscodingThrottle from the current configuration. +// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration. func GetTranscodingThrottle() *TranscodingThrottle { - return NewTranscodingThrottle( - conf.Server.MaxConcurrentTranscodes, - conf.Server.DevTranscodeThrottleBacklogLimit, - conf.Server.DevTranscodeThrottleBacklogTimeout, - ) + return singleton.GetInstance(func() *TranscodingThrottle { + return NewTranscodingThrottle( + conf.Server.MaxConcurrentTranscodes, + conf.Server.DevTranscodeThrottleBacklogLimit, + conf.Server.DevTranscodeThrottleBacklogTimeout, + ) + }) } From faa4acf5835eab8e7892fcf1a0840814323c2671 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:27:23 -0400 Subject: [PATCH 09/11] refactor(stream): make TranscodingThrottle an internal component of mediaStreamer The throttle is an implementation detail of mediaStreamer, not a dependency that needs to be injected. Remove it from the constructor and wire providers; create it internally via a singleton. This simplifies the API and keeps Release() unexported. --- cmd/wire_gen.go | 6 ++---- core/stream/media_streamer.go | 13 ++++++------- core/stream/media_streamer_test.go | 29 ++++------------------------- core/stream/throttle.go | 25 +++++++++++-------------- core/stream/throttle_test.go | 12 ++++++------ core/wire_providers.go | 1 - 6 files changed, 29 insertions(+), 57 deletions(-) diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 93ff02463..5b9fd648f 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -96,8 +96,7 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - transcodingThrottle := stream.GetTranscodingThrottle() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) players := core.NewPlayers(dataStore) @@ -125,8 +124,7 @@ func CreatePublicRouter() *public.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - transcodingThrottle := stream.GetTranscodingThrottle() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index da605fb8e..1396aa493 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -27,15 +27,14 @@ type MediaStreamer interface { type TranscodingCache cache.FileCache -func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache, throttle *TranscodingThrottle) MediaStreamer { - return &mediaStreamer{ds: ds, transcoder: t, cache: cache, throttle: throttle} +func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer { + return &mediaStreamer{ds: ds, transcoder: t, cache: cache} } type mediaStreamer struct { ds model.DataStore transcoder ffmpeg.FFmpeg cache cache.FileCache - throttle *TranscodingThrottle } type streamJob struct { @@ -93,7 +92,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } // Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss) - if err := ms.throttle.Acquire(ctx); err != nil { + if err := getTranscodingThrottle().Acquire(ctx); err != nil { return nil, err } @@ -110,18 +109,18 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } r, err := ms.cache.Get(ctx, job) if err != nil { - ms.throttle.Release() + getTranscodingThrottle().Release() log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) return nil, err } cached = r.Cached if cached { // Cache hit — no ffmpeg process running, release the slot immediately - ms.throttle.Release() + getTranscodingThrottle().Release() s.ReadCloser = r } else { // Cache miss — slot released when stream is closed - s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release} + s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release} } s.Seeker = r.Seeker diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index a5380324b..9cd987a57 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "os" - "time" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf/configtest" @@ -32,7 +31,8 @@ var _ = Describe("MediaStreamer", func() { }) testCache := stream.NewTranscodingCache() Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache, stream.NewTranscodingThrottle(0, 100, time.Minute)) + conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests + streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) }) AfterEach(func() { _ = os.RemoveAll(conf.Server.CacheFolder) @@ -82,32 +82,11 @@ var _ = Describe("MediaStreamer", func() { Expect(err).ToNot(HaveOccurred()) }) - It("returns ErrTranscodingBusy when throttle rejects", func() { - throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) - Expect(throttle.Acquire(context.Background())).To(Succeed()) - - testCache := stream.NewTranscodingCache() - Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) - - _, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64}) - Expect(err).To(MatchError(stream.ErrTranscodingBusy)) - throttle.Release() - }) - - It("does not throttle raw/direct-play requests", func() { - throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) - Expect(throttle.Acquire(context.Background())).To(Succeed()) - - testCache := stream.NewTranscodingCache() - Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) - - s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) + It("does not throttle raw/direct-play requests even when throttle is saturated", func() { + s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) Expect(err).ToNot(HaveOccurred()) Expect(s).ToNot(BeNil()) _ = s.Close() - throttle.Release() }) }) }) diff --git a/core/stream/throttle.go b/core/stream/throttle.go index b28a75479..8b28bba63 100644 --- a/core/stream/throttle.go +++ b/core/stream/throttle.go @@ -18,8 +18,8 @@ import ( // because the concurrency limit and backlog are both full. var ErrTranscodingBusy = errors.New("too many concurrent transcodes") -// TranscodingThrottle limits the number of concurrent transcoding operations. -type TranscodingThrottle struct { +// transcodingThrottle limits the number of concurrent transcoding operations. +type transcodingThrottle struct { sem *semaphore.Weighted backlog atomic.Int64 maxBacklog int64 @@ -27,14 +27,11 @@ type TranscodingThrottle struct { disabled bool } -// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous -// transcodes with maxBacklog queued waiters and the given timeout. -// If maxConcurrent is 0, throttling is disabled. -func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle { +func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle { if maxConcurrent <= 0 { - return &TranscodingThrottle{disabled: true} + return &transcodingThrottle{disabled: true} } - return &TranscodingThrottle{ + return &transcodingThrottle{ sem: semaphore.NewWeighted(int64(maxConcurrent)), maxBacklog: int64(maxBacklog), timeout: timeout, @@ -42,7 +39,7 @@ func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration } // Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires. -func (t *TranscodingThrottle) Acquire(ctx context.Context) error { +func (t *transcodingThrottle) Acquire(ctx context.Context) error { if t.disabled { return nil } @@ -74,7 +71,7 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { } // Release frees a transcoding slot. -func (t *TranscodingThrottle) Release() { +func (t *transcodingThrottle) Release() { if t.disabled { return } @@ -94,10 +91,10 @@ func (r *releaseOnClose) Close() error { return err } -// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration. -func GetTranscodingThrottle() *TranscodingThrottle { - return singleton.GetInstance(func() *TranscodingThrottle { - return NewTranscodingThrottle( +// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration. +func getTranscodingThrottle() *transcodingThrottle { + return singleton.GetInstance(func() *transcodingThrottle { + return newTranscodingThrottle( conf.Server.MaxConcurrentTranscodes, conf.Server.DevTranscodeThrottleBacklogLimit, conf.Server.DevTranscodeThrottleBacklogTimeout, diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go index 1c8c7a40d..719e9ba5e 100644 --- a/core/stream/throttle_test.go +++ b/core/stream/throttle_test.go @@ -15,7 +15,7 @@ import ( var _ = Describe("TranscodingThrottle", func() { Describe("Acquire/Release", func() { It("allows up to maxConcurrent acquires", func() { - t := NewTranscodingThrottle(2, 10, time.Second) + t := newTranscodingThrottle(2, 10, time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(context.Background())).To(Succeed()) // Third should block, so test it doesn't return immediately @@ -26,14 +26,14 @@ var _ = Describe("TranscodingThrottle", func() { }) It("releases a slot and allows new acquire", func() { - t := NewTranscodingThrottle(1, 10, time.Second) + t := newTranscodingThrottle(1, 10, time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) t.Release() Expect(t.Acquire(context.Background())).To(Succeed()) }) It("returns ErrTranscodingBusy when backlog limit is reached", func() { - t := NewTranscodingThrottle(1, 2, 5*time.Second) + t := newTranscodingThrottle(1, 2, 5*time.Second) // Fill the slot Expect(t.Acquire(context.Background())).To(Succeed()) @@ -61,14 +61,14 @@ var _ = Describe("TranscodingThrottle", func() { }) It("returns ErrTranscodingBusy when timeout expires", func() { - t := NewTranscodingThrottle(1, 10, 50*time.Millisecond) + t := newTranscodingThrottle(1, 10, 50*time.Millisecond) Expect(t.Acquire(context.Background())).To(Succeed()) err := t.Acquire(context.Background()) Expect(err).To(MatchError(ErrTranscodingBusy)) }) It("respects context cancellation", func() { - t := NewTranscodingThrottle(1, 10, 5*time.Second) + t := newTranscodingThrottle(1, 10, 5*time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -77,7 +77,7 @@ var _ = Describe("TranscodingThrottle", func() { }) It("is disabled when maxConcurrent is 0", func() { - t := NewTranscodingThrottle(0, 10, time.Second) + t := newTranscodingThrottle(0, 10, time.Second) for i := 0; i < 100; i++ { Expect(t.Acquire(context.Background())).To(Succeed()) } diff --git a/core/wire_providers.go b/core/wire_providers.go index e53b789be..276d9556a 100644 --- a/core/wire_providers.go +++ b/core/wire_providers.go @@ -16,7 +16,6 @@ import ( var Set = wire.NewSet( stream.NewMediaStreamer, stream.GetTranscodingCache, - stream.GetTranscodingThrottle, NewArchiver, NewPlayers, NewShare, From 6fe2b76ac173479fd62e8af23b4d7156cf4fac64 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:30:44 -0400 Subject: [PATCH 10/11] feat(configuration): set maxconcurrenttranscodes to the maximum of 4 or available CPU cores Signed-off-by: Deluan --- conf/configuration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/configuration.go b/conf/configuration.go index 441a5bcf4..5adaeca8a 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -666,7 +666,7 @@ func setViperDefaults() { viper.SetDefault("enablem3uexternalalbumart", false) viper.SetDefault("enablemediafilecoverart", true) viper.SetDefault("autotranscodedownload", false) - viper.SetDefault("maxconcurrenttranscodes", 4) + viper.SetDefault("maxconcurrenttranscodes", max(4, runtime.NumCPU())) viper.SetDefault("defaultdownsamplingformat", consts.DefaultDownsamplingFormat) viper.SetDefault("search.fullstring", false) viper.SetDefault("search.backend", "fts") From c1f12d148dcdda283f479f7b84ddeb749620be99 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:35:32 -0400 Subject: [PATCH 11/11] test(stream): replace time.Sleep with Eventually assertion in throttle test --- core/stream/throttle_test.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go index 719e9ba5e..b8b073d08 100644 --- a/core/stream/throttle_test.go +++ b/core/stream/throttle_test.go @@ -16,10 +16,10 @@ var _ = Describe("TranscodingThrottle", func() { Describe("Acquire/Release", func() { It("allows up to maxConcurrent acquires", func() { t := newTranscodingThrottle(2, 10, time.Second) - Expect(t.Acquire(context.Background())).To(Succeed()) - Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) // Third should block, so test it doesn't return immediately - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + ctx, cancel := context.WithTimeout(GinkgoT().Context(), 50*time.Millisecond) defer cancel() err := t.Acquire(ctx) Expect(err).To(MatchError(ErrTranscodingBusy)) @@ -27,30 +27,26 @@ var _ = Describe("TranscodingThrottle", func() { It("releases a slot and allows new acquire", func() { t := newTranscodingThrottle(1, 10, time.Second) - Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) t.Release() - Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) }) It("returns ErrTranscodingBusy when backlog limit is reached", func() { t := newTranscodingThrottle(1, 2, 5*time.Second) // Fill the slot - Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) // Fill the backlog (2 waiters) — they block in goroutines var wg sync.WaitGroup for i := 0; i < 2; i++ { - wg.Add(1) - go func() { - defer wg.Done() - _ = t.Acquire(context.Background()) - }() + wg.Go(func() { _ = t.Acquire(GinkgoT().Context()) }) } - // Give goroutines time to enter backlog - time.Sleep(50 * time.Millisecond) + // Wait until both goroutines are in the backlog + Eventually(func() int64 { return t.backlog.Load() }).Should(BeNumerically(">=", 2)) // Third waiter should be rejected immediately (backlog full) - err := t.Acquire(context.Background()) + err := t.Acquire(GinkgoT().Context()) Expect(err).To(MatchError(ErrTranscodingBusy)) // Clean up: release all @@ -62,15 +58,14 @@ var _ = Describe("TranscodingThrottle", func() { It("returns ErrTranscodingBusy when timeout expires", func() { t := newTranscodingThrottle(1, 10, 50*time.Millisecond) - Expect(t.Acquire(context.Background())).To(Succeed()) - err := t.Acquire(context.Background()) - Expect(err).To(MatchError(ErrTranscodingBusy)) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(MatchError(ErrTranscodingBusy)) }) It("respects context cancellation", func() { t := newTranscodingThrottle(1, 10, 5*time.Second) - Expect(t.Acquire(context.Background())).To(Succeed()) - ctx, cancel := context.WithCancel(context.Background()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) + ctx, cancel := context.WithCancel(GinkgoT().Context()) cancel() err := t.Acquire(ctx) Expect(err).To(MatchError(ErrTranscodingBusy)) @@ -79,7 +74,7 @@ var _ = Describe("TranscodingThrottle", func() { It("is disabled when maxConcurrent is 0", func() { t := newTranscodingThrottle(0, 10, time.Second) for i := 0; i < 100; i++ { - Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(GinkgoT().Context())).To(Succeed()) } }) })