Use Unix sockets for gRPC in mini mode (#8856)
* Use Unix sockets for gRPC between co-located services in mini mode In `weed mini`, all services run in one process. Previously, inter-service gRPC traffic (volume↔master, filer↔master, S3↔filer, worker↔admin, etc.) went through TCP loopback. This adds a gRPC Unix socket registry in the pb package: mini mode registers a socket path per gRPC port at startup, each gRPC server additionally listens on its socket, and GrpcDial transparently routes to the socket via WithContextDialer when a match is found. Standalone commands (weed master, weed filer, etc.) are unaffected since no sockets are registered. TCP listeners are kept for external clients. * Handle Serve error and clean up socket file in ServeGrpcOnLocalSocket Log non-expected errors from grpcServer.Serve (ignoring grpc.ErrServerStopped) and always remove the Unix socket file when Serve returns, ensuring cleanup on Stop/GracefulStop.
This commit is contained in:
@@ -108,6 +108,8 @@ func (s *WorkerGrpcServer) StartWithTLS(port int) error {
|
|||||||
go s.cleanupRoutine()
|
go s.cleanupRoutine()
|
||||||
go s.activeLogFetchLoop()
|
go s.activeLogFetchLoop()
|
||||||
|
|
||||||
|
pb.ServeGrpcOnLocalSocket(grpcServer, port)
|
||||||
|
|
||||||
// Start serving in a goroutine
|
// Start serving in a goroutine
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.grpcServer.Serve(listener); err != nil {
|
if err := s.grpcServer.Serve(listener); err != nil {
|
||||||
|
|||||||
@@ -421,6 +421,7 @@ func (fo *FilerOptions) startFiler() {
|
|||||||
go grpcS.Serve(grpcLocalL)
|
go grpcS.Serve(grpcLocalL)
|
||||||
}
|
}
|
||||||
go grpcS.Serve(grpcL)
|
go grpcS.Serve(grpcL)
|
||||||
|
pb.ServeGrpcOnLocalSocket(grpcS, grpcPort)
|
||||||
|
|
||||||
if runtime.GOOS != "windows" {
|
if runtime.GOOS != "windows" {
|
||||||
localSocket := *fo.localSocket
|
localSocket := *fo.localSocket
|
||||||
|
|||||||
@@ -254,6 +254,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
|
|||||||
go grpcS.Serve(grpcLocalL)
|
go grpcS.Serve(grpcLocalL)
|
||||||
}
|
}
|
||||||
go grpcS.Serve(grpcL)
|
go grpcS.Serve(grpcL)
|
||||||
|
pb.ServeGrpcOnLocalSocket(grpcS, grpcPort)
|
||||||
|
|
||||||
// For multi-master mode with non-Hashicorp raft, wait and check if we should join
|
// For multi-master mode with non-Hashicorp raft, wait and check if we should join
|
||||||
if !*masterOption.raftHashicorp && !isSingleMaster {
|
if !*masterOption.raftHashicorp && !isSingleMaster {
|
||||||
|
|||||||
@@ -823,6 +823,16 @@ func runMini(cmd *Command, args []string) bool {
|
|||||||
miniS3Options.filer = &filerAddress
|
miniS3Options.filer = &filerAddress
|
||||||
miniWebDavOptions.filer = &filerAddress
|
miniWebDavOptions.filer = &filerAddress
|
||||||
|
|
||||||
|
// Register Unix socket paths for gRPC services so local inter-service
|
||||||
|
// communication goes through Unix sockets instead of TCP.
|
||||||
|
pb.RegisterLocalGrpcSocket(*miniMasterOptions.portGrpc, fmt.Sprintf("/tmp/seaweedfs-master-grpc-%d.sock", *miniMasterOptions.portGrpc))
|
||||||
|
pb.RegisterLocalGrpcSocket(*miniOptions.v.portGrpc, fmt.Sprintf("/tmp/seaweedfs-volume-grpc-%d.sock", *miniOptions.v.portGrpc))
|
||||||
|
pb.RegisterLocalGrpcSocket(*miniFilerOptions.portGrpc, fmt.Sprintf("/tmp/seaweedfs-filer-grpc-%d.sock", *miniFilerOptions.portGrpc))
|
||||||
|
if *miniS3Options.portGrpc > 0 {
|
||||||
|
pb.RegisterLocalGrpcSocket(*miniS3Options.portGrpc, fmt.Sprintf("/tmp/seaweedfs-s3-grpc-%d.sock", *miniS3Options.portGrpc))
|
||||||
|
}
|
||||||
|
pb.RegisterLocalGrpcSocket(*miniAdminOptions.grpcPort, fmt.Sprintf("/tmp/seaweedfs-admin-grpc-%d.sock", *miniAdminOptions.grpcPort))
|
||||||
|
|
||||||
go stats_collect.StartMetricsServer(*miniMetricsHttpIp, *miniMetricsHttpPort)
|
go stats_collect.StartMetricsServer(*miniMetricsHttpIp, *miniMetricsHttpPort)
|
||||||
|
|
||||||
if *miniMasterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 {
|
if *miniMasterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 {
|
||||||
|
|||||||
@@ -376,6 +376,7 @@ func (s3opt *S3Options) startS3Server() bool {
|
|||||||
go grpcS.Serve(grpcLocalL)
|
go grpcS.Serve(grpcLocalL)
|
||||||
}
|
}
|
||||||
go grpcS.Serve(grpcL)
|
go grpcS.Serve(grpcL)
|
||||||
|
pb.ServeGrpcOnLocalSocket(grpcS, grpcPort)
|
||||||
|
|
||||||
if *s3opt.tlsPrivateKey != "" {
|
if *s3opt.tlsPrivateKey != "" {
|
||||||
// Check for port conflict when both HTTP and HTTPS are enabled on the same port
|
// Check for port conflict when both HTTP and HTTPS are enabled on the same port
|
||||||
|
|||||||
@@ -415,6 +415,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
|
|||||||
glog.Fatalf("start gRPC service failed, %s", err)
|
glog.Fatalf("start gRPC service failed, %s", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
pb.ServeGrpcOnLocalSocket(grpcS, grpcPort)
|
||||||
return grpcS
|
return grpcS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand/v2"
|
"math/rand/v2"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -42,6 +44,12 @@ var (
|
|||||||
// cache grpc connections
|
// cache grpc connections
|
||||||
grpcClients = make(map[string]*versionedGrpcClient)
|
grpcClients = make(map[string]*versionedGrpcClient)
|
||||||
grpcClientsLock sync.Mutex
|
grpcClientsLock sync.Mutex
|
||||||
|
|
||||||
|
// localGrpcSockets maps gRPC port numbers to Unix socket paths.
|
||||||
|
// When registered (by mini mode), gRPC clients connect via Unix socket
|
||||||
|
// instead of TCP for local services.
|
||||||
|
localGrpcSockets = make(map[int]string)
|
||||||
|
localGrpcSocketsLock sync.RWMutex
|
||||||
)
|
)
|
||||||
|
|
||||||
type versionedGrpcClient struct {
|
type versionedGrpcClient struct {
|
||||||
@@ -55,6 +63,59 @@ func init() {
|
|||||||
http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
|
http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterLocalGrpcSocket registers a Unix socket path for a gRPC port.
|
||||||
|
// When a gRPC client dials an address on this port, it uses the Unix socket.
|
||||||
|
func RegisterLocalGrpcSocket(grpcPort int, socketPath string) {
|
||||||
|
localGrpcSocketsLock.Lock()
|
||||||
|
defer localGrpcSocketsLock.Unlock()
|
||||||
|
localGrpcSockets[grpcPort] = socketPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLocalGrpcSocket returns the Unix socket path for a gRPC port, or empty if not registered.
|
||||||
|
func GetLocalGrpcSocket(grpcPort int) string {
|
||||||
|
localGrpcSocketsLock.RLock()
|
||||||
|
defer localGrpcSocketsLock.RUnlock()
|
||||||
|
return localGrpcSockets[grpcPort]
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveLocalGrpcSocket extracts the port from a gRPC address and returns
|
||||||
|
// the registered Unix socket path, if any.
|
||||||
|
func resolveLocalGrpcSocket(address string) string {
|
||||||
|
_, portStr, err := net.SplitHostPort(address)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
port, err := strconv.Atoi(portStr)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return GetLocalGrpcSocket(port)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeGrpcOnLocalSocket starts serving a gRPC server on a Unix socket
|
||||||
|
// if one is registered for the given port.
|
||||||
|
func ServeGrpcOnLocalSocket(grpcServer *grpc.Server, grpcPort int) {
|
||||||
|
socketPath := GetLocalGrpcSocket(grpcPort)
|
||||||
|
if socketPath == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
glog.Warningf("Failed to remove old gRPC socket %s: %v", socketPath, err)
|
||||||
|
}
|
||||||
|
listener, err := net.Listen("unix", socketPath)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to listen on gRPC Unix socket %s: %v", socketPath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
glog.V(0).Infof("gRPC also listening on Unix socket %s", socketPath)
|
||||||
|
go func() {
|
||||||
|
if err := grpcServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
|
||||||
|
glog.Errorf("gRPC Unix socket server error on %s: %v", socketPath, err)
|
||||||
|
}
|
||||||
|
os.Remove(socketPath)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
|
func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
|
||||||
var options []grpc.ServerOption
|
var options []grpc.ServerOption
|
||||||
options = append(options,
|
options = append(options,
|
||||||
@@ -97,6 +158,14 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
|
|||||||
func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
var options []grpc.DialOption
|
var options []grpc.DialOption
|
||||||
|
|
||||||
|
// Route through Unix socket if one is registered for this address's port
|
||||||
|
if socketPath := resolveLocalGrpcSocket(address); socketPath != "" {
|
||||||
|
options = append(options, grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
|
||||||
|
var d net.Dialer
|
||||||
|
return d.DialContext(ctx, "unix", socketPath)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
options = append(options,
|
options = append(options,
|
||||||
grpc.WithDefaultCallOptions(
|
grpc.WithDefaultCallOptions(
|
||||||
grpc.MaxCallSendMsgSize(Max_Message_Size),
|
grpc.MaxCallSendMsgSize(Max_Message_Size),
|
||||||
|
|||||||
Reference in New Issue
Block a user