collect ec shard from multiple locations
fix https://github.com/seaweedfs/seaweedfs/issues/4365
This commit is contained in:
@@ -408,7 +408,9 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
|
|||||||
|
|
||||||
glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
|
glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
|
||||||
|
|
||||||
v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
|
// collect .ec00 ~ .ec09 files
|
||||||
|
shardFileNames := make([]string, erasure_coding.DataShardsCount)
|
||||||
|
v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
|
||||||
if !found {
|
if !found {
|
||||||
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
|
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
|
||||||
}
|
}
|
||||||
@@ -417,6 +419,12 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
|
|||||||
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
|
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for shardId := 0; shardId < DataShardsCount; shardId++ {
|
||||||
|
if shardFileNames[shardId] == "" {
|
||||||
|
return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
|
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
|
||||||
// calculate .dat file size
|
// calculate .dat file size
|
||||||
datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
|
datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
|
||||||
@@ -425,7 +433,7 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
|
|||||||
}
|
}
|
||||||
|
|
||||||
// write .dat file from .ec00 ~ .ec09 files
|
// write .dat file from .ec00 ~ .ec09 files
|
||||||
if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
|
if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize, shardFileNames); err != nil {
|
||||||
return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err)
|
return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -38,6 +38,22 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
|
||||||
|
l.ecVolumesLock.RLock()
|
||||||
|
defer l.ecVolumesLock.RUnlock()
|
||||||
|
|
||||||
|
ecVolume, found = l.ecVolumes[vid]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, ecShard := range ecVolume.Shards {
|
||||||
|
if ecShard.ShardId < erasure_coding.ShardId(len(shardFileNames)) {
|
||||||
|
shardFileNames[ecShard.ShardId] = erasure_coding.EcShardFileName(ecVolume.Collection, l.Directory, int(ecVolume.VolumeId)) + erasure_coding.ToExt(int(ecShard.ShardId))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
|
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
|
||||||
l.ecVolumesLock.RLock()
|
l.ecVolumesLock.RLock()
|
||||||
defer l.ecVolumesLock.RUnlock()
|
defer l.ecVolumesLock.RUnlock()
|
||||||
|
|||||||
@@ -151,7 +151,7 @@ func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WriteDatFile generates .dat from from .ec00 ~ .ec09 files
|
// WriteDatFile generates .dat from from .ec00 ~ .ec09 files
|
||||||
func WriteDatFile(baseFileName string, datFileSize int64) error {
|
func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error {
|
||||||
|
|
||||||
datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if openErr != nil {
|
if openErr != nil {
|
||||||
@@ -161,13 +161,19 @@ func WriteDatFile(baseFileName string, datFileSize int64) error {
|
|||||||
|
|
||||||
inputFiles := make([]*os.File, DataShardsCount)
|
inputFiles := make([]*os.File, DataShardsCount)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
for shardId := 0; shardId < DataShardsCount; shardId++ {
|
||||||
|
if inputFiles[shardId] != nil {
|
||||||
|
inputFiles[shardId].Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for shardId := 0; shardId < DataShardsCount; shardId++ {
|
for shardId := 0; shardId < DataShardsCount; shardId++ {
|
||||||
shardFileName := baseFileName + ToExt(shardId)
|
inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0)
|
||||||
inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
|
|
||||||
if openErr != nil {
|
if openErr != nil {
|
||||||
return openErr
|
return openErr
|
||||||
}
|
}
|
||||||
defer inputFiles[shardId].Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
|
for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
|
||||||
|
|||||||
@@ -116,6 +116,17 @@ func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, boo
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shardFiles is a list of shard files, which is used to return the shard locations
|
||||||
|
func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
|
||||||
|
for _, location := range s.Locations {
|
||||||
|
if s, found := location.CollectEcShards(vid, shardFileNames); found {
|
||||||
|
ecVolume = s
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
|
func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
|
||||||
for _, location := range s.Locations {
|
for _, location := range s.Locations {
|
||||||
location.DestroyEcVolume(vid)
|
location.DestroyEcVolume(vid)
|
||||||
|
|||||||
Reference in New Issue
Block a user