Prune wdclient Functions (#8855)
* chore(weed/wdclient): prune unused functions * chore(weed/wdclient): prune test-only functions and associated tests * chore(weed/wdclient): remove dead cursor field The cursor field and its initialization are no longer used after the removal of getLocationIndex. --------- Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
@@ -503,55 +503,17 @@ func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) b
|
|||||||
return true // Skip this unhealthy filer
|
return true // Skip this unhealthy filer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
|
|
||||||
// This function is kept for backward compatibility but requires array access
|
|
||||||
// Note: This function is now thread-safe.
|
|
||||||
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
|
|
||||||
fc.filerAddressesMu.RLock()
|
|
||||||
if index >= int32(len(fc.filerHealth)) {
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
return true // Invalid index - skip
|
|
||||||
}
|
|
||||||
health := fc.filerHealth[index]
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
return fc.shouldSkipUnhealthyFilerWithHealth(health)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recordFilerSuccessWithHealth resets failure tracking for a successful filer
|
// recordFilerSuccessWithHealth resets failure tracking for a successful filer
|
||||||
func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
|
func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
|
||||||
atomic.StoreInt32(&health.failureCount, 0)
|
atomic.StoreInt32(&health.failureCount, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// recordFilerSuccess resets failure tracking for a successful filer
|
|
||||||
func (fc *FilerClient) recordFilerSuccess(index int32) {
|
|
||||||
fc.filerAddressesMu.RLock()
|
|
||||||
if index >= int32(len(fc.filerHealth)) {
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
return // Invalid index
|
|
||||||
}
|
|
||||||
health := fc.filerHealth[index]
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
fc.recordFilerSuccessWithHealth(health)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recordFilerFailureWithHealth increments failure count for an unhealthy filer
|
// recordFilerFailureWithHealth increments failure count for an unhealthy filer
|
||||||
func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
|
func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
|
||||||
atomic.AddInt32(&health.failureCount, 1)
|
atomic.AddInt32(&health.failureCount, 1)
|
||||||
atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
|
atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
// recordFilerFailure increments failure count for an unhealthy filer
|
|
||||||
func (fc *FilerClient) recordFilerFailure(index int32) {
|
|
||||||
fc.filerAddressesMu.RLock()
|
|
||||||
if index >= int32(len(fc.filerHealth)) {
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
return // Invalid index
|
|
||||||
}
|
|
||||||
health := fc.filerHealth[index]
|
|
||||||
fc.filerAddressesMu.RUnlock()
|
|
||||||
fc.recordFilerFailureWithHealth(health)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LookupVolumeIds queries the filer for volume locations with automatic failover
|
// LookupVolumeIds queries the filer for volume locations with automatic failover
|
||||||
// Tries all configured filer addresses until one succeeds (high availability)
|
// Tries all configured filer addresses until one succeeds (high availability)
|
||||||
// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
|
// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
|
||||||
|
|||||||
@@ -15,10 +15,6 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
maxCursorIndex = 4096
|
|
||||||
)
|
|
||||||
|
|
||||||
type HasLookupFileIdFunction interface {
|
type HasLookupFileIdFunction interface {
|
||||||
GetLookupFileIdFunction() LookupFileIdFunctionType
|
GetLookupFileIdFunction() LookupFileIdFunctionType
|
||||||
}
|
}
|
||||||
@@ -41,7 +37,6 @@ type vidMap struct {
|
|||||||
vid2Locations map[uint32][]Location
|
vid2Locations map[uint32][]Location
|
||||||
ecVid2Locations map[uint32][]Location
|
ecVid2Locations map[uint32][]Location
|
||||||
DataCenter string
|
DataCenter string
|
||||||
cursor int32
|
|
||||||
cache atomic.Pointer[vidMap]
|
cache atomic.Pointer[vidMap]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -50,20 +45,9 @@ func newVidMap(dataCenter string) *vidMap {
|
|||||||
vid2Locations: make(map[uint32][]Location),
|
vid2Locations: make(map[uint32][]Location),
|
||||||
ecVid2Locations: make(map[uint32][]Location),
|
ecVid2Locations: make(map[uint32][]Location),
|
||||||
DataCenter: dataCenter,
|
DataCenter: dataCenter,
|
||||||
cursor: -1,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vc *vidMap) getLocationIndex(length int) (int, error) {
|
|
||||||
if length <= 0 {
|
|
||||||
return 0, fmt.Errorf("invalid length: %d", length)
|
|
||||||
}
|
|
||||||
if atomic.LoadInt32(&vc.cursor) == maxCursorIndex {
|
|
||||||
atomic.CompareAndSwapInt32(&vc.cursor, maxCursorIndex, -1)
|
|
||||||
}
|
|
||||||
return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vc *vidMap) isSameDataCenter(loc *Location) bool {
|
func (vc *vidMap) isSameDataCenter(loc *Location) bool {
|
||||||
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
|
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package wdclient
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -12,60 +11,6 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLocationIndex(t *testing.T) {
|
|
||||||
vm := &vidMap{}
|
|
||||||
// test must be failed
|
|
||||||
mustFailed := func(length int) {
|
|
||||||
_, err := vm.getLocationIndex(length)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("length %d must be failed", length)
|
|
||||||
}
|
|
||||||
if err.Error() != fmt.Sprintf("invalid length: %d", length) {
|
|
||||||
t.Errorf("length %d must be failed. error: %v", length, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mustFailed(-1)
|
|
||||||
mustFailed(0)
|
|
||||||
|
|
||||||
mustOk := func(length, cursor, expect int) {
|
|
||||||
if length <= 0 {
|
|
||||||
t.Fatal("please don't do this")
|
|
||||||
}
|
|
||||||
vm.cursor = int32(cursor)
|
|
||||||
got, err := vm.getLocationIndex(length)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("length: %d, why? %v\n", length, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if got != expect {
|
|
||||||
t.Errorf("cursor: %d, length: %d, expect: %d, got: %d\n", cursor, length, expect, got)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := -1; i < 100; i++ {
|
|
||||||
mustOk(7, i, (i+1)%7)
|
|
||||||
}
|
|
||||||
|
|
||||||
// when cursor reaches MaxInt64
|
|
||||||
mustOk(7, maxCursorIndex, 0)
|
|
||||||
|
|
||||||
// test with constructor
|
|
||||||
vm = newVidMap("")
|
|
||||||
length := 7
|
|
||||||
for i := 0; i < 100; i++ {
|
|
||||||
got, err := vm.getLocationIndex(length)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("length: %d, why? %v\n", length, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if got != i%length {
|
|
||||||
t.Errorf("length: %d, i: %d, got: %d\n", length, i, got)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLookupFileId(t *testing.T) {
|
func TestLookupFileId(t *testing.T) {
|
||||||
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
|
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
|
||||||
length := 5
|
length := 5
|
||||||
@@ -172,19 +117,3 @@ func TestConcurrentGetLocations(t *testing.T) {
|
|||||||
cancel()
|
cancel()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkLocationIndex(b *testing.B) {
|
|
||||||
b.SetParallelism(8)
|
|
||||||
vm := vidMap{
|
|
||||||
cursor: maxCursorIndex - 4000,
|
|
||||||
}
|
|
||||||
b.ResetTimer()
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
|
||||||
for pb.Next() {
|
|
||||||
_, err := vm.getLocationIndex(3)
|
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user