Files
seaweedFS/test/metadata_subscribe/metadata_subscribe_integration_test.go
Chris Lu d970c15d71 fix: prevent filer.backup stall in single-filer setups (#7695)
* fix: prevent filer.backup stall in single-filer setups (#4977)

When MetaAggregator.MetaLogBuffer is empty (which happens in single-filer
setups with no peers), ReadFromBuffer was returning nil error, causing
LoopProcessLogData to enter an infinite wait loop on ListenersCond.

This fix returns ResumeFromDiskError instead, allowing SubscribeMetadata
to loop back and read from persisted logs on disk. This ensures filer.backup
continues processing events even when the in-memory aggregator buffer is empty.

Fixes #4977

* test: add integration tests for metadata subscription

Add integration tests for metadata subscription functionality:

- TestMetadataSubscribeBasic: Tests basic subscription and event receiving
- TestMetadataSubscribeSingleFilerNoStall: Regression test for #4977,
  verifies subscription doesn't stall under high load in single-filer setups
- TestMetadataSubscribeResumeFromDisk: Tests resuming subscription from disk

Related to #4977

* ci: add GitHub Actions workflow for metadata subscribe tests

Add CI workflow that runs on:
- Push/PR to master affecting filer, log_buffer, or metadata subscribe code
- Runs the integration tests for metadata subscription
- Uploads logs on failure for debugging

Related to #4977

* fix: use multipart form-data for file uploads in integration tests

The filer expects multipart/form-data for file uploads, not raw POST body.
This fixes the 'Content-Type isn't multipart/form-data' error.

* test: use -peers=none for faster master startup

* test: add -peers=none to remaining master startup in ec tests

* fix: use filer HTTP port 8888, WithFilerClient adds 10000 for gRPC

WithFilerClient calls ToGrpcAddress() which adds 10000 to the port.
Passing 18888 resulted in connecting to 28888. Use 8888 instead.

* test: add concurrent writes and million updates tests

- TestMetadataSubscribeConcurrentWrites: 50 goroutines writing 20 files each
- TestMetadataSubscribeMillionUpdates: 1 million metadata entries via gRPC
  (metadata only, no actual file content for speed)

* fix: address PR review comments

- Handle os.MkdirAll errors explicitly instead of ignoring
- Handle log file creation errors with proper error messages
- Replace silent event dropping with 100ms timeout and warning log

* Update metadata_subscribe_integration_test.go
2025-12-09 20:15:35 -08:00

918 lines
26 KiB
Go

package metadata_subscribe
import (
"bytes"
"context"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// TestMetadataSubscribeBasic starts a master/volume/filer cluster, subscribes
// to metadata changes under "/", uploads a few files over HTTP and checks that
// a creation event is observed for each uploaded path.
func TestMetadataSubscribeBasic(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	testDir, err := os.MkdirTemp("", "seaweedfs_metadata_subscribe_test_")
	require.NoError(t, err)
	defer os.RemoveAll(testDir)

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	cluster, err := startSeaweedFSCluster(ctx, testDir)
	require.NoError(t, err)
	defer cluster.Stop()

	// Master, volume and filer HTTP endpoints must all answer before continuing.
	for _, endpoint := range []string{
		"http://127.0.0.1:9333",
		"http://127.0.0.1:8080",
		"http://127.0.0.1:8888",
	} {
		require.NoError(t, waitForHTTPServer(endpoint, 30*time.Second))
	}
	t.Logf("SeaweedFS cluster started successfully")

	t.Run("subscribe_and_receive_events", func(t *testing.T) {
		events := make(chan *filer_pb.SubscribeMetadataResponse, 100)
		subErrs := make(chan error, 1)

		subCtx, subCancel := context.WithCancel(ctx)
		defer subCancel()
		go func() {
			subErr := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", events)
			if subErr == nil || strings.Contains(subErr.Error(), "context canceled") {
				return
			}
			subErrs <- subErr
		}()

		// Give the subscription time to be established before writing.
		time.Sleep(2 * time.Second)

		expected := []string{
			"/test/file1.txt",
			"/test/file2.txt",
			"/test/subdir/file3.txt",
		}
		for _, p := range expected {
			uploadErr := uploadFile("http://127.0.0.1:8888"+p, []byte("test content for "+p))
			require.NoError(t, uploadErr, "Failed to upload %s", p)
			t.Logf("Uploaded %s", p)
		}

		seen := make(map[string]bool)
		haveAll := func() bool {
			for _, p := range expected {
				if !seen[p] {
					return false
				}
			}
			return true
		}

		deadline := time.After(30 * time.Second)
	collect:
		for {
			select {
			case ev := <-events:
				if note := ev.EventNotification; note != nil && note.NewEntry != nil {
					full := filepath.Join(ev.Directory, note.NewEntry.Name)
					t.Logf("Received event for: %s", full)
					seen[full] = true
				}
				if haveAll() {
					break collect
				}
			case subErr := <-subErrs:
				t.Fatalf("Subscription error: %v", subErr)
			case <-deadline:
				t.Logf("Timeout waiting for events. Received %d/%d events", len(seen), len(expected))
				break collect
			}
		}

		for _, p := range expected {
			assert.True(t, seen[p], "Should have received event for %s", p)
		}
	})
}
// TestMetadataSubscribeSingleFilerNoStall tests that subscription doesn't stall
// in single-filer setups (regression test for issue #4977).
//
// Files are uploaded concurrently while a subscriber counts file-creation
// events. The test requires that at least 80% of the successful uploads are
// observed before the 60s wait expires.
func TestMetadataSubscribeSingleFilerNoStall(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	testDir, err := os.MkdirTemp("", "seaweedfs_single_filer_test_")
	require.NoError(t, err)
	defer os.RemoveAll(testDir)

	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
	defer cancel()
	cluster, err := startSeaweedFSCluster(ctx, testDir)
	require.NoError(t, err)
	defer cluster.Stop()

	// Wait for servers to be ready
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
	t.Logf("Single-filer cluster started")

	t.Run("high_load_subscription_no_stall", func(t *testing.T) {
		// This test simulates the scenario from issue #4977:
		// high-load writes while a subscriber tries to keep up.
		var receivedCount int64
		var uploadedCount int64
		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 1000)
		errChan := make(chan error, 1)
		subCtx, subCancel := context.WithCancel(ctx)
		defer subCancel()

		// Start subscriber
		go func() {
			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
			if err != nil && !strings.Contains(err.Error(), "context canceled") {
				errChan <- err
			}
		}()
		// Wait for subscription to be established
		time.Sleep(2 * time.Second)

		// Count file (non-directory) creation events until the subscription is cancelled.
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case event := <-eventsChan:
					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
						if !event.EventNotification.NewEntry.IsDirectory {
							atomic.AddInt64(&receivedCount, 1)
						}
					}
				case <-subCtx.Done():
					return
				}
			}
		}()

		// Upload files concurrently to simulate high load. Failed uploads are not
		// counted; the first failure is kept so a total failure can be diagnosed.
		numFiles := 100
		numWorkers := 10
		var (
			uploadWg           sync.WaitGroup
			firstUploadErr     error
			firstUploadErrOnce sync.Once
		)
		for w := 0; w < numWorkers; w++ {
			uploadWg.Add(1)
			go func(workerID int) {
				defer uploadWg.Done()
				for i := 0; i < numFiles/numWorkers; i++ {
					path := fmt.Sprintf("/load_test/worker%d/file%d.txt", workerID, i)
					content := []byte(fmt.Sprintf("content %d-%d", workerID, i))
					if err := uploadFile("http://127.0.0.1:8888"+path, content); err != nil {
						firstUploadErrOnce.Do(func() { firstUploadErr = err })
						continue
					}
					atomic.AddInt64(&uploadedCount, 1)
				}
			}(w)
		}
		uploadWg.Wait()
		uploaded := atomic.LoadInt64(&uploadedCount)
		t.Logf("Uploaded %d files", uploaded)
		// Every ratio below divides by uploaded. With zero uploads it would be
		// NaN and the final assertion would fail with a misleading message.
		require.Greater(t, uploaded, int64(0), "no uploads succeeded (first error: %v)", firstUploadErr)
		percentOf := func(n int64) float64 { return float64(n) / float64(uploaded) * 100 }

		// Wait for events to be received (with timeout to detect stall)
		stallTimeout := time.After(60 * time.Second)
		checkInterval := time.NewTicker(2 * time.Second)
		defer checkInterval.Stop()
		lastReceived := atomic.LoadInt64(&receivedCount)
		staleCount := 0
	waitLoop:
		for {
			select {
			case <-stallTimeout:
				received := atomic.LoadInt64(&receivedCount)
				t.Logf("Timeout: received %d/%d events (%.1f%%)",
					received, uploaded, percentOf(received))
				break waitLoop
			case <-checkInterval.C:
				received := atomic.LoadInt64(&receivedCount)
				if received >= uploaded {
					t.Logf("All %d events received", received)
					break waitLoop
				}
				if received == lastReceived {
					staleCount++
					if staleCount >= 5 {
						// No progress for 10 seconds (5 checks x 2s): the subscription may be stalled.
						t.Logf("WARNING: No progress for %d checks. Received %d/%d (%.1f%%)",
							staleCount, received, uploaded, percentOf(received))
					}
				} else {
					staleCount = 0
					t.Logf("Progress: received %d/%d events (%.1f%%)",
						received, uploaded, percentOf(received))
				}
				lastReceived = received
			case err := <-errChan:
				t.Fatalf("Subscription error: %v", err)
			}
		}
		subCancel()
		wg.Wait()

		received := atomic.LoadInt64(&receivedCount)
		// With the fix for #4977, we should receive a high percentage of events.
		// Before the fix, this would stall at ~20-40%.
		percentage := percentOf(received)
		t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
		// We should receive at least 80% of events (allowing for some timing issues)
		assert.GreaterOrEqual(t, percentage, 80.0,
			"Should receive at least 80%% of events (received %.1f%%)", percentage)
	})
}
// TestMetadataSubscribeResumeFromDisk tests that subscription can resume from disk.
//
// Files are uploaded before any subscriber exists. After a pause intended to
// let the filer flush its logs, a subscription starting at SinceNs=0 must
// observe (nearly) every one of those files. Files are counted by distinct
// path, so replayed duplicates cannot hide missing ones.
func TestMetadataSubscribeResumeFromDisk(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	testDir, err := os.MkdirTemp("", "seaweedfs_resume_test_")
	require.NoError(t, err)
	defer os.RemoveAll(testDir)

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()
	cluster, err := startSeaweedFSCluster(ctx, testDir)
	require.NoError(t, err)
	defer cluster.Stop()

	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))

	t.Run("upload_before_subscribe", func(t *testing.T) {
		// Upload files BEFORE starting subscription
		numFiles := 20
		for i := 0; i < numFiles; i++ {
			path := fmt.Sprintf("/pre_subscribe/file%d.txt", i)
			err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d", i)))
			require.NoError(t, err)
		}
		t.Logf("Uploaded %d files before subscription", numFiles)

		// Wait for logs to be flushed to disk
		time.Sleep(15 * time.Second)

		// Now start subscription from the beginning
		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
		errChan := make(chan error, 1)
		subCtx, subCancel := context.WithTimeout(ctx, 30*time.Second)
		defer subCancel()
		go func() {
			err := subscribeToMetadataFromBeginning(subCtx, "127.0.0.1:8888", "/pre_subscribe/", eventsChan)
			if err != nil && !strings.Contains(err.Error(), "context") {
				errChan <- err
			}
		}()

		// Collect distinct file paths. The same entry may be delivered more than
		// once, and counting raw events would let duplicates mask missing files.
		seen := make(map[string]struct{}, numFiles)
		timeout := time.After(30 * time.Second)
	countLoop:
		for {
			select {
			case event := <-eventsChan:
				if note := event.EventNotification; note != nil && note.NewEntry != nil && !note.NewEntry.IsDirectory {
					fullPath := filepath.Join(event.Directory, note.NewEntry.Name)
					if _, dup := seen[fullPath]; !dup {
						seen[fullPath] = struct{}{}
						t.Logf("Received event %d: %s/%s", len(seen),
							event.Directory, note.NewEntry.Name)
					}
				}
				if len(seen) >= numFiles {
					break countLoop
				}
			case err := <-errChan:
				t.Fatalf("Subscription error: %v", err)
			case <-timeout:
				t.Logf("Timeout: received %d/%d events", len(seen), numFiles)
				break countLoop
			}
		}

		receivedCount := len(seen)
		// Should receive all pre-uploaded files from disk
		assert.GreaterOrEqual(t, receivedCount, numFiles-2, // Allow small margin
			"Should receive most pre-uploaded files from disk (received %d/%d)", receivedCount, numFiles)
	})
}
// TestMetadataSubscribeConcurrentWrites tests subscription with concurrent writes.
//
// 50 goroutines each upload 20 files under /concurrent/ while a subscriber
// counts file-creation events. At least 80% of the successful uploads must be
// observed. Waiting stops early after 5 checks (15s) without progress.
func TestMetadataSubscribeConcurrentWrites(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	testDir, err := os.MkdirTemp("", "seaweedfs_concurrent_writes_test_")
	require.NoError(t, err)
	defer os.RemoveAll(testDir)

	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
	defer cancel()
	cluster, err := startSeaweedFSCluster(ctx, testDir)
	require.NoError(t, err)
	defer cluster.Stop()

	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
	t.Logf("Cluster started for concurrent writes test")

	t.Run("concurrent_goroutine_writes", func(t *testing.T) {
		var receivedCount int64
		var uploadedCount int64
		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 10000)
		errChan := make(chan error, 1)
		subCtx, subCancel := context.WithCancel(ctx)
		defer subCancel()

		// Start subscriber
		go func() {
			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/concurrent/", eventsChan)
			if err != nil && !strings.Contains(err.Error(), "context") {
				errChan <- err
			}
		}()
		time.Sleep(2 * time.Second)

		// Count file (non-directory) creation events until the subscription is cancelled.
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case event := <-eventsChan:
					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
						if !event.EventNotification.NewEntry.IsDirectory {
							atomic.AddInt64(&receivedCount, 1)
						}
					}
				case <-subCtx.Done():
					return
				}
			}
		}()

		// Launch many concurrent writers. Failed uploads are not counted; the
		// first failure is kept so a total failure can be diagnosed.
		numWorkers := 50
		filesPerWorker := 20
		totalExpected := int64(numWorkers * filesPerWorker)
		var (
			uploadWg           sync.WaitGroup
			firstUploadErr     error
			firstUploadErrOnce sync.Once
		)
		for w := 0; w < numWorkers; w++ {
			uploadWg.Add(1)
			go func(workerID int) {
				defer uploadWg.Done()
				for i := 0; i < filesPerWorker; i++ {
					path := fmt.Sprintf("/concurrent/w%d/f%d.txt", workerID, i)
					content := []byte(fmt.Sprintf("worker%d-file%d", workerID, i))
					if err := uploadFile("http://127.0.0.1:8888"+path, content); err != nil {
						firstUploadErrOnce.Do(func() { firstUploadErr = err })
						continue
					}
					atomic.AddInt64(&uploadedCount, 1)
				}
			}(w)
		}
		uploadWg.Wait()
		uploaded := atomic.LoadInt64(&uploadedCount)
		t.Logf("Uploaded %d/%d files from %d concurrent workers", uploaded, totalExpected, numWorkers)
		// Every ratio below divides by uploaded. With zero uploads it would be
		// NaN and the final assertion would fail with a misleading message.
		require.Greater(t, uploaded, int64(0), "no uploads succeeded (first error: %v)", firstUploadErr)

		// Wait for events with progress tracking
		stallTimeout := time.After(90 * time.Second)
		checkInterval := time.NewTicker(3 * time.Second)
		defer checkInterval.Stop()
		lastReceived := int64(0)
		stableCount := 0
	waitLoop:
		for {
			select {
			case <-stallTimeout:
				break waitLoop
			case <-checkInterval.C:
				received := atomic.LoadInt64(&receivedCount)
				if received >= uploaded {
					t.Logf("All %d events received", received)
					break waitLoop
				}
				if received == lastReceived {
					stableCount++
					if stableCount >= 5 {
						t.Logf("No progress for %d checks, received %d/%d", stableCount, received, uploaded)
						break waitLoop
					}
				} else {
					stableCount = 0
					t.Logf("Progress: %d/%d (%.1f%%)", received, uploaded, float64(received)/float64(uploaded)*100)
				}
				lastReceived = received
			case err := <-errChan:
				t.Fatalf("Subscription error: %v", err)
			}
		}
		subCancel()
		wg.Wait()

		received := atomic.LoadInt64(&receivedCount)
		percentage := float64(received) / float64(uploaded) * 100
		t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
		// Should receive at least 80% of events
		assert.GreaterOrEqual(t, percentage, 80.0,
			"Should receive at least 80%% of concurrent write events")
	})
}
// TestMetadataSubscribeMillionUpdates tests subscription with 1 million metadata updates.
// This test creates metadata entries directly via gRPC without actual file content.
//
// 100 workers each create totalEntries/100 entries under /million/bucketN while
// a subscriber counts file-creation events. At least 90% of the entries the
// filer actually accepted must be observed.
func TestMetadataSubscribeMillionUpdates(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	testDir, err := os.MkdirTemp("", "seaweedfs_million_updates_test_")
	require.NoError(t, err)
	defer os.RemoveAll(testDir)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()
	cluster, err := startSeaweedFSCluster(ctx, testDir)
	require.NoError(t, err)
	defer cluster.Stop()

	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
	t.Logf("Cluster started for million updates test")

	t.Run("million_metadata_updates", func(t *testing.T) {
		var receivedCount int64
		var createdCount int64
		totalEntries := int64(1_000_000)
		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100000)
		errChan := make(chan error, 1)
		subCtx, subCancel := context.WithCancel(ctx)
		defer subCancel()

		// Start subscriber
		go func() {
			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/million/", eventsChan)
			if err != nil && !strings.Contains(err.Error(), "context") {
				errChan <- err
			}
		}()
		time.Sleep(2 * time.Second)

		// Count file (non-directory) creation events until the subscription is cancelled.
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case event := <-eventsChan:
					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
						if !event.EventNotification.NewEntry.IsDirectory {
							atomic.AddInt64(&receivedCount, 1)
						}
					}
				case <-subCtx.Done():
					return
				}
			}
		}()

		// Create metadata entries directly via gRPC (no actual file content).
		// Progress is reported by the 10s reporter goroutine below.
		numWorkers := 100
		entriesPerWorker := int(totalEntries) / numWorkers
		startTime := time.Now()
		grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
		var createWg sync.WaitGroup
		for w := 0; w < numWorkers; w++ {
			createWg.Add(1)
			go func(workerID int) {
				defer createWg.Done()
				dir := fmt.Sprintf("/million/bucket%d", workerID%100)
				err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
					for i := 0; i < entriesPerWorker; i++ {
						// Stop once the test deadline has passed instead of issuing
						// the remaining requests against an expired context.
						if err := ctx.Err(); err != nil {
							return err
						}
						resp, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
							Directory: dir,
							Entry: &filer_pb.Entry{
								Name:        fmt.Sprintf("entry_%d_%d", workerID, i),
								IsDirectory: false,
								Attributes: &filer_pb.FuseAttributes{
									FileSize: 100,
									Mtime:    time.Now().Unix(),
									FileMode: 0644,
									Uid:      1000,
									Gid:      1000,
								},
							},
						})
						// The filer can reject an entry via resp.Error while the RPC
						// itself succeeds; only accepted entries produce events.
						if err == nil && resp.GetError() == "" {
							atomic.AddInt64(&createdCount, 1)
						}
					}
					return nil
				})
				if err != nil {
					t.Logf("Worker %d error: %v", workerID, err)
				}
			}(w)
		}

		// Progress reporter
		progressDone := make(chan struct{})
		go func() {
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					created := atomic.LoadInt64(&createdCount)
					received := atomic.LoadInt64(&receivedCount)
					elapsed := time.Since(startTime)
					createRate := float64(created) / elapsed.Seconds()
					receiveRate := float64(received) / elapsed.Seconds()
					t.Logf("Progress: created %d (%.0f/sec), received %d (%.0f/sec), lag %d",
						created, createRate, received, receiveRate, created-received)
				case <-progressDone:
					return
				}
			}
		}()

		createWg.Wait()
		close(progressDone)
		created := atomic.LoadInt64(&createdCount)
		elapsed := time.Since(startTime)
		t.Logf("Created %d entries in %v (%.0f/sec)", created, elapsed, float64(created)/elapsed.Seconds())
		// Every ratio below divides by created; fail clearly rather than on NaN.
		require.Greater(t, created, int64(0), "no metadata entries were created")

		// Wait for subscription to catch up
		const checkEvery = 5 * time.Second
		catchupTimeout := time.After(5 * time.Minute)
		checkInterval := time.NewTicker(checkEvery)
		defer checkInterval.Stop()
		lastReceived := int64(0)
		stableCount := 0
	waitLoop:
		for {
			select {
			case <-catchupTimeout:
				t.Logf("Catchup timeout reached")
				break waitLoop
			case <-checkInterval.C:
				received := atomic.LoadInt64(&receivedCount)
				if received >= created {
					t.Logf("All %d events received", received)
					break waitLoop
				}
				if received == lastReceived {
					stableCount++
					if stableCount >= 10 {
						t.Logf("No progress for %d checks", stableCount)
						break waitLoop
					}
				} else {
					stableCount = 0
					rate := float64(received-lastReceived) / checkEvery.Seconds()
					t.Logf("Catching up: %d/%d (%.1f%%) at %.0f/sec",
						received, created, float64(received)/float64(created)*100, rate)
				}
				lastReceived = received
			case err := <-errChan:
				t.Fatalf("Subscription error: %v", err)
			}
		}
		subCancel()
		wg.Wait()

		received := atomic.LoadInt64(&receivedCount)
		percentage := float64(received) / float64(created) * 100
		totalTime := time.Since(startTime)
		t.Logf("Final: created %d, received %d (%.1f%%) in %v", created, received, percentage, totalTime)
		// For million entries, we expect at least 90% to be received
		assert.GreaterOrEqual(t, percentage, 90.0,
			"Should receive at least 90%% of million metadata events (received %.1f%%)", percentage)
	})
}
// Helper types and functions

// TestCluster tracks the weed master, volume and filer processes started by
// startSeaweedFSCluster so a test can tear them down with Stop.
type TestCluster struct {
	masterCmd *exec.Cmd
	volumeCmd *exec.Cmd
	filerCmd  *exec.Cmd
	testDir   string
}

// Stop kills and reaps each started process, filer first and master last
// (the reverse of start order). Commands that are nil or were never started
// are skipped. Kill/Wait errors are ignored because this is best-effort teardown.
func (c *TestCluster) Stop() {
	for _, cmd := range []*exec.Cmd{c.filerCmd, c.volumeCmd, c.masterCmd} {
		if cmd == nil || cmd.Process == nil {
			continue
		}
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
	}
}
// startSeaweedFSCluster launches a master (:9333), a volume server (:8080) and
// a filer (:8888) from the weed binary. Each keeps its data and a log file in
// its own subdirectory of dataDir. The processes are created with
// exec.CommandContext, so they are killed when ctx is done.
//
// If a later step fails, already-started processes are stopped before the
// error is returned. Readiness is approximated with fixed sleeps; callers
// still poll the HTTP endpoints with waitForHTTPServer.
func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, error) {
	weedBinary := findWeedBinary()
	if weedBinary == "" {
		return nil, fmt.Errorf("weed binary not found")
	}
	cluster := &TestCluster{testDir: dataDir}

	// Create directories
	masterDir := filepath.Join(dataDir, "master")
	volumeDir := filepath.Join(dataDir, "volume")
	filerDir := filepath.Join(dataDir, "filer")
	for _, d := range []struct{ name, path string }{
		{"master", masterDir},
		{"volume", volumeDir},
		{"filer", filerDir},
	} {
		if err := os.MkdirAll(d.path, 0755); err != nil {
			return nil, fmt.Errorf("failed to create %s dir: %w", d.name, err)
		}
	}

	// Start master server
	masterCmd, err := startLoggedProcess(ctx, "master", filepath.Join(masterDir, "master.log"),
		weedBinary, "master",
		"-port", "9333",
		"-mdir", masterDir,
		"-volumeSizeLimitMB", "10",
		"-ip", "127.0.0.1",
		"-peers", "none",
	)
	if err != nil {
		return nil, err
	}
	cluster.masterCmd = masterCmd
	time.Sleep(2 * time.Second)

	// Start volume server
	volumeCmd, err := startLoggedProcess(ctx, "volume", filepath.Join(volumeDir, "volume.log"),
		weedBinary, "volume",
		"-port", "8080",
		"-dir", volumeDir,
		"-max", "10",
		"-master", "127.0.0.1:9333",
		"-ip", "127.0.0.1",
	)
	if err != nil {
		cluster.Stop()
		return nil, err
	}
	cluster.volumeCmd = volumeCmd
	time.Sleep(2 * time.Second)

	// Start filer server
	filerCmd, err := startLoggedProcess(ctx, "filer", filepath.Join(filerDir, "filer.log"),
		weedBinary, "filer",
		"-port", "8888",
		"-master", "127.0.0.1:9333",
		"-ip", "127.0.0.1",
	)
	if err != nil {
		cluster.Stop()
		return nil, err
	}
	cluster.filerCmd = filerCmd
	time.Sleep(3 * time.Second)
	return cluster, nil
}

// startLoggedProcess starts binary with args (bound to ctx) and sends both its
// stdout and stderr to a newly created file at logPath. name is used only in
// error messages.
//
// The parent's handle on the log file is closed when this function returns.
// Once Start succeeds the child holds its own copy of the descriptor, so this
// stops the test process from leaking one file per started server.
func startLoggedProcess(ctx context.Context, name, logPath, binary string, args ...string) (*exec.Cmd, error) {
	logFile, err := os.Create(logPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create %s log file: %w", name, err)
	}
	defer logFile.Close()

	cmd := exec.CommandContext(ctx, binary, args...)
	cmd.Stdout = logFile
	cmd.Stderr = logFile
	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start %s: %w", name, err)
	}
	return cmd, nil
}
// findWeedBinary returns the path of the weed executable used to launch the
// test cluster, or "" if none is found.
//
// Several locations relative to the working directory are tried in order,
// then $PATH via exec.LookPath. Only regular files count. os.Stat alone also
// succeeds for directories, so without this check a directory named "weed"
// (like the weed/ directory implied by the candidates below) would be
// returned and the cluster launch would fail with a confusing exec error.
func findWeedBinary() string {
	candidates := []string{
		"../../../weed/weed",
		"../../weed/weed",
		"./weed",
		"weed",
	}
	for _, candidate := range candidates {
		if info, err := os.Stat(candidate); err == nil && info.Mode().IsRegular() {
			return candidate
		}
	}
	if path, err := exec.LookPath("weed"); err == nil {
		return path
	}
	return ""
}
// waitForHTTPServer polls url with GET requests every 500ms until the server
// returns any HTTP response (the status code is not checked). It returns an
// error if timeout elapses first.
//
// All requests share one deadline derived from timeout. A server that accepts
// connections but never answers therefore cannot block the caller past
// timeout, which the default client (no timeout) allowed.
func waitForHTTPServer(url string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	const retryInterval = 500 * time.Millisecond
	for {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return fmt.Errorf("build request for %s: %w", url, err)
		}
		resp, err := http.DefaultClient.Do(req)
		if err == nil {
			resp.Body.Close()
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for server %s", url)
		case <-time.After(retryInterval):
		}
	}
}
// uploadHTTPClient is shared by every uploadFile call so connections can be
// reused across the many concurrent uploads the tests perform.
var uploadHTTPClient = &http.Client{Timeout: 10 * time.Second}

// uploadFile POSTs content to url as a multipart/form-data upload in the form
// field "file", which is the encoding the filer expects. The last
// "/"-separated segment of url is the filename, or "file.txt" if that segment
// is empty.
//
// A response status >= 400 is returned as an error that includes the body.
func uploadFile(url string, content []byte) error {
	filename := url[strings.LastIndex(url, "/")+1:]
	if filename == "" {
		filename = "file.txt"
	}

	var buf bytes.Buffer
	writer := multipart.NewWriter(&buf)
	part, err := writer.CreateFormFile("file", filename)
	if err != nil {
		return fmt.Errorf("create form file: %w", err)
	}
	if _, err := part.Write(content); err != nil {
		return fmt.Errorf("write content: %w", err)
	}
	if err := writer.Close(); err != nil {
		return fmt.Errorf("close writer: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, url, &buf)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", writer.FormDataContentType())

	resp, err := uploadHTTPClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}
	// Drain the body so the keep-alive connection can be reused by the next
	// upload; a read error here does not affect the upload's outcome.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// subscribeToMetadata streams metadata events under pathPrefix into eventsChan,
// starting from the moment of the call (only events from now on are delivered).
func subscribeToMetadata(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
	startNs := time.Now().UnixNano()
	return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, startNs, eventsChan)
}
// subscribeToMetadataFromBeginning streams metadata events under pathPrefix
// into eventsChan, requesting history from SinceNs = 0 (the Unix epoch) so
// previously persisted events are replayed as well.
func subscribeToMetadataFromBeginning(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
	const epochNs int64 = 0
	return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, epochNs, eventsChan)
}
// subscribeToMetadataWithOptions opens a SubscribeMetadata stream on the filer
// and forwards each response to eventsChan until the stream ends or ctx is done.
//
// filerGrpcAddress is passed to pb.WithFilerClient as a pb.ServerAddress.
// Callers here pass the filer's HTTP address ("127.0.0.1:8888"), which
// WithFilerClient maps to the gRPC endpoint. Only events under pathPrefix that
// are newer than sinceNs (Unix nanoseconds) are requested. TLS settings come
// from the "grpc.client" configuration when present; otherwise the connection
// is insecure.
//
// It returns nil on io.EOF or once ctx is done, and any other stream error
// as-is. If eventsChan stays full for eventDropTimeout, the event is dropped
// with a warning so a slow consumer cannot stall the stream.
func subscribeToMetadataWithOptions(ctx context.Context, filerGrpcAddress, pathPrefix string, sinceNs int64, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
	const eventDropTimeout = 100 * time.Millisecond

	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	if grpcDialOption == nil {
		grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
	}
	return pb.WithFilerClient(false, 0, pb.ServerAddress(filerGrpcAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: "integration_test",
			PathPrefix: pathPrefix,
			SinceNs:    sinceNs,
			ClientId:   util.RandomInt32(),
		})
		if err != nil {
			return fmt.Errorf("subscribe: %w", err)
		}
		for {
			resp, err := stream.Recv()
			if err != nil {
				if err == io.EOF || ctx.Err() != nil {
					return nil
				}
				return err
			}

			// Fast path: hand the event off without allocating a timer when the
			// consumer keeps up (the common case, even at ~1M events).
			select {
			case eventsChan <- resp:
				continue
			default:
			}

			// Slow path: the channel is full; wait briefly, then drop the event.
			select {
			case eventsChan <- resp:
			case <-ctx.Done():
				return nil
			case <-time.After(eventDropTimeout):
				// Channel full after brief wait, log warning
				glog.Warningf("Event channel full, skipping event for %s", resp.Directory)
			}
		}
	})
}