mirror of
https://github.com/navidrome/navidrome.git
synced 2026-05-03 06:51:16 +00:00
fix(stream): make TranscodingThrottle a singleton and optimize hot path
- Use sync.Once to ensure a single shared throttle instance across all wire-injected consumers (subsonic + public routers) - Optimize Acquire() fast path: TryAcquire before touching the atomic backlog counter, avoiding unnecessary atomic ops when slots are free refactor(stream): use singleton package for TranscodingThrottle
This commit is contained in:
parent
179ba1e600
commit
c7865ff159
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/navidrome/navidrome/conf"
|
"github.com/navidrome/navidrome/conf"
|
||||||
"github.com/navidrome/navidrome/log"
|
"github.com/navidrome/navidrome/log"
|
||||||
|
"github.com/navidrome/navidrome/utils/singleton"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -46,6 +47,12 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fast path: try to acquire without touching the backlog counter
|
||||||
|
if t.sem.TryAcquire(1) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path: semaphore is full, enter backlog queue
|
||||||
// Increment-then-check-then-rollback to avoid TOCTOU race
|
// Increment-then-check-then-rollback to avoid TOCTOU race
|
||||||
current := t.backlog.Add(1)
|
current := t.backlog.Add(1)
|
||||||
if current > t.maxBacklog {
|
if current > t.maxBacklog {
|
||||||
@ -54,20 +61,15 @@ func (t *TranscodingThrottle) Acquire(ctx context.Context) error {
|
|||||||
return ErrTranscodingBusy
|
return ErrTranscodingBusy
|
||||||
}
|
}
|
||||||
|
|
||||||
if !t.sem.TryAcquire(1) {
|
log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current)
|
||||||
log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current)
|
ctx, cancel := context.WithTimeout(ctx, t.timeout)
|
||||||
ctx, cancel := context.WithTimeout(ctx, t.timeout)
|
defer cancel()
|
||||||
defer cancel()
|
err := t.sem.Acquire(ctx, 1)
|
||||||
err := t.sem.Acquire(ctx, 1)
|
|
||||||
t.backlog.Add(-1)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot")
|
|
||||||
return ErrTranscodingBusy
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
t.backlog.Add(-1)
|
t.backlog.Add(-1)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot")
|
||||||
|
return ErrTranscodingBusy
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,11 +94,13 @@ func (r *releaseOnClose) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTranscodingThrottle creates a TranscodingThrottle from the current configuration.
|
// GetTranscodingThrottle returns a singleton TranscodingThrottle created from the current configuration.
|
||||||
func GetTranscodingThrottle() *TranscodingThrottle {
|
func GetTranscodingThrottle() *TranscodingThrottle {
|
||||||
return NewTranscodingThrottle(
|
return singleton.GetInstance(func() *TranscodingThrottle {
|
||||||
conf.Server.MaxConcurrentTranscodes,
|
return NewTranscodingThrottle(
|
||||||
conf.Server.DevTranscodeThrottleBacklogLimit,
|
conf.Server.MaxConcurrentTranscodes,
|
||||||
conf.Server.DevTranscodeThrottleBacklogTimeout,
|
conf.Server.DevTranscodeThrottleBacklogLimit,
|
||||||
)
|
conf.Server.DevTranscodeThrottleBacklogTimeout,
|
||||||
|
)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user