diff --git a/core/stream/throttle.go b/core/stream/throttle.go new file mode 100644 index 000000000..acf219dc8 --- /dev/null +++ b/core/stream/throttle.go @@ -0,0 +1,92 @@ +package stream + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/navidrome/navidrome/log" + "golang.org/x/sync/semaphore" +) + +// ErrTranscodingBusy is returned when the transcoding throttle rejects a request +// because the concurrency limit and backlog are both full. +var ErrTranscodingBusy = errors.New("too many concurrent transcodes") + +// TranscodingThrottle limits the number of concurrent transcoding operations. +type TranscodingThrottle struct { + sem *semaphore.Weighted + backlog atomic.Int64 + maxBacklog int64 + timeout time.Duration + disabled bool +} + +// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous +// transcodes with maxBacklog queued waiters and the given timeout. +// If maxConcurrent is 0, throttling is disabled. +func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle { + if maxConcurrent <= 0 { + return &TranscodingThrottle{disabled: true} + } + return &TranscodingThrottle{ + sem: semaphore.NewWeighted(int64(maxConcurrent)), + maxBacklog: int64(maxBacklog), + timeout: timeout, + } +} + +// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires. +func (t *TranscodingThrottle) Acquire(ctx context.Context) error { + if t.disabled { + return nil + } + + // Increment-then-check-then-rollback to avoid TOCTOU race + current := t.backlog.Add(1) + if current > t.maxBacklog { + t.backlog.Add(-1) + log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1) + return ErrTranscodingBusy + } + + if !t.sem.TryAcquire(1) { + log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + err := t.sem.Acquire(ctx, 1) + t.backlog.Add(-1) + if err != nil { + log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot") + return ErrTranscodingBusy + } + return nil + } + + t.backlog.Add(-1) + return nil +} + +// Release frees a transcoding slot. +func (t *TranscodingThrottle) Release() { + if t.disabled { + return + } + t.sem.Release(1) +} + +// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close. +type releaseOnClose struct { + io.ReadCloser + release func() + once sync.Once +} + +func (r *releaseOnClose) Close() error { + err := r.ReadCloser.Close() + r.once.Do(r.release) + return err +} diff --git a/core/stream/throttle_test.go b/core/stream/throttle_test.go new file mode 100644 index 000000000..1c8c7a40d --- /dev/null +++ b/core/stream/throttle_test.go @@ -0,0 +1,114 @@ +package stream + +import ( + "context" + "errors" + "io" + "strings" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("TranscodingThrottle", func() { + Describe("Acquire/Release", func() { + It("allows up to maxConcurrent acquires", func() { + t := NewTranscodingThrottle(2, 10, time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + Expect(t.Acquire(context.Background())).To(Succeed()) + // Third should block, so test it doesn't return immediately + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("releases a slot and allows new acquire", func() { + t := NewTranscodingThrottle(1, 10, time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + t.Release() + Expect(t.Acquire(context.Background())).To(Succeed()) + }) + + It("returns ErrTranscodingBusy when backlog limit is reached", func() { + t := NewTranscodingThrottle(1, 2, 5*time.Second) + // Fill the slot + Expect(t.Acquire(context.Background())).To(Succeed()) + + // Fill the backlog (2 waiters) — they block in goroutines + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = t.Acquire(context.Background()) + }() + } + // Give goroutines time to enter backlog + time.Sleep(50 * time.Millisecond) + + // Third waiter should be rejected immediately (backlog full) + err := t.Acquire(context.Background()) + Expect(err).To(MatchError(ErrTranscodingBusy)) + + // Clean up: release all + t.Release() + t.Release() + t.Release() + wg.Wait() + }) + + It("returns ErrTranscodingBusy when timeout expires", func() { + t := NewTranscodingThrottle(1, 10, 50*time.Millisecond) + Expect(t.Acquire(context.Background())).To(Succeed()) + err := t.Acquire(context.Background()) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("respects context cancellation", func() { + t := NewTranscodingThrottle(1, 10, 5*time.Second) + Expect(t.Acquire(context.Background())).To(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err := t.Acquire(ctx) + Expect(err).To(MatchError(ErrTranscodingBusy)) + }) + + It("is disabled when maxConcurrent is 0", func() { + t := NewTranscodingThrottle(0, 10, time.Second) + for i := 0; i < 100; i++ { + Expect(t.Acquire(context.Background())).To(Succeed()) + } + }) + }) +}) + +var _ = Describe("releaseOnClose", func() { + It("calls release exactly once on Close", func() { + var count int + rc := &releaseOnClose{ + ReadCloser: io.NopCloser(strings.NewReader("data")), + release: func() { count++ }, + } + Expect(rc.Close()).To(Succeed()) + Expect(rc.Close()).To(Succeed()) // double close + Expect(count).To(Equal(1)) + }) + + It("propagates close error from underlying ReadCloser", func() { + rc := &releaseOnClose{ + ReadCloser: &failCloser{}, + release: func() {}, + } + err := rc.Close() + Expect(err).To(MatchError("close failed")) + }) +}) + +// failCloser is a ReadCloser whose Close always returns an error +type failCloser struct{ io.Reader } + +func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF } +func (f *failCloser) Close() error { return errors.New("close failed") }