s3api: fix static IAM policy enforcement after reload (#8532)
* s3api: honor attached IAM policies over legacy actions * s3api: hydrate IAM policy docs during config reload * s3api: use policy-aware auth when listing buckets * credential: propagate context through filer_etc policy reads * credential: make legacy policy deletes durable * s3api: exercise managed policy runtime loader * s3api: allow static IAM users without session tokens * iam: deny unmatched attached policies under default allow * iam: load embedded policy files from filer store * s3api: require session tokens for IAM presigning * s3api: sync runtime policies into zero-config IAM * credential: respect context in policy file loads * credential: serialize legacy policy deletes * iam: align filer policy store naming * s3api: use authenticated principals for presigning * iam: deep copy policy conditions * s3api: require request creation in policy tests * filer: keep ReadInsideFiler as the context-aware API * iam: harden filer policy store writes * credential: strengthen legacy policy serialization test * credential: forward runtime policy loaders through wrapper * s3api: harden runtime policy merging * iam: require typed already-exists errors
This commit is contained in:
@@ -24,7 +24,7 @@ func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap
|
||||
s3cfg := &iam_pb.S3ApiConfiguration{}
|
||||
|
||||
// 1. Load from legacy single file (low priority)
|
||||
content, foundLegacy, err := store.readInsideFiler(filer.IamConfigDirectory, IamLegacyIdentityFile)
|
||||
content, foundLegacy, err := store.readInsideFiler(ctx, filer.IamConfigDirectory, IamLegacyIdentityFile)
|
||||
if err != nil {
|
||||
return s3cfg, err
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func (store *FilerEtcStore) loadFromMultiFile(ctx context.Context, s3cfg *iam_pb
|
||||
if len(entry.Content) > 0 {
|
||||
content = entry.Content
|
||||
} else {
|
||||
c, err := filer.ReadInsideFiler(client, dir, entry.Name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to read identity file %s: %v", entry.Name, err)
|
||||
continue
|
||||
@@ -249,7 +249,7 @@ func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Ide
|
||||
func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
|
||||
var identity *iam_pb.Identity
|
||||
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamIdentitiesDirectory, username+".json")
|
||||
data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamIdentitiesDirectory, username+".json")
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return credential.ErrUserNotFound
|
||||
@@ -350,7 +350,7 @@ func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey st
|
||||
if len(entry.Content) > 0 {
|
||||
content = entry.Content
|
||||
} else {
|
||||
c, err := filer.ReadInsideFiler(client, dir, entry.Name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -435,11 +435,11 @@ func (store *FilerEtcStore) saveIdentity(ctx context.Context, identity *iam_pb.I
|
||||
})
|
||||
}
|
||||
|
||||
func (store *FilerEtcStore) readInsideFiler(dir string, name string) ([]byte, bool, error) {
|
||||
func (store *FilerEtcStore) readInsideFiler(ctx context.Context, dir string, name string) ([]byte, bool, error) {
|
||||
var content []byte
|
||||
found := false
|
||||
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
c, err := filer.ReadInsideFiler(client, dir, name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, name)
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
|
||||
)
|
||||
|
||||
@@ -18,13 +19,113 @@ const (
|
||||
)
|
||||
|
||||
type PoliciesCollection struct {
|
||||
Policies map[string]policy_engine.PolicyDocument `json:"policies"`
|
||||
Policies map[string]policy_engine.PolicyDocument `json:"policies"`
|
||||
InlinePolicies map[string]map[string]policy_engine.PolicyDocument `json:"inlinePolicies"`
|
||||
}
|
||||
|
||||
func validatePolicyName(name string) error {
|
||||
return credential.ValidatePolicyName(name)
|
||||
}
|
||||
|
||||
func newPoliciesCollection() *PoliciesCollection {
|
||||
return &PoliciesCollection{
|
||||
Policies: make(map[string]policy_engine.PolicyDocument),
|
||||
InlinePolicies: make(map[string]map[string]policy_engine.PolicyDocument),
|
||||
}
|
||||
}
|
||||
|
||||
func (store *FilerEtcStore) loadLegacyPoliciesCollection(ctx context.Context) (*PoliciesCollection, bool, error) {
|
||||
policiesCollection := newPoliciesCollection()
|
||||
|
||||
content, foundLegacy, err := store.readInsideFiler(ctx, filer.IamConfigDirectory, filer.IamPoliciesFile)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if !foundLegacy || len(content) == 0 {
|
||||
return policiesCollection, foundLegacy, nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(content, policiesCollection); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if policiesCollection.Policies == nil {
|
||||
policiesCollection.Policies = make(map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
if policiesCollection.InlinePolicies == nil {
|
||||
policiesCollection.InlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument)
|
||||
}
|
||||
|
||||
return policiesCollection, true, nil
|
||||
}
|
||||
|
||||
func (store *FilerEtcStore) saveLegacyPoliciesCollection(ctx context.Context, policiesCollection *PoliciesCollection) error {
|
||||
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
content, err := json.MarshalIndent(policiesCollection, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, content)
|
||||
})
|
||||
}
|
||||
|
||||
func policyDocumentToPbPolicy(name string, policy policy_engine.PolicyDocument) (*iam_pb.Policy, error) {
|
||||
content, err := json.Marshal(policy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &iam_pb.Policy{Name: name, Content: string(content)}, nil
|
||||
}
|
||||
|
||||
// LoadManagedPolicies loads managed policies for the S3 runtime without
|
||||
// triggering legacy-to-multifile migration. This lets the runtime hydrate
|
||||
// policies while preserving any legacy inline policy data stored alongside
|
||||
// managed policies.
|
||||
func (store *FilerEtcStore) LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policies := make(map[string]policy_engine.PolicyDocument, len(policiesCollection.Policies))
|
||||
for name, policy := range policiesCollection.Policies {
|
||||
policies[name] = policy
|
||||
}
|
||||
|
||||
if err := store.loadPoliciesFromMultiFile(ctx, policies); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
managedPolicies := make([]*iam_pb.Policy, 0, len(policies))
|
||||
for name, policy := range policies {
|
||||
pbPolicy, err := policyDocumentToPbPolicy(name, policy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
managedPolicies = append(managedPolicies, pbPolicy)
|
||||
}
|
||||
|
||||
return managedPolicies, nil
|
||||
}
|
||||
|
||||
// LoadInlinePolicies loads legacy inline policies keyed by user name. Inline
|
||||
// policies are still stored in the legacy shared policies file.
|
||||
func (store *FilerEtcStore) LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
inlinePolicies := make(map[string]map[string]policy_engine.PolicyDocument, len(policiesCollection.InlinePolicies))
|
||||
for userName, userPolicies := range policiesCollection.InlinePolicies {
|
||||
inlinePolicies[userName] = make(map[string]policy_engine.PolicyDocument, len(userPolicies))
|
||||
for policyName, policy := range userPolicies {
|
||||
inlinePolicies[userName][policyName] = policy
|
||||
}
|
||||
}
|
||||
|
||||
return inlinePolicies, nil
|
||||
}
|
||||
|
||||
// GetPolicies retrieves all IAM policies from the filer
|
||||
func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_engine.PolicyDocument, error) {
|
||||
policies := make(map[string]policy_engine.PolicyDocument)
|
||||
@@ -43,23 +144,12 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_
|
||||
filer.IamConfigDirectory, filer.IamPoliciesFile)
|
||||
|
||||
// 1. Load from legacy single file (low priority)
|
||||
content, foundLegacy, err := store.readInsideFiler(filer.IamConfigDirectory, filer.IamPoliciesFile)
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if foundLegacy && len(content) > 0 {
|
||||
policiesCollection := &PoliciesCollection{
|
||||
Policies: make(map[string]policy_engine.PolicyDocument),
|
||||
}
|
||||
if err := json.Unmarshal(content, policiesCollection); err != nil {
|
||||
glog.Errorf("Failed to parse legacy IAM policies from %s/%s: %v",
|
||||
filer.IamConfigDirectory, filer.IamPoliciesFile, err)
|
||||
} else {
|
||||
for name, policy := range policiesCollection.Policies {
|
||||
policies[name] = policy
|
||||
}
|
||||
}
|
||||
for name, policy := range policiesCollection.Policies {
|
||||
policies[name] = policy
|
||||
}
|
||||
|
||||
// 2. Load from multi-file structure (high priority, overrides legacy)
|
||||
@@ -67,14 +157,6 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 3. Perform migration if we loaded legacy config
|
||||
if foundLegacy {
|
||||
if err := store.migratePoliciesToMultiFile(ctx, policies); err != nil {
|
||||
glog.Errorf("Failed to migrate IAM policies to multi-file layout: %v", err)
|
||||
return policies, err
|
||||
}
|
||||
}
|
||||
|
||||
return policies, nil
|
||||
}
|
||||
|
||||
@@ -98,7 +180,7 @@ func (store *FilerEtcStore) loadPoliciesFromMultiFile(ctx context.Context, polic
|
||||
if len(entry.Content) > 0 {
|
||||
content = entry.Content
|
||||
} else {
|
||||
c, err := filer.ReadInsideFiler(client, dir, entry.Name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to read policy file %s: %v", entry.Name, err)
|
||||
continue
|
||||
@@ -115,7 +197,7 @@ func (store *FilerEtcStore) loadPoliciesFromMultiFile(ctx context.Context, polic
|
||||
|
||||
// The file name is "policyName.json"
|
||||
policyName := entry.Name
|
||||
if len(policyName) > 5 && policyName[len(policyName)-5:] == ".json" {
|
||||
if strings.HasSuffix(policyName, ".json") {
|
||||
policyName = policyName[:len(policyName)-5]
|
||||
policies[policyName] = policy
|
||||
}
|
||||
@@ -184,7 +266,23 @@ func (store *FilerEtcStore) DeletePolicy(ctx context.Context, name string) error
|
||||
if err := validatePolicyName(name); err != nil {
|
||||
return err
|
||||
}
|
||||
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
store.policyMu.Lock()
|
||||
defer store.policyMu.Unlock()
|
||||
|
||||
policiesCollection, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleteLegacyPolicy := false
|
||||
if foundLegacy {
|
||||
if _, exists := policiesCollection.Policies[name]; exists {
|
||||
delete(policiesCollection.Policies, name)
|
||||
deleteLegacyPolicy = true
|
||||
}
|
||||
}
|
||||
|
||||
if err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
|
||||
Directory: filer.IamConfigDirectory + "/" + IamPoliciesDirectory,
|
||||
Name: name + ".json",
|
||||
@@ -193,7 +291,15 @@ func (store *FilerEtcStore) DeletePolicy(ctx context.Context, name string) error
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if deleteLegacyPolicy {
|
||||
return store.saveLegacyPoliciesCollection(ctx, policiesCollection)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPolicy retrieves a specific IAM policy by name from the filer
|
||||
@@ -204,7 +310,7 @@ func (store *FilerEtcStore) GetPolicy(ctx context.Context, name string) (*policy
|
||||
|
||||
var policy *policy_engine.PolicyDocument
|
||||
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamPoliciesDirectory, name+".json")
|
||||
data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamPoliciesDirectory, name+".json")
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return nil
|
||||
@@ -239,6 +345,7 @@ func (store *FilerEtcStore) GetPolicy(ctx context.Context, name string) (*policy
|
||||
// ListPolicyNames returns all managed policy names stored in the filer.
|
||||
func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, error) {
|
||||
names := make([]string, 0)
|
||||
seenNames := make(map[string]struct{})
|
||||
|
||||
store.mu.RLock()
|
||||
configured := store.filerAddressFunc != nil
|
||||
@@ -248,7 +355,19 @@ func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, erro
|
||||
return names, nil
|
||||
}
|
||||
|
||||
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for name := range policiesCollection.Policies {
|
||||
if _, found := seenNames[name]; found {
|
||||
continue
|
||||
}
|
||||
names = append(names, name)
|
||||
seenNames[name] = struct{}{}
|
||||
}
|
||||
|
||||
err = store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
dir := filer.IamConfigDirectory + "/" + IamPoliciesDirectory
|
||||
entries, err := listEntries(ctx, client, dir)
|
||||
if err != nil {
|
||||
@@ -266,7 +385,11 @@ func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, erro
|
||||
if strings.HasSuffix(name, ".json") {
|
||||
name = name[:len(name)-5]
|
||||
}
|
||||
if _, found := seenNames[name]; found {
|
||||
continue
|
||||
}
|
||||
names = append(names, name)
|
||||
seenNames[name] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
369
weed/credential/filer_etc/filer_etc_policy_test.go
Normal file
369
weed/credential/filer_etc/filer_etc_policy_test.go
Normal file
@@ -0,0 +1,369 @@
|
||||
package filer_etc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type policyTestFilerServer struct {
|
||||
filer_pb.UnimplementedSeaweedFilerServer
|
||||
mu sync.RWMutex
|
||||
entries map[string]*filer_pb.Entry
|
||||
contentlessListEntry map[string]struct{}
|
||||
beforeLookup func(context.Context, string, string) error
|
||||
afterListEntry func(string, string)
|
||||
beforeDelete func(string, string) error
|
||||
beforeUpdate func(string, string) error
|
||||
}
|
||||
|
||||
func newPolicyTestFilerServer() *policyTestFilerServer {
|
||||
return &policyTestFilerServer{
|
||||
entries: make(map[string]*filer_pb.Entry),
|
||||
contentlessListEntry: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *policyTestFilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
|
||||
s.mu.RLock()
|
||||
beforeLookup := s.beforeLookup
|
||||
s.mu.RUnlock()
|
||||
if beforeLookup != nil {
|
||||
if err := beforeLookup(ctx, req.Directory, req.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
entry, found := s.entries[filerEntryKey(req.Directory, req.Name)]
|
||||
if !found {
|
||||
return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error())
|
||||
}
|
||||
|
||||
return &filer_pb.LookupDirectoryEntryResponse{Entry: cloneEntry(entry)}, nil
|
||||
}
|
||||
|
||||
func (s *policyTestFilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream grpc.ServerStreamingServer[filer_pb.ListEntriesResponse]) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
names := make([]string, 0)
|
||||
for key := range s.entries {
|
||||
dir, name := splitFilerEntryKey(key)
|
||||
if dir != req.Directory {
|
||||
continue
|
||||
}
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
for _, name := range names {
|
||||
entry := cloneEntry(s.entries[filerEntryKey(req.Directory, name)])
|
||||
if _, found := s.contentlessListEntry[filerEntryKey(req.Directory, name)]; found {
|
||||
entry.Content = nil
|
||||
}
|
||||
if err := stream.Send(&filer_pb.ListEntriesResponse{Entry: entry}); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.afterListEntry != nil {
|
||||
s.afterListEntry(req.Directory, name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *policyTestFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.entries[filerEntryKey(req.Directory, req.Entry.Name)] = cloneEntry(req.Entry)
|
||||
return &filer_pb.CreateEntryResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *policyTestFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
|
||||
s.mu.RLock()
|
||||
beforeUpdate := s.beforeUpdate
|
||||
s.mu.RUnlock()
|
||||
if beforeUpdate != nil {
|
||||
if err := beforeUpdate(req.Directory, req.Entry.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.entries[filerEntryKey(req.Directory, req.Entry.Name)] = cloneEntry(req.Entry)
|
||||
return &filer_pb.UpdateEntryResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *policyTestFilerServer) DeleteEntry(_ context.Context, req *filer_pb.DeleteEntryRequest) (*filer_pb.DeleteEntryResponse, error) {
|
||||
s.mu.RLock()
|
||||
beforeDelete := s.beforeDelete
|
||||
s.mu.RUnlock()
|
||||
if beforeDelete != nil {
|
||||
if err := beforeDelete(req.Directory, req.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
key := filerEntryKey(req.Directory, req.Name)
|
||||
if _, found := s.entries[key]; !found {
|
||||
return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error())
|
||||
}
|
||||
|
||||
delete(s.entries, key)
|
||||
return &filer_pb.DeleteEntryResponse{}, nil
|
||||
}
|
||||
|
||||
func newPolicyTestStore(t *testing.T) *FilerEtcStore {
|
||||
store, _ := newPolicyTestStoreWithServer(t)
|
||||
return store
|
||||
}
|
||||
|
||||
func newPolicyTestStoreWithServer(t *testing.T) (*FilerEtcStore, *policyTestFilerServer) {
|
||||
t.Helper()
|
||||
|
||||
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
server := newPolicyTestFilerServer()
|
||||
grpcServer := pb.NewGrpcServer()
|
||||
filer_pb.RegisterSeaweedFilerServer(grpcServer, server)
|
||||
go func() {
|
||||
_ = grpcServer.Serve(lis)
|
||||
}()
|
||||
|
||||
t.Cleanup(func() {
|
||||
grpcServer.Stop()
|
||||
_ = lis.Close()
|
||||
})
|
||||
|
||||
store := &FilerEtcStore{}
|
||||
host, portString, err := net.SplitHostPort(lis.Addr().String())
|
||||
require.NoError(t, err)
|
||||
grpcPort, err := strconv.Atoi(portString)
|
||||
require.NoError(t, err)
|
||||
store.SetFilerAddressFunc(func() pb.ServerAddress {
|
||||
return pb.NewServerAddress(host, 1, grpcPort)
|
||||
}, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
|
||||
return store, server
|
||||
}
|
||||
|
||||
func TestFilerEtcStoreListPolicyNamesIncludesLegacyPolicies(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := newPolicyTestStore(t)
|
||||
|
||||
legacyPolicies := newPoliciesCollection()
|
||||
legacyPolicies.Policies["legacy-only"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::legacy-only/*")
|
||||
legacyPolicies.Policies["shared"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::shared/*")
|
||||
require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies))
|
||||
|
||||
require.NoError(t, store.savePolicy(ctx, "multi-file-only", testPolicyDocument("s3:PutObject", "arn:aws:s3:::multi-file-only/*")))
|
||||
require.NoError(t, store.savePolicy(ctx, "shared", testPolicyDocument("s3:DeleteObject", "arn:aws:s3:::shared/*")))
|
||||
|
||||
names, err := store.ListPolicyNames(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.ElementsMatch(t, []string{"legacy-only", "multi-file-only", "shared"}, names)
|
||||
}
|
||||
|
||||
func TestFilerEtcStoreDeletePolicyRemovesLegacyManagedCopy(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := newPolicyTestStore(t)
|
||||
|
||||
inlinePolicy := testPolicyDocument("s3:PutObject", "arn:aws:s3:::inline-user/*")
|
||||
legacyPolicies := newPoliciesCollection()
|
||||
legacyPolicies.Policies["legacy-only"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::legacy-only/*")
|
||||
legacyPolicies.InlinePolicies["inline-user"] = map[string]policy_engine.PolicyDocument{
|
||||
"PutOnly": inlinePolicy,
|
||||
}
|
||||
require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies))
|
||||
|
||||
managedPolicies, err := store.LoadManagedPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"legacy-only"}, managedPolicyNames(managedPolicies))
|
||||
|
||||
require.NoError(t, store.DeletePolicy(ctx, "legacy-only"))
|
||||
|
||||
managedPolicies, err = store.LoadManagedPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, managedPolicies)
|
||||
|
||||
inlinePolicies, err := store.LoadInlinePolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assertInlinePolicyPreserved(t, inlinePolicies, "inline-user", "PutOnly")
|
||||
|
||||
loadedLegacyPolicies, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
require.NoError(t, err)
|
||||
require.True(t, foundLegacy)
|
||||
assert.Empty(t, loadedLegacyPolicies.Policies)
|
||||
assertInlinePolicyPreserved(t, loadedLegacyPolicies.InlinePolicies, "inline-user", "PutOnly")
|
||||
}
|
||||
|
||||
func TestFilerEtcStoreDeletePolicySerializesLegacyUpdates(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, server := newPolicyTestStoreWithServer(t)
|
||||
|
||||
legacyPolicies := newPoliciesCollection()
|
||||
legacyPolicies.Policies["first"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::first/*")
|
||||
legacyPolicies.Policies["second"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::second/*")
|
||||
require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies))
|
||||
require.NoError(t, store.savePolicy(ctx, "first", testPolicyDocument("s3:GetObject", "arn:aws:s3:::first/*")))
|
||||
require.NoError(t, store.savePolicy(ctx, "second", testPolicyDocument("s3:GetObject", "arn:aws:s3:::second/*")))
|
||||
|
||||
firstSaveStarted := make(chan struct{})
|
||||
releaseFirstSave := make(chan struct{})
|
||||
secondReachedDelete := make(chan struct{}, 1)
|
||||
var blockOnce sync.Once
|
||||
|
||||
server.mu.Lock()
|
||||
server.beforeUpdate = func(dir string, name string) error {
|
||||
if dir == filer.IamConfigDirectory && name == filer.IamPoliciesFile {
|
||||
blockOnce.Do(func() {
|
||||
close(firstSaveStarted)
|
||||
<-releaseFirstSave
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
server.beforeDelete = func(dir string, name string) error {
|
||||
if dir == filer.IamConfigDirectory+"/"+IamPoliciesDirectory && name == "second.json" {
|
||||
select {
|
||||
case secondReachedDelete <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
server.mu.Unlock()
|
||||
|
||||
firstDeleteErr := make(chan error, 1)
|
||||
go func() {
|
||||
firstDeleteErr <- store.DeletePolicy(ctx, "first")
|
||||
}()
|
||||
|
||||
<-firstSaveStarted
|
||||
|
||||
secondDeleteErr := make(chan error, 1)
|
||||
go func() {
|
||||
secondDeleteErr <- store.DeletePolicy(ctx, "second")
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-secondReachedDelete:
|
||||
t.Fatal("second delete reached filer mutation while first delete was still blocked")
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
}
|
||||
|
||||
close(releaseFirstSave)
|
||||
|
||||
require.NoError(t, <-firstDeleteErr)
|
||||
require.NoError(t, <-secondDeleteErr)
|
||||
|
||||
loadedLegacyPolicies, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx)
|
||||
require.NoError(t, err)
|
||||
require.True(t, foundLegacy)
|
||||
assert.Empty(t, loadedLegacyPolicies.Policies)
|
||||
}
|
||||
|
||||
func TestFilerEtcStoreLoadManagedPoliciesRespectsReadContext(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
store, server := newPolicyTestStoreWithServer(t)
|
||||
|
||||
require.NoError(t, store.savePolicy(context.Background(), "cancel-me", testPolicyDocument("s3:GetObject", "arn:aws:s3:::cancel-me/*")))
|
||||
|
||||
server.mu.Lock()
|
||||
server.contentlessListEntry[filerEntryKey(filer.IamConfigDirectory+"/"+IamPoliciesDirectory, "cancel-me.json")] = struct{}{}
|
||||
server.beforeLookup = func(ctx context.Context, dir string, name string) error {
|
||||
if dir == filer.IamConfigDirectory+"/"+IamPoliciesDirectory && name == "cancel-me.json" {
|
||||
cancel()
|
||||
return status.Error(codes.Canceled, context.Canceled.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
server.mu.Unlock()
|
||||
|
||||
managedPolicies, err := store.LoadManagedPolicies(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, managedPolicies)
|
||||
}
|
||||
|
||||
func testPolicyDocument(action string, resource string) policy_engine.PolicyDocument {
|
||||
return policy_engine.PolicyDocument{
|
||||
Version: policy_engine.PolicyVersion2012_10_17,
|
||||
Statement: []policy_engine.PolicyStatement{
|
||||
{
|
||||
Effect: policy_engine.PolicyEffectAllow,
|
||||
Action: policy_engine.NewStringOrStringSlice(action),
|
||||
Resource: policy_engine.NewStringOrStringSlice(resource),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func managedPolicyNames(policies []*iam_pb.Policy) []string {
|
||||
names := make([]string, 0, len(policies))
|
||||
for _, policy := range policies {
|
||||
names = append(names, policy.Name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names
|
||||
}
|
||||
|
||||
func assertInlinePolicyPreserved(t *testing.T, inlinePolicies map[string]map[string]policy_engine.PolicyDocument, userName string, policyName string) {
|
||||
t.Helper()
|
||||
|
||||
userPolicies, found := inlinePolicies[userName]
|
||||
require.True(t, found)
|
||||
|
||||
policy, found := userPolicies[policyName]
|
||||
require.True(t, found)
|
||||
assert.Equal(t, policy_engine.PolicyVersion2012_10_17, policy.Version)
|
||||
require.Len(t, policy.Statement, 1)
|
||||
assert.Equal(t, policy_engine.PolicyEffectAllow, policy.Statement[0].Effect)
|
||||
}
|
||||
|
||||
func cloneEntry(entry *filer_pb.Entry) *filer_pb.Entry {
|
||||
if entry == nil {
|
||||
return nil
|
||||
}
|
||||
return proto.Clone(entry).(*filer_pb.Entry)
|
||||
}
|
||||
|
||||
func filerEntryKey(dir string, name string) string {
|
||||
return dir + "\x00" + name
|
||||
}
|
||||
|
||||
func splitFilerEntryKey(key string) (dir string, name string) {
|
||||
for idx := 0; idx < len(key); idx++ {
|
||||
if key[idx] == '\x00' {
|
||||
return key[:idx], key[idx+1:]
|
||||
}
|
||||
}
|
||||
return key, ""
|
||||
}
|
||||
@@ -38,7 +38,7 @@ func (store *FilerEtcStore) loadServiceAccountsFromMultiFile(ctx context.Context
|
||||
if len(entry.Content) > 0 {
|
||||
content = entry.Content
|
||||
} else {
|
||||
c, err := filer.ReadInsideFiler(client, dir, entry.Name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to read service account file %s: %v", entry.Name, err)
|
||||
continue
|
||||
@@ -133,7 +133,7 @@ func (store *FilerEtcStore) GetServiceAccount(ctx context.Context, id string) (*
|
||||
}
|
||||
var sa *iam_pb.ServiceAccount
|
||||
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamServiceAccountsDirectory, id+".json")
|
||||
data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamServiceAccountsDirectory, id+".json")
|
||||
if err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return credential.ErrServiceAccountNotFound
|
||||
@@ -170,7 +170,7 @@ func (store *FilerEtcStore) ListServiceAccounts(ctx context.Context) ([]*iam_pb.
|
||||
if len(entry.Content) > 0 {
|
||||
content = entry.Content
|
||||
} else {
|
||||
c, err := filer.ReadInsideFiler(client, dir, entry.Name)
|
||||
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to read service account file %s: %v", entry.Name, err)
|
||||
continue
|
||||
|
||||
@@ -20,6 +20,7 @@ type FilerEtcStore struct {
|
||||
filerAddressFunc func() pb.ServerAddress // Function to get current active filer
|
||||
grpcDialOption grpc.DialOption
|
||||
mu sync.RWMutex // Protects filerAddressFunc and grpcDialOption
|
||||
policyMu sync.Mutex // Serializes legacy managed-policy mutations
|
||||
}
|
||||
|
||||
func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName {
|
||||
|
||||
@@ -20,6 +20,14 @@ import (
|
||||
var _ CredentialStore = &PropagatingCredentialStore{}
|
||||
var _ PolicyManager = &PropagatingCredentialStore{}
|
||||
|
||||
type propagatingManagedPolicyLoader interface {
|
||||
LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error)
|
||||
}
|
||||
|
||||
type propagatingInlinePolicyLoader interface {
|
||||
LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error)
|
||||
}
|
||||
|
||||
type PropagatingCredentialStore struct {
|
||||
CredentialStore
|
||||
masterClient *wdclient.MasterClient
|
||||
@@ -240,6 +248,38 @@ func (s *PropagatingCredentialStore) ListPolicyNames(ctx context.Context) ([]str
|
||||
return s.CredentialStore.ListPolicyNames(ctx)
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) {
|
||||
if loader, ok := s.CredentialStore.(propagatingManagedPolicyLoader); ok {
|
||||
return loader.LoadManagedPolicies(ctx)
|
||||
}
|
||||
|
||||
policies, err := s.CredentialStore.GetPolicies(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
managedPolicies := make([]*iam_pb.Policy, 0, len(policies))
|
||||
for name, policyDocument := range policies {
|
||||
content, err := json.Marshal(policyDocument)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
managedPolicies = append(managedPolicies, &iam_pb.Policy{
|
||||
Name: name,
|
||||
Content: string(content),
|
||||
})
|
||||
}
|
||||
|
||||
return managedPolicies, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) {
|
||||
if loader, ok := s.CredentialStore.(propagatingInlinePolicyLoader); ok {
|
||||
return loader.LoadInlinePolicies(ctx)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *PropagatingCredentialStore) CreatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error {
|
||||
if pm, ok := s.CredentialStore.(PolicyManager); ok {
|
||||
if err := pm.CreatePolicy(ctx, name, document); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user