fix(gcs): resolve credential conflict and improve backup logging (#7951)
* fix(gcs): resolve credential conflict and improve backup logging - Workaround GCS SDK's "multiple credential options" error by manually constructing an authenticated HTTP client. - Include source entry path in filer backup error logs for better visibility on missing volumes/404s. * fix: address PR review feedback - Add nil check for EventNotification in getSourceKey - Avoid reassigning google_application_credentials parameter in gcs_sink.go * fix(gcs): return errors instead of calling glog.Fatalf in initialize Adheres to Go best practices and allows for more graceful failure handling by callers. * read from bind ip
This commit is contained in:
@@ -148,13 +148,13 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
|||||||
}
|
}
|
||||||
// ignore HTTP 404 from remote reads
|
// ignore HTTP 404 from remote reads
|
||||||
if errors.Is(err, http.ErrNotFound) {
|
if errors.Is(err, http.ErrNotFound) {
|
||||||
glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
|
glog.V(0).Infof("got 404 error for %s, ignore it: %s", getSourceKey(resp), err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// also ignore missing volume/lookup errors coming from LookupFileId or vid map
|
// also ignore missing volume/lookup errors coming from LookupFileId or vid map
|
||||||
errStr := err.Error()
|
errStr := err.Error()
|
||||||
if strings.Contains(errStr, "LookupFileId") || (strings.Contains(errStr, "volume id") && strings.Contains(errStr, "not found")) {
|
if strings.Contains(errStr, "LookupFileId") || (strings.Contains(errStr, "volume id") && strings.Contains(errStr, "not found")) {
|
||||||
glog.V(0).Infof("got missing-volume error, ignore it: %s", errStr)
|
glog.V(0).Infof("got missing-volume error for %s, ignore it: %s", getSourceKey(resp), errStr)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@@ -206,3 +206,17 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
|
|||||||
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
|
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSourceKey(resp *filer_pb.SubscribeMetadataResponse) string {
|
||||||
|
if resp == nil || resp.EventNotification == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
message := resp.EventNotification
|
||||||
|
if message.NewEntry != nil {
|
||||||
|
return string(util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
|
||||||
|
}
|
||||||
|
if message.OldEntry != nil {
|
||||||
|
return string(util.FullPath(resp.Directory).Child(message.OldEntry.Name))
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|||||||
@@ -918,6 +918,9 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Determine bind IP for health checks
|
||||||
|
bindIp := getBindIp()
|
||||||
|
|
||||||
// Prepare master address
|
// Prepare master address
|
||||||
masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port)
|
masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port)
|
||||||
|
|
||||||
@@ -961,7 +964,7 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// Wait for admin server's HTTP port to be ready before launching worker
|
// Wait for admin server's HTTP port to be ready before launching worker
|
||||||
adminAddr := fmt.Sprintf("http://127.0.0.1:%d", *miniAdminOptions.port)
|
adminAddr := fmt.Sprintf("http://%s:%d", bindIp, *miniAdminOptions.port)
|
||||||
glog.V(1).Infof("Waiting for admin server to be ready at %s...", adminAddr)
|
glog.V(1).Infof("Waiting for admin server to be ready at %s...", adminAddr)
|
||||||
if err := waitForAdminServerReady(adminAddr); err != nil {
|
if err := waitForAdminServerReady(adminAddr); err != nil {
|
||||||
glog.Fatalf("Admin server readiness check failed: %v", err)
|
glog.Fatalf("Admin server readiness check failed: %v", err)
|
||||||
@@ -971,20 +974,21 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
|
|||||||
startMiniWorker()
|
startMiniWorker()
|
||||||
|
|
||||||
// Wait for worker to be ready by polling its gRPC port
|
// Wait for worker to be ready by polling its gRPC port
|
||||||
workerGrpcAddr := fmt.Sprintf("127.0.0.1:%d", *miniAdminOptions.grpcPort)
|
workerGrpcAddr := fmt.Sprintf("%s:%d", bindIp, *miniAdminOptions.grpcPort)
|
||||||
waitForWorkerReady(workerGrpcAddr)
|
waitForWorkerReady(workerGrpcAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready
|
// waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready
|
||||||
func waitForAdminServerReady(adminAddr string) error {
|
func waitForAdminServerReady(adminAddr string) error {
|
||||||
maxAttempts := 40 // 40 * 500ms = 20 seconds max wait
|
healthAddr := fmt.Sprintf("%s/health", adminAddr)
|
||||||
|
maxAttempts := 60 // 60 * 500ms = 30 seconds max wait
|
||||||
attempt := 0
|
attempt := 0
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
Timeout: 500 * time.Millisecond,
|
Timeout: 1 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
for attempt < maxAttempts {
|
for attempt < maxAttempts {
|
||||||
resp, err := client.Get(adminAddr)
|
resp, err := client.Get(healthAddr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
glog.V(1).Infof("Admin server is ready at %s", adminAddr)
|
glog.V(1).Infof("Admin server is ready at %s", adminAddr)
|
||||||
|
|||||||
@@ -4,14 +4,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
|
|
||||||
|
|
||||||
"cloud.google.com/go/storage"
|
"cloud.google.com/go/storage"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
|
||||||
|
"golang.org/x/oauth2"
|
||||||
|
"golang.org/x/oauth2/google"
|
||||||
"google.golang.org/api/option"
|
"google.golang.org/api/option"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
|
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
||||||
@@ -60,16 +61,30 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
|
|||||||
g.dir = dir
|
g.dir = dir
|
||||||
|
|
||||||
// Creates a client.
|
// Creates a client.
|
||||||
if google_application_credentials == "" {
|
var clientOpts []option.ClientOption
|
||||||
var found bool
|
if google_application_credentials != "" {
|
||||||
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
|
var data []byte
|
||||||
if !found {
|
var err error
|
||||||
glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
|
if strings.HasPrefix(google_application_credentials, "{") {
|
||||||
|
data = []byte(google_application_credentials)
|
||||||
|
} else {
|
||||||
|
googleCredentialsPath := util.ResolvePath(google_application_credentials)
|
||||||
|
data, err = os.ReadFile(googleCredentialsPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read credentials file %s: %v", googleCredentialsPath, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
creds, err := google.CredentialsFromJSON(context.Background(), data, storage.ScopeFullControl)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse credentials: %v", err)
|
||||||
|
}
|
||||||
|
httpClient := oauth2.NewClient(context.Background(), creds.TokenSource)
|
||||||
|
clientOpts = append(clientOpts, option.WithHTTPClient(httpClient), option.WithoutAuthentication())
|
||||||
}
|
}
|
||||||
client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
|
client, err := storage.NewClient(context.Background(), clientOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Failed to create client: %v", err)
|
return fmt.Errorf("failed to create client with credentials \"%s\" env \"%s\": %v",
|
||||||
|
google_application_credentials, os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
g.client = client
|
g.client = client
|
||||||
|
|||||||
Reference in New Issue
Block a user