* chore: remove unreachable dead code across the codebase Remove ~50,000 lines of unreachable code identified by static analysis. Major removals: - weed/filer/redis_lua: entire unused Redis Lua filer store implementation - weed/wdclient/net2, resource_pool: unused connection/resource pool packages - weed/plugin/worker/lifecycle: unused lifecycle plugin worker - weed/s3api: unused S3 policy templates, presigned URL IAM, streaming copy, multipart IAM, key rotation, and various SSE helper functions - weed/mq/kafka: unused partition mapping, compression, schema, and protocol functions - weed/mq/offset: unused SQL storage and migration code - weed/worker: unused registry, task, and monitoring functions - weed/query: unused SQL engine, parquet scanner, and type functions - weed/shell: unused EC proportional rebalance functions - weed/storage/erasure_coding/distribution: unused distribution analysis functions - Individual unreachable functions removed from 150+ files across admin, credential, filer, iam, kms, mount, mq, operation, pb, s3api, server, shell, storage, topology, and util packages * fix(s3): reset shared memory store in IAM test to prevent flaky failure TestLoadIAMManagerFromConfig_EmptyConfigWithFallbackKey was flaky because the MemoryStore credential backend is a singleton registered via init(). Earlier tests that create anonymous identities pollute the shared store, causing LookupAnonymous() to unexpectedly return true. Fix by calling Reset() on the memory store before the test runs. * style: run gofmt on changed files * fix: restore KMS functions used by integration tests * fix(plugin): prevent panic on send to closed worker session channel The Plugin.sendToWorker method could panic with "send on closed channel" when a worker disconnected while a message was being sent. The race was between streamSession.close() closing the outgoing channel and sendToWorker writing to it concurrently. Add a done channel to streamSession that is closed before the outgoing channel, and check it in sendToWorker's select to safely detect closed sessions without panicking.
226 lines
6.5 KiB
Go
226 lines
6.5 KiB
Go
package tasks
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
|
)
|
|
|
|
// TaskLogHandler handles task log requests from admin server
|
|
type TaskLogHandler struct {
|
|
baseLogDir string
|
|
}
|
|
|
|
// NewTaskLogHandler creates a new task log handler
|
|
func NewTaskLogHandler(baseLogDir string) *TaskLogHandler {
|
|
if baseLogDir == "" {
|
|
baseLogDir = "/tmp/seaweedfs/task_logs"
|
|
}
|
|
// Best-effort ensure the base directory exists so reads don't fail due to missing dir
|
|
if err := os.MkdirAll(baseLogDir, 0755); err != nil {
|
|
glog.Warningf("Failed to create base task log directory %s: %v", baseLogDir, err)
|
|
}
|
|
return &TaskLogHandler{
|
|
baseLogDir: baseLogDir,
|
|
}
|
|
}
|
|
|
|
// HandleLogRequest processes a task log request and returns the response
|
|
func (h *TaskLogHandler) HandleLogRequest(request *worker_pb.TaskLogRequest) *worker_pb.TaskLogResponse {
|
|
response := &worker_pb.TaskLogResponse{
|
|
TaskId: request.TaskId,
|
|
WorkerId: request.WorkerId,
|
|
Success: false,
|
|
}
|
|
|
|
// Find the task log directory
|
|
logDir, err := h.findTaskLogDirectory(request.TaskId)
|
|
if err != nil {
|
|
response.ErrorMessage = fmt.Sprintf("Task log directory not found: %v", err)
|
|
glog.Warningf("Task log request failed for %s: %v", request.TaskId, err)
|
|
|
|
// Add diagnostic information to help debug the issue
|
|
response.LogEntries = []*worker_pb.TaskLogEntry{
|
|
{
|
|
Timestamp: time.Now().Unix(),
|
|
Level: "WARNING",
|
|
Message: fmt.Sprintf("Task logs not available: %v", err),
|
|
Fields: map[string]string{"source": "task_log_handler"},
|
|
},
|
|
{
|
|
Timestamp: time.Now().Unix(),
|
|
Level: "INFO",
|
|
Message: fmt.Sprintf("This usually means the task was never executed on this worker or logs were cleaned up. Base log directory: %s", h.baseLogDir),
|
|
Fields: map[string]string{"source": "task_log_handler"},
|
|
},
|
|
}
|
|
// response.Success remains false to indicate logs were not found
|
|
return response
|
|
}
|
|
|
|
// Read metadata if requested
|
|
if request.IncludeMetadata {
|
|
metadata, err := h.readTaskMetadata(logDir)
|
|
if err != nil {
|
|
response.ErrorMessage = fmt.Sprintf("Failed to read task metadata: %v", err)
|
|
glog.Warningf("Failed to read metadata for task %s: %v", request.TaskId, err)
|
|
return response
|
|
}
|
|
response.Metadata = metadata
|
|
}
|
|
|
|
// Read log entries
|
|
logEntries, err := h.readTaskLogEntries(logDir, request)
|
|
if err != nil {
|
|
response.ErrorMessage = fmt.Sprintf("Failed to read task logs: %v", err)
|
|
glog.Warningf("Failed to read logs for task %s: %v", request.TaskId, err)
|
|
return response
|
|
}
|
|
|
|
response.LogEntries = logEntries
|
|
response.Success = true
|
|
|
|
glog.V(1).Infof("Successfully retrieved %d log entries for task %s", len(logEntries), request.TaskId)
|
|
return response
|
|
}
|
|
|
|
// findTaskLogDirectory searches for the task log directory by task ID
|
|
func (h *TaskLogHandler) findTaskLogDirectory(taskID string) (string, error) {
|
|
entries, err := os.ReadDir(h.baseLogDir)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to read base log directory %s: %w", h.baseLogDir, err)
|
|
}
|
|
|
|
// Look for directories that start with the task ID
|
|
var candidateDirs []string
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
candidateDirs = append(candidateDirs, entry.Name())
|
|
if strings.HasPrefix(entry.Name(), taskID+"_") {
|
|
return filepath.Join(h.baseLogDir, entry.Name()), nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enhanced error message with diagnostic information
|
|
return "", fmt.Errorf("task log directory not found for task ID: %s (searched %d directories in %s, directories found: %v)",
|
|
taskID, len(candidateDirs), h.baseLogDir, candidateDirs)
|
|
}
|
|
|
|
// readTaskMetadata reads task metadata from the log directory
|
|
func (h *TaskLogHandler) readTaskMetadata(logDir string) (*worker_pb.TaskLogMetadata, error) {
|
|
metadata, err := GetTaskLogMetadata(logDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Convert to protobuf metadata
|
|
pbMetadata := &worker_pb.TaskLogMetadata{
|
|
TaskId: metadata.TaskID,
|
|
TaskType: metadata.TaskType,
|
|
WorkerId: metadata.WorkerID,
|
|
StartTime: metadata.StartTime.Unix(),
|
|
Status: metadata.Status,
|
|
Progress: float32(metadata.Progress),
|
|
VolumeId: metadata.VolumeID,
|
|
Server: metadata.Server,
|
|
Collection: metadata.Collection,
|
|
LogFilePath: metadata.LogFilePath,
|
|
CreatedAt: metadata.CreatedAt.Unix(),
|
|
CustomData: make(map[string]string),
|
|
}
|
|
|
|
// Set end time and duration if available
|
|
if metadata.EndTime != nil {
|
|
pbMetadata.EndTime = metadata.EndTime.Unix()
|
|
}
|
|
if metadata.Duration != nil {
|
|
pbMetadata.DurationMs = metadata.Duration.Milliseconds()
|
|
}
|
|
|
|
// Convert custom data
|
|
for key, value := range metadata.CustomData {
|
|
if strValue, ok := value.(string); ok {
|
|
pbMetadata.CustomData[key] = strValue
|
|
} else {
|
|
pbMetadata.CustomData[key] = fmt.Sprintf("%v", value)
|
|
}
|
|
}
|
|
|
|
return pbMetadata, nil
|
|
}
|
|
|
|
// readTaskLogEntries reads and filters log entries based on the request
|
|
func (h *TaskLogHandler) readTaskLogEntries(logDir string, request *worker_pb.TaskLogRequest) ([]*worker_pb.TaskLogEntry, error) {
|
|
entries, err := ReadTaskLogs(logDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Apply filters
|
|
var filteredEntries []TaskLogEntry
|
|
|
|
for _, entry := range entries {
|
|
// Filter by log level
|
|
if request.LogLevel != "" && !strings.EqualFold(entry.Level, request.LogLevel) {
|
|
continue
|
|
}
|
|
|
|
// Filter by time range
|
|
if request.StartTime > 0 && entry.Timestamp.Unix() < request.StartTime {
|
|
continue
|
|
}
|
|
if request.EndTime > 0 && entry.Timestamp.Unix() > request.EndTime {
|
|
continue
|
|
}
|
|
|
|
filteredEntries = append(filteredEntries, entry)
|
|
}
|
|
|
|
// Limit entries if requested
|
|
if request.MaxEntries > 0 && len(filteredEntries) > int(request.MaxEntries) {
|
|
// Take the most recent entries
|
|
start := len(filteredEntries) - int(request.MaxEntries)
|
|
filteredEntries = filteredEntries[start:]
|
|
}
|
|
|
|
// Convert to protobuf entries
|
|
var pbEntries []*worker_pb.TaskLogEntry
|
|
for _, entry := range filteredEntries {
|
|
pbEntry := &worker_pb.TaskLogEntry{
|
|
Timestamp: entry.Timestamp.Unix(),
|
|
Level: entry.Level,
|
|
Message: entry.Message,
|
|
Fields: make(map[string]string),
|
|
}
|
|
|
|
// Set progress if available
|
|
if entry.Progress != nil {
|
|
pbEntry.Progress = float32(*entry.Progress)
|
|
}
|
|
|
|
// Set status if available
|
|
if entry.Status != nil {
|
|
pbEntry.Status = *entry.Status
|
|
}
|
|
|
|
// Convert fields
|
|
for key, value := range entry.Fields {
|
|
if strValue, ok := value.(string); ok {
|
|
pbEntry.Fields[key] = strValue
|
|
} else {
|
|
pbEntry.Fields[key] = fmt.Sprintf("%v", value)
|
|
}
|
|
}
|
|
|
|
pbEntries = append(pbEntries, pbEntry)
|
|
}
|
|
|
|
return pbEntries, nil
|
|
}
|