disconnect from old subscribers
This commit is contained in:
@@ -57,11 +57,10 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var clientEpoch int32
|
|
||||||
metadataFollowOption := &pb.MetadataFollowOption{
|
metadataFollowOption := &pb.MetadataFollowOption{
|
||||||
ClientName: "mount",
|
ClientName: "mount",
|
||||||
ClientId: selfSignature,
|
ClientId: selfSignature,
|
||||||
ClientEpoch: clientEpoch,
|
ClientEpoch: 1,
|
||||||
SelfSignature: selfSignature,
|
SelfSignature: selfSignature,
|
||||||
PathPrefix: dir,
|
PathPrefix: dir,
|
||||||
AdditionalPathPrefixes: nil,
|
AdditionalPathPrefixes: nil,
|
||||||
@@ -71,7 +70,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
EventErrorType: pb.FatalOnError,
|
EventErrorType: pb.FatalOnError,
|
||||||
}
|
}
|
||||||
util.RetryUntil("followMetaUpdates", func() error {
|
util.RetryUntil("followMetaUpdates", func() error {
|
||||||
clientEpoch++
|
metadataFollowOption.ClientEpoch++
|
||||||
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
|
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
glog.Errorf("follow metadata updates: %v", err)
|
glog.Errorf("follow metadata updates: %v", err)
|
||||||
|
|||||||
@@ -33,11 +33,10 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var clientEpoch int32
|
|
||||||
metadataFollowOption := &pb.MetadataFollowOption{
|
metadataFollowOption := &pb.MetadataFollowOption{
|
||||||
ClientName: clientName,
|
ClientName: clientName,
|
||||||
ClientId: s3a.randomClientId,
|
ClientId: s3a.randomClientId,
|
||||||
ClientEpoch: clientEpoch,
|
ClientEpoch: 1,
|
||||||
SelfSignature: 0,
|
SelfSignature: 0,
|
||||||
PathPrefix: prefix,
|
PathPrefix: prefix,
|
||||||
AdditionalPathPrefixes: nil,
|
AdditionalPathPrefixes: nil,
|
||||||
@@ -47,7 +46,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p
|
|||||||
EventErrorType: pb.FatalOnError,
|
EventErrorType: pb.FatalOnError,
|
||||||
}
|
}
|
||||||
util.RetryUntil("followIamChanges", func() error {
|
util.RetryUntil("followIamChanges", func() error {
|
||||||
clientEpoch++
|
metadataFollowOption.ClientEpoch++
|
||||||
return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
|
return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
glog.V(0).Infof("iam follow metadata changes: %v", err)
|
||||||
|
|||||||
@@ -24,11 +24,13 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
|||||||
|
|
||||||
peerAddress := findClientAddress(stream.Context(), 0)
|
peerAddress := findClientAddress(stream.Context(), 0)
|
||||||
|
|
||||||
alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
|
isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
|
||||||
if alreadyKnown {
|
if isReplacing {
|
||||||
|
fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
|
||||||
|
} else if alreadyKnown {
|
||||||
return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
|
return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
|
||||||
}
|
}
|
||||||
defer fs.deleteClient(clientName, req.ClientId, req.ClientEpoch)
|
defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
|
||||||
|
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||||
@@ -64,6 +66,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
|||||||
fs.filer.MetaAggregator.ListenersLock.Lock()
|
fs.filer.MetaAggregator.ListenersLock.Lock()
|
||||||
fs.filer.MetaAggregator.ListenersCond.Wait()
|
fs.filer.MetaAggregator.ListenersCond.Wait()
|
||||||
fs.filer.MetaAggregator.ListenersLock.Unlock()
|
fs.filer.MetaAggregator.ListenersLock.Unlock()
|
||||||
|
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}, eachLogEntryFn)
|
}, eachLogEntryFn)
|
||||||
if readInMemoryLogErr != nil {
|
if readInMemoryLogErr != nil {
|
||||||
@@ -78,6 +83,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
|||||||
if isDone {
|
if isDone {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||||
|
glog.V(0).Infof("client %v is closed", clientName)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(1127 * time.Millisecond)
|
time.Sleep(1127 * time.Millisecond)
|
||||||
}
|
}
|
||||||
@@ -93,13 +102,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
|||||||
// use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
|
// use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
|
||||||
req.ClientId = -req.ClientId
|
req.ClientId = -req.ClientId
|
||||||
|
|
||||||
alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
|
isReplacing, alreadyKnown, clientName := fs.addClient("local", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
|
||||||
if alreadyKnown {
|
if isReplacing {
|
||||||
|
fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
|
||||||
|
} else if alreadyKnown {
|
||||||
return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
|
return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
|
glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
|
||||||
fs.deleteClient(clientName, req.ClientId, req.ClientEpoch)
|
fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
lastReadTime := time.Unix(0, req.SinceNs)
|
lastReadTime := time.Unix(0, req.SinceNs)
|
||||||
@@ -141,6 +152,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
|||||||
fs.listenersLock.Lock()
|
fs.listenersLock.Lock()
|
||||||
fs.listenersCond.Wait()
|
fs.listenersCond.Wait()
|
||||||
fs.listenersLock.Unlock()
|
fs.listenersLock.Unlock()
|
||||||
|
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}, eachLogEntryFn)
|
}, eachLogEntryFn)
|
||||||
if readInMemoryLogErr != nil {
|
if readInMemoryLogErr != nil {
|
||||||
@@ -155,6 +169,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
|||||||
if isDone {
|
if isDone {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return readInMemoryLogErr
|
return readInMemoryLogErr
|
||||||
@@ -274,15 +291,16 @@ func matchByDirectory(dirPath string, directories []string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32, clientEpoch int32) (alreadyKnown bool, clientName string) {
|
func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) {
|
||||||
clientName = clientType + "@" + clientAddress
|
clientName = clientType + "@" + clientAddress
|
||||||
glog.V(0).Infof("+ listener %v", clientName)
|
glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
|
||||||
if clientId != 0 {
|
if clientId != 0 {
|
||||||
fs.knownListenersLock.Lock()
|
fs.knownListenersLock.Lock()
|
||||||
defer fs.knownListenersLock.Unlock()
|
defer fs.knownListenersLock.Unlock()
|
||||||
epoch, found := fs.knownListeners[clientId]
|
epoch, found := fs.knownListeners[clientId]
|
||||||
if !found || epoch < clientEpoch {
|
if !found || epoch < clientEpoch {
|
||||||
fs.knownListeners[clientId] = clientEpoch
|
fs.knownListeners[clientId] = clientEpoch
|
||||||
|
isReplacing = true
|
||||||
} else {
|
} else {
|
||||||
alreadyKnown = true
|
alreadyKnown = true
|
||||||
}
|
}
|
||||||
@@ -290,8 +308,8 @@ func (fs *FilerServer) addClient(clientType string, clientAddress string, client
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpoch int32) {
|
func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) {
|
||||||
glog.V(0).Infof("- listener %v", clientName)
|
glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
|
||||||
if clientId != 0 {
|
if clientId != 0 {
|
||||||
fs.knownListenersLock.Lock()
|
fs.knownListenersLock.Lock()
|
||||||
defer fs.knownListenersLock.Unlock()
|
defer fs.knownListenersLock.Unlock()
|
||||||
@@ -301,3 +319,15 @@ func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) hasClient(clientId int32, clientEpoch int32) bool {
|
||||||
|
if clientId != 0 {
|
||||||
|
fs.knownListenersLock.Lock()
|
||||||
|
defer fs.knownListenersLock.Unlock()
|
||||||
|
epoch, found := fs.knownListeners[clientId]
|
||||||
|
if found && epoch <= clientEpoch {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user