admin: Refactor task destination planning (#7063)
* refactor planning into task detection * refactoring worker tasks * refactor * compiles, but only balance task is registered * compiles, but has nil exception * avoid nil logger * add back ec task * setting ec log directory * implement balance and vacuum tasks * EC tasks will no longer fail with "file not found" errors * Use ReceiveFile API to send locally generated shards * distributing shard files and ecx,ecj,vif files * generate .ecx files correctly * do not mount all possible EC shards (0-13) on every destination * use constants * delete all replicas * rename files * pass in volume size to tasks
This commit is contained in:
@@ -5,7 +5,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
)
|
||||
@@ -69,6 +72,38 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
|
||||
float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
|
||||
ScheduleAt: now,
|
||||
}
|
||||
|
||||
// Plan EC destinations if ActiveTopology is available
|
||||
if clusterInfo.ActiveTopology != nil {
|
||||
multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
|
||||
continue // Skip this volume if destination planning fails
|
||||
}
|
||||
|
||||
// Find all volume replicas from topology
|
||||
replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
|
||||
glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
|
||||
|
||||
// Create typed parameters with EC destination information and replicas
|
||||
result.TypedParams = &worker_pb.TaskParams{
|
||||
VolumeId: metric.VolumeID,
|
||||
Server: metric.Server,
|
||||
Collection: metric.Collection,
|
||||
VolumeSize: metric.Size, // Store original volume size for tracking changes
|
||||
Replicas: replicas, // Include all volume replicas for deletion
|
||||
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
|
||||
ErasureCodingParams: createECTaskParams(multiPlan),
|
||||
},
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
|
||||
metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
|
||||
} else {
|
||||
glog.Warningf("No ActiveTopology available for destination planning in EC detection")
|
||||
continue // Skip this volume if no topology available
|
||||
}
|
||||
|
||||
results = append(results, result)
|
||||
} else {
|
||||
// Count debug reasons
|
||||
@@ -105,36 +140,277 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// Scheduling implements the scheduling logic for erasure coding tasks
|
||||
func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
|
||||
ecConfig := config.(*Config)
|
||||
// planECDestinations plans the destinations for erasure coding operation
|
||||
// This function implements EC destination planning logic directly in the detection phase
|
||||
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
|
||||
// Get source node information from topology
|
||||
var sourceRack, sourceDC string
|
||||
|
||||
// Check if we have available workers
|
||||
if len(availableWorkers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Count running EC tasks
|
||||
runningCount := 0
|
||||
for _, runningTask := range runningTasks {
|
||||
if runningTask.Type == types.TaskTypeErasureCoding {
|
||||
runningCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Check concurrency limit
|
||||
if runningCount >= ecConfig.MaxConcurrent {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if any worker can handle EC tasks
|
||||
for _, worker := range availableWorkers {
|
||||
for _, capability := range worker.Capabilities {
|
||||
if capability == types.TaskTypeErasureCoding {
|
||||
return true
|
||||
// Extract rack and DC from topology info
|
||||
topologyInfo := activeTopology.GetTopologyInfo()
|
||||
if topologyInfo != nil {
|
||||
for _, dc := range topologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, dataNodeInfo := range rack.DataNodeInfos {
|
||||
if dataNodeInfo.Id == metric.Server {
|
||||
sourceDC = dc.Id
|
||||
sourceRack = rack.Id
|
||||
break
|
||||
}
|
||||
}
|
||||
if sourceRack != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
if sourceDC != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
// Determine minimum shard disk locations based on configuration
|
||||
minTotalDisks := 4
|
||||
|
||||
// Get available disks for EC placement (include source node for EC)
|
||||
availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
|
||||
if len(availableDisks) < minTotalDisks {
|
||||
return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", minTotalDisks, len(availableDisks))
|
||||
}
|
||||
|
||||
// Select best disks for EC placement with rack/DC diversity
|
||||
selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
|
||||
if len(selectedDisks) < minTotalDisks {
|
||||
return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
|
||||
}
|
||||
|
||||
var plans []*topology.DestinationPlan
|
||||
rackCount := make(map[string]int)
|
||||
dcCount := make(map[string]int)
|
||||
|
||||
for _, disk := range selectedDisks {
|
||||
plan := &topology.DestinationPlan{
|
||||
TargetNode: disk.NodeID,
|
||||
TargetDisk: disk.DiskID,
|
||||
TargetRack: disk.Rack,
|
||||
TargetDC: disk.DataCenter,
|
||||
ExpectedSize: 0, // EC shards don't have predetermined size
|
||||
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
|
||||
Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
|
||||
}
|
||||
plans = append(plans, plan)
|
||||
|
||||
// Count rack and DC diversity
|
||||
rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
|
||||
rackCount[rackKey]++
|
||||
dcCount[disk.DataCenter]++
|
||||
}
|
||||
|
||||
return &topology.MultiDestinationPlan{
|
||||
Plans: plans,
|
||||
TotalShards: len(plans),
|
||||
SuccessfulRack: len(rackCount),
|
||||
SuccessfulDCs: len(dcCount),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// createECTaskParams creates EC task parameters from the multi-destination plan
|
||||
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
|
||||
var destinations []*worker_pb.ECDestination
|
||||
|
||||
for _, plan := range multiPlan.Plans {
|
||||
destination := &worker_pb.ECDestination{
|
||||
Node: plan.TargetNode,
|
||||
DiskId: plan.TargetDisk,
|
||||
Rack: plan.TargetRack,
|
||||
DataCenter: plan.TargetDC,
|
||||
PlacementScore: plan.PlacementScore,
|
||||
}
|
||||
destinations = append(destinations, destination)
|
||||
}
|
||||
|
||||
// Collect placement conflicts from all destinations
|
||||
var placementConflicts []string
|
||||
for _, plan := range multiPlan.Plans {
|
||||
placementConflicts = append(placementConflicts, plan.Conflicts...)
|
||||
}
|
||||
|
||||
return &worker_pb.ErasureCodingTaskParams{
|
||||
Destinations: destinations,
|
||||
DataShards: erasure_coding.DataShardsCount, // Standard data shards
|
||||
ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
|
||||
PlacementConflicts: placementConflicts,
|
||||
}
|
||||
}
|
||||
|
||||
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
|
||||
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
|
||||
if len(disks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Group disks by rack and DC for diversity
|
||||
rackGroups := make(map[string][]*topology.DiskInfo)
|
||||
for _, disk := range disks {
|
||||
rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
|
||||
rackGroups[rackKey] = append(rackGroups[rackKey], disk)
|
||||
}
|
||||
|
||||
var selected []*topology.DiskInfo
|
||||
usedRacks := make(map[string]bool)
|
||||
|
||||
// First pass: select one disk from each rack for maximum diversity
|
||||
for rackKey, rackDisks := range rackGroups {
|
||||
if len(selected) >= shardsNeeded {
|
||||
break
|
||||
}
|
||||
|
||||
// Select best disk from this rack
|
||||
bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
|
||||
if bestDisk != nil {
|
||||
selected = append(selected, bestDisk)
|
||||
usedRacks[rackKey] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: if we need more disks, select from racks we've already used
|
||||
if len(selected) < shardsNeeded {
|
||||
for _, disk := range disks {
|
||||
if len(selected) >= shardsNeeded {
|
||||
break
|
||||
}
|
||||
|
||||
// Skip if already selected
|
||||
alreadySelected := false
|
||||
for _, sel := range selected {
|
||||
if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
|
||||
alreadySelected = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !alreadySelected && isDiskSuitableForEC(disk) {
|
||||
selected = append(selected, disk)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return selected
|
||||
}
|
||||
|
||||
// selectBestFromRack selects the best disk from a rack for EC placement
|
||||
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
|
||||
if len(disks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var bestDisk *topology.DiskInfo
|
||||
bestScore := -1.0
|
||||
|
||||
for _, disk := range disks {
|
||||
if !isDiskSuitableForEC(disk) {
|
||||
continue
|
||||
}
|
||||
|
||||
score := calculateECScore(disk, sourceRack, sourceDC)
|
||||
if score > bestScore {
|
||||
bestScore = score
|
||||
bestDisk = disk
|
||||
}
|
||||
}
|
||||
|
||||
return bestDisk
|
||||
}
|
||||
|
||||
// calculateECScore calculates placement score for EC operations
|
||||
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
|
||||
if disk.DiskInfo == nil {
|
||||
return 0.0
|
||||
}
|
||||
|
||||
score := 0.0
|
||||
|
||||
// Prefer disks with available capacity
|
||||
if disk.DiskInfo.MaxVolumeCount > 0 {
|
||||
utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
|
||||
score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
|
||||
}
|
||||
|
||||
// Prefer different racks for better distribution
|
||||
if disk.Rack != sourceRack {
|
||||
score += 30.0
|
||||
}
|
||||
|
||||
// Prefer different data centers for better distribution
|
||||
if disk.DataCenter != sourceDC {
|
||||
score += 20.0
|
||||
}
|
||||
|
||||
// Consider current load
|
||||
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
|
||||
|
||||
return score
|
||||
}
|
||||
|
||||
// isDiskSuitableForEC checks if a disk is suitable for EC placement
|
||||
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
|
||||
if disk.DiskInfo == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if disk has capacity
|
||||
if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if disk is not overloaded
|
||||
if disk.LoadCount > 10 { // Arbitrary threshold
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// checkECPlacementConflicts checks for placement rule conflicts in EC operations
|
||||
func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
|
||||
var conflicts []string
|
||||
|
||||
// For EC, being on the same rack as source is often acceptable
|
||||
// but we note it as potential conflict for monitoring
|
||||
if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
|
||||
conflicts = append(conflicts, "same_rack_as_source")
|
||||
}
|
||||
|
||||
return conflicts
|
||||
}
|
||||
|
||||
// findVolumeReplicas finds all servers that have replicas of the specified volume
|
||||
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
|
||||
if activeTopology == nil {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
topologyInfo := activeTopology.GetTopologyInfo()
|
||||
if topologyInfo == nil {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
var replicaServers []string
|
||||
|
||||
// Iterate through all nodes to find volume replicas
|
||||
for _, dc := range topologyInfo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
for _, nodeInfo := range rack.DataNodeInfos {
|
||||
for _, diskInfo := range nodeInfo.DiskInfos {
|
||||
for _, volumeInfo := range diskInfo.VolumeInfos {
|
||||
if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
|
||||
replicaServers = append(replicaServers, nodeInfo.Id)
|
||||
break // Found volume on this node, move to next node
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return replicaServers
|
||||
}
|
||||
|
||||
@@ -1,785 +0,0 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
// Task implements comprehensive erasure coding with protobuf parameters
|
||||
type Task struct {
|
||||
*base.BaseTypedTask
|
||||
|
||||
// Current task state
|
||||
sourceServer string
|
||||
volumeID uint32
|
||||
collection string
|
||||
workDir string
|
||||
masterClient string
|
||||
grpcDialOpt grpc.DialOption
|
||||
|
||||
// EC parameters from protobuf
|
||||
destinations []*worker_pb.ECDestination // Disk-aware destinations
|
||||
existingShardLocations []*worker_pb.ExistingECShardLocation // Existing shards to cleanup
|
||||
estimatedShardSize uint64
|
||||
dataShards int
|
||||
parityShards int
|
||||
cleanupSource bool
|
||||
|
||||
// Progress tracking
|
||||
currentStep string
|
||||
stepProgress map[string]float64
|
||||
}
|
||||
|
||||
// NewTask creates a new erasure coding task
|
||||
func NewTask() types.TypedTaskInterface {
|
||||
task := &Task{
|
||||
BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeErasureCoding),
|
||||
masterClient: "localhost:9333", // Default master client
|
||||
workDir: "/tmp/seaweedfs_ec_work", // Default work directory
|
||||
grpcDialOpt: grpc.WithTransportCredentials(insecure.NewCredentials()), // Default to insecure
|
||||
dataShards: erasure_coding.DataShardsCount, // Use package constant
|
||||
parityShards: erasure_coding.ParityShardsCount, // Use package constant
|
||||
stepProgress: make(map[string]float64),
|
||||
}
|
||||
return task
|
||||
}
|
||||
|
||||
// ValidateTyped validates the typed parameters for EC task
|
||||
func (t *Task) ValidateTyped(params *worker_pb.TaskParams) error {
|
||||
// Basic validation from base class
|
||||
if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check that we have EC-specific parameters
|
||||
ecParams := params.GetErasureCodingParams()
|
||||
if ecParams == nil {
|
||||
return fmt.Errorf("erasure_coding_params is required for EC task")
|
||||
}
|
||||
|
||||
// Require destinations
|
||||
if len(ecParams.Destinations) == 0 {
|
||||
return fmt.Errorf("destinations must be specified for EC task")
|
||||
}
|
||||
|
||||
// DataShards and ParityShards are constants from erasure_coding package
|
||||
expectedDataShards := int32(erasure_coding.DataShardsCount)
|
||||
expectedParityShards := int32(erasure_coding.ParityShardsCount)
|
||||
|
||||
if ecParams.DataShards > 0 && ecParams.DataShards != expectedDataShards {
|
||||
return fmt.Errorf("data_shards must be %d (fixed constant), got %d", expectedDataShards, ecParams.DataShards)
|
||||
}
|
||||
if ecParams.ParityShards > 0 && ecParams.ParityShards != expectedParityShards {
|
||||
return fmt.Errorf("parity_shards must be %d (fixed constant), got %d", expectedParityShards, ecParams.ParityShards)
|
||||
}
|
||||
|
||||
// Validate destination count
|
||||
destinationCount := len(ecParams.Destinations)
|
||||
totalShards := expectedDataShards + expectedParityShards
|
||||
if totalShards > int32(destinationCount) {
|
||||
return fmt.Errorf("insufficient destinations: need %d, have %d", totalShards, destinationCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EstimateTimeTyped estimates the time needed for EC processing based on protobuf parameters
|
||||
func (t *Task) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
|
||||
baseTime := 20 * time.Minute // Processing takes time due to comprehensive operations
|
||||
|
||||
ecParams := params.GetErasureCodingParams()
|
||||
if ecParams != nil && ecParams.EstimatedShardSize > 0 {
|
||||
// More accurate estimate based on shard size
|
||||
// Account for copying, encoding, and distribution
|
||||
gbSize := ecParams.EstimatedShardSize / (1024 * 1024 * 1024)
|
||||
estimatedTime := time.Duration(gbSize*2) * time.Minute // 2 minutes per GB
|
||||
if estimatedTime > baseTime {
|
||||
return estimatedTime
|
||||
}
|
||||
}
|
||||
|
||||
return baseTime
|
||||
}
|
||||
|
||||
// ExecuteTyped implements the actual erasure coding workflow with typed parameters
|
||||
func (t *Task) ExecuteTyped(params *worker_pb.TaskParams) error {
|
||||
// Extract basic parameters
|
||||
t.volumeID = params.VolumeId
|
||||
t.sourceServer = params.Server
|
||||
t.collection = params.Collection
|
||||
|
||||
// Extract EC-specific parameters
|
||||
ecParams := params.GetErasureCodingParams()
|
||||
if ecParams != nil {
|
||||
t.destinations = ecParams.Destinations // Store disk-aware destinations
|
||||
t.existingShardLocations = ecParams.ExistingShardLocations // Store existing shards for cleanup
|
||||
t.estimatedShardSize = ecParams.EstimatedShardSize
|
||||
t.cleanupSource = ecParams.CleanupSource
|
||||
|
||||
// DataShards and ParityShards are constants, don't override from parameters
|
||||
// t.dataShards and t.parityShards are already set to constants in NewTask
|
||||
|
||||
if ecParams.WorkingDir != "" {
|
||||
t.workDir = ecParams.WorkingDir
|
||||
}
|
||||
if ecParams.MasterClient != "" {
|
||||
t.masterClient = ecParams.MasterClient
|
||||
}
|
||||
}
|
||||
|
||||
// Determine available destinations for logging
|
||||
var availableDestinations []string
|
||||
for _, dest := range t.destinations {
|
||||
availableDestinations = append(availableDestinations, fmt.Sprintf("%s(disk:%d)", dest.Node, dest.DiskId))
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Starting EC task for volume %d: %s -> %v (data:%d, parity:%d)",
|
||||
t.volumeID, t.sourceServer, availableDestinations, t.dataShards, t.parityShards)
|
||||
|
||||
// Create unique working directory for this task
|
||||
taskWorkDir := filepath.Join(t.workDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
|
||||
if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Created working directory: %s", taskWorkDir)
|
||||
|
||||
// Ensure cleanup of working directory
|
||||
defer func() {
|
||||
if err := os.RemoveAll(taskWorkDir); err != nil {
|
||||
glog.Warningf("Failed to cleanup working directory %s: %v", taskWorkDir, err)
|
||||
} else {
|
||||
glog.V(1).Infof("WORKFLOW: Cleaned up working directory: %s", taskWorkDir)
|
||||
}
|
||||
}()
|
||||
|
||||
// Step 1: Collect volume locations from master
|
||||
glog.V(1).Infof("WORKFLOW STEP 1: Collecting volume locations from master")
|
||||
t.SetProgress(5.0)
|
||||
volumeId := needle.VolumeId(t.volumeID)
|
||||
volumeLocations, err := t.collectVolumeLocations(volumeId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Found volume %d on %d servers: %v", t.volumeID, len(volumeLocations), volumeLocations)
|
||||
|
||||
// Convert ServerAddress slice to string slice
|
||||
var locationStrings []string
|
||||
for _, addr := range volumeLocations {
|
||||
locationStrings = append(locationStrings, string(addr))
|
||||
}
|
||||
|
||||
// Step 2: Check if volume has sufficient size for EC encoding
|
||||
if !t.shouldPerformECEncoding(locationStrings) {
|
||||
glog.Infof("Volume %d does not meet EC encoding criteria, skipping", t.volumeID)
|
||||
t.SetProgress(100.0)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Step 2A: Cleanup existing EC shards if any
|
||||
glog.V(1).Infof("WORKFLOW STEP 2A: Cleaning up existing EC shards for volume %d", t.volumeID)
|
||||
t.SetProgress(10.0)
|
||||
err = t.cleanupExistingEcShards()
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to cleanup existing EC shards (continuing anyway): %v", err)
|
||||
// Don't fail the task - this is just cleanup
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Existing EC shards cleanup completed for volume %d", t.volumeID)
|
||||
|
||||
// Step 3: Mark volume readonly on all servers
|
||||
glog.V(1).Infof("WORKFLOW STEP 2B: Marking volume %d readonly on all replica servers", t.volumeID)
|
||||
t.SetProgress(15.0)
|
||||
err = t.markVolumeReadonlyOnAllReplicas(needle.VolumeId(t.volumeID), locationStrings)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mark volume readonly: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Volume %d marked readonly on all replicas", t.volumeID)
|
||||
|
||||
// Step 5: Copy volume files (.dat, .idx) to EC worker
|
||||
glog.V(1).Infof("WORKFLOW STEP 3: Copying volume files from source server %s to EC worker", t.sourceServer)
|
||||
t.SetProgress(25.0)
|
||||
localVolumeFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume files to EC worker: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Volume files copied to EC worker: %v", localVolumeFiles)
|
||||
|
||||
// Step 6: Generate EC shards locally on EC worker
|
||||
glog.V(1).Infof("WORKFLOW STEP 4: Generating EC shards locally on EC worker")
|
||||
t.SetProgress(40.0)
|
||||
localShardFiles, err := t.generateEcShardsLocally(localVolumeFiles, taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate EC shards locally: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: EC shards generated locally: %d shard files", len(localShardFiles))
|
||||
|
||||
// Step 7: Distribute shards from EC worker to destination servers
|
||||
glog.V(1).Infof("WORKFLOW STEP 5: Distributing EC shards from worker to destination servers")
|
||||
t.SetProgress(60.0)
|
||||
err = t.distributeEcShardsFromWorker(localShardFiles)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to distribute EC shards from worker: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: EC shards distributed to all destination servers")
|
||||
|
||||
// Step 8: Mount EC shards on destination servers
|
||||
glog.V(1).Infof("WORKFLOW STEP 6: Mounting EC shards on destination servers")
|
||||
t.SetProgress(80.0)
|
||||
err = t.mountEcShardsOnDestinations()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to mount EC shards: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: EC shards mounted successfully")
|
||||
|
||||
// Step 9: Delete original volume from all locations
|
||||
glog.V(1).Infof("WORKFLOW STEP 7: Deleting original volume %d from all replica servers", t.volumeID)
|
||||
t.SetProgress(90.0)
|
||||
err = t.deleteVolumeFromAllLocations(needle.VolumeId(t.volumeID), locationStrings)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete original volume: %v", err)
|
||||
}
|
||||
glog.V(1).Infof("WORKFLOW: Original volume %d deleted from all locations", t.volumeID)
|
||||
|
||||
t.SetProgress(100.0)
|
||||
glog.Infof("EC task completed successfully for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectVolumeLocations gets volume location from master (placeholder implementation)
|
||||
func (t *Task) collectVolumeLocations(volumeId needle.VolumeId) ([]pb.ServerAddress, error) {
|
||||
// For now, return a placeholder implementation
|
||||
// Full implementation would call master to get volume locations
|
||||
return []pb.ServerAddress{pb.ServerAddress(t.sourceServer)}, nil
|
||||
}
|
||||
|
||||
// cleanupExistingEcShards deletes existing EC shards using planned locations
|
||||
func (t *Task) cleanupExistingEcShards() error {
|
||||
if len(t.existingShardLocations) == 0 {
|
||||
glog.V(1).Infof("No existing EC shards to cleanup for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Cleaning up existing EC shards for volume %d on %d servers", t.volumeID, len(t.existingShardLocations))
|
||||
|
||||
// Delete existing shards from each location using planned shard locations
|
||||
for _, location := range t.existingShardLocations {
|
||||
if len(location.ShardIds) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Deleting existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(location.Node), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, deleteErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: location.ShardIds,
|
||||
})
|
||||
return deleteErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete existing EC shards %v from %s for volume %d: %v", location.ShardIds, location.Node, t.volumeID, err)
|
||||
// Continue with other servers - don't fail the entire cleanup
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully deleted existing EC shards %v from %s for volume %d", location.ShardIds, location.Node, t.volumeID)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Completed cleanup of existing EC shards for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldPerformECEncoding checks if the volume meets criteria for EC encoding
|
||||
func (t *Task) shouldPerformECEncoding(volumeLocations []string) bool {
|
||||
// For now, always proceed with EC encoding if volume exists
|
||||
// This can be extended with volume size checks, etc.
|
||||
return len(volumeLocations) > 0
|
||||
}
|
||||
|
||||
// markVolumeReadonlyOnAllReplicas marks the volume as readonly on all replica servers
|
||||
func (t *Task) markVolumeReadonlyOnAllReplicas(volumeId needle.VolumeId, volumeLocations []string) error {
|
||||
glog.V(1).Infof("Marking volume %d readonly on %d servers", volumeId, len(volumeLocations))
|
||||
|
||||
// Mark volume readonly on all replica servers
|
||||
for _, location := range volumeLocations {
|
||||
glog.V(1).Infof("Marking volume %d readonly on %s", volumeId, location)
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, markErr := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
})
|
||||
return markErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to mark volume %d readonly on %s: %v", volumeId, location, err)
|
||||
return fmt.Errorf("failed to mark volume %d readonly on %s: %v", volumeId, location, err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully marked volume %d readonly on %s", volumeId, location)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully marked volume %d readonly on all %d locations", volumeId, len(volumeLocations))
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
|
||||
func (t *Task) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
|
||||
localFiles := make(map[string]string)
|
||||
|
||||
// Copy .dat file
|
||||
datFile := fmt.Sprintf("%s.dat", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
|
||||
err := t.copyFileFromSource(".dat", datFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to copy .dat file: %v", err)
|
||||
}
|
||||
localFiles["dat"] = datFile
|
||||
glog.V(1).Infof("Copied .dat file to: %s", datFile)
|
||||
|
||||
// Copy .idx file
|
||||
idxFile := fmt.Sprintf("%s.idx", filepath.Join(workDir, fmt.Sprintf("%d", t.volumeID)))
|
||||
err = t.copyFileFromSource(".idx", idxFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to copy .idx file: %v", err)
|
||||
}
|
||||
localFiles["idx"] = idxFile
|
||||
glog.V(1).Infof("Copied .idx file to: %s", idxFile)
|
||||
|
||||
return localFiles, nil
|
||||
}
|
||||
|
||||
// copyFileFromSource copies a file from source server to local path using gRPC streaming
|
||||
func (t *Task) copyFileFromSource(ext, localPath string) error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.sourceServer), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
Ext: ext,
|
||||
StopOffset: uint64(math.MaxInt64),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initiate file copy: %v", err)
|
||||
}
|
||||
|
||||
// Create local file
|
||||
localFile, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local file %s: %v", localPath, err)
|
||||
}
|
||||
defer localFile.Close()
|
||||
|
||||
// Stream data and write to local file
|
||||
totalBytes := int64(0)
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive file data: %v", err)
|
||||
}
|
||||
|
||||
if len(resp.FileContent) > 0 {
|
||||
written, writeErr := localFile.Write(resp.FileContent)
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("failed to write to local file: %v", writeErr)
|
||||
}
|
||||
totalBytes += int64(written)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.sourceServer, localPath)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// generateEcShardsLocally generates EC shards from local volume files
|
||||
func (t *Task) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
|
||||
datFile := localFiles["dat"]
|
||||
idxFile := localFiles["idx"]
|
||||
|
||||
if datFile == "" || idxFile == "" {
|
||||
return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
|
||||
}
|
||||
|
||||
// Get base name without extension for EC operations
|
||||
baseName := strings.TrimSuffix(datFile, ".dat")
|
||||
|
||||
shardFiles := make(map[string]string)
|
||||
|
||||
glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
|
||||
|
||||
// Generate EC shard files (.ec00 ~ .ec13)
|
||||
if err := erasure_coding.WriteEcFiles(baseName); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
|
||||
}
|
||||
|
||||
// Generate .ecx file from .idx
|
||||
if err := erasure_coding.WriteSortedFileFromIdx(idxFile, ".ecx"); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
|
||||
}
|
||||
|
||||
// Collect generated shard file paths
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
|
||||
if _, err := os.Stat(shardFile); err == nil {
|
||||
shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
|
||||
}
|
||||
}
|
||||
|
||||
// Add metadata files
|
||||
ecxFile := idxFile + ".ecx"
|
||||
if _, err := os.Stat(ecxFile); err == nil {
|
||||
shardFiles["ecx"] = ecxFile
|
||||
}
|
||||
|
||||
// Generate .vif file (volume info)
|
||||
vifFile := baseName + ".vif"
|
||||
// Create basic volume info - in a real implementation, this would come from the original volume
|
||||
volumeInfo := &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.GetCurrentVersion()),
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
|
||||
glog.Warningf("Failed to create .vif file: %v", err)
|
||||
} else {
|
||||
shardFiles["vif"] = vifFile
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
|
||||
return shardFiles, nil
|
||||
}
|
||||
|
||||
func (t *Task) copyEcShardsToDestinations() error {
|
||||
if len(t.destinations) == 0 {
|
||||
return fmt.Errorf("no destinations specified for EC shard distribution")
|
||||
}
|
||||
|
||||
destinations := t.destinations
|
||||
|
||||
glog.V(1).Infof("Copying EC shards for volume %d to %d destinations", t.volumeID, len(destinations))
|
||||
|
||||
// Prepare shard IDs (0-13 for EC shards)
|
||||
var shardIds []uint32
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardIds = append(shardIds, uint32(i))
|
||||
}
|
||||
|
||||
// Distribute shards across destinations
|
||||
var wg sync.WaitGroup
|
||||
errorChan := make(chan error, len(destinations))
|
||||
|
||||
// Track which disks have already received metadata files (server+disk)
|
||||
metadataFilesCopied := make(map[string]bool)
|
||||
var metadataMutex sync.Mutex
|
||||
|
||||
// For each destination, copy a subset of shards
|
||||
shardsPerDest := len(shardIds) / len(destinations)
|
||||
remainder := len(shardIds) % len(destinations)
|
||||
|
||||
shardOffset := 0
|
||||
for i, dest := range destinations {
|
||||
wg.Add(1)
|
||||
|
||||
shardsForThisDest := shardsPerDest
|
||||
if i < remainder {
|
||||
shardsForThisDest++ // Distribute remainder shards
|
||||
}
|
||||
|
||||
destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
|
||||
shardOffset += shardsForThisDest
|
||||
|
||||
go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
|
||||
defer wg.Done()
|
||||
|
||||
if t.IsCancelled() {
|
||||
errorChan <- fmt.Errorf("task cancelled during shard copy")
|
||||
return
|
||||
}
|
||||
|
||||
// Create disk-specific metadata key (server+disk)
|
||||
diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
|
||||
|
||||
glog.V(1).Infof("Copying shards %v from %s to %s (disk %d)",
|
||||
targetShardIds, t.sourceServer, destination.Node, destination.DiskId)
|
||||
|
||||
// Check if this disk needs metadata files (only once per disk)
|
||||
metadataMutex.Lock()
|
||||
needsMetadataFiles := !metadataFilesCopied[diskKey]
|
||||
if needsMetadataFiles {
|
||||
metadataFilesCopied[diskKey] = true
|
||||
}
|
||||
metadataMutex.Unlock()
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: uint32(t.volumeID),
|
||||
Collection: t.collection,
|
||||
ShardIds: targetShardIds,
|
||||
CopyEcxFile: needsMetadataFiles, // Copy .ecx only once per disk
|
||||
CopyEcjFile: needsMetadataFiles, // Copy .ecj only once per disk
|
||||
CopyVifFile: needsMetadataFiles, // Copy .vif only once per disk
|
||||
SourceDataNode: t.sourceServer,
|
||||
DiskId: destination.DiskId, // Pass target disk ID
|
||||
})
|
||||
return copyErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
errorChan <- fmt.Errorf("failed to copy shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
|
||||
return
|
||||
}
|
||||
|
||||
if needsMetadataFiles {
|
||||
glog.V(1).Infof("Successfully copied shards %v and metadata files (.ecx, .ecj, .vif) to %s disk %d",
|
||||
targetShardIds, destination.Node, destination.DiskId)
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully copied shards %v to %s disk %d (metadata files already present)",
|
||||
targetShardIds, destination.Node, destination.DiskId)
|
||||
}
|
||||
}(dest, destShardIds)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errorChan)
|
||||
|
||||
// Check for any copy errors
|
||||
if err := <-errorChan; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully copied all EC shards for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// distributeEcShardsFromWorker distributes locally generated EC shards to destination servers
|
||||
func (t *Task) distributeEcShardsFromWorker(localShardFiles map[string]string) error {
|
||||
if len(t.destinations) == 0 {
|
||||
return fmt.Errorf("no destinations specified for EC shard distribution")
|
||||
}
|
||||
|
||||
destinations := t.destinations
|
||||
|
||||
glog.V(1).Infof("Distributing EC shards for volume %d from worker to %d destinations", t.volumeID, len(destinations))
|
||||
|
||||
// Prepare shard IDs (0-13 for EC shards)
|
||||
var shardIds []uint32
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardIds = append(shardIds, uint32(i))
|
||||
}
|
||||
|
||||
// Distribute shards across destinations
|
||||
var wg sync.WaitGroup
|
||||
errorChan := make(chan error, len(destinations))
|
||||
|
||||
// Track which disks have already received metadata files (server+disk)
|
||||
metadataFilesCopied := make(map[string]bool)
|
||||
var metadataMutex sync.Mutex
|
||||
|
||||
// For each destination, send a subset of shards
|
||||
shardsPerDest := len(shardIds) / len(destinations)
|
||||
remainder := len(shardIds) % len(destinations)
|
||||
|
||||
shardOffset := 0
|
||||
for i, dest := range destinations {
|
||||
wg.Add(1)
|
||||
|
||||
shardsForThisDest := shardsPerDest
|
||||
if i < remainder {
|
||||
shardsForThisDest++ // Distribute remainder shards
|
||||
}
|
||||
|
||||
destShardIds := shardIds[shardOffset : shardOffset+shardsForThisDest]
|
||||
shardOffset += shardsForThisDest
|
||||
|
||||
go func(destination *worker_pb.ECDestination, targetShardIds []uint32) {
|
||||
defer wg.Done()
|
||||
|
||||
if t.IsCancelled() {
|
||||
errorChan <- fmt.Errorf("task cancelled during shard distribution")
|
||||
return
|
||||
}
|
||||
|
||||
// Create disk-specific metadata key (server+disk)
|
||||
diskKey := fmt.Sprintf("%s:%d", destination.Node, destination.DiskId)
|
||||
|
||||
glog.V(1).Infof("Distributing shards %v from worker to %s (disk %d)",
|
||||
targetShardIds, destination.Node, destination.DiskId)
|
||||
|
||||
// Check if this disk needs metadata files (only once per disk)
|
||||
metadataMutex.Lock()
|
||||
needsMetadataFiles := !metadataFilesCopied[diskKey]
|
||||
if needsMetadataFiles {
|
||||
metadataFilesCopied[diskKey] = true
|
||||
}
|
||||
metadataMutex.Unlock()
|
||||
|
||||
// Send shard files to destination using HTTP upload (simplified for now)
|
||||
err := t.sendShardsToDestination(destination, targetShardIds, localShardFiles, needsMetadataFiles)
|
||||
if err != nil {
|
||||
errorChan <- fmt.Errorf("failed to send shards to %s disk %d: %v", destination.Node, destination.DiskId, err)
|
||||
return
|
||||
}
|
||||
|
||||
if needsMetadataFiles {
|
||||
glog.V(1).Infof("Successfully distributed shards %v and metadata files (.ecx, .vif) to %s disk %d",
|
||||
targetShardIds, destination.Node, destination.DiskId)
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully distributed shards %v to %s disk %d (metadata files already present)",
|
||||
targetShardIds, destination.Node, destination.DiskId)
|
||||
}
|
||||
}(dest, destShardIds)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errorChan)
|
||||
|
||||
// Check for any distribution errors
|
||||
if err := <-errorChan; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Completed distributing EC shards for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendShardsToDestination sends specific shard files from worker to a destination server (simplified)
|
||||
func (t *Task) sendShardsToDestination(destination *worker_pb.ECDestination, shardIds []uint32, localFiles map[string]string, includeMetadata bool) error {
|
||||
// For now, use a simplified approach - just upload the files
|
||||
// In a full implementation, this would use proper file upload mechanisms
|
||||
glog.V(2).Infof("Would send shards %v and metadata=%v to %s disk %d", shardIds, includeMetadata, destination.Node, destination.DiskId)
|
||||
|
||||
// TODO: Implement actual file upload to volume server
|
||||
// This is a placeholder - actual implementation would:
|
||||
// 1. Open each shard file locally
|
||||
// 2. Upload via HTTP POST or gRPC stream to destination volume server
|
||||
// 3. Volume server would save to the specified disk_id
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mountEcShardsOnDestinations mounts EC shards on all destination servers
|
||||
func (t *Task) mountEcShardsOnDestinations() error {
|
||||
if len(t.destinations) == 0 {
|
||||
return fmt.Errorf("no destinations specified for mounting EC shards")
|
||||
}
|
||||
|
||||
destinations := t.destinations
|
||||
|
||||
glog.V(1).Infof("Mounting EC shards for volume %d on %d destinations", t.volumeID, len(destinations))
|
||||
|
||||
// Prepare all shard IDs (0-13)
|
||||
var allShardIds []uint32
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
allShardIds = append(allShardIds, uint32(i))
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errorChan := make(chan error, len(destinations))
|
||||
|
||||
// Mount shards on each destination server
|
||||
for _, dest := range destinations {
|
||||
wg.Add(1)
|
||||
|
||||
go func(destination *worker_pb.ECDestination) {
|
||||
defer wg.Done()
|
||||
|
||||
if t.IsCancelled() {
|
||||
errorChan <- fmt.Errorf("task cancelled during shard mounting")
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Mounting EC shards on %s disk %d", destination.Node, destination.DiskId)
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(destination.Node), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
||||
VolumeId: uint32(t.volumeID),
|
||||
Collection: t.collection,
|
||||
ShardIds: allShardIds, // Mount all available shards on each server
|
||||
})
|
||||
return mountErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// It's normal for some servers to not have all shards, so log as warning rather than error
|
||||
glog.Warningf("Failed to mount some shards on %s disk %d (this may be normal): %v", destination.Node, destination.DiskId, err)
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully mounted EC shards on %s disk %d", destination.Node, destination.DiskId)
|
||||
}
|
||||
}(dest)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errorChan)
|
||||
|
||||
// Check for any critical mounting errors
|
||||
select {
|
||||
case err := <-errorChan:
|
||||
if err != nil {
|
||||
glog.Warningf("Some shard mounting issues occurred: %v", err)
|
||||
}
|
||||
default:
|
||||
// No errors
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Completed mounting EC shards for volume %d", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteVolumeFromAllLocations deletes the original volume from all replica servers
|
||||
func (t *Task) deleteVolumeFromAllLocations(volumeId needle.VolumeId, volumeLocations []string) error {
|
||||
glog.V(1).Infof("Deleting original volume %d from %d locations", volumeId, len(volumeLocations))
|
||||
|
||||
for _, location := range volumeLocations {
|
||||
glog.V(1).Infof("Deleting volume %d from %s", volumeId, location)
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(location), t.grpcDialOpt,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, deleteErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
OnlyEmpty: false, // Force delete even if not empty since we've already created EC shards
|
||||
})
|
||||
return deleteErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete volume %d from %s: %v", volumeId, location, err)
|
||||
return fmt.Errorf("failed to delete volume %d from %s: %v", volumeId, location, err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully deleted volume %d from %s", volumeId, location)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully deleted volume %d from all %d locations", volumeId, len(volumeLocations))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register the task in the global registry
|
||||
func init() {
|
||||
types.RegisterGlobalTypedTask(types.TaskTypeErasureCoding, NewTask)
|
||||
glog.V(1).Infof("Registered EC task")
|
||||
}
|
||||
660
weed/worker/tasks/erasure_coding/ec_task.go
Normal file
660
weed/worker/tasks/erasure_coding/ec_task.go
Normal file
@@ -0,0 +1,660 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// ErasureCodingTask implements the Task interface
|
||||
type ErasureCodingTask struct {
|
||||
*base.BaseTask
|
||||
server string
|
||||
volumeID uint32
|
||||
collection string
|
||||
workDir string
|
||||
progress float64
|
||||
|
||||
// EC parameters
|
||||
dataShards int32
|
||||
parityShards int32
|
||||
destinations []*worker_pb.ECDestination
|
||||
shardAssignment map[string][]string // destination -> assigned shard types
|
||||
replicas []string // volume replica servers for deletion
|
||||
}
|
||||
|
||||
// NewErasureCodingTask creates a new unified EC task instance
|
||||
func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask {
|
||||
return &ErasureCodingTask{
|
||||
BaseTask: base.NewBaseTask(id, types.TaskTypeErasureCoding),
|
||||
server: server,
|
||||
volumeID: volumeID,
|
||||
collection: collection,
|
||||
dataShards: erasure_coding.DataShardsCount, // Default values
|
||||
parityShards: erasure_coding.ParityShardsCount, // Default values
|
||||
}
|
||||
}
|
||||
|
||||
// Execute implements the UnifiedTask interface
|
||||
func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
|
||||
if params == nil {
|
||||
return fmt.Errorf("task parameters are required")
|
||||
}
|
||||
|
||||
ecParams := params.GetErasureCodingParams()
|
||||
if ecParams == nil {
|
||||
return fmt.Errorf("erasure coding parameters are required")
|
||||
}
|
||||
|
||||
t.dataShards = ecParams.DataShards
|
||||
t.parityShards = ecParams.ParityShards
|
||||
t.workDir = ecParams.WorkingDir
|
||||
t.destinations = ecParams.Destinations
|
||||
t.replicas = params.Replicas // Get replicas from task parameters
|
||||
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"server": t.server,
|
||||
"collection": t.collection,
|
||||
"data_shards": t.dataShards,
|
||||
"parity_shards": t.parityShards,
|
||||
"destinations": len(t.destinations),
|
||||
}).Info("Starting erasure coding task")
|
||||
|
||||
// Use the working directory from task parameters, or fall back to a default
|
||||
baseWorkDir := t.workDir
|
||||
if baseWorkDir == "" {
|
||||
baseWorkDir = "/tmp/seaweedfs_ec_work"
|
||||
}
|
||||
|
||||
// Create unique working directory for this task
|
||||
taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
|
||||
if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
|
||||
}
|
||||
glog.V(1).Infof("Created working directory: %s", taskWorkDir)
|
||||
|
||||
// Update the task's working directory to the specific instance directory
|
||||
t.workDir = taskWorkDir
|
||||
glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)
|
||||
|
||||
// Ensure cleanup of working directory (but preserve logs)
|
||||
defer func() {
|
||||
// Clean up volume files and EC shards, but preserve the directory structure and any logs
|
||||
patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
|
||||
for _, pattern := range patterns {
|
||||
matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, match := range matches {
|
||||
if err := os.Remove(match); err != nil {
|
||||
glog.V(2).Infof("Could not remove %s: %v", match, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
|
||||
}()
|
||||
|
||||
// Step 1: Mark volume readonly
|
||||
t.ReportProgress(10.0)
|
||||
t.GetLogger().Info("Marking volume readonly")
|
||||
if err := t.markVolumeReadonly(); err != nil {
|
||||
return fmt.Errorf("failed to mark volume readonly: %v", err)
|
||||
}
|
||||
|
||||
// Step 2: Copy volume files to worker
|
||||
t.ReportProgress(25.0)
|
||||
t.GetLogger().Info("Copying volume files to worker")
|
||||
localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy volume files: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Generate EC shards locally
|
||||
t.ReportProgress(40.0)
|
||||
t.GetLogger().Info("Generating EC shards locally")
|
||||
shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate EC shards: %v", err)
|
||||
}
|
||||
|
||||
// Step 4: Distribute shards to destinations
|
||||
t.ReportProgress(60.0)
|
||||
t.GetLogger().Info("Distributing EC shards to destinations")
|
||||
if err := t.distributeEcShards(shardFiles); err != nil {
|
||||
return fmt.Errorf("failed to distribute EC shards: %v", err)
|
||||
}
|
||||
|
||||
// Step 5: Mount EC shards
|
||||
t.ReportProgress(80.0)
|
||||
t.GetLogger().Info("Mounting EC shards")
|
||||
if err := t.mountEcShards(); err != nil {
|
||||
return fmt.Errorf("failed to mount EC shards: %v", err)
|
||||
}
|
||||
|
||||
// Step 6: Delete original volume
|
||||
t.ReportProgress(90.0)
|
||||
t.GetLogger().Info("Deleting original volume")
|
||||
if err := t.deleteOriginalVolume(); err != nil {
|
||||
return fmt.Errorf("failed to delete original volume: %v", err)
|
||||
}
|
||||
|
||||
t.ReportProgress(100.0)
|
||||
glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
|
||||
t.volumeID, t.server, len(shardFiles))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate implements the UnifiedTask interface
|
||||
func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
|
||||
if params == nil {
|
||||
return fmt.Errorf("task parameters are required")
|
||||
}
|
||||
|
||||
ecParams := params.GetErasureCodingParams()
|
||||
if ecParams == nil {
|
||||
return fmt.Errorf("erasure coding parameters are required")
|
||||
}
|
||||
|
||||
if params.VolumeId != t.volumeID {
|
||||
return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
|
||||
}
|
||||
|
||||
if params.Server != t.server {
|
||||
return fmt.Errorf("source server mismatch: expected %s, got %s", t.server, params.Server)
|
||||
}
|
||||
|
||||
if ecParams.DataShards < 1 {
|
||||
return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards)
|
||||
}
|
||||
|
||||
if ecParams.ParityShards < 1 {
|
||||
return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
|
||||
}
|
||||
|
||||
if len(ecParams.Destinations) < int(ecParams.DataShards+ecParams.ParityShards) {
|
||||
return fmt.Errorf("insufficient destinations: got %d, need %d", len(ecParams.Destinations), ecParams.DataShards+ecParams.ParityShards)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EstimateTime implements the UnifiedTask interface
|
||||
func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
|
||||
// Basic estimate based on simulated steps
|
||||
return 20 * time.Second // Sum of all step durations
|
||||
}
|
||||
|
||||
// GetProgress returns current progress
|
||||
func (t *ErasureCodingTask) GetProgress() float64 {
|
||||
return t.progress
|
||||
}
|
||||
|
||||
// Helper methods for actual EC operations
|
||||
|
||||
// markVolumeReadonly marks the volume as readonly on the source server
|
||||
func (t *ErasureCodingTask) markVolumeReadonly() error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
|
||||
func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
|
||||
localFiles := make(map[string]string)
|
||||
|
||||
// Copy .dat file
|
||||
datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
|
||||
if err := t.copyFileFromSource(".dat", datFile); err != nil {
|
||||
return nil, fmt.Errorf("failed to copy .dat file: %v", err)
|
||||
}
|
||||
localFiles["dat"] = datFile
|
||||
|
||||
// Copy .idx file
|
||||
idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
|
||||
if err := t.copyFileFromSource(".idx", idxFile); err != nil {
|
||||
return nil, fmt.Errorf("failed to copy .idx file: %v", err)
|
||||
}
|
||||
localFiles["idx"] = idxFile
|
||||
|
||||
return localFiles, nil
|
||||
}
|
||||
|
||||
// copyFileFromSource copies a file from source server to local path using gRPC streaming
|
||||
func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
Ext: ext,
|
||||
StopOffset: uint64(math.MaxInt64),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initiate file copy: %v", err)
|
||||
}
|
||||
|
||||
// Create local file
|
||||
localFile, err := os.Create(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local file %s: %v", localPath, err)
|
||||
}
|
||||
defer localFile.Close()
|
||||
|
||||
// Stream data and write to local file
|
||||
totalBytes := int64(0)
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to receive file data: %v", err)
|
||||
}
|
||||
|
||||
if len(resp.FileContent) > 0 {
|
||||
written, writeErr := localFile.Write(resp.FileContent)
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("failed to write to local file: %v", writeErr)
|
||||
}
|
||||
totalBytes += int64(written)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// generateEcShardsLocally generates EC shards from local volume files
|
||||
func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
|
||||
datFile := localFiles["dat"]
|
||||
idxFile := localFiles["idx"]
|
||||
|
||||
if datFile == "" || idxFile == "" {
|
||||
return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
|
||||
}
|
||||
|
||||
// Get base name without extension for EC operations
|
||||
baseName := strings.TrimSuffix(datFile, ".dat")
|
||||
shardFiles := make(map[string]string)
|
||||
|
||||
glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)
|
||||
|
||||
// Generate EC shard files (.ec00 ~ .ec13)
|
||||
if err := erasure_coding.WriteEcFiles(baseName); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
|
||||
}
|
||||
|
||||
// Generate .ecx file from .idx (use baseName, not full idx path)
|
||||
if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
|
||||
}
|
||||
|
||||
// Collect generated shard file paths
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
|
||||
if _, err := os.Stat(shardFile); err == nil {
|
||||
shardFiles[fmt.Sprintf("ec%02d", i)] = shardFile
|
||||
}
|
||||
}
|
||||
|
||||
// Add metadata files
|
||||
ecxFile := baseName + ".ecx"
|
||||
if _, err := os.Stat(ecxFile); err == nil {
|
||||
shardFiles["ecx"] = ecxFile
|
||||
}
|
||||
|
||||
// Generate .vif file (volume info)
|
||||
vifFile := baseName + ".vif"
|
||||
volumeInfo := &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.GetCurrentVersion()),
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
|
||||
glog.Warningf("Failed to create .vif file: %v", err)
|
||||
} else {
|
||||
shardFiles["vif"] = vifFile
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Generated %d EC files locally", len(shardFiles))
|
||||
return shardFiles, nil
|
||||
}
|
||||
|
||||
// distributeEcShards distributes locally generated EC shards to destination servers
|
||||
func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
|
||||
if len(t.destinations) == 0 {
|
||||
return fmt.Errorf("no destinations specified for EC shard distribution")
|
||||
}
|
||||
|
||||
if len(shardFiles) == 0 {
|
||||
return fmt.Errorf("no shard files available for distribution")
|
||||
}
|
||||
|
||||
// Create shard assignment: assign specific shards to specific destinations
|
||||
shardAssignment := t.createShardAssignment(shardFiles)
|
||||
if len(shardAssignment) == 0 {
|
||||
return fmt.Errorf("failed to create shard assignment")
|
||||
}
|
||||
|
||||
// Store assignment for use during mounting
|
||||
t.shardAssignment = shardAssignment
|
||||
|
||||
// Send assigned shards to each destination
|
||||
for destNode, assignedShards := range shardAssignment {
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"destination": destNode,
|
||||
"assigned_shards": len(assignedShards),
|
||||
"shard_ids": assignedShards,
|
||||
}).Info("Distributing assigned EC shards to destination")
|
||||
|
||||
// Send only the assigned shards to this destination
|
||||
for _, shardType := range assignedShards {
|
||||
filePath, exists := shardFiles[shardType]
|
||||
if !exists {
|
||||
return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
|
||||
}
|
||||
|
||||
if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
|
||||
return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
|
||||
return nil
|
||||
}
|
||||
|
||||
// createShardAssignment assigns specific EC shards to specific destination servers
|
||||
// Each destination gets a subset of shards based on availability and placement rules
|
||||
func (t *ErasureCodingTask) createShardAssignment(shardFiles map[string]string) map[string][]string {
|
||||
assignment := make(map[string][]string)
|
||||
|
||||
// Collect all available EC shards (ec00-ec13)
|
||||
var availableShards []string
|
||||
for shardType := range shardFiles {
|
||||
if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
|
||||
availableShards = append(availableShards, shardType)
|
||||
}
|
||||
}
|
||||
|
||||
// Sort shards for consistent assignment
|
||||
sort.Strings(availableShards)
|
||||
|
||||
if len(availableShards) == 0 {
|
||||
glog.Warningf("No EC shards found for assignment")
|
||||
return assignment
|
||||
}
|
||||
|
||||
// Calculate shards per destination
|
||||
numDestinations := len(t.destinations)
|
||||
if numDestinations == 0 {
|
||||
return assignment
|
||||
}
|
||||
|
||||
// Strategy: Distribute shards as evenly as possible across destinations
|
||||
// With 14 shards and N destinations, some destinations get ⌈14/N⌉ shards, others get ⌊14/N⌋
|
||||
shardsPerDest := len(availableShards) / numDestinations
|
||||
extraShards := len(availableShards) % numDestinations
|
||||
|
||||
shardIndex := 0
|
||||
for i, dest := range t.destinations {
|
||||
var destShards []string
|
||||
|
||||
// Assign base number of shards
|
||||
shardsToAssign := shardsPerDest
|
||||
|
||||
// Assign one extra shard to first 'extraShards' destinations
|
||||
if i < extraShards {
|
||||
shardsToAssign++
|
||||
}
|
||||
|
||||
// Assign the shards
|
||||
for j := 0; j < shardsToAssign && shardIndex < len(availableShards); j++ {
|
||||
destShards = append(destShards, availableShards[shardIndex])
|
||||
shardIndex++
|
||||
}
|
||||
|
||||
assignment[dest.Node] = destShards
|
||||
|
||||
glog.V(2).Infof("Assigned shards %v to destination %s", destShards, dest.Node)
|
||||
}
|
||||
|
||||
// Assign metadata files (.ecx, .vif) to each destination that has shards
|
||||
// Note: .ecj files are created during mount, not during initial generation
|
||||
for destNode, destShards := range assignment {
|
||||
if len(destShards) > 0 {
|
||||
// Add .ecx file if available
|
||||
if _, hasEcx := shardFiles["ecx"]; hasEcx {
|
||||
assignment[destNode] = append(assignment[destNode], "ecx")
|
||||
}
|
||||
|
||||
// Add .vif file if available
|
||||
if _, hasVif := shardFiles["vif"]; hasVif {
|
||||
assignment[destNode] = append(assignment[destNode], "vif")
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Assigned metadata files (.ecx, .vif) to destination %s", destNode)
|
||||
}
|
||||
}
|
||||
|
||||
return assignment
|
||||
}
|
||||
|
||||
// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
|
||||
func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(),
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
// Open the local shard file
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Get file size
|
||||
fileInfo, err := file.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
|
||||
}
|
||||
|
||||
// Determine file extension and shard ID
|
||||
var ext string
|
||||
var shardId uint32
|
||||
if shardType == "ecx" {
|
||||
ext = ".ecx"
|
||||
shardId = 0 // ecx file doesn't have a specific shard ID
|
||||
} else if shardType == "vif" {
|
||||
ext = ".vif"
|
||||
shardId = 0 // vif file doesn't have a specific shard ID
|
||||
} else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
|
||||
// EC shard file like "ec00", "ec01", etc.
|
||||
ext = "." + shardType
|
||||
fmt.Sscanf(shardType[2:], "%d", &shardId)
|
||||
} else {
|
||||
return fmt.Errorf("unknown shard type: %s", shardType)
|
||||
}
|
||||
|
||||
// Create streaming client
|
||||
stream, err := client.ReceiveFile(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create receive stream: %v", err)
|
||||
}
|
||||
|
||||
// Send file info first
|
||||
err = stream.Send(&volume_server_pb.ReceiveFileRequest{
|
||||
Data: &volume_server_pb.ReceiveFileRequest_Info{
|
||||
Info: &volume_server_pb.ReceiveFileInfo{
|
||||
VolumeId: t.volumeID,
|
||||
Ext: ext,
|
||||
Collection: t.collection,
|
||||
IsEcVolume: true,
|
||||
ShardId: shardId,
|
||||
FileSize: uint64(fileInfo.Size()),
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send file info: %v", err)
|
||||
}
|
||||
|
||||
// Send file content in chunks
|
||||
buffer := make([]byte, 64*1024) // 64KB chunks
|
||||
for {
|
||||
n, readErr := file.Read(buffer)
|
||||
if n > 0 {
|
||||
err = stream.Send(&volume_server_pb.ReceiveFileRequest{
|
||||
Data: &volume_server_pb.ReceiveFileRequest_FileContent{
|
||||
FileContent: buffer[:n],
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send file content: %v", err)
|
||||
}
|
||||
}
|
||||
if readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("failed to read file: %v", readErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stream and get response
|
||||
resp, err := stream.CloseAndRecv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close stream: %v", err)
|
||||
}
|
||||
|
||||
if resp.Error != "" {
|
||||
return fmt.Errorf("server error: %s", resp.Error)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// mountEcShards mounts EC shards on destination servers
|
||||
func (t *ErasureCodingTask) mountEcShards() error {
|
||||
if t.shardAssignment == nil {
|
||||
return fmt.Errorf("shard assignment not available for mounting")
|
||||
}
|
||||
|
||||
// Mount only assigned shards on each destination
|
||||
for destNode, assignedShards := range t.shardAssignment {
|
||||
// Convert shard names to shard IDs for mounting
|
||||
var shardIds []uint32
|
||||
for _, shardType := range assignedShards {
|
||||
// Skip metadata files (.ecx, .vif) - only mount EC shards
|
||||
if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
|
||||
// Parse shard ID from "ec00", "ec01", etc.
|
||||
var shardId uint32
|
||||
if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
|
||||
shardIds = append(shardIds, shardId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(shardIds) == 0 {
|
||||
glog.V(1).Infof("No EC shards to mount on %s (only metadata files)", destNode)
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Mounting shards %v on %s", shardIds, destNode)
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
||||
VolumeId: t.volumeID,
|
||||
Collection: t.collection,
|
||||
ShardIds: shardIds,
|
||||
})
|
||||
return mountErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to mount shards %v on %s: %v", shardIds, destNode, err)
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully mounted EC shards %v on %s", shardIds, destNode)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteOriginalVolume deletes the original volume and all its replicas from all servers
|
||||
func (t *ErasureCodingTask) deleteOriginalVolume() error {
|
||||
// Get replicas from task parameters (set during detection)
|
||||
replicas := t.getReplicas()
|
||||
|
||||
if len(replicas) == 0 {
|
||||
glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
|
||||
replicas = []string{t.server}
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Deleting volume %d from %d replica servers: %v", t.volumeID, len(replicas), replicas)
|
||||
|
||||
// Delete volume from all replica locations
|
||||
var deleteErrors []string
|
||||
successCount := 0
|
||||
|
||||
for _, replicaServer := range replicas {
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
|
||||
VolumeId: t.volumeID,
|
||||
OnlyEmpty: false, // Force delete since we've created EC shards
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
|
||||
glog.Warningf("Failed to delete volume %d from replica server %s: %v", t.volumeID, replicaServer, err)
|
||||
} else {
|
||||
successCount++
|
||||
glog.V(1).Infof("Successfully deleted volume %d from replica server %s", t.volumeID, replicaServer)
|
||||
}
|
||||
}
|
||||
|
||||
// Report results
|
||||
if len(deleteErrors) > 0 {
|
||||
glog.Warningf("Some volume deletions failed (%d/%d successful): %v", successCount, len(replicas), deleteErrors)
|
||||
// Don't return error - EC task should still be considered successful if shards are mounted
|
||||
} else {
|
||||
glog.V(1).Infof("Successfully deleted volume %d from all %d replica servers", t.volumeID, len(replicas))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getReplicas extracts replica servers from task parameters
|
||||
func (t *ErasureCodingTask) getReplicas() []string {
|
||||
// Access replicas from the parameters passed during Execute
|
||||
// We'll need to store these during Execute - let me add a field to the task
|
||||
return t.replicas
|
||||
}
|
||||
229
weed/worker/tasks/erasure_coding/monitoring.go
Normal file
229
weed/worker/tasks/erasure_coding/monitoring.go
Normal file
@@ -0,0 +1,229 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErasureCodingMetrics contains erasure coding-specific monitoring data
|
||||
type ErasureCodingMetrics struct {
|
||||
// Execution metrics
|
||||
VolumesEncoded int64 `json:"volumes_encoded"`
|
||||
TotalShardsCreated int64 `json:"total_shards_created"`
|
||||
TotalDataProcessed int64 `json:"total_data_processed"`
|
||||
TotalSourcesRemoved int64 `json:"total_sources_removed"`
|
||||
LastEncodingTime time.Time `json:"last_encoding_time"`
|
||||
|
||||
// Performance metrics
|
||||
AverageEncodingTime int64 `json:"average_encoding_time_seconds"`
|
||||
AverageShardSize int64 `json:"average_shard_size"`
|
||||
AverageDataShards int `json:"average_data_shards"`
|
||||
AverageParityShards int `json:"average_parity_shards"`
|
||||
SuccessfulOperations int64 `json:"successful_operations"`
|
||||
FailedOperations int64 `json:"failed_operations"`
|
||||
|
||||
// Distribution metrics
|
||||
ShardsPerDataCenter map[string]int64 `json:"shards_per_datacenter"`
|
||||
ShardsPerRack map[string]int64 `json:"shards_per_rack"`
|
||||
PlacementSuccessRate float64 `json:"placement_success_rate"`
|
||||
|
||||
// Current task metrics
|
||||
CurrentVolumeSize int64 `json:"current_volume_size"`
|
||||
CurrentShardCount int `json:"current_shard_count"`
|
||||
VolumesPendingEncoding int `json:"volumes_pending_encoding"`
|
||||
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewErasureCodingMetrics creates a new erasure coding metrics instance
|
||||
func NewErasureCodingMetrics() *ErasureCodingMetrics {
|
||||
return &ErasureCodingMetrics{
|
||||
LastEncodingTime: time.Now(),
|
||||
ShardsPerDataCenter: make(map[string]int64),
|
||||
ShardsPerRack: make(map[string]int64),
|
||||
}
|
||||
}
|
||||
|
||||
// RecordVolumeEncoded records a successful volume encoding operation
|
||||
func (m *ErasureCodingMetrics) RecordVolumeEncoded(volumeSize int64, shardsCreated int, dataShards int, parityShards int, encodingTime time.Duration, sourceRemoved bool) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.VolumesEncoded++
|
||||
m.TotalShardsCreated += int64(shardsCreated)
|
||||
m.TotalDataProcessed += volumeSize
|
||||
m.SuccessfulOperations++
|
||||
m.LastEncodingTime = time.Now()
|
||||
|
||||
if sourceRemoved {
|
||||
m.TotalSourcesRemoved++
|
||||
}
|
||||
|
||||
// Update average encoding time
|
||||
if m.AverageEncodingTime == 0 {
|
||||
m.AverageEncodingTime = int64(encodingTime.Seconds())
|
||||
} else {
|
||||
// Exponential moving average
|
||||
newTime := int64(encodingTime.Seconds())
|
||||
m.AverageEncodingTime = (m.AverageEncodingTime*4 + newTime) / 5
|
||||
}
|
||||
|
||||
// Update average shard size
|
||||
if shardsCreated > 0 {
|
||||
avgShardSize := volumeSize / int64(shardsCreated)
|
||||
if m.AverageShardSize == 0 {
|
||||
m.AverageShardSize = avgShardSize
|
||||
} else {
|
||||
m.AverageShardSize = (m.AverageShardSize*4 + avgShardSize) / 5
|
||||
}
|
||||
}
|
||||
|
||||
// Update average data/parity shards
|
||||
if m.AverageDataShards == 0 {
|
||||
m.AverageDataShards = dataShards
|
||||
m.AverageParityShards = parityShards
|
||||
} else {
|
||||
m.AverageDataShards = (m.AverageDataShards*4 + dataShards) / 5
|
||||
m.AverageParityShards = (m.AverageParityShards*4 + parityShards) / 5
|
||||
}
|
||||
}
|
||||
|
||||
// RecordFailure records a failed erasure coding operation
|
||||
func (m *ErasureCodingMetrics) RecordFailure() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.FailedOperations++
|
||||
}
|
||||
|
||||
// RecordShardPlacement records shard placement for distribution tracking
|
||||
func (m *ErasureCodingMetrics) RecordShardPlacement(dataCenter string, rack string) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.ShardsPerDataCenter[dataCenter]++
|
||||
rackKey := dataCenter + ":" + rack
|
||||
m.ShardsPerRack[rackKey]++
|
||||
}
|
||||
|
||||
// UpdateCurrentVolumeInfo updates current volume processing information
|
||||
func (m *ErasureCodingMetrics) UpdateCurrentVolumeInfo(volumeSize int64, shardCount int) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.CurrentVolumeSize = volumeSize
|
||||
m.CurrentShardCount = shardCount
|
||||
}
|
||||
|
||||
// SetVolumesPendingEncoding sets the number of volumes pending encoding
|
||||
func (m *ErasureCodingMetrics) SetVolumesPendingEncoding(count int) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.VolumesPendingEncoding = count
|
||||
}
|
||||
|
||||
// UpdatePlacementSuccessRate updates the placement success rate
|
||||
func (m *ErasureCodingMetrics) UpdatePlacementSuccessRate(rate float64) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if m.PlacementSuccessRate == 0 {
|
||||
m.PlacementSuccessRate = rate
|
||||
} else {
|
||||
// Exponential moving average
|
||||
m.PlacementSuccessRate = 0.8*m.PlacementSuccessRate + 0.2*rate
|
||||
}
|
||||
}
|
||||
|
||||
// GetMetrics returns a copy of the current metrics (without the mutex)
|
||||
func (m *ErasureCodingMetrics) GetMetrics() ErasureCodingMetrics {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
// Create deep copy of maps
|
||||
shardsPerDC := make(map[string]int64)
|
||||
for k, v := range m.ShardsPerDataCenter {
|
||||
shardsPerDC[k] = v
|
||||
}
|
||||
|
||||
shardsPerRack := make(map[string]int64)
|
||||
for k, v := range m.ShardsPerRack {
|
||||
shardsPerRack[k] = v
|
||||
}
|
||||
|
||||
// Create a copy without the mutex to avoid copying lock value
|
||||
return ErasureCodingMetrics{
|
||||
VolumesEncoded: m.VolumesEncoded,
|
||||
TotalShardsCreated: m.TotalShardsCreated,
|
||||
TotalDataProcessed: m.TotalDataProcessed,
|
||||
TotalSourcesRemoved: m.TotalSourcesRemoved,
|
||||
LastEncodingTime: m.LastEncodingTime,
|
||||
AverageEncodingTime: m.AverageEncodingTime,
|
||||
AverageShardSize: m.AverageShardSize,
|
||||
AverageDataShards: m.AverageDataShards,
|
||||
AverageParityShards: m.AverageParityShards,
|
||||
SuccessfulOperations: m.SuccessfulOperations,
|
||||
FailedOperations: m.FailedOperations,
|
||||
ShardsPerDataCenter: shardsPerDC,
|
||||
ShardsPerRack: shardsPerRack,
|
||||
PlacementSuccessRate: m.PlacementSuccessRate,
|
||||
CurrentVolumeSize: m.CurrentVolumeSize,
|
||||
CurrentShardCount: m.CurrentShardCount,
|
||||
VolumesPendingEncoding: m.VolumesPendingEncoding,
|
||||
}
|
||||
}
|
||||
|
||||
// GetSuccessRate returns the success rate as a percentage
|
||||
func (m *ErasureCodingMetrics) GetSuccessRate() float64 {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
total := m.SuccessfulOperations + m.FailedOperations
|
||||
if total == 0 {
|
||||
return 100.0
|
||||
}
|
||||
return float64(m.SuccessfulOperations) / float64(total) * 100.0
|
||||
}
|
||||
|
||||
// GetAverageDataProcessed returns the average data processed per volume
|
||||
func (m *ErasureCodingMetrics) GetAverageDataProcessed() float64 {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if m.VolumesEncoded == 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(m.TotalDataProcessed) / float64(m.VolumesEncoded)
|
||||
}
|
||||
|
||||
// GetSourceRemovalRate returns the percentage of sources removed after encoding
|
||||
func (m *ErasureCodingMetrics) GetSourceRemovalRate() float64 {
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if m.VolumesEncoded == 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(m.TotalSourcesRemoved) / float64(m.VolumesEncoded) * 100.0
|
||||
}
|
||||
|
||||
// Reset resets all metrics to zero
|
||||
func (m *ErasureCodingMetrics) Reset() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
*m = ErasureCodingMetrics{
|
||||
LastEncodingTime: time.Now(),
|
||||
ShardsPerDataCenter: make(map[string]int64),
|
||||
ShardsPerRack: make(map[string]int64),
|
||||
}
|
||||
}
|
||||
|
||||
// Global metrics instance for erasure coding tasks
|
||||
var globalErasureCodingMetrics = NewErasureCodingMetrics()
|
||||
|
||||
// GetGlobalErasureCodingMetrics returns the global erasure coding metrics instance
|
||||
func GetGlobalErasureCodingMetrics() *ErasureCodingMetrics {
|
||||
return globalErasureCodingMetrics
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
@@ -35,9 +36,19 @@ func RegisterErasureCodingTask() {
|
||||
Icon: "fas fa-shield-alt text-success",
|
||||
Capabilities: []string{"erasure_coding", "data_protection"},
|
||||
|
||||
Config: config,
|
||||
ConfigSpec: GetConfigSpec(),
|
||||
CreateTask: nil, // Uses typed task system - see init() in ec.go
|
||||
Config: config,
|
||||
ConfigSpec: GetConfigSpec(),
|
||||
CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
|
||||
if params == nil {
|
||||
return nil, fmt.Errorf("task parameters are required")
|
||||
}
|
||||
return NewErasureCodingTask(
|
||||
fmt.Sprintf("erasure_coding-%d", params.VolumeId),
|
||||
params.Server,
|
||||
params.VolumeId,
|
||||
params.Collection,
|
||||
), nil
|
||||
},
|
||||
DetectionFunc: Detection,
|
||||
ScanInterval: 1 * time.Hour,
|
||||
SchedulingFunc: Scheduling,
|
||||
40
weed/worker/tasks/erasure_coding/scheduling.go
Normal file
40
weed/worker/tasks/erasure_coding/scheduling.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
)
|
||||
|
||||
// Scheduling implements the scheduling logic for erasure coding tasks
|
||||
func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool {
|
||||
ecConfig := config.(*Config)
|
||||
|
||||
// Check if we have available workers
|
||||
if len(availableWorkers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Count running EC tasks
|
||||
runningCount := 0
|
||||
for _, runningTask := range runningTasks {
|
||||
if runningTask.Type == types.TaskTypeErasureCoding {
|
||||
runningCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Check concurrency limit
|
||||
if runningCount >= ecConfig.MaxConcurrent {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if any worker can handle EC tasks
|
||||
for _, worker := range availableWorkers {
|
||||
for _, capability := range worker.Capabilities {
|
||||
if capability == types.TaskTypeErasureCoding {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user