add mq agent options to server.go

This commit is contained in:
chrislu
2025-07-09 09:02:25 -07:00
parent cf5a24983a
commit 14859f0e8c

View File

@@ -24,14 +24,15 @@ type ServerOptions struct {
} }
var ( var (
serverOptions ServerOptions serverOptions ServerOptions
masterOptions MasterOptions masterOptions MasterOptions
filerOptions FilerOptions filerOptions FilerOptions
s3Options S3Options s3Options S3Options
sftpOptions SftpOptions sftpOptions SftpOptions
iamOptions IamOptions iamOptions IamOptions
webdavOptions WebDavOption webdavOptions WebDavOption
mqBrokerOptions MessageQueueBrokerOptions mqBrokerOptions MessageQueueBrokerOptions
mqAgentServerOptions MessageQueueAgentOptions
) )
func init() { func init() {
@@ -78,6 +79,7 @@ var (
isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service")
isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway")
isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker") isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker")
isStartingMqAgent = cmdServer.Flag.Bool("mq.agent", false, "whether to start message queue agent")
False = false False = false
) )
@@ -191,6 +193,9 @@ func init() {
mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port") mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port")
mqAgentServerOptions.brokersString = cmdServer.Flag.String("mq.agent.brokers", "localhost:17777", "comma-separated message queue brokers")
mqAgentServerOptions.port = cmdServer.Flag.Int("mq.agent.port", 16777, "message queue agent gRPC listen port")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
@@ -219,6 +224,10 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingMqBroker { if *isStartingMqBroker {
*isStartingFiler = true *isStartingFiler = true
} }
if *isStartingMqAgent {
*isStartingMqBroker = true
*isStartingFiler = true
}
if *isStartingMasterServer { if *isStartingMasterServer {
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers) _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers)
@@ -258,6 +267,8 @@ func runServer(cmd *Command, args []string) bool {
mqBrokerOptions.ip = serverIp mqBrokerOptions.ip = serverIp
mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap() mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap()
mqBrokerOptions.filerGroup = filerOptions.filerGroup mqBrokerOptions.filerGroup = filerOptions.filerGroup
mqAgentServerOptions.ip = serverIp
mqAgentServerOptions.brokers = pb.ServerAddresses(*mqAgentServerOptions.brokersString).ToAddresses()
// serverOptions.v.pulseSeconds = pulseSeconds // serverOptions.v.pulseSeconds = pulseSeconds
// masterOptions.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds
@@ -346,6 +357,13 @@ func runServer(cmd *Command, args []string) bool {
}() }()
} }
if *isStartingMqAgent {
go func() {
time.Sleep(2 * time.Second)
mqAgentServerOptions.startQueueAgent()
}()
}
// start volume server // start volume server
if *isStartingVolumeServer { if *isStartingVolumeServer {
minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent) minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent)