fresh filer store bootstrap from the oldest peer

This commit is contained in:
chrislu
2022-05-30 21:27:48 -07:00
parent 490e0850bf
commit 6adc42147f
6 changed files with 56 additions and 31 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
"sort"
"strings"
"time"
@@ -68,13 +69,33 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio
return f
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate) {
func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
if len(existingNodes) == 0 {
return
}
sort.Slice(existingNodes, func(i, j int) bool {
return existingNodes[i].CreatedAtNs < existingNodes[j].CreatedAtNs
})
earliestNode := existingNodes[0]
if earliestNode.Address == string(self) {
return
}
glog.V(0).Infof("bootstrap from %v", earliestNode.Address)
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
}, true)
return
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
for _, peerUpdate := range existingNodes {
f.MetaAggregator.OnPeerUpdate(peerUpdate)
f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)
}
}
@@ -104,14 +125,13 @@ func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNod
return
}
func (f *Filer) SetStore(store FilerStore) {
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
f.Store = NewFilerStoreWrapper(store)
f.setOrLoadFilerStoreSignature(store)
return f.setOrLoadFilerStoreSignature(store)
}
func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) (isFresh bool) {
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
f.Signature = util.RandomInt32()
@@ -121,12 +141,14 @@ func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
}
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
return true
} else if err == nil && len(storeIdBytes) == 4 {
f.Signature = int32(util.BytesToUint32(storeIdBytes))
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
} else {
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
}
return false
}
func (f *Filer) GetStore() (store FilerStore) {