* filer.sync: replace O(n) conflict check with O(depth) index lookups The MetadataProcessor.conflictsWith() scanned all active jobs linearly for every new event dispatch. At high concurrency (256-1024), this O(n) scan under the activeJobsLock became a bottleneck that throttled the event dispatch pipeline, negating the benefit of higher -concurrency values. Replace the linear scan with three index maps: - activeFilePaths: O(1) exact file path lookup - activeDirPaths: O(1) directory path lookup per ancestor - descendantCount: O(1) check for active jobs under a directory Conflict check is now O(depth) where depth is the path depth (typically 3-6 levels), constant regardless of active job count. Benchmark confirms ~81ns per check whether there are 32 or 1024 active jobs. Also replace the O(n) watermark scan with minActiveTs tracking so non-oldest job completions are O(1). Ref: #8771 * filer.sync: replace O(n) watermark rescan with min-heap lazy deletion Address review feedback: - Replace minActiveTs O(n) rescan with a tsMinHeap using lazy deletion. Each TsNs is pushed once and popped once, giving O(log n) amortized watermark tracking regardless of completion order. - Fix benchmark to consume conflictsWith result via package-level sink variable to prevent compiler elision. The watermark advancement semantics (conservative, sets to completing job's TsNs) are unchanged from the original code. This is intentionally safe for idempotent replay on restart.
244 lines
6.7 KiB
Go
244 lines
6.7 KiB
Go
package command
|
|
|
|
import (
|
|
"container/heap"
|
|
"path"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
// tsMinHeap implements heap.Interface for int64 timestamps.
|
|
type tsMinHeap []int64
|
|
|
|
func (h tsMinHeap) Len() int { return len(h) }
|
|
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
|
|
func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) }
|
|
func (h *tsMinHeap) Pop() any {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[:n-1]
|
|
return x
|
|
}
|
|
|
|
// syncJobPaths records the filesystem path(s) touched by one in-flight
// sync job, so new events can be tested for conflicts against active work.
type syncJobPaths struct {
	path        util.FullPath // primary path affected by the event
	newPath     util.FullPath // empty for non-renames
	isDirectory bool          // whether the affected entry is a directory
}
|
|
|
|
// MetadataProcessor runs filer metadata events through fn with bounded
// concurrency, delaying events whose paths conflict with in-flight jobs,
// and tracks a conservative watermark of processed event timestamps
// (intended as a restart/replay offset — see AddSyncJob).
type MetadataProcessor struct {
	activeJobs           map[int64]*syncJobPaths // in-flight jobs keyed by event TsNs
	activeJobsLock       sync.Mutex              // guards activeJobs, the index maps, and tsHeap
	activeJobsCond       *sync.Cond              // signaled on job completion (capacity/conflict may have cleared)
	concurrencyLimit     int                     // maximum number of simultaneous jobs
	fn                   pb.ProcessMetadataFunc  // per-event processing callback
	processedTsWatermark atomic.Int64            // TsNs watermark readable without the lock

	// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
	// activeFilePaths counts active file jobs at each exact path.
	activeFilePaths map[util.FullPath]int
	// activeDirPaths counts active directory jobs at each exact path.
	activeDirPaths map[util.FullPath]int
	// descendantCount counts active jobs (file or dir) strictly under each directory.
	descendantCount map[util.FullPath]int

	// tsHeap is a min-heap of active job timestamps with lazy deletion,
	// used for O(log n) amortized watermark tracking.
	tsHeap tsMinHeap
}
|
|
|
|
func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
|
|
t := &MetadataProcessor{
|
|
fn: fn,
|
|
activeJobs: make(map[int64]*syncJobPaths),
|
|
concurrencyLimit: concurrency,
|
|
activeFilePaths: make(map[util.FullPath]int),
|
|
activeDirPaths: make(map[util.FullPath]int),
|
|
descendantCount: make(map[util.FullPath]int),
|
|
}
|
|
t.processedTsWatermark.Store(offsetTsNs)
|
|
t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
|
|
return t
|
|
}
|
|
|
|
// pathAncestors returns all proper ancestor directories of p.
|
|
// For "/a/b/c", returns ["/a/b", "/a", "/"].
|
|
func pathAncestors(p util.FullPath) []util.FullPath {
|
|
var ancestors []util.FullPath
|
|
s := string(p)
|
|
for {
|
|
parent := path.Dir(s)
|
|
if parent == s {
|
|
break
|
|
}
|
|
ancestors = append(ancestors, util.FullPath(parent))
|
|
s = parent
|
|
}
|
|
return ancestors
|
|
}
|
|
|
|
// addPathToIndex registers a path in the conflict detection indexes.
|
|
// Must be called under activeJobsLock.
|
|
func (t *MetadataProcessor) addPathToIndex(p util.FullPath, isDirectory bool) {
|
|
if isDirectory {
|
|
t.activeDirPaths[p]++
|
|
} else {
|
|
t.activeFilePaths[p]++
|
|
}
|
|
for _, ancestor := range pathAncestors(p) {
|
|
t.descendantCount[ancestor]++
|
|
}
|
|
}
|
|
|
|
// removePathFromIndex unregisters a path from the conflict detection indexes.
|
|
// Must be called under activeJobsLock.
|
|
func (t *MetadataProcessor) removePathFromIndex(p util.FullPath, isDirectory bool) {
|
|
if isDirectory {
|
|
if t.activeDirPaths[p] <= 1 {
|
|
delete(t.activeDirPaths, p)
|
|
} else {
|
|
t.activeDirPaths[p]--
|
|
}
|
|
} else {
|
|
if t.activeFilePaths[p] <= 1 {
|
|
delete(t.activeFilePaths, p)
|
|
} else {
|
|
t.activeFilePaths[p]--
|
|
}
|
|
}
|
|
for _, ancestor := range pathAncestors(p) {
|
|
if t.descendantCount[ancestor] <= 1 {
|
|
delete(t.descendantCount, ancestor)
|
|
} else {
|
|
t.descendantCount[ancestor]--
|
|
}
|
|
}
|
|
}
|
|
|
|
// pathConflicts checks if a single path conflicts with any active job.
|
|
// Conflict rules match pairShouldWaitFor:
|
|
// - file vs file: exact same path
|
|
// - file vs dir: file.IsUnder(dir)
|
|
// - dir vs file: file.IsUnder(dir)
|
|
// - dir vs dir: either IsUnder the other
|
|
func (t *MetadataProcessor) pathConflicts(p util.FullPath, isDirectory bool) bool {
|
|
if isDirectory {
|
|
// Any active job (file or dir) strictly under this directory?
|
|
if t.descendantCount[p] > 0 {
|
|
return true
|
|
}
|
|
} else {
|
|
// Exact same file already active?
|
|
if t.activeFilePaths[p] > 0 {
|
|
return true
|
|
}
|
|
}
|
|
// Any active directory that is a proper ancestor of p?
|
|
for _, ancestor := range pathAncestors(p) {
|
|
if t.activeDirPaths[ancestor] > 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
|
|
p, newPath, isDirectory := extractPathsFromMetadata(resp)
|
|
if t.pathConflicts(p, isDirectory) {
|
|
return true
|
|
}
|
|
if newPath != "" && t.pathConflicts(newPath, isDirectory) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// AddSyncJob admits one metadata event into the processing pipeline.
// It blocks while the concurrency limit is reached or while the event's
// path(s) conflict with an in-flight job, then runs fn(resp) with retries
// on its own goroutine. On completion the job is de-indexed and, if it
// was the oldest outstanding job, the watermark is advanced.
func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
	if filer_pb.IsEmpty(resp) {
		return
	}

	t.activeJobsLock.Lock()
	defer t.activeJobsLock.Unlock()

	// Wait for both free capacity and a conflict-free path set; each
	// completing job signals the cond so this predicate is re-evaluated.
	for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
		t.activeJobsCond.Wait()
	}

	p, newPath, isDirectory := extractPathsFromMetadata(resp)
	jobPaths := &syncJobPaths{path: p, newPath: newPath, isDirectory: isDirectory}

	// Register the job and index its path(s) while still holding the lock,
	// so subsequently dispatched events immediately see the conflict.
	t.activeJobs[resp.TsNs] = jobPaths
	t.addPathToIndex(p, isDirectory)
	if newPath != "" {
		t.addPathToIndex(newPath, isDirectory)
	}

	heap.Push(&t.tsHeap, resp.TsNs)

	go func() {

		if err := util.Retry("metadata processor", func() error {
			return t.fn(resp)
		}); err != nil {
			glog.Errorf("process %v: %v", resp, err)
		}

		t.activeJobsLock.Lock()
		defer t.activeJobsLock.Unlock()

		delete(t.activeJobs, resp.TsNs)
		t.removePathFromIndex(jobPaths.path, jobPaths.isDirectory)
		if jobPaths.newPath != "" {
			t.removePathFromIndex(jobPaths.newPath, jobPaths.isDirectory)
		}

		// Lazy-clean stale entries from heap top (already-completed jobs).
		// Each entry is pushed once and popped once: O(log n) amortized.
		for t.tsHeap.Len() > 0 {
			if _, active := t.activeJobs[t.tsHeap[0]]; active {
				break
			}
			heap.Pop(&t.tsHeap)
		}
		// If this was the oldest job, advance the watermark. The value is
		// conservative — this job's own TsNs, not the next pending event —
		// which is safe because replay from the offset is idempotent.
		if t.tsHeap.Len() == 0 || resp.TsNs < t.tsHeap[0] {
			t.processedTsWatermark.Store(resp.TsNs)
		}
		// NOTE(review): Signal wakes only one waiter; this assumes a single
		// dispatcher goroutine blocks in AddSyncJob at a time — confirm,
		// otherwise Broadcast would be required to avoid stalled waiters.
		t.activeJobsCond.Signal()
	}()
}
|
|
|
|
func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (p, newPath util.FullPath, isDirectory bool) {
|
|
oldEntry := resp.EventNotification.OldEntry
|
|
newEntry := resp.EventNotification.NewEntry
|
|
// create
|
|
if filer_pb.IsCreate(resp) {
|
|
p = util.FullPath(resp.Directory).Child(newEntry.Name)
|
|
isDirectory = newEntry.IsDirectory
|
|
return
|
|
}
|
|
if filer_pb.IsDelete(resp) {
|
|
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
|
|
isDirectory = oldEntry.IsDirectory
|
|
return
|
|
}
|
|
if filer_pb.IsUpdate(resp) {
|
|
p = util.FullPath(resp.Directory).Child(newEntry.Name)
|
|
isDirectory = newEntry.IsDirectory
|
|
return
|
|
}
|
|
// renaming
|
|
p = util.FullPath(resp.Directory).Child(oldEntry.Name)
|
|
isDirectory = oldEntry.IsDirectory
|
|
newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
|
|
return
|
|
}
|