* filer: expose metadata events and list snapshots

* mount: invalidate hot directory caches

* mount: read hot directories directly from filer

* mount: add sequenced metadata cache applier

* mount: apply metadata responses through cache applier

* mount: replay snapshot-consistent directory builds

* mount: dedupe self metadata events

* mount: factor directory build cleanup

* mount: replace proto marshal dedup with composite key and ring buffer

The dedup logic was doing a full deterministic proto.Marshal on every metadata event just to produce a dedup key. Replace with a cheap composite string key (TsNs|Directory|OldName|NewName). Also replace the sliding-window slice (which leaked the backing array unboundedly) with a fixed-size ring buffer that reuses the same array.

* filer: remove mutex and proto.Clone from request-scoped MetadataEventSink

MetadataEventSink is created per-request and only accessed by the goroutine handling the gRPC call. The mutex and double proto.Clone (once in Record, once in Last) were unnecessary overhead on every filer write operation. Store the pointer directly instead.

* mount: skip proto.Clone for caller-owned metadata events

Add ApplyMetadataResponseOwned that takes ownership of the response without cloning. Local metadata events (mkdir, create, flush, etc.) are freshly constructed and never shared, so the clone is unnecessary.

* filer: only populate MetadataEvent on successful DeleteEntry

Avoid calling eventSink.Last() on error paths where the sink may contain a partial event from an intermediate child deletion during recursive deletes.

* mount: avoid map allocation in collectDirectoryNotifications

Replace the map with a fixed-size array and linear dedup. There are at most 3 directories to notify (old parent, new parent, new child if directory), so a 3-element array avoids the heap allocation on every metadata event.

* mount: fix potential deadlock in enqueueApplyRequest

Release applyStateMu before the blocking channel send.
Previously, if the channel was full (cap 128), the send would block while holding the mutex, preventing Shutdown from acquiring it to set applyClosed.

* mount: restore signature-based self-event filtering as fast path

Re-add the signature check that was removed when content-based dedup was introduced. Checking signatures is O(1) on a small slice and avoids enqueuing and processing events that originated from this mount instance. The content-based dedup remains as a fallback.

* filer: send snapshotTsNs only in first ListEntries response

The snapshot timestamp is identical for every entry in a single ListEntries stream. Sending it in every response message wastes wire bandwidth for large directories. The client already reads it only from the first response.

* mount: exit read-through mode after successful full directory listing

MarkDirectoryRefreshed was defined but never called, so directories that entered read-through mode (hot invalidation threshold) stayed there permanently, hitting the filer on every readdir even when cold. Call it after a complete read-through listing finishes.

* mount: include event shape and full paths in dedup key

The previous dedup key only used Names, which could collapse distinct rename targets. Include the event shape (C/D/U/R), source directory, new parent path, and both entry names so structurally different events are never treated as duplicates.

* mount: drain pending requests on shutdown in runApplyLoop

After receiving the shutdown sentinel, drain any remaining requests from applyCh non-blockingly and signal each with errMetaCacheClosed so callers waiting on req.done are released.

* mount: include IsDirectory in synthetic delete events

metadataDeleteEvent now accepts an isDirectory parameter so the applier can distinguish directory deletes from file deletes. Rmdir passes true, Unlink passes false.

* mount: fall back to synthetic event when MetadataEvent is nil

In mknod and mkdir, if the filer response omits MetadataEvent (e.g.
older filer without the field), synthesize an equivalent local metadata event so the cache is always updated.

* mount: make Flush metadata apply best-effort after successful commit

After filer_pb.CreateEntryWithResponse succeeds, the entry is persisted. Don't fail the Flush syscall if the local metadata cache apply fails; log and invalidate the directory cache instead. Also fall back to a synthetic event when MetadataEvent is nil.

* mount: make Rename metadata apply best-effort

The rename has already succeeded on the filer by the time we apply the local metadata event. Log failures instead of returning errors that would be dropped by the caller anyway.

* mount: make saveEntry metadata apply best-effort with fallback

After UpdateEntryWithResponse succeeds, treat local metadata apply as non-fatal. Log and invalidate the directory cache on failure. Also fall back to a synthetic event when MetadataEvent is nil.

* filer_pb: preserve snapshotTsNs on error in ReadDirAllEntriesWithSnapshot

Return the snapshot timestamp even when the first page fails, so callers receive the snapshot boundary when partial data was received.

* filer: send snapshot token for empty directory listings

When no entries are streamed, send a final ListEntriesResponse with only SnapshotTsNs so clients always receive the snapshot boundary.

* mount: distinguish not-found vs transient errors in lookupEntry

Return fuse.EIO for non-not-found filer errors instead of unconditionally returning ENOENT, so transient failures don't masquerade as missing entries.

* mount: make CacheRemoteObject metadata apply best-effort

The file content has already been cached successfully. Don't fail the read if the local metadata cache update fails.

* mount: use consistent snapshot for readdir in direct mode

Capture the SnapshotTsNs from the first loadDirectoryEntriesDirect call and store it on the DirectoryHandle. Subsequent batch loads pass this stored timestamp so all batches use the same snapshot.
Also export DoSeaweedListWithSnapshot so mount can use it directly with snapshot passthrough.

* filer_pb: fix test fake to send SnapshotTsNs only on first response

Match the server behavior: only the first ListEntriesResponse in a page carries the snapshot timestamp, subsequent entries leave it zero.

* Fix nil pointer dereference in ListEntries stream consumers

Remove the empty-directory snapshot-only response from ListEntries that sent a ListEntriesResponse with Entry==nil, which crashed every raw stream consumer that assumed resp.Entry is always non-nil. Also add defensive nil checks for resp.Entry in all raw ListEntries stream consumers across: S3 listing, broker topic lookup, broker topic config, admin dashboard, topic retention, hybrid message scanner, Kafka integration, and consumer offset storage.

* Add nil guards for resp.Entry in remaining ListEntries stream consumers

Covers: S3 object lock check, MQ management dashboard (version/partition/offset loops), and topic retention version loop.

* Make applyLocalMetadataEvent best-effort in Link and Symlink

The filer operations already succeeded; failing the syscall because the local cache apply failed is wrong. Log a warning and invalidate the parent directory cache instead.

* Make applyLocalMetadataEvent best-effort in Mkdir/Rmdir/Mknod/Unlink

The filer RPC already committed; don't fail the syscall when the local metadata cache apply fails. Log a warning and invalidate the parent directory cache to force a re-fetch on next access.

* flushFileMetadata: add nil-fallback for metadata event and best-effort apply

Synthesize a metadata event when resp.GetMetadataEvent() is nil (matching doFlush), and make the apply best-effort with cache invalidation on failure.

* Prevent double-invocation of cleanupBuild in doEnsureVisited

Add a cleanupDone guard so the deferred cleanup and inline error-path cleanup don't both call DeleteFolderChildren/AbortDirectoryBuild.
* Fix comment: signature check is O(n) not O(1)

* Prevent deferred cleanup after successful CompleteDirectoryBuild

Set cleanupDone before returning from the success path so the deferred context-cancellation check cannot undo a published build.

* Invalidate parent directory caches on rename metadata apply failure

When applyLocalMetadataEvent fails during rename, invalidate the source and destination parent directory caches so subsequent accesses trigger a re-fetch from the filer.

* Add event nil-fallback and cache invalidation to Link and Symlink

Synthesize metadata events when the server doesn't return one, and invalidate parent directory caches on apply failure.

* Match requested partition when scanning partition directories

Parse the partition range format (NNNN-NNNN) and match against the requested partition parameter instead of using the first directory.

* Preserve snapshot timestamp across empty directory listings

Initialize actualSnapshotTsNs from the caller-requested value so it isn't lost when the server returns no entries. Re-add the server-side snapshot-only response for empty directories (all raw stream consumers now have nil guards for Entry).

* Fix CreateEntry error wrapping to support errors.Is/errors.As

Use errors.New + %w instead of %v for resp.Error so callers can unwrap the underlying error.

* Fix object lock pagination: only advance on non-nil entries

Move entriesReceived inside the nil check so nil entries don't cause repeated ListEntries calls with the same lastFileName.

* Guard Attributes nil check before accessing Mtime in MQ management

* Do not send nil-Entry response for empty directory listings

The snapshot-only ListEntriesResponse (with Entry == nil) for empty directories breaks consumers that treat any received response as an entry (Java FilerClient, S3 listing). The Go client-side DoSeaweedListWithSnapshot already preserves the caller-requested snapshot via actualSnapshotTsNs initialization, so the server-side send is unnecessary.
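The partition-matching item above ("parse the partition range format (NNNN-NNNN) and match against the requested partition") can be illustrated with a small sketch. `parsePartitionDir` and `findPartitionDir` are hypothetical helpers, not SeaweedFS functions. The sketch assumes the directory name holds the range start and stop as decimal numbers separated by a single hyphen, and that a match means both bounds are equal to the requested ones.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePartitionDir parses a hypothetical "NNNN-NNNN" partition directory
// name into its range start and stop.
func parsePartitionDir(name string) (start, stop int32, ok bool) {
	left, right, found := strings.Cut(name, "-")
	if !found {
		return 0, 0, false
	}
	s, err := strconv.ParseInt(left, 10, 32)
	if err != nil {
		return 0, 0, false
	}
	e, err := strconv.ParseInt(right, 10, 32)
	if err != nil {
		return 0, 0, false
	}
	return int32(s), int32(e), true
}

// findPartitionDir returns the directory whose parsed range equals the
// requested one, instead of blindly taking the first entry.
func findPartitionDir(names []string, rangeStart, rangeStop int32) (string, bool) {
	for _, name := range names {
		start, stop, ok := parsePartitionDir(name)
		if !ok {
			continue // not a partition directory; skip it
		}
		if start == rangeStart && stop == rangeStop {
			return name, true
		}
	}
	return "", false
}

func main() {
	names := []string{"0000-0630", "0630-1260", "1260-1890"}
	fmt.Println(findPartitionDir(names, 630, 1260)) // 0630-1260 true
}
```

Under the old "first directory" behavior this listing would have resolved to `0000-0630` regardless of the requested partition.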
* Fix review findings: subscriber dedup, invalidation normalization, nil guards, shutdown race

- Remove self-signature early-return in processEventFn so all events flow through the applier (directory-build buffering sees self-originated events that arrive after a snapshot)
- Normalize NewParentPath in collectEntryInvalidations to avoid duplicate invalidations when NewParentPath is empty (same-directory update)
- Guard resp.Entry.Attributes for nil in admin_server.go and topic_retention.go to prevent panics on entries without attributes
- Fix enqueueApplyRequest race with shutdown by using select on both applyCh and applyDone, preventing sends after the apply loop exits
- Add cleanupDone check to deferred cleanup in meta_cache_init.go for clarity alongside the existing guard in cleanupBuild
- Add empty directory test case for snapshot consistency

* Propagate authoritative metadata event from CacheRemoteObjectToLocalCluster and generate client-side snapshot for empty directories

- Add metadata_event field to CacheRemoteObjectToLocalClusterResponse proto so the filer-emitted event is available to callers
- Use WithMetadataEventSink in the server handler to capture the event from NotifyUpdateEvent and return it on the response
- Update filehandle_read.go to prefer the RPC's metadata event over a locally fabricated one, falling back to metadataUpdateEvent when the server doesn't provide one (e.g., older filers)
- Generate a client-side snapshot cutoff in DoSeaweedListWithSnapshot when the server sends no snapshot (empty directory), so callers like CompleteDirectoryBuild get a meaningful boundary for filtering buffered events

* Skip directory notifications for dirs being built to prevent mid-build cache wipe

When a metadata event is buffered during a directory build, applyMetadataSideEffects was still firing noteDirectoryUpdate for the building directory.
If the directory accumulated enough updates to become "hot", markDirectoryReadThrough would call DeleteFolderChildren, wiping entries that EnsureVisited had already inserted. The build would then complete and mark the directory cached with incomplete data. Fix by using applyMetadataSideEffectsSkippingBuildingDirs for buffered events, which suppresses directory notifications for dirs currently in buildingDirs while still applying entry invalidations.

* Add test for directory notification suppression during active build

TestDirectoryNotificationsSuppressedDuringBuild verifies that metadata events targeting a directory under active EnsureVisited build do NOT fire onDirectoryUpdate for that directory. In production, this prevents markDirectoryReadThrough from calling DeleteFolderChildren mid-build, which would wipe entries already inserted by the listing. The test inserts an entry during a build, sends multiple metadata events for the building directory, asserts no notifications fired for it, verifies the entry survives, and confirms buffered events are replayed after CompleteDirectoryBuild.
* Fix create invalidations, build guard, event shape, context, and snapshot error path

- collectEntryInvalidations: invalidate FUSE kernel cache on pure create events (OldEntry==nil && NewEntry!=nil), not just updates and deletes
- completeDirectoryBuildNow: only call markCachedFn when an active build existed (state != nil), preventing an unpopulated directory from being marked as cached
- Add metadataCreateEvent helper that produces a create-shaped event (NewEntry only, no OldEntry) and use it in mkdir, mknod, symlink, and hardlink create fallback paths instead of metadataUpdateEvent which incorrectly set both OldEntry and NewEntry
- applyMetadataResponseEnqueue: use context.Background() for the queued mutation so a cancelled caller context cannot abort the apply loop mid-write
- DoSeaweedListWithSnapshot: move snapshot initialization before ListEntries call so the error path returns the preserved snapshot instead of 0

* Fix review findings: test loop, cache race, context safety, snapshot consistency

- Fix build test loop starting at i=1 instead of i=0, missing new-0.txt verification
- Re-check IsDirectoryCached after cache miss to avoid ENOENT race with markDirectoryReadThrough
- Use context.Background() in enqueueAndWait so caller cancellation can't abort build/complete mid-way
- Pass dh.snapshotTsNs in skip-batch loadDirectoryEntriesDirect for snapshot consistency
- Prefer resp.MetadataEvent over fallback in Unlink event derivation
- Add comment on MetadataEventSink.Record single-event assumption

* Fix empty-directory snapshot clock skew and build cancellation race

Empty-directory snapshot: Remove client-side time.Now() synthesis when the server returns no entries. Instead return snapshotTsNs=0, and in completeDirectoryBuildNow replay ALL buffered events when snapshot is 0. This eliminates the clock-skew bug where a client ahead of the filer would filter out legitimate post-list events.
Build cancellation: Use context.Background() for BeginDirectoryBuild and CompleteDirectoryBuild calls in doEnsureVisited, so errgroup cancellation doesn't cause enqueueAndWait to return early and trigger cleanupBuild while the operation is still queued.

* Add tests for empty-directory build replay and cancellation resilience

TestEmptyDirectoryBuildReplaysAllBufferedEvents: verifies that when CompleteDirectoryBuild receives snapshotTsNs=0 (empty directory, no server snapshot), ALL buffered events are replayed regardless of their TsNs values; no clock-skew-sensitive filtering occurs.

TestBuildCompletionSurvivesCallerCancellation: verifies that once CompleteDirectoryBuild is enqueued, a cancelled caller context does not prevent the build from completing. The apply loop runs with context.Background(), so the directory becomes cached and buffered events are replayed even when the caller gives up waiting.

* Fix directory subtree cleanup, Link rollback, test robustness

- applyMetadataResponseLocked: when a directory entry is deleted or moved, call DeleteFolderChildren on the old path so cached descendants don't leak as stale entries.
- Link: save original HardLinkId/Counter before mutation. If CreateEntryWithResponse fails after the source was already updated, roll back the source entry to its original state via UpdateEntry.
- TestBuildCompletionSurvivesCallerCancellation: replace fixed time.Sleep(50ms) with a deadline-based poll that checks IsDirectoryCached in a loop, failing only after a 2s timeout.
- TestReadDirAllEntriesWithSnapshotEmptyDirectory: assert that ListEntries was actually invoked on the mock client so the test exercises the RPC path.
- newMetadataEvent: add early return when both oldEntry and newEntry are nil to avoid emitting events with empty Directory.

---------

Co-authored-by: Copilot <copilot@github.com>
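The empty-directory rule above (a zero snapshot means "replay every buffered event" rather than "compare against a client clock") reduces to a small filter. In this sketch, `bufferedEvent` and `eventsToReplay` are hypothetical names rather than the mount's types. It assumes that events with `TsNs` at or before a non-zero snapshot are already reflected in the listing, so only strictly newer events are replayed; replaying everything when the snapshot is zero is only safe if applying an already-reflected event is harmless.

```go
package main

import "fmt"

// bufferedEvent is a hypothetical stand-in for a metadata event captured
// while a directory listing was in flight.
type bufferedEvent struct {
	TsNs int64
	Name string
}

// eventsToReplay returns the buffered events to apply after a listing taken
// at snapshotTsNs. A zero snapshot means the server supplied no boundary
// (for example, an empty directory), so every buffered event is replayed
// instead of comparing against a client-side clock.
func eventsToReplay(buffered []bufferedEvent, snapshotTsNs int64) []bufferedEvent {
	if snapshotTsNs == 0 {
		return buffered
	}
	var out []bufferedEvent
	for _, ev := range buffered {
		// Assumption: events at or before the snapshot are already in the listing.
		if ev.TsNs > snapshotTsNs {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	buffered := []bufferedEvent{{TsNs: 100, Name: "a"}, {TsNs: 200, Name: "b"}, {TsNs: 300, Name: "c"}}
	fmt.Println(eventsToReplay(buffered, 200))     // [{300 c}]
	fmt.Println(len(eventsToReplay(buffered, 0))) // 3
}
```

Because the zero case never reads the local clock, a client whose clock runs ahead of the filer cannot drop legitimate post-list events.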
1637 lines
51 KiB
Go
package dash

import (
	"context"
	"fmt"
	"net/http"
	"sort"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
	adminplugin "github.com/seaweedfs/seaweedfs/weed/admin/plugin"
	"github.com/seaweedfs/seaweedfs/weed/cluster"
	clustermaintenance "github.com/seaweedfs/seaweedfs/weed/cluster/maintenance"
	"github.com/seaweedfs/seaweedfs/weed/credential"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/s3api"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"

	_ "github.com/seaweedfs/seaweedfs/weed/credential/grpc" // Register gRPC credential store
)

const (
	defaultCacheTimeout      = 10 * time.Second
	defaultFilerCacheTimeout = 30 * time.Second
	defaultStatsCacheTimeout = 30 * time.Second
)

// FilerConfig holds filer configuration needed for bucket operations
type FilerConfig struct {
	BucketsPath string
	FilerGroup  string
}

// getFilerConfig retrieves the filer configuration (buckets path and filer group).
// On error, the returned config is still non-nil and holds the defaults.
func (s *AdminServer) getFilerConfig() (*FilerConfig, error) {
	config := &FilerConfig{
		BucketsPath: s3_constants.DefaultBucketsPath,
		FilerGroup:  "",
	}

	err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return fmt.Errorf("get filer configuration: %w", err)
		}
		if resp.DirBuckets != "" {
			config.BucketsPath = resp.DirBuckets
		}
		config.FilerGroup = resp.FilerGroup
		return nil
	})

	return config, err
}

// getCollectionName returns the collection name for a bucket, prefixed with filer group if configured
func getCollectionName(filerGroup, bucketName string) string {
	if filerGroup != "" {
		return fmt.Sprintf("%s_%s", filerGroup, bucketName)
	}
	return bucketName
}

type AdminServer struct {
	masterClient    *wdclient.MasterClient
	templateFS      http.FileSystem
	dataDir         string
	grpcDialOption  grpc.DialOption
	cacheExpiration time.Duration
	lastCacheUpdate time.Time
	cachedTopology  *ClusterTopology

	// Filer discovery and caching
	cachedFilers         []string
	lastFilerUpdate      time.Time
	filerCacheExpiration time.Duration

	// Credential management
	credentialManager *credential.CredentialManager

	// Configuration persistence
	configPersistence *ConfigPersistence

	// Maintenance system
	maintenanceManager *maintenance.MaintenanceManager
	plugin             *adminplugin.Plugin
	pluginLock         *AdminLockManager
	adminPresenceLock  *adminPresenceLock
	expireJobHandler   func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error)

	// Topic retention purger
	topicRetentionPurger *TopicRetentionPurger

	// Worker gRPC server
	workerGrpcServer *WorkerGrpcServer

	// Collection statistics caching
	collectionStatsCache          map[string]collectionStats
	lastCollectionStatsUpdate     time.Time
	collectionStatsCacheThreshold time.Duration

	s3TablesManager *s3tables.Manager
	icebergPort     int
}

// Type definitions moved to types.go

func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string, icebergPort int) *AdminServer {
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.admin")

	// Create master client with multiple master support
	masterClient := wdclient.NewMasterClient(
		grpcDialOption,
		"",      // filerGroup - not needed for admin
		"admin", // clientType
		"",      // clientHost - not needed for admin
		"",      // dataCenter - not needed for admin
		"",      // rack - not needed for admin
		*pb.ServerAddresses(masters).ToServiceDiscovery(),
	)

	// Start master client connection process (like shell and filer do)
	ctx := context.Background()
	go masterClient.KeepConnectedToMaster(ctx)

	lockManager := NewAdminLockManager(masterClient, adminLockClientName)
	presenceLock := newAdminPresenceLock(masterClient)
	if presenceLock != nil {
		presenceLock.Start()
	}

	server := &AdminServer{
		masterClient:                  masterClient,
		templateFS:                    templateFS,
		dataDir:                       dataDir,
		grpcDialOption:                grpcDialOption,
		cacheExpiration:               defaultCacheTimeout,
		filerCacheExpiration:          defaultFilerCacheTimeout,
		configPersistence:             NewConfigPersistence(dataDir),
		collectionStatsCacheThreshold: defaultStatsCacheTimeout,
		s3TablesManager:               newS3TablesManager(),
		icebergPort:                   icebergPort,
		pluginLock:                    lockManager,
		adminPresenceLock:             presenceLock,
	}

	// Initialize topic retention purger
	server.topicRetentionPurger = NewTopicRetentionPurger(server)

	// Initialize credential manager with defaults
	credentialManager, err := credential.NewCredentialManagerWithDefaults(credential.StoreTypeGrpc)
	if err != nil {
		glog.Warningf("Failed to initialize credential manager: %v", err)
		// Continue without credential manager - will fall back to legacy approach
	} else {
		server.credentialManager = credentialManager
		glog.V(0).Infof("Credential manager initialized with store type: %s", credentialManager.GetStore().GetName())

		// For stores that need filer address function, configure them
		if store := credentialManager.GetStore(); store != nil {
			if filerFuncSetter, ok := store.(interface {
				SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
			}); ok {
				// Configure the filer address function to dynamically return the current active filer
				// This function will be called each time credentials need to be loaded/saved,
				// so it will automatically use whatever filer is currently available (HA-aware)
				filerFuncSetter.SetFilerAddressFunc(func() pb.ServerAddress {
					return pb.ServerAddress(server.GetFilerAddress())
				}, server.grpcDialOption)
				glog.V(0).Infof("Credential store configured with dynamic filer address function")
			} else {
				glog.V(0).Infof("Credential store %s does not support filer address function", store.GetName())
			}
		}
	}

	// Initialize maintenance system - always initialize even without persistent storage
	var maintenanceConfig *maintenance.MaintenanceConfig
	if server.configPersistence.IsConfigured() {
		var err error
		maintenanceConfig, err = server.configPersistence.LoadMaintenanceConfig()
		if err != nil {
			glog.Errorf("Failed to load maintenance configuration: %v", err)
			maintenanceConfig = maintenance.DefaultMaintenanceConfig()
		}

		// Apply new defaults to handle schema changes (like enabling by default)
		schema := maintenance.GetMaintenanceConfigSchema()
		if err := schema.ApplyDefaultsToProtobuf(maintenanceConfig); err != nil {
			glog.Warningf("Failed to apply schema defaults to loaded config: %v", err)
		}

		// Force enable maintenance system for new default behavior
		// This handles the case where old configs had Enabled=false as default
		if !maintenanceConfig.Enabled {
			glog.V(1).Infof("Enabling maintenance system (new default behavior)")
			maintenanceConfig.Enabled = true
		}

		glog.V(1).Infof("Maintenance system initialized with persistent configuration (enabled: %v)", maintenanceConfig.Enabled)
	} else {
		maintenanceConfig = maintenance.DefaultMaintenanceConfig()
		glog.V(1).Infof("No data directory configured, maintenance system will run in memory-only mode (enabled: %v)", maintenanceConfig.Enabled)
	}

	// Always initialize maintenance manager
	server.InitMaintenanceManager(maintenanceConfig)

	// Load saved task configurations from persistence
	server.loadTaskConfigurationsFromPersistence()

	// Start maintenance manager if enabled
	if maintenanceConfig.Enabled {
		go func() {
			// Give master client a bit of time to connect before starting scans
			time.Sleep(2 * time.Second)
			if err := server.StartMaintenanceManager(); err != nil {
				glog.Errorf("Failed to start maintenance manager: %v", err)
			}
		}()
	}

	pluginOpts := adminplugin.Options{
		DataDir: dataDir,
		ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
			return server.buildDefaultPluginClusterContext(), nil
		},
		LockManager:            lockManager,
		ConfigDefaultsProvider: server.enrichConfigDefaults,
	}
	plugin, err := adminplugin.New(pluginOpts)
	if err != nil && dataDir != "" {
		glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err)
		pluginOpts.DataDir = ""
		plugin, err = adminplugin.New(pluginOpts)
	}
	if err != nil {
		glog.Errorf("Failed to initialize plugin: %v", err)
	} else {
		server.plugin = plugin
		glog.V(0).Infof("Plugin enabled")
	}

	return server
}

// loadTaskConfigurationsFromPersistence loads saved task configurations from protobuf files
func (s *AdminServer) loadTaskConfigurationsFromPersistence() {
	if s.configPersistence == nil || !s.configPersistence.IsConfigured() {
		glog.V(1).Infof("Config persistence not available, using default task configurations")
		return
	}

	// Load task configurations dynamically using the config update registry
	configUpdateRegistry := tasks.GetGlobalConfigUpdateRegistry()
	configUpdateRegistry.UpdateAllConfigs(s.configPersistence)
}

// enrichConfigDefaults is called by the plugin when bootstrapping a job type's
// default config from its descriptor. For admin_script, it fetches maintenance
// scripts from the master and uses them as the script default.
//
// MIGRATION: This exists to help users migrate from master.toml [master.maintenance]
// to the admin script plugin worker. Remove after March 2027.
func (s *AdminServer) enrichConfigDefaults(cfg *plugin_pb.PersistedJobTypeConfig) *plugin_pb.PersistedJobTypeConfig {
	if cfg.JobType != "admin_script" {
		return cfg
	}

	var maintenanceScripts string
	var sleepMinutes uint32
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
		if err != nil {
			return err
		}
		maintenanceScripts = resp.MaintenanceScripts
		sleepMinutes = resp.MaintenanceSleepMinutes
		return nil
	})
	if err != nil {
		glog.V(1).Infof("Could not fetch master configuration for admin_script defaults: %v", err)
		return cfg
	}

	script := cleanMaintenanceScript(maintenanceScripts)
	if script == "" {
		return cfg
	}

	interval := int64(sleepMinutes)
	if interval <= 0 {
		interval = clustermaintenance.DefaultMaintenanceSleepMinutes
	}

	glog.V(0).Infof("Enriching admin_script defaults from master maintenance scripts (interval=%dm)", interval)

	if cfg.AdminConfigValues == nil {
		cfg.AdminConfigValues = make(map[string]*plugin_pb.ConfigValue)
	}
	cfg.AdminConfigValues["script"] = &plugin_pb.ConfigValue{
		Kind: &plugin_pb.ConfigValue_StringValue{StringValue: script},
	}
	cfg.AdminConfigValues["run_interval_minutes"] = &plugin_pb.ConfigValue{
		Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: interval},
	}
	cfg.UpdatedBy = "master_migration"

	return cfg
}

// cleanMaintenanceScript strips lock/unlock commands and normalizes a
// maintenance script string for use with the admin script plugin worker.
//
// MIGRATION: Used by enrichConfigDefaults. Remove after March 2027.
func cleanMaintenanceScript(script string) string {
	script = strings.ReplaceAll(script, "\r\n", "\n")
	var lines []string
	for _, line := range strings.Split(script, "\n") {
		trimmed := strings.TrimSpace(line)
		if trimmed == "" || strings.HasPrefix(trimmed, "#") {
			continue
		}
		// Strip inline comments (e.g., "lock # migration note")
		if idx := strings.Index(trimmed, "#"); idx >= 0 {
			trimmed = strings.TrimSpace(trimmed[:idx])
			if trimmed == "" {
				continue
			}
		}
		firstToken := strings.ToLower(strings.Fields(trimmed)[0])
		if firstToken == "lock" || firstToken == "unlock" {
			continue
		}
		lines = append(lines, trimmed)
	}
	return strings.Join(lines, "\n")
}

// GetCredentialManager returns the credential manager
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
	return s.credentialManager
}

// Filer discovery methods moved to client_management.go

// Client management methods moved to client_management.go

// WithFilerClient and WithVolumeServerClient methods moved to client_management.go

// Cluster topology methods moved to cluster_topology.go

// getTopologyViaGRPC method moved to cluster_topology.go

// InvalidateCache method moved to cluster_topology.go

// GetS3BucketsData retrieves all Object Store buckets and aggregates total storage metrics
func (s *AdminServer) GetS3BucketsData() (S3BucketsData, error) {
	buckets, err := s.GetS3Buckets()
	if err != nil {
		return S3BucketsData{}, err
	}

	var totalSize int64
	for _, bucket := range buckets {
		totalSize += bucket.PhysicalSize
	}

	return S3BucketsData{
		Buckets:      buckets,
		TotalBuckets: len(buckets),
		TotalSize:    totalSize,
		LastUpdated:  time.Now(),
	}, nil
}

// GetS3Buckets retrieves all Object Store buckets from the filer and collects size/object data from collections
func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
	var buckets []S3Bucket

	// Collect volume information by collection with caching.
	// On failure, continue with zero size/object counts.
	collectionMap, err := s.getCollectionStats()
	if err != nil {
		glog.Warningf("Failed to get collection data: %v", err)
	}

	// Get filer configuration (buckets path and filer group)
	filerConfig, err := s.getFilerConfig()
	if err != nil {
		glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
	}

	// Now list buckets from the filer and match with collection data
	err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		// List buckets by looking at the buckets directory
		stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
			Directory:          filerConfig.BucketsPath,
			Prefix:             "",
			StartFromFileName:  "",
			InclusiveStartFrom: false,
			Limit:              1000,
		})
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err != nil {
				if err.Error() == "EOF" {
					break
				}
				return err
			}

			if resp.Entry != nil && resp.Entry.IsDirectory {
				bucketName := resp.Entry.Name
				if strings.HasPrefix(bucketName, ".") {
					// Skip internal/system directories from Object Store bucket listing.
					continue
				}
				if s3tables.IsTableBucketEntry(resp.Entry) || strings.HasSuffix(bucketName, "--table-s3") {
					// Keep table buckets in the S3 Tables pages, not regular Object Store buckets.
					continue
				}

				// Determine collection name for this bucket
				collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)

				// Get size and object count from collection data
				var physicalSize int64
				var logicalSize int64
				var objectCount int64
				if collectionData, exists := collectionMap[collectionName]; exists {
					physicalSize = collectionData.PhysicalSize
					logicalSize = collectionData.LogicalSize
					objectCount = collectionData.FileCount
				}

				// Get quota information from entry
				quota := resp.Entry.Quota
				quotaEnabled := quota > 0
				if quota < 0 {
					// Negative quota means disabled
					quota = -quota
					quotaEnabled = false
				}

				// Get versioning, object lock, and owner information from extended attributes
				versioningStatus := ""
				objectLockEnabled := false
				objectLockMode := ""
				var objectLockDuration int32
				var owner string

				if resp.Entry.Extended != nil {
					// Use shared utility to extract versioning information
					versioningStatus = extractVersioningFromEntry(resp.Entry)

					// Use shared utility to extract Object Lock information
					objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(resp.Entry)

					// Extract owner information
					if ownerBytes, ok := resp.Entry.Extended[s3_constants.AmzIdentityId]; ok {
						owner = string(ownerBytes)
					}
				}

				var createdAt, lastModified time.Time
				if resp.Entry.Attributes != nil {
					createdAt = time.Unix(resp.Entry.Attributes.Crtime, 0)
					lastModified = time.Unix(resp.Entry.Attributes.Mtime, 0)
				}
				bucket := S3Bucket{
					Name:               bucketName,
					CreatedAt:          createdAt,
					LogicalSize:        logicalSize,
					PhysicalSize:       physicalSize,
					ObjectCount:        objectCount,
					LastModified:       lastModified,
					Quota:              quota,
					QuotaEnabled:       quotaEnabled,
					VersioningStatus:   versioningStatus,
					ObjectLockEnabled:  objectLockEnabled,
					ObjectLockMode:     objectLockMode,
					ObjectLockDuration: objectLockDuration,
					Owner:              owner,
				}
				buckets = append(buckets, bucket)
			}
		}

		return nil
	})

	if err != nil {
		return nil, fmt.Errorf("failed to list Object Store buckets: %w", err)
	}

	return buckets, nil
}

// GetBucketDetails retrieves detailed information about a specific bucket
// Note: This no longer lists objects for performance reasons. Use GetS3Buckets for size/count data.
func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
	// Get filer configuration (buckets path)
	filerConfig, err := s.getFilerConfig()
	if err != nil {
		glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
	}

	details := &BucketDetails{
		Bucket: S3Bucket{
			Name: bucketName,
		},
		UpdatedAt: time.Now(),
	}

	// Get collection data for size and object count with caching
	collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
	stats, err := s.getCollectionStats()
	if err != nil {
		glog.Warningf("Failed to get collection data: %v", err)
		// Continue without collection data - use zero values
	} else if data, ok := stats[collectionName]; ok {
		details.Bucket.LogicalSize = data.LogicalSize
		details.Bucket.PhysicalSize = data.PhysicalSize
		details.Bucket.ObjectCount = data.FileCount
	}

	err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		// Get bucket info
		bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: filerConfig.BucketsPath,
			Name:      bucketName,
		})
		if err != nil {
			return fmt.Errorf("bucket not found: %w", err)
		}
		if bucketResp.Entry == nil {
			return fmt.Errorf("bucket not found: %s", bucketName)
		}

		// Guard against entries without attributes to avoid a nil dereference
		if attrs := bucketResp.Entry.Attributes; attrs != nil {
			details.Bucket.CreatedAt = time.Unix(attrs.Crtime, 0)
			details.Bucket.LastModified = time.Unix(attrs.Mtime, 0)
		}

		// Get quota information from entry
		quota := bucketResp.Entry.Quota
		quotaEnabled := quota > 0
		if quota < 0 {
			// Negative quota means disabled
			quota = -quota
			quotaEnabled = false
		}
		details.Bucket.Quota = quota
		details.Bucket.QuotaEnabled = quotaEnabled

		// Get versioning, object lock, and owner information from extended attributes
		versioningStatus := ""
		objectLockEnabled := false
		objectLockMode := ""
		var objectLockDuration int32
		var owner string

		if bucketResp.Entry.Extended != nil {
			// Use shared utility to extract versioning information
			versioningStatus = extractVersioningFromEntry(bucketResp.Entry)

			// Use shared utility to extract Object Lock information
			objectLockEnabled, objectLockMode, objectLockDuration = extractObjectLockInfoFromEntry(bucketResp.Entry)

			// Extract owner information
			if ownerBytes, ok := bucketResp.Entry.Extended[s3_constants.AmzIdentityId]; ok {
				owner = string(ownerBytes)
			}
		}

		details.Bucket.VersioningStatus = versioningStatus
		details.Bucket.ObjectLockEnabled = objectLockEnabled
		details.Bucket.ObjectLockMode = objectLockMode
		details.Bucket.ObjectLockDuration = objectLockDuration
		details.Bucket.Owner = owner

		return nil
	})

	if err != nil {
		return nil, err
	}

	return details, nil
}

// CreateS3Bucket creates a new S3 bucket
func (s *AdminServer) CreateS3Bucket(bucketName string) error {
	return s.CreateS3BucketWithQuota(bucketName, 0, false)
}

// DeleteS3Bucket deletes an S3 bucket and all its contents
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
	ctx := context.Background()

	// Get filer configuration (buckets path and filer group)
	filerConfig, err := s.getFilerConfig()
	if err != nil {
		return fmt.Errorf("failed to get filer configuration: %w", err)
	}

	// Check if bucket has Object Lock enabled and if there are locked objects
	err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		return s3api.CheckBucketForLockedObjects(ctx, client, filerConfig.BucketsPath, bucketName)
	})
	if err != nil {
		return err
	}

	// Delete the collection first (same as s3.bucket.delete shell command)
	// This ensures volume data is cleaned up properly
	// Collection name must be prefixed with filer group if configured
	collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
	err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
			Name: collectionName,
		})
		return err
	})
	if err != nil {
		return fmt.Errorf("failed to delete collection %s: %w", collectionName, err)
	}

	// Then delete bucket directory recursively from filer
	// Use same parameters as s3.bucket.delete shell command and S3 API
	return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
			Directory:            filerConfig.BucketsPath,
			Name:                 bucketName,
			IsDeleteData:         false, // Collection already deleted, just remove metadata
			IsRecursive:          true,
			IgnoreRecursiveError: true, // Same as S3 API and shell command
		})
		if err != nil {
			return fmt.Errorf("failed to delete bucket: %w", err)
		}

		return nil
	})
}

// GetObjectStoreUsers retrieves object store users from identity.json
func (s *AdminServer) GetObjectStoreUsers(ctx context.Context) ([]ObjectStoreUser, error) {
	if s.credentialManager == nil {
		return []ObjectStoreUser{}, nil
	}

	s3cfg, err := s.credentialManager.LoadConfiguration(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load IAM configuration: %w", err)
	}

	var users []ObjectStoreUser

	// Convert IAM identities to ObjectStoreUser format
	for _, identity := range s3cfg.Identities {
		// Skip anonymous identity
		if identity.Name == "anonymous" {
			continue
		}

		// Skip service accounts - they should not be parent users
		if strings.HasPrefix(identity.Name, serviceAccountPrefix) {
			continue
		}

		user := ObjectStoreUser{
			Username:    identity.Name,
			Permissions: identity.Actions,
		}

		// Set email from account if available
		if identity.Account != nil {
			user.Email = identity.Account.EmailAddress
		}

		// Get first access key for display
		if len(identity.Credentials) > 0 {
			user.AccessKey = identity.Credentials[0].AccessKey
			user.SecretKey = identity.Credentials[0].SecretKey
		}

		users = append(users, user)
	}

	return users, nil
}

// Volume server methods moved to volume_management.go

// Volume methods moved to volume_management.go

// sortVolumes method moved to volume_management.go

// GetClusterCollections method moved to collection_management.go

// GetClusterMasters retrieves cluster masters data
func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
	var masters []MasterInfo
	var leaderCount int

	// First, get master information from topology
	topology, err := s.GetClusterTopology()
	if err != nil {
		return nil, err
	}

	// Create a map to merge topology and raft data
	masterMap := make(map[string]*MasterInfo)

	// Add masters from topology
	for _, master := range topology.Masters {
		masterInfo := &MasterInfo{
			Address:  pb.ServerAddress(master.Address).ToHttpAddress(),
			IsLeader: master.IsLeader,
			Suffrage: "",
		}

		masterMap[master.Address] = masterInfo
	}

	// Then, get additional master information from Raft cluster
	err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
		if err != nil {
			return err
		}

		// Process each raft server
		for _, server := range resp.ClusterServers {
			address := server.Address
			httpAddress := pb.ServerAddress(address).ToHttpAddress()

			// Update existing master info or create new one
			if masterInfo, exists := masterMap[address]; exists {
				// Update existing master with raft data
				masterInfo.IsLeader = server.IsLeader
				masterInfo.Suffrage = server.Suffrage
			} else {
				// Create new master info from raft data
				masterInfo := &MasterInfo{
					Address:  httpAddress,
					IsLeader: server.IsLeader,
					Suffrage: server.Suffrage,
				}
				masterMap[address] = masterInfo
			}
		}

		return nil
	})

	if err != nil {
		// If gRPC call fails, log the error but continue with topology data
		currentMaster := s.masterClient.GetMaster(context.Background())
		glog.Errorf("Failed to get raft cluster servers from master %s: %v", currentMaster, err)
	}

	// Convert map to slice, counting leaders from the merged view so that
	// Raft data overrides any stale leader flag reported by the topology
	for _, masterInfo := range masterMap {
		masters = append(masters, *masterInfo)
		if masterInfo.IsLeader {
			leaderCount++
		}
	}

	// Sort masters by address for consistent ordering on page refresh
	sort.Slice(masters, func(i, j int) bool {
		return masters[i].Address < masters[j].Address
	})

	// If no masters found at all, add the current master as fallback
	if len(masters) == 0 {
		currentMaster := s.masterClient.GetMaster(context.Background())
		if currentMaster != "" {
			masters = append(masters, MasterInfo{
				Address:  pb.ServerAddress(currentMaster).ToHttpAddress(),
				IsLeader: true,
				Suffrage: "Voter",
			})
			leaderCount = 1
		}
	}

	return &ClusterMastersData{
		Masters:      masters,
		TotalMasters: len(masters),
		LeaderCount:  leaderCount,
		LastUpdated:  time.Now(),
	}, nil
}

// GetClusterFilers retrieves cluster filers data
func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) {
	var filers []FilerInfo

	// Get filer information from master using ListClusterNodes
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.FilerType,
		})
		if err != nil {
			return err
		}

		// Process each filer node
		for _, node := range resp.ClusterNodes {
			createdAt := time.Unix(0, node.CreatedAtNs)

			filerInfo := FilerInfo{
				Address:    pb.ServerAddress(node.Address).ToHttpAddress(),
				DataCenter: node.DataCenter,
				Rack:       node.Rack,
				Version:    node.Version,
				CreatedAt:  createdAt,
			}

			filers = append(filers, filerInfo)
		}

		return nil
	})

	if err != nil {
		return nil, fmt.Errorf("failed to get filer nodes from master: %w", err)
	}

	// Sort filers by address for consistent ordering on page refresh
	sort.Slice(filers, func(i, j int) bool {
		return filers[i].Address < filers[j].Address
	})

	return &ClusterFilersData{
		Filers:      filers,
		TotalFilers: len(filers),
		LastUpdated: time.Now(),
	}, nil
}

// GetClusterBrokers retrieves cluster message brokers data
func (s *AdminServer) GetClusterBrokers() (*ClusterBrokersData, error) {
	var brokers []MessageBrokerInfo

	// Get broker information from master using ListClusterNodes
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.BrokerType,
		})
		if err != nil {
			return err
		}

		// Process each broker node
		for _, node := range resp.ClusterNodes {
			createdAt := time.Unix(0, node.CreatedAtNs)

			brokerInfo := MessageBrokerInfo{
				Address:    pb.ServerAddress(node.Address).ToHttpAddress(),
				DataCenter: node.DataCenter,
				Rack:       node.Rack,
				Version:    node.Version,
				CreatedAt:  createdAt,
			}

			brokers = append(brokers, brokerInfo)
		}

		return nil
	})

	if err != nil {
		return nil, fmt.Errorf("failed to get broker nodes from master: %w", err)
	}

	// Sort brokers by address for consistent ordering on page refresh
	sort.Slice(brokers, func(i, j int) bool {
		return brokers[i].Address < brokers[j].Address
	})

	return &ClusterBrokersData{
		Brokers:      brokers,
		TotalBrokers: len(brokers),
		LastUpdated:  time.Now(),
	}, nil
}

// GetAllFilers method moved to client_management.go

// GetVolumeDetails method moved to volume_management.go

// VacuumVolume method moved to volume_management.go

// TriggerTopicRetentionPurgeAPI triggers topic retention purge via HTTP API
func (s *AdminServer) TriggerTopicRetentionPurgeAPI(w http.ResponseWriter, r *http.Request) {
	err := s.TriggerTopicRetentionPurge()
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, err.Error())
		return
	}

	writeJSON(w, http.StatusOK, map[string]interface{}{"message": "Topic retention purge triggered successfully"})
}

// GetConfigInfo returns information about the admin configuration
func (s *AdminServer) GetConfigInfo(w http.ResponseWriter, r *http.Request) {
	configInfo := s.configPersistence.GetConfigInfo()

	// Add additional admin server info
	currentMaster := s.masterClient.GetMaster(context.Background())
	configInfo["master_address"] = string(currentMaster)
	configInfo["cache_expiration"] = s.cacheExpiration.String()
	configInfo["filer_cache_expiration"] = s.filerCacheExpiration.String()

	// Add maintenance system info
	if s.maintenanceManager != nil {
		configInfo["maintenance_enabled"] = true
		configInfo["maintenance_running"] = s.maintenanceManager.IsRunning()
	} else {
		configInfo["maintenance_enabled"] = false
		configInfo["maintenance_running"] = false
	}

	writeJSON(w, http.StatusOK, map[string]interface{}{
		"config_info": configInfo,
		"title":       "Configuration Information",
	})
}

// StartWorkerGrpcServer starts the worker gRPC server
func (s *AdminServer) StartWorkerGrpcServer(grpcPort int) error {
	if s.workerGrpcServer != nil {
		return fmt.Errorf("worker gRPC server is already running")
	}

	s.workerGrpcServer = NewWorkerGrpcServer(s)
	return s.workerGrpcServer.StartWithTLS(grpcPort)
}

// StopWorkerGrpcServer stops the worker gRPC server
func (s *AdminServer) StopWorkerGrpcServer() error {
	if s.workerGrpcServer != nil {
		err := s.workerGrpcServer.Stop()
		s.workerGrpcServer = nil
		return err
	}
	return nil
}

// GetWorkerGrpcServer returns the worker gRPC server
func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
	return s.workerGrpcServer
}

// GetWorkerGrpcPort returns the worker gRPC listen port, or 0 when unavailable.
func (s *AdminServer) GetWorkerGrpcPort() int {
	if s.workerGrpcServer == nil {
		return 0
	}
	return s.workerGrpcServer.ListenPort()
}

// GetPlugin returns the plugin instance when enabled.
func (s *AdminServer) GetPlugin() *adminplugin.Plugin {
	return s.plugin
}

func (s *AdminServer) acquirePluginLock(reason string) (func(), error) {
	if s == nil || s.pluginLock == nil {
		return func() {}, nil
	}
	return s.pluginLock.Acquire(reason)
}

// RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor.
func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.RequestConfigSchema(ctx, jobType, forceRefresh)
}

// LoadPluginJobTypeDescriptor loads persisted descriptor for one job type.
func (s *AdminServer) LoadPluginJobTypeDescriptor(jobType string) (*plugin_pb.JobTypeDescriptor, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.LoadDescriptor(jobType)
}

// SavePluginJobTypeConfig persists plugin job type config in admin data dir.
func (s *AdminServer) SavePluginJobTypeConfig(config *plugin_pb.PersistedJobTypeConfig) error {
	if s.plugin == nil {
		return fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.SaveJobTypeConfig(config)
}

// LoadPluginJobTypeConfig loads plugin job type config from persistence.
func (s *AdminServer) LoadPluginJobTypeConfig(jobType string) (*plugin_pb.PersistedJobTypeConfig, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.LoadJobTypeConfig(jobType)
}

// RunPluginDetection triggers one detection pass for a job type and returns proposed jobs.
func (s *AdminServer) RunPluginDetection(
	ctx context.Context,
	jobType string,
	clusterContext *plugin_pb.ClusterContext,
	maxResults int32,
) ([]*plugin_pb.JobProposal, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
	if err != nil {
		return nil, err
	}
	if releaseLock != nil {
		defer releaseLock()
	}
	return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults)
}

// FilterPluginProposalsWithActiveJobs drops proposals already represented by assigned/running jobs.
func (s *AdminServer) FilterPluginProposalsWithActiveJobs(
	jobType string,
	proposals []*plugin_pb.JobProposal,
) ([]*plugin_pb.JobProposal, int, error) {
	if s.plugin == nil {
		return nil, 0, fmt.Errorf("plugin is not enabled")
	}
	filtered, skipped := s.plugin.FilterProposalsWithActiveJobs(jobType, proposals)
	return filtered, skipped, nil
}

// RunPluginDetectionWithReport triggers one detection pass and returns request metadata and proposals.
func (s *AdminServer) RunPluginDetectionWithReport(
	ctx context.Context,
	jobType string,
	clusterContext *plugin_pb.ClusterContext,
	maxResults int32,
) (*adminplugin.DetectionReport, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
	if err != nil {
		return nil, err
	}
	if releaseLock != nil {
		defer releaseLock()
	}
	return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
}

// ExecutePluginJob dispatches one job to a capable worker and waits for completion.
func (s *AdminServer) ExecutePluginJob(
	ctx context.Context,
	job *plugin_pb.JobSpec,
	clusterContext *plugin_pb.ClusterContext,
	attempt int32,
) (*plugin_pb.JobCompleted, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	jobType := ""
	if job != nil {
		jobType = strings.TrimSpace(job.JobType)
	}
	releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType))
	if err != nil {
		return nil, err
	}
	if releaseLock != nil {
		defer releaseLock()
	}
	return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt)
}

// GetPluginRunHistory returns the bounded run history (last 10 success + last 10 error).
func (s *AdminServer) GetPluginRunHistory(jobType string) (*adminplugin.JobTypeRunHistory, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.LoadRunHistory(jobType)
}

// ListPluginJobTypes returns known plugin job types from connected worker registry and persisted data.
func (s *AdminServer) ListPluginJobTypes() ([]adminplugin.JobTypeInfo, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.ListKnownJobTypes()
}

// GetPluginWorkers returns currently connected plugin workers.
func (s *AdminServer) GetPluginWorkers() []*adminplugin.WorkerSession {
	if s.plugin == nil {
		return nil
	}
	return s.plugin.ListWorkers()
}

// ListPluginJobs returns tracked plugin jobs for monitoring.
func (s *AdminServer) ListPluginJobs(jobType, state string, limit int) []adminplugin.TrackedJob {
	if s.plugin == nil {
		return nil
	}
	return s.plugin.ListTrackedJobs(jobType, state, limit)
}

// GetPluginJob returns one tracked plugin job by ID.
func (s *AdminServer) GetPluginJob(jobID string) (*adminplugin.TrackedJob, bool) {
	if s.plugin == nil {
		return nil, false
	}
	return s.plugin.GetTrackedJob(jobID)
}

// GetPluginJobDetail returns detailed plugin job information with activity timeline.
func (s *AdminServer) GetPluginJobDetail(jobID string, activityLimit, relatedLimit int) (*adminplugin.JobDetail, bool, error) {
	if s.plugin == nil {
		return nil, false, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.BuildJobDetail(jobID, activityLimit, relatedLimit)
}

// ExpirePluginJob marks an active plugin job as failed so it no longer blocks scheduling.
func (s *AdminServer) ExpirePluginJob(jobID, reason string) (*adminplugin.TrackedJob, bool, error) {
	if handler := s.expireJobHandler; handler != nil {
		return handler(jobID, reason)
	}
	if s.plugin == nil {
		return nil, false, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.ExpireJob(jobID, reason)
}

// ListPluginActivities returns plugin job activities for monitoring.
func (s *AdminServer) ListPluginActivities(jobType string, limit int) []adminplugin.JobActivity {
	if s.plugin == nil {
		return nil
	}
	return s.plugin.ListActivities(jobType, limit)
}

// ListPluginSchedulerStates returns per-job-type scheduler state.
func (s *AdminServer) ListPluginSchedulerStates() ([]adminplugin.SchedulerJobTypeState, error) {
	if s.plugin == nil {
		return nil, fmt.Errorf("plugin is not enabled")
	}
	return s.plugin.ListSchedulerStates()
}

// Maintenance system integration methods

// InitMaintenanceManager initializes the maintenance manager
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
	s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)

	// Set up task persistence if config persistence is available
	if s.configPersistence != nil {
		queue := s.maintenanceManager.GetQueue()
		if queue != nil {
			queue.SetPersistence(s.configPersistence)

			// Load tasks from persistence on startup
			if err := queue.LoadTasksFromPersistence(); err != nil {
				glog.Errorf("Failed to load tasks from persistence: %v", err)
			}
		}
	}

	glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
}

// GetMaintenanceManager returns the maintenance manager
func (s *AdminServer) GetMaintenanceManager() *maintenance.MaintenanceManager {
	return s.maintenanceManager
}

// StartMaintenanceManager starts the maintenance manager
func (s *AdminServer) StartMaintenanceManager() error {
	if s.maintenanceManager == nil {
		return fmt.Errorf("maintenance manager not initialized")
	}
	return s.maintenanceManager.Start()
}

// StopMaintenanceManager stops the maintenance manager
func (s *AdminServer) StopMaintenanceManager() {
	if s.maintenanceManager != nil {
		s.maintenanceManager.Stop()
	}
}

// TriggerTopicRetentionPurge triggers topic data purging based on retention policies
func (s *AdminServer) TriggerTopicRetentionPurge() error {
	if s.topicRetentionPurger == nil {
		return fmt.Errorf("topic retention purger not initialized")
	}

	glog.V(0).Infof("Triggering topic retention purge")
	return s.topicRetentionPurger.PurgeExpiredTopicData()
}

// GetTopicRetentionPurger returns the topic retention purger
func (s *AdminServer) GetTopicRetentionPurger() *TopicRetentionPurger {
	return s.topicRetentionPurger
}

// CreateTopicWithRetention creates a new topic with optional retention configuration
func (s *AdminServer) CreateTopicWithRetention(namespace, name string, partitionCount int32, retentionEnabled bool, retentionSeconds int64) error {
	// Find broker leader to create the topic
	brokerLeader, err := s.findBrokerLeader()
	if err != nil {
		return fmt.Errorf("failed to find broker leader: %w", err)
	}

	// Create retention configuration; a disabled policy carries no duration
	retention := &mq_pb.TopicRetention{Enabled: retentionEnabled}
	if retentionEnabled {
		retention.RetentionSeconds = retentionSeconds
	}

	// Create the topic via broker
	err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		_, err := client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
			Topic: &schema_pb.Topic{
				Namespace: namespace,
				Name:      name,
			},
			PartitionCount: partitionCount,
			Retention:      retention,
		})
		return err
	})

	if err != nil {
		return fmt.Errorf("failed to create topic: %w", err)
	}

	glog.V(0).Infof("Created topic %s.%s with %d partitions (retention: enabled=%v, seconds=%d)",
		namespace, name, partitionCount, retentionEnabled, retentionSeconds)
	return nil
}

// UpdateTopicRetention updates the retention configuration for an existing topic
func (s *AdminServer) UpdateTopicRetention(namespace, name string, enabled bool, retentionSeconds int64) error {
	// Get broker information from master
	var brokerAddress string
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.BrokerType,
		})
		if err != nil {
			return err
		}

		// Use the first available broker
		if len(resp.ClusterNodes) > 0 {
			brokerAddress = resp.ClusterNodes[0].Address
		}

		return nil
	})

	if err != nil {
		return fmt.Errorf("failed to get broker nodes from master: %w", err)
	}

	if brokerAddress == "" {
		return fmt.Errorf("no active brokers found")
	}

	// Create gRPC connection
	conn, err := grpc.NewClient(brokerAddress, s.grpcDialOption)
	if err != nil {
		return fmt.Errorf("failed to connect to broker: %w", err)
	}
	defer conn.Close()

	client := mq_pb.NewSeaweedMessagingClient(conn)

	// First, get the current topic configuration to preserve existing settings
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	currentConfig, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
		Topic: &schema_pb.Topic{
			Namespace: namespace,
			Name:      name,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to get current topic configuration: %w", err)
	}

	// Create the topic configuration request, preserving all existing settings
	configRequest := &mq_pb.ConfigureTopicRequest{
		Topic: &schema_pb.Topic{
			Namespace: namespace,
			Name:      name,
		},
		// Preserve existing partition count - this is critical!
		PartitionCount: currentConfig.PartitionCount,
		// Preserve existing schema if it exists
		MessageRecordType: currentConfig.MessageRecordType,
		KeyColumns:        currentConfig.KeyColumns,
	}

	// Update only the retention configuration; a disabled policy carries no duration
	configRequest.Retention = &mq_pb.TopicRetention{Enabled: enabled}
	if enabled {
		configRequest.Retention.RetentionSeconds = retentionSeconds
	}

	// Send the configuration request with preserved settings
	_, err = client.ConfigureTopic(ctx, configRequest)
	if err != nil {
		return fmt.Errorf("failed to update topic retention: %w", err)
	}

	glog.V(0).Infof("Updated topic %s.%s retention (enabled: %v, seconds: %d) while preserving %d partitions",
		namespace, name, enabled, retentionSeconds, currentConfig.PartitionCount)
	return nil
}

// Shutdown gracefully shuts down the admin server
|
|
func (s *AdminServer) Shutdown() {
|
|
glog.V(1).Infof("Shutting down admin server...")
|
|
|
|
// Stop maintenance manager
|
|
s.StopMaintenanceManager()
|
|
if s.adminPresenceLock != nil {
|
|
s.adminPresenceLock.Stop()
|
|
}
|
|
|
|
if s.plugin != nil {
|
|
s.plugin.Shutdown()
|
|
}
|
|
|
|
// Stop worker gRPC server
|
|
if err := s.StopWorkerGrpcServer(); err != nil {
|
|
glog.Errorf("Failed to stop worker gRPC server: %v", err)
|
|
}
|
|
|
|
// Shutdown credential manager
|
|
if s.credentialManager != nil {
|
|
s.credentialManager.Shutdown()
|
|
}
|
|
|
|
glog.V(1).Infof("Admin server shutdown complete")
|
|
}

// extractObjectLockInfoFromEntry extracts Object Lock information from a bucket
// entry using the shared s3api utilities. It returns (false, "", 0) when the
// entry has no Object Lock configuration.
func extractObjectLockInfoFromEntry(entry *filer_pb.Entry) (bool, string, int32) {
	// Try to load the Object Lock configuration using the shared utility
	if config, found := s3api.LoadObjectLockConfigurationFromExtended(entry); found {
		return s3api.ExtractObjectLockInfoFromConfig(config)
	}

	return false, "", 0
}

// extractVersioningFromEntry returns the versioning status of a bucket entry
// using the shared s3api utilities.
func extractVersioningFromEntry(entry *filer_pb.Entry) string {
	return s3api.GetVersioningStatus(entry)
}

// GetConfigPersistence returns the config persistence manager.
func (s *AdminServer) GetConfigPersistence() *ConfigPersistence {
	return s.configPersistence
}

// convertJSONToMaintenanceConfig converts a JSON-decoded map into a protobuf
// MaintenanceConfig. Missing keys keep their zero values; a numeric field that
// holds a non-numeric value is reported as an error. The top-level fields use
// the same getInt32FromMap/getBoolFromMap helpers as the nested policy maps.
func convertJSONToMaintenanceConfig(jsonConfig map[string]interface{}) (*maintenance.MaintenanceConfig, error) {
	config := &maintenance.MaintenanceConfig{}
	var err error

	// Convert basic fields
	config.Enabled = getBoolFromMap(jsonConfig, "enabled")

	if config.ScanIntervalSeconds, err = getInt32FromMap(jsonConfig, "scan_interval_seconds"); err != nil {
		return nil, err
	}
	if config.WorkerTimeoutSeconds, err = getInt32FromMap(jsonConfig, "worker_timeout_seconds"); err != nil {
		return nil, err
	}
	if config.TaskTimeoutSeconds, err = getInt32FromMap(jsonConfig, "task_timeout_seconds"); err != nil {
		return nil, err
	}
	if config.RetryDelaySeconds, err = getInt32FromMap(jsonConfig, "retry_delay_seconds"); err != nil {
		return nil, err
	}
	if config.MaxRetries, err = getInt32FromMap(jsonConfig, "max_retries"); err != nil {
		return nil, err
	}
	if config.CleanupIntervalSeconds, err = getInt32FromMap(jsonConfig, "cleanup_interval_seconds"); err != nil {
		return nil, err
	}
	if config.TaskRetentionSeconds, err = getInt32FromMap(jsonConfig, "task_retention_seconds"); err != nil {
		return nil, err
	}

	// Convert policy if present
	if policyData, ok := jsonConfig["policy"]; ok {
		if policyMap, ok := policyData.(map[string]interface{}); ok {
			policy := &maintenance.MaintenancePolicy{}

			if policy.GlobalMaxConcurrent, err = getInt32FromMap(policyMap, "global_max_concurrent"); err != nil {
				return nil, err
			}
			if policy.DefaultRepeatIntervalSeconds, err = getInt32FromMap(policyMap, "default_repeat_interval_seconds"); err != nil {
				return nil, err
			}
			if policy.DefaultCheckIntervalSeconds, err = getInt32FromMap(policyMap, "default_check_interval_seconds"); err != nil {
				return nil, err
			}

			// Convert task policies if present; entries that are not JSON objects are skipped
			if taskPoliciesData, ok := policyMap["task_policies"]; ok {
				if taskPoliciesMap, ok := taskPoliciesData.(map[string]interface{}); ok {
					policy.TaskPolicies = make(map[string]*maintenance.TaskPolicy)

					for taskType, taskPolicyData := range taskPoliciesMap {
						taskPolicyMap, ok := taskPolicyData.(map[string]interface{})
						if !ok {
							continue
						}

						taskPolicy := &maintenance.TaskPolicy{
							Enabled: getBoolFromMap(taskPolicyMap, "enabled"),
						}
						if taskPolicy.MaxConcurrent, err = getInt32FromMap(taskPolicyMap, "max_concurrent"); err != nil {
							return nil, err
						}
						if taskPolicy.RepeatIntervalSeconds, err = getInt32FromMap(taskPolicyMap, "repeat_interval_seconds"); err != nil {
							return nil, err
						}
						if taskPolicy.CheckIntervalSeconds, err = getInt32FromMap(taskPolicyMap, "check_interval_seconds"); err != nil {
							return nil, err
						}

						policy.TaskPolicies[taskType] = taskPolicy
					}
				}
			}

			config.Policy = policy
		}
	}

	return config, nil
}
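`convertJSONToMaintenanceConfig` expects its input in the shape produced by generic JSON decoding. With `encoding/json`, decoding into `map[string]interface{}` turns every JSON number into a `float64` and every nested object into another `map[string]interface{}`. That is why the numeric helpers need a `float64` case and why the policy sections are type-asserted to maps. The hypothetical `decodeConfigMap` helper below shows those types; how the admin server actually decodes its request body is an assumption here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeConfigMap is a hypothetical helper that decodes a JSON object
// generically, the way a handler might before calling
// convertJSONToMaintenanceConfig.
func decodeConfigMap(raw string) (map[string]interface{}, error) {
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := decodeConfigMap(`{"enabled": true, "scan_interval_seconds": 1800,
		"policy": {"task_policies": {"vacuum": {"max_concurrent": 2}}}}`)
	if err != nil {
		panic(err)
	}
	// Numbers decode as float64; nested objects decode as map[string]interface{}.
	fmt.Printf("%T %T\n", m["scan_interval_seconds"], m["policy"])
}
```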

// getInt32FromMap reads key from m as an int32. Values of type int, int32,
// int64 and float64 (the type encoding/json uses for numbers) are accepted and
// converted without range checks: fractional parts are truncated, and
// out-of-range values do not produce an error. A missing key yields (0, nil).
func getInt32FromMap(m map[string]interface{}, key string) (int32, error) {
	if val, ok := m[key]; ok {
		switch v := val.(type) {
		case int:
			return int32(v), nil
		case int32:
			return v, nil
		case int64:
			return int32(v), nil
		case float64:
			return int32(v), nil
		default:
			return 0, fmt.Errorf("invalid type for %s: expected number, got %T", key, v)
		}
	}
	return 0, nil
}

// getBoolFromMap reads key from m as a bool, returning false when the key is
// missing or does not hold a bool.
func getBoolFromMap(m map[string]interface{}, key string) bool {
	if val, ok := m[key]; ok {
		if b, ok := val.(bool); ok {
			return b
		}
	}
	return false
}

// collectionStats aggregates volume statistics for one collection.
type collectionStats struct {
	PhysicalSize int64 // sum of reported volume sizes across all replicas, including deleted bytes
	LogicalSize  int64 // live bytes, counted once per replica set
	FileCount    int64 // live files, counted once per replica set
}

// collectCollectionStats walks the topology and sums volume statistics per
// collection; volumes without a collection are grouped under "default".
// Each replica of a volume is reported separately, so the logical size and
// file count of every reported volume are divided by its replica count.
func collectCollectionStats(topologyInfo *master_pb.TopologyInfo) map[string]collectionStats {
	collectionMap := make(map[string]collectionStats)
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, node := range rack.DataNodeInfos {
				for _, diskInfo := range node.DiskInfos {
					for _, volInfo := range diskInfo.VolumeInfos {
						collection := volInfo.Collection
						if collection == "" {
							collection = "default"
						}

						data := collectionMap[collection]
						data.PhysicalSize += int64(volInfo.Size)
						// NewReplicaPlacementFromByte never returns a nil rp. If there's an error,
						// it returns a zero-valued ReplicaPlacement, for which GetCopyCount() is 1.
						// This provides a safe fallback, so we can ignore the error.
						rp, _ := super_block.NewReplicaPlacementFromByte(byte(volInfo.ReplicaPlacement))
						replicaCount := int64(rp.GetCopyCount())
						// Guard against underflow when deleted counts exceed the totals
						if volInfo.Size >= volInfo.DeletedByteCount {
							data.LogicalSize += int64(volInfo.Size-volInfo.DeletedByteCount) / replicaCount
						}
						if volInfo.FileCount >= volInfo.DeleteCount {
							data.FileCount += int64(volInfo.FileCount-volInfo.DeleteCount) / replicaCount
						}
						collectionMap[collection] = data
					}
				}
			}
		}
	}
	return collectionMap
}
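A worked example of the logical-size arithmetic. `VolumeList` reports each replica of a volume from the server that holds it, so dividing every report's live bytes by the copy count sums back to one logical copy. The sketch decodes the replica placement byte the way SeaweedFS's `xyz` notation is read (an assumption about `NewReplicaPlacementFromByte`): x copies in other data centers, y on other racks, z on other servers in the same rack, plus the original. `copyCount` and `logicalSize` are hypothetical helpers.

```go
package main

import "fmt"

// copyCount decodes a replica placement byte as the decimal digits "xyz"
// (assumed to mirror SeaweedFS): x other data centers, y other racks,
// z other servers in the same rack, plus the original copy.
func copyCount(placement byte) int64 {
	x := int64(placement / 100)
	y := int64(placement / 10 % 10)
	z := int64(placement % 10)
	return x + y + z + 1
}

// logicalSize applies the same formula as collectCollectionStats to a single
// reported volume: live bytes divided by the number of copies, with a guard
// against underflow of the unsigned subtraction.
func logicalSize(size, deleted uint64, placement byte) int64 {
	if size < deleted {
		return 0
	}
	return int64(size-deleted) / copyCount(placement)
}

func main() {
	// One volume with placement "001" (2 copies), reported by two servers,
	// each showing 1000 bytes of which 200 are deleted.
	var total int64
	for i := 0; i < 2; i++ {
		total += logicalSize(1000, 200, 1)
	}
	fmt.Println(total) // 800
}
```

Because the division is integer division per reported volume, totals can come out slightly below the exact live size when live bytes are not a multiple of the copy count.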

// getCollectionStats returns current collection statistics, reusing the cached
// result while it is younger than collectionStatsCacheThreshold. If the master
// query fails, the previously cached value (possibly nil) is returned together
// with the error.
func (s *AdminServer) getCollectionStats() (map[string]collectionStats, error) {
	now := time.Now()
	if s.collectionStatsCache != nil && now.Sub(s.lastCollectionStatsUpdate) < s.collectionStatsCacheThreshold {
		return s.collectionStatsCache, nil
	}

	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			s.collectionStatsCache = collectCollectionStats(resp.TopologyInfo)
			s.lastCollectionStatsUpdate = now
		}
		return nil
	})

	return s.collectionStatsCache, err
}