test: add Trino Iceberg catalog integration test (#8228)

* test: add Trino Iceberg catalog integration test

- Create test/s3/catalog_trino/trino_catalog_test.go with TestTrinoIcebergCatalog
- Tests integration between Trino SQL engine and SeaweedFS Iceberg REST catalog
- Starts weed mini with all services and Trino in Docker container
- Validates Iceberg catalog schema creation and listing operations
- Uses native S3 filesystem support in Trino with path-style access
- Add workflow job to s3-tables-tests.yml for CI execution

* fix: preserve AWS environment credentials when replacing S3 configuration

When S3 configuration is loaded from filer/db, it replaces the identities list
and inadvertently removes AWS_ACCESS_KEY_ID credentials that were added from
environment variables. This caused auth to remain disabled even though valid
credentials were present.

Fix by preserving environment-based identities when replacing the configuration
and re-adding them after the replacement. This ensures environment credentials
persist across configuration reloads and properly enable authentication.
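
A minimal sketch of the preserve-and-re-add idea, not the actual s3api code. The `Identity` struct, its `FromEnv` flag, and `replaceIdentities` are hypothetical names used only for illustration; the real change identifies environment identities differently.

```go
package main

import "fmt"

// Identity is a simplified stand-in for the real identity type (assumption).
type Identity struct {
	Name      string
	AccessKey string
	FromEnv   bool // hypothetical marker for identities created from environment variables
}

// replaceIdentities returns the newly loaded identities plus any
// environment-based identities from the current set that the new
// configuration does not already define by name.
func replaceIdentities(current, loaded []*Identity) []*Identity {
	result := make([]*Identity, 0, len(loaded)+len(current))
	seen := make(map[string]bool, len(loaded))
	for _, ident := range loaded {
		result = append(result, ident)
		seen[ident.Name] = true
	}
	for _, ident := range current {
		if ident.FromEnv && !seen[ident.Name] {
			result = append(result, ident)
		}
	}
	return result
}

func main() {
	current := []*Identity{{Name: "admin-env", AccessKey: "EXAMPLEKEY", FromEnv: true}}
	loaded := []*Identity{{Name: "alice", AccessKey: "ALICEKEY"}}

	merged := replaceIdentities(current, loaded)
	for _, ident := range merged {
		fmt.Println(ident.Name)
	}
	// The auth state should be derived from the merged set, not from len(loaded).
	fmt.Println("auth enabled:", len(merged) > 0)
}
```

Deriving the authentication state from the merged list is what keeps auth enabled when the stored configuration is empty but environment credentials exist.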

* fix: use correct ServerAddress format with gRPC port encoding

The admin server couldn't connect to master because the master address
was missing the gRPC port information. Use pb.NewServerAddress() which
properly encodes both HTTP and gRPC ports in the address string.

Changes:
- weed/command/mini.go: Use pb.NewServerAddress for master address in admin
- test/s3/policy/policy_test.go: Store and use gRPC ports for master/filer addresses

This fix applies to:
1. Admin server connection to master (mini.go)
2. Test shell commands that need master/filer addresses (policy_test.go)

* move

* move

* fix: always include gRPC port in server address encoding

The NewServerAddress() function was omitting the gRPC port from the address
string when it matched the port+10000 convention. However, gRPC port allocation
doesn't always follow this convention - when the calculated port is busy, an
alternative port is allocated.

This caused a bug where:
1. Master's gRPC port was allocated as 50661 (sequential, not port+10000)
2. Address was encoded as '192.168.1.66:50660' (gRPC port omitted)
3. Admin client called ToGrpcAddress() which assumed port+10000 offset
4. Admin tried to connect to 60660 but master was on 50661 → connection failed

Fix: Always include explicit gRPC port in address format (host:httpPort.grpcPort)
unless gRPC port is 0. This makes addresses unambiguous and works regardless of
the port allocation strategy used.

Impacts: All server-to-server gRPC connections now use properly formatted addresses.
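
The failure mode above can be reproduced with a small sketch of the `host:httpPort.grpcPort` format. `encodeAddress` and `grpcAddress` are hypothetical helpers written for this illustration, not the functions in the `pb` package; the port+10000 fallback mirrors the convention described in this message.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// encodeAddress always appends the gRPC port unless it is 0.
func encodeAddress(host string, httpPort, grpcPort int) string {
	addr := net.JoinHostPort(host, strconv.Itoa(httpPort))
	if grpcPort == 0 {
		return addr
	}
	return addr + "." + strconv.Itoa(grpcPort)
}

// grpcAddress returns host:grpcPort. When no explicit gRPC port is encoded,
// it falls back to the httpPort+10000 convention.
func grpcAddress(addr string) (string, error) {
	colon := strings.LastIndex(addr, ":")
	if colon < 0 {
		return "", fmt.Errorf("missing port in %q", addr)
	}
	host, ports := addr[:colon], addr[colon+1:]
	if dot := strings.LastIndex(ports, "."); dot >= 0 {
		if _, err := strconv.Atoi(ports[dot+1:]); err != nil {
			return "", fmt.Errorf("invalid gRPC port in %q: %w", addr, err)
		}
		return host + ":" + ports[dot+1:], nil
	}
	httpPort, err := strconv.Atoi(ports)
	if err != nil {
		return "", fmt.Errorf("invalid port in %q: %w", addr, err)
	}
	return host + ":" + strconv.Itoa(httpPort+10000), nil
}

func main() {
	// Explicit gRPC port: the client dials the port the master really uses.
	explicit := encodeAddress("192.168.1.66", 50660, 50661)
	fmt.Println(explicit)
	fmt.Println(grpcAddress(explicit))

	// gRPC port omitted: the client guesses httpPort+10000 and misses.
	fmt.Println(grpcAddress("192.168.1.66:50660"))
}
```

With the port omitted, the decoder can only guess, which is why the encoder should not drop it even when it happens to match the convention.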

* test: fix Iceberg REST API readiness check

The Iceberg REST API endpoints require authentication. When checked without
credentials, the API returns 403 Forbidden (not 401 Unauthorized). The
readiness check now accepts both auth error codes (401/403) as indicators
that the service is up and ready and only needs credentials.

This fixes the 'Iceberg REST API did not become ready' test failure.
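
A rough sketch of such a readiness probe, assuming an unauthenticated GET is used. `isReadyStatus` and `waitForReady` are hypothetical names, and treating 200 as ready alongside 401/403 is an assumption of this sketch rather than something the test is known to do.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// isReadyStatus reports whether the status of an unauthenticated probe shows
// that the service is listening: either success or an auth rejection.
func isReadyStatus(code int) bool {
	switch code {
	case http.StatusOK, http.StatusUnauthorized, http.StatusForbidden:
		return true
	default:
		return false
	}
}

// waitForReady polls url until it answers with a ready status or the
// attempts are exhausted.
func waitForReady(url string, attempts int, interval time.Duration) bool {
	client := &http.Client{Timeout: 2 * time.Second}
	for i := 0; i < attempts; i++ {
		resp, err := client.Get(url)
		if err == nil {
			resp.Body.Close()
			if isReadyStatus(resp.StatusCode) {
				return true
			}
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	// Stand-in server that rejects unauthenticated requests with 403.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusForbidden)
	}))
	defer srv.Close()

	fmt.Println("ready:", waitForReady(srv.URL+"/v1/namespaces", 3, 10*time.Millisecond))
}
```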

* Fix AWS SigV4 signature verification for base64-encoded payload hashes

   AWS SigV4 canonical requests must use hex-encoded SHA256 hashes,
   but the X-Amz-Content-Sha256 header may be transmitted as base64.

   Changes:
   - Added normalizePayloadHash() function to convert base64 to hex
   - Call normalizePayloadHash() in extractV4AuthInfoFromHeader()
   - Added encoding/base64 import
   - Removed unused fmt import

   Fixes 403 Forbidden errors on POST requests to Iceberg REST API
   when clients send base64-encoded content hashes in the header.

   Impacted services: Iceberg REST API, S3Tables
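
The conversion can be sketched in isolation as follows. `toHexHash` is a hypothetical, simplified stand-in for the normalization helper; it leaves any 64-character value and anything that is not a base64-encoded 32-byte digest untouched.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// toHexHash returns the hex form of a SHA-256 payload hash that may have been
// sent as base64. Values that are already 64 characters long, or that do not
// decode to exactly 32 bytes, are returned unchanged.
func toHexHash(value string) string {
	if len(value) == hex.EncodedLen(sha256.Size) {
		return value
	}
	raw, err := base64.StdEncoding.DecodeString(value)
	if err == nil && len(raw) == sha256.Size {
		return hex.EncodeToString(raw)
	}
	return value
}

func main() {
	sum := sha256.Sum256([]byte("hello"))
	asBase64 := base64.StdEncoding.EncodeToString(sum[:])
	asHex := hex.EncodeToString(sum[:])

	fmt.Println(toHexHash(asBase64) == asHex)  // base64 input is converted
	fmt.Println(toHexHash(asHex) == asHex)     // hex input passes through
	fmt.Println(toHexHash("UNSIGNED-PAYLOAD")) // special marker passes through
}
```

Checking the decoded length keeps short marker strings that happen to be valid base64 from being rewritten.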

* pass sigv4

* s3api: fix identity preservation and logging levels

- Ensure environment-based identities are preserved during config replacement
- Update accessKeyIdent and nameToIdentity maps correctly
- Downgrade informational logs to V(2) to reduce noise

* test: fix trino integration test and s3 policy test

- Pin Trino image version to 479
- Fix port binding to 0.0.0.0 for Docker connectivity
- Fix S3 policy test hang by correctly assigning MiniClusterCtx
- Improve port finding robustness in policy tests

* ci: pre-pull trino image to avoid timeouts

- Pull trinodb/trino:479 after Docker setup
- Ensure image is ready before integration tests start

* iceberg: remove unused checkAuth and improve logging

- Remove unused checkAuth method
- Downgrade informational logs to V(2)
- Ensure loggingMiddleware uses a status writer for accurate reported codes
- Narrow catch-all route to avoid interfering with other subsystems
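
A small sketch of a status-capturing writer for logging middleware, under the assumption that handlers may write a body without calling `WriteHeader`. `statusWriter`, `withStatus`, and `statusFor` are hypothetical names; defaulting the recorded status to 200 is a choice made in this sketch so that implicit successes are not reported as 0.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// statusWriter wraps a ResponseWriter and remembers the status code.
type statusWriter struct {
	http.ResponseWriter
	status int
}

// WriteHeader records the code before delegating to the wrapped writer.
func (w *statusWriter) WriteHeader(code int) {
	w.status = code
	w.ResponseWriter.WriteHeader(code)
}

// withStatus calls report with the status observed for each request.
func withStatus(next http.Handler, report func(status int)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// net/http sends 200 implicitly when WriteHeader is never called.
		sw := &statusWriter{ResponseWriter: w, status: http.StatusOK}
		next.ServeHTTP(sw, r)
		report(sw.status)
	})
}

// statusFor runs a test handler through the middleware and returns the
// observed status. A code of 0 means the handler never calls WriteHeader.
func statusFor(code int) int {
	observed := 0
	handler := withStatus(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if code != 0 {
			w.WriteHeader(code)
		}
		w.Write([]byte("{}"))
	}), func(status int) { observed = status })

	req := httptest.NewRequest(http.MethodGet, "/v1/config", nil)
	handler.ServeHTTP(httptest.NewRecorder(), req)
	return observed
}

func main() {
	fmt.Println("implicit status:", statusFor(0))
	fmt.Println("explicit status:", statusFor(http.StatusNotFound))
}
```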

* iceberg: fix build failure by removing unused s3api import

* Update iceberg.go

* use warehouse

* Update trino_catalog_test.go
Chris Lu
2026-02-06 13:12:25 -08:00
committed by GitHub
parent a04e8dd00b
commit a3b83f8808
13 changed files with 839 additions and 135 deletions

View File

@@ -963,8 +963,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// Determine bind IP for health checks
bindIp := getBindIp()
// Prepare master address
masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port)
// Prepare master address with gRPC port
masterAddr := string(pb.NewServerAddress(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc))
// Set admin options
*miniAdminOptions.master = masterAddr

View File

@@ -15,7 +15,7 @@ type ServerAddresses string
type ServerSrvAddress string
func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
if grpcPort == 0 || grpcPort == port+10000 {
if grpcPort == 0 {
return ServerAddress(util.JoinHostPort(host, port))
}
return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort))
@@ -25,10 +25,6 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress {
if grpcPort == 0 {
return ServerAddress(address)
}
_, port, _ := hostAndPort(address)
if uint64(grpcPort) == port+10000 {
return ServerAddress(address)
}
return ServerAddress(address + "." + strconv.Itoa(grpcPort))
}

View File

@@ -538,17 +538,45 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3
}
iam.m.Lock()
// Save existing environment-based identities before replacement
// This ensures AWS_ACCESS_KEY_ID credentials are preserved
envIdentities := make([]*Identity, 0)
for _, ident := range iam.identities {
if ident.IsStatic && strings.HasPrefix(ident.Name, "admin-") {
// This is an environment-based admin identity, preserve it
envIdentities = append(envIdentities, ident)
}
}
// atomically switch
iam.identities = identities
iam.identityAnonymous = identityAnonymous
iam.accounts = accounts
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.accessKeyIdent = accessKeyIdent
iam.policies = policies
// Re-add environment-based identities that were preserved
for _, envIdent := range envIdentities {
// Check if this identity already exists in the new config
exists := false
for _, ident := range iam.identities {
if ident.Name == envIdent.Name {
exists = true
break
}
}
if !exists {
iam.identities = append(iam.identities, envIdent)
iam.accessKeyIdent[envIdent.Credentials[0].AccessKey] = envIdent
iam.nameToIdentity[envIdent.Name] = envIdent
}
}
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))
authJustEnabled := iam.updateAuthenticationState(len(iam.identities))
iam.m.Unlock()
if authJustEnabled {
@@ -778,9 +806,10 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap
iam.identityAnonymous = identityAnonymous
iam.accounts = accounts
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.accessKeyIdent = accessKeyIdent
iam.policies = policies
iam.accessKeyIdent = accessKeyIdent
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))

View File

@@ -22,6 +22,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"crypto/subtle"
"encoding/base64"
"encoding/hex"
"io"
"net"
@@ -104,6 +105,24 @@ func getContentSha256Cksum(r *http.Request) string {
return emptySHA256
}
// normalizePayloadHash converts base64-encoded payload hash to hex format.
// AWS SigV4 canonical requests always use hex-encoded SHA256.
func normalizePayloadHash(payloadHashValue string) string {
// Special values and hex-encoded hashes don't need conversion
if payloadHashValue == emptySHA256 || payloadHashValue == unsignedPayload ||
payloadHashValue == streamingContentSHA256 || payloadHashValue == streamingContentSHA256Trailer ||
payloadHashValue == streamingUnsignedPayload || len(payloadHashValue) == 64 {
return payloadHashValue
}
// Try to decode as base64 and convert to hex
if decodedBytes, err := base64.StdEncoding.DecodeString(payloadHashValue); err == nil && len(decodedBytes) == 32 {
return hex.EncodeToString(decodedBytes)
}
return payloadHashValue
}
// signValues data type represents structured form of AWS Signature V4 header.
type signValues struct {
Credential credentialHeader
@@ -485,6 +504,10 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode)
}
}
// Normalize payload hash to hex format for canonical request
// AWS SigV4 canonical requests always use hex-encoded SHA256
normalizedPayload := normalizePayloadHash(hashedPayload)
return &v4AuthInfo{
Signature: signV4Values.Signature,
AccessKey: signV4Values.Credential.accessKey,
@@ -493,7 +516,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode)
Region: signV4Values.Credential.scope.region,
Service: signV4Values.Credential.scope.service,
Scope: signV4Values.Credential.getScope(),
HashedPayload: hashedPayload,
HashedPayload: normalizedPayload,
IsPresigned: false,
}, s3err.ErrNone
}

View File

@@ -18,37 +18,11 @@ import (
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
func (s *Server) checkAuth(w http.ResponseWriter, r *http.Request, action s3api.Action, bucketName string) bool {
identityName := s3_constants.GetIdentityNameFromContext(r)
if identityName == "" {
writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required")
return false
}
identityObj := s3_constants.GetIdentityFromContext(r)
if identityObj == nil {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: missing identity")
return false
}
identity, ok := identityObj.(*s3api.Identity)
if !ok {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: invalid identity")
return false
}
if !identity.CanDo(action, bucketName, "") {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied")
return false
}
return true
}
// FilerClient provides access to the filer for storage operations.
type FilerClient interface {
WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error
@@ -79,17 +53,20 @@ func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server {
// RegisterRoutes registers Iceberg REST API routes on the provided router.
func (s *Server) RegisterRoutes(router *mux.Router) {
// Configuration endpoint
router.HandleFunc("/v1/config", s.Auth(s.handleConfig)).Methods(http.MethodGet)
// Add middleware to log all requests/responses
router.Use(loggingMiddleware)
// Namespace endpoints
// Configuration endpoint - no auth needed for config
router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet)
// Namespace endpoints - wrapped with Auth middleware
router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete)
// Table endpoints
// Table endpoints - wrapped with Auth middleware
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost)
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet)
@@ -97,7 +74,7 @@ func (s *Server) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
// With prefix support
// With prefix support - wrapped with Auth middleware
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
@@ -110,7 +87,48 @@ func (s *Server) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
glog.V(0).Infof("Registered Iceberg REST Catalog routes")
// Catch-all for debugging
router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI)
writeError(w, http.StatusNotFound, "NotFound", "Path not found")
})
glog.V(2).Infof("Registered Iceberg REST Catalog routes")
}
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr)
// Log all headers for debugging
glog.V(2).Infof("Iceberg REST headers:")
for name, values := range r.Header {
for _, value := range values {
// Redact sensitive headers
if name == "Authorization" && len(value) > 20 {
glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:])
} else {
glog.V(2).Infof(" %s: %s", name, value)
}
}
}
// Create a response writer that captures the status code
wrapped := &responseWriter{ResponseWriter: w}
next.ServeHTTP(wrapped, r)
glog.V(2).Infof("Iceberg REST response: %s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode)
})
}
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (w *responseWriter) WriteHeader(code int) {
w.statusCode = code
w.ResponseWriter.WriteHeader(code)
}
func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc {
@@ -293,8 +311,8 @@ func getBucketFromPrefix(r *http.Request) string {
if prefix := vars["prefix"]; prefix != "" {
return prefix
}
// Default bucket if no prefix
return "default"
// Default bucket if no prefix - use "warehouse" for Iceberg
return "warehouse"
}
// buildTableBucketARN builds an ARN for a table bucket.
@@ -305,25 +323,28 @@ func buildTableBucketARN(bucketName string) string {
// handleConfig returns catalog configuration.
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
glog.Infof("handleConfig: START")
glog.Infof("handleConfig: setting Content-Type header")
w.Header().Set("Content-Type", "application/json")
config := CatalogConfig{
Defaults: map[string]string{},
Overrides: map[string]string{},
}
writeJSON(w, http.StatusOK, config)
glog.Infof("handleConfig: encoding JSON")
if err := json.NewEncoder(w).Encode(config); err != nil {
glog.Warningf("handleConfig: Failed to encode config: %v", err)
}
glog.Infof("handleConfig: COMPLETE")
}
// handleListNamespaces lists namespaces in a catalog.
func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Use S3 Tables manager to list namespaces
var resp s3tables.ListNamespacesResponse
req := &s3tables.ListNamespacesRequest{
@@ -333,11 +354,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName)
})
if err != nil {
glog.V(1).Infof("Iceberg: ListNamespaces error: %v", err)
glog.Infof("Iceberg: ListNamespaces error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
return
}
@@ -357,11 +378,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
// handleCreateNamespace creates a new namespace.
func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
var req CreateNamespaceRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body")
@@ -382,15 +403,18 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, "")
glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName)
})
if err != nil {
glog.Errorf("Iceberg: handleCreateNamespace error: %v", err)
if strings.Contains(err.Error(), "already exists") {
writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error())
return
}
glog.V(1).Infof("Iceberg: CreateNamespace error: %v", err)
glog.Infof("Iceberg: CreateNamespace error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
return
}
@@ -418,11 +442,11 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Use S3 Tables manager to get namespace
getReq := &s3tables.GetNamespaceRequest{
TableBucketARN: bucketARN,
@@ -432,7 +456,7 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName)
})
if err != nil {
@@ -462,11 +486,11 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetNamespaceRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -475,7 +499,7 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName)
})
if err != nil {
@@ -500,11 +524,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
deleteReq := &s3tables.DeleteNamespaceRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -512,10 +536,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, identityName)
})
if err != nil {
if strings.Contains(err.Error(), "not found") {
writeError(w, http.StatusNotFound, "NoSuchNamespaceException", fmt.Sprintf("Namespace does not exist: %v", namespace))
return
@@ -542,11 +567,11 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
listReq := &s3tables.ListTablesRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -556,7 +581,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName)
})
if err != nil {
@@ -605,11 +630,11 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Generate UUID for the new table
tableUUID := uuid.New()
location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name)
@@ -657,7 +682,7 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, identityName)
})
if err != nil {
@@ -696,11 +721,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -710,10 +735,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
if strings.Contains(err.Error(), "not found") {
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
return
@@ -771,11 +797,11 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -785,7 +811,7 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
@@ -807,11 +833,11 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
deleteReq := &s3tables.DeleteTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@@ -820,7 +846,7 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, identityName)
})
if err != nil {
@@ -849,9 +875,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Parse the commit request
var req CommitTableRequest
@@ -860,8 +887,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// First, load current table metadata
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
@@ -872,7 +897,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
@@ -985,7 +1010,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
// 1. Write metadata file (this would normally be an S3 PutObject,
// but s3tables manager handles the metadata storage logic)
// For now, we assume s3tables.UpdateTable handles the reference update.
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
})
if err != nil {
@@ -1002,14 +1027,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, result)
}
// loadTableResultJSON is used for JSON serialization of LoadTableResult.
// It wraps table.Metadata (which is an interface) for proper JSON output.
type loadTableResultJSON struct {
MetadataLocation string `json:"metadata-location,omitempty"`
Metadata table.Metadata `json:"metadata"`
Config iceberg.Properties `json:"config,omitempty"`
}
// newTableMetadata creates a new table.Metadata object with the given parameters.
// Uses iceberg-go's MetadataBuilder pattern for proper spec compliance.
func newTableMetadata(

View File

@@ -632,6 +632,7 @@ func buildUntagResourceRequest(r *http.Request) (interface{}, error) {
// which performs granular permission checks based on the specific operation.
func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("S3Tables: authenticateS3Tables called, iam.isEnabled()=%t", s3a.iam.isEnabled())
if !s3a.iam.isEnabled() {
f(w, r)
return
@@ -640,15 +641,19 @@ func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFun
// Use AuthSignatureOnly to authenticate the request without authorizing specific actions
identity, errCode := s3a.iam.AuthSignatureOnly(r)
if errCode != s3err.ErrNone {
glog.Errorf("S3Tables: AuthSignatureOnly failed: %v", errCode)
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Store the authenticated identity in request context
if identity != nil && identity.Name != "" {
glog.V(2).Infof("S3Tables: authenticated identity Name=%s Account.Id=%s", identity.Name, identity.Account.Id)
ctx := s3_constants.SetIdentityNameInContext(r.Context(), identity.Name)
ctx = s3_constants.SetIdentityInContext(ctx, identity)
r = r.WithContext(ctx)
} else {
glog.V(2).Infof("S3Tables: authenticated identity is nil or empty name")
}
f(w, r)

View File

@@ -164,9 +164,32 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request,
// This is also used as the principal for permission checks, ensuring alignment between
// the caller identity and ownership verification when IAM is enabled.
func (h *S3TablesHandler) getAccountID(r *http.Request) string {
identityRaw := s3_constants.GetIdentityFromContext(r)
if identityRaw != nil {
// Use reflection to access the Account.Id field to avoid import cycle
val := reflect.ValueOf(identityRaw)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
if val.Kind() == reflect.Struct {
accountField := val.FieldByName("Account")
if accountField.IsValid() && !accountField.IsNil() {
accountVal := accountField.Elem()
if accountVal.Kind() == reflect.Struct {
idField := accountVal.FieldByName("Id")
if idField.IsValid() && idField.Kind() == reflect.String {
id := idField.String()
return id
}
}
}
}
}
if identityName := s3_constants.GetIdentityNameFromContext(r); identityName != "" {
return identityName
}
if accountID := r.Header.Get(s3_constants.AmzAccountId); accountID != "" {
return accountID
}

View File

@@ -9,13 +9,16 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// handleCreateNamespace creates a new namespace in a table bucket
func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
glog.Errorf("S3Tables: handleCreateNamespace called")
var req CreateNamespaceRequest
if err := h.readRequestBody(r, &req); err != nil {
glog.Errorf("S3Tables: handleCreateNamespace failed to read request body: %v", err)
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
@@ -83,12 +86,14 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
principal := h.getAccountID(r)
identityActions := getIdentityActions(r)
glog.Infof("S3Tables: CreateNamespace permission check - principal=%s, owner=%s, actions=%v", principal, bucketMetadata.OwnerAccountID, identityActions)
if !CheckPermissionWithContext("CreateNamespace", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableBucketTags: bucketTags,
IdentityActions: identityActions,
}) {
glog.Infof("S3Tables: Permission denied for CreateNamespace - principal=%s, owner=%s", principal, bucketMetadata.OwnerAccountID)
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace in this bucket")
return ErrAccessDenied
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
@@ -110,6 +111,8 @@ func CheckPermissionWithContext(operation, principal, owner, resourcePolicy, res
return true
}
glog.V(2).Infof("S3Tables: CheckPermission operation=%s principal=%s owner=%s", operation, principal, owner)
return checkPermission(operation, principal, owner, resourcePolicy, resourceARN, ctx)
}