refactoring

This commit is contained in:
chrislu
2023-03-21 23:01:49 -07:00
parent de4545c28b
commit 5db9fcccd4
11 changed files with 182 additions and 44 deletions

View File

@@ -19,22 +19,35 @@ const (
RetryForeverOnError
)
// MetadataFollowOption is used to control the behavior of the metadata following
// process. Part of it is used as a cursor to resume the following process.
type MetadataFollowOption struct {
ClientName string
ClientId int32
ClientEpoch int32
SelfSignature int32
PathPrefix string
AdditionalPathPrefixes []string
DirectoriesToWatch []string
StartTsNs int64
StopTsNs int64
EventErrorType EventErrorType
}
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32, clientEpoch int32,
pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
err := WithFilerClient(true, clientId, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
err := WithFilerClient(true, option.SelfSignature, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(option, processEventFn))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
return err
}
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName string, clientId int32, clientEpoch int32, pathPrefix string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, nil, directoriesToWatch, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(option, processEventFn))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
@@ -42,20 +55,20 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, clientName
return nil
}
func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch int32, pathPrefix string, additionalPathPrefixes []string, directoriesToWatch []string, lastTsNs *int64, untilTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error {
func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn ProcessMetadataFunc) func(client filer_pb.SeaweedFilerClient) error {
return func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: clientName,
PathPrefix: pathPrefix,
PathPrefixes: additionalPathPrefixes,
Directories: directoriesToWatch,
SinceNs: *lastTsNs,
Signature: selfSignature,
ClientId: clientId,
ClientEpoch: clientEpoch,
UntilNs: untilTsNs,
ClientName: option.ClientName,
PathPrefix: option.PathPrefix,
PathPrefixes: option.AdditionalPathPrefixes,
Directories: option.DirectoriesToWatch,
SinceNs: option.StartTsNs,
Signature: option.SelfSignature,
ClientId: option.ClientId,
ClientEpoch: option.ClientEpoch,
UntilNs: option.StopTsNs,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
@@ -71,7 +84,7 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch in
}
if err := processEventFn(resp); err != nil {
switch eventErrorType {
switch option.EventErrorType {
case TrivialOnError:
glog.Errorf("process %v: %v", resp, err)
case FatalOnError:
@@ -87,7 +100,7 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, clientEpoch in
glog.Errorf("process %v: %v", resp, err)
}
}
*lastTsNs = resp.TsNs
option.StartTsNs = resp.TsNs
}
}
}