mirror of
https://github.com/navidrome/navidrome.git
synced 2026-05-03 06:51:16 +00:00
feat(stream): add TranscodingThrottle with tests
This commit is contained in:
parent
0a0f1779cb
commit
58f7959204
92
core/stream/throttle.go
Normal file
92
core/stream/throttle.go
Normal file
@ -0,0 +1,92 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
// ErrTranscodingBusy is returned when the transcoding throttle rejects a request
|
||||
// because the concurrency limit and backlog are both full.
|
||||
var ErrTranscodingBusy = errors.New("too many concurrent transcodes")
|
||||
|
||||
// TranscodingThrottle limits the number of concurrent transcoding operations.
|
||||
type TranscodingThrottle struct {
|
||||
sem *semaphore.Weighted
|
||||
backlog atomic.Int64
|
||||
maxBacklog int64
|
||||
timeout time.Duration
|
||||
disabled bool
|
||||
}
|
||||
|
||||
// NewTranscodingThrottle creates a throttle that allows maxConcurrent simultaneous
|
||||
// transcodes with maxBacklog queued waiters and the given timeout.
|
||||
// If maxConcurrent is 0, throttling is disabled.
|
||||
func NewTranscodingThrottle(maxConcurrent, maxBacklog int, timeout time.Duration) *TranscodingThrottle {
|
||||
if maxConcurrent <= 0 {
|
||||
return &TranscodingThrottle{disabled: true}
|
||||
}
|
||||
return &TranscodingThrottle{
|
||||
sem: semaphore.NewWeighted(int64(maxConcurrent)),
|
||||
maxBacklog: int64(maxBacklog),
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire blocks until a transcoding slot is available, the backlog is full, or the timeout expires.
|
||||
func (t *TranscodingThrottle) Acquire(ctx context.Context) error {
|
||||
if t.disabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Increment-then-check-then-rollback to avoid TOCTOU race
|
||||
current := t.backlog.Add(1)
|
||||
if current > t.maxBacklog {
|
||||
t.backlog.Add(-1)
|
||||
log.Warn(ctx, "Transcoding request rejected, throttle backlog full", "backlog", current-1)
|
||||
return ErrTranscodingBusy
|
||||
}
|
||||
|
||||
if !t.sem.TryAcquire(1) {
|
||||
log.Info(ctx, "Transcoding request queued, waiting for slot", "backlog", current)
|
||||
ctx, cancel := context.WithTimeout(ctx, t.timeout)
|
||||
defer cancel()
|
||||
err := t.sem.Acquire(ctx, 1)
|
||||
t.backlog.Add(-1)
|
||||
if err != nil {
|
||||
log.Warn(ctx, "Transcoding request rejected, timeout waiting for slot")
|
||||
return ErrTranscodingBusy
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
t.backlog.Add(-1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release frees a transcoding slot.
|
||||
func (t *TranscodingThrottle) Release() {
|
||||
if t.disabled {
|
||||
return
|
||||
}
|
||||
t.sem.Release(1)
|
||||
}
|
||||
|
||||
// releaseOnClose wraps a ReadCloser to call a release function exactly once on Close.
|
||||
type releaseOnClose struct {
|
||||
io.ReadCloser
|
||||
release func()
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (r *releaseOnClose) Close() error {
|
||||
err := r.ReadCloser.Close()
|
||||
r.once.Do(r.release)
|
||||
return err
|
||||
}
|
||||
114
core/stream/throttle_test.go
Normal file
114
core/stream/throttle_test.go
Normal file
@ -0,0 +1,114 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("TranscodingThrottle", func() {
|
||||
Describe("Acquire/Release", func() {
|
||||
It("allows up to maxConcurrent acquires", func() {
|
||||
t := NewTranscodingThrottle(2, 10, time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
// Third should block, so test it doesn't return immediately
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
defer cancel()
|
||||
err := t.Acquire(ctx)
|
||||
Expect(err).To(MatchError(ErrTranscodingBusy))
|
||||
})
|
||||
|
||||
It("releases a slot and allows new acquire", func() {
|
||||
t := NewTranscodingThrottle(1, 10, time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
t.Release()
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
})
|
||||
|
||||
It("returns ErrTranscodingBusy when backlog limit is reached", func() {
|
||||
t := NewTranscodingThrottle(1, 2, 5*time.Second)
|
||||
// Fill the slot
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
|
||||
// Fill the backlog (2 waiters) — they block in goroutines
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 2; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = t.Acquire(context.Background())
|
||||
}()
|
||||
}
|
||||
// Give goroutines time to enter backlog
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Third waiter should be rejected immediately (backlog full)
|
||||
err := t.Acquire(context.Background())
|
||||
Expect(err).To(MatchError(ErrTranscodingBusy))
|
||||
|
||||
// Clean up: release all
|
||||
t.Release()
|
||||
t.Release()
|
||||
t.Release()
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
It("returns ErrTranscodingBusy when timeout expires", func() {
|
||||
t := NewTranscodingThrottle(1, 10, 50*time.Millisecond)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
err := t.Acquire(context.Background())
|
||||
Expect(err).To(MatchError(ErrTranscodingBusy))
|
||||
})
|
||||
|
||||
It("respects context cancellation", func() {
|
||||
t := NewTranscodingThrottle(1, 10, 5*time.Second)
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
err := t.Acquire(ctx)
|
||||
Expect(err).To(MatchError(ErrTranscodingBusy))
|
||||
})
|
||||
|
||||
It("is disabled when maxConcurrent is 0", func() {
|
||||
t := NewTranscodingThrottle(0, 10, time.Second)
|
||||
for i := 0; i < 100; i++ {
|
||||
Expect(t.Acquire(context.Background())).To(Succeed())
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("releaseOnClose", func() {
|
||||
It("calls release exactly once on Close", func() {
|
||||
var count int
|
||||
rc := &releaseOnClose{
|
||||
ReadCloser: io.NopCloser(strings.NewReader("data")),
|
||||
release: func() { count++ },
|
||||
}
|
||||
Expect(rc.Close()).To(Succeed())
|
||||
Expect(rc.Close()).To(Succeed()) // double close
|
||||
Expect(count).To(Equal(1))
|
||||
})
|
||||
|
||||
It("propagates close error from underlying ReadCloser", func() {
|
||||
rc := &releaseOnClose{
|
||||
ReadCloser: &failCloser{},
|
||||
release: func() {},
|
||||
}
|
||||
err := rc.Close()
|
||||
Expect(err).To(MatchError("close failed"))
|
||||
})
|
||||
})
|
||||
|
||||
// failCloser is a ReadCloser whose Close always returns an error
|
||||
type failCloser struct{ io.Reader }
|
||||
|
||||
func (f *failCloser) Read(p []byte) (int, error) { return 0, io.EOF }
|
||||
func (f *failCloser) Close() error { return errors.New("close failed") }
|
||||
Loading…
x
Reference in New Issue
Block a user