package topology

import (
	"reflect"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"

	"testing"
)

func TestRemoveDataCenter(t *testing.T) {
	topo := setup(topologyLayout)
	topo.UnlinkChildNode(NodeId("dc2"))
	if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 15 {
		t.Fail()
	}
	topo.UnlinkChildNode(NodeId("dc3"))
	if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 12 {
		t.Fail()
	}
}

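TestRemoveDataCenter depends on the topology keeping an aggregate active-volume count that shrinks when a whole data center is unlinked. The sketch below shows one way to implement such roll-up counters. It is an illustrative assumption, not the SeaweedFS implementation. The `node`, `link`, `unlink`, and `adjust` names and the per-DC counts are hypothetical.

```go
package main

import "fmt"

// node is a hypothetical tree node that caches the number of active volumes
// in its whole subtree.
type node struct {
	id       string
	parent   *node
	children map[string]*node
	active   int64
}

func newNode(id string) *node {
	return &node{id: id, children: make(map[string]*node)}
}

// adjust adds delta to n and to every ancestor of n.
func (n *node) adjust(delta int64) {
	for cur := n; cur != nil; cur = cur.parent {
		cur.active += delta
	}
}

// link attaches child under n and rolls the child's subtree count upward.
func (n *node) link(child *node) {
	child.parent = n
	n.children[child.id] = child
	n.adjust(child.active)
}

// unlink detaches the named child and subtracts its subtree count from n
// and all of n's ancestors. Unknown ids are ignored.
func (n *node) unlink(id string) {
	child, ok := n.children[id]
	if !ok {
		return
	}
	delete(n.children, id)
	child.parent = nil
	n.adjust(-child.active)
}

func main() {
	topo := newNode("topo")
	for id, active := range map[string]int64{"dc1": 10, "dc2": 6, "dc3": 3} {
		dc := newNode(id)
		dc.active = active
		topo.link(dc)
	}
	fmt.Println(topo.active) // 19
	topo.unlink("dc2")
	fmt.Println(topo.active) // 13
	topo.unlink("dc3")
	fmt.Println(topo.active) // 10
}
```

Because every level caches its subtree total, an unlink costs one walk to the root rather than a recount of the remaining volumes.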
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	maxVolumeCounts := make(map[string]uint32)
	maxVolumeCounts[""] = 25
	maxVolumeCounts["ssd"] = 12
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)

	{
		volumeCount := 7
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(25432),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 34524,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.GetCurrentVersion()),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}

		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(volumeCount + k),
				Size:             uint64(25432),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 34524,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.GetCurrentVersion()),
				Ttl:              0,
				DiskType:         "ssd",
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}

		topo.SyncDataNodeRegistration(volumeMessages, dn)

		usageCounts := topo.diskUsages.usages[types.HardDriveType]

		assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
		assert(t, "ssdVolumeCount", int(topo.diskUsages.usages[types.SsdType].volumeCount), volumeCount)
	}

	{
		volumeCount := 7 - 1
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(254320),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 345240,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.GetCurrentVersion()),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}
		topo.SyncDataNodeRegistration(volumeMessages, dn)

		//rp, _ := storage.NewReplicaPlacementFromString("000")
		//layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
		//assert(t, "writables", len(layout.writables), volumeCount)

		usageCounts := topo.diskUsages.usages[types.HardDriveType]

		assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
	}

	{
		volumeCount := 6
		newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
			Id:               uint32(3),
			Collection:       "",
			ReplicaPlacement: uint32(0),
			Version:          uint32(needle.GetCurrentVersion()),
			Ttl:              0,
		}
		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)
		rp, _ := super_block.NewReplicaPlacementFromString("000")
		layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
		assert(t, "writables after repeated add", len(layout.writables), volumeCount)

		usageCounts := topo.diskUsages.usages[types.HardDriveType]

		assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)

		topo.IncrementalSyncDataNodeRegistration(
			nil,
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			dn)
		assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
		assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount-1)
		assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount-1)

		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)

		for vid := range layout.vid2location {
			println("after add volume id", vid)
		}
		for _, vid := range layout.writables {
			println("after add writable volume id", vid)
		}

		assert(t, "writables after add back", len(layout.writables), volumeCount)

	}

	topo.UnRegisterDataNode(dn)

	usageCounts := topo.diskUsages.usages[types.HardDriveType]

	assert(t, "activeVolumeCount2", int(usageCounts.activeVolumeCount), 0)

}

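The heartbeat test exercises two update paths. In a full sync, the reported list replaces the node's volumes, so the hard-drive count drops from seven to six. In an incremental sync, re-adding a known volume is a no-op, and a delete followed by a re-add restores the count. The following is a minimal sketch of those set semantics, assuming a plain map keyed by volume id. `volumeSet`, `fullSync`, and `incrementalSync` are hypothetical names, not SeaweedFS APIs.

```go
package main

import (
	"fmt"
	"sort"
)

// volumeSet is a hypothetical per-node registry of volume ids.
type volumeSet map[uint32]bool

// fullSync makes the set match reported exactly, as a full heartbeat would,
// and returns the ids that were added and removed (removed is sorted).
func (s volumeSet) fullSync(reported []uint32) (added, removed []uint32) {
	seen := make(map[uint32]bool, len(reported))
	for _, id := range reported {
		seen[id] = true
		if !s[id] {
			s[id] = true
			added = append(added, id)
		}
	}
	for id := range s {
		if !seen[id] {
			delete(s, id) // deleting during range is safe in Go
			removed = append(removed, id)
		}
	}
	sort.Slice(removed, func(i, j int) bool { return removed[i] < removed[j] })
	return added, removed
}

// incrementalSync applies explicit deltas; adding a known id is a no-op.
func (s volumeSet) incrementalSync(newIDs, deletedIDs []uint32) {
	for _, id := range newIDs {
		s[id] = true
	}
	for _, id := range deletedIDs {
		delete(s, id)
	}
}

func main() {
	s := volumeSet{}
	added, _ := s.fullSync([]uint32{1, 2, 3, 4, 5, 6, 7})
	fmt.Println(len(added), len(s)) // 7 7
	_, removed := s.fullSync([]uint32{1, 2, 3, 4, 5, 6})
	fmt.Println(removed, len(s)) // [7] 6
	s.incrementalSync([]uint32{3}, nil)
	fmt.Println(len(s)) // 6
	s.incrementalSync(nil, []uint32{3})
	fmt.Println(len(s)) // 5
	s.incrementalSync([]uint32{3}, nil)
	fmt.Println(len(s)) // 6
}
```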
func TestDataNodeToDataNodeInfo_IncludeEmptyDiskFromUsage(t *testing.T) {
	dn := NewDataNode("node-1")
	dn.Ip = "127.0.0.1"
	dn.Port = 18080
	dn.GrpcPort = 28080

	// Simulate a node that has slot counters but no mounted volumes yet.
	usage := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
	usage.maxVolumeCount = 8

	info := dn.ToDataNodeInfo()
	diskInfo, found := info.DiskInfos[""]
	if !found {
		t.Fatalf("expected default disk entry for empty node")
	}
	if diskInfo.MaxVolumeCount != 8 {
		t.Fatalf("unexpected max volume count: got=%d want=8", diskInfo.MaxVolumeCount)
	}
	if len(diskInfo.VolumeInfos) != 0 {
		t.Fatalf("expected no volumes for empty disk, got=%d", len(diskInfo.VolumeInfos))
	}
}

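This test guards against building per-disk entries only from mounted volumes. That approach would hide a disk that has capacity but no volumes yet. The hypothetical helpers below contrast the two approaches using simplified stand-in types (`volume`, `diskInfo`). These are not the real `DataNode` or `DiskInfo` types, and `""` stands for the default hard-drive type as in the test.

```go
package main

import "fmt"

// volume and diskInfo are hypothetical, simplified stand-ins.
type volume struct {
	ID       uint32
	DiskType string // "" stands for the default hard-drive type
}

type diskInfo struct {
	MaxVolumeCount int64
	VolumeIDs      []uint32
}

// fromVolumesOnly creates an entry only for disk types that hold a volume.
func fromVolumesOnly(maxCounts map[string]int64, vols []volume) map[string]*diskInfo {
	out := make(map[string]*diskInfo)
	for _, v := range vols {
		di, ok := out[v.DiskType]
		if !ok {
			di = &diskInfo{MaxVolumeCount: maxCounts[v.DiskType]}
			out[v.DiskType] = di
		}
		di.VolumeIDs = append(di.VolumeIDs, v.ID)
	}
	return out
}

// fromUsage seeds an entry for every disk type with slot counters first,
// then attaches volumes, so an empty disk still reports its capacity.
func fromUsage(maxCounts map[string]int64, vols []volume) map[string]*diskInfo {
	out := make(map[string]*diskInfo, len(maxCounts))
	for diskType, maxCount := range maxCounts {
		out[diskType] = &diskInfo{MaxVolumeCount: maxCount}
	}
	for _, v := range vols {
		di, ok := out[v.DiskType]
		if !ok {
			di = &diskInfo{}
			out[v.DiskType] = di
		}
		di.VolumeIDs = append(di.VolumeIDs, v.ID)
	}
	return out
}

func main() {
	maxCounts := map[string]int64{"": 8}
	fmt.Println(len(fromVolumesOnly(maxCounts, nil))) // 0
	info := fromUsage(maxCounts, nil)[""]
	fmt.Println(info.MaxVolumeCount, len(info.VolumeIDs)) // 8 0
}
```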
func assert(t *testing.T, message string, actual, expected int) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()
	if actual != expected {
		t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
	}
}

func TestAddRemoveVolume(t *testing.T) {

	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	maxVolumeCounts := make(map[string]uint32)
	maxVolumeCounts[""] = 25
	maxVolumeCounts["ssd"] = 12
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)

	v := storage.VolumeInfo{
		Id:               needle.VolumeId(1),
		Size:             100,
		Collection:       "xcollection",
		DiskType:         "ssd",
		FileCount:        123,
		DeleteCount:      23,
		DeletedByteCount: 45,
		ReadOnly:         false,
		Version:          needle.GetCurrentVersion(),
		ReplicaPlacement: &super_block.ReplicaPlacement{},
		Ttl:              needle.EMPTY_TTL,
	}

	dn.UpdateVolumes([]storage.VolumeInfo{v})
	topo.RegisterVolumeLayout(v, dn)
	topo.RegisterVolumeLayout(v, dn)

	if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
		t.Errorf("collection %v should exist", v.Collection)
	}

	topo.UnRegisterVolumeLayout(v, dn)

	if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
		t.Errorf("collection %v should not exist", v.Collection)
	}
}

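TestAddRemoveVolume registers the same volume twice and unregisters it once, then expects the collection to be gone. That outcome is consistent with set semantics, where a duplicate registration is a no-op, rather than reference counting, which would leave the collection alive. A sketch under that assumption follows. The `collections` type and its methods are hypothetical.

```go
package main

import "fmt"

// collections is a hypothetical index from collection name to the set of
// volume ids registered under it.
type collections map[string]map[uint32]struct{}

// register adds vid to name; registering the same pair twice changes nothing.
func (c collections) register(name string, vid uint32) {
	vols, ok := c[name]
	if !ok {
		vols = make(map[uint32]struct{})
		c[name] = vols
	}
	vols[vid] = struct{}{}
}

// unregister removes vid and drops the collection once it has no volumes.
func (c collections) unregister(name string, vid uint32) {
	vols, ok := c[name]
	if !ok {
		return
	}
	delete(vols, vid)
	if len(vols) == 0 {
		delete(c, name)
	}
}

func (c collections) has(name string) bool {
	_, ok := c[name]
	return ok
}

func main() {
	c := collections{}
	c.register("xcollection", 1)
	// Duplicate registration, as in the test above.
	c.register("xcollection", 1)
	fmt.Println(c.has("xcollection")) // true
	c.unregister("xcollection", 1)
	fmt.Println(c.has("xcollection")) // false
}
```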
func TestVolumeReadOnlyStatusChange(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	maxVolumeCounts := make(map[string]uint32)
	maxVolumeCounts[""] = 25
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)

	// Create a writable volume
	v := storage.VolumeInfo{
		Id:               needle.VolumeId(1),
		Size:             100,
		Collection:       "",
		DiskType:         "",
		FileCount:        10,
		DeleteCount:      0,
		DeletedByteCount: 0,
		ReadOnly:         false, // Initially writable
		Version:          needle.GetCurrentVersion(),
		ReplicaPlacement: &super_block.ReplicaPlacement{},
		Ttl:              needle.EMPTY_TTL,
	}

	dn.UpdateVolumes([]storage.VolumeInfo{v})
	topo.RegisterVolumeLayout(v, dn)

	// Check initial active count (should be 1 since volume is writable)
	usageCounts := topo.diskUsages.usages[types.HardDriveType]
	assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
	assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)

	// Change volume to read-only
	v.ReadOnly = true
	dn.UpdateVolumes([]storage.VolumeInfo{v})

	// Check active count after marking read-only (should be 0)
	usageCounts = topo.diskUsages.usages[types.HardDriveType]
	assert(t, "activeVolumeCount after read-only", int(usageCounts.activeVolumeCount), 0)

	// Change volume back to writable
	v.ReadOnly = false
	dn.UpdateVolumes([]storage.VolumeInfo{v})

	// Check active count after marking writable again (should be 1)
	usageCounts = topo.diskUsages.usages[types.HardDriveType]
	assert(t, "activeVolumeCount after writable again", int(usageCounts.activeVolumeCount), 1)
}

func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)

	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	maxVolumeCounts := make(map[string]uint32)
	maxVolumeCounts[""] = 25
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)

	// Create a writable, local volume
	v := storage.VolumeInfo{
		Id:                needle.VolumeId(1),
		Size:              100,
		Collection:        "",
		DiskType:          "",
		FileCount:         10,
		DeleteCount:       0,
		DeletedByteCount:  0,
		ReadOnly:          false, // Initially writable
		RemoteStorageName: "",    // Initially local
		Version:           needle.GetCurrentVersion(),
		ReplicaPlacement:  &super_block.ReplicaPlacement{},
		Ttl:               needle.EMPTY_TTL,
	}

	dn.UpdateVolumes([]storage.VolumeInfo{v})
	topo.RegisterVolumeLayout(v, dn)

	// Check initial counts
	usageCounts := topo.diskUsages.usages[types.HardDriveType]
	assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
	assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)

	// Simultaneously change to read-only AND remote
	v.ReadOnly = true
	v.RemoteStorageName = "s3"
	v.RemoteStorageKey = "key1"
	dn.UpdateVolumes([]storage.VolumeInfo{v})

	// Check counts after both changes
	usageCounts = topo.diskUsages.usages[types.HardDriveType]
	assert(t, "activeVolumeCount after read-only+remote", int(usageCounts.activeVolumeCount), 0)
	assert(t, "remoteVolumeCount after read-only+remote", int(usageCounts.remoteVolumeCount), 1)

	// Change back to writable but keep remote
	v.ReadOnly = false
	dn.UpdateVolumes([]storage.VolumeInfo{v})

	// Check counts - should be writable (active=1) and still remote
	usageCounts = topo.diskUsages.usages[types.HardDriveType]
	assert(t, "activeVolumeCount after writable+remote", int(usageCounts.activeVolumeCount), 1)
	assert(t, "remoteVolumeCount after writable+remote", int(usageCounts.remoteVolumeCount), 1)

	// Change back to local AND read-only simultaneously
	v.ReadOnly = true
	v.RemoteStorageName = ""
	v.RemoteStorageKey = ""
	dn.UpdateVolumes([]storage.VolumeInfo{v})

	// Check final counts
	usageCounts = topo.diskUsages.usages[types.HardDriveType]
	assert(t, "final activeVolumeCount", int(usageCounts.activeVolumeCount), 0)
	assert(t, "final remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
}

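The two read-only tests imply that the active count tracks writability alone, so a writable remote volume still counts as active. The remote count tracks whether `RemoteStorageName` is set. One way to keep both counters correct when several flags change in a single update is to compute a volume's contribution before and after the change and apply the difference. The sketch below takes that approach. The `volumeState`, `counters`, `contribution`, and `apply` names are hypothetical and may not match how SeaweedFS does it.

```go
package main

import "fmt"

// volumeState holds the hypothetical flags that feed the usage counters.
type volumeState struct {
	ReadOnly          bool
	RemoteStorageName string
}

// counters mirrors the two totals the tests assert on.
type counters struct {
	active int64 // writable volumes
	remote int64 // volumes backed by remote storage
}

// contribution is what a single volume adds to the totals.
func contribution(s volumeState) counters {
	var c counters
	if !s.ReadOnly {
		c.active = 1
	}
	if s.RemoteStorageName != "" {
		c.remote = 1
	}
	return c
}

// apply moves the totals from prev to next. Both deltas come from the same
// pair of states, so changing several flags at once cannot skip a counter.
func (t *counters) apply(prev, next volumeState) {
	p, n := contribution(prev), contribution(next)
	t.active += n.active - p.active
	t.remote += n.remote - p.remote
}

func main() {
	v := volumeState{}
	totals := contribution(v)
	fmt.Println(totals.active, totals.remote) // 1 0

	next := volumeState{ReadOnly: true, RemoteStorageName: "s3"}
	totals.apply(v, next)
	v = next
	fmt.Println(totals.active, totals.remote) // 0 1

	next = volumeState{RemoteStorageName: "s3"}
	totals.apply(v, next)
	v = next
	fmt.Println(totals.active, totals.remote) // 1 1

	next = volumeState{ReadOnly: true}
	totals.apply(v, next)
	fmt.Println(totals.active, totals.remote) // 0 0
}
```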
func TestListCollections(t *testing.T) {
	rp, _ := super_block.NewReplicaPlacementFromString("002")

	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil)

	topo.RegisterVolumeLayout(storage.VolumeInfo{
		Id:               needle.VolumeId(1111),
		ReplicaPlacement: rp,
	}, dn)
	topo.RegisterVolumeLayout(storage.VolumeInfo{
		Id:               needle.VolumeId(2222),
		ReplicaPlacement: rp,
		Collection:       "vol_collection_a",
	}, dn)
	topo.RegisterVolumeLayout(storage.VolumeInfo{
		Id:               needle.VolumeId(3333),
		ReplicaPlacement: rp,
		Collection:       "vol_collection_b",
	}, dn)

	topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
		VolumeId:   needle.VolumeId(4444),
		Collection: "ec_collection_a",
		ShardsInfo: erasure_coding.NewShardsInfo(),
	}, dn)
	topo.RegisterEcShards(&erasure_coding.EcVolumeInfo{
		VolumeId:   needle.VolumeId(5555),
		Collection: "ec_collection_b",
		ShardsInfo: erasure_coding.NewShardsInfo(),
	}, dn)

	testCases := []struct {
		name                 string
		includeNormalVolumes bool
		includeEcVolumes     bool
		want                 []string
	}{
		{
			name:                 "no volume types selected",
			includeNormalVolumes: false,
			includeEcVolumes:     false,
			want:                 nil,
		}, {
			name:                 "normal volumes",
			includeNormalVolumes: true,
			includeEcVolumes:     false,
			want:                 []string{"", "vol_collection_a", "vol_collection_b"},
		}, {
			name:                 "EC volumes",
			includeNormalVolumes: false,
			includeEcVolumes:     true,
			want:                 []string{"ec_collection_a", "ec_collection_b"},
		}, {
			name:                 "normal + EC volumes",
			includeNormalVolumes: true,
			includeEcVolumes:     true,
			want:                 []string{"", "ec_collection_a", "ec_collection_b", "vol_collection_a", "vol_collection_b"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got := topo.ListCollections(tc.includeNormalVolumes, tc.includeEcVolumes)

			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("got %v, want %v", got, tc.want)
			}
		})
	}
}

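The expected results suggest that `ListCollections` returns the union of the selected collection names in sorted order, including the default collection `""` for normal volumes. They also suggest it returns a nil slice when nothing is selected, which matters because `reflect.DeepEqual` treats a nil slice and an empty slice as different. The following is a hypothetical stand-alone helper with that behavior, using only the standard library. It is not the real method.

```go
package main

import (
	"fmt"
	"sort"
)

// listCollections is a hypothetical helper: it unions the selected name lists,
// drops duplicates, and sorts the result. If nothing is selected (or no names
// exist) it returns a nil slice.
func listCollections(normal, ec []string, includeNormal, includeEc bool) []string {
	seen := make(map[string]struct{})
	var out []string
	add := func(names []string) {
		for _, name := range names {
			if _, dup := seen[name]; !dup {
				seen[name] = struct{}{}
				out = append(out, name)
			}
		}
	}
	if includeNormal {
		add(normal)
	}
	if includeEc {
		add(ec)
	}
	sort.Strings(out)
	return out
}

func main() {
	normal := []string{"vol_collection_b", "", "vol_collection_a"}
	ec := []string{"ec_collection_b", "ec_collection_a"}
	fmt.Println(listCollections(normal, ec, false, false) == nil) // true
	// Prints ["" "ec_collection_a" "ec_collection_b" "vol_collection_a" "vol_collection_b"]
	fmt.Printf("%q\n", listCollections(normal, ec, true, true))
}
```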
func TestDataNodeIdBasedIdentification(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")

	maxVolumeCounts := make(map[string]uint32)
	maxVolumeCounts[""] = 10

	// Test 1: Create a DataNode with explicit id
	dn1 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-1", maxVolumeCounts)
	if string(dn1.Id()) != "node-1" {
		t.Errorf("expected node id 'node-1', got '%s'", dn1.Id())
	}
	if dn1.Ip != "10.0.0.1" {
		t.Errorf("expected ip '10.0.0.1', got '%s'", dn1.Ip)
	}

	// Test 2: Same id with different IP should return the same DataNode (K8s pod reschedule scenario)
	dn2 := rack.GetOrCreateDataNode("10.0.0.2", 8080, 18080, "10.0.0.2:8080", "node-1", maxVolumeCounts)
	if dn1 != dn2 {
		t.Errorf("expected same DataNode for same id, got different nodes")
	}
	// IP should be updated to the new value
	if dn2.Ip != "10.0.0.2" {
		t.Errorf("expected ip to be updated to '10.0.0.2', got '%s'", dn2.Ip)
	}
	if dn2.PublicUrl != "10.0.0.2:8080" {
		t.Errorf("expected publicUrl to be updated to '10.0.0.2:8080', got '%s'", dn2.PublicUrl)
	}

	// Test 3: Different id should create a new DataNode
	dn3 := rack.GetOrCreateDataNode("10.0.0.3", 8080, 18080, "10.0.0.3:8080", "node-2", maxVolumeCounts)
	if string(dn3.Id()) != "node-2" {
		t.Errorf("expected node id 'node-2', got '%s'", dn3.Id())
	}
	if dn1 == dn3 {
		t.Errorf("expected different DataNode for different id")
	}

	// Test 4: Empty id should fall back to ip:port (backward compatibility)
	dn4 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
	if string(dn4.Id()) != "10.0.0.4:8080" {
		t.Errorf("expected node id '10.0.0.4:8080' for empty id, got '%s'", dn4.Id())
	}

	// Test 5: Same ip:port with empty id should return the same DataNode
	dn5 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
	if dn4 != dn5 {
		t.Errorf("expected same DataNode for same ip:port with empty id")
	}

	// Verify we have 3 unique DataNodes total:
	// - node-1 (dn1/dn2 share the same id)
	// - node-2 (dn3)
	// - 10.0.0.4:8080 (dn4/dn5 share the same ip:port)
	children := rack.Children()
	if len(children) != 3 {
		t.Errorf("expected 3 DataNodes, got %d", len(children))
	}

	// Test 6: Transition from ip:port to explicit id
	// First, the node exists with ip:port as id (dn4/dn5)
	// Now the same volume server starts sending an explicit id
	dn6 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "node-4-explicit", maxVolumeCounts)
	// Should return the same DataNode instance
	if dn6 != dn4 {
		t.Errorf("expected same DataNode instance during transition")
	}
	// But the id should now be updated to the explicit id
	if string(dn6.Id()) != "node-4-explicit" {
		t.Errorf("expected node id to transition to 'node-4-explicit', got '%s'", dn6.Id())
	}
	// The node should be re-keyed in the children map
	if rack.FindDataNodeById("node-4-explicit") != dn6 {
		t.Errorf("expected to find DataNode by new explicit id")
	}
	// Old ip:port key should no longer work
	if rack.FindDataNodeById("10.0.0.4:8080") != nil {
		t.Errorf("expected old ip:port id to be removed from children map")
	}

	// Still 3 unique DataNodes (node-1, node-2, node-4-explicit)
	children = rack.Children()
	if len(children) != 3 {
		t.Errorf("expected 3 DataNodes after transition, got %d", len(children))
	}

	// Test 7: Prevent incorrect transition when a new node reuses ip:port of a node with explicit id
	// Scenario: node-1 runs at 10.0.0.1:8080, dies, new node-99 starts at same ip:port
	// The transition should NOT happen because node-1 already has an explicit id
	dn7 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-99", maxVolumeCounts)
	// Should create a NEW DataNode, not reuse node-1
	if dn7 == dn1 {
		t.Errorf("expected new DataNode for node-99, got reused node-1")
	}
	if string(dn7.Id()) != "node-99" {
		t.Errorf("expected node id 'node-99', got '%s'", dn7.Id())
	}
	// node-1 should still exist with its original id
	if rack.FindDataNodeById("node-1") == nil {
		t.Errorf("node-1 should still exist")
	}
	// Now we have 4 DataNodes
	children = rack.Children()
	if len(children) != 4 {
		t.Errorf("expected 4 DataNodes, got %d", len(children))
	}
}
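The id-based tests describe a lookup order with four rules:

- Match on the explicit id first, updating the address if it changed.
- Fall back to `ip:port` when no id is sent.
- Re-key an address-only node when it starts sending an id.
- Never take over a node that already has an explicit id.

The sketch below encodes those rules with hypothetical `rack`, `dataNode`, and `getOrCreate` types. The real `GetOrCreateDataNode` may be structured differently, and the `"%s:%d"` address format is an assumption that matches the ids in the test.

```go
package main

import "fmt"

// dataNode is a hypothetical stand-in for a registered volume server.
type dataNode struct {
	id         string
	ip         string
	port       int
	explicitID bool // true once the server has reported its own id
}

// rack indexes nodes by id: the explicit id when one was sent, else "ip:port".
type rack struct {
	nodes map[string]*dataNode
}

func (r *rack) getOrCreate(ip string, port int, id string) *dataNode {
	addr := fmt.Sprintf("%s:%d", ip, port)
	if id == "" {
		// Backward compatibility: no explicit id, so key by address.
		if dn, ok := r.nodes[addr]; ok {
			return dn
		}
		dn := &dataNode{id: addr, ip: ip, port: port}
		r.nodes[addr] = dn
		return dn
	}
	if dn, ok := r.nodes[id]; ok {
		// Same identity at a possibly new address, e.g. a rescheduled pod.
		dn.ip, dn.port = ip, port
		return dn
	}
	// Transition: an address-only node at this ip:port starts sending an id.
	// Nodes that already have an explicit id are never taken over.
	for key, dn := range r.nodes {
		if dn.ip == ip && dn.port == port && !dn.explicitID {
			delete(r.nodes, key)
			dn.id, dn.explicitID = id, true
			r.nodes[id] = dn
			return dn
		}
	}
	dn := &dataNode{id: id, ip: ip, port: port, explicitID: true}
	r.nodes[id] = dn
	return dn
}

func main() {
	r := &rack{nodes: make(map[string]*dataNode)}
	n1 := r.getOrCreate("10.0.0.1", 8080, "node-1")
	n2 := r.getOrCreate("10.0.0.2", 8080, "node-1")
	fmt.Println(n1 == n2, n2.ip) // true 10.0.0.2
	legacy := r.getOrCreate("10.0.0.4", 8080, "")
	fmt.Println(legacy.id) // 10.0.0.4:8080
	upgraded := r.getOrCreate("10.0.0.4", 8080, "node-4-explicit")
	fmt.Println(upgraded == legacy, upgraded.id, len(r.nodes)) // true node-4-explicit 2
}
```

The explicit-id check on the fallback scan is what keeps a new server that reuses an old address (Test 7) from inheriting another node's identity.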