use one http client
This commit is contained in:
@@ -169,10 +169,13 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst
|
|||||||
if len(srcEntry.GetChunks()) > 0 {
|
if len(srcEntry.GetChunks()) > 0 {
|
||||||
srcChunks := srcEntry.GetChunks()
|
srcChunks := srcEntry.GetChunks()
|
||||||
|
|
||||||
|
// Create HTTP client once for reuse across all chunk operations
|
||||||
|
client := &http.Client{Timeout: 60 * time.Second}
|
||||||
|
|
||||||
// Check if any chunks are manifest chunks - these require special handling
|
// Check if any chunks are manifest chunks - these require special handling
|
||||||
if filer.HasChunkManifest(srcChunks) {
|
if filer.HasChunkManifest(srcChunks) {
|
||||||
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
|
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
|
||||||
newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so)
|
newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
|
return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
|
||||||
}
|
}
|
||||||
@@ -180,7 +183,7 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst
|
|||||||
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
|
glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
|
||||||
} else {
|
} else {
|
||||||
// Regular chunks without manifest - copy directly
|
// Regular chunks without manifest - copy directly
|
||||||
newChunks, err := fs.copyChunks(ctx, srcChunks, so)
|
newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to copy chunks: %w", err)
|
return nil, fmt.Errorf("failed to copy chunks: %w", err)
|
||||||
}
|
}
|
||||||
@@ -199,14 +202,11 @@ func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dst
|
|||||||
}
|
}
|
||||||
|
|
||||||
// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
|
// copyChunks creates new chunks by copying data from source chunks using parallel streaming approach
|
||||||
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
|
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
|
||||||
if len(srcChunks) == 0 {
|
if len(srcChunks) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create HTTP client once for reuse across all chunk copies
|
|
||||||
client := &http.Client{Timeout: 60 * time.Second}
|
|
||||||
|
|
||||||
// Optimize: Batch volume lookup for all chunks to reduce RPC calls
|
// Optimize: Batch volume lookup for all chunks to reduce RPC calls
|
||||||
volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
|
volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -275,7 +275,7 @@ func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.Fil
|
|||||||
}
|
}
|
||||||
|
|
||||||
// copyChunksWithManifest handles copying chunks that include manifest chunks
|
// copyChunksWithManifest handles copying chunks that include manifest chunks
|
||||||
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
|
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
|
||||||
if len(srcChunks) == 0 {
|
if len(srcChunks) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@@ -290,7 +290,7 @@ func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*
|
|||||||
// First, copy all non-manifest chunks directly
|
// First, copy all non-manifest chunks directly
|
||||||
if len(nonManifestChunks) > 0 {
|
if len(nonManifestChunks) > 0 {
|
||||||
glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
|
glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
|
||||||
newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so)
|
newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
|
return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
|
||||||
}
|
}
|
||||||
@@ -315,13 +315,13 @@ func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*
|
|||||||
manifestChunk.GetFileIdString(), len(resolvedChunks))
|
manifestChunk.GetFileIdString(), len(resolvedChunks))
|
||||||
|
|
||||||
// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
|
// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
|
||||||
newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so)
|
newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
|
return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new manifest chunk that references the copied data chunks
|
// Create a new manifest chunk that references the copied data chunks
|
||||||
newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so)
|
newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
|
return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
|
||||||
}
|
}
|
||||||
@@ -339,7 +339,7 @@ func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createManifestChunk creates a new manifest chunk that references the provided data chunks
|
// createManifestChunk creates a new manifest chunk that references the provided data chunks
|
||||||
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
|
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
|
||||||
// Create the manifest data structure
|
// Create the manifest data structure
|
||||||
filer_pb.BeforeEntrySerialization(dataChunks)
|
filer_pb.BeforeEntrySerialization(dataChunks)
|
||||||
|
|
||||||
@@ -353,9 +353,6 @@ func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*fi
|
|||||||
return nil, fmt.Errorf("failed to marshal manifest: %w", err)
|
return nil, fmt.Errorf("failed to marshal manifest: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create HTTP client once for reuse
|
|
||||||
client := &http.Client{Timeout: 60 * time.Second}
|
|
||||||
|
|
||||||
// Save the manifest data as a new chunk
|
// Save the manifest data as a new chunk
|
||||||
saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
|
saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
|
||||||
// Assign a new file ID
|
// Assign a new file ID
|
||||||
|
|||||||
Reference in New Issue
Block a user