filer: filer.copy cleanup in case of failed uploads
This commit is contained in:
@@ -400,15 +400,21 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(chunksChan)
|
close(chunksChan)
|
||||||
|
|
||||||
if uploadError != nil {
|
|
||||||
return uploadError
|
|
||||||
}
|
|
||||||
|
|
||||||
var chunks []*filer_pb.FileChunk
|
var chunks []*filer_pb.FileChunk
|
||||||
for chunk := range chunksChan {
|
for chunk := range chunksChan {
|
||||||
chunks = append(chunks, chunk)
|
chunks = append(chunks, chunk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if uploadError != nil {
|
||||||
|
var fileIds []string
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
fileIds = append(fileIds, chunk.FileId)
|
||||||
|
}
|
||||||
|
operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
|
||||||
|
return uploadError
|
||||||
|
}
|
||||||
|
|
||||||
if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
request := &filer_pb.CreateEntryRequest{
|
request := &filer_pb.CreateEntryRequest{
|
||||||
Directory: task.destinationUrlPath,
|
Directory: task.destinationUrlPath,
|
||||||
|
|||||||
Reference in New Issue
Block a user