diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 93ff02463..5b9fd648f 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -96,8 +96,7 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - transcodingThrottle := stream.GetTranscodingThrottle() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) players := core.NewPlayers(dataStore) @@ -125,8 +124,7 @@ func CreatePublicRouter() *public.Router { provider := external.NewProvider(dataStore, agentsAgents) artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider) transcodingCache := stream.GetTranscodingCache() - transcodingThrottle := stream.GetTranscodingThrottle() - mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache, transcodingThrottle) + mediaStreamer := stream.NewMediaStreamer(dataStore, fFmpeg, transcodingCache) share := core.NewShare(dataStore) archiver := core.NewArchiver(mediaStreamer, dataStore, share) router := public.New(dataStore, artworkArtwork, mediaStreamer, share, archiver) diff --git a/core/stream/media_streamer.go b/core/stream/media_streamer.go index da605fb8e..1396aa493 100644 --- a/core/stream/media_streamer.go +++ b/core/stream/media_streamer.go @@ -27,15 +27,14 @@ type MediaStreamer interface { type TranscodingCache cache.FileCache -func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache, throttle *TranscodingThrottle) MediaStreamer { - return &mediaStreamer{ds: ds, transcoder: t, cache: cache, throttle: throttle} +func NewMediaStreamer(ds model.DataStore, t ffmpeg.FFmpeg, cache TranscodingCache) MediaStreamer { + return &mediaStreamer{ds: ds, transcoder: t, cache: cache} } type mediaStreamer struct { ds model.DataStore transcoder ffmpeg.FFmpeg cache cache.FileCache - throttle *TranscodingThrottle } type streamJob struct { @@ -93,7 +92,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } // Acquire throttle slot before accessing cache (which may spawn ffmpeg on miss) - if err := ms.throttle.Acquire(ctx); err != nil { + if err := getTranscodingThrottle().Acquire(ctx); err != nil { return nil, err } @@ -110,18 +109,18 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, mf *model.MediaFile, req } r, err := ms.cache.Get(ctx, job) if err != nil { - ms.throttle.Release() + getTranscodingThrottle().Release() log.Error(ctx, "Error accessing transcoding cache", "id", mf.ID, err) return nil, err } cached = r.Cached if cached { // Cache hit — no ffmpeg process running, release the slot immediately - ms.throttle.Release() + getTranscodingThrottle().Release() s.ReadCloser = r } else { // Cache miss — slot released when stream is closed - s.ReadCloser = &releaseOnClose{ReadCloser: r, release: ms.throttle.Release} + s.ReadCloser = &releaseOnClose{ReadCloser: r, release: getTranscodingThrottle().Release} } s.Seeker = r.Seeker diff --git a/core/stream/media_streamer_test.go b/core/stream/media_streamer_test.go index a5380324b..9cd987a57 100644 --- a/core/stream/media_streamer_test.go +++ b/core/stream/media_streamer_test.go @@ -4,7 +4,6 @@ import ( "context" "io" "os" - "time" "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/conf/configtest" @@ -32,7 +31,8 @@ var _ = Describe("MediaStreamer", func() { }) testCache := stream.NewTranscodingCache() Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache, stream.NewTranscodingThrottle(0, 100, time.Minute)) + conf.Server.MaxConcurrentTranscodes = 0 // Disable throttling for general tests + streamer = stream.NewMediaStreamer(ds, ffmpeg, testCache) }) AfterEach(func() { _ = os.RemoveAll(conf.Server.CacheFolder) @@ -82,32 +82,11 @@ var _ = Describe("MediaStreamer", func() { Expect(err).ToNot(HaveOccurred()) }) - It("returns ErrTranscodingBusy when throttle rejects", func() { - throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) - Expect(throttle.Acquire(context.Background())).To(Succeed()) - - testCache := stream.NewTranscodingCache() - Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) - - _, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "mp3", BitRate: 64}) - Expect(err).To(MatchError(stream.ErrTranscodingBusy)) - throttle.Release() - }) - - It("does not throttle raw/direct-play requests", func() { - throttle := stream.NewTranscodingThrottle(1, 1, 50*time.Millisecond) - Expect(throttle.Acquire(context.Background())).To(Succeed()) - - testCache := stream.NewTranscodingCache() - Eventually(func() bool { return testCache.Available(context.TODO()) }).Should(BeTrue()) - throttledStreamer := stream.NewMediaStreamer(ds, ffmpeg, testCache, throttle) - - s, err := throttledStreamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) + It("does not throttle raw/direct-play requests even when throttle is saturated", func() { + s, err := streamer.NewStream(ctx, mf, stream.Request{Format: "raw"}) Expect(err).ToNot(HaveOccurred()) Expect(s).ToNot(BeNil()) _ = s.Close() - throttle.Release() }) }) }) diff --git a/core/stream/throttle.go b/core/stream/throttle.go index b28a75479..8b28bba63 100644 --- a/core/stream/throttle.go +++ b/core/stream/throttle.go @@ -18,8 +18,8 @@ import ( // because the concurrency limit and backlog are both full. var ErrTranscodingBusy = errors.New("too many concurrent transcodes") -// TranscodingThrottle limits the number of concurrent transcoding operations. -type TranscodingThrottle struct { +// transcodingThrottle limits the number of concurrent transcoding operations. +type transcodingThrottle struct { sem *semaphore.Weighted backlog atomic.Int64 maxBacklog int64 @@ -27,14 +27,11 @@ type TranscodingThrottle struct { disabled bool } -// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous -// transcodes with maxBacklog queued waiters and the given timeout. -// If maxConcurrent is 0, throttling is disabled. -func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle { +func newTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *transcodingThrottle { if maxConcurrent <= 0 { - return &TranscodingThrottle{disabled: true} + return &transcodingThrottle{disabled: true} } - return &TranscodingThrottle{ + return &transcodingThrottle{ sem: semaphore.NewWeighted(int64(maxConcurrent)), maxBacklog: int64(maxBacklog), timeout: timeout, @@ -42,7 +39,7 @@ func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration } // Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires. -func (t *TranscodingThrottle) Acquire(ctx context.Context) error { +func (t *transcodingThrottle) Acquire(ctx context.Context) error { if t.disabled { return nil } @@ -74,7 +71,7 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { } // Release frees a transcoding slot. -func (t *TranscodingThrottle) Release() { +func (t *transcodingThrottle) Release() { if t.disabled { return } @@ -94,10 +91,10 @@ func (r *releaseOnClose) Close() error { return err } -// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration. -func GetTranscodingThrottle() *TranscodingThrottle { - return singleton.GetInstance(func() *TranscodingThrottle { - return NewTranscodingThrottle( +// getTranscodingThrottle returns a singleton transcodingThrottle created from the current configuration. +func getTranscodingThrottle() *transcodingThrottle { + return singleton.GetInstance(func() *transcodingThrottle { + return newTranscodingThrottle( conf.Server.MaxConcurrentTranscodes, conf.Server.DevTranscodeThrottleBacklogLimit, conf.Server.DevTranscodeThrottleBacklogTimeout, diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go index 1c8c7a40d..719e9ba5e 100644 --- a/core/stream/throttle_test.go +++ b/core/stream/throttle_test.go @@ -15,7 +15,7 @@ import ( var _ = Describe("TranscodingThrottle", func() { Describe("Acquire/Release", func() { It("allows up to maxConcurrent acquires", func() { - t := NewTranscodingThrottle(2, 10, time.Second) + t := newTranscodingThrottle(2, 10, time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(context.Background())).To(Succeed()) // Third should block, so test it doesn't return immediately @@ -26,14 +26,14 @@ var _ = Describe("TranscodingThrottle", func() { }) It("releases a slot and allows new acquire", func() { - t := NewTranscodingThrottle(1, 10, time.Second) + t := newTranscodingThrottle(1, 10, time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) t.Release() Expect(t.Acquire(context.Background())).To(Succeed()) }) It("returns ErrTranscodingBusy when backlog limit is reached", func() { - t := NewTranscodingThrottle(1, 2, 5*time.Second) + t := newTranscodingThrottle(1, 2, 5*time.Second) // Fill the slot Expect(t.Acquire(context.Background())).To(Succeed()) @@ -61,14 +61,14 @@ var _ = Describe("TranscodingThrottle", func() { }) It("returns ErrTranscodingBusy when timeout expires", func() { - t := NewTranscodingThrottle(1, 10, 50*time.Millisecond) + t := newTranscodingThrottle(1, 10, 50*time.Millisecond) Expect(t.Acquire(context.Background())).To(Succeed()) err := t.Acquire(context.Background()) Expect(err).To(MatchError(ErrTranscodingBusy)) }) It("respects context cancellation", func() { - t := NewTranscodingThrottle(1, 10, 5*time.Second) + t := newTranscodingThrottle(1, 10, 5*time.Second) Expect(t.Acquire(context.Background())).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -77,7 +77,7 @@ var _ = Describe("TranscodingThrottle", func() { }) It("is disabled when maxConcurrent is 0", func() { - t := NewTranscodingThrottle(0, 10, time.Second) + t := newTranscodingThrottle(0, 10, time.Second) for i := 0; i < 100; i++ { Expect(t.Acquire(context.Background())).To(Succeed()) } diff --git a/core/wire_providers.go b/core/wire_providers.go index e53b789be..276d9556a 100644 --- a/core/wire_providers.go +++ b/core/wire_providers.go @@ -16,7 +16,6 @@ import ( var Set = wire.NewSet( stream.NewMediaStreamer, stream.GetTranscodingCache, - stream.GetTranscodingThrottle, NewArchiver, NewPlayers, NewShare,