* ec: add diskType parameter to core EC functions
Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()
This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.
For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to
the individual EC commands.
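The parameter threading described above can be sketched in a reduced form. This is a minimal, self-contained sketch, not the real implementation: `DiskType`, `HardDriveType`, and this `collectEcNodes` are simplified stand-ins for the corresponding seaweedfs types and functions.

```go
package main

import "fmt"

// DiskType is a hypothetical stand-in for types.DiskType: a free-form
// disk type tag, where the empty string means the default hdd tier.
type DiskType string

const HardDriveType DiskType = ""

// collectEcNodes sketches the pattern from the commit: the disk type is
// an explicit parameter instead of a hardcoded constant.
func collectEcNodes(diskType DiskType) string {
	if diskType == HardDriveType {
		return "collecting EC nodes on default hdd disks"
	}
	return fmt.Sprintf("collecting EC nodes on %q disks", string(diskType))
}

func main() {
	// Existing callers keep the old behavior by passing the default.
	fmt.Println(collectEcNodes(HardDriveType))
	// New callers can target another tier.
	fmt.Println(collectEcNodes("ssd"))
}
```

Passing `types.HardDriveType` explicitly at every existing call site keeps behavior unchanged until the per-command flags exist.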
* ec: update helper functions to use configurable diskType
Update the following functions to accept/use diskType parameter:
- findEcVolumeShards()
- addEcVolumeShards()
- deleteEcVolumeShards()
- moveMountedShardToEcNode()
- countShardsByRack()
- pickNEcShardsToMoveFrom()
All ecBalancer methods now use ecb.diskType instead of hardcoded
types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate
and ec.rebuild) use types.HardDriveType as the default.
Update all test files to pass diskType where needed.
* ec: add -diskType flag to ec.balance and ec.encode commands
Add -diskType flag to specify the target disk type for EC operations:
- ec.balance -diskType=ssd
- ec.encode -diskType=ssd
The disk type can be 'hdd', 'ssd', or empty for default (hdd).
This allows placing EC shards on SSD or other disk types instead of
only HDD.
Example usage:
ec.balance -collection=mybucket -diskType=ssd -apply
ec.encode -collection=mybucket -diskType=ssd -force
* test: add integration tests for EC disk type support
Add integration tests to verify the -diskType flag works correctly:
- TestECDiskTypeSupport: Tests EC encode and balance with SSD disk type
- TestECDiskTypeMixedCluster: Tests EC operations on a mixed HDD/SSD cluster
The tests verify:
- Volume servers can be configured with specific disk types
- ec.encode accepts -diskType flag and encodes to the correct disk type
- ec.balance accepts -diskType flag and balances on the correct disk type
- Mixed disk type clusters work correctly with separate collections
* ec: add -sourceDiskType to ec.encode and -diskType to ec.decode
ec.encode:
- Add -sourceDiskType flag to filter source volumes by disk type
- This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards)
- -diskType specifies target disk type for EC shards
ec.decode:
- Add -diskType flag to specify source disk type where EC shards are stored
- Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType
Examples:
# Encode SSD volumes to HDD EC shards (tier migration)
ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd
# Decode EC shards from SSD
ec.decode -collection=mybucket -diskType=ssd
Integration tests updated to cover new flags.
* ec: fix variable shadowing and add -diskType to ec.rebuild and volumeServer.evacuate
Address code review comments:
1. Fix variable shadowing in collectEcVolumeServersByDc():
- Rename loop variable 'diskType' to 'diskTypeKey' and 'diskTypeStr'
to avoid shadowing the function parameter
2. Fix hardcoded HardDriveType in ecBalancer methods:
- balanceEcRack(): use ecb.diskType instead of types.HardDriveType
- collectVolumeIdToEcNodes(): use ecb.diskType
3. Add -diskType flag to ec.rebuild command:
- Add diskType field to ecRebuilder struct
- Pass diskType to collectEcNodes() and addEcVolumeShards()
4. Add -diskType flag to volumeServer.evacuate command:
- Add diskType field to commandVolumeServerEvacuate struct
- Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()
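The shadowing bug in item 1 is easiest to see in a reduced form. A hypothetical sketch (a plain map stands in for the real DiskInfos structure):

```go
package main

import "fmt"

// countDisksOfType sketches the shadowing fix: the map key in the loop
// is named diskTypeKey so it no longer shadows the diskType parameter.
func countDisksOfType(diskInfos map[string]int, diskType string) int {
	total := 0
	for diskTypeKey, diskCount := range diskInfos {
		// With the old loop-variable name "diskType", this comparison
		// would silently compare the loop variable against itself and
		// count every disk, regardless of type.
		if diskTypeKey == diskType {
			total += diskCount
		}
	}
	return total
}

func main() {
	infos := map[string]int{"": 2, "ssd": 3} // "" is the default hdd tag
	fmt.Println(countDisksOfType(infos, "ssd"))
}
```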
* test: add diskType field to ecBalancer in TestPickEcNodeToBalanceShardsInto
Address nitpick comment: ensure test ecBalancer struct has diskType
field set for consistency with other tests.
* ec: filter disk selection by disk type in pickBestDiskOnNode
When evacuating or rebalancing EC shards, pickBestDiskOnNode now
filters disks by the target disk type. This ensures:
1. EC shards from SSD disks are moved to SSD disks on destination nodes
2. EC shards from HDD disks are moved to HDD disks on destination nodes
3. No cross-disk-type shard movement occurs
This maintains the storage tier isolation when moving EC shards
between nodes during evacuation or rebalancing operations.
* ec: allow disk type fallback during evacuation
Update pickBestDiskOnNode to accept a strictDiskType parameter:
- strictDiskType=true (balancing): Only use disks of matching type.
This maintains storage tier isolation during normal rebalancing.
- strictDiskType=false (evacuation): Prefer same disk type, but
fall back to other disk types if no matching disk is available.
This ensures evacuation can complete even when same-type capacity
is insufficient.
Priority order for evacuation:
1. Same disk type with lowest shard count (preferred)
2. Different disk type with lowest shard count (fallback)
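The two-pass priority above can be sketched as follows. The `disk` struct and `pickBestDisk` helper are hypothetical simplifications; the real pickBestDiskOnNode works on topology DiskInfo messages.

```go
package main

import "fmt"

// disk is a hypothetical, simplified stand-in for a disk entry on a node.
type disk struct {
	diskType   string
	shardCount int
}

// pickBestDisk sketches the strictDiskType behavior: prefer the
// matching-type disk with the fewest shards; only when strictDiskType
// is false fall back to the least-loaded disk of any type.
func pickBestDisk(disks []disk, diskType string, strictDiskType bool) (disk, bool) {
	var best disk
	found := false
	// Pass 1: same disk type only, lowest shard count wins.
	for _, d := range disks {
		if d.diskType != diskType {
			continue
		}
		if !found || d.shardCount < best.shardCount {
			best, found = d, true
		}
	}
	if found || strictDiskType {
		return best, found
	}
	// Pass 2 (evacuation fallback): any disk type, lowest shard count.
	for _, d := range disks {
		if !found || d.shardCount < best.shardCount {
			best, found = d, true
		}
	}
	return best, found
}

func main() {
	disks := []disk{{"hdd", 1}, {"hdd", 4}}
	if _, ok := pickBestDisk(disks, "ssd", true); !ok {
		fmt.Println("strict balancing: no ssd disk, no placement")
	}
	if d, ok := pickBestDisk(disks, "ssd", false); ok {
		fmt.Printf("evacuation fallback: placed on %s with %d shards\n", d.diskType, d.shardCount)
	}
}
```

Note that a matching-type disk wins even when a disk of another type has fewer shards, which preserves tier isolation whenever same-type capacity exists.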
* test: use defer for lock/unlock to prevent lock leaks
Use defer to ensure locks are always released, even on early returns
or test failures. This prevents lock leaks that could cause subsequent
tests to hang or fail.
Changes:
- Return early if lock acquisition fails
- Immediately defer unlock after successful lock
- Remove redundant explicit unlock calls at end of tests
- Fix unused variable warning (err -> encodeErr/locErr)
* ec: dynamically discover disk types from topology for evacuation
Disk types are free-form tags (e.g., 'ssd', 'nvme', 'archive') that come
from the topology, not a hardcoded set. Only 'hdd' (or empty) is the
default disk type.
Use collectVolumeDiskTypes() to discover all disk types present in the
cluster topology instead of hardcoding [HardDriveType, SsdType].
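A reduced sketch of that discovery (hypothetical: each node is a plain map of diskType tag to disk count, whereas the real code walks topology DiskInfos):

```go
package main

import (
	"fmt"
	"sort"
)

// collectDiskTypes sketches collectVolumeDiskTypes-style discovery:
// walk every node's disk map and collect the distinct disk type tags,
// rather than assuming a fixed [hdd, ssd] set.
func collectDiskTypes(nodes []map[string]int) []string {
	seen := map[string]bool{}
	for _, diskInfos := range nodes {
		for diskType := range diskInfos {
			if diskType == "" {
				diskType = "hdd" // the empty tag is the default hdd tier
			}
			seen[diskType] = true
		}
	}
	types := make([]string, 0, len(seen))
	for t := range seen {
		types = append(types, t)
	}
	sort.Strings(types)
	return types
}

func main() {
	nodes := []map[string]int{
		{"": 2, "ssd": 1},
		{"nvme": 4},
	}
	fmt.Println(collectDiskTypes(nodes)) // [hdd nvme ssd]
}
```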
* test: add evacuation fallback and cross-rack EC placement tests
Add two new integration tests:
1. TestEvacuationFallbackBehavior:
- Tests that when same disk type has no capacity, shards fall back
to other disk types during evacuation
- Creates cluster with 1 SSD + 2 HDD servers (limited SSD capacity)
- Verifies pickBestDiskOnNode behavior with strictDiskType=false
2. TestCrossRackECPlacement:
- Tests EC shard distribution across different racks
- Creates cluster with 4 servers in 4 different racks
- Verifies shards are spread across multiple racks
- Tests that ec.balance respects rack placement
Helper functions added:
- startLimitedSsdCluster: 1 SSD + 2 HDD servers
- startMultiRackCluster: 4 servers in 4 racks
- countShardsPerRack: counts EC shards per rack from disk
* test: fix collection mismatch in TestCrossRackECPlacement

The EC commands used collection 'rack_test', but the uploaded test data
uses collection 'test' (the default). This caused ec.encode/ec.balance to
miss the uploaded volume.
Fix: Change the EC commands to use '-collection test' to match the uploaded data.
Addresses review comment from PR #7607.
* test: close log files in MultiDiskCluster.Stop() to prevent FD leaks
Track log files in MultiDiskCluster.logFiles and close them in Stop()
to prevent file descriptor accumulation in long-running or many-test
scenarios.
Addresses review comment about logging resources cleanup.
* test: improve EC integration tests with proper assertions
- Add assertNoFlagError helper to detect flag parsing regressions
- Update diskType subtests to fail on flag errors (ec.encode, ec.balance, ec.decode)
- Update verify_disktype_flag_parsing to check help output contains diskType
- Remove verify_fallback_disk_selection (was documentation-only, not executable)
- Add assertion to verify_cross_rack_distribution for minimum 2 racks
- Consolidate uploadTestDataWithDiskType to accept collection parameter
- Remove duplicate uploadTestDataWithDiskTypeMixed function
* test: extract captureCommandOutput helper and fix error handling
- Add captureCommandOutput helper to reduce code duplication in diskType tests
- Create commandRunner interface to match shell command Do method
- Update ec_encode_with_ssd_disktype, ec_balance_with_ssd_disktype,
ec_encode_with_source_disktype, ec_decode_with_disktype to use helper
- Fix filepath.Glob error handling in countShardsPerRack instead of ignoring it
* test: add flag validation to ec_balance_targets_correct_disk_type
Add assertNoFlagError calls after ec.balance commands to ensure
-diskType flag is properly recognized for both SSD and HDD disk types.
* test: add proper assertions for EC command results
- ec_encode_with_ssd_disktype: check for expected volume-related errors
- ec_balance_with_ssd_disktype: require success with require.NoError
- ec_encode_with_source_disktype: check for expected no-volume errors
- ec_decode_with_disktype: check for expected no-ec-volume errors
- upload_to_ssd_and_hdd: use require.NoError for setup validation
Tests now properly fail on unexpected errors rather than just logging.
* test: fix missing unlock in ec_encode_with_disk_awareness
Add defer unlock pattern to ensure lock is always released, matching
the pattern used in other subtests.
* test: improve helper robustness
- Make assertNoFlagError case-insensitive for pattern matching
- Use defer in captureCommandOutput to restore stdout/stderr and close
pipe ends to avoid FD leaks even if cmd.Do panics
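The core of the case-insensitive matching reduces to a check like this (`hasFlagError` is a hypothetical, simplified form; the actual helper layers test assertions on top of such a check). The "flag provided but not defined" text is the standard library flag package's real error message:

```go
package main

import (
	"fmt"
	"strings"
)

// hasFlagError reports whether command output contains a flag parsing
// failure, matching case-insensitively so changes in error casing do
// not silently defeat the check.
func hasFlagError(output string) bool {
	lowered := strings.ToLower(output)
	for _, pattern := range []string{"flag provided but not defined", "invalid value"} {
		if strings.Contains(lowered, pattern) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasFlagError("Flag provided but not defined: -diskType"))
	fmt.Println(hasFlagError("balanced 12 shards"))
}
```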
413 lines
13 KiB
Go
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func init() {
	Commands = append(Commands, &commandEcRebuild{})
}

type ecRebuilder struct {
	commandEnv   *CommandEnv
	ecNodes      []*EcNode
	writer       io.Writer
	applyChanges bool
	collections  []string
	diskType     types.DiskType

	ewg       *ErrorWaitGroup
	ecNodesMu sync.Mutex
}

type commandEcRebuild struct {
}

func (c *commandEcRebuild) Name() string {
	return "ec.rebuild"
}

func (c *commandEcRebuild) Help() string {
	return `find and rebuild missing ec shards among volume servers

	ec.rebuild [-collection EACH_COLLECTION|<collection_name>] [-apply] [-maxParallelization N] [-diskType=<disk_type>]

	Options:
		-collection: specify a collection name, or "EACH_COLLECTION" to process all collections
		-apply: actually perform the rebuild operations (default is dry-run mode)
		-maxParallelization: number of volumes to rebuild concurrently (default: 10)
			Increase for faster rebuilds with more system resources.
			Decrease if experiencing resource contention or instability.
		-diskType: disk type for EC shards (hdd, ssd, or empty for default hdd)

	Algorithm:

	For each type of volume server (different max volume count limit){
		for each collection {
			rebuildEcVolumes()
		}
	}

	func rebuildEcVolumes(){
		idealWritableVolumes = totalWritableVolumes / numVolumeServers
		for {
			sort all volume servers ordered by the number of local writable volumes
			pick the volume server A with the lowest number of writable volumes x
			pick the volume server B with the highest number of writable volumes y
			if y > idealWritableVolumes and x+1 <= idealWritableVolumes {
				if B has a writable volume id v that A does not have {
					move writable volume v from B to A
				}
			}
		}
	}

`
}

func (c *commandEcRebuild) HasTag(CommandTag) bool {
	return false
}

func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
	diskTypeStr := fixCommand.String("diskType", "", "disk type for EC shards (hdd, ssd, or empty for default hdd)")
	// TODO: remove this alias
	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
	if err = fixCommand.Parse(args); err != nil {
		return nil
	}
	handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
	infoAboutSimulationMode(writer, *applyChanges, "-apply")

	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	diskType := types.ToDiskType(*diskTypeStr)

	// collect all ec nodes
	allEcNodes, _, err := collectEcNodes(commandEnv, diskType)
	if err != nil {
		return err
	}

	var collections []string
	if *collection == "EACH_COLLECTION" {
		collections, err = ListCollectionNames(commandEnv, false, true)
		if err != nil {
			return err
		}
	} else {
		collections = []string{*collection}
	}

	erb := &ecRebuilder{
		commandEnv:   commandEnv,
		ecNodes:      allEcNodes,
		writer:       writer,
		applyChanges: *applyChanges,
		collections:  collections,
		diskType:     diskType,

		ewg: NewErrorWaitGroup(*maxParallelization),
	}

	fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
	for _, c := range collections {
		erb.rebuildEcVolumes(c)
	}

	return erb.ewg.Wait()
}

func (erb *ecRebuilder) write(format string, a ...any) {
	fmt.Fprintf(erb.writer, format, a...)
}

func (erb *ecRebuilder) isLocked() bool {
	return erb.commandEnv.isLocked()
}

// countLocalShards returns the number of shards already present locally on the node for the given volume.
func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volumeId needle.VolumeId) int {
	for _, diskInfo := range node.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return len(shardBits.ShardIds())
			}
		}
	}
	return 0
}

// selectAndReserveRebuilder atomically selects a rebuilder node with sufficient free slots
// and reserves slots only for the non-local shards that need to be copied/generated.
func (erb *ecRebuilder) selectAndReserveRebuilder(collection string, volumeId needle.VolumeId) (*EcNode, int, error) {
	erb.ecNodesMu.Lock()
	defer erb.ecNodesMu.Unlock()

	if len(erb.ecNodes) == 0 {
		return nil, 0, fmt.Errorf("no ec nodes available")
	}

	// Find the node with the most free slots, considering local shards
	var bestNode *EcNode
	var bestSlotsNeeded int
	var maxAvailableSlots int
	var minSlotsNeeded int = erasure_coding.TotalShardsCount // Start with maximum possible
	for _, node := range erb.ecNodes {
		localShards := erb.countLocalShards(node, collection, volumeId)
		slotsNeeded := erasure_coding.TotalShardsCount - localShards
		if slotsNeeded < 0 {
			slotsNeeded = 0
		}

		if node.freeEcSlot > maxAvailableSlots {
			maxAvailableSlots = node.freeEcSlot
		}

		if slotsNeeded < minSlotsNeeded {
			minSlotsNeeded = slotsNeeded
		}

		if node.freeEcSlot >= slotsNeeded {
			if bestNode == nil || node.freeEcSlot > bestNode.freeEcSlot {
				bestNode = node
				bestSlotsNeeded = slotsNeeded
			}
		}
	}

	if bestNode == nil {
		return nil, 0, fmt.Errorf("no node has sufficient free slots for volume %d (need at least %d slots, max available: %d)",
			volumeId, minSlotsNeeded, maxAvailableSlots)
	}

	// Reserve slots only for non-local shards
	bestNode.freeEcSlot -= bestSlotsNeeded

	return bestNode, bestSlotsNeeded, nil
}

// releaseRebuilder releases the reserved slots back to the rebuilder node.
func (erb *ecRebuilder) releaseRebuilder(node *EcNode, slotsToRelease int) {
	erb.ecNodesMu.Lock()
	defer erb.ecNodesMu.Unlock()

	// Release slots by incrementing the free slot count
	node.freeEcSlot += slotsToRelease
}

func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
	fmt.Printf("rebuildEcVolumes for %q\n", collection)

	// collect vid => each shard locations, similar to ecShardMap in topology.go
	ecShardMap := make(EcShardMap)
	erb.ecNodesMu.Lock()
	for _, ecNode := range erb.ecNodes {
		ecShardMap.registerEcNode(ecNode, collection)
	}
	erb.ecNodesMu.Unlock()

	for vid, locations := range ecShardMap {
		shardCount := locations.shardCount()
		if shardCount == erasure_coding.TotalShardsCount {
			continue
		}
		if shardCount < erasure_coding.DataShardsCount {
			// Capture variables for closure
			vid := vid
			shardCount := shardCount
			erb.ewg.Add(func() error {
				return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
			})
			continue
		}

		// Capture variables for closure
		vid := vid
		locations := locations

		erb.ewg.Add(func() error {
			// Select rebuilder and reserve slots atomically per volume
			rebuilder, slotsToReserve, err := erb.selectAndReserveRebuilder(collection, vid)
			if err != nil {
				return fmt.Errorf("failed to select rebuilder for volume %d: %v", vid, err)
			}
			defer erb.releaseRebuilder(rebuilder, slotsToReserve)

			return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
		})
	}
}

func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
	if !erb.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)

	// collect shard files to rebuilder local disk
	var generatedShardIds []uint32
	copiedShardIds, _, err := erb.prepareDataToRecover(rebuilder, collection, volumeId, locations)
	if err != nil {
		return err
	}
	defer func() {
		// clean up working files

		// ask the rebuilder to delete the copied shards
		err = sourceServerDeleteEcShards(erb.commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds)
		if err != nil {
			erb.write("%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
		}

	}()

	if !erb.applyChanges {
		return nil
	}

	// generate ec shards, and maybe ecx file
	generatedShardIds, err = erb.generateMissingShards(collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info))
	if err != nil {
		return err
	}

	// mount the generated shards
	err = mountEcShards(erb.commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds)
	if err != nil {
		return err
	}

	// ensure ECNode updates are atomic
	erb.ecNodesMu.Lock()
	defer erb.ecNodesMu.Unlock()
	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds, erb.diskType)

	return nil
}

func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {

	err = operation.WithVolumeServerClient(false, sourceLocation, erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		resp, rebuildErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
		})
		if rebuildErr == nil {
			rebuiltShardIds = resp.RebuiltShardIds
		}
		return rebuildErr
	})
	return
}

func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []uint32, localShardIds []uint32, err error) {

	needEcxFile := true
	var localShardBits erasure_coding.ShardBits
	for _, diskInfo := range rebuilder.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
				needEcxFile = false
				localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
			}
		}
	}

	for shardId, ecNodes := range locations {
		if len(ecNodes) == 0 {
			erb.write("missing shard %d.%d\n", volumeId, shardId)
			continue
		}

		if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) {
			localShardIds = append(localShardIds, uint32(shardId))
			erb.write("use existing shard %d.%d\n", volumeId, shardId)
			continue
		}

		var copyErr error
		if erb.applyChanges {
			copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
					VolumeId:       uint32(volumeId),
					Collection:     collection,
					ShardIds:       []uint32{uint32(shardId)},
					CopyEcxFile:    needEcxFile,
					CopyEcjFile:    true,
					CopyVifFile:    needEcxFile,
					SourceDataNode: ecNodes[0].info.Id,
				})
				return copyErr
			})
			if copyErr == nil && needEcxFile {
				needEcxFile = false
			}
		}
		if copyErr != nil {
			erb.write("%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
		} else {
			erb.write("%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
			copiedShardIds = append(copiedShardIds, uint32(shardId))
		}

	}

	if len(copiedShardIds)+len(localShardIds) >= erasure_coding.DataShardsCount {
		return copiedShardIds, localShardIds, nil
	}

	return nil, nil, fmt.Errorf("%d shards are not enough to recover volume %d", len(copiedShardIds)+len(localShardIds), volumeId)

}

type EcShardMap map[needle.VolumeId]EcShardLocations
type EcShardLocations [][]*EcNode

func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if shardInfo.Collection == collection {
				existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)]
				if !found {
					// Use MaxShardCount (32) to support custom EC ratios
					existing = make([][]*EcNode, erasure_coding.MaxShardCount)
					ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
				}
				for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() {
					existing[shardId] = append(existing[shardId], ecNode)
				}
			}
		}
	}
}

func (ecShardLocations EcShardLocations) shardCount() (count int) {
	for _, locations := range ecShardLocations {
		if len(locations) > 0 {
			count++
		}
	}
	return
}