test(stream): replace time.Sleep with Eventually assertion in throttle test

This commit is contained in:
Deluan 2026-03-25 08:35:32 -04:00
parent 6fe2b76ac1
commit c1f12d148d

View File

@ -16,10 +16,10 @@ var _ = Describe("TranscodingThrottle", func() {
Describe("Acquire/Release", func() { Describe("Acquire/Release", func() {
It("allows up to maxConcurrent acquires", func() { It("allows up to maxConcurrent acquires", func() {
t := newTranscodingThrottle(2, 10, time.Second) t := newTranscodingThrottle(2, 10, time.Second)
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Third should block, so test it doesn't return immediately // Third should block, so test it doesn't return immediately
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) ctx, cancel := context.WithTimeout(GinkgoT().Context(), 50*time.Millisecond)
defer cancel() defer cancel()
err := t.Acquire(ctx) err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy)) Expect(err).To(MatchError(ErrTranscodingBusy))
@ -27,30 +27,26 @@ var _ = Describe("TranscodingThrottle", func() {
It("releases a slot and allows new acquire", func() { It("releases a slot and allows new acquire", func() {
t := newTranscodingThrottle(1, 10, time.Second) t := newTranscodingThrottle(1, 10, time.Second)
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
t.Release() t.Release()
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
}) })
It("returns ErrTranscodingBusy when backlog limit is reached", func() { It("returns ErrTranscodingBusy when backlog limit is reached", func() {
t := newTranscodingThrottle(1, 2, 5*time.Second) t := newTranscodingThrottle(1, 2, 5*time.Second)
// Fill the slot // Fill the slot
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
// Fill the backlog (2 waiters) — they block in goroutines // Fill the backlog (2 waiters) — they block in goroutines
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
wg.Add(1) wg.Go(func() { _ = t.Acquire(GinkgoT().Context()) })
go func() {
defer wg.Done()
_ = t.Acquire(context.Background())
}()
} }
// Give goroutines time to enter backlog // Wait until both goroutines are in the backlog
time.Sleep(50 * time.Millisecond) Eventually(func() int64 { return t.backlog.Load() }).Should(BeNumerically(">=", 2))
// Third waiter should be rejected immediately (backlog full) // Third waiter should be rejected immediately (backlog full)
err := t.Acquire(context.Background()) err := t.Acquire(GinkgoT().Context())
Expect(err).To(MatchError(ErrTranscodingBusy)) Expect(err).To(MatchError(ErrTranscodingBusy))
// Clean up: release all // Clean up: release all
@ -62,15 +58,14 @@ var _ = Describe("TranscodingThrottle", func() {
It("returns ErrTranscodingBusy when timeout expires", func() { It("returns ErrTranscodingBusy when timeout expires", func() {
t := newTranscodingThrottle(1, 10, 50*time.Millisecond) t := newTranscodingThrottle(1, 10, 50*time.Millisecond)
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
err := t.Acquire(context.Background()) Expect(t.Acquire(GinkgoT().Context())).To(MatchError(ErrTranscodingBusy))
Expect(err).To(MatchError(ErrTranscodingBusy))
}) })
It("respects context cancellation", func() { It("respects context cancellation", func() {
t := newTranscodingThrottle(1, 10, 5*time.Second) t := newTranscodingThrottle(1, 10, 5*time.Second)
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(GinkgoT().Context())
cancel() cancel()
err := t.Acquire(ctx) err := t.Acquire(ctx)
Expect(err).To(MatchError(ErrTranscodingBusy)) Expect(err).To(MatchError(ErrTranscodingBusy))
@ -79,7 +74,7 @@ var _ = Describe("TranscodingThrottle", func() {
It("is disabled when maxConcurrent is 0", func() { It("is disabled when maxConcurrent is 0", func() {
t := newTranscodingThrottle(0, 10, time.Second) t := newTranscodingThrottle(0, 10, time.Second)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
Expect(t.Acquire(context.Background())).To(Succeed()) Expect(t.Acquire(GinkgoT().Context())).To(Succeed())
} }
}) })
}) })