* S3: Fix encrypted file copy with multiple chunks (#7530) When copying encrypted files with multiple chunks (encrypted volumes via -filer.encryptVolumeData), the copied file could not be read. This was caused by the chunk copy operation not preserving the IsCompressed flag, which led to improper handling of compressed/encrypted data during upload. The fix: 1. Modified uploadChunkData to accept an isCompressed parameter 2. Updated copySingleChunk to pass the source chunk's IsCompressed flag 3. Updated copySingleChunkForRange for partial copy operations 4. Updated all other callers to pass the appropriate compression flag 5. Added comprehensive tests for encrypted volume copy scenarios This ensures that when copying chunks: - The IsCompressed flag from the source chunk is passed to the upload - Compressed data is marked as compressed, preventing double-compression - Already-encrypted data is not re-encrypted (Cipher: false is correct) - All chunk metadata (CipherKey, IsCompressed, ETag) is preserved Tests added: - TestCreateDestinationChunkPreservesEncryption: Verifies metadata preservation - TestCopySingleChunkWithEncryption: Tests various encryption/compression scenarios - TestCopyChunksPreservesMetadata: Tests multi-chunk metadata preservation - TestEncryptedVolumeScenario: Documents and tests the exact issue #7530 scenario Fixes #7530 * Address PR review feedback: simplify tests and improve clarity - Removed TestUploadChunkDataCompressionFlag (panic-based test) - Removed TestCopySingleChunkWithEncryption (duplicate coverage) - Removed TestCopyChunksPreservesMetadata (duplicate coverage) - Added ETag verification to TestEncryptedVolumeCopyScenario - Renamed to TestEncryptedVolumeCopyScenario for better clarity - All test coverage now in TestCreateDestinationChunkPreservesEncryption and TestEncryptedVolumeCopyScenario which focus on the actual behavior
560 lines
17 KiB
Go
560 lines
17 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"crypto/md5"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"net/http"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
// StreamingCopySpec defines the specification for streaming copy operations
|
|
type StreamingCopySpec struct {
|
|
SourceReader io.Reader
|
|
TargetSize int64
|
|
EncryptionSpec *EncryptionSpec
|
|
CompressionSpec *CompressionSpec
|
|
HashCalculation bool
|
|
BufferSize int
|
|
}
|
|
|
|
// EncryptionSpec defines encryption parameters for streaming
|
|
type EncryptionSpec struct {
|
|
NeedsDecryption bool
|
|
NeedsEncryption bool
|
|
SourceKey interface{} // SSECustomerKey or SSEKMSKey
|
|
DestinationKey interface{} // SSECustomerKey or SSEKMSKey
|
|
SourceType EncryptionType
|
|
DestinationType EncryptionType
|
|
SourceMetadata map[string][]byte // Source metadata for IV extraction
|
|
DestinationIV []byte // Generated IV for destination
|
|
}
|
|
|
|
// CompressionSpec defines compression parameters for streaming
|
|
type CompressionSpec struct {
|
|
IsCompressed bool
|
|
CompressionType string
|
|
NeedsDecompression bool
|
|
NeedsCompression bool
|
|
}
|
|
|
|
// StreamingCopyManager handles streaming copy operations
|
|
type StreamingCopyManager struct {
|
|
s3a *S3ApiServer
|
|
bufferSize int
|
|
}
|
|
|
|
// NewStreamingCopyManager creates a new streaming copy manager
|
|
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
|
|
return &StreamingCopyManager{
|
|
s3a: s3a,
|
|
bufferSize: 64 * 1024, // 64KB default buffer
|
|
}
|
|
}
|
|
|
|
// ExecuteStreamingCopy performs a streaming copy operation
|
|
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
|
|
// Create streaming copy specification
|
|
spec, err := scm.createStreamingSpec(entry, r, state)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create streaming spec: %w", err)
|
|
}
|
|
|
|
// Create source reader from entry
|
|
sourceReader, err := scm.createSourceReader(entry)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create source reader: %w", err)
|
|
}
|
|
defer sourceReader.Close()
|
|
|
|
spec.SourceReader = sourceReader
|
|
|
|
// Create processing pipeline
|
|
processedReader, err := scm.createProcessingPipeline(spec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create processing pipeline: %w", err)
|
|
}
|
|
|
|
// Stream to destination
|
|
return scm.streamToDestination(ctx, processedReader, spec, dstPath)
|
|
}
|
|
|
|
// createStreamingSpec creates a streaming specification based on copy parameters
|
|
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
|
|
spec := &StreamingCopySpec{
|
|
BufferSize: scm.bufferSize,
|
|
HashCalculation: true,
|
|
}
|
|
|
|
// Calculate target size
|
|
sizeCalc := NewCopySizeCalculator(entry, r)
|
|
spec.TargetSize = sizeCalc.CalculateTargetSize()
|
|
|
|
// Create encryption specification
|
|
encSpec, err := scm.createEncryptionSpec(entry, r, state)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
spec.EncryptionSpec = encSpec
|
|
|
|
// Create compression specification
|
|
spec.CompressionSpec = scm.createCompressionSpec(entry, r)
|
|
|
|
return spec, nil
|
|
}
|
|
|
|
// createEncryptionSpec creates encryption specification for streaming
|
|
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
|
|
spec := &EncryptionSpec{
|
|
NeedsDecryption: state.IsSourceEncrypted(),
|
|
NeedsEncryption: state.IsTargetEncrypted(),
|
|
SourceMetadata: entry.Extended, // Pass source metadata for IV extraction
|
|
}
|
|
|
|
// Set source encryption details
|
|
if state.SrcSSEC {
|
|
spec.SourceType = EncryptionTypeSSEC
|
|
sourceKey, err := ParseSSECCopySourceHeaders(r)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
|
|
}
|
|
spec.SourceKey = sourceKey
|
|
} else if state.SrcSSEKMS {
|
|
spec.SourceType = EncryptionTypeSSEKMS
|
|
// Extract SSE-KMS key from metadata
|
|
if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
|
|
sseKey, err := DeserializeSSEKMSMetadata(keyData)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
|
|
}
|
|
spec.SourceKey = sseKey
|
|
}
|
|
} else if state.SrcSSES3 {
|
|
spec.SourceType = EncryptionTypeSSES3
|
|
// Extract SSE-S3 key from metadata
|
|
if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
|
|
keyManager := GetSSES3KeyManager()
|
|
sseKey, err := DeserializeSSES3Metadata(keyData, keyManager)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
|
|
}
|
|
spec.SourceKey = sseKey
|
|
}
|
|
}
|
|
|
|
// Set destination encryption details
|
|
if state.DstSSEC {
|
|
spec.DestinationType = EncryptionTypeSSEC
|
|
destKey, err := ParseSSECHeaders(r)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse SSE-C headers: %w", err)
|
|
}
|
|
spec.DestinationKey = destKey
|
|
} else if state.DstSSEKMS {
|
|
spec.DestinationType = EncryptionTypeSSEKMS
|
|
// Parse KMS parameters
|
|
keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
|
|
}
|
|
|
|
// Create SSE-KMS key for destination
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: keyID,
|
|
EncryptionContext: encryptionContext,
|
|
BucketKeyEnabled: bucketKeyEnabled,
|
|
}
|
|
spec.DestinationKey = sseKey
|
|
} else if state.DstSSES3 {
|
|
spec.DestinationType = EncryptionTypeSSES3
|
|
// Generate or retrieve SSE-S3 key
|
|
keyManager := GetSSES3KeyManager()
|
|
sseKey, err := keyManager.GetOrCreateKey("")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get SSE-S3 key: %w", err)
|
|
}
|
|
spec.DestinationKey = sseKey
|
|
}
|
|
|
|
return spec, nil
|
|
}
|
|
|
|
// createCompressionSpec creates compression specification for streaming
|
|
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
|
|
return &CompressionSpec{
|
|
IsCompressed: isCompressedEntry(entry),
|
|
// For now, we don't change compression during copy
|
|
NeedsDecompression: false,
|
|
NeedsCompression: false,
|
|
}
|
|
}
|
|
|
|
// createSourceReader creates a reader for the source entry
|
|
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
|
|
// Create a multi-chunk reader that streams from all chunks
|
|
return scm.s3a.createMultiChunkReader(entry)
|
|
}
|
|
|
|
// createProcessingPipeline creates a processing pipeline for the copy operation
|
|
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
|
|
reader := spec.SourceReader
|
|
|
|
// Add decryption if needed
|
|
if spec.EncryptionSpec.NeedsDecryption {
|
|
decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create decryption reader: %w", err)
|
|
}
|
|
reader = decryptedReader
|
|
}
|
|
|
|
// Add decompression if needed
|
|
if spec.CompressionSpec.NeedsDecompression {
|
|
decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create decompression reader: %w", err)
|
|
}
|
|
reader = decompressedReader
|
|
}
|
|
|
|
// Add compression if needed
|
|
if spec.CompressionSpec.NeedsCompression {
|
|
compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create compression reader: %w", err)
|
|
}
|
|
reader = compressedReader
|
|
}
|
|
|
|
// Add encryption if needed
|
|
if spec.EncryptionSpec.NeedsEncryption {
|
|
encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create encryption reader: %w", err)
|
|
}
|
|
reader = encryptedReader
|
|
}
|
|
|
|
// Add hash calculation if needed
|
|
if spec.HashCalculation {
|
|
reader = scm.createHashReader(reader)
|
|
}
|
|
|
|
return reader, nil
|
|
}
|
|
|
|
// createDecryptionReader creates a decryption reader based on encryption type
|
|
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
|
|
switch encSpec.SourceType {
|
|
case EncryptionTypeSSEC:
|
|
if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
|
|
// Get IV from metadata
|
|
iv, err := GetSSECIVFromMetadata(encSpec.SourceMetadata)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get IV from metadata: %w", err)
|
|
}
|
|
return CreateSSECDecryptedReader(reader, sourceKey, iv)
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-C source key type")
|
|
|
|
case EncryptionTypeSSEKMS:
|
|
if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
|
|
return CreateSSEKMSDecryptedReader(reader, sseKey)
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-KMS source key type")
|
|
|
|
case EncryptionTypeSSES3:
|
|
if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
|
|
// For SSE-S3, the IV is stored within the SSES3Key metadata, not as separate metadata
|
|
iv := sseKey.IV
|
|
if len(iv) == 0 {
|
|
return nil, fmt.Errorf("SSE-S3 key is missing IV for streaming copy")
|
|
}
|
|
return CreateSSES3DecryptedReader(reader, sseKey, iv)
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-S3 source key type")
|
|
|
|
default:
|
|
return reader, nil
|
|
}
|
|
}
|
|
|
|
// createEncryptionReader creates an encryption reader based on encryption type
|
|
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
|
|
switch encSpec.DestinationType {
|
|
case EncryptionTypeSSEC:
|
|
if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
|
|
encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Store IV in destination metadata (this would need to be handled by caller)
|
|
encSpec.DestinationIV = iv
|
|
return encryptedReader, nil
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-C destination key type")
|
|
|
|
case EncryptionTypeSSEKMS:
|
|
if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
|
|
encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Store IV from the updated key
|
|
encSpec.DestinationIV = updatedKey.IV
|
|
return encryptedReader, nil
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-KMS destination key type")
|
|
|
|
case EncryptionTypeSSES3:
|
|
if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
|
|
encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Store IV for metadata
|
|
encSpec.DestinationIV = iv
|
|
return encryptedReader, nil
|
|
}
|
|
return nil, fmt.Errorf("invalid SSE-S3 destination key type")
|
|
|
|
default:
|
|
return reader, nil
|
|
}
|
|
}
|
|
|
|
// createDecompressionReader creates a decompression reader
|
|
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
|
|
if !compSpec.NeedsDecompression {
|
|
return reader, nil
|
|
}
|
|
|
|
switch compSpec.CompressionType {
|
|
case "gzip":
|
|
// Use SeaweedFS's streaming gzip decompression
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
defer pw.Close()
|
|
_, err := util.GunzipStream(pw, reader)
|
|
if err != nil {
|
|
pw.CloseWithError(fmt.Errorf("gzip decompression failed: %v", err))
|
|
}
|
|
}()
|
|
return pr, nil
|
|
default:
|
|
// Unknown compression type, return as-is
|
|
return reader, nil
|
|
}
|
|
}
|
|
|
|
// createCompressionReader creates a compression reader
|
|
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
|
|
if !compSpec.NeedsCompression {
|
|
return reader, nil
|
|
}
|
|
|
|
switch compSpec.CompressionType {
|
|
case "gzip":
|
|
// Use SeaweedFS's streaming gzip compression
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
defer pw.Close()
|
|
_, err := util.GzipStream(pw, reader)
|
|
if err != nil {
|
|
pw.CloseWithError(fmt.Errorf("gzip compression failed: %v", err))
|
|
}
|
|
}()
|
|
return pr, nil
|
|
default:
|
|
// Unknown compression type, return as-is
|
|
return reader, nil
|
|
}
|
|
}
|
|
|
|
// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
|
|
type HashReader struct {
|
|
reader io.Reader
|
|
md5Hash hash.Hash
|
|
sha256Hash hash.Hash
|
|
}
|
|
|
|
// NewHashReader creates a new hash calculating reader
|
|
func NewHashReader(reader io.Reader) *HashReader {
|
|
return &HashReader{
|
|
reader: reader,
|
|
md5Hash: md5.New(),
|
|
sha256Hash: sha256.New(),
|
|
}
|
|
}
|
|
|
|
// Read implements io.Reader and calculates hashes as data flows through
|
|
func (hr *HashReader) Read(p []byte) (n int, err error) {
|
|
n, err = hr.reader.Read(p)
|
|
if n > 0 {
|
|
// Update both hashes with the data read
|
|
hr.md5Hash.Write(p[:n])
|
|
hr.sha256Hash.Write(p[:n])
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// MD5Sum returns the current MD5 hash
|
|
func (hr *HashReader) MD5Sum() []byte {
|
|
return hr.md5Hash.Sum(nil)
|
|
}
|
|
|
|
// SHA256Sum returns the current SHA256 hash
|
|
func (hr *HashReader) SHA256Sum() []byte {
|
|
return hr.sha256Hash.Sum(nil)
|
|
}
|
|
|
|
// MD5Hex returns the MD5 hash as a hex string
|
|
func (hr *HashReader) MD5Hex() string {
|
|
return hex.EncodeToString(hr.MD5Sum())
|
|
}
|
|
|
|
// SHA256Hex returns the SHA256 hash as a hex string
|
|
func (hr *HashReader) SHA256Hex() string {
|
|
return hex.EncodeToString(hr.SHA256Sum())
|
|
}
|
|
|
|
// createHashReader creates a hash calculation reader
|
|
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
|
|
return NewHashReader(reader)
|
|
}
|
|
|
|
// streamToDestination streams the processed data to the destination
|
|
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
|
|
// For now, we'll use the existing chunk-based approach
|
|
// In a full implementation, this would stream directly to the destination
|
|
// without creating intermediate chunks
|
|
|
|
// This is a placeholder that converts back to chunk-based approach
|
|
// A full streaming implementation would write directly to the destination
|
|
return scm.streamToChunks(ctx, reader, spec, dstPath)
|
|
}
|
|
|
|
// streamToChunks converts streaming data back to chunks (temporary implementation)
|
|
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
|
|
// This is a simplified implementation that reads the stream and creates chunks
|
|
// A full implementation would be more sophisticated
|
|
|
|
var chunks []*filer_pb.FileChunk
|
|
buffer := make([]byte, spec.BufferSize)
|
|
offset := int64(0)
|
|
|
|
for {
|
|
n, err := reader.Read(buffer)
|
|
if n > 0 {
|
|
// Create chunk for this data
|
|
chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
|
|
if chunkErr != nil {
|
|
return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
|
|
}
|
|
chunks = append(chunks, chunk)
|
|
offset += int64(n)
|
|
}
|
|
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read stream: %w", err)
|
|
}
|
|
}
|
|
|
|
return chunks, nil
|
|
}
|
|
|
|
// createChunkFromData creates a chunk from streaming data
|
|
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
|
|
// Assign new volume
|
|
assignResult, err := scm.s3a.assignNewVolume(dstPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("assign volume: %w", err)
|
|
}
|
|
|
|
// Create chunk
|
|
chunk := &filer_pb.FileChunk{
|
|
Offset: offset,
|
|
Size: uint64(len(data)),
|
|
}
|
|
|
|
// Set file ID
|
|
if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Upload data
|
|
if err := scm.s3a.uploadChunkData(data, assignResult, false); err != nil {
|
|
return nil, fmt.Errorf("upload chunk data: %w", err)
|
|
}
|
|
|
|
return chunk, nil
|
|
}
|
|
|
|
// createMultiChunkReader creates a reader that streams from multiple chunks
|
|
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
|
|
// Create a multi-reader that combines all chunks
|
|
var readers []io.Reader
|
|
|
|
for _, chunk := range entry.GetChunks() {
|
|
chunkReader, err := s3a.createChunkReader(chunk)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create chunk reader: %w", err)
|
|
}
|
|
readers = append(readers, chunkReader)
|
|
}
|
|
|
|
multiReader := io.MultiReader(readers...)
|
|
return &multiReadCloser{reader: multiReader}, nil
|
|
}
|
|
|
|
// createChunkReader creates a reader for a single chunk
|
|
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
|
|
// Get chunk URL
|
|
srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("lookup volume URL: %w", err)
|
|
}
|
|
|
|
// Create HTTP request for chunk data
|
|
req, err := http.NewRequest("GET", srcUrl, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create HTTP request: %w", err)
|
|
}
|
|
|
|
// Execute request
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("execute HTTP request: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
resp.Body.Close()
|
|
return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
|
|
}
|
|
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// multiReadCloser wraps a multi-reader with a close method
|
|
type multiReadCloser struct {
|
|
reader io.Reader
|
|
}
|
|
|
|
func (mrc *multiReadCloser) Read(p []byte) (int, error) {
|
|
return mrc.reader.Read(p)
|
|
}
|
|
|
|
func (mrc *multiReadCloser) Close() error {
|
|
return nil
|
|
}
|