SIGN IN SIGN UP
etcd-io / etcd UNCLAIMED

Distributed reliable key-value store for the most critical data of a distributed system

// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache
import (
"context"
"sync/atomic"
"testing"
"time"
)
const (
pollPeriod = 1 * time.Millisecond
minimalNoChange = 20 * time.Millisecond
pollTimeout = 5 * time.Second
)
// Copied from https://github.com/kubernetes/kubernetes/blob/4116c15/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go
func TestConditionalProgressRequestor(t *testing.T) {
ctx := context.Background()
fc := newFakeClock()
pr := newTestConditionalProgressRequestor(fc, 100*time.Millisecond)
stopCtx, cancel := context.WithCancel(ctx)
defer cancel()
go pr.run(stopCtx)
var wantRequestsSent int32
var requestsSent int32
t.Log("Wait for goroutine to start")
time.Sleep(10 * time.Millisecond)
t.Log("No progress requests if no-one is waiting")
fc.Advance(250 * time.Millisecond)
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
t.Log("Adding waiters allows progress request to be sent")
pr.add()
fc.Advance(150 * time.Millisecond)
wantRequestsSent++
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
t.Log("Periodically request progress to be sent every period")
for wantRequestsSent < 5 {
fc.Advance(100 * time.Millisecond)
wantRequestsSent++
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
}
pr.remove()
t.Log("No progress requests if no-one is waiting")
fc.Advance(250 * time.Millisecond)
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
t.Log("No progress after stopping")
cancel()
fc.Advance(50 * time.Millisecond)
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
pr.add()
fc.Advance(250 * time.Millisecond)
if err := pollConditionNoChange(func() bool {
requestsSent = pr.progressRequestsSentCount.Load()
return requestsSent == wantRequestsSent
}); err != nil {
t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent)
}
}
func newTestConditionalProgressRequestor(clock Clock, requestInterval time.Duration) *testConditionalProgressRequestor {
pr := &testConditionalProgressRequestor{}
pr.conditionalProgressRequestor = newConditionalProgressRequestor(pr, clock, requestInterval)
return pr
}
type testConditionalProgressRequestor struct {
*conditionalProgressRequestor
progressRequestsSentCount atomic.Int32
}
func (pr *testConditionalProgressRequestor) RequestProgress(ctx context.Context) error {
pr.progressRequestsSentCount.Add(1)
return nil
}
func pollConditionNoChange(condition func() bool) error {
passCounter := 0
requiredNumberOfPasses := int(minimalNoChange/pollPeriod) + 1
deadline := time.Now().Add(pollTimeout)
ticker := time.NewTicker(pollPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if condition() {
passCounter++
} else {
passCounter = 0
}
if passCounter >= requiredNumberOfPasses {
return nil
}
if time.Now().After(deadline) {
return &timeoutError{}
}
}
}
}
type timeoutError struct{}
func (e *timeoutError) Error() string {
return "timed out waiting for condition to stabilize"
}