Files
seaweedFS/weed/admin/handlers/admin_handlers.go
Chris Lu d95df76bca feat: separate scheduler lanes for iceberg, lifecycle, and volume management (#8787)
* feat: introduce scheduler lanes for independent per-workload scheduling

Split the single plugin scheduler loop into independent per-lane
goroutines so that volume management, iceberg compaction, and lifecycle
operations never block each other.

Each lane has its own:
- Goroutine (laneSchedulerLoop)
- Wake channel for immediate scheduling
- Admin lock scope (e.g. "plugin scheduler:default")
- Configurable idle sleep duration
- Loop state tracking

Three lanes are defined:
- default: vacuum, volume_balance, ec_balance, erasure_coding, admin_script
- iceberg: iceberg_maintenance
- lifecycle: s3_lifecycle (new, handler coming in a later commit)

Job types are mapped to lanes via a hardcoded map with LaneDefault as
the fallback. The SchedulerJobTypeState and SchedulerStatus types now
include a Lane field for API consumers.

* feat: per-lane execution reservation pools for resource isolation

Each scheduler lane now maintains its own execution reservation map
so that a busy volume lane cannot consume execution slots needed by
iceberg or lifecycle lanes. The per-lane pool is used by default when
dispatching jobs through the lane scheduler; the global pool remains
as a fallback for the public DispatchProposals API.

* feat: add per-lane scheduler status API and lane worker UI pages

- GET /api/plugin/lanes returns all lanes with status and job types
- GET /api/plugin/workers?lane=X filters workers by lane
- GET /api/plugin/scheduler-states?lane=X filters job types by lane
- GET /api/plugin/scheduler-status?lane=X returns lane-scoped status
- GET /plugin/lanes/{lane}/workers renders per-lane worker page
- SchedulerJobTypeState now includes a "lane" field

The lane worker pages show scheduler status, job type configuration,
and connected workers scoped to a single lane, with links back to
the main plugin overview.

* feat: add s3_lifecycle worker handler for object store lifecycle management

Implements a full plugin worker handler for S3 lifecycle management,
assigned to the new "lifecycle" scheduler lane.

Detection phase:
- Reads filer.conf to find buckets with TTL lifecycle rules
- Creates one job proposal per bucket with active lifecycle rules
- Supports bucket_filter wildcard pattern from admin config

Execution phase:
- Walks the bucket directory tree breadth-first
- Identifies expired objects by checking TtlSec + Crtime < now
- Deletes expired objects in configurable batches
- Reports progress with scanned/expired/error counts
- Supports dry_run mode for safe testing

Configurable via admin UI:
- batch_size: entries per filer listing page (default 1000)
- max_deletes_per_bucket: safety cap per run (default 10000)
- dry_run: detect without deleting
- delete_marker_cleanup: clean expired delete markers
- abort_mpu_days: abort stale multipart uploads

The handler integrates with the existing PutBucketLifecycle flow which
sets TtlSec on entries via filer.conf path rules.

* feat: add per-lane submenu items under Workers sidebar menu

Replace the single "Workers" sidebar link with a collapsible submenu
containing three lane entries:
- Default (volume management + admin scripts) -> /plugin
- Iceberg (table compaction) -> /plugin/lanes/iceberg/workers
- Lifecycle (S3 object expiration) -> /plugin/lanes/lifecycle/workers

The submenu auto-expands when on any /plugin page and highlights the
active lane. Icons match each lane's job type descriptor (server,
snowflake, hourglass).

* feat: scope plugin pages to their scheduler lane

The plugin overview, configuration, detection, queue, and execution
pages now filter workers, job types, scheduler states, and scheduler
status to only show data for their lane.

- Plugin() templ function accepts a lane parameter (default: "default")
- JavaScript appends ?lane= to /api/plugin/workers, /job-types,
  /scheduler-states, and /scheduler-status API calls
- GET /api/plugin/job-types now supports ?lane= filtering
- When ?job= is provided (e.g. ?job=iceberg_maintenance), the lane is
  auto-derived from the job type so the page scopes correctly

This ensures /plugin shows only default-lane workers and
/plugin/configuration?job=iceberg_maintenance scopes to the iceberg lane.

* fix: remove "Lane" from lane worker page titles and capitalize properly

"lifecycle Lane Workers" -> "Lifecycle Workers"
"iceberg Lane Workers" -> "Iceberg Workers"

* refactor: promote lane items to top-level sidebar menu entries

Move Default, Iceberg, and Lifecycle from a collapsible submenu to
direct top-level items under the WORKERS heading. Removes the
intermediate "Workers" parent link and collapse toggle.

* admin: unify plugin lane routes and handlers

* admin: filter plugin jobs and activities by lane

* admin: reuse plugin UI for worker lane pages

* fix: use ServerAddress.ToGrpcAddress() for filer connections in lifecycle handler

ClusterContext addresses use ServerAddress format (host:port.grpcPort).
Convert to the actual gRPC address via ToGrpcAddress() before dialing,
and add a Ping verification after connecting.

Fixes: "dial tcp: lookup tcp/8888.18888: unknown port"

* fix: resolve ServerAddress gRPC port in iceberg and lifecycle filer connections

ClusterContext addresses use ServerAddress format (host:httpPort.grpcPort).
Both the iceberg and lifecycle handlers now detect the compound format
and extract the gRPC port via ToGrpcAddress() before dialing. Plain
host:port addresses (e.g. from tests) are passed through unchanged.

Fixes: "dial tcp: lookup tcp/8888.18888: unknown port"

* align url

* Potential fix for code scanning alert no. 335: Incorrect conversion between integer types

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* fix: address PR review findings across scheduler lanes and lifecycle handler

- Fix variable shadowing: rename loop var `w` to `worker` in
  GetPluginWorkersAPI to avoid shadowing the http.ResponseWriter param
- Fix stale GetSchedulerStatus: aggregate loop states across all lanes
  instead of reading never-updated legacy schedulerLoopState
- Scope InProcessJobs to lane in GetLaneSchedulerStatus
- Fix AbortMPUDays=0 treated as unset: change <= 0 to < 0 so 0 disables
- Propagate listing errors in lifecycle bucket walk instead of swallowing
- Implement DeleteMarkerCleanup: scan for S3 delete marker entries and
  remove them
- Implement AbortMPUDays: scan .uploads directory and remove stale
  multipart uploads older than the configured threshold
- Fix success determination: mark job failed when result.errors > 0
  even if no fatal error occurred
- Add regression test for jobTypeLaneMap to catch drift from handler
  registrations

* fix: guard against nil result in lifecycle completion and trim filer addresses

- Guard result dereference in completion summary: use local vars
  defaulting to 0 when result is nil to prevent panic
- Append trimmed filer addresses instead of originals so whitespace
  is not passed to the gRPC dialer

* fix: propagate ctx cancellation from deleteExpiredObjects and add config logging

- deleteExpiredObjects now returns a third error value when the context
  is canceled mid-batch; the caller stops processing further batches
  and returns the cancellation error to the job completion handler
- readBoolConfig and readInt64Config now log unexpected ConfigValue
  types at V(1) for debugging, consistent with readStringConfig

* fix: propagate errors in lifecycle cleanup helpers and use correct delete marker key

- cleanupDeleteMarkers: return error on ctx cancellation and SeaweedList
  failures instead of silently continuing
- abortIncompleteMPUs: log SeaweedList errors instead of discarding
- isDeleteMarker: use ExtDeleteMarkerKey ("Seaweed-X-Amz-Delete-Marker")
  instead of ExtLatestVersionIsDeleteMarker which is for the parent entry
- batchSize cap: use math.MaxInt instead of math.MaxInt32

* fix: propagate ctx cancellation from abortIncompleteMPUs and log unrecognized bool strings

- abortIncompleteMPUs now returns (aborted, errors, ctxErr) matching
  cleanupDeleteMarkers; caller stops on cancellation or listing failure
- readBoolConfig logs unrecognized string values before falling back

* fix: shared per-bucket budget across lifecycle phases and allow cleanup without expired objects

- Thread a shared remaining counter through TTL deletion, delete marker
  cleanup, and MPU abort so the total operations per bucket never exceed
  MaxDeletesPerBucket
- Remove early return when no TTL-expired objects found so delete marker
  cleanup and MPU abort still run
- Add NOTE on cleanupDeleteMarkers about version-safety limitation

---------

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2026-03-26 19:28:13 -07:00

556 lines
28 KiB
Go

package handlers
import (
"net/http"
"net/url"
"strconv"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/sessions"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/seaweedfs/seaweedfs/weed/admin/dash"
"github.com/seaweedfs/seaweedfs/weed/admin/view/app"
"github.com/seaweedfs/seaweedfs/weed/admin/view/layout"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
"github.com/seaweedfs/seaweedfs/weed/stats"
)
// AdminHandlers contains all the HTTP handlers for the admin interface
type AdminHandlers struct {
adminServer *dash.AdminServer
sessionStore sessions.Store
authHandlers *AuthHandlers
clusterHandlers *ClusterHandlers
fileBrowserHandlers *FileBrowserHandlers
userHandlers *UserHandlers
policyHandlers *PolicyHandlers
pluginHandlers *PluginHandlers
mqHandlers *MessageQueueHandlers
serviceAccountHandlers *ServiceAccountHandlers
groupHandlers *GroupHandlers
}
// NewAdminHandlers creates a new instance of AdminHandlers
func NewAdminHandlers(adminServer *dash.AdminServer, store sessions.Store) *AdminHandlers {
authHandlers := NewAuthHandlers(adminServer, store)
clusterHandlers := NewClusterHandlers(adminServer)
fileBrowserHandlers := NewFileBrowserHandlers(adminServer)
userHandlers := NewUserHandlers(adminServer)
policyHandlers := NewPolicyHandlers(adminServer)
pluginHandlers := NewPluginHandlers(adminServer)
mqHandlers := NewMessageQueueHandlers(adminServer)
serviceAccountHandlers := NewServiceAccountHandlers(adminServer)
groupHandlers := NewGroupHandlers(adminServer)
return &AdminHandlers{
adminServer: adminServer,
sessionStore: store,
authHandlers: authHandlers,
clusterHandlers: clusterHandlers,
fileBrowserHandlers: fileBrowserHandlers,
userHandlers: userHandlers,
policyHandlers: policyHandlers,
pluginHandlers: pluginHandlers,
mqHandlers: mqHandlers,
serviceAccountHandlers: serviceAccountHandlers,
groupHandlers: groupHandlers,
}
}
// SetupRoutes configures all the routes for the admin interface
func (h *AdminHandlers) SetupRoutes(r *mux.Router, authRequired bool, adminUser, adminPassword, readOnlyUser, readOnlyPassword string, enableUI bool) {
// Health check (no auth required)
r.HandleFunc("/health", h.HealthCheck).Methods(http.MethodGet)
// Prometheus metrics endpoint (no auth required)
r.Handle("/metrics", promhttp.HandlerFor(stats.Gather, promhttp.HandlerOpts{})).Methods(http.MethodGet)
// Favicon route (no auth required) - redirect to static version
r.HandleFunc("/favicon.ico", func(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, dash.P(req.Context(), "/static/favicon.ico"), http.StatusMovedPermanently)
}).Methods(http.MethodGet)
// Skip UI routes if UI is not enabled
if !enableUI {
return
}
if authRequired {
// Authentication routes (no auth required)
r.HandleFunc("/login", h.authHandlers.ShowLogin).Methods(http.MethodGet)
r.Handle("/login", h.authHandlers.HandleLogin(adminUser, adminPassword, readOnlyUser, readOnlyPassword)).Methods(http.MethodPost)
r.HandleFunc("/logout", h.authHandlers.HandleLogout).Methods(http.MethodGet)
protected := r.NewRoute().Subrouter()
protected.Use(dash.RequireAuth(h.sessionStore))
h.registerUIRoutes(protected)
api := r.PathPrefix("/api").Subrouter()
api.Use(dash.RequireAuthAPI(h.sessionStore))
h.registerAPIRoutes(api, true)
return
}
// No authentication required - all routes are public
h.registerUIRoutes(r)
api := r.PathPrefix("/api").Subrouter()
h.registerAPIRoutes(api, false)
}
func (h *AdminHandlers) registerUIRoutes(r *mux.Router) {
// Main admin interface routes
r.HandleFunc("/", h.ShowDashboard).Methods(http.MethodGet)
r.HandleFunc("/admin", h.ShowDashboard).Methods(http.MethodGet)
// Object Store management routes
r.HandleFunc("/object-store/buckets", h.ShowS3Buckets).Methods(http.MethodGet)
r.HandleFunc("/object-store/buckets/{bucket}", h.ShowBucketDetails).Methods(http.MethodGet)
r.HandleFunc("/object-store/users", h.userHandlers.ShowObjectStoreUsers).Methods(http.MethodGet)
r.HandleFunc("/object-store/policies", h.policyHandlers.ShowPolicies).Methods(http.MethodGet)
r.HandleFunc("/object-store/groups", h.groupHandlers.ShowGroups).Methods(http.MethodGet)
r.HandleFunc("/object-store/service-accounts", h.serviceAccountHandlers.ShowServiceAccounts).Methods(http.MethodGet)
r.HandleFunc("/object-store/s3tables/buckets", h.ShowS3TablesBuckets).Methods(http.MethodGet)
r.HandleFunc("/object-store/s3tables/buckets/{bucket}/namespaces", h.ShowS3TablesNamespaces).Methods(http.MethodGet)
r.HandleFunc("/object-store/s3tables/buckets/{bucket}/namespaces/{namespace}/tables", h.ShowS3TablesTables).Methods(http.MethodGet)
r.HandleFunc("/object-store/s3tables/buckets/{bucket}/namespaces/{namespace}/tables/{table}", h.ShowS3TablesTableDetails).Methods(http.MethodGet)
r.HandleFunc("/object-store/iceberg", h.ShowIcebergCatalog).Methods(http.MethodGet)
r.HandleFunc("/object-store/iceberg/{catalog}/namespaces", h.ShowIcebergNamespaces).Methods(http.MethodGet)
r.HandleFunc("/object-store/iceberg/{catalog}/namespaces/{namespace}/tables", h.ShowIcebergTables).Methods(http.MethodGet)
r.HandleFunc("/object-store/iceberg/{catalog}/namespaces/{namespace}/tables/{table}", h.ShowIcebergTableDetails).Methods(http.MethodGet)
// File browser routes
r.HandleFunc("/files", h.fileBrowserHandlers.ShowFileBrowser).Methods(http.MethodGet)
// Cluster management routes
r.HandleFunc("/cluster/masters", h.clusterHandlers.ShowClusterMasters).Methods(http.MethodGet)
r.HandleFunc("/cluster/filers", h.clusterHandlers.ShowClusterFilers).Methods(http.MethodGet)
r.HandleFunc("/cluster/volume-servers", h.clusterHandlers.ShowClusterVolumeServers).Methods(http.MethodGet)
// Storage management routes
r.HandleFunc("/storage/volumes", h.clusterHandlers.ShowClusterVolumes).Methods(http.MethodGet)
r.HandleFunc("/storage/volumes/{id}/{server}", h.clusterHandlers.ShowVolumeDetails).Methods(http.MethodGet)
r.HandleFunc("/storage/collections", h.clusterHandlers.ShowClusterCollections).Methods(http.MethodGet)
r.HandleFunc("/storage/collections/{name}", h.clusterHandlers.ShowCollectionDetails).Methods(http.MethodGet)
r.HandleFunc("/storage/ec-shards", h.clusterHandlers.ShowClusterEcShards).Methods(http.MethodGet)
r.HandleFunc("/storage/ec-volumes/{id}", h.clusterHandlers.ShowEcVolumeDetails).Methods(http.MethodGet)
// Message Queue management routes
r.HandleFunc("/mq/brokers", h.mqHandlers.ShowBrokers).Methods(http.MethodGet)
r.HandleFunc("/mq/topics", h.mqHandlers.ShowTopics).Methods(http.MethodGet)
r.HandleFunc("/mq/topics/{namespace}/{topic}", h.mqHandlers.ShowTopicDetails).Methods(http.MethodGet)
// Plugin pages
r.HandleFunc("/plugin", h.pluginHandlers.ShowPlugin).Methods(http.MethodGet)
r.HandleFunc("/plugin/configuration", h.pluginHandlers.ShowPluginConfiguration).Methods(http.MethodGet)
r.HandleFunc("/plugin/queue", h.pluginHandlers.ShowPluginQueue).Methods(http.MethodGet)
r.HandleFunc("/plugin/detection", h.pluginHandlers.ShowPluginDetection).Methods(http.MethodGet)
r.HandleFunc("/plugin/execution", h.pluginHandlers.ShowPluginExecution).Methods(http.MethodGet)
r.HandleFunc("/plugin/monitoring", h.pluginHandlers.ShowPluginMonitoring).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}", h.pluginHandlers.ShowPluginLane).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/configuration", h.pluginHandlers.ShowPluginLaneConfiguration).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/queue", h.pluginHandlers.ShowPluginLaneQueue).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/detection", h.pluginHandlers.ShowPluginLaneDetection).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/execution", h.pluginHandlers.ShowPluginLaneExecution).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/monitoring", h.pluginHandlers.ShowPluginLaneMonitoring).Methods(http.MethodGet)
r.HandleFunc("/plugin/lanes/{lane}/workers", h.pluginHandlers.ShowPluginLaneWorkers).Methods(http.MethodGet)
}
func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) {
wrapWrite := func(handler http.HandlerFunc) http.Handler {
if !enforceWrite {
return handler
}
return dash.RequireWriteAccess()(handler)
}
api.HandleFunc("/cluster/topology", h.clusterHandlers.GetClusterTopology).Methods(http.MethodGet)
api.HandleFunc("/cluster/masters", h.clusterHandlers.GetMasters).Methods(http.MethodGet)
api.HandleFunc("/cluster/volumes", h.clusterHandlers.GetVolumeServers).Methods(http.MethodGet)
api.HandleFunc("/admin", h.adminServer.ShowAdmin).Methods(http.MethodGet)
api.HandleFunc("/config", h.adminServer.GetConfigInfo).Methods(http.MethodGet)
s3Api := api.PathPrefix("/s3").Subrouter()
s3Api.HandleFunc("/buckets", h.adminServer.ListBucketsAPI).Methods(http.MethodGet)
s3Api.Handle("/buckets", wrapWrite(h.adminServer.CreateBucket)).Methods(http.MethodPost)
s3Api.Handle("/buckets/{bucket}", wrapWrite(h.adminServer.DeleteBucket)).Methods(http.MethodDelete)
s3Api.HandleFunc("/buckets/{bucket}", h.adminServer.ShowBucketDetails).Methods(http.MethodGet)
s3Api.Handle("/buckets/{bucket}/quota", wrapWrite(h.adminServer.UpdateBucketQuota)).Methods(http.MethodPut)
s3Api.Handle("/buckets/{bucket}/owner", wrapWrite(h.adminServer.UpdateBucketOwner)).Methods(http.MethodPut)
usersApi := api.PathPrefix("/users").Subrouter()
usersApi.HandleFunc("", h.userHandlers.GetUsers).Methods(http.MethodGet)
usersApi.Handle("", wrapWrite(h.userHandlers.CreateUser)).Methods(http.MethodPost)
usersApi.HandleFunc("/{username}", h.userHandlers.GetUserDetails).Methods(http.MethodGet)
usersApi.Handle("/{username}", wrapWrite(h.userHandlers.UpdateUser)).Methods(http.MethodPut)
usersApi.Handle("/{username}", wrapWrite(h.userHandlers.DeleteUser)).Methods(http.MethodDelete)
usersApi.Handle("/{username}/access-keys", wrapWrite(h.userHandlers.CreateAccessKey)).Methods(http.MethodPost)
usersApi.Handle("/{username}/access-keys/{accessKeyId}", wrapWrite(h.userHandlers.DeleteAccessKey)).Methods(http.MethodDelete)
usersApi.Handle("/{username}/access-keys/{accessKeyId}/status", wrapWrite(h.userHandlers.UpdateAccessKeyStatus)).Methods(http.MethodPut)
usersApi.HandleFunc("/{username}/policies", h.userHandlers.GetUserPolicies).Methods(http.MethodGet)
usersApi.Handle("/{username}/policies", wrapWrite(h.userHandlers.UpdateUserPolicies)).Methods(http.MethodPut)
saApi := api.PathPrefix("/service-accounts").Subrouter()
saApi.HandleFunc("", h.serviceAccountHandlers.GetServiceAccounts).Methods(http.MethodGet)
saApi.Handle("", wrapWrite(h.serviceAccountHandlers.CreateServiceAccount)).Methods(http.MethodPost)
saApi.HandleFunc("/{id}", h.serviceAccountHandlers.GetServiceAccountDetails).Methods(http.MethodGet)
saApi.Handle("/{id}", wrapWrite(h.serviceAccountHandlers.UpdateServiceAccount)).Methods(http.MethodPut)
saApi.Handle("/{id}", wrapWrite(h.serviceAccountHandlers.DeleteServiceAccount)).Methods(http.MethodDelete)
groupsApi := api.PathPrefix("/groups").Subrouter()
groupsApi.HandleFunc("", h.groupHandlers.GetGroups).Methods(http.MethodGet)
groupsApi.Handle("", wrapWrite(h.groupHandlers.CreateGroup)).Methods(http.MethodPost)
groupsApi.HandleFunc("/{name}", h.groupHandlers.GetGroupDetails).Methods(http.MethodGet)
groupsApi.Handle("/{name}", wrapWrite(h.groupHandlers.DeleteGroup)).Methods(http.MethodDelete)
groupsApi.Handle("/{name}/status", wrapWrite(h.groupHandlers.SetGroupStatus)).Methods(http.MethodPut)
groupsApi.HandleFunc("/{name}/members", h.groupHandlers.GetGroupMembers).Methods(http.MethodGet)
groupsApi.Handle("/{name}/members", wrapWrite(h.groupHandlers.AddGroupMember)).Methods(http.MethodPost)
groupsApi.Handle("/{name}/members/{username}", wrapWrite(h.groupHandlers.RemoveGroupMember)).Methods(http.MethodDelete)
groupsApi.HandleFunc("/{name}/policies", h.groupHandlers.GetGroupPolicies).Methods(http.MethodGet)
groupsApi.Handle("/{name}/policies", wrapWrite(h.groupHandlers.AttachGroupPolicy)).Methods(http.MethodPost)
groupsApi.Handle("/{name}/policies/{policyName}", wrapWrite(h.groupHandlers.DetachGroupPolicy)).Methods(http.MethodDelete)
policyApi := api.PathPrefix("/object-store/policies").Subrouter()
policyApi.HandleFunc("", h.policyHandlers.GetPolicies).Methods(http.MethodGet)
policyApi.Handle("", wrapWrite(h.policyHandlers.CreatePolicy)).Methods(http.MethodPost)
policyApi.HandleFunc("/{name}", h.policyHandlers.GetPolicy).Methods(http.MethodGet)
policyApi.Handle("/{name}", wrapWrite(h.policyHandlers.UpdatePolicy)).Methods(http.MethodPut)
policyApi.Handle("/{name}", wrapWrite(h.policyHandlers.DeletePolicy)).Methods(http.MethodDelete)
policyApi.HandleFunc("/validate", h.policyHandlers.ValidatePolicy).Methods(http.MethodPost)
s3TablesApi := api.PathPrefix("/s3tables").Subrouter()
s3TablesApi.HandleFunc("/buckets", h.adminServer.ListS3TablesBucketsAPI).Methods(http.MethodGet)
s3TablesApi.Handle("/buckets", wrapWrite(h.adminServer.CreateS3TablesBucket)).Methods(http.MethodPost)
s3TablesApi.Handle("/buckets", wrapWrite(h.adminServer.DeleteS3TablesBucket)).Methods(http.MethodDelete)
s3TablesApi.HandleFunc("/namespaces", h.adminServer.ListS3TablesNamespacesAPI).Methods(http.MethodGet)
s3TablesApi.Handle("/namespaces", wrapWrite(h.adminServer.CreateS3TablesNamespace)).Methods(http.MethodPost)
s3TablesApi.Handle("/namespaces", wrapWrite(h.adminServer.DeleteS3TablesNamespace)).Methods(http.MethodDelete)
s3TablesApi.HandleFunc("/tables", h.adminServer.ListS3TablesTablesAPI).Methods(http.MethodGet)
s3TablesApi.Handle("/tables", wrapWrite(h.adminServer.CreateS3TablesTable)).Methods(http.MethodPost)
s3TablesApi.Handle("/tables", wrapWrite(h.adminServer.DeleteS3TablesTable)).Methods(http.MethodDelete)
s3TablesApi.Handle("/bucket-policy", wrapWrite(h.adminServer.PutS3TablesBucketPolicy)).Methods(http.MethodPut)
s3TablesApi.HandleFunc("/bucket-policy", h.adminServer.GetS3TablesBucketPolicy).Methods(http.MethodGet)
s3TablesApi.Handle("/bucket-policy", wrapWrite(h.adminServer.DeleteS3TablesBucketPolicy)).Methods(http.MethodDelete)
s3TablesApi.Handle("/table-policy", wrapWrite(h.adminServer.PutS3TablesTablePolicy)).Methods(http.MethodPut)
s3TablesApi.HandleFunc("/table-policy", h.adminServer.GetS3TablesTablePolicy).Methods(http.MethodGet)
s3TablesApi.Handle("/table-policy", wrapWrite(h.adminServer.DeleteS3TablesTablePolicy)).Methods(http.MethodDelete)
s3TablesApi.Handle("/tags", wrapWrite(h.adminServer.TagS3TablesResource)).Methods(http.MethodPut)
s3TablesApi.HandleFunc("/tags", h.adminServer.ListS3TablesTags).Methods(http.MethodGet)
s3TablesApi.Handle("/tags", wrapWrite(h.adminServer.UntagS3TablesResource)).Methods(http.MethodDelete)
filesApi := api.PathPrefix("/files").Subrouter()
filesApi.Handle("/delete", wrapWrite(h.fileBrowserHandlers.DeleteFile)).Methods(http.MethodDelete)
filesApi.Handle("/delete-multiple", wrapWrite(h.fileBrowserHandlers.DeleteMultipleFiles)).Methods(http.MethodDelete)
filesApi.Handle("/create-folder", wrapWrite(h.fileBrowserHandlers.CreateFolder)).Methods(http.MethodPost)
filesApi.Handle("/upload", wrapWrite(h.fileBrowserHandlers.UploadFile)).Methods(http.MethodPost)
filesApi.HandleFunc("/download", h.fileBrowserHandlers.DownloadFile).Methods(http.MethodGet)
filesApi.HandleFunc("/view", h.fileBrowserHandlers.ViewFile).Methods(http.MethodGet)
filesApi.HandleFunc("/properties", h.fileBrowserHandlers.GetFileProperties).Methods(http.MethodGet)
volumeApi := api.PathPrefix("/volumes").Subrouter()
volumeApi.Handle("/{id}/{server}/vacuum", wrapWrite(h.clusterHandlers.VacuumVolume)).Methods(http.MethodPost)
pluginApi := api.PathPrefix("/plugin").Subrouter()
pluginApi.HandleFunc("/status", h.adminServer.GetPluginStatusAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/lanes", h.adminServer.GetPluginLanesAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/workers", h.adminServer.GetPluginWorkersAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/job-types", h.adminServer.GetPluginJobTypesAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/jobs", h.adminServer.GetPluginJobsAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/jobs/{jobId}", h.adminServer.GetPluginJobAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/activities", h.adminServer.GetPluginActivitiesAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/scheduler-states", h.adminServer.GetPluginSchedulerStatesAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/scheduler-status", h.adminServer.GetPluginSchedulerStatusAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/job-types/{jobType}/descriptor", h.adminServer.GetPluginJobTypeDescriptorAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/job-types/{jobType}/schema", h.adminServer.RequestPluginJobTypeSchemaAPI).Methods(http.MethodPost)
pluginApi.HandleFunc("/job-types/{jobType}/config", h.adminServer.GetPluginJobTypeConfigAPI).Methods(http.MethodGet)
pluginApi.Handle("/job-types/{jobType}/config", wrapWrite(h.adminServer.UpdatePluginJobTypeConfigAPI)).Methods(http.MethodPut)
pluginApi.HandleFunc("/job-types/{jobType}/runs", h.adminServer.GetPluginRunHistoryAPI).Methods(http.MethodGet)
pluginApi.Handle("/job-types/{jobType}/detect", wrapWrite(h.adminServer.TriggerPluginDetectionAPI)).Methods(http.MethodPost)
pluginApi.Handle("/job-types/{jobType}/run", wrapWrite(h.adminServer.RunPluginJobTypeAPI)).Methods(http.MethodPost)
pluginApi.Handle("/jobs/execute", wrapWrite(h.adminServer.ExecutePluginJobAPI)).Methods(http.MethodPost)
pluginApi.Handle("/jobs/{jobId}/expire", wrapWrite(h.adminServer.ExpirePluginJobAPI)).Methods(http.MethodPost)
mqApi := api.PathPrefix("/mq").Subrouter()
mqApi.HandleFunc("/topics/{namespace}/{topic}", h.mqHandlers.GetTopicDetailsAPI).Methods(http.MethodGet)
mqApi.Handle("/topics/create", wrapWrite(h.mqHandlers.CreateTopicAPI)).Methods(http.MethodPost)
mqApi.Handle("/topics/retention/update", wrapWrite(h.mqHandlers.UpdateTopicRetentionAPI)).Methods(http.MethodPost)
mqApi.Handle("/retention/purge", wrapWrite(h.adminServer.TriggerTopicRetentionPurgeAPI)).Methods(http.MethodPost)
}
// HealthCheck returns the health status of the admin interface
func (h *AdminHandlers) HealthCheck(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"health": "ok"})
}
// ShowDashboard renders the main admin dashboard
func (h *AdminHandlers) ShowDashboard(w http.ResponseWriter, r *http.Request) {
// Get admin data from the server
adminData := h.getAdminData(r)
username := h.getUsername(r)
// Render HTML template
w.Header().Set("Content-Type", "text/html")
adminComponent := app.Admin(adminData)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, adminComponent)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
return
}
}
// ShowS3Buckets renders the Object Store buckets management page
func (h *AdminHandlers) ShowS3Buckets(w http.ResponseWriter, r *http.Request) {
// Get pagination and sorting parameters from query string
page := 1
if p := r.URL.Query().Get("page"); p != "" {
if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 {
page = parsed
}
}
pageSize := 100
if ps := r.URL.Query().Get("pageSize"); ps != "" {
if parsed, err := strconv.Atoi(ps); err == nil && parsed > 0 && parsed <= 1000 {
pageSize = parsed
}
}
sortBy := defaultQuery(r.URL.Query().Get("sortBy"), "name")
sortOrder := defaultQuery(r.URL.Query().Get("sortOrder"), "asc")
// Get Object Store buckets data with pagination
s3Data := h.getS3BucketsData(r, page, pageSize, sortBy, sortOrder)
username := h.getUsername(r)
// Render HTML template
w.Header().Set("Content-Type", "text/html")
s3Component := app.S3Buckets(s3Data)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, s3Component)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
return
}
}
// ShowS3TablesBuckets renders the S3 Tables buckets page
func (h *AdminHandlers) ShowS3TablesBuckets(w http.ResponseWriter, r *http.Request) {
username := h.getUsername(r)
data, err := h.adminServer.GetS3TablesBucketsData(r.Context())
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to get S3 Tables buckets: "+err.Error())
return
}
data.Username = username
w.Header().Set("Content-Type", "text/html")
component := app.S3TablesBuckets(data)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, component)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
}
}
// ShowS3TablesNamespaces renders namespaces for a table bucket
func (h *AdminHandlers) ShowS3TablesNamespaces(w http.ResponseWriter, r *http.Request) {
username := h.getUsername(r)
bucketName := mux.Vars(r)["bucket"]
arn, err := buildS3TablesBucketArn(bucketName)
if err != nil {
writeJSONError(w, http.StatusBadRequest, err.Error())
return
}
data, err := h.adminServer.GetS3TablesNamespacesData(r.Context(), arn)
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to get S3 Tables namespaces: "+err.Error())
return
}
data.Username = username
w.Header().Set("Content-Type", "text/html")
component := app.S3TablesNamespaces(data)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, component)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
}
}
// ShowS3TablesTables renders tables for a namespace
func (h *AdminHandlers) ShowS3TablesTables(w http.ResponseWriter, r *http.Request) {
username := h.getUsername(r)
bucketName := mux.Vars(r)["bucket"]
namespace := mux.Vars(r)["namespace"]
arn, err := buildS3TablesBucketArn(bucketName)
if err != nil {
writeJSONError(w, http.StatusBadRequest, err.Error())
return
}
data, err := h.adminServer.GetS3TablesTablesData(r.Context(), arn, namespace)
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to get S3 Tables tables: "+err.Error())
return
}
data.Username = username
w.Header().Set("Content-Type", "text/html")
component := app.S3TablesTables(data)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, component)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
}
}
// ShowS3TablesTableDetails renders Iceberg table metadata and snapshot details on the merged S3 Tables path.
func (h *AdminHandlers) ShowS3TablesTableDetails(w http.ResponseWriter, r *http.Request) {
bucketName := mux.Vars(r)["bucket"]
namespace := mux.Vars(r)["namespace"]
tableName := mux.Vars(r)["table"]
arn, err := buildS3TablesBucketArn(bucketName)
if err != nil {
writeJSONError(w, http.StatusBadRequest, err.Error())
return
}
username := h.getUsername(r)
data, err := h.adminServer.GetIcebergTableDetailsData(r.Context(), bucketName, arn, namespace, tableName)
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to get table details: "+err.Error())
return
}
data.Username = username
w.Header().Set("Content-Type", "text/html")
component := app.IcebergTableDetails(data)
viewCtx := layout.NewViewContext(r, username, dash.CSRFTokenFromContext(r.Context()))
layoutComponent := layout.Layout(viewCtx, component)
if err := layoutComponent.Render(r.Context(), w); err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to render template: "+err.Error())
}
}
func buildS3TablesBucketArn(bucketName string) (string, error) {
return s3tables.BuildBucketARN(s3tables.DefaultRegion, s3_constants.AccountAdminId, bucketName)
}
// getUsername returns the username from context, defaulting to "admin" if not set
func (h *AdminHandlers) getUsername(r *http.Request) string {
username := dash.UsernameFromContext(r.Context())
if username == "" {
username = "admin"
}
return username
}
// ShowIcebergCatalog redirects legacy Iceberg catalog URL to the merged S3 Tables buckets page.
func (h *AdminHandlers) ShowIcebergCatalog(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, dash.P(r.Context(), "/object-store/s3tables/buckets"), http.StatusMovedPermanently)
}
// ShowIcebergNamespaces redirects legacy Iceberg namespaces URL to the merged S3 Tables namespaces page.
func (h *AdminHandlers) ShowIcebergNamespaces(w http.ResponseWriter, r *http.Request) {
catalogName := mux.Vars(r)["catalog"]
http.Redirect(w, r, dash.P(r.Context(), "/object-store/s3tables/buckets/"+url.PathEscape(catalogName)+"/namespaces"), http.StatusMovedPermanently)
}
// ShowIcebergTables redirects legacy Iceberg tables URL to the merged S3 Tables tables page.
func (h *AdminHandlers) ShowIcebergTables(w http.ResponseWriter, r *http.Request) {
catalogName := mux.Vars(r)["catalog"]
namespace := mux.Vars(r)["namespace"]
http.Redirect(w, r, dash.P(r.Context(), "/object-store/s3tables/buckets/"+url.PathEscape(catalogName)+"/namespaces/"+url.PathEscape(namespace)+"/tables"), http.StatusMovedPermanently)
}
// ShowIcebergTableDetails redirects legacy Iceberg table details URL to the merged S3 Tables details page.
func (h *AdminHandlers) ShowIcebergTableDetails(w http.ResponseWriter, r *http.Request) {
catalogName := mux.Vars(r)["catalog"]
namespace := mux.Vars(r)["namespace"]
tableName := mux.Vars(r)["table"]
http.Redirect(w, r, dash.P(r.Context(), "/object-store/s3tables/buckets/"+url.PathEscape(catalogName)+"/namespaces/"+url.PathEscape(namespace)+"/tables/"+url.PathEscape(tableName)), http.StatusMovedPermanently)
}
// ShowBucketDetails returns detailed information about a specific bucket
func (h *AdminHandlers) ShowBucketDetails(w http.ResponseWriter, r *http.Request) {
bucketName := mux.Vars(r)["bucket"]
details, err := h.adminServer.GetBucketDetails(bucketName)
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "Failed to get bucket details: "+err.Error())
return
}
writeJSON(w, http.StatusOK, details)
}
// getS3BucketsData retrieves Object Store buckets data from the server with pagination
func (h *AdminHandlers) getS3BucketsData(r *http.Request, page, pageSize int, sortBy, sortOrder string) dash.S3BucketsData {
username := dash.UsernameFromContext(r.Context())
if username == "" {
username = "admin"
}
// Get Object Store buckets data
data, err := h.adminServer.GetS3BucketsData(page, pageSize, sortBy, sortOrder)
if err != nil {
// Return empty data on error
return dash.S3BucketsData{
Username: username,
Buckets: []dash.S3Bucket{},
TotalBuckets: 0,
TotalSize: 0,
LastUpdated: time.Now(),
CurrentPage: 1,
TotalPages: 1,
PageSize: pageSize,
SortBy: sortBy,
SortOrder: sortOrder,
}
}
data.Username = username
return data
}
// getAdminData retrieves admin data from the server (now uses consolidated method)
func (h *AdminHandlers) getAdminData(r *http.Request) dash.AdminData {
username := dash.UsernameFromContext(r.Context())
// Use the consolidated GetAdminData method from AdminServer
adminData, err := h.adminServer.GetAdminData(username)
if err != nil {
// Return default data when services are not available
if username == "" {
username = "admin"
}
masterNodes := []dash.MasterNode{
{
Address: "localhost:9333",
IsLeader: true,
},
}
return dash.AdminData{
Username: username,
TotalVolumes: 0,
TotalFiles: 0,
TotalSize: 0,
MasterNodes: masterNodes,
VolumeServers: []dash.VolumeServer{},
FilerNodes: []dash.FilerNode{},
DataCenters: []dash.DataCenter{},
LastUpdated: time.Now(),
}
}
return adminData
}
// Helper functions