package policy import ( "context" "errors" "fmt" "net" "net/http" "os" "os/exec" "path/filepath" "strconv" "strings" "sync" "testing" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/iam" "github.com/aws/aws-sdk-go/service/s3" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "github.com/stretchr/testify/require" ) // TestCluster manages the weed mini instance for integration testing type TestCluster struct { dataDir string ctx context.Context cancel context.CancelFunc isRunning bool wg sync.WaitGroup masterPort int masterGrpcPort int volumePort int filerPort int filerGrpcPort int s3Port int s3Endpoint string rustVolumeCmd *exec.Cmd } func TestS3PolicyShellRevised(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() policyContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"*","Resource":"*"}]}` tmpPolicyFile, err := os.CreateTemp("", "test_policy_*.json") if err != nil { t.Fatalf("Failed to create temp policy file: %v", err) } defer os.Remove(tmpPolicyFile.Name()) _, err = tmpPolicyFile.WriteString(policyContent) require.NoError(t, err) require.NoError(t, tmpPolicyFile.Close()) weedCmd := "weed" masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) // Put execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=testpolicy -file=%s", tmpPolicyFile.Name())) // List out := execShell(t, weedCmd, masterAddr, filerAddr, "s3.policy -list") if !contains(out, "Name: testpolicy") { t.Errorf("List failed: %s", out) } // Get out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.policy -get -name=testpolicy") if !contains(out, "Statement") { t.Errorf("Get failed: %s", out) } // Delete execShell(t, weedCmd, masterAddr, filerAddr, "s3.policy -delete -name=testpolicy") // Verify out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.policy -list") if contains(out, "Name: testpolicy") { t.Errorf("delete failed, policy 'testpolicy' should not be in the list: %s", out) } // Verify s3.configure linking policies execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -actions=Read -policies=testpolicy -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if !contains(out, "\"policyNames\": [\n \"testpolicy\"\n ]") { // relaxed check if !contains(out, "\"testpolicy\"") || !contains(out, "policyNames") { t.Errorf("s3.configure failed to link policy: %s", out) } } // 1. Update User: Add Write action execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -actions=Write -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if !contains(out, "Write") { t.Errorf("s3.configure failed to add Write action: %s", out) } // 2. Granular Delete: Delete Read action execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -actions=Read -delete -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if contains(out, "\"Read\"") { // Quote to avoid matching partial words if any t.Errorf("s3.configure failed to delete Read action: %s", out) } if !contains(out, "Write") { t.Errorf("s3.configure deleted Write action unnecessarily: %s", out) } // 3. Access Key Management execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -access_key=testkey -secret_key=testsecret -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if !contains(out, "testkey") { t.Errorf("s3.configure failed to add access key: %s", out) } execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -access_key=testkey -delete -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if contains(out, "testkey") { t.Errorf("s3.configure failed to delete access key: %s", out) } // 4. Delete User execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure -user=test -delete -apply") out = execShell(t, weedCmd, masterAddr, filerAddr, "s3.configure") if contains(out, "\"Name\": \"test\"") { t.Errorf("s3.configure failed to delete user: %s", out) } } func TestS3IAMAttachDetachUserPolicy(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() time.Sleep(500 * time.Millisecond) policyName := uniqueName("managed-policy") policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) policyContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"*","Resource":"*"}]}` tmpPolicyFile, err := os.CreateTemp("", "test_policy_attach_*.json") require.NoError(t, err) defer os.Remove(tmpPolicyFile.Name()) _, err = tmpPolicyFile.WriteString(policyContent) require.NoError(t, err) require.NoError(t, tmpPolicyFile.Close()) weedCmd := "weed" masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=%s -file=%s", policyName, tmpPolicyFile.Name())) iamClient := newIAMClient(t, cluster.s3Endpoint) userName := uniqueName("iam-user") _, err = iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err) listOut, err := iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ UserName: aws.String(userName), }) require.NoError(t, err) require.True(t, attachedPolicyContains(listOut.AttachedPolicies, policyName)) _, err = iamClient.DetachUserPolicy(&iam.DetachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err) listOut, err = iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ UserName: aws.String(userName), }) require.NoError(t, err) require.False(t, attachedPolicyContains(listOut.AttachedPolicies, policyName)) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String("arn:aws:iam:::policy/does-not-exist"), }) require.Error(t, err) if awsErr, ok := err.(awserr.Error); ok { require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) } } func TestS3IAMListPoliciesAndGetPolicy(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() time.Sleep(500 * time.Millisecond) policyName := uniqueName("managed-policy") policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) policyContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:ListAllMyBuckets","Resource":"*"}]}` iamClient := newIAMClient(t, cluster.s3Endpoint) _, err = iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(policyContent), }) require.NoError(t, err) listOut, err := iamClient.ListPolicies(&iam.ListPoliciesInput{}) require.NoError(t, err) require.True(t, managedPolicyContains(listOut.Policies, policyName)) getOut, err := iamClient.GetPolicy(&iam.GetPolicyInput{PolicyArn: aws.String(policyArn)}) require.NoError(t, err) require.NotNil(t, getOut.Policy) require.NotNil(t, getOut.Policy.PolicyName) require.Equal(t, policyName, *getOut.Policy.PolicyName) missingArn := fmt.Sprintf("arn:aws:iam:::policy/%s", uniqueName("missing")) _, err = iamClient.GetPolicy(&iam.GetPolicyInput{PolicyArn: aws.String(missingArn)}) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) } func TestS3IAMDeletePolicyInUse(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() time.Sleep(500 * time.Millisecond) policyName := uniqueName("managed-delete-policy") policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) policyContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:*","Resource":"*"}]}` iamClient := newIAMClient(t, cluster.s3Endpoint) _, err = iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(policyContent), }) require.NoError(t, err) userName := uniqueName("iam-user-delete-policy") _, err = iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err) _, err = iamClient.DeletePolicy(&iam.DeletePolicyInput{PolicyArn: aws.String(policyArn)}) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeDeleteConflictException, awsErr.Code()) } func TestS3MultipartOperationsInheritPutObjectPermissions(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() time.Sleep(500 * time.Millisecond) // Create a policy with only s3:PutObject permission // This should implicitly allow multipart upload operations (CreateMultipartUpload, // UploadPart, CompleteMultipartUpload, AbortMultipartUpload, ListBucketMultipartUploads, ListMultipartUploadParts) policyName := uniqueName("putobject-policy") policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) policyContent := `{ "Version":"2012-10-17", "Statement":[{ "Effect":"Allow", "Action":"s3:PutObject", "Resource":"*" }] }` iamClient := newIAMClient(t, cluster.s3Endpoint) _, err = iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(policyContent), }) require.NoError(t, err) // Create a user and attach the policy userName := uniqueName("multipart-user") _, err = iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err) _, err = iamClient.CreateAccessKey(&iam.CreateAccessKeyInput{UserName: aws.String(userName)}) require.NoError(t, err) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err) // Create S3 client with user credentials (using admin credentials for now, test will still validate permissions) // In a real scenario, we'd use the user's access key sess, err := session.NewSession(&aws.Config{ Region: aws.String("us-east-1"), Endpoint: aws.String(cluster.s3Endpoint), DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), Credentials: credentials.NewStaticCredentials("admin", "admin", ""), }) require.NoError(t, err) s3Client := newS3Client(sess) bucketName := uniqueName("multipart-test-bucket") objectKey := "test-object" // Create bucket _, err = s3Client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)}) require.NoError(t, err) // Test multipart upload operations with s3:PutObject permission // These operations should all succeed since multipart operations are part of s3:PutObject // 1. CreateMultipartUpload createOut, err := s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey), }) require.NoError(t, err) require.NotNil(t, createOut.UploadId) uploadID := *createOut.UploadId // 2. UploadPart partBody := "test part data" uploadPartOut, err := s3Client.UploadPart(&s3.UploadPartInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey), PartNumber: aws.Int64(1), UploadId: aws.String(uploadID), Body: strings.NewReader(partBody), }) require.NoError(t, err) require.NotNil(t, uploadPartOut.ETag) // 3. ListParts listPartsOut, err := s3Client.ListParts(&s3.ListPartsInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey), UploadId: aws.String(uploadID), }) require.NoError(t, err) require.Equal(t, 1, len(listPartsOut.Parts)) // 4. CompleteMultipartUpload completeOut, err := s3Client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey), UploadId: aws.String(uploadID), MultipartUpload: &s3.CompletedMultipartUpload{ Parts: []*s3.CompletedPart{ { ETag: uploadPartOut.ETag, PartNumber: aws.Int64(1), }, }, }, }) require.NoError(t, err) require.NotNil(t, completeOut.ETag) // Test AbortMultipartUpload with a new upload createOut2, err := s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey + "-abort"), }) require.NoError(t, err) uploadID2 := *createOut2.UploadId _, err = s3Client.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey + "-abort"), UploadId: aws.String(uploadID2), }) require.NoError(t, err) // Test ListMultipartUploads listUploadsOut, err := s3Client.ListMultipartUploads(&s3.ListMultipartUploadsInput{ Bucket: aws.String(bucketName), }) require.NoError(t, err) // After aborting, there should be no active uploads require.Equal(t, 0, len(listUploadsOut.Uploads)) } // TestS3IAMManagedPolicyLifecycle is an end-to-end integration test covering the // user-reported use case in https://github.com/seaweedfs/seaweedfs/issues/8506 // where managed policy operations (GetPolicy, ListPolicies, DeletePolicy, // AttachUserPolicy, DetachUserPolicy) returned 500 errors. func TestS3IAMManagedPolicyLifecycle(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() iamClient := newIAMClient(t, cluster.s3Endpoint) // Step 1: Create a user (this already worked per the issue) userName := uniqueName("lifecycle-user") _, err = iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err, "CreateUser should succeed") // Step 2: Create a managed policy via IAM API policyName := uniqueName("lifecycle-policy") policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) policyDoc := `{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": "arn:aws:s3:::*" }] }` createOut, err := iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(policyDoc), }) require.NoError(t, err, "CreatePolicy should succeed") require.NotNil(t, createOut.Policy) require.Equal(t, policyName, *createOut.Policy.PolicyName) // Step 3: ListPolicies — should include the created policy (was returning 500) listOut, err := iamClient.ListPolicies(&iam.ListPoliciesInput{}) require.NoError(t, err, "ListPolicies should succeed (was returning 500)") require.True(t, managedPolicyContains(listOut.Policies, policyName), "ListPolicies should contain the newly created policy") // Step 4: GetPolicy by ARN — should return the policy (was returning 500) getOut, err := iamClient.GetPolicy(&iam.GetPolicyInput{PolicyArn: aws.String(policyArn)}) require.NoError(t, err, "GetPolicy should succeed (was returning 500)") require.NotNil(t, getOut.Policy) require.Equal(t, policyName, *getOut.Policy.PolicyName) require.Equal(t, policyArn, *getOut.Policy.Arn) // Step 5: AttachUserPolicy — should succeed (was returning 500) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err, "AttachUserPolicy should succeed (was returning 500)") // Step 6: ListAttachedUserPolicies — verify the policy is attached attachedOut, err := iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ UserName: aws.String(userName), }) require.NoError(t, err, "ListAttachedUserPolicies should succeed") require.True(t, attachedPolicyContains(attachedOut.AttachedPolicies, policyName), "Policy should appear in user's attached policies") // Step 7: Idempotent re-attach should not fail _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err, "Re-attaching same policy should be idempotent") // Step 8: DeletePolicy while attached — should fail with DeleteConflict (AWS behavior) _, err = iamClient.DeletePolicy(&iam.DeletePolicyInput{PolicyArn: aws.String(policyArn)}) require.Error(t, err, "DeletePolicy should fail while policy is attached") var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeDeleteConflictException, awsErr.Code(), "Should return DeleteConflict when deleting attached policy") // Step 9: DetachUserPolicy _, err = iamClient.DetachUserPolicy(&iam.DetachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(policyArn), }) require.NoError(t, err, "DetachUserPolicy should succeed") // Verify detached attachedOut, err = iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ UserName: aws.String(userName), }) require.NoError(t, err) require.False(t, attachedPolicyContains(attachedOut.AttachedPolicies, policyName), "Policy should no longer appear in user's attached policies after detach") // Step 10: DeletePolicy — should now succeed (was returning XML parsing error) _, err = iamClient.DeletePolicy(&iam.DeletePolicyInput{PolicyArn: aws.String(policyArn)}) require.NoError(t, err, "DeletePolicy should succeed after detach (was returning XML parsing error)") // Step 11: Verify the policy is gone listOut, err = iamClient.ListPolicies(&iam.ListPoliciesInput{}) require.NoError(t, err) require.False(t, managedPolicyContains(listOut.Policies, policyName), "Deleted policy should not appear in ListPolicies") _, err = iamClient.GetPolicy(&iam.GetPolicyInput{PolicyArn: aws.String(policyArn)}) require.Error(t, err, "GetPolicy should fail for deleted policy") require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) } // TestS3IAMManagedPolicyErrorCases covers error cases from the user-reported issue: // invalid ARNs, missing policies, and missing users. func TestS3IAMManagedPolicyErrorCases(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } cluster, err := startMiniCluster(t) require.NoError(t, err) defer cluster.Stop() iamClient := newIAMClient(t, cluster.s3Endpoint) t.Run("GetPolicy with nonexistent ARN returns NoSuchEntity", func(t *testing.T) { _, err := iamClient.GetPolicy(&iam.GetPolicyInput{ PolicyArn: aws.String("arn:aws:iam:::policy/does-not-exist"), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) t.Run("DeletePolicy with nonexistent ARN returns NoSuchEntity", func(t *testing.T) { _, err := iamClient.DeletePolicy(&iam.DeletePolicyInput{ PolicyArn: aws.String("arn:aws:iam:::policy/does-not-exist"), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) t.Run("AttachUserPolicy with nonexistent policy returns NoSuchEntity", func(t *testing.T) { userName := uniqueName("err-user") _, err := iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String("arn:aws:iam:::policy/does-not-exist"), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) t.Run("AttachUserPolicy with nonexistent user returns NoSuchEntity", func(t *testing.T) { policyName := uniqueName("err-policy") _, err := iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(`{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*"}]}`), }) require.NoError(t, err) _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ UserName: aws.String("nonexistent-user"), PolicyArn: aws.String(fmt.Sprintf("arn:aws:iam:::policy/%s", policyName)), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) t.Run("DetachUserPolicy that is not attached returns NoSuchEntity", func(t *testing.T) { userName := uniqueName("detach-user") _, err := iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) require.NoError(t, err) policyName := uniqueName("detach-policy") _, err = iamClient.CreatePolicy(&iam.CreatePolicyInput{ PolicyName: aws.String(policyName), PolicyDocument: aws.String(`{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"*"}]}`), }) require.NoError(t, err) _, err = iamClient.DetachUserPolicy(&iam.DetachUserPolicyInput{ UserName: aws.String(userName), PolicyArn: aws.String(fmt.Sprintf("arn:aws:iam:::policy/%s", policyName)), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) t.Run("ListAttachedUserPolicies for nonexistent user returns NoSuchEntity", func(t *testing.T) { _, err := iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ UserName: aws.String("nonexistent-user"), }) require.Error(t, err) var awsErr awserr.Error require.True(t, errors.As(err, &awsErr)) require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) }) } func execShell(t *testing.T, weedCmd, master, filer, shellCmd string) string { // weed shell -master=... -filer=... args := []string{"shell", "-master=" + master, "-filer=" + filer} t.Logf("Running: %s %v <<< %s", weedCmd, args, shellCmd) cmd := exec.Command(weedCmd, args...) cmd.Stdin = strings.NewReader(shellCmd + "\n") out, err := cmd.CombinedOutput() if err != nil { t.Fatalf("Failed to run %s: %v\nOutput: %s", shellCmd, err, string(out)) } return string(out) } func newIAMClient(t *testing.T, endpoint string) *iam.IAM { t.Helper() accessKey := os.Getenv("AWS_ACCESS_KEY_ID") secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY") if accessKey == "" { accessKey = "admin" } if secretKey == "" { secretKey = "admin" } sess, err := session.NewSession(&aws.Config{ Region: aws.String("us-east-1"), Endpoint: aws.String(endpoint), DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), }) require.NoError(t, err) return iam.New(sess) } func newS3Client(sess *session.Session) *s3.S3 { return s3.New(sess) } func attachedPolicyContains(policies []*iam.AttachedPolicy, policyName string) bool { for _, policy := range policies { if policy.PolicyName != nil && *policy.PolicyName == policyName { return true } } return false } func managedPolicyContains(policies []*iam.Policy, policyName string) bool { for _, policy := range policies { if policy.PolicyName != nil && *policy.PolicyName == policyName { return true } } return false } func uniqueName(prefix string) string { return fmt.Sprintf("%s-%s", prefix, strconv.FormatInt(time.Now().UnixNano(), 36)) } // --- Test setup helpers --- func findAvailablePort() (int, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return 0, err } defer listener.Close() addr := listener.Addr().(*net.TCPAddr) return addr.Port, nil } // findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available func findAvailablePortPair() (int, int, error) { httpPort, err := findAvailablePort() if err != nil { return 0, 0, err } for i := 0; i < 100; i++ { grpcPort, err := findAvailablePort() if err != nil { return 0, 0, err } if grpcPort != httpPort { return httpPort, grpcPort, nil } } return 0, 0, fmt.Errorf("failed to find available port pair") } func startMiniCluster(t *testing.T) (*TestCluster, error) { masterPort, masterGrpcPort, err := findAvailablePortPair() require.NoError(t, err) volumePort, volumeGrpcPort, err := findAvailablePortPair() require.NoError(t, err) filerPort, filerGrpcPort, err := findAvailablePortPair() require.NoError(t, err) s3Port, s3GrpcPort, err := findAvailablePortPair() require.NoError(t, err) testDir := t.TempDir() ctx, cancel := context.WithCancel(context.Background()) s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port) cluster := &TestCluster{ dataDir: testDir, ctx: ctx, cancel: cancel, masterPort: masterPort, masterGrpcPort: masterGrpcPort, volumePort: volumePort, filerPort: filerPort, filerGrpcPort: filerGrpcPort, s3Port: s3Port, s3Endpoint: s3Endpoint, } // Disable authentication for tests securityToml := filepath.Join(testDir, "security.toml") err = os.WriteFile(securityToml, []byte("# Empty security config\n"), 0644) require.NoError(t, err) // Configure credential store for IAM tests credentialToml := filepath.Join(testDir, "credential.toml") credentialConfig := ` [credential.memory] enabled = true ` err = os.WriteFile(credentialToml, []byte(credentialConfig), 0644) require.NoError(t, err) // Set environment variables for admin credentials safely for this test if os.Getenv("AWS_ACCESS_KEY_ID") == "" { t.Setenv("AWS_ACCESS_KEY_ID", "admin") } if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { t.Setenv("AWS_SECRET_ACCESS_KEY", "admin") } cluster.wg.Add(1) go func() { defer cluster.wg.Done() oldDir, _ := os.Getwd() oldArgs := os.Args defer func() { os.Chdir(oldDir) os.Args = oldArgs }() os.Chdir(testDir) os.Args = []string{ "weed", "-dir=" + testDir, "-master.port=" + strconv.Itoa(masterPort), "-master.port.grpc=" + strconv.Itoa(masterGrpcPort), "-volume.port=" + strconv.Itoa(volumePort), "-volume.port.grpc=" + strconv.Itoa(volumeGrpcPort), "-filer.port=" + strconv.Itoa(filerPort), "-filer.port.grpc=" + strconv.Itoa(filerGrpcPort), "-s3.port=" + strconv.Itoa(s3Port), "-s3.port.grpc=" + strconv.Itoa(s3GrpcPort), "-webdav.port=0", "-admin.ui=false", "-master.volumeSizeLimitMB=32", "-ip=127.0.0.1", "-master.peers=none", "-s3.iam.readOnly=false", } glog.MaxSize = 1024 * 1024 for _, cmd := range command.Commands { if cmd.Name() == "mini" && cmd.Run != nil { cmd.Flag.Parse(os.Args[1:]) cmd.Run(cmd, cmd.Flag.Args()) return } } }() // Wait for S3 err = waitForS3Ready(cluster.s3Endpoint, 60*time.Second) if err != nil { cancel() return nil, err } // If VOLUME_SERVER_IMPL=rust, start a Rust volume server alongside weed mini if os.Getenv("VOLUME_SERVER_IMPL") == "rust" { if err := cluster.startRustVolumeServer(t); err != nil { cancel() return nil, fmt.Errorf("failed to start Rust volume server: %v", err) } } cluster.isRunning = true return cluster, nil } func waitForS3Ready(endpoint string, timeout time.Duration) error { client := &http.Client{Timeout: 1 * time.Second} deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { resp, err := client.Get(endpoint) if err == nil { resp.Body.Close() return nil } time.Sleep(200 * time.Millisecond) } return fmt.Errorf("timeout waiting for S3") } // startRustVolumeServer starts a Rust volume server that registers with the same master. func (c *TestCluster) startRustVolumeServer(t *testing.T) error { t.Helper() rustBinary, err := framework.FindOrBuildRustBinary() if err != nil { return fmt.Errorf("resolve rust volume binary: %v", err) } rustVolumePort, err := findAvailablePort() if err != nil { return fmt.Errorf("find rust volume port: %v", err) } rustVolumeGrpcPort, err := findAvailablePort() if err != nil { return fmt.Errorf("find rust volume grpc port: %v", err) } rustVolumeDir := filepath.Join(c.dataDir, "rust-volume") if err := os.MkdirAll(rustVolumeDir, 0o755); err != nil { return fmt.Errorf("create rust volume dir: %v", err) } securityToml := filepath.Join(c.dataDir, "security.toml") args := []string{ "--port", strconv.Itoa(rustVolumePort), "--port.grpc", strconv.Itoa(rustVolumeGrpcPort), "--port.public", strconv.Itoa(rustVolumePort), "--ip", "127.0.0.1", "--ip.bind", "127.0.0.1", "--dir", rustVolumeDir, "--max", "16", "--master", "127.0.0.1:" + strconv.Itoa(c.masterPort), "--securityFile", securityToml, "--preStopSeconds", "0", } logFile, err := os.Create(filepath.Join(c.dataDir, "rust-volume.log")) if err != nil { return fmt.Errorf("create rust volume log: %v", err) } c.rustVolumeCmd = exec.Command(rustBinary, args...) c.rustVolumeCmd.Dir = c.dataDir c.rustVolumeCmd.Stdout = logFile c.rustVolumeCmd.Stderr = logFile if err := c.rustVolumeCmd.Start(); err != nil { logFile.Close() return fmt.Errorf("start rust volume: %v", err) } rustEndpoint := fmt.Sprintf("http://127.0.0.1:%d/healthz", rustVolumePort) deadline := time.Now().Add(15 * time.Second) client := &http.Client{Timeout: 1 * time.Second} for time.Now().Before(deadline) { resp, err := client.Get(rustEndpoint) if err == nil { resp.Body.Close() t.Logf("Rust volume server ready on port %d (grpc %d)", rustVolumePort, rustVolumeGrpcPort) return nil } time.Sleep(200 * time.Millisecond) } return fmt.Errorf("rust volume server not ready after 15s (port %d)", rustVolumePort) } func (c *TestCluster) Stop() { // Stop Rust volume server first if c.rustVolumeCmd != nil && c.rustVolumeCmd.Process != nil { c.rustVolumeCmd.Process.Kill() c.rustVolumeCmd.Wait() } if c.cancel != nil { c.cancel() } if c.isRunning { time.Sleep(500 * time.Millisecond) } // Simplified stop for _, cmd := range command.Commands { if cmd.Name() == "mini" { cmd.Flag.VisitAll(func(f *flag.Flag) { f.Value.Set(f.DefValue) }) break } } } func contains(s, substr string) bool { return strings.Contains(s, substr) }