grpc: reduce client idle pings to avoid ENHANCE_YOUR_CALM (#7885)

* grpc: reduce client idle pings to avoid ENHANCE_YOUR_CALM (too_many_pings)

* test: use context.WithTimeout and pb constants for keepalive

* test(kafka): use separate dial and client contexts in NewDirectBrokerClient

* test(kafka): fix client context usage in NewDirectBrokerClient
This commit is contained in:
Chris Lu
2025-12-26 10:58:18 -08:00
committed by GitHub
parent c260e6a22e
commit 5aa111708d
3 changed files with 24 additions and 14 deletions

View File

@@ -87,7 +87,7 @@ public class FilerGrpcClient {
.maxHeaderListSize(16 * 1024 * 1024) .maxHeaderListSize(16 * 1024 * 1024)
.keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS) .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
.keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true) .keepAliveWithoutCalls(false)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
.withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024); .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024);

View File

@@ -14,6 +14,8 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -119,22 +121,28 @@ type PublisherSession struct {
} }
func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) { func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) {
ctx, cancel := context.WithCancel(context.Background()) // Use a short-lived context for dialing so we don't store a canceled
// context in the returned client. The client's operational context
// (used by methods) should be cancellable independently.
dialCtx, dialCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer dialCancel()
// Add connection timeout and keepalive settings // Add keepalive settings; use exported server constants to keep values in sync.
conn, err := grpc.DialContext(ctx, brokerAddr, conn, err := grpc.DialContext(dialCtx, brokerAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTimeout(30*time.Second),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // Increased from 10s to 30s Time: pb.GrpcKeepAliveTime, // align with server MinTime
Timeout: 10 * time.Second, // Increased from 5s to 10s Timeout: pb.GrpcKeepAliveTimeout, // align with server timeout
PermitWithoutStream: false, // Changed to false to reduce pings PermitWithoutStream: false, // reduce pings when idle
})) }))
if err != nil { if err != nil {
cancel()
return nil, fmt.Errorf("failed to connect to broker: %v", err) return nil, fmt.Errorf("failed to connect to broker: %v", err)
} }
// Create a long-lived context for the client's lifetime and store it
// in the returned DirectBrokerClient so callers can cancel when done.
clientCtx, clientCancel := context.WithCancel(context.Background())
client := mq_pb.NewSeaweedMessagingClient(conn) client := mq_pb.NewSeaweedMessagingClient(conn)
return &DirectBrokerClient{ return &DirectBrokerClient{
@@ -142,8 +150,8 @@ func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) {
conn: conn, conn: conn,
client: client, client: client,
publishers: make(map[string]*PublisherSession), publishers: make(map[string]*PublisherSession),
ctx: ctx, ctx: clientCtx,
cancel: cancel, cancel: clientCancel,
}, nil }, nil
} }

View File

@@ -94,9 +94,11 @@ func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...gr
grpc.WaitForReady(waitForReady), grpc.WaitForReady(waitForReady),
), ),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: GrpcKeepAliveTime, // client ping server if no activity for this long Time: GrpcKeepAliveTime, // client ping server if no activity for this long
Timeout: GrpcKeepAliveTimeout, // ping timeout Timeout: GrpcKeepAliveTimeout, // ping timeout
PermitWithoutStream: true, // Disable pings when there are no active streams to avoid triggering
// server enforcement for too-frequent pings from idle clients.
PermitWithoutStream: false,
})) }))
for _, opt := range opts { for _, opt := range opts {
if opt != nil { if opt != nil {