Files
seaweedFS/weed/pb/filer_pb_tail.go
Chris Lu 84b8a8e010 filer.sync: fix checkpoint not being saved properly (#7719)
* filer.sync: fix race condition on first checkpoint save

Initialize lastWriteTime to time.Now() instead of zero time to prevent
the first checkpoint save from being triggered immediately when the
first event arrives. This gives async jobs time to complete and update
the watermark before the checkpoint is saved.

Previously, the zero time caused lastWriteTime.Add(3s).Before(now) to
be true on the first event, triggering an immediate checkpoint save
attempt. But since jobs are processed asynchronously, the watermark
was still 0 (initial value), causing the save to be skipped due to
the 'if offsetTsNs == 0 { return nil }' check.

Fixes #7717

* filer.sync: save checkpoint on graceful shutdown

Add graceful shutdown handling to save the final checkpoint when
filer.sync is terminated. Previously, any sync progress within the
last 3-second checkpoint interval would be lost on shutdown.

Changes:
- Add syncState struct to track current processor and offset save info
- Add atomic pointers syncStateA2B and syncStateB2A for both directions
- Register grace.OnInterrupt hook to save checkpoints on shutdown
- Modify doSubscribeFilerMetaChanges to update sync state atomically

This ensures that when filer.sync is restarted, it resumes from the
correct position instead of potentially replaying old events.

Fixes #7717
2025-12-11 10:25:02 -08:00

130 lines
3.6 KiB
Go

package pb
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
"time"
)
type EventErrorType int
const (
TrivialOnError EventErrorType = iota
FatalOnError
RetryForeverOnError
DontLogError
)
// MetadataFollowOption is used to control the behavior of the metadata following
// process. Part of it is used as a cursor to resume the following process.
type MetadataFollowOption struct {
ClientName string
ClientId int32
ClientEpoch int32
SelfSignature int32
PathPrefix string
AdditionalPathPrefixes []string
DirectoriesToWatch []string
StartTsNs int64
StopTsNs int64
EventErrorType EventErrorType
}
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
err := WithFilerClient(true, option.SelfSignature, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(option, processEventFn))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %w", err)
}
return err
}
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, option *MetadataFollowOption, processEventFn ProcessMetadataFunc) error {
err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(option, processEventFn))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %w", err)
}
return nil
}
func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn ProcessMetadataFunc) func(client filer_pb.SeaweedFilerClient) error {
return func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: option.ClientName,
PathPrefix: option.PathPrefix,
PathPrefixes: option.AdditionalPathPrefixes,
Directories: option.DirectoriesToWatch,
SinceNs: option.StartTsNs,
Signature: option.SelfSignature,
ClientId: option.ClientId,
ClientEpoch: option.ClientEpoch,
UntilNs: option.StopTsNs,
})
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
switch option.EventErrorType {
case TrivialOnError:
glog.Errorf("process %v: %v", resp, err)
case FatalOnError:
glog.Fatalf("process %v: %v", resp, err)
case RetryForeverOnError:
util.RetryUntil("followMetaUpdates", func() error {
return processEventFn(resp)
}, func(err error) bool {
glog.Errorf("process %v: %v", resp, err)
return true
})
case DontLogError:
// pass
default:
glog.Errorf("process %v: %v", resp, err)
}
}
option.StartTsNs = resp.TsNs
}
}
}
func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
var counter int64
var lastWriteTime = time.Now()
return func(resp *filer_pb.SubscribeMetadataResponse) error {
if err := processEventFn(resp); err != nil {
return err
}
counter++
if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
lastWriteTime = time.Now()
if err := offsetFunc(counter, resp.TsNs); err != nil {
return err
}
counter = 0
}
return nil
}
}