From c7865ff15932a091bc80cdf96d9499d71842ce31 Mon Sep 17 00:00:00 2001 From: Deluan Date: Wed, 25 Mar 2026 08:17:20 -0400 Subject: [PATCH] fix(stream): make TranscodingThrottle a singleton and optimize hot path - Use sync.Once to ensure a single shared throttle instance across all wire-injected consumers (subsonic + public routers) - Optimize Acquire() fast path: TryAcquire before touching the atomic backlog counter, avoiding unnecessary atomic ops when slots are free refactor(stream): use singleton package for TranscodingThrottle --- core/stream/throttle.go | 42 ++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/stream/throttle.go b/core/stream/throttle.go index d9386e7b9..b28a75479 100644 --- a/core/stream/throttle.go +++ b/core/stream/throttle.go @@ -10,6 +10,7 @@ import ( "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/log" + "github.com/navidrome/navidrome/utils/singleton" "golang.org/x/sync/semaphore" ) @@ -46,6 +47,12 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { return nil } + // Fast path: try to acquire without touching the backlog counter + if t.sem.TryAcquire(1) { + return nil + } + + // Slow path: semaphore is full, enter backlog queue // Increment-then-check-then-rollback to avoid TOCTOU race current := t.backlog.Add(1) if current > t.maxBacklog { @@ -54,20 +61,15 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error { return ErrTranscodingBusy } - if !t.sem.TryAcquire(1) { - log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - err := t.sem.Acquire(ctx, 1) - t.backlog.Add(-1) - if err != nil { - log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") - return ErrTranscodingBusy - } - return nil - } - + log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + err := t.sem.Acquire(ctx, 1) t.backlog.Add(-1) + if err != nil { + log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") + return ErrTranscodingBusy + } return nil } @@ -92,11 +94,13 @@ func (r *releaseOnClose) Close() error { return err } -// GetTranscodingThrottle creates a TranscodingThrottle from the current configuration. +// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration. func GetTranscodingThrottle() *TranscodingThrottle { - return NewTranscodingThrottle( - conf.Server.MaxConcurrentTranscodes, - conf.Server.DevTranscodeThrottleBacklogLimit, - conf.Server.DevTranscodeThrottleBacklogTimeout, - ) + return singleton.GetInstance(func() *TranscodingThrottle { + return NewTranscodingThrottle( + conf.Server.MaxConcurrentTranscodes, + conf.Server.DevTranscodeThrottleBacklogLimit, + conf.Server.DevTranscodeThrottleBacklogTimeout, + ) + }) }