improve: large file sync throughput for remote.cache and filer.sync (#8676)
* improve large file sync throughput for remote.cache and filer.sync
Three main throughput improvements:
1. Adaptive chunk sizing for remote.cache: targets ~32 chunks per file
instead of always starting at 5MB. A 500MB file now uses ~16MB chunks
(32 chunks) instead of 5MB chunks (100 chunks), reducing per-chunk
overhead (volume assign, gRPC call, needle write) by 3x.
2. Configurable concurrency at every layer:
- remote.cache chunk concurrency: -chunkConcurrency flag (default 8)
- remote.cache S3 download concurrency: -downloadConcurrency flag
(default raised from 1 to 5 per chunk)
- filer.sync chunk concurrency: -chunkConcurrency flag (default 32)
3. S3 multipart download concurrency raised from 1 to 5: the S3 manager
downloader was using Concurrency=1, serializing all part downloads
within each chunk. This alone can 5x per-chunk download speed.
The concurrency values flow through the gRPC request chain:
shell command → CacheRemoteObjectToLocalClusterRequest →
FetchAndWriteNeedleRequest → S3 downloader
Zero values in the request mean "use server defaults", maintaining
full backward compatibility with existing callers.
Ref #8481
* fix: use full maxMB for chunk size cap and remove loop guard
Address review feedback:
- Use full maxMB instead of maxMB/2 for maxChunkSize to avoid
unnecessarily limiting chunk size for very large files.
- Remove chunkSize < maxChunkSize guard from the safety loop so it
can always grow past maxChunkSize when needed to stay under 1000
chunks (e.g., extremely large files with small maxMB).
* address review feedback: help text, validation, naming, docs
- Fix help text for -chunkConcurrency and -downloadConcurrency flags
to say "0 = server default" instead of advertising specific numeric
defaults that could drift from the server implementation.
- Validate chunkConcurrency and downloadConcurrency are within int32
range before narrowing, returning a user-facing error if out of range.
- Rename ReadRemoteErr to readRemoteErr to follow Go naming conventions.
- Add doc comment to SetChunkConcurrency noting it must be called
during initialization before replication goroutines start.
- Replace doubling loop in chunk size safety check with direct
ceil(remoteSize/1000) computation to guarantee the 1000-chunk cap.
* address Copilot review: clamp concurrency, fix chunk count, clarify proto docs
- Use ceiling division for chunk count check to avoid overcounting
when file size is an exact multiple of chunk size.
- Clamp chunkConcurrency (max 1024) and downloadConcurrency (max 1024
at filer, max 64 at volume server) to prevent excessive goroutines.
- Always use ReadFileWithConcurrency when the client supports it,
falling back to the implementation's default when value is 0.
- Clarify proto comments that download_concurrency only applies when
the remote storage client supports it (currently S3).
- Include specific server defaults in help text (e.g., "0 = server
default 8") so users see the actual values in -h output.
* fix data race on executionErr and use %w for error wrapping
- Protect concurrent writes to executionErr in remote.cache worker
goroutines with a sync.Mutex to eliminate the data race.
- Use %w instead of %v in volume_grpc_remote.go error formatting
to preserve the error chain for errors.Is/errors.As callers.
This commit is contained in:
@@ -472,6 +472,8 @@ message FilerConf {
|
|||||||
message CacheRemoteObjectToLocalClusterRequest {
|
message CacheRemoteObjectToLocalClusterRequest {
|
||||||
string directory = 1;
|
string directory = 1;
|
||||||
string name = 2;
|
string name = 2;
|
||||||
|
int32 chunk_concurrency = 3; // parallel chunk downloads per file, 0 = default (8)
|
||||||
|
int32 download_concurrency = 4; // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3)
|
||||||
}
|
}
|
||||||
message CacheRemoteObjectToLocalClusterResponse {
|
message CacheRemoteObjectToLocalClusterResponse {
|
||||||
Entry entry = 1;
|
Entry entry = 1;
|
||||||
|
|||||||
@@ -48,8 +48,9 @@ type SyncOptions struct {
|
|||||||
bProxyByFiler *bool
|
bProxyByFiler *bool
|
||||||
metricsHttpIp *string
|
metricsHttpIp *string
|
||||||
metricsHttpPort *int
|
metricsHttpPort *int
|
||||||
concurrency *int
|
concurrency *int
|
||||||
aDoDeleteFiles *bool
|
chunkConcurrency *int
|
||||||
|
aDoDeleteFiles *bool
|
||||||
bDoDeleteFiles *bool
|
bDoDeleteFiles *bool
|
||||||
clientId int32
|
clientId int32
|
||||||
clientEpoch atomic.Int32
|
clientEpoch atomic.Int32
|
||||||
@@ -104,6 +105,7 @@ func init() {
|
|||||||
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
|
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
|
||||||
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
|
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
|
||||||
syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
|
syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
|
||||||
|
syncOptions.chunkConcurrency = cmdFilerSynchronize.Flag.Int("chunkConcurrency", 32, "The maximum number of chunks that will be replicated concurrently per file.")
|
||||||
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
|
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
|
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
|
||||||
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
|
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
|
||||||
@@ -210,6 +212,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
|
|||||||
*syncOptions.bDiskType,
|
*syncOptions.bDiskType,
|
||||||
*syncOptions.bDebug,
|
*syncOptions.bDebug,
|
||||||
*syncOptions.concurrency,
|
*syncOptions.concurrency,
|
||||||
|
*syncOptions.chunkConcurrency,
|
||||||
*syncOptions.bDoDeleteFiles,
|
*syncOptions.bDoDeleteFiles,
|
||||||
aFilerSignature,
|
aFilerSignature,
|
||||||
bFilerSignature,
|
bFilerSignature,
|
||||||
@@ -249,6 +252,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
|
|||||||
*syncOptions.aDiskType,
|
*syncOptions.aDiskType,
|
||||||
*syncOptions.aDebug,
|
*syncOptions.aDebug,
|
||||||
*syncOptions.concurrency,
|
*syncOptions.concurrency,
|
||||||
|
*syncOptions.chunkConcurrency,
|
||||||
*syncOptions.aDoDeleteFiles,
|
*syncOptions.aDoDeleteFiles,
|
||||||
bFilerSignature,
|
bFilerSignature,
|
||||||
aFilerSignature,
|
aFilerSignature,
|
||||||
@@ -281,7 +285,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
|
|||||||
}
|
}
|
||||||
|
|
||||||
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
|
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
|
||||||
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error {
|
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error {
|
||||||
|
|
||||||
// if first time, start from now
|
// if first time, start from now
|
||||||
// if has previously synced, resume from that point of time
|
// if has previously synced, resume from that point of time
|
||||||
@@ -297,6 +301,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
|
|||||||
filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
|
filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
|
||||||
filerSink := &filersink.FilerSink{}
|
filerSink := &filersink.FilerSink{}
|
||||||
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
|
||||||
|
filerSink.SetChunkConcurrency(chunkConcurrency)
|
||||||
filerSink.SetSourceFiler(filerSource)
|
filerSink.SetSourceFiler(filerSource)
|
||||||
|
|
||||||
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
|
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
|
||||||
|
|||||||
@@ -28,13 +28,15 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot
|
|||||||
|
|
||||||
// CacheRemoteObjectToLocalCluster caches a remote object to the local cluster.
|
// CacheRemoteObjectToLocalCluster caches a remote object to the local cluster.
|
||||||
// It returns the updated entry with local chunk locations.
|
// It returns the updated entry with local chunk locations.
|
||||||
// Parameters remoteConf and remoteLocation are kept for backward compatibility but are not used.
|
// chunkConcurrency and downloadConcurrency of 0 mean use server defaults.
|
||||||
func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) (*filer_pb.Entry, error) {
|
func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, parent util.FullPath, entry *filer_pb.Entry, chunkConcurrency int32, downloadConcurrency int32) (*filer_pb.Entry, error) {
|
||||||
var cachedEntry *filer_pb.Entry
|
var cachedEntry *filer_pb.Entry
|
||||||
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
resp, cacheErr := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
|
resp, cacheErr := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
|
||||||
Directory: string(parent),
|
Directory: string(parent),
|
||||||
Name: entry.Name,
|
Name: entry.Name,
|
||||||
|
ChunkConcurrency: chunkConcurrency,
|
||||||
|
DownloadConcurrency: downloadConcurrency,
|
||||||
})
|
})
|
||||||
if cacheErr == nil && resp != nil {
|
if cacheErr == nil && resp != nil {
|
||||||
cachedEntry = resp.Entry
|
cachedEntry = resp.Entry
|
||||||
|
|||||||
@@ -472,6 +472,8 @@ message FilerConf {
|
|||||||
message CacheRemoteObjectToLocalClusterRequest {
|
message CacheRemoteObjectToLocalClusterRequest {
|
||||||
string directory = 1;
|
string directory = 1;
|
||||||
string name = 2;
|
string name = 2;
|
||||||
|
int32 chunk_concurrency = 3; // parallel chunk downloads per file, 0 = default (8)
|
||||||
|
int32 download_concurrency = 4; // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3)
|
||||||
}
|
}
|
||||||
message CacheRemoteObjectToLocalClusterResponse {
|
message CacheRemoteObjectToLocalClusterResponse {
|
||||||
Entry entry = 1;
|
Entry entry = 1;
|
||||||
|
|||||||
@@ -3624,11 +3624,13 @@ func (x *FilerConf) GetLocations() []*FilerConf_PathConf {
|
|||||||
// Remote Storage related
|
// Remote Storage related
|
||||||
// ///////////////////////
|
// ///////////////////////
|
||||||
type CacheRemoteObjectToLocalClusterRequest struct {
|
type CacheRemoteObjectToLocalClusterRequest struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"`
|
Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"`
|
||||||
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
|
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
|
||||||
unknownFields protoimpl.UnknownFields
|
ChunkConcurrency int32 `protobuf:"varint,3,opt,name=chunk_concurrency,json=chunkConcurrency,proto3" json:"chunk_concurrency,omitempty"` // parallel chunk downloads per file, 0 = default (8)
|
||||||
sizeCache protoimpl.SizeCache
|
DownloadConcurrency int32 `protobuf:"varint,4,opt,name=download_concurrency,json=downloadConcurrency,proto3" json:"download_concurrency,omitempty"` // multipart download concurrency per chunk (if supported by remote storage), 0 = default (5 for S3)
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *CacheRemoteObjectToLocalClusterRequest) Reset() {
|
func (x *CacheRemoteObjectToLocalClusterRequest) Reset() {
|
||||||
@@ -3675,6 +3677,20 @@ func (x *CacheRemoteObjectToLocalClusterRequest) GetName() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *CacheRemoteObjectToLocalClusterRequest) GetChunkConcurrency() int32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.ChunkConcurrency
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *CacheRemoteObjectToLocalClusterRequest) GetDownloadConcurrency() int32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.DownloadConcurrency
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
type CacheRemoteObjectToLocalClusterResponse struct {
|
type CacheRemoteObjectToLocalClusterResponse struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
Entry *Entry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
|
Entry *Entry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
|
||||||
@@ -4785,10 +4801,12 @@ const file_filer_proto_rawDesc = "" +
|
|||||||
"\x16disable_chunk_deletion\x18\r \x01(\bR\x14disableChunkDeletion\x12\x12\n" +
|
"\x16disable_chunk_deletion\x18\r \x01(\bR\x14disableChunkDeletion\x12\x12\n" +
|
||||||
"\x04worm\x18\x0e \x01(\bR\x04worm\x129\n" +
|
"\x04worm\x18\x0e \x01(\bR\x04worm\x129\n" +
|
||||||
"\x19worm_grace_period_seconds\x18\x0f \x01(\x04R\x16wormGracePeriodSeconds\x12=\n" +
|
"\x19worm_grace_period_seconds\x18\x0f \x01(\x04R\x16wormGracePeriodSeconds\x12=\n" +
|
||||||
"\x1bworm_retention_time_seconds\x18\x10 \x01(\x04R\x18wormRetentionTimeSeconds\"Z\n" +
|
"\x1bworm_retention_time_seconds\x18\x10 \x01(\x04R\x18wormRetentionTimeSeconds\"\xba\x01\n" +
|
||||||
"&CacheRemoteObjectToLocalClusterRequest\x12\x1c\n" +
|
"&CacheRemoteObjectToLocalClusterRequest\x12\x1c\n" +
|
||||||
"\tdirectory\x18\x01 \x01(\tR\tdirectory\x12\x12\n" +
|
"\tdirectory\x18\x01 \x01(\tR\tdirectory\x12\x12\n" +
|
||||||
"\x04name\x18\x02 \x01(\tR\x04name\"\x9c\x01\n" +
|
"\x04name\x18\x02 \x01(\tR\x04name\x12+\n" +
|
||||||
|
"\x11chunk_concurrency\x18\x03 \x01(\x05R\x10chunkConcurrency\x121\n" +
|
||||||
|
"\x14download_concurrency\x18\x04 \x01(\x05R\x13downloadConcurrency\"\x9c\x01\n" +
|
||||||
"'CacheRemoteObjectToLocalClusterResponse\x12%\n" +
|
"'CacheRemoteObjectToLocalClusterResponse\x12%\n" +
|
||||||
"\x05entry\x18\x01 \x01(\v2\x0f.filer_pb.EntryR\x05entry\x12J\n" +
|
"\x05entry\x18\x01 \x01(\v2\x0f.filer_pb.EntryR\x05entry\x12J\n" +
|
||||||
"\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\x9b\x01\n" +
|
"\x0emetadata_event\x18\x02 \x01(\v2#.filer_pb.SubscribeMetadataResponseR\rmetadataEvent\"\x9b\x01\n" +
|
||||||
|
|||||||
@@ -636,6 +636,7 @@ message FetchAndWriteNeedleRequest {
|
|||||||
}
|
}
|
||||||
repeated Replica replicas = 6;
|
repeated Replica replicas = 6;
|
||||||
string auth = 7;
|
string auth = 7;
|
||||||
|
int32 download_concurrency = 8; // multipart download concurrency if supported by the remote storage client; for S3, 0 = default (5)
|
||||||
// remote conf
|
// remote conf
|
||||||
remote_pb.RemoteConf remote_conf = 15;
|
remote_pb.RemoteConf remote_conf = 15;
|
||||||
remote_pb.RemoteStorageLocation remote_location = 16;
|
remote_pb.RemoteStorageLocation remote_location = 16;
|
||||||
|
|||||||
@@ -5414,14 +5414,15 @@ func (*VolumeServerLeaveResponse) Descriptor() ([]byte, []int) {
|
|||||||
|
|
||||||
// remote storage
|
// remote storage
|
||||||
type FetchAndWriteNeedleRequest struct {
|
type FetchAndWriteNeedleRequest struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
||||||
NeedleId uint64 `protobuf:"varint,2,opt,name=needle_id,json=needleId,proto3" json:"needle_id,omitempty"`
|
NeedleId uint64 `protobuf:"varint,2,opt,name=needle_id,json=needleId,proto3" json:"needle_id,omitempty"`
|
||||||
Cookie uint32 `protobuf:"varint,3,opt,name=cookie,proto3" json:"cookie,omitempty"`
|
Cookie uint32 `protobuf:"varint,3,opt,name=cookie,proto3" json:"cookie,omitempty"`
|
||||||
Offset int64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"`
|
Offset int64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"`
|
||||||
Size int64 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"`
|
Size int64 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"`
|
||||||
Replicas []*FetchAndWriteNeedleRequest_Replica `protobuf:"bytes,6,rep,name=replicas,proto3" json:"replicas,omitempty"`
|
Replicas []*FetchAndWriteNeedleRequest_Replica `protobuf:"bytes,6,rep,name=replicas,proto3" json:"replicas,omitempty"`
|
||||||
Auth string `protobuf:"bytes,7,opt,name=auth,proto3" json:"auth,omitempty"`
|
Auth string `protobuf:"bytes,7,opt,name=auth,proto3" json:"auth,omitempty"`
|
||||||
|
DownloadConcurrency int32 `protobuf:"varint,8,opt,name=download_concurrency,json=downloadConcurrency,proto3" json:"download_concurrency,omitempty"` // multipart download concurrency if supported by the remote storage client; for S3, 0 = default (5)
|
||||||
// remote conf
|
// remote conf
|
||||||
RemoteConf *remote_pb.RemoteConf `protobuf:"bytes,15,opt,name=remote_conf,json=remoteConf,proto3" json:"remote_conf,omitempty"`
|
RemoteConf *remote_pb.RemoteConf `protobuf:"bytes,15,opt,name=remote_conf,json=remoteConf,proto3" json:"remote_conf,omitempty"`
|
||||||
RemoteLocation *remote_pb.RemoteStorageLocation `protobuf:"bytes,16,opt,name=remote_location,json=remoteLocation,proto3" json:"remote_location,omitempty"`
|
RemoteLocation *remote_pb.RemoteStorageLocation `protobuf:"bytes,16,opt,name=remote_location,json=remoteLocation,proto3" json:"remote_location,omitempty"`
|
||||||
@@ -5508,6 +5509,13 @@ func (x *FetchAndWriteNeedleRequest) GetAuth() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *FetchAndWriteNeedleRequest) GetDownloadConcurrency() int32 {
|
||||||
|
if x != nil {
|
||||||
|
return x.DownloadConcurrency
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
func (x *FetchAndWriteNeedleRequest) GetRemoteConf() *remote_pb.RemoteConf {
|
func (x *FetchAndWriteNeedleRequest) GetRemoteConf() *remote_pb.RemoteConf {
|
||||||
if x != nil {
|
if x != nil {
|
||||||
return x.RemoteConf
|
return x.RemoteConf
|
||||||
@@ -7119,7 +7127,7 @@ const file_volume_server_proto_rawDesc = "" +
|
|||||||
"\x04rack\x18\x05 \x01(\tR\x04rack\x129\n" +
|
"\x04rack\x18\x05 \x01(\tR\x04rack\x129\n" +
|
||||||
"\x05state\x18\x06 \x01(\v2#.volume_server_pb.VolumeServerStateR\x05state\"\x1a\n" +
|
"\x05state\x18\x06 \x01(\v2#.volume_server_pb.VolumeServerStateR\x05state\"\x1a\n" +
|
||||||
"\x18VolumeServerLeaveRequest\"\x1b\n" +
|
"\x18VolumeServerLeaveRequest\"\x1b\n" +
|
||||||
"\x19VolumeServerLeaveResponse\"\xdc\x03\n" +
|
"\x19VolumeServerLeaveResponse\"\x8f\x04\n" +
|
||||||
"\x1aFetchAndWriteNeedleRequest\x12\x1b\n" +
|
"\x1aFetchAndWriteNeedleRequest\x12\x1b\n" +
|
||||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1b\n" +
|
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1b\n" +
|
||||||
"\tneedle_id\x18\x02 \x01(\x04R\bneedleId\x12\x16\n" +
|
"\tneedle_id\x18\x02 \x01(\x04R\bneedleId\x12\x16\n" +
|
||||||
@@ -7127,7 +7135,8 @@ const file_volume_server_proto_rawDesc = "" +
|
|||||||
"\x06offset\x18\x04 \x01(\x03R\x06offset\x12\x12\n" +
|
"\x06offset\x18\x04 \x01(\x03R\x06offset\x12\x12\n" +
|
||||||
"\x04size\x18\x05 \x01(\x03R\x04size\x12P\n" +
|
"\x04size\x18\x05 \x01(\x03R\x04size\x12P\n" +
|
||||||
"\breplicas\x18\x06 \x03(\v24.volume_server_pb.FetchAndWriteNeedleRequest.ReplicaR\breplicas\x12\x12\n" +
|
"\breplicas\x18\x06 \x03(\v24.volume_server_pb.FetchAndWriteNeedleRequest.ReplicaR\breplicas\x12\x12\n" +
|
||||||
"\x04auth\x18\a \x01(\tR\x04auth\x126\n" +
|
"\x04auth\x18\a \x01(\tR\x04auth\x121\n" +
|
||||||
|
"\x14download_concurrency\x18\b \x01(\x05R\x13downloadConcurrency\x126\n" +
|
||||||
"\vremote_conf\x18\x0f \x01(\v2\x15.remote_pb.RemoteConfR\n" +
|
"\vremote_conf\x18\x0f \x01(\v2\x15.remote_pb.RemoteConfR\n" +
|
||||||
"remoteConf\x12I\n" +
|
"remoteConf\x12I\n" +
|
||||||
"\x0fremote_location\x18\x10 \x01(\v2 .remote_pb.RemoteStorageLocationR\x0eremoteLocation\x1aW\n" +
|
"\x0fremote_location\x18\x10 \x01(\v2 .remote_pb.RemoteStorageLocationR\x0eremoteLocation\x1aW\n" +
|
||||||
|
|||||||
@@ -89,6 +89,12 @@ type RemoteStorageClient interface {
|
|||||||
DeleteBucket(name string) (err error)
|
DeleteBucket(name string) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoteStorageConcurrentReader is an optional interface for remote storage clients
|
||||||
|
// that support configurable download concurrency for multipart downloads.
|
||||||
|
type RemoteStorageConcurrentReader interface {
|
||||||
|
ReadFileWithConcurrency(loc *remote_pb.RemoteStorageLocation, offset int64, size int64, concurrency int) (data []byte, err error)
|
||||||
|
}
|
||||||
|
|
||||||
type RemoteStorageClientMaker interface {
|
type RemoteStorageClientMaker interface {
|
||||||
Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error)
|
Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error)
|
||||||
HasBucket() bool
|
HasBucket() bool
|
||||||
|
|||||||
@@ -217,9 +217,16 @@ func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
|
func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
|
||||||
|
return s.ReadFileWithConcurrency(loc, offset, size, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *s3RemoteStorageClient) ReadFileWithConcurrency(loc *remote_pb.RemoteStorageLocation, offset int64, size int64, concurrency int) (data []byte, err error) {
|
||||||
|
if concurrency <= 0 {
|
||||||
|
concurrency = 5
|
||||||
|
}
|
||||||
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
|
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
|
||||||
u.PartSize = int64(4 * 1024 * 1024)
|
u.PartSize = int64(4 * 1024 * 1024)
|
||||||
u.Concurrency = 1
|
u.Concurrency = concurrency
|
||||||
})
|
})
|
||||||
|
|
||||||
dataSlice := make([]byte, int(size))
|
dataSlice := make([]byte, int(size))
|
||||||
|
|||||||
@@ -91,6 +91,16 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetChunkConcurrency replaces the chunk replication executor with one using the
|
||||||
|
// given concurrency limit. Must be called during initialization, before any
|
||||||
|
// replication goroutines start, since it replaces fs.executor without
|
||||||
|
// synchronization.
|
||||||
|
func (fs *FilerSink) SetChunkConcurrency(concurrency int) {
|
||||||
|
if concurrency > 0 {
|
||||||
|
fs.executor = util.NewLimitedConcurrentExecutor(concurrency)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
|
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
|
||||||
|
|
||||||
dir, name := util.FullPath(key).DirAndName()
|
dir, name := util.FullPath(key).DirAndName()
|
||||||
|
|||||||
@@ -114,12 +114,23 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re
|
|||||||
}
|
}
|
||||||
assignRequest, altRequest := so.ToAssignRequests(1)
|
assignRequest, altRequest := so.ToAssignRequests(1)
|
||||||
|
|
||||||
// find a good chunk size
|
// adaptive chunk size: target ~32 chunks per file to balance
|
||||||
chunkSize := int64(5 * 1024 * 1024)
|
// per-chunk overhead (volume assign, gRPC, needle write) against parallelism
|
||||||
chunkCount := entry.Remote.RemoteSize/chunkSize + 1
|
chunkSize := int64(5 * 1024 * 1024) // 5MB floor
|
||||||
for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
|
maxChunkSize := int64(fs.option.MaxMB) * 1024 * 1024
|
||||||
chunkSize *= 2
|
if maxChunkSize < chunkSize {
|
||||||
chunkCount = entry.Remote.RemoteSize/chunkSize + 1
|
maxChunkSize = chunkSize
|
||||||
|
}
|
||||||
|
targetChunks := int64(32)
|
||||||
|
if entry.Remote.RemoteSize/targetChunks > chunkSize {
|
||||||
|
chunkSize = entry.Remote.RemoteSize / targetChunks
|
||||||
|
if chunkSize > maxChunkSize {
|
||||||
|
chunkSize = maxChunkSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// final safety check: ensure no more than 1000 chunks
|
||||||
|
if (entry.Remote.RemoteSize+chunkSize-1)/chunkSize > 1000 {
|
||||||
|
chunkSize = (entry.Remote.RemoteSize + 999) / 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
|
dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
|
||||||
@@ -129,7 +140,20 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re
|
|||||||
var fetchAndWriteErr error
|
var fetchAndWriteErr error
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
|
chunkConcurrency := int(req.ChunkConcurrency)
|
||||||
|
if chunkConcurrency <= 0 {
|
||||||
|
chunkConcurrency = 8
|
||||||
|
} else if chunkConcurrency > 1024 {
|
||||||
|
glog.V(0).Infof("capping chunkConcurrency from %d to 1024", chunkConcurrency)
|
||||||
|
chunkConcurrency = 1024
|
||||||
|
}
|
||||||
|
downloadConcurrency := req.DownloadConcurrency
|
||||||
|
if downloadConcurrency > 1024 {
|
||||||
|
glog.V(0).Infof("capping downloadConcurrency from %d to 1024", downloadConcurrency)
|
||||||
|
downloadConcurrency = 1024
|
||||||
|
}
|
||||||
|
|
||||||
|
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(chunkConcurrency)
|
||||||
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
|
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
|
||||||
localOffset := offset
|
localOffset := offset
|
||||||
|
|
||||||
@@ -183,14 +207,15 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re
|
|||||||
var etag string
|
var etag string
|
||||||
err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||||
resp, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
|
resp, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
|
||||||
VolumeId: uint32(fileId.VolumeId),
|
VolumeId: uint32(fileId.VolumeId),
|
||||||
NeedleId: uint64(fileId.Key),
|
NeedleId: uint64(fileId.Key),
|
||||||
Cookie: uint32(fileId.Cookie),
|
Cookie: uint32(fileId.Cookie),
|
||||||
Offset: localOffset,
|
Offset: localOffset,
|
||||||
Size: size,
|
Size: size,
|
||||||
Replicas: replicas,
|
Replicas: replicas,
|
||||||
Auth: string(assignResult.Auth),
|
Auth: string(assignResult.Auth),
|
||||||
RemoteConf: storageConf,
|
DownloadConcurrency: downloadConcurrency,
|
||||||
|
RemoteConf: storageConf,
|
||||||
RemoteLocation: &remote_pb.RemoteStorageLocation{
|
RemoteLocation: &remote_pb.RemoteStorageLocation{
|
||||||
Name: remoteStorageMountedLocation.Name,
|
Name: remoteStorageMountedLocation.Name,
|
||||||
Bucket: remoteStorageMountedLocation.Bucket,
|
Bucket: remoteStorageMountedLocation.Bucket,
|
||||||
|
|||||||
@@ -34,9 +34,21 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
|
|||||||
|
|
||||||
remoteStorageLocation := req.RemoteLocation
|
remoteStorageLocation := req.RemoteLocation
|
||||||
|
|
||||||
data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
|
var data []byte
|
||||||
if ReadRemoteErr != nil {
|
var readRemoteErr error
|
||||||
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
|
if cr, ok := client.(remote_storage.RemoteStorageConcurrentReader); ok {
|
||||||
|
concurrency := int(req.DownloadConcurrency)
|
||||||
|
if concurrency <= 0 {
|
||||||
|
concurrency = 0 // let the implementation choose its default
|
||||||
|
} else if concurrency > 64 {
|
||||||
|
concurrency = 64
|
||||||
|
}
|
||||||
|
data, readRemoteErr = cr.ReadFileWithConcurrency(remoteStorageLocation, req.Offset, req.Size, concurrency)
|
||||||
|
} else {
|
||||||
|
data, readRemoteErr = client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
|
||||||
|
}
|
||||||
|
if readRemoteErr != nil {
|
||||||
|
return nil, fmt.Errorf("read from remote %+v: %w", remoteStorageLocation, readRemoteErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
@@ -37,7 +38,9 @@ func (c *commandRemoteCache) Help() string {
|
|||||||
remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
|
remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
|
||||||
remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
|
remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
|
||||||
remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
|
remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
|
||||||
remote.cache -dir=/xxx -concurrent=32 # with custom concurrency
|
remote.cache -dir=/xxx -concurrent=32 # with custom file-level concurrency
|
||||||
|
remote.cache -dir=/xxx -chunkConcurrency=16 # parallel chunk downloads per file (0 = server default 8)
|
||||||
|
remote.cache -dir=/xxx -downloadConcurrency=10 # S3 multipart download concurrency per chunk (0 = server default 5)
|
||||||
remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
|
remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
|
||||||
remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
|
remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
|
||||||
remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes
|
remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes
|
||||||
@@ -64,6 +67,8 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
|
|||||||
cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
|
cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
|
||||||
deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
|
deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
|
||||||
concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
|
concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
|
||||||
|
chunkConcurrency := remoteCacheCommand.Int("chunkConcurrency", 0, "parallel chunk downloads per file (0 = server default 8)")
|
||||||
|
downloadConcurrency := remoteCacheCommand.Int("downloadConcurrency", 0, "S3 multipart download concurrency per chunk (0 = server default 5)")
|
||||||
dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
|
dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
|
||||||
fileFiler := newFileFilter(remoteCacheCommand)
|
fileFiler := newFileFilter(remoteCacheCommand)
|
||||||
|
|
||||||
@@ -74,6 +79,12 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
|
|||||||
if *dir == "" {
|
if *dir == "" {
|
||||||
return fmt.Errorf("need to specify -dir option")
|
return fmt.Errorf("need to specify -dir option")
|
||||||
}
|
}
|
||||||
|
if *chunkConcurrency < 0 || *chunkConcurrency > math.MaxInt32 {
|
||||||
|
return fmt.Errorf("chunkConcurrency must be between 0 and %d", math.MaxInt32)
|
||||||
|
}
|
||||||
|
if *downloadConcurrency < 0 || *downloadConcurrency > math.MaxInt32 {
|
||||||
|
return fmt.Errorf("downloadConcurrency must be between 0 and %d", math.MaxInt32)
|
||||||
|
}
|
||||||
|
|
||||||
mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
|
mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
|
||||||
if detectErr != nil {
|
if detectErr != nil {
|
||||||
@@ -82,10 +93,10 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
|
|||||||
}
|
}
|
||||||
|
|
||||||
// perform comprehensive sync
|
// perform comprehensive sync
|
||||||
return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler)
|
return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, int32(*chunkConcurrency), int32(*downloadConcurrency), *dryRun, fileFiler)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {
|
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, chunkConcurrency int32, downloadConcurrency int32, dryRun bool, fileFilter *FileFilter) error {
|
||||||
|
|
||||||
// visit remote storage
|
// visit remote storage
|
||||||
remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
|
remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
|
||||||
@@ -306,6 +317,7 @@ func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
|
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
|
||||||
var executionErr error
|
var executionErr error
|
||||||
|
var execErrMu sync.Mutex
|
||||||
|
|
||||||
for _, pathToCache := range filesToCache {
|
for _, pathToCache := range filesToCache {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -341,15 +353,16 @@ func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer
|
|||||||
}
|
}
|
||||||
|
|
||||||
dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
|
dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
|
||||||
remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))
|
|
||||||
|
|
||||||
fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
|
fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
|
||||||
|
|
||||||
if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
|
if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, util.FullPath(dir), localEntry, chunkConcurrency, downloadConcurrency); err != nil {
|
||||||
fmt.Fprintf(writer, "failed: %v\n", err)
|
fmt.Fprintf(writer, "failed: %v\n", err)
|
||||||
|
execErrMu.Lock()
|
||||||
if executionErr == nil {
|
if executionErr == nil {
|
||||||
executionErr = err
|
executionErr = err
|
||||||
}
|
}
|
||||||
|
execErrMu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Fprintf(writer, "done\n")
|
fmt.Fprintf(writer, "done\n")
|
||||||
|
|||||||
Reference in New Issue
Block a user