Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
This commit is contained in:
@@ -41,9 +41,6 @@ const (
|
||||
// with consistent retry configuration across the application.
|
||||
// This centralizes the retry policy to ensure uniform behavior between
|
||||
// remote storage and replication sink implementations.
|
||||
//
|
||||
// Related: Use DefaultAzureOpTimeout for context.WithTimeout when calling Azure operations
|
||||
// to ensure the timeout accommodates all retry attempts configured here.
|
||||
func DefaultAzBlobClientOptions() *azblob.ClientOptions {
|
||||
return &azblob.ClientOptions{
|
||||
ClientOptions: azcore.ClientOptions{
|
||||
@@ -130,6 +127,32 @@ type azureRemoteStorageClient struct {
|
||||
|
||||
var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
|
||||
|
||||
func (az *azureRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
|
||||
key := loc.Path[1:]
|
||||
ctx, cancel := context.WithTimeout(context.Background(), DefaultAzureOpTimeout)
|
||||
defer cancel()
|
||||
resp, err := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key).GetProperties(ctx, nil)
|
||||
if err != nil {
|
||||
if bloberror.HasCode(err, bloberror.BlobNotFound) {
|
||||
return nil, remote_storage.ErrRemoteObjectNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("stat azure %s%s: %w", loc.Bucket, loc.Path, err)
|
||||
}
|
||||
remoteEntry = &filer_pb.RemoteEntry{
|
||||
StorageName: az.conf.Name,
|
||||
}
|
||||
if resp.ContentLength != nil {
|
||||
remoteEntry.RemoteSize = *resp.ContentLength
|
||||
}
|
||||
if resp.LastModified != nil {
|
||||
remoteEntry.RemoteMtime = resp.LastModified.Unix()
|
||||
}
|
||||
if resp.ETag != nil {
|
||||
remoteEntry.RemoteETag = string(*resp.ETag)
|
||||
}
|
||||
return remoteEntry, nil
|
||||
}
|
||||
|
||||
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
|
||||
|
||||
pathKey := loc.Path[1:]
|
||||
@@ -241,29 +264,7 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati
|
||||
}
|
||||
|
||||
func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
|
||||
key := loc.Path[1:]
|
||||
blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
|
||||
|
||||
props, err := blobClient.GetProperties(context.Background(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remoteEntry := &filer_pb.RemoteEntry{
|
||||
StorageName: az.conf.Name,
|
||||
}
|
||||
|
||||
if props.LastModified != nil {
|
||||
remoteEntry.RemoteMtime = props.LastModified.Unix()
|
||||
}
|
||||
if props.ContentLength != nil {
|
||||
remoteEntry.RemoteSize = *props.ContentLength
|
||||
}
|
||||
if props.ETag != nil {
|
||||
remoteEntry.RemoteETag = string(*props.ETag)
|
||||
}
|
||||
|
||||
return remoteEntry, nil
|
||||
return az.StatFile(loc)
|
||||
}
|
||||
|
||||
func toMetadata(attributes map[string][]byte) map[string]*string {
|
||||
|
||||
@@ -10,7 +10,9 @@ import (
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestAzureStorageClientBasic tests basic Azure storage client operations
|
||||
@@ -378,3 +380,13 @@ func TestAzureStorageClientErrors(t *testing.T) {
|
||||
t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAzureRemoteStorageClientImplementsInterface(t *testing.T) {
|
||||
var _ remote_storage.RemoteStorageClient = (*azureRemoteStorageClient)(nil)
|
||||
}
|
||||
|
||||
func TestAzureErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
|
||||
require.Error(t, remote_storage.ErrRemoteObjectNotFound)
|
||||
require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
|
||||
}
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@ package gcs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -126,6 +128,28 @@ func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const defaultGCSOpTimeout = 30 * time.Second
|
||||
|
||||
func (gcs *gcsRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
|
||||
key := loc.Path[1:]
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultGCSOpTimeout)
|
||||
defer cancel()
|
||||
attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrObjectNotExist) {
|
||||
return nil, remote_storage.ErrRemoteObjectNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("stat gcs %s%s: %w", loc.Bucket, loc.Path, err)
|
||||
}
|
||||
return &filer_pb.RemoteEntry{
|
||||
StorageName: gcs.conf.Name,
|
||||
RemoteMtime: attr.Updated.Unix(),
|
||||
RemoteSize: attr.Size,
|
||||
RemoteETag: attr.Etag,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
|
||||
|
||||
key := loc.Path[1:]
|
||||
@@ -170,20 +194,7 @@ func (gcs *gcsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocatio
|
||||
}
|
||||
|
||||
func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
|
||||
key := loc.Path[1:]
|
||||
attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(context.Background())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &filer_pb.RemoteEntry{
|
||||
RemoteMtime: attr.Updated.Unix(),
|
||||
RemoteSize: attr.Size,
|
||||
RemoteETag: attr.Etag,
|
||||
StorageName: gcs.conf.Name,
|
||||
}, nil
|
||||
|
||||
return gcs.StatFile(loc)
|
||||
}
|
||||
|
||||
func toMetadata(attributes map[string][]byte) map[string]string {
|
||||
|
||||
17
weed/remote_storage/gcs/gcs_storage_client_test.go
Normal file
17
weed/remote_storage/gcs/gcs_storage_client_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package gcs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGCSRemoteStorageClientImplementsInterface(t *testing.T) {
|
||||
var _ remote_storage.RemoteStorageClient = (*gcsRemoteStorageClient)(nil)
|
||||
}
|
||||
|
||||
func TestGCSErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
|
||||
require.Error(t, remote_storage.ErrRemoteObjectNotFound)
|
||||
require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package remote_storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
@@ -69,8 +70,12 @@ type Bucket struct {
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// ErrRemoteObjectNotFound is returned by StatFile when the object does not exist in the remote storage backend.
|
||||
var ErrRemoteObjectNotFound = errors.New("remote object not found")
|
||||
|
||||
type RemoteStorageClient interface {
|
||||
Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
|
||||
StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error)
|
||||
ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
|
||||
WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
|
||||
RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error)
|
||||
|
||||
@@ -3,9 +3,11 @@ package s3
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
@@ -119,6 +121,33 @@ func (s *s3RemoteStorageClient) Traverse(remote *remote_pb.RemoteStorageLocation
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
|
||||
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: aws.String(loc.Bucket),
|
||||
Key: aws.String(loc.Path[1:]),
|
||||
})
|
||||
if err != nil {
|
||||
if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.StatusCode() == http.StatusNotFound {
|
||||
return nil, remote_storage.ErrRemoteObjectNotFound
|
||||
}
|
||||
return nil, fmt.Errorf("stat s3 %s%s: %w", loc.Bucket, loc.Path, err)
|
||||
}
|
||||
remoteEntry = &filer_pb.RemoteEntry{
|
||||
StorageName: s.conf.Name,
|
||||
}
|
||||
if resp.ContentLength != nil {
|
||||
remoteEntry.RemoteSize = *resp.ContentLength
|
||||
}
|
||||
if resp.LastModified != nil {
|
||||
remoteEntry.RemoteMtime = resp.LastModified.Unix()
|
||||
}
|
||||
if resp.ETag != nil {
|
||||
remoteEntry.RemoteETag = *resp.ETag
|
||||
}
|
||||
return remoteEntry, nil
|
||||
}
|
||||
|
||||
func (s *s3RemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
|
||||
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
|
||||
u.PartSize = int64(4 * 1024 * 1024)
|
||||
@@ -208,21 +237,7 @@ func toTagging(attributes map[string][]byte) *s3.Tagging {
|
||||
}
|
||||
|
||||
func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
|
||||
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
|
||||
Bucket: aws.String(loc.Bucket),
|
||||
Key: aws.String(loc.Path[1:]),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &filer_pb.RemoteEntry{
|
||||
RemoteMtime: resp.LastModified.Unix(),
|
||||
RemoteSize: *resp.ContentLength,
|
||||
RemoteETag: *resp.ETag,
|
||||
StorageName: s.conf.Name,
|
||||
}, nil
|
||||
|
||||
return s.StatFile(loc)
|
||||
}
|
||||
|
||||
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
awss3 "github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -55,3 +56,12 @@ func TestS3MakeUsesStaticCredentialsWhenKeysAreProvided(t *testing.T) {
|
||||
require.Equal(t, conf.S3AccessKey, credValue.AccessKeyID)
|
||||
require.Equal(t, conf.S3SecretKey, credValue.SecretAccessKey)
|
||||
}
|
||||
|
||||
func TestS3RemoteStorageClientImplementsInterface(t *testing.T) {
|
||||
var _ remote_storage.RemoteStorageClient = (*s3RemoteStorageClient)(nil)
|
||||
}
|
||||
|
||||
func TestS3ErrRemoteObjectNotFoundIsAccessible(t *testing.T) {
|
||||
require.Error(t, remote_storage.ErrRemoteObjectNotFound)
|
||||
require.Equal(t, "remote object not found", remote_storage.ErrRemoteObjectNotFound.Error())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user