From 81369b8a8318c08cc23f364e58d7d74569e9587b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 17 Mar 2026 16:49:56 -0700 Subject: [PATCH] improve: large file sync throughput for remote.cache and filer.sync (#8676) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * improve large file sync throughput for remote.cache and filer.sync Three main throughput improvements: 1. Adaptive chunk sizing for remote.cache: targets ~32 chunks per file instead of always starting at 5MB. A 500MB file now uses ~16MB chunks (32 chunks) instead of 5MB chunks (100 chunks), reducing per-chunk overhead (volume assign, gRPC call, needle write) by 3x. 2. Configurable concurrency at every layer: - remote.cache chunk concurrency: -chunkConcurrency flag (default 8) - remote.cache S3 download concurrency: -downloadConcurrency flag (default raised from 1 to 5 per chunk) - filer.sync chunk concurrency: -chunkConcurrency flag (default 32) 3. S3 multipart download concurrency raised from 1 to 5: the S3 manager downloader was using Concurrency=1, serializing all part downloads within each chunk. This alone can 5x per-chunk download speed. The concurrency values flow through the gRPC request chain: shell command → CacheRemoteObjectToLocalClusterRequest → FetchAndWriteNeedleRequest → S3 downloader Zero values in the request mean "use server defaults", maintaining full backward compatibility with existing callers. Ref #8481 * fix: use full maxMB for chunk size cap and remove loop guard Address review feedback: - Use full maxMB instead of maxMB/2 for maxChunkSize to avoid unnecessarily limiting chunk size for very large files. - Remove chunkSize < maxChunkSize guard from the safety loop so it can always grow past maxChunkSize when needed to stay under 1000 chunks (e.g., extremely large files with small maxMB). * address review feedback: help text, validation, naming, docs - Fix help text for -chunkConcurrency and -downloadConcurrency flags to say "0 = server default" instead of advertising specific numeric defaults that could drift from the server implementation. - Validate chunkConcurrency and downloadConcurrency are within int32 range before narrowing, returning a user-facing error if out of range. - Rename ReadRemoteErr to readRemoteErr to follow Go naming conventions. - Add doc comment to SetChunkConcurrency noting it must be called during initialization before replication goroutines start. - Replace doubling loop in chunk size safety check with direct ceil(remoteSize/1000) computation to guarantee the 1000-chunk cap. * address Copilot review: clamp concurrency, fix chunk count, clarify proto docs - Use ceiling division for chunk count check to avoid overcounting when file size is an exact multiple of chunk size. - Clamp chunkConcurrency (max 1024) and downloadConcurrency (max 1024 at filer, max 64 at volume server) to prevent excessive goroutines. - Always use ReadFileWithConcurrency when the client supports it, falling back to the implementation's default when value is 0. - Clarify proto comments that download_concurrency only applies when the remote storage client supports it (currently S3). - Include specific server defaults in help text (e.g., "0 = server default 8") so users see the actual values in -h output. * fix data race on executionErr and use %w for error wrapping - Protect concurrent writes to executionErr in remote.cache worker goroutines with a sync.Mutex to eliminate the data race. - Use %w instead of %v in volume_grpc_remote.go error formatting to preserve the error chain for errors.Is/errors.As callers. --- other/java/client/src/main/proto/filer.proto | 2 + weed/command/filer_sync.go | 11 +++- weed/filer/read_remote.go | 10 ++-- weed/pb/filer.proto | 2 + weed/pb/filer_pb/filer.pb.go | 32 ++++++++--- weed/pb/volume_server.proto | 1 + weed/pb/volume_server_pb/volume_server.pb.go | 29 ++++++---- weed/remote_storage/remote_storage.go | 6 ++ weed/remote_storage/s3/s3_storage_client.go | 9 ++- weed/replication/sink/filersink/filer_sink.go | 10 ++++ weed/server/filer_grpc_server_remote.go | 55 ++++++++++++++----- weed/server/volume_grpc_remote.go | 18 +++++- weed/shell/command_remote_cache.go | 23 ++++++-- 13 files changed, 160 insertions(+), 48 deletions(-) diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index e17e51fe7..2f6a4bd99 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -472,6 +472,8 @@ message FilerConf { message CacheRemoteObjectToLocalClusterRequest { string directory = 1; string name = 2; + int32 chunk_concurrency = 3; // parallel chunk downloads per file, 0 = default (8) + int32 download_concurrency = 4; // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3) } message CacheRemoteObjectToLocalClusterResponse { Entry entry = 1; diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 5663558f2..3294b4ed7 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -48,8 +48,9 @@ type SyncOptions struct { bProxyByFiler *bool metricsHttpIp *string metricsHttpPort *int - concurrency *int - aDoDeleteFiles *bool + concurrency *int + chunkConcurrency *int + aDoDeleteFiles *bool bDoDeleteFiles *bool clientId int32 clientEpoch atomic.Int32 @@ -104,6 +105,7 @@ func init() { syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond") syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.") + syncOptions.chunkConcurrency = cmdFilerSynchronize.Flag.Int("chunkConcurrency", 32, "The maximum number of chunks that will be replicated concurrently per file.") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip") @@ -210,6 +212,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.bDiskType, *syncOptions.bDebug, *syncOptions.concurrency, + *syncOptions.chunkConcurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, bFilerSignature, @@ -249,6 +252,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.aDiskType, *syncOptions.aDebug, *syncOptions.concurrency, + *syncOptions.chunkConcurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, aFilerSignature, @@ -281,7 +285,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -297,6 +301,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) + filerSink.SetChunkConcurrency(chunkConcurrency) filerSink.SetSourceFiler(filerSource) persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug) diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index 474681049..b785014f3 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -28,13 +28,15 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot // CacheRemoteObjectToLocalCluster caches a remote object to the local cluster. // It returns the updated entry with local chunk locations. -// Parameters remoteConf and remoteLocation are kept for backward compatibility but are not used. -func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) (*filer_pb.Entry, error) { +// chunkConcurrency and downloadConcurrency of 0 mean use server defaults. +func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, parent util.FullPath, entry *filer_pb.Entry, chunkConcurrency int32, downloadConcurrency int32) (*filer_pb.Entry, error) { var cachedEntry *filer_pb.Entry err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, cacheErr := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ - Directory: string(parent), - Name: entry.Name, + Directory: string(parent), + Name: entry.Name, + ChunkConcurrency: chunkConcurrency, + DownloadConcurrency: downloadConcurrency, }) if cacheErr == nil && resp != nil { cachedEntry = resp.Entry diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index e17e51fe7..2f6a4bd99 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -472,6 +472,8 @@ message FilerConf { message CacheRemoteObjectToLocalClusterRequest { string directory = 1; string name = 2; + int32 chunk_concurrency = 3; // parallel chunk downloads per file, 0 = default (8) + int32 download_concurrency = 4; // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3) } message CacheRemoteObjectToLocalClusterResponse { Entry entry = 1; diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index c90884b78..b6d39a3a7 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -3624,11 +3624,13 @@ func (x *FilerConf) GetLocations() []*FilerConf_PathConf { // Remote Storage related // /////////////////////// type CacheRemoteObjectToLocalClusterRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + ChunkConcurrency int32 `protobuf:"varint,3,opt,name=chunk_concurrency,json=chunkConcurrency,proto3" json:"chunk_concurrency,omitempty"` // parallel chunk downloads per file, 0 = default (8) + DownloadConcurrency int32 `protobuf:"varint,4,opt,name=download_concurrency,json=downloadConcurrency,proto3" json:"download_concurrency,omitempty"` // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *CacheRemoteObjectToLocalClusterRequest) Reset() { @@ -3675,6 +3677,20 @@ func (x *CacheRemoteObjectToLocalClusterRequest) GetName() string { return "" } +func (x *CacheRemoteObjectToLocalClusterRequest) GetChunkConcurrency() int32 { + if x != nil { + return x.ChunkConcurrency + } + return 0 +} + +func (x *CacheRemoteObjectToLocalClusterRequest) GetDownloadConcurrency() int32 { + if x != nil { + return x.DownloadConcurrency + } + return 0 +} + type CacheRemoteObjectToLocalClusterResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Entry *Entry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"` @@ -4785,10 +4801,12 @@ const file_filer_proto_rawDesc = "" + "\x16disable_chunk_deletion\x18\r \x01(\bR\x14disableChunkDeletion\x12\x12\n" + "\x04worm\x18\x0e \x01(\bR\x04worm\x129\n" + "\x19worm_grace_period_seconds\x18\x0f \x01(\x04R\x16wormGracePeriodSeconds\x12=\n" + - "\x1bworm_retention_time_seconds\x18\x10 \x01(\x04R\x18wormRetentionTimeSeconds\"Z\n" + + "\x1bworm_retention_time_seconds\x18\x10 \x01(\x04R\x18wormRetentionTimeSeconds\"\xba\x01\n" + "&CacheRemoteObjectToLocalClusterRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12\x12\n" + - "\x04name\x18\x02 \x01(\tR\x04name\"\x9c\x01\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x12+\n" + + "\x11chunk_concurrency\x18\x03 \x01(\x05R\x10chunkConcurrency\x121\n" + + "\x14download_concurrency\x18\x04 \x01(\x05R\x13downloadConcurrency\"\x9c\x01\n" + "'CacheRemoteObjectToLocalClusterResponse\x12%\n" + "\x05entry\x18\x01 \x01(\v2\x0f.filer_pb.EntryR\x05entry\x12J\n" + "\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\x9b\x01\n" + diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index c1a9282bd..c1a6243a1 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -636,6 +636,7 @@ message FetchAndWriteNeedleRequest { } repeated Replica replicas = 6; string auth = 7; + int32 download_concurrency = 8; // multipart download concurrency if supported by the remote storage client; for S3, 0 = default (5) // remote conf remote_pb.RemoteConf remote_conf = 15; remote_pb.RemoteStorageLocation remote_location = 16; diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 63c8cbc51..f96e8d308 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -5414,14 +5414,15 @@ func (*VolumeServerLeaveResponse) Descriptor() ([]byte, []int) { // remote storage type FetchAndWriteNeedleRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` - NeedleId uint64 `protobuf:"varint,2,opt,name=needle_id,json=needleId,proto3" json:"needle_id,omitempty"` - Cookie uint32 `protobuf:"varint,3,opt,name=cookie,proto3" json:"cookie,omitempty"` - Offset int64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` - Size int64 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"` - Replicas []*FetchAndWriteNeedleRequest_Replica `protobuf:"bytes,6,rep,name=replicas,proto3" json:"replicas,omitempty"` - Auth string `protobuf:"bytes,7,opt,name=auth,proto3" json:"auth,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` + NeedleId uint64 `protobuf:"varint,2,opt,name=needle_id,json=needleId,proto3" json:"needle_id,omitempty"` + Cookie uint32 `protobuf:"varint,3,opt,name=cookie,proto3" json:"cookie,omitempty"` + Offset int64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` + Size int64 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"` + Replicas []*FetchAndWriteNeedleRequest_Replica `protobuf:"bytes,6,rep,name=replicas,proto3" json:"replicas,omitempty"` + Auth string `protobuf:"bytes,7,opt,name=auth,proto3" json:"auth,omitempty"` + DownloadConcurrency int32 `protobuf:"varint,8,opt,name=download_concurrency,json=downloadConcurrency,proto3" json:"download_concurrency,omitempty"` // multipart download concurrency if supported by the remote storage client; for S3, 0 = default (5) // remote conf RemoteConf *remote_pb.RemoteConf `protobuf:"bytes,15,opt,name=remote_conf,json=remoteConf,proto3" json:"remote_conf,omitempty"` RemoteLocation *remote_pb.RemoteStorageLocation `protobuf:"bytes,16,opt,name=remote_location,json=remoteLocation,proto3" json:"remote_location,omitempty"` @@ -5508,6 +5509,13 @@ func (x *FetchAndWriteNeedleRequest) GetAuth() string { return "" } +func (x *FetchAndWriteNeedleRequest) GetDownloadConcurrency() int32 { + if x != nil { + return x.DownloadConcurrency + } + return 0 +} + func (x *FetchAndWriteNeedleRequest) GetRemoteConf() *remote_pb.RemoteConf { if x != nil { return x.RemoteConf @@ -7119,7 +7127,7 @@ const file_volume_server_proto_rawDesc = "" + "\x04rack\x18\x05 \x01(\tR\x04rack\x129\n" + "\x05state\x18\x06 \x01(\v2#.volume_server_pb.VolumeServerStateR\x05state\"\x1a\n" + "\x18VolumeServerLeaveRequest\"\x1b\n" + - "\x19VolumeServerLeaveResponse\"\xdc\x03\n" + + "\x19VolumeServerLeaveResponse\"\x8f\x04\n" + "\x1aFetchAndWriteNeedleRequest\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1b\n" + "\tneedle_id\x18\x02 \x01(\x04R\bneedleId\x12\x16\n" + @@ -7127,7 +7135,8 @@ const file_volume_server_proto_rawDesc = "" + "\x06offset\x18\x04 \x01(\x03R\x06offset\x12\x12\n" + "\x04size\x18\x05 \x01(\x03R\x04size\x12P\n" + "\breplicas\x18\x06 \x03(\v24.volume_server_pb.FetchAndWriteNeedleRequest.ReplicaR\breplicas\x12\x12\n" + - "\x04auth\x18\a \x01(\tR\x04auth\x126\n" + + "\x04auth\x18\a \x01(\tR\x04auth\x121\n" + + "\x14download_concurrency\x18\b \x01(\x05R\x13downloadConcurrency\x126\n" + "\vremote_conf\x18\x0f \x01(\v2\x15.remote_pb.RemoteConfR\n" + "remoteConf\x12I\n" + "\x0fremote_location\x18\x10 \x01(\v2 .remote_pb.RemoteStorageLocationR\x0eremoteLocation\x1aW\n" + diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go index 3c6bc2e6f..e23fd81df 100644 --- a/weed/remote_storage/remote_storage.go +++ b/weed/remote_storage/remote_storage.go @@ -89,6 +89,12 @@ type RemoteStorageClient interface { DeleteBucket(name string) (err error) } +// RemoteStorageConcurrentReader is an optional interface for remote storage clients +// that support configurable download concurrency for multipart downloads. +type RemoteStorageConcurrentReader interface { + ReadFileWithConcurrency(loc *remote_pb.RemoteStorageLocation, offset int64, size int64, concurrency int) (data []byte, err error) +} + type RemoteStorageClientMaker interface { Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) HasBucket() bool diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go index 4d12998e7..d4e00ce7f 100644 --- a/weed/remote_storage/s3/s3_storage_client.go +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -217,9 +217,16 @@ func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) ( } func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { + return s.ReadFileWithConcurrency(loc, offset, size, 5) +} + +func (s *s3RemoteStorageClient) ReadFileWithConcurrency(loc *remote_pb.RemoteStorageLocation, offset int64, size int64, concurrency int) (data []byte, err error) { + if concurrency <= 0 { + concurrency = 5 + } downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) { u.PartSize = int64(4 * 1024 * 1024) - u.Concurrency = 1 + u.Concurrency = concurrency }) dataSlice := make([]byte, int(size)) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 324bd7bd9..1bda6d1a0 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -91,6 +91,16 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, return nil } +// SetChunkConcurrency replaces the chunk replication executor with one using the +// given concurrency limit. Must be called during initialization, before any +// replication goroutines start, since it replaces fs.executor without +// synchronization. +func (fs *FilerSink) SetChunkConcurrency(concurrency int) { + if concurrency > 0 { + fs.executor = util.NewLimitedConcurrentExecutor(concurrency) + } +} + func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { dir, name := util.FullPath(key).DirAndName() diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 3174c4206..fa6ad421b 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -114,12 +114,23 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re } assignRequest, altRequest := so.ToAssignRequests(1) - // find a good chunk size - chunkSize := int64(5 * 1024 * 1024) - chunkCount := entry.Remote.RemoteSize/chunkSize + 1 - for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 { - chunkSize *= 2 - chunkCount = entry.Remote.RemoteSize/chunkSize + 1 + // adaptive chunk size: target ~32 chunks per file to balance + // per-chunk overhead (volume assign, gRPC, needle write) against parallelism + chunkSize := int64(5 * 1024 * 1024) // 5MB floor + maxChunkSize := int64(fs.option.MaxMB) * 1024 * 1024 + if maxChunkSize < chunkSize { + maxChunkSize = chunkSize + } + targetChunks := int64(32) + if entry.Remote.RemoteSize/targetChunks > chunkSize { + chunkSize = entry.Remote.RemoteSize / targetChunks + if chunkSize > maxChunkSize { + chunkSize = maxChunkSize + } + } + // final safety check: ensure no more than 1000 chunks + if (entry.Remote.RemoteSize+chunkSize-1)/chunkSize > 1000 { + chunkSize = (entry.Remote.RemoteSize + 999) / 1000 } dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):]) @@ -129,7 +140,20 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re var fetchAndWriteErr error var wg sync.WaitGroup - limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) + chunkConcurrency := int(req.ChunkConcurrency) + if chunkConcurrency <= 0 { + chunkConcurrency = 8 + } else if chunkConcurrency > 1024 { + glog.V(0).Infof("capping chunkConcurrency from %d to 1024", chunkConcurrency) + chunkConcurrency = 1024 + } + downloadConcurrency := req.DownloadConcurrency + if downloadConcurrency > 1024 { + glog.V(0).Infof("capping downloadConcurrency from %d to 1024", downloadConcurrency) + downloadConcurrency = 1024 + } + + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(chunkConcurrency) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { localOffset := offset @@ -183,14 +207,15 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re var etag string err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ - VolumeId: uint32(fileId.VolumeId), - NeedleId: uint64(fileId.Key), - Cookie: uint32(fileId.Cookie), - Offset: localOffset, - Size: size, - Replicas: replicas, - Auth: string(assignResult.Auth), - RemoteConf: storageConf, + VolumeId: uint32(fileId.VolumeId), + NeedleId: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + Offset: localOffset, + Size: size, + Replicas: replicas, + Auth: string(assignResult.Auth), + DownloadConcurrency: downloadConcurrency, + RemoteConf: storageConf, RemoteLocation: &remote_pb.RemoteStorageLocation{ Name: remoteStorageMountedLocation.Name, Bucket: remoteStorageMountedLocation.Bucket, diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index de562662e..3658cb256 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -34,9 +34,21 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser remoteStorageLocation := req.RemoteLocation - data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size) - if ReadRemoteErr != nil { - return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) + var data []byte + var readRemoteErr error + if cr, ok := client.(remote_storage.RemoteStorageConcurrentReader); ok { + concurrency := int(req.DownloadConcurrency) + if concurrency <= 0 { + concurrency = 0 // let the implementation choose its default + } else if concurrency > 64 { + concurrency = 64 + } + data, readRemoteErr = cr.ReadFileWithConcurrency(remoteStorageLocation, req.Offset, req.Size, concurrency) + } else { + data, readRemoteErr = client.ReadFile(remoteStorageLocation, req.Offset, req.Size) + } + if readRemoteErr != nil { + return nil, fmt.Errorf("read from remote %+v: %w", remoteStorageLocation, readRemoteErr) } var wg sync.WaitGroup diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 60ca33147..b035c69bb 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "math" "sync" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -37,7 +38,9 @@ func (c *commandRemoteCache) Help() string { remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default) remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote - remote.cache -dir=/xxx -concurrent=32 # with custom concurrency + remote.cache -dir=/xxx -concurrent=32 # with custom file-level concurrency + remote.cache -dir=/xxx -chunkConcurrency=16 # parallel chunk downloads per file (0 = server default 8) + remote.cache -dir=/xxx -downloadConcurrency=10 # S3 multipart download concurrency per chunk (0 = server default 5) remote.cache -dir=/xxx -include=*.pdf # only sync PDF files remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes @@ -64,6 +67,8 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote") deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote") concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations") + chunkConcurrency := remoteCacheCommand.Int("chunkConcurrency", 0, "parallel chunk downloads per file (0 = server default 8)") + downloadConcurrency := remoteCacheCommand.Int("downloadConcurrency", 0, "S3 multipart download concurrency per chunk (0 = server default 5)") dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes") fileFiler := newFileFilter(remoteCacheCommand) @@ -74,6 +79,12 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io if *dir == "" { return fmt.Errorf("need to specify -dir option") } + if *chunkConcurrency < 0 || *chunkConcurrency > math.MaxInt32 { + return fmt.Errorf("chunkConcurrency must be between 0 and %d", math.MaxInt32) + } + if *downloadConcurrency < 0 || *downloadConcurrency > math.MaxInt32 { + return fmt.Errorf("downloadConcurrency must be between 0 and %d", math.MaxInt32) + } mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) if detectErr != nil { @@ -82,10 +93,10 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io } // perform comprehensive sync - return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler) + return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, int32(*chunkConcurrency), int32(*downloadConcurrency), *dryRun, fileFiler) } -func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error { +func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, chunkConcurrency int32, downloadConcurrency int32, dryRun bool, fileFilter *FileFilter) error { // visit remote storage remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) @@ -306,6 +317,7 @@ func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer var wg sync.WaitGroup limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) var executionErr error + var execErrMu sync.Mutex for _, pathToCache := range filesToCache { wg.Add(1) @@ -341,15 +353,16 @@ func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer } dir, _ := util.FullPath(pathToCacheCopy).DirAndName() - remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy)) fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy) - if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil { + if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, util.FullPath(dir), localEntry, chunkConcurrency, downloadConcurrency); err != nil { fmt.Fprintf(writer, "failed: %v\n", err) + execErrMu.Lock() if executionErr == nil { executionErr = err } + execErrMu.Unlock() return } fmt.Fprintf(writer, "done\n")