Admin UI: Fetch task logs (#7114)
* show task details
* loading tasks
* task UI works
* generic rendering
* rendering the export link
* removing placementConflicts from task parameters
* remove TaskSourceLocation
* remove "Server ID" column
* rendering balance task source
* sources and targets
* fix ec task generation
* move info
* render timeline
* simplified worker id
* simplify
* read task logs from worker
* isValidTaskID
* address comments
* Update weed/worker/tasks/balance/execution.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/erasure_coding/ec_task.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/task_log_handler.go
  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix shard ids
* plan distributing shard id
* rendering planned shards in task details
* remove Conflicts
* worker logs correctly
* pass in dc and rack
* task logging
* Update weed/admin/maintenance/maintenance_queue.go
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* display log details
* logs have fields now
* sort field keys
* fix link
* fix collection filtering
* avoid hard coded ec shard counts

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
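This commit lets the admin UI pull per-task execution logs from the worker that ran the task. As a rough illustration (the route paths are registered elsewhere in this PR and are assumed here, not confirmed by this diff), the new handlers could be exercised with:

    GET /maintenance/tasks/<task-id>        # ShowMaintenanceTaskDetail (HTML page)
    GET /api/maintenance/tasks/<task-id>    # GetMaintenanceTaskDetailAPI (JSON)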
@@ -5,6 +5,7 @@ import (
	"context"
	"fmt"
	"net/http"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
@@ -878,6 +879,46 @@ func (as *AdminServer) GetMaintenanceTask(c *gin.Context) {
	c.JSON(http.StatusOK, task)
}

// GetMaintenanceTaskDetailAPI returns detailed task information via API
func (as *AdminServer) GetMaintenanceTaskDetailAPI(c *gin.Context) {
	taskID := c.Param("id")
	taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
	if err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "Task detail not found", "details": err.Error()})
		return
	}

	c.JSON(http.StatusOK, taskDetail)
}

// ShowMaintenanceTaskDetail renders the task detail page
func (as *AdminServer) ShowMaintenanceTaskDetail(c *gin.Context) {
	username := c.GetString("username")
	if username == "" {
		username = "admin" // Default fallback
	}

	taskID := c.Param("id")
	taskDetail, err := as.GetMaintenanceTaskDetail(taskID)
	if err != nil {
		c.HTML(http.StatusNotFound, "error.html", gin.H{
			"error":   "Task not found",
			"details": err.Error(),
		})
		return
	}

	// Prepare data for template
	data := gin.H{
		"username":   username,
		"task":       taskDetail.Task,
		"taskDetail": taskDetail,
		"title":      fmt.Sprintf("Task Detail - %s", taskID),
	}

	c.HTML(http.StatusOK, "task_detail.html", data)
}

// CancelMaintenanceTask cancels a pending maintenance task
func (as *AdminServer) CancelMaintenanceTask(c *gin.Context) {
	taskID := c.Param("id")
@@ -1041,27 +1082,65 @@ func (as *AdminServer) getMaintenanceQueueStats() (*maintenance.QueueStats, erro
// getMaintenanceTasks returns all maintenance tasks
func (as *AdminServer) getMaintenanceTasks() ([]*maintenance.MaintenanceTask, error) {
	if as.maintenanceManager == nil {
-		return []*MaintenanceTask{}, nil
+		return []*maintenance.MaintenanceTask{}, nil
	}
-	return as.maintenanceManager.GetTasks(maintenance.TaskStatusPending, "", 0), nil

	// Collect all tasks from memory across all statuses
	allTasks := []*maintenance.MaintenanceTask{}
	statuses := []maintenance.MaintenanceTaskStatus{
		maintenance.TaskStatusPending,
		maintenance.TaskStatusAssigned,
		maintenance.TaskStatusInProgress,
		maintenance.TaskStatusCompleted,
		maintenance.TaskStatusFailed,
		maintenance.TaskStatusCancelled,
	}

	for _, status := range statuses {
		tasks := as.maintenanceManager.GetTasks(status, "", 0)
		allTasks = append(allTasks, tasks...)
	}

	// Also load any persisted tasks that might not be in memory
	if as.configPersistence != nil {
		persistedTasks, err := as.configPersistence.LoadAllTaskStates()
		if err == nil {
			// Add any persisted tasks not already in memory
			for _, persistedTask := range persistedTasks {
				found := false
				for _, memoryTask := range allTasks {
					if memoryTask.ID == persistedTask.ID {
						found = true
						break
					}
				}
				if !found {
					allTasks = append(allTasks, persistedTask)
				}
			}
		}
	}

	return allTasks, nil
}

// getMaintenanceTask returns a specific maintenance task
-func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, error) {
+func (as *AdminServer) getMaintenanceTask(taskID string) (*maintenance.MaintenanceTask, error) {
	if as.maintenanceManager == nil {
		return nil, fmt.Errorf("maintenance manager not initialized")
	}

	// Search for the task across all statuses since we don't know which status it has
-	statuses := []MaintenanceTaskStatus{
-		TaskStatusPending,
-		TaskStatusAssigned,
-		TaskStatusInProgress,
-		TaskStatusCompleted,
-		TaskStatusFailed,
-		TaskStatusCancelled,
+	statuses := []maintenance.MaintenanceTaskStatus{
+		maintenance.TaskStatusPending,
+		maintenance.TaskStatusAssigned,
+		maintenance.TaskStatusInProgress,
+		maintenance.TaskStatusCompleted,
+		maintenance.TaskStatusFailed,
+		maintenance.TaskStatusCancelled,
	}

	// First, search for the task in memory across all statuses
	for _, status := range statuses {
		tasks := as.maintenanceManager.GetTasks(status, "", 0) // Get all tasks with this status
		for _, task := range tasks {
@@ -1071,9 +1150,133 @@ func (as *AdminServer) getMaintenanceTask(taskID string) (*MaintenanceTask, erro
		}
	}

	// If not found in memory, try to load from persistent storage
	if as.configPersistence != nil {
		task, err := as.configPersistence.LoadTaskState(taskID)
		if err == nil {
			glog.V(2).Infof("Loaded task %s from persistent storage", taskID)
			return task, nil
		}
		glog.V(2).Infof("Task %s not found in persistent storage: %v", taskID, err)
	}

	return nil, fmt.Errorf("task %s not found", taskID)
}

// GetMaintenanceTaskDetail returns comprehensive task details including logs and assignment history
func (as *AdminServer) GetMaintenanceTaskDetail(taskID string) (*maintenance.TaskDetailData, error) {
	// Get basic task information
	task, err := as.getMaintenanceTask(taskID)
	if err != nil {
		return nil, err
	}

	// Create task detail structure from the loaded task
	taskDetail := &maintenance.TaskDetailData{
		Task:              task,
		AssignmentHistory: task.AssignmentHistory, // Use assignment history from persisted task
		ExecutionLogs:     []*maintenance.TaskExecutionLog{},
		RelatedTasks:      []*maintenance.MaintenanceTask{},
		LastUpdated:       time.Now(),
	}

	if taskDetail.AssignmentHistory == nil {
		taskDetail.AssignmentHistory = []*maintenance.TaskAssignmentRecord{}
	}

	// Get worker information if task is assigned
	if task.WorkerID != "" {
		workers := as.maintenanceManager.GetWorkers()
		for _, worker := range workers {
			if worker.ID == task.WorkerID {
				taskDetail.WorkerInfo = worker
				break
			}
		}
	}

	// Get execution logs from worker if task is active/completed and worker is connected
	if task.Status == maintenance.TaskStatusInProgress || task.Status == maintenance.TaskStatusCompleted {
		if as.workerGrpcServer != nil && task.WorkerID != "" {
			workerLogs, err := as.workerGrpcServer.RequestTaskLogs(task.WorkerID, taskID, 100, "")
			if err == nil && len(workerLogs) > 0 {
				// Convert worker logs to maintenance logs
				for _, workerLog := range workerLogs {
					maintenanceLog := &maintenance.TaskExecutionLog{
						Timestamp: time.Unix(workerLog.Timestamp, 0),
						Level:     workerLog.Level,
						Message:   workerLog.Message,
						Source:    "worker",
						TaskID:    taskID,
						WorkerID:  task.WorkerID,
					}
					// carry structured fields if present
					if len(workerLog.Fields) > 0 {
						maintenanceLog.Fields = make(map[string]string, len(workerLog.Fields))
						for k, v := range workerLog.Fields {
							maintenanceLog.Fields[k] = v
						}
					}
					// carry optional progress/status
					if workerLog.Progress != 0 {
						p := float64(workerLog.Progress)
						maintenanceLog.Progress = &p
					}
					if workerLog.Status != "" {
						maintenanceLog.Status = workerLog.Status
					}
					taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, maintenanceLog)
				}
			} else if err != nil {
				// Add a diagnostic log entry when worker logs cannot be retrieved
				diagnosticLog := &maintenance.TaskExecutionLog{
					Timestamp: time.Now(),
					Level:     "WARNING",
					Message:   fmt.Sprintf("Failed to retrieve worker logs: %v", err),
					Source:    "admin",
					TaskID:    taskID,
					WorkerID:  task.WorkerID,
				}
				taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
				glog.V(1).Infof("Failed to get worker logs for task %s from worker %s: %v", taskID, task.WorkerID, err)
			}
		} else {
			// Add diagnostic information when worker is not available
			reason := "worker gRPC server not available"
			if task.WorkerID == "" {
				reason = "no worker assigned to task"
			}
			diagnosticLog := &maintenance.TaskExecutionLog{
				Timestamp: time.Now(),
				Level:     "INFO",
				Message:   fmt.Sprintf("Worker logs not available: %s", reason),
				Source:    "admin",
				TaskID:    taskID,
				WorkerID:  task.WorkerID,
			}
			taskDetail.ExecutionLogs = append(taskDetail.ExecutionLogs, diagnosticLog)
		}
	}

	// Get related tasks (other tasks on same volume/server)
	if task.VolumeID != 0 || task.Server != "" {
		allTasks := as.maintenanceManager.GetTasks("", "", 50) // Get recent tasks
		for _, relatedTask := range allTasks {
			if relatedTask.ID != taskID &&
				(relatedTask.VolumeID == task.VolumeID || relatedTask.Server == task.Server) {
				taskDetail.RelatedTasks = append(taskDetail.RelatedTasks, relatedTask)
			}
		}
	}

	// Save updated task detail to disk
	if err := as.configPersistence.SaveTaskDetail(taskID, taskDetail); err != nil {
		glog.V(1).Infof("Failed to save task detail for %s: %v", taskID, err)
	}

	return taskDetail, nil
}

// getMaintenanceWorkers returns all maintenance workers
func (as *AdminServer) getMaintenanceWorkers() ([]*maintenance.MaintenanceWorker, error) {
	if as.maintenanceManager == nil {
@@ -1157,6 +1360,34 @@ func (as *AdminServer) getMaintenanceWorkerDetails(workerID string) (*WorkerDeta
	}, nil
}

// GetWorkerLogs fetches logs from a specific worker for a task
func (as *AdminServer) GetWorkerLogs(c *gin.Context) {
	workerID := c.Param("id")
	taskID := c.Query("taskId")
	maxEntriesStr := c.DefaultQuery("maxEntries", "100")
	logLevel := c.DefaultQuery("logLevel", "")

	maxEntries := int32(100)
	if maxEntriesStr != "" {
		if parsed, err := strconv.ParseInt(maxEntriesStr, 10, 32); err == nil {
			maxEntries = int32(parsed)
		}
	}

	if as.workerGrpcServer == nil {
		c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Worker gRPC server not available"})
		return
	}

	logs, err := as.workerGrpcServer.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("Failed to get logs from worker: %v", err)})
		return
	}

	c.JSON(http.StatusOK, gin.H{"worker_id": workerID, "task_id": taskID, "logs": logs, "count": len(logs)})
}
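// Illustrative request only — the exact route registration is not part of this
// diff. Assuming GetWorkerLogs is mounted at something like
// /api/workers/:id/logs, a client could fetch the last 50 ERROR entries with:
//
//	GET /api/workers/worker-1/logs?taskId=task-123&maxEntries=50&logLevel=ERROR
//
// and would receive {"worker_id": ..., "task_id": ..., "logs": [...], "count": N}.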

// getMaintenanceStats returns maintenance statistics
func (as *AdminServer) getMaintenanceStats() (*MaintenanceStats, error) {
	if as.maintenanceManager == nil {
@@ -1376,6 +1607,20 @@ func (s *AdminServer) GetWorkerGrpcServer() *WorkerGrpcServer {
// InitMaintenanceManager initializes the maintenance manager
func (s *AdminServer) InitMaintenanceManager(config *maintenance.MaintenanceConfig) {
	s.maintenanceManager = maintenance.NewMaintenanceManager(s, config)

	// Set up task persistence if config persistence is available
	if s.configPersistence != nil {
		queue := s.maintenanceManager.GetQueue()
		if queue != nil {
			queue.SetPersistence(s.configPersistence)

			// Load tasks from persistence on startup
			if err := queue.LoadTasksFromPersistence(); err != nil {
				glog.Errorf("Failed to load tasks from persistence: %v", err)
			}
		}
	}

	glog.V(1).Infof("Maintenance manager initialized (enabled: %v)", config.Enabled)
}

@@ -1,11 +1,15 @@
package dash

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
@@ -33,6 +37,12 @@ const (
	BalanceTaskConfigJSONFile     = "task_balance.json"
	ReplicationTaskConfigJSONFile = "task_replication.json"

	// Task persistence subdirectories and settings
	TasksSubdir       = "tasks"
	TaskDetailsSubdir = "task_details"
	TaskLogsSubdir    = "task_logs"
	MaxCompletedTasks = 10 // Only keep last 10 completed tasks

	ConfigDirPermissions  = 0755
	ConfigFilePermissions = 0644
)
@@ -45,6 +55,35 @@ type (
	ReplicationTaskConfig = worker_pb.ReplicationTaskConfig
)

// isValidTaskID validates that a task ID is safe for use in file paths.
// This prevents path traversal attacks by ensuring the task ID doesn't contain
// path separators or parent directory references.
func isValidTaskID(taskID string) bool {
	if taskID == "" {
		return false
	}

	// Reject task IDs with leading or trailing whitespace
	if strings.TrimSpace(taskID) != taskID {
		return false
	}

	// Check for path traversal patterns
	if strings.Contains(taskID, "/") ||
		strings.Contains(taskID, "\\") ||
		strings.Contains(taskID, "..") ||
		strings.Contains(taskID, ":") {
		return false
	}

	// Additional safety: ensure it's not just dots or empty after trim
	if taskID == "." || taskID == ".." {
		return false
	}

	return true
}
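// A quick sanity sketch of the accepted/rejected shapes (illustrative, not part
// of the diff):
//
//	isValidTaskID("task-123")      // true
//	isValidTaskID("../etc/passwd") // false — parent directory reference
//	isValidTaskID("a/b")           // false — path separator
//	isValidTaskID(" task ")        // false — surrounding whitespace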

// ConfigPersistence handles saving and loading configuration files
type ConfigPersistence struct {
	dataDir string
@@ -688,3 +727,509 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
	glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
	return policy
}

// SaveTaskDetail saves detailed task information to disk
func (cp *ConfigPersistence) SaveTaskDetail(taskID string, detail *maintenance.TaskDetailData) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot save task detail")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir)
	if err := os.MkdirAll(taskDetailDir, ConfigDirPermissions); err != nil {
		return fmt.Errorf("failed to create task details directory: %w", err)
	}

	// Save task detail as JSON for easy reading and debugging
	taskDetailPath := filepath.Join(taskDetailDir, fmt.Sprintf("%s.json", taskID))
	jsonData, err := json.MarshalIndent(detail, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal task detail to JSON: %w", err)
	}

	if err := os.WriteFile(taskDetailPath, jsonData, ConfigFilePermissions); err != nil {
		return fmt.Errorf("failed to write task detail file: %w", err)
	}

	glog.V(2).Infof("Saved task detail for task %s to %s", taskID, taskDetailPath)
	return nil
}

// LoadTaskDetail loads detailed task information from disk
func (cp *ConfigPersistence) LoadTaskDetail(taskID string) (*maintenance.TaskDetailData, error) {
	if cp.dataDir == "" {
		return nil, fmt.Errorf("no data directory specified, cannot load task detail")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID))
	if _, err := os.Stat(taskDetailPath); os.IsNotExist(err) {
		return nil, fmt.Errorf("task detail file not found: %s", taskID)
	}

	jsonData, err := os.ReadFile(taskDetailPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read task detail file: %w", err)
	}

	var detail maintenance.TaskDetailData
	if err := json.Unmarshal(jsonData, &detail); err != nil {
		return nil, fmt.Errorf("failed to unmarshal task detail JSON: %w", err)
	}

	glog.V(2).Infof("Loaded task detail for task %s from %s", taskID, taskDetailPath)
	return &detail, nil
}

// SaveTaskExecutionLogs saves execution logs for a task
func (cp *ConfigPersistence) SaveTaskExecutionLogs(taskID string, logs []*maintenance.TaskExecutionLog) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot save task logs")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskLogsDir := filepath.Join(cp.dataDir, TaskLogsSubdir)
	if err := os.MkdirAll(taskLogsDir, ConfigDirPermissions); err != nil {
		return fmt.Errorf("failed to create task logs directory: %w", err)
	}

	// Save logs as JSON for easy reading
	taskLogsPath := filepath.Join(taskLogsDir, fmt.Sprintf("%s.json", taskID))
	logsData := struct {
		TaskID string                          `json:"task_id"`
		Logs   []*maintenance.TaskExecutionLog `json:"logs"`
	}{
		TaskID: taskID,
		Logs:   logs,
	}
	jsonData, err := json.MarshalIndent(logsData, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal task logs to JSON: %w", err)
	}

	if err := os.WriteFile(taskLogsPath, jsonData, ConfigFilePermissions); err != nil {
		return fmt.Errorf("failed to write task logs file: %w", err)
	}

	glog.V(2).Infof("Saved %d execution logs for task %s to %s", len(logs), taskID, taskLogsPath)
	return nil
}
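// For reference (a summary, not lines from the diff), the resulting on-disk
// layout under dataDir is:
//
//	<dataDir>/tasks/<taskID>.pb           — protobuf task state (TaskStateFile)
//	<dataDir>/task_details/<taskID>.json  — TaskDetailData as indented JSON
//	<dataDir>/task_logs/<taskID>.json     — {"task_id": "...", "logs": [...]}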

// LoadTaskExecutionLogs loads execution logs for a task
func (cp *ConfigPersistence) LoadTaskExecutionLogs(taskID string) ([]*maintenance.TaskExecutionLog, error) {
	if cp.dataDir == "" {
		return nil, fmt.Errorf("no data directory specified, cannot load task logs")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID))
	if _, err := os.Stat(taskLogsPath); os.IsNotExist(err) {
		// Return empty slice if logs don't exist yet
		return []*maintenance.TaskExecutionLog{}, nil
	}

	jsonData, err := os.ReadFile(taskLogsPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read task logs file: %w", err)
	}

	var logsData struct {
		TaskID string                          `json:"task_id"`
		Logs   []*maintenance.TaskExecutionLog `json:"logs"`
	}
	if err := json.Unmarshal(jsonData, &logsData); err != nil {
		return nil, fmt.Errorf("failed to unmarshal task logs JSON: %w", err)
	}

	glog.V(2).Infof("Loaded %d execution logs for task %s from %s", len(logsData.Logs), taskID, taskLogsPath)
	return logsData.Logs, nil
}

// DeleteTaskDetail removes task detail and logs from disk
func (cp *ConfigPersistence) DeleteTaskDetail(taskID string) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot delete task detail")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	// Delete task detail file
	taskDetailPath := filepath.Join(cp.dataDir, TaskDetailsSubdir, fmt.Sprintf("%s.json", taskID))
	if err := os.Remove(taskDetailPath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to delete task detail file: %w", err)
	}

	// Delete task logs file
	taskLogsPath := filepath.Join(cp.dataDir, TaskLogsSubdir, fmt.Sprintf("%s.json", taskID))
	if err := os.Remove(taskLogsPath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to delete task logs file: %w", err)
	}

	glog.V(2).Infof("Deleted task detail and logs for task %s", taskID)
	return nil
}

// ListTaskDetails returns a list of all task IDs that have stored details
func (cp *ConfigPersistence) ListTaskDetails() ([]string, error) {
	if cp.dataDir == "" {
		return nil, fmt.Errorf("no data directory specified, cannot list task details")
	}

	taskDetailDir := filepath.Join(cp.dataDir, TaskDetailsSubdir)
	if _, err := os.Stat(taskDetailDir); os.IsNotExist(err) {
		return []string{}, nil
	}

	entries, err := os.ReadDir(taskDetailDir)
	if err != nil {
		return nil, fmt.Errorf("failed to read task details directory: %w", err)
	}

	var taskIDs []string
	for _, entry := range entries {
		if !entry.IsDir() && filepath.Ext(entry.Name()) == ".json" {
			taskID := entry.Name()[:len(entry.Name())-5] // Remove .json extension
			taskIDs = append(taskIDs, taskID)
		}
	}

	return taskIDs, nil
}

// CleanupCompletedTasks removes old completed tasks beyond the retention limit
func (cp *ConfigPersistence) CleanupCompletedTasks() error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot cleanup completed tasks")
	}

	tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
	if _, err := os.Stat(tasksDir); os.IsNotExist(err) {
		return nil // No tasks directory, nothing to cleanup
	}

	// Load all tasks and find completed/failed ones
	allTasks, err := cp.LoadAllTaskStates()
	if err != nil {
		return fmt.Errorf("failed to load tasks for cleanup: %w", err)
	}

	// Filter completed and failed tasks, sort by completion time
	var completedTasks []*maintenance.MaintenanceTask
	for _, task := range allTasks {
		if (task.Status == maintenance.TaskStatusCompleted || task.Status == maintenance.TaskStatusFailed) && task.CompletedAt != nil {
			completedTasks = append(completedTasks, task)
		}
	}

	// Sort by completion time (most recent first)
	sort.Slice(completedTasks, func(i, j int) bool {
		return completedTasks[i].CompletedAt.After(*completedTasks[j].CompletedAt)
	})

	// Keep only the most recent MaxCompletedTasks, delete the rest
	if len(completedTasks) > MaxCompletedTasks {
		tasksToDelete := completedTasks[MaxCompletedTasks:]
		for _, task := range tasksToDelete {
			if err := cp.DeleteTaskState(task.ID); err != nil {
				glog.Warningf("Failed to delete old completed task %s: %v", task.ID, err)
			} else {
				glog.V(2).Infof("Cleaned up old completed task %s (completed: %v)", task.ID, task.CompletedAt)
			}
		}
		glog.V(1).Infof("Cleaned up %d old completed tasks (keeping %d most recent)", len(tasksToDelete), MaxCompletedTasks)
	}

	return nil
}

// SaveTaskState saves a task state to protobuf file
func (cp *ConfigPersistence) SaveTaskState(task *maintenance.MaintenanceTask) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot save task state")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(task.ID) {
		return fmt.Errorf("invalid task ID: %q contains illegal path characters", task.ID)
	}

	tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
	if err := os.MkdirAll(tasksDir, ConfigDirPermissions); err != nil {
		return fmt.Errorf("failed to create tasks directory: %w", err)
	}

	taskFilePath := filepath.Join(tasksDir, fmt.Sprintf("%s.pb", task.ID))

	// Convert task to protobuf
	pbTask := cp.maintenanceTaskToProtobuf(task)
	taskStateFile := &worker_pb.TaskStateFile{
		Task:         pbTask,
		LastUpdated:  time.Now().Unix(),
		AdminVersion: "unknown", // TODO: add version info
	}

	pbData, err := proto.Marshal(taskStateFile)
	if err != nil {
		return fmt.Errorf("failed to marshal task state protobuf: %w", err)
	}

	if err := os.WriteFile(taskFilePath, pbData, ConfigFilePermissions); err != nil {
		return fmt.Errorf("failed to write task state file: %w", err)
	}

	glog.V(2).Infof("Saved task state for task %s to %s", task.ID, taskFilePath)
	return nil
}

// LoadTaskState loads a task state from protobuf file
func (cp *ConfigPersistence) LoadTaskState(taskID string) (*maintenance.MaintenanceTask, error) {
	if cp.dataDir == "" {
		return nil, fmt.Errorf("no data directory specified, cannot load task state")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return nil, fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID))
	if _, err := os.Stat(taskFilePath); os.IsNotExist(err) {
		return nil, fmt.Errorf("task state file not found: %s", taskID)
	}

	pbData, err := os.ReadFile(taskFilePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read task state file: %w", err)
	}

	var taskStateFile worker_pb.TaskStateFile
	if err := proto.Unmarshal(pbData, &taskStateFile); err != nil {
		return nil, fmt.Errorf("failed to unmarshal task state protobuf: %w", err)
	}

	// Convert protobuf to maintenance task
	task := cp.protobufToMaintenanceTask(taskStateFile.Task)

	glog.V(2).Infof("Loaded task state for task %s from %s", taskID, taskFilePath)
	return task, nil
}

// LoadAllTaskStates loads all task states from disk
func (cp *ConfigPersistence) LoadAllTaskStates() ([]*maintenance.MaintenanceTask, error) {
	if cp.dataDir == "" {
		return []*maintenance.MaintenanceTask{}, nil
	}

	tasksDir := filepath.Join(cp.dataDir, TasksSubdir)
	if _, err := os.Stat(tasksDir); os.IsNotExist(err) {
		return []*maintenance.MaintenanceTask{}, nil
	}

	entries, err := os.ReadDir(tasksDir)
	if err != nil {
		return nil, fmt.Errorf("failed to read tasks directory: %w", err)
	}

	var tasks []*maintenance.MaintenanceTask
	for _, entry := range entries {
		if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" {
			taskID := entry.Name()[:len(entry.Name())-3] // Remove .pb extension
			task, err := cp.LoadTaskState(taskID)
			if err != nil {
				glog.Warningf("Failed to load task state for %s: %v", taskID, err)
				continue
			}
			tasks = append(tasks, task)
		}
	}

	glog.V(1).Infof("Loaded %d task states from disk", len(tasks))
	return tasks, nil
}

// DeleteTaskState removes a task state file from disk
func (cp *ConfigPersistence) DeleteTaskState(taskID string) error {
	if cp.dataDir == "" {
		return fmt.Errorf("no data directory specified, cannot delete task state")
	}

	// Validate task ID to prevent path traversal
	if !isValidTaskID(taskID) {
		return fmt.Errorf("invalid task ID: %q contains illegal path characters", taskID)
	}

	taskFilePath := filepath.Join(cp.dataDir, TasksSubdir, fmt.Sprintf("%s.pb", taskID))
	if err := os.Remove(taskFilePath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to delete task state file: %w", err)
	}

	glog.V(2).Infof("Deleted task state for task %s", taskID)
	return nil
}

// maintenanceTaskToProtobuf converts a MaintenanceTask to protobuf format
func (cp *ConfigPersistence) maintenanceTaskToProtobuf(task *maintenance.MaintenanceTask) *worker_pb.MaintenanceTaskData {
	pbTask := &worker_pb.MaintenanceTaskData{
		Id:              task.ID,
		Type:            string(task.Type),
		Priority:        cp.priorityToString(task.Priority),
		Status:          string(task.Status),
		VolumeId:        task.VolumeID,
		Server:          task.Server,
		Collection:      task.Collection,
		Reason:          task.Reason,
		CreatedAt:       task.CreatedAt.Unix(),
		ScheduledAt:     task.ScheduledAt.Unix(),
		WorkerId:        task.WorkerID,
		Error:           task.Error,
		Progress:        task.Progress,
		RetryCount:      int32(task.RetryCount),
		MaxRetries:      int32(task.MaxRetries),
		CreatedBy:       task.CreatedBy,
		CreationContext: task.CreationContext,
		DetailedReason:  task.DetailedReason,
		Tags:            task.Tags,
	}

	// Handle optional timestamps
	if task.StartedAt != nil {
		pbTask.StartedAt = task.StartedAt.Unix()
	}
	if task.CompletedAt != nil {
		pbTask.CompletedAt = task.CompletedAt.Unix()
	}

	// Convert assignment history
	if task.AssignmentHistory != nil {
		for _, record := range task.AssignmentHistory {
			pbRecord := &worker_pb.TaskAssignmentRecord{
				WorkerId:      record.WorkerID,
				WorkerAddress: record.WorkerAddress,
				AssignedAt:    record.AssignedAt.Unix(),
				Reason:        record.Reason,
			}
			if record.UnassignedAt != nil {
				pbRecord.UnassignedAt = record.UnassignedAt.Unix()
			}
			pbTask.AssignmentHistory = append(pbTask.AssignmentHistory, pbRecord)
		}
	}

	// Convert typed parameters if available
	if task.TypedParams != nil {
		pbTask.TypedParams = task.TypedParams
	}

	return pbTask
}

// protobufToMaintenanceTask converts protobuf format to MaintenanceTask
func (cp *ConfigPersistence) protobufToMaintenanceTask(pbTask *worker_pb.MaintenanceTaskData) *maintenance.MaintenanceTask {
	task := &maintenance.MaintenanceTask{
		ID:              pbTask.Id,
		Type:            maintenance.MaintenanceTaskType(pbTask.Type),
		Priority:        cp.stringToPriority(pbTask.Priority),
		Status:          maintenance.MaintenanceTaskStatus(pbTask.Status),
		VolumeID:        pbTask.VolumeId,
		Server:          pbTask.Server,
		Collection:      pbTask.Collection,
		Reason:          pbTask.Reason,
		CreatedAt:       time.Unix(pbTask.CreatedAt, 0),
		ScheduledAt:     time.Unix(pbTask.ScheduledAt, 0),
		WorkerID:        pbTask.WorkerId,
		Error:           pbTask.Error,
		Progress:        pbTask.Progress,
		RetryCount:      int(pbTask.RetryCount),
		MaxRetries:      int(pbTask.MaxRetries),
		CreatedBy:       pbTask.CreatedBy,
		CreationContext: pbTask.CreationContext,
		DetailedReason:  pbTask.DetailedReason,
		Tags:            pbTask.Tags,
	}

	// Handle optional timestamps
	if pbTask.StartedAt > 0 {
		startTime := time.Unix(pbTask.StartedAt, 0)
		task.StartedAt = &startTime
	}
	if pbTask.CompletedAt > 0 {
		completedTime := time.Unix(pbTask.CompletedAt, 0)
		task.CompletedAt = &completedTime
	}

	// Convert assignment history
	if pbTask.AssignmentHistory != nil {
		task.AssignmentHistory = make([]*maintenance.TaskAssignmentRecord, 0, len(pbTask.AssignmentHistory))
		for _, pbRecord := range pbTask.AssignmentHistory {
			record := &maintenance.TaskAssignmentRecord{
				WorkerID:      pbRecord.WorkerId,
				WorkerAddress: pbRecord.WorkerAddress,
				AssignedAt:    time.Unix(pbRecord.AssignedAt, 0),
				Reason:        pbRecord.Reason,
			}
			if pbRecord.UnassignedAt > 0 {
				unassignedTime := time.Unix(pbRecord.UnassignedAt, 0)
				record.UnassignedAt = &unassignedTime
			}
			task.AssignmentHistory = append(task.AssignmentHistory, record)
		}
	}

	// Convert typed parameters if available
	if pbTask.TypedParams != nil {
		task.TypedParams = pbTask.TypedParams
	}

	return task
}

// priorityToString converts MaintenanceTaskPriority to string for protobuf storage
func (cp *ConfigPersistence) priorityToString(priority maintenance.MaintenanceTaskPriority) string {
	switch priority {
	case maintenance.PriorityLow:
		return "low"
	case maintenance.PriorityNormal:
		return "normal"
	case maintenance.PriorityHigh:
		return "high"
	case maintenance.PriorityCritical:
		return "critical"
	default:
		return "normal"
	}
}

// stringToPriority converts string from protobuf to MaintenanceTaskPriority
func (cp *ConfigPersistence) stringToPriority(priorityStr string) maintenance.MaintenanceTaskPriority {
	switch priorityStr {
	case "low":
		return maintenance.PriorityLow
	case "normal":
		return maintenance.PriorityNormal
	case "high":
		return maintenance.PriorityHigh
	case "critical":
		return maintenance.PriorityCritical
	default:
		return maintenance.PriorityNormal
	}
}

@@ -13,6 +13,17 @@ import (
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// matchesCollection checks if a volume/EC volume collection matches the filter collection.
// Handles the special case where empty collection ("") represents the "default" collection.
func matchesCollection(volumeCollection, filterCollection string) bool {
	// Both empty means default collection matches default filter
	if volumeCollection == "" && filterCollection == "" {
		return true
	}
	// Direct string match for named collections
	return volumeCollection == filterCollection
}
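// Behavior sketch (illustrative): matchesCollection("", "") == true (default
// matches default), matchesCollection("photos", "photos") == true, and
// matchesCollection("photos", "") == false. In GetClusterEcVolumes below, an
// empty filter is short-circuited first (collection == "" || matchesCollection(...)),
// so it means "no filtering" there.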

// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering
func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) {
	// Set defaults
@@ -403,7 +414,7 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string,
	var ecVolumes []EcVolumeWithShards
	for _, volume := range volumeData {
		// Filter by collection if specified
-		if collection == "" || volume.Collection == collection {
+		if collection == "" || matchesCollection(volume.Collection, collection) {
			ecVolumes = append(ecVolumes, *volume)
		}
	}

@@ -83,13 +83,7 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
	var filteredEcTotalSize int64

	for _, volume := range volumes {
-		// Handle "default" collection filtering for empty collections
-		volumeCollection := volume.Collection
-		if volumeCollection == "" {
-			volumeCollection = "default"
-		}
-
-		if volumeCollection == collection {
+		if matchesCollection(volume.Collection, collection) {
			filteredVolumes = append(filteredVolumes, volume)
			filteredTotalSize += int64(volume.Size)
		}
@@ -103,13 +97,7 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
	for _, node := range rack.DataNodeInfos {
		for _, diskInfo := range node.DiskInfos {
			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				// Handle "default" collection filtering for empty collections
-				ecCollection := ecShardInfo.Collection
-				if ecCollection == "" {
-					ecCollection = "default"
-				}
-
-				if ecCollection == collection {
+				if matchesCollection(ecShardInfo.Collection, collection) {
					// Add all shard sizes for this EC volume
					for _, shardSize := range ecShardInfo.ShardSizes {
						filteredEcTotalSize += shardSize
@@ -500,7 +488,7 @@ func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, erro
	ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits

	// Collect shard sizes from this disk
	shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
	shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
		if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found {
			allShardSizes[shardId] = size

@@ -26,6 +26,10 @@ type WorkerGrpcServer struct {
	connections map[string]*WorkerConnection
	connMutex   sync.RWMutex

	// Log request correlation
	pendingLogRequests map[string]*LogRequestContext
	logRequestsMutex   sync.RWMutex

	// gRPC server
	grpcServer *grpc.Server
	listener   net.Listener
@@ -33,6 +37,14 @@ type WorkerGrpcServer struct {
	stopChan chan struct{}
}

// LogRequestContext tracks pending log requests
type LogRequestContext struct {
	TaskID     string
	WorkerID   string
	ResponseCh chan *worker_pb.TaskLogResponse
	Timeout    time.Time
}

// WorkerConnection represents an active worker connection
type WorkerConnection struct {
	workerID string
@@ -49,9 +61,10 @@ type WorkerConnection struct {
// NewWorkerGrpcServer creates a new gRPC server for worker connections
func NewWorkerGrpcServer(adminServer *AdminServer) *WorkerGrpcServer {
	return &WorkerGrpcServer{
-		adminServer: adminServer,
-		connections: make(map[string]*WorkerConnection),
-		stopChan:    make(chan struct{}),
+		adminServer:        adminServer,
+		connections:        make(map[string]*WorkerConnection),
+		pendingLogRequests: make(map[string]*LogRequestContext),
+		stopChan:           make(chan struct{}),
	}
}

@@ -264,6 +277,9 @@ func (s *WorkerGrpcServer) handleWorkerMessage(conn *WorkerConnection, msg *work
	case *worker_pb.WorkerMessage_TaskComplete:
		s.handleTaskCompletion(conn, m.TaskComplete)

	case *worker_pb.WorkerMessage_TaskLogResponse:
		s.handleTaskLogResponse(conn, m.TaskLogResponse)

	case *worker_pb.WorkerMessage_Shutdown:
		glog.Infof("Worker %s shutting down: %s", workerID, m.Shutdown.Reason)
		s.unregisterWorker(workerID)
@@ -341,8 +357,13 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo
	// Create basic params if none exist
	taskParams = &worker_pb.TaskParams{
		VolumeId:   task.VolumeID,
		Server:     task.Server,
		Collection: task.Collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node:     task.Server,
				VolumeId: task.VolumeID,
			},
		},
	}
}

@@ -396,6 +417,35 @@ func (s *WorkerGrpcServer) handleTaskCompletion(conn *WorkerConnection, completi
	}
}

// handleTaskLogResponse processes task log responses from workers
func (s *WorkerGrpcServer) handleTaskLogResponse(conn *WorkerConnection, response *worker_pb.TaskLogResponse) {
	requestKey := fmt.Sprintf("%s:%s", response.WorkerId, response.TaskId)

	s.logRequestsMutex.RLock()
	requestContext, exists := s.pendingLogRequests[requestKey]
	s.logRequestsMutex.RUnlock()

	if !exists {
		glog.Warningf("Received unexpected log response for task %s from worker %s", response.TaskId, response.WorkerId)
		return
	}

	glog.V(1).Infof("Received log response for task %s from worker %s: %d entries", response.TaskId, response.WorkerId, len(response.LogEntries))

	// Send response to waiting channel
	select {
	case requestContext.ResponseCh <- response:
		// Response delivered successfully
	case <-time.After(time.Second):
		glog.Warningf("Failed to deliver log response for task %s from worker %s: timeout", response.TaskId, response.WorkerId)
	}

	// Clean up the pending request
	s.logRequestsMutex.Lock()
	delete(s.pendingLogRequests, requestKey)
	s.logRequestsMutex.Unlock()
}

// unregisterWorker removes a worker connection
func (s *WorkerGrpcServer) unregisterWorker(workerID string) {
	s.connMutex.Lock()
@@ -453,6 +503,112 @@ func (s *WorkerGrpcServer) GetConnectedWorkers() []string {
	return workers
}

// RequestTaskLogs requests execution logs from a worker for a specific task
func (s *WorkerGrpcServer) RequestTaskLogs(workerID, taskID string, maxEntries int32, logLevel string) ([]*worker_pb.TaskLogEntry, error) {
	s.connMutex.RLock()
	conn, exists := s.connections[workerID]
	s.connMutex.RUnlock()

	if !exists {
		return nil, fmt.Errorf("worker %s is not connected", workerID)
	}

	// Create response channel for this request
	responseCh := make(chan *worker_pb.TaskLogResponse, 1)
	requestKey := fmt.Sprintf("%s:%s", workerID, taskID)

	// Register pending request
	requestContext := &LogRequestContext{
		TaskID:     taskID,
		WorkerID:   workerID,
		ResponseCh: responseCh,
		Timeout:    time.Now().Add(10 * time.Second),
	}

	s.logRequestsMutex.Lock()
	s.pendingLogRequests[requestKey] = requestContext
	s.logRequestsMutex.Unlock()

	// Create log request message
	logRequest := &worker_pb.AdminMessage{
		AdminId:   "admin-server",
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.AdminMessage_TaskLogRequest{
			TaskLogRequest: &worker_pb.TaskLogRequest{
				TaskId:          taskID,
				WorkerId:        workerID,
				IncludeMetadata: true,
				MaxEntries:      maxEntries,
				LogLevel:        logLevel,
			},
		},
	}

	// Send the request through the worker's outgoing channel
	select {
	case conn.outgoing <- logRequest:
		glog.V(1).Infof("Log request sent to worker %s for task %s", workerID, taskID)
	case <-time.After(5 * time.Second):
		// Clean up pending request on timeout
		s.logRequestsMutex.Lock()
		delete(s.pendingLogRequests, requestKey)
		s.logRequestsMutex.Unlock()
		return nil, fmt.Errorf("timeout sending log request to worker %s", workerID)
	}

	// Wait for response
	select {
	case response := <-responseCh:
		if !response.Success {
			return nil, fmt.Errorf("worker log request failed: %s", response.ErrorMessage)
		}
		glog.V(1).Infof("Received %d log entries for task %s from worker %s", len(response.LogEntries), taskID, workerID)
		return response.LogEntries, nil
	case <-time.After(10 * time.Second):
		// Clean up pending request on timeout
		s.logRequestsMutex.Lock()
		delete(s.pendingLogRequests, requestKey)
		s.logRequestsMutex.Unlock()
		return nil, fmt.Errorf("timeout waiting for log response from worker %s", workerID)
	}
}
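// Minimal usage sketch (assumes a connected worker; illustrative, not part of
// the diff):
//
//	logs, err := s.RequestTaskLogs("worker-1", "task-123", 100, "")
//	if err != nil {
//		// worker offline, send timeout (5s), or response timeout (10s)
//	}
//	for _, entry := range logs {
//		fmt.Println(entry.Level, entry.Message)
//	}
//
// Requests and responses are correlated via the "workerID:taskID" key, so at
// most one outstanding log request per (worker, task) pair is supported.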

// RequestTaskLogsFromAllWorkers requests logs for a task from all connected workers
func (s *WorkerGrpcServer) RequestTaskLogsFromAllWorkers(taskID string, maxEntries int32, logLevel string) (map[string][]*worker_pb.TaskLogEntry, error) {
	s.connMutex.RLock()
	workerIDs := make([]string, 0, len(s.connections))
	for workerID := range s.connections {
		workerIDs = append(workerIDs, workerID)
	}
	s.connMutex.RUnlock()

	results := make(map[string][]*worker_pb.TaskLogEntry)

	for _, workerID := range workerIDs {
		logs, err := s.RequestTaskLogs(workerID, taskID, maxEntries, logLevel)
		if err != nil {
			glog.V(1).Infof("Failed to get logs from worker %s for task %s: %v", workerID, taskID, err)
			// Store empty result with error information for debugging
			results[workerID+"_error"] = []*worker_pb.TaskLogEntry{
				{
					Timestamp: time.Now().Unix(),
					Level:     "ERROR",
					Message:   fmt.Sprintf("Failed to retrieve logs from worker %s: %v", workerID, err),
					Fields:    map[string]string{"source": "admin"},
				},
			}
			continue
		}
		if len(logs) > 0 {
			results[workerID] = logs
		} else {
			glog.V(2).Infof("No logs found for task %s on worker %s", taskID, workerID)
		}
	}

	return results, nil
}

// convertTaskParameters converts task parameters to protobuf format
func convertTaskParameters(params map[string]interface{}) map[string]string {
	result := make(map[string]string)