* Implement IAM propagation to S3 servers - Add PropagatingCredentialStore to propagate IAM changes to S3 servers via gRPC - Add Policy management RPCs to S3 proto and S3ApiServer - Update CredentialManager to use PropagatingCredentialStore when MasterClient is available - Wire FilerServer to enable propagation * Implement parallel IAM propagation and fix S3 cluster registration - Parallelized IAM change propagation with 10s timeout. - Refined context usage in PropagatingCredentialStore. - Added S3Type support to cluster node management. - Enabled S3 servers to register with gRPC address to the master. - Ensured IAM configuration reload after policy updates via gRPC. * Optimize IAM propagation with direct in-memory cache updates * Secure IAM propagation: Use metadata to skip persistence only on propagation * pb: refactor IAM and S3 services for unidirectional IAM propagation - Move SeaweedS3IamCache service from iam.proto to s3.proto. - Remove legacy IAM management RPCs and empty SeaweedS3 service from s3.proto. - Enforce that S3 servers only use the synchronization interface. * pb: regenerate Go code for IAM and S3 services Updated generated code following the proto refactoring of IAM synchronization services. * s3api: implement read-only mode for Embedded IAM API - Add readOnly flag to EmbeddedIamApi to reject write operations via HTTP. - Enable read-only mode by default in S3ApiServer. - Handle AccessDenied error in writeIamErrorResponse. - Embed SeaweedS3IamCacheServer in S3ApiServer. * credential: refactor PropagatingCredentialStore for unidirectional IAM flow - Update to use s3_pb.SeaweedS3IamCacheClient for propagation to S3 servers. - Propagate full Identity object via PutIdentity for consistency. - Remove redundant propagation of specific user/account/policy management RPCs. - Add timeout context for propagation calls. * s3api: implement SeaweedS3IamCacheServer for unidirectional sync - Update S3ApiServer to implement the cache synchronization gRPC interface. - Methods (PutIdentity, RemoveIdentity, etc.) now perform direct in-memory cache updates. - Register SeaweedS3IamCacheServer in command/s3.go. - Remove registration for the legacy and now empty SeaweedS3 service. * s3api: update tests for read-only IAM and propagation - Added TestEmbeddedIamReadOnly to verify rejection of write operations in read-only mode. - Update test setup to pass readOnly=false to NewEmbeddedIamApi in routing tests. - Updated EmbeddedIamApiForTest helper with read-only checks matching production behavior. * s3api: add back temporary debug logs for IAM updates Log IAM updates received via: - gRPC propagation (PutIdentity, PutPolicy, etc.) - Metadata configuration reloads (LoadS3ApiConfigurationFromCredentialManager) - Core identity management (UpsertIdentity, RemoveIdentity) * IAM: finalize propagation fix with reduced logging and clarified architecture * Allow configuring IAM read-only mode for S3 server integration tests * s3api: add defensive validation to UpsertIdentity * s3api: fix log message to reference correct IAM read-only flag * test/s3/iam: ensure WaitForS3Service checks for IAM write permissions * test: enable writable IAM in Makefile for integration tests * IAM: add GetPolicy/ListPolicies RPCs to s3.proto * S3: add GetBucketPolicy and ListBucketPolicies helpers * S3: support storing generic IAM policies in IdentityAccessManagement * S3: implement IAM policy RPCs using IdentityAccessManagement * IAM: fix stale user identity on rename propagation
164 lines
4.8 KiB
Go
164 lines
4.8 KiB
Go
package cluster
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
)
|
|
|
|
const (
|
|
MasterType = "master"
|
|
VolumeServerType = "volumeServer"
|
|
FilerType = "filer"
|
|
BrokerType = "broker"
|
|
S3Type = "s3"
|
|
)
|
|
|
|
type FilerGroupName string
|
|
type DataCenter string
|
|
type Rack string
|
|
|
|
type ClusterNode struct {
|
|
Address pb.ServerAddress
|
|
Version string
|
|
counter int
|
|
CreatedTs time.Time
|
|
DataCenter DataCenter
|
|
Rack Rack
|
|
}
|
|
|
|
type ClusterNodeGroups struct {
|
|
groupMembers map[FilerGroupName]*GroupMembers
|
|
sync.RWMutex
|
|
}
|
|
type Cluster struct {
|
|
filerGroups *ClusterNodeGroups
|
|
brokerGroups *ClusterNodeGroups
|
|
s3Groups *ClusterNodeGroups
|
|
}
|
|
|
|
func newClusterNodeGroups() *ClusterNodeGroups {
|
|
return &ClusterNodeGroups{
|
|
groupMembers: map[FilerGroupName]*GroupMembers{},
|
|
}
|
|
}
|
|
func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
|
|
members, found := g.groupMembers[filerGroup]
|
|
if !found && createIfNotFound {
|
|
members = newGroupMembers()
|
|
g.groupMembers[filerGroup] = members
|
|
}
|
|
return members
|
|
}
|
|
|
|
func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
|
|
g.Lock()
|
|
defer g.Unlock()
|
|
m := g.getGroupMembers(filerGroup, true)
|
|
if t := m.addMember(dataCenter, rack, address, version); t != nil {
|
|
return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
|
|
}
|
|
return nil
|
|
}
|
|
func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
|
|
g.Lock()
|
|
defer g.Unlock()
|
|
m := g.getGroupMembers(filerGroup, false)
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
if m.removeMember(address) {
|
|
return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
|
|
}
|
|
return nil
|
|
}
|
|
func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) {
|
|
g.Lock()
|
|
defer g.Unlock()
|
|
m := g.getGroupMembers(filerGroup, false)
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
for _, node := range m.members {
|
|
nodes = append(nodes, node)
|
|
}
|
|
return
|
|
}
|
|
|
|
func NewCluster() *Cluster {
|
|
return &Cluster{
|
|
filerGroups: newClusterNodeGroups(),
|
|
brokerGroups: newClusterNodeGroups(),
|
|
s3Groups: newClusterNodeGroups(),
|
|
}
|
|
}
|
|
|
|
func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers {
|
|
switch nodeType {
|
|
case FilerType:
|
|
return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound)
|
|
case BrokerType:
|
|
return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound)
|
|
case S3Type:
|
|
return cluster.s3Groups.getGroupMembers(filerGroup, createIfNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
|
|
filerGroup := FilerGroupName(ns)
|
|
switch nodeType {
|
|
case FilerType:
|
|
return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
|
|
case BrokerType:
|
|
return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
|
|
case S3Type:
|
|
return cluster.s3Groups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
|
|
case MasterType:
|
|
return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
|
|
filerGroup := FilerGroupName(ns)
|
|
switch nodeType {
|
|
case FilerType:
|
|
return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address)
|
|
case BrokerType:
|
|
return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address)
|
|
case S3Type:
|
|
return cluster.s3Groups.RemoveClusterNode(filerGroup, nodeType, address)
|
|
case MasterType:
|
|
return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) {
|
|
switch nodeType {
|
|
case FilerType:
|
|
return cluster.filerGroups.ListClusterNode(filerGroup)
|
|
case BrokerType:
|
|
return cluster.brokerGroups.ListClusterNode(filerGroup)
|
|
case S3Type:
|
|
return cluster.s3Groups.ListClusterNode(filerGroup)
|
|
case MasterType:
|
|
}
|
|
return
|
|
}
|
|
|
|
func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
|
|
result = append(result, &master_pb.KeepConnectedResponse{
|
|
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
|
|
FilerGroup: string(filerGroup),
|
|
NodeType: nodeType,
|
|
Address: string(address),
|
|
IsAdd: isAdd,
|
|
},
|
|
})
|
|
return
|
|
}
|