Implement IAM propagation to S3 servers (#8130)

* Implement IAM propagation to S3 servers

- Add PropagatingCredentialStore to propagate IAM changes to S3 servers via gRPC
- Add Policy management RPCs to S3 proto and S3ApiServer
- Update CredentialManager to use PropagatingCredentialStore when MasterClient is available
- Wire FilerServer to enable propagation

* Implement parallel IAM propagation and fix S3 cluster registration

- Parallelized IAM change propagation with 10s timeout.
- Refined context usage in PropagatingCredentialStore.
- Added S3Type support to cluster node management.
- Enabled S3 servers to register with gRPC address to the master.
- Ensured IAM configuration reload after policy updates via gRPC.

* Optimize IAM propagation with direct in-memory cache updates

* Secure IAM propagation: Use metadata to skip persistence only on propagation

* pb: refactor IAM and S3 services for unidirectional IAM propagation

- Move SeaweedS3IamCache service from iam.proto to s3.proto.
- Remove legacy IAM management RPCs and empty SeaweedS3 service from s3.proto.
- Enforce that S3 servers only use the synchronization interface.

* pb: regenerate Go code for IAM and S3 services

Updated generated code following the proto refactoring of IAM synchronization services.

* s3api: implement read-only mode for Embedded IAM API

- Add readOnly flag to EmbeddedIamApi to reject write operations via HTTP.
- Enable read-only mode by default in S3ApiServer.
- Handle AccessDenied error in writeIamErrorResponse.
- Embed SeaweedS3IamCacheServer in S3ApiServer.

* credential: refactor PropagatingCredentialStore for unidirectional IAM flow

- Update to use s3_pb.SeaweedS3IamCacheClient for propagation to S3 servers.
- Propagate full Identity object via PutIdentity for consistency.
- Remove redundant propagation of specific user/account/policy management RPCs.
- Add timeout context for propagation calls.

* s3api: implement SeaweedS3IamCacheServer for unidirectional sync

- Update S3ApiServer to implement the cache synchronization gRPC interface.
- Methods (PutIdentity, RemoveIdentity, etc.) now perform direct in-memory cache updates.
- Register SeaweedS3IamCacheServer in command/s3.go.
- Remove registration for the legacy and now empty SeaweedS3 service.

* s3api: update tests for read-only IAM and propagation

- Added TestEmbeddedIamReadOnly to verify rejection of write operations in read-only mode.
- Update test setup to pass readOnly=false to NewEmbeddedIamApi in routing tests.
- Updated EmbeddedIamApiForTest helper with read-only checks matching production behavior.

* s3api: add back temporary debug logs for IAM updates

Log IAM updates received via:
- gRPC propagation (PutIdentity, PutPolicy, etc.)
- Metadata configuration reloads (LoadS3ApiConfigurationFromCredentialManager)
- Core identity management (UpsertIdentity, RemoveIdentity)

* IAM: finalize propagation fix with reduced logging and clarified architecture

* Allow configuring IAM read-only mode for S3 server integration tests

* s3api: add defensive validation to UpsertIdentity

* s3api: fix log message to reference correct IAM read-only flag

* test/s3/iam: ensure WaitForS3Service checks for IAM write permissions

* test: enable writable IAM in Makefile for integration tests

* IAM: add GetPolicy/ListPolicies RPCs to s3.proto

* S3: add GetBucketPolicy and ListBucketPolicies helpers

* S3: support storing generic IAM policies in IdentityAccessManagement

* S3: implement IAM policy RPCs using IdentityAccessManagement

* IAM: fix stale user identity on rename propagation
This commit is contained in:
Chris Lu
2026-01-26 22:59:43 -08:00
committed by GitHub
parent 0a6b289025
commit 551a31e156
26 changed files with 1131 additions and 1036 deletions

View File

@@ -44,6 +44,7 @@ type IdentityAccessManagement struct {
identities []*Identity
accessKeyIdent map[string]*Identity
nameToIdentity map[string]*Identity // O(1) lookup by identity name
policies map[string]*iam_pb.Policy
accounts map[string]*Account
emailAccount map[string]*Account
hashes map[string]*sync.Pool
@@ -83,6 +84,7 @@ type Identity struct {
PrincipalArn string // ARN for IAM authorization (e.g., "arn:aws:iam::account-id:user/username")
Disabled bool // User status: false = enabled (default), true = disabled
Claims map[string]interface{} // JWT claims for policy substitution
IsStatic bool // Whether identity was loaded from static config (immutable)
}
// Account represents a system user, a system user can
@@ -187,6 +189,7 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
iam.staticIdentityNames = make(map[string]bool)
for _, identity := range iam.identities {
iam.staticIdentityNames[identity.Name] = true
identity.IsStatic = true
}
iam.m.Unlock()
}
@@ -294,6 +297,7 @@ func (iam *IdentityAccessManagement) loadEnvironmentVariableCredentials() {
Actions: []Action{
s3_constants.ACTION_ADMIN,
},
IsStatic: true,
}
iam.m.Lock()
@@ -407,19 +411,20 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3Api
if hasStaticConfig {
// Merge mode: preserve static identities, add/update dynamic ones
return iam.mergeS3ApiConfiguration(config)
return iam.MergeS3ApiConfiguration(config)
}
// Normal mode: completely replace configuration
return iam.replaceS3ApiConfiguration(config)
return iam.ReplaceS3ApiConfiguration(config)
}
// replaceS3ApiConfiguration completely replaces the current configuration (used when no static config)
func (iam *IdentityAccessManagement) replaceS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
// ReplaceS3ApiConfiguration completely replaces the current configuration (used when no static config)
func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
var identities []*Identity
var identityAnonymous *Identity
accessKeyIdent := make(map[string]*Identity)
nameToIdentity := make(map[string]*Identity)
policies := make(map[string]*iam_pb.Policy)
accounts := make(map[string]*Account)
emailAccount := make(map[string]*Account)
foundAccountAdmin := false
@@ -458,6 +463,9 @@ func (iam *IdentityAccessManagement) replaceS3ApiConfiguration(config *iam_pb.S3
}
emailAccount[AccountAnonymous.EmailAddress] = accounts[AccountAnonymous.Id]
}
for _, policy := range config.Policies {
policies[policy.Name] = policy
}
for _, ident := range config.Identities {
glog.V(3).Infof("loading identity %s (disabled=%v)", ident.Name, ident.Disabled)
t := &Identity{
@@ -537,6 +545,7 @@ func (iam *IdentityAccessManagement) replaceS3ApiConfiguration(config *iam_pb.S3
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.policies = policies
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))
@@ -566,10 +575,10 @@ func (iam *IdentityAccessManagement) replaceS3ApiConfiguration(config *iam_pb.S3
return nil
}
// mergeS3ApiConfiguration merges dynamic configuration with existing static configuration
// MergeS3ApiConfiguration merges dynamic configuration with existing static configuration
// Static identities (from file) are preserved and cannot be updated
// Dynamic identities (from filer/admin) can be added or updated
func (iam *IdentityAccessManagement) mergeS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
// Start with current configuration (which includes static identities)
iam.m.RLock()
identities := make([]*Identity, len(iam.identities))
@@ -583,6 +592,10 @@ func (iam *IdentityAccessManagement) mergeS3ApiConfiguration(config *iam_pb.S3Ap
for k, v := range iam.nameToIdentity {
nameToIdentity[k] = v
}
policies := make(map[string]*iam_pb.Policy)
for k, v := range iam.policies {
policies[k] = v
}
accounts := make(map[string]*Account)
for k, v := range iam.accounts {
accounts[k] = v
@@ -755,6 +768,10 @@ func (iam *IdentityAccessManagement) mergeS3ApiConfiguration(config *iam_pb.S3Ap
glog.V(3).Infof("Loaded service account %s for dynamic parent %s (expiration: %d)", sa.Id, sa.ParentUser, sa.Expiration)
}
for _, policy := range config.Policies {
policies[policy.Name] = policy
}
iam.m.Lock()
// atomically switch
iam.identities = identities
@@ -763,6 +780,7 @@ func (iam *IdentityAccessManagement) mergeS3ApiConfiguration(config *iam_pb.S3Ap
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.policies = policies
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))
@@ -792,6 +810,56 @@ func (iam *IdentityAccessManagement) mergeS3ApiConfiguration(config *iam_pb.S3Ap
return nil
}
func (iam *IdentityAccessManagement) RemoveIdentity(name string) {
glog.V(1).Infof("IAM: remove identity %s", name)
iam.m.Lock()
defer iam.m.Unlock()
identity, ok := iam.nameToIdentity[name]
if !ok {
return
}
if identity.IsStatic {
glog.V(1).Infof("IAM: skipping removal of static identity %s (immutable)", name)
return
}
// Remove from identities slice
for i, ident := range iam.identities {
if ident.Name == name {
iam.identities = append(iam.identities[:i], iam.identities[i+1:]...)
break
}
}
// Remove from maps
delete(iam.nameToIdentity, name)
for _, cred := range identity.Credentials {
if iam.accessKeyIdent[cred.AccessKey] == identity {
delete(iam.accessKeyIdent, cred.AccessKey)
}
}
if identity == iam.identityAnonymous {
iam.identityAnonymous = nil
}
}
func (iam *IdentityAccessManagement) UpsertIdentity(ident *iam_pb.Identity) error {
if ident == nil {
return fmt.Errorf("upsert identity failed: nil identity")
}
if ident.Name == "" {
return fmt.Errorf("upsert identity failed: empty identity name")
}
glog.V(1).Infof("IAM: upsert identity %s", ident.Name)
return iam.MergeS3ApiConfiguration(&iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{ident},
})
}
// isEnabled reports whether S3 auth should be enforced for this server.
//
// Auth is considered enabled if either:
@@ -1316,6 +1384,7 @@ func (iam *IdentityAccessManagement) GetCredentialManager() *credential.Credenti
// LoadS3ApiConfigurationFromCredentialManager loads configuration using the credential manager
func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromCredentialManager() error {
glog.V(0).Infof("IAM: reloading configuration from credential manager")
glog.V(1).Infof("Loading S3 API configuration from credential manager")
s3ApiConfiguration, err := iam.credentialManager.LoadConfiguration(context.Background())
@@ -1503,3 +1572,43 @@ func (iam *IdentityAccessManagement) authorizeWithIAM(r *http.Request, identity
// Use IAM integration for authorization
return iam.iamIntegration.AuthorizeAction(ctx, iamIdentity, action, bucket, object, r)
}
// PutPolicy adds or updates a policy
func (iam *IdentityAccessManagement) PutPolicy(name string, content string) error {
iam.m.Lock()
defer iam.m.Unlock()
if iam.policies == nil {
iam.policies = make(map[string]*iam_pb.Policy)
}
iam.policies[name] = &iam_pb.Policy{Name: name, Content: content}
return nil
}
// GetPolicy retrieves a policy by name
func (iam *IdentityAccessManagement) GetPolicy(name string) (*iam_pb.Policy, error) {
iam.m.RLock()
defer iam.m.RUnlock()
if policy, ok := iam.policies[name]; ok {
return policy, nil
}
return nil, fmt.Errorf("policy not found: %s", name)
}
// DeletePolicy removes a policy
func (iam *IdentityAccessManagement) DeletePolicy(name string) error {
iam.m.Lock()
defer iam.m.Unlock()
delete(iam.policies, name)
return nil
}
// ListPolicies lists all policies
func (iam *IdentityAccessManagement) ListPolicies() []*iam_pb.Policy {
iam.m.RLock()
defer iam.m.RUnlock()
var policies []*iam_pb.Policy
for _, p := range iam.policies {
policies = append(policies, p)
}
return policies
}