iceberg: validate filer failover targets (#8637)

* iceberg: validate filer failover targets

* iceberg: tighten filer liveness checks

* iceberg: relax filer test readiness deadline
This commit is contained in:
Chris Lu
2026-03-15 17:45:55 -07:00
committed by GitHub
parent b868980260
commit 0afc675a55
2 changed files with 102 additions and 9 deletions

View File

@@ -5,8 +5,10 @@ import (
"fmt"
"path"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
@@ -31,6 +33,8 @@ type Handler struct {
grpcDialOption grpc.DialOption
}
const filerConnectTimeout = 5 * time.Second
// NewHandler creates a new handler for iceberg table maintenance.
func NewHandler(grpcDialOption grpc.DialOption) *Handler {
return &Handler{grpcDialOption: grpcDialOption}
@@ -205,7 +209,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}},
"min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}},
"snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}},
@@ -227,7 +231,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
JobTypeMaxRuntimeSeconds: 3600, // 1 hour max
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}},
"snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}},
"max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}},
@@ -272,7 +276,7 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq
tableFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "table_filter", ""))
// Connect to filer — try each address until one succeeds.
filerAddress, conn, err := h.connectToFiler(filerAddresses)
filerAddress, conn, err := h.connectToFiler(ctx, filerAddresses)
if err != nil {
return fmt.Errorf("connect to filer: %w", err)
}
@@ -382,7 +386,7 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
}
// Connect to filer
conn, err := grpc.NewClient(filerAddress, h.grpcDialOption)
conn, err := h.dialFiler(ctx, filerAddress)
if err != nil {
return fmt.Errorf("connect to filer %s: %w", filerAddress, err)
}
@@ -488,13 +492,30 @@ func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error
})
}
func (h *Handler) dialFiler(ctx context.Context, address string) (*grpc.ClientConn, error) {
opCtx, opCancel := context.WithTimeout(ctx, filerConnectTimeout)
defer opCancel()
conn, err := pb.GrpcDial(opCtx, address, false, h.grpcDialOption)
if err != nil {
return nil, err
}
client := filer_pb.NewSeaweedFilerClient(conn)
if _, err := client.Ping(opCtx, &filer_pb.PingRequest{}); err != nil {
_ = conn.Close()
return nil, err
}
return conn, nil
}
// connectToFiler tries each filer address in order and returns the first
// successful gRPC connection. If all addresses fail, it returns a
// consolidated error.
func (h *Handler) connectToFiler(addresses []string) (string, *grpc.ClientConn, error) {
// address whose gRPC connection and Ping request succeed.
func (h *Handler) connectToFiler(ctx context.Context, addresses []string) (string, *grpc.ClientConn, error) {
var lastErr error
for _, addr := range addresses {
conn, err := grpc.NewClient(addr, h.grpcDialOption)
conn, err := h.dialFiler(ctx, addr)
if err != nil {
lastErr = fmt.Errorf("filer %s: %w", addr, err)
continue