The GrpcMaxConnectionAge and GrpcMaxConnectionAgeGrace constants have a troubled history: they were removed in 2022 because of gRPC issues, later restored, and recently re-added. They are not essential to the core worker-reconnection fix, which was solved by ordering the goroutines correctly. The Docker Swarm DNS handling mentioned in their comments is not critical, and these parameters have caused problems in the past. Removing them simplifies the configuration without losing functionality.
364 lines
12 KiB
Go
364 lines
12 KiB
Go
package pb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/request_id"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
)
|
|
|
|
const (
	// Max_Message_Size is the largest gRPC message accepted or sent (1 GB).
	// NewGrpcServer applies it to the server and GrpcDial to every client call.
	Max_Message_Size = 1 << 30

	// Keepalive settings shared by NewGrpcServer and GrpcDial. They must stay
	// consistent between client and server: the server's enforcement policy
	// uses GrpcKeepAliveTime as the minimum interval between client pings,
	// which is the same interval the client pings at.

	// GrpcKeepAliveTime is how long a connection may sit idle before a
	// keepalive ping is sent.
	GrpcKeepAliveTime = time.Minute
	// GrpcKeepAliveTimeout is how long to wait for a keepalive ping to be
	// acknowledged.
	GrpcKeepAliveTimeout = 20 * time.Second
)
|
|
|
|
var (
|
|
// cache grpc connections
|
|
grpcClients = make(map[string]*versionedGrpcClient)
|
|
grpcClientsLock sync.Mutex
|
|
)
|
|
|
|
type versionedGrpcClient struct {
|
|
*grpc.ClientConn
|
|
version int
|
|
errCount int
|
|
}
|
|
|
|
// init raises the idle keep-alive connection limits of http.DefaultTransport
// to 1024, both per host and in total, for HTTP clients that use the default
// transport.
//
// The type assertion is checked. If another package has replaced
// http.DefaultTransport with a RoundTripper that is not *http.Transport, the
// tuning is skipped instead of panicking during program start-up.
func init() {
	if transport, ok := http.DefaultTransport.(*http.Transport); ok {
		transport.MaxIdleConnsPerHost = 1024
		transport.MaxIdleConns = 1024
	}
}
|
|
|
|
func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
|
|
var options []grpc.ServerOption
|
|
options = append(options,
|
|
grpc.KeepaliveParams(keepalive.ServerParameters{
|
|
Time: GrpcKeepAliveTime, // server pings client if no activity for this long
|
|
Timeout: GrpcKeepAliveTimeout, // ping timeout
|
|
}),
|
|
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
|
MinTime: GrpcKeepAliveTime, // min time a client should wait before sending a ping
|
|
PermitWithoutStream: true,
|
|
}),
|
|
grpc.MaxRecvMsgSize(Max_Message_Size),
|
|
grpc.MaxSendMsgSize(Max_Message_Size),
|
|
grpc.MaxConcurrentStreams(1000), // Allow more concurrent streams
|
|
grpc.InitialWindowSize(16*1024*1024), // 16MB initial window
|
|
grpc.InitialConnWindowSize(16*1024*1024), // 16MB connection window
|
|
grpc.MaxHeaderListSize(8*1024*1024), // 8MB header list limit
|
|
grpc.UnaryInterceptor(requestIDUnaryInterceptor()),
|
|
)
|
|
for _, opt := range opts {
|
|
if opt != nil {
|
|
options = append(options, opt)
|
|
}
|
|
}
|
|
return grpc.NewServer(options...)
|
|
}
|
|
|
|
func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
|
// opts = append(opts, grpc.WithBlock())
|
|
// opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
|
|
var options []grpc.DialOption
|
|
|
|
options = append(options,
|
|
// grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithDefaultCallOptions(
|
|
grpc.MaxCallSendMsgSize(Max_Message_Size),
|
|
grpc.MaxCallRecvMsgSize(Max_Message_Size),
|
|
grpc.WaitForReady(waitForReady),
|
|
),
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: GrpcKeepAliveTime, // client ping server if no activity for this long
|
|
Timeout: GrpcKeepAliveTimeout, // ping timeout
|
|
PermitWithoutStream: true,
|
|
}))
|
|
for _, opt := range opts {
|
|
if opt != nil {
|
|
options = append(options, opt)
|
|
}
|
|
}
|
|
return grpc.DialContext(ctx, address, options...)
|
|
}
|
|
|
|
func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) {
|
|
|
|
grpcClientsLock.Lock()
|
|
defer grpcClientsLock.Unlock()
|
|
|
|
existingConnection, found := grpcClients[address]
|
|
if found {
|
|
glog.V(3).Infof("gRPC cache hit for %s (version %d)", address, existingConnection.version)
|
|
return existingConnection, nil
|
|
}
|
|
|
|
glog.V(2).Infof("Creating new gRPC connection to %s", address)
|
|
ctx := context.Background()
|
|
grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("fail to dial %s: %v", address, err)
|
|
}
|
|
|
|
vgc := &versionedGrpcClient{
|
|
grpcConnection,
|
|
rand.Int(),
|
|
0,
|
|
}
|
|
grpcClients[address] = vgc
|
|
glog.V(2).Infof("New gRPC connection established to %s (version %d)", address, vgc.version)
|
|
|
|
return vgc, nil
|
|
}
|
|
|
|
func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
|
|
return func(
|
|
ctx context.Context,
|
|
req interface{},
|
|
info *grpc.UnaryServerInfo,
|
|
handler grpc.UnaryHandler,
|
|
) (interface{}, error) {
|
|
// Get request ID from incoming metadata
|
|
var reqID string
|
|
if incomingMd, ok := metadata.FromIncomingContext(ctx); ok {
|
|
if idList := incomingMd.Get(request_id.AmzRequestIDHeader); len(idList) > 0 {
|
|
reqID = idList[0]
|
|
}
|
|
}
|
|
if reqID == "" {
|
|
reqID = uuid.New().String()
|
|
}
|
|
|
|
// Store request ID in context for handlers to access
|
|
ctx = request_id.Set(ctx, reqID)
|
|
|
|
// Also set outgoing context so handlers making downstream gRPC calls
|
|
// will automatically propagate the request ID
|
|
ctx = metadata.AppendToOutgoingContext(ctx, request_id.AmzRequestIDHeader, reqID)
|
|
|
|
// Set trailer with request ID for response
|
|
grpc.SetTrailer(ctx, metadata.Pairs(request_id.AmzRequestIDHeader, reqID))
|
|
|
|
return handler(ctx, req)
|
|
}
|
|
}
|
|
|
|
// shouldInvalidateConnection checks if an error indicates the cached connection should be invalidated
|
|
func shouldInvalidateConnection(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
// Check gRPC status codes first (more reliable)
|
|
if s, ok := status.FromError(err); ok {
|
|
code := s.Code()
|
|
switch code {
|
|
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.Internal:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Fall back to string matching for transport-level errors not captured by gRPC codes
|
|
errStr := err.Error()
|
|
errLower := strings.ToLower(errStr)
|
|
return strings.Contains(errLower, "transport") ||
|
|
strings.Contains(errLower, "connection closed") ||
|
|
strings.Contains(errLower, "dns") ||
|
|
strings.Contains(errLower, "connection refused") ||
|
|
strings.Contains(errLower, "no route to host") ||
|
|
strings.Contains(errLower, "network is unreachable") ||
|
|
strings.Contains(errLower, "connection reset")
|
|
}
|
|
|
|
// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
|
|
func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
|
|
|
|
if !streamingMode {
|
|
vgc, err := getOrCreateConnection(address, waitForReady, opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("getOrCreateConnection %s: %v", address, err)
|
|
}
|
|
executionErr := fn(vgc.ClientConn)
|
|
if executionErr != nil {
|
|
if shouldInvalidateConnection(executionErr) {
|
|
grpcClientsLock.Lock()
|
|
if t, ok := grpcClients[address]; ok {
|
|
if t.version == vgc.version {
|
|
glog.V(1).Infof("Removing cached gRPC connection to %s due to error: %v", address, executionErr)
|
|
vgc.Close()
|
|
delete(grpcClients, address)
|
|
}
|
|
}
|
|
grpcClientsLock.Unlock()
|
|
}
|
|
}
|
|
return executionErr
|
|
} else {
|
|
ctx := context.Background()
|
|
if signature != 0 {
|
|
// Optimize: Use AppendToOutgoingContext instead of creating new map
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "sw-client-id", fmt.Sprintf("%d", signature))
|
|
}
|
|
grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("fail to dial %s: %v", address, err)
|
|
}
|
|
defer grpcConnection.Close()
|
|
executionErr := fn(grpcConnection)
|
|
if executionErr != nil {
|
|
return executionErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
}
|
|
|
|
func ParseServerAddress(server string, deltaPort int) (newServerAddress string, err error) {
|
|
|
|
host, port, parseErr := hostAndPort(server)
|
|
if parseErr != nil {
|
|
return "", fmt.Errorf("server port parse error: %w", parseErr)
|
|
}
|
|
|
|
newPort := int(port) + deltaPort
|
|
|
|
return util.JoinHostPort(host, newPort), nil
|
|
}
|
|
|
|
// hostAndPort splits address at its last ':' into a host and a decimal port.
//
// Because the split is at the last colon, the host is everything before it,
// returned verbatim. For example, "[::1]:8080" yields host "[::1]", brackets
// included.
//
// It fails if address has no ':' or if the text after it is not an unsigned
// decimal integer that fits in 64 bits. The port is not limited to 65535. On
// failure, host is "" and port is 0.
func hostAndPort(address string) (host string, port uint64, err error) {
	sep := strings.LastIndexByte(address, ':')
	if sep == -1 {
		return "", 0, fmt.Errorf("server should have hostname:port format: %v", address)
	}

	host, portText := address[:sep], address[sep+1:]
	if port, err = strconv.ParseUint(portText, 10, 64); err != nil {
		return "", 0, fmt.Errorf("server port parse error: %w", err)
	}
	return host, port, nil
}
|
|
|
|
func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
|
|
|
|
host, port, parseErr := hostAndPort(server)
|
|
if parseErr != nil {
|
|
glog.Fatalf("server address %s parse error: %v", server, parseErr)
|
|
}
|
|
|
|
grpcPort := int(port) + 10000
|
|
|
|
return util.JoinHostPort(host, grpcPort)
|
|
}
|
|
|
|
func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
|
|
host, grpcPort, parseErr := hostAndPort(grpcAddress)
|
|
if parseErr != nil {
|
|
glog.Fatalf("server grpc address %s parse error: %v", grpcAddress, parseErr)
|
|
}
|
|
|
|
port := int(grpcPort) - 10000
|
|
|
|
return util.JoinHostPort(host, port)
|
|
}
|
|
|
|
func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error {
|
|
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := master_pb.NewSeaweedClient(grpcConnection)
|
|
return fn(client)
|
|
}, master.ToGrpcAddress(), waitForReady, grpcDialOption)
|
|
|
|
}
|
|
|
|
func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpcDialOption grpc.DialOption, fn func(client volume_server_pb.VolumeServerClient) error) error {
|
|
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
|
|
return fn(client)
|
|
}, volumeServer.ToGrpcAddress(), false, grpcDialOption)
|
|
|
|
}
|
|
|
|
func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
|
|
|
|
for _, masterGrpcAddress := range masterGrpcAddresses {
|
|
err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := master_pb.NewSeaweedClient(grpcConnection)
|
|
return fn(client)
|
|
}, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
|
|
|
|
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
|
|
return fn(client)
|
|
}, brokerGrpcAddress, false, grpcDialOption)
|
|
|
|
}
|
|
|
|
func WithFilerClient(streamingMode bool, signature int32, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
|
|
|
|
return WithGrpcFilerClient(streamingMode, signature, filer, grpcDialOption, fn)
|
|
|
|
}
|
|
|
|
func WithGrpcFilerClient(streamingMode bool, signature int32, filerAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
|
|
|
|
return WithGrpcClient(streamingMode, signature, func(grpcConnection *grpc.ClientConn) error {
|
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
|
return fn(client)
|
|
}, filerAddress.ToGrpcAddress(), false, grpcDialOption)
|
|
|
|
}
|
|
|
|
func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
|
|
|
|
for _, filerAddress := range filerAddresses {
|
|
err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
|
return fn(client)
|
|
}, filerAddress.ToGrpcAddress(), false, grpcDialOption)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func WithWorkerClient(streamingMode bool, workerAddress string, grpcDialOption grpc.DialOption, fn func(client worker_pb.WorkerServiceClient) error) error {
|
|
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
|
|
client := worker_pb.NewWorkerServiceClient(grpcConnection)
|
|
return fn(client)
|
|
}, workerAddress, false, grpcDialOption)
|
|
}
|