Fix Maintenance Task Sorting and Refactor Log Persistence (#8199)
* fix float stepping * do not auto refresh * only logs when non 200 status * fix maintenance task sorting and cleanup redundant handler logic * Refactor log retrieval to persist to disk and fix slowness - Move log retrieval to disk-based persistence in GetMaintenanceTaskDetail - Implement background log fetching on task completion in worker_grpc_server.go - Implement async background refresh for in-progress tasks - Completely remove blocking gRPC calls from the UI path to fix 10s timeouts - Cleanup debug logs and performance profiling code * Ensure consistent deterministic sorting in config_persistence cleanup * Replace magic numbers with constants and remove debug logs - Added descriptive constants for truncation limits and timeouts in admin_server.go and worker_grpc_server.go - Replaced magic numbers with these constants throughout the codebase - Verified removal of stdout debug printing - Ensured consistent truncation logic during log persistence * Address code review feedback on history truncation and logging logic - Fix AssignmentHistory double-serialization by copying task in GetMaintenanceTaskDetail - Fix handleTaskCompletion logging logic (mutually exclusive success/failure logs) - Remove unused Timeout field from LogRequestContext and sync select timeouts with constants - Ensure AssignmentHistory is only provided in the top-level field for better JSON structure * Implement goroutine leak protection and request deduplication - Add request deduplication in RequestTaskLogs to prevent multiple concurrent fetches for the same task - Implement safe cleanup in timeout handlers to avoid race conditions in pendingLogRequests map - Add a 10s cooldown for background log refreshes in GetMaintenanceTaskDetail to prevent spamming - Ensure all persistent log-fetching goroutines are bounded and efficiently managed * Fix potential nil pointer panics in maintenance handlers - Add nil checks for adminServer in ShowTaskDetail, ShowMaintenanceWorkers, and UpdateTaskConfig - Update getMaintenanceQueueData to return a descriptive error instead of nil when adminServer is uninitialized - Ensure internal helper methods consistently check for adminServer initialization before use * Strictly enforce disk-only log reading - Remove background log fetching from GetMaintenanceTaskDetail to prevent timeouts and network calls during page view - Remove unused lastLogFetch tracking fields to clean up dead code - Ensure logs are only updated upon task completion via handleTaskCompletion * Refactor GetWorkerLogs to read from disk - Update /api/maintenance/workers/:id/logs endpoint to use configPersistence.LoadTaskExecutionLogs - Remove synchronous gRPC call RequestTaskLogs to prevent timeouts and bad gateway errors - Ensure consistent log retrieval behavior across the application (disk-only) * Fix timestamp parsing in log viewer - Update task_detail.templ JS to handle both ISO 8601 strings and Unix timestamps - Fix "Invalid time value" error when displaying logs fetched from disk - Regenerate templates * master: fallback to HDD if SSD volumes are full in Assign * worker: improve EC detection logging and fix skip counters * worker: add Sync method to TaskLogger interface * worker: implement Sync and ensure logs are flushed before task completion * admin: improve task log retrieval with retries and better timeouts * admin: robust timestamp parsing in task detail view
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
@@ -17,6 +18,15 @@ import (
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
maxLogFetchLimit = 1000
|
||||
maxLogMessageSize = 2000
|
||||
maxLogFieldsCount = 20
|
||||
logRequestTimeout = 10 * time.Second
|
||||
logResponseTimeout = 30 * time.Second
|
||||
logSendTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// WorkerGrpcServer implements the WorkerService gRPC interface
|
||||
type WorkerGrpcServer struct {
|
||||
worker_pb.UnimplementedWorkerServiceServer
|
||||
@@ -42,7 +52,6 @@ type LogRequestContext struct {
|
||||
TaskID string
|
||||
WorkerID string
|
||||
ResponseCh chan *worker_pb.TaskLogResponse
|
||||
Timeout time.Time
|
||||
}
|
||||
|
||||
// WorkerConnection represents an active worker connection
|
||||
@@ -89,8 +98,9 @@ func (s *WorkerGrpcServer) StartWithTLS(port int) error {
|
||||
s.listener = listener
|
||||
s.running = true
|
||||
|
||||
// Start cleanup routine
|
||||
// Start background routines
|
||||
go s.cleanupRoutine()
|
||||
go s.activeLogFetchLoop()
|
||||
|
||||
// Start serving in a goroutine
|
||||
go func() {
|
||||
@@ -437,9 +447,90 @@ func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completi
|
||||
} else {
|
||||
glog.Errorf("Worker %s failed task %s: %s", conn.workerID, completion.TaskId, completion.ErrorMessage)
|
||||
}
|
||||
|
||||
// Fetch and persist logs
|
||||
go s.FetchAndSaveLogs(conn.workerID, completion.TaskId)
|
||||
}
|
||||
}
|
||||
|
||||
// FetchAndSaveLogs retrieves logs from a worker and saves them to disk
|
||||
func (s *WorkerGrpcServer) FetchAndSaveLogs(workerID, taskID string) error {
|
||||
// Add a small initial delay to allow worker to finalize and sync logs
|
||||
// especially when this is called immediately after TaskComplete
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
|
||||
var workerLogs []*worker_pb.TaskLogEntry
|
||||
var err error
|
||||
|
||||
// Retry a few times if fetch fails, as logs might be in the middle of a terminal sync
|
||||
for attempt := 1; attempt <= 3; attempt++ {
|
||||
workerLogs, err = s.RequestTaskLogs(workerID, taskID, maxLogFetchLimit, "")
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if attempt < 3 {
|
||||
glog.V(1).Infof("Fetch logs attempt %d failed for task %s: %v. Retrying in 1s...", attempt, taskID, err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to fetch logs for task %s after 3 attempts: %v", taskID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Convert logs
|
||||
var maintenanceLogs []*maintenance.TaskExecutionLog
|
||||
for _, workerLog := range workerLogs {
|
||||
maintenanceLog := &maintenance.TaskExecutionLog{
|
||||
Timestamp: time.Unix(workerLog.Timestamp, 0),
|
||||
Level: workerLog.Level,
|
||||
Message: workerLog.Message,
|
||||
Source: "worker",
|
||||
TaskID: taskID,
|
||||
WorkerID: workerID,
|
||||
}
|
||||
|
||||
// Truncate very long messages to prevent rendering issues and disk bloat
|
||||
if len(maintenanceLog.Message) > maxLogMessageSize {
|
||||
maintenanceLog.Message = maintenanceLog.Message[:maxLogMessageSize] + "... (truncated)"
|
||||
}
|
||||
|
||||
// carry structured fields if present
|
||||
if len(workerLog.Fields) > 0 {
|
||||
maintenanceLog.Fields = make(map[string]string)
|
||||
fieldCount := 0
|
||||
for k, v := range workerLog.Fields {
|
||||
if fieldCount >= maxLogFieldsCount {
|
||||
maintenanceLog.Fields["..."] = fmt.Sprintf("(%d more fields truncated)", len(workerLog.Fields)-maxLogFieldsCount)
|
||||
break
|
||||
}
|
||||
maintenanceLog.Fields[k] = v
|
||||
fieldCount++
|
||||
}
|
||||
}
|
||||
|
||||
// carry optional progress/status
|
||||
if workerLog.Progress != 0 {
|
||||
p := float64(workerLog.Progress)
|
||||
maintenanceLog.Progress = &p
|
||||
}
|
||||
if workerLog.Status != "" {
|
||||
maintenanceLog.Status = workerLog.Status
|
||||
}
|
||||
maintenanceLogs = append(maintenanceLogs, maintenanceLog)
|
||||
}
|
||||
|
||||
// Persist logs
|
||||
if s.adminServer.configPersistence != nil {
|
||||
if err := s.adminServer.configPersistence.SaveTaskExecutionLogs(taskID, maintenanceLogs); err != nil {
|
||||
glog.Errorf("Failed to persist logs for task %s: %v", taskID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleTaskLogResponse processes task log responses from workers
|
||||
func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, response *worker_pb.TaskLogResponse) {
|
||||
requestKey := fmt.Sprintf("%s:%s", response.WorkerId, response.TaskId)
|
||||
@@ -575,10 +666,13 @@ func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries i
|
||||
TaskID: taskID,
|
||||
WorkerID: workerID,
|
||||
ResponseCh: responseCh,
|
||||
Timeout: time.Now().Add(10 * time.Second),
|
||||
}
|
||||
|
||||
s.logRequestsMutex.Lock()
|
||||
if _, exists := s.pendingLogRequests[requestKey]; exists {
|
||||
s.logRequestsMutex.Unlock()
|
||||
return nil, fmt.Errorf("a log request for task %s is already in progress", taskID)
|
||||
}
|
||||
s.pendingLogRequests[requestKey] = requestContext
|
||||
s.logRequestsMutex.Unlock()
|
||||
|
||||
@@ -601,10 +695,12 @@ func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries i
|
||||
select {
|
||||
case conn.outgoing <- logRequest:
|
||||
glog.V(1).Infof("Log request sent to worker %s for task %s", workerID, taskID)
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-time.After(logSendTimeout):
|
||||
// Clean up pending request on timeout
|
||||
s.logRequestsMutex.Lock()
|
||||
delete(s.pendingLogRequests, requestKey)
|
||||
if s.pendingLogRequests[requestKey] == requestContext {
|
||||
delete(s.pendingLogRequests, requestKey)
|
||||
}
|
||||
s.logRequestsMutex.Unlock()
|
||||
return nil, fmt.Errorf("timeout sending log request to worker %s", workerID)
|
||||
}
|
||||
@@ -617,10 +713,12 @@ func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries i
|
||||
}
|
||||
glog.V(1).Infof("Received %d log entries for task %s from worker %s", len(response.LogEntries), taskID, workerID)
|
||||
return response.LogEntries, nil
|
||||
case <-time.After(10 * time.Second):
|
||||
case <-time.After(logResponseTimeout):
|
||||
// Clean up pending request on timeout
|
||||
s.logRequestsMutex.Lock()
|
||||
delete(s.pendingLogRequests, requestKey)
|
||||
if s.pendingLogRequests[requestKey] == requestContext {
|
||||
delete(s.pendingLogRequests, requestKey)
|
||||
}
|
||||
s.logRequestsMutex.Unlock()
|
||||
return nil, fmt.Errorf("timeout waiting for log response from worker %s", workerID)
|
||||
}
|
||||
@@ -684,3 +782,38 @@ func findClientAddress(ctx context.Context) string {
|
||||
}
|
||||
return pr.Addr.String()
|
||||
}
|
||||
|
||||
// activeLogFetchLoop periodically fetches logs for all in-progress tasks
|
||||
func (s *WorkerGrpcServer) activeLogFetchLoop() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if !s.running || s.adminServer == nil || s.adminServer.maintenanceManager == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get all in-progress tasks
|
||||
tasks := s.adminServer.maintenanceManager.GetTasks(maintenance.TaskStatusInProgress, "", 0)
|
||||
if len(tasks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Background log fetcher: found %d in-progress tasks", len(tasks))
|
||||
for _, task := range tasks {
|
||||
if task.WorkerID != "" {
|
||||
// Use a goroutine to avoid blocking the loop
|
||||
go func(wID, tID string) {
|
||||
if err := s.FetchAndSaveLogs(wID, tID); err != nil {
|
||||
glog.V(2).Infof("Background log fetch failed for task %s on worker %s: %v", tID, wID, err)
|
||||
}
|
||||
}(task.WorkerID, task.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user