This commit is contained in:
chrislu
2022-05-30 22:47:29 -07:00
28 changed files with 854 additions and 745 deletions

View File

@@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
var isDone bool
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if isDone {
return nil
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -55,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
@@ -70,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
break
}
}
if isDone {
return nil
}
time.Sleep(1127 * time.Millisecond)
}
@@ -98,15 +105,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
var isDone bool
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if isDone {
return nil
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -119,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
@@ -134,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
break
}
}
if isDone {
return nil
}
}
return readInMemoryLogErr

View File

@@ -151,7 +151,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
// TODO deprecated, will be be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
isFresh := fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -164,7 +164,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
fs.filer.AggregateFromPeers(option.Host)
existingNodes := fs.filer.ListExistingPeerUpdates()
startFromTime := time.Now().Add(-filer.LogFlushInterval)
if isFresh {
glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err == nil {
glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
}
}
fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
fs.filer.LoadBuckets()

View File

@@ -15,9 +15,10 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis
for _, node := range clusterNodes {
resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
Address: string(node.Address),
Version: node.Version,
IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
Address: string(node.Address),
Version: node.Version,
IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
CreatedAtNs: node.CreatedTs.UnixNano(),
})
}
return resp, nil

View File

@@ -342,7 +342,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
return seq
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}