refactoring
This commit is contained in:
@@ -261,7 +261,7 @@ func (l *DiskLocation) loadExistingVolumesWithId(needleMapKind NeedleMapKind, ld
|
|||||||
l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId)
|
l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId)
|
||||||
glog.V(2).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId)
|
glog.V(2).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId)
|
||||||
|
|
||||||
l.loadAllEcShardsWithCallback(l.ecShardNotifyHandler)
|
l.loadAllEcShards(l.ecShardNotifyHandler)
|
||||||
glog.V(2).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId)
|
glog.V(2).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,11 +136,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) {
|
func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) {
|
||||||
return l.loadEcShardsWithCallback(shards, collection, vid, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *DiskLocation) loadEcShardsWithCallback(shards []string, collection string, vid needle.VolumeId, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) {
|
|
||||||
|
|
||||||
for _, shard := range shards {
|
for _, shard := range shards {
|
||||||
shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64)
|
shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64)
|
||||||
@@ -165,11 +161,7 @@ func (l *DiskLocation) loadEcShardsWithCallback(shards []string, collection stri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) loadAllEcShards() (err error) {
|
func (l *DiskLocation) loadAllEcShards(onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) {
|
||||||
return l.loadAllEcShardsWithCallback(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *DiskLocation) loadAllEcShardsWithCallback(onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) (err error) {
|
|
||||||
|
|
||||||
dirEntries, err := os.ReadDir(l.Directory)
|
dirEntries, err := os.ReadDir(l.Directory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -308,7 +300,7 @@ func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, vo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to load the EC shards
|
// Attempt to load the EC shards
|
||||||
if err := l.loadEcShardsWithCallback(shards, collection, volumeId, onShardLoad); err != nil {
|
if err := l.loadEcShards(shards, collection, volumeId, onShardLoad); err != nil {
|
||||||
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
|
// If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
|
||||||
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
|
// If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
|
||||||
if datExists {
|
if datExists {
|
||||||
|
|||||||
@@ -177,13 +177,13 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run loadAllEcShards
|
// Run loadAllEcShards
|
||||||
loadErr := diskLocation.loadAllEcShards()
|
loadErr := diskLocation.loadAllEcShards(nil)
|
||||||
if loadErr != nil {
|
if loadErr != nil {
|
||||||
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
|
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test idempotency - running again should not cause issues
|
// Test idempotency - running again should not cause issues
|
||||||
loadErr2 := diskLocation.loadAllEcShards()
|
loadErr2 := diskLocation.loadAllEcShards(nil)
|
||||||
if loadErr2 != nil {
|
if loadErr2 != nil {
|
||||||
t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
|
t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
|
||||||
}
|
}
|
||||||
@@ -550,7 +550,7 @@ func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
|
|||||||
// Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
|
// Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
|
||||||
|
|
||||||
// Run loadAllEcShards
|
// Run loadAllEcShards
|
||||||
loadErr := diskLocation.loadAllEcShards()
|
loadErr := diskLocation.loadAllEcShards(nil)
|
||||||
if loadErr != nil {
|
if loadErr != nil {
|
||||||
t.Logf("loadAllEcShards error: %v", loadErr)
|
t.Logf("loadAllEcShards error: %v", loadErr)
|
||||||
}
|
}
|
||||||
@@ -621,7 +621,7 @@ func TestDistributedEcVolumeNoFileDeletion(t *testing.T) {
|
|||||||
// NO .dat file - this is a distributed EC volume
|
// NO .dat file - this is a distributed EC volume
|
||||||
|
|
||||||
// Run loadAllEcShards - this should fail but NOT delete shard files
|
// Run loadAllEcShards - this should fail but NOT delete shard files
|
||||||
loadErr := diskLocation.loadAllEcShards()
|
loadErr := diskLocation.loadAllEcShards(nil)
|
||||||
if loadErr != nil {
|
if loadErr != nil {
|
||||||
t.Logf("loadAllEcShards returned error (expected): %v", loadErr)
|
t.Logf("loadAllEcShards returned error (expected): %v", loadErr)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user