Files
seaweedFS/test/volume_server/framework/cluster.go
Chris Lu beeb375a88 Add volume server integration test suite and CI workflow (#8322)
* docs(volume_server): add integration test development plan

* test(volume_server): add integration harness and profile matrix

* test(volume_server/http): add admin and options integration coverage

* test(volume_server/grpc): add state and status integration coverage

* test(volume_server): auto-build weed binary and harden cluster startup

* test(volume_server/http): add upload read range head delete coverage

* test(volume_server/grpc): expand admin lifecycle and state coverage

* docs(volume_server): update progress tracker for implemented tests

* test(volume_server/http): cover if-none-match and invalid-range branches

* test(volume_server/grpc): add batch delete integration coverage

* docs(volume_server): log latest HTTP and gRPC test coverage

* ci(volume_server): run volume server integration tests in github actions

* test(volume_server/grpc): add needle status configure ping and leave coverage

* docs(volume_server): record additional grpc coverage progress

* test(volume_server/grpc): add vacuum integration coverage

* docs(volume_server): record vacuum test coverage progress

* test(volume_server/grpc): add read and write needle blob error-path coverage

* docs(volume_server): record data rw grpc coverage progress

* test(volume_server/http): add jwt auth integration coverage

* test(volume_server/grpc): add sync copy and stream error-path coverage

* docs(volume_server): record jwt and sync/copy test coverage

* test(volume_server/grpc): add scrub and query integration coverage

* test(volume_server/grpc): add volume tail sender and receiver coverage

* docs(volume_server): record scrub query and tail test progress

* test(volume_server/grpc): add readonly writable and collection lifecycle coverage

* test(volume_server/http): add public-port cors and method parity coverage

* test(volume_server/grpc): add blob meta and read-all success path coverage

* test(volume_server/grpc): expand scrub and query variation coverage

* test(volume_server/grpc): add tiering and remote fetch error-path coverage

* test(volume_server/http): add unchanged write and delete edge-case coverage

* test(volume_server/grpc): add ping unknown and unreachable target coverage

* test(volume_server/grpc): add volume delete only-empty variation coverage

* test(volume_server/http): add jwt fid-mismatch auth coverage

* test(volume_server/grpc): add scrub ec auto-select empty coverage

* test(volume_server/grpc): stabilize ping timestamp assertion

* docs(volume_server): update integration coverage progress log

* test(volume_server/grpc): add tier remote backend and config variation coverage

* docs(volume_server): record tier remote variation progress

* test(volume_server/grpc): add incremental copy and receive-file protocol coverage

* test(volume_server/http): add read path shape and if-modified-since coverage

* test(volume_server/grpc): add copy-file compaction and receive-file success coverage

* test(volume_server/http): add passthrough headers and static asset coverage

* test(volume_server/grpc): add ping filer unreachable coverage

* docs(volume_server): record copy receive and http variant progress

* test(volume_server/grpc): add erasure coding maintenance and missing-path coverage

* docs(volume_server): record initial erasure coding rpc coverage

* test(volume_server/http): add multi-range multipart response coverage

* docs(volume_server): record multi-range http coverage progress

* test(volume_server/grpc): add query empty-stripe no-match coverage

* docs(volume_server): record query no-match stream behavior coverage

* test(volume_server/http): add upload throttling timeout and replicate bypass coverage

* docs(volume_server): record upload throttling coverage progress

* test(volume_server/http): add download throttling timeout coverage

* docs(volume_server): record download throttling coverage progress

* test(volume_server/http): add jwt wrong-cookie fid mismatch coverage

* docs(volume_server): record jwt wrong-cookie mismatch coverage

* test(volume_server/http): add jwt expired-token rejection coverage

* docs(volume_server): record jwt expired-token coverage

* test(volume_server/http): add jwt query and cookie transport coverage

* docs(volume_server): record jwt token transport coverage

* test(volume_server/http): add jwt token-source precedence coverage

* docs(volume_server): record jwt token-source precedence coverage

* test(volume_server/http): add jwt header-over-cookie precedence coverage

* docs(volume_server): record jwt header cookie precedence coverage

* test(volume_server/http): add jwt query-over-cookie precedence coverage

* docs(volume_server): record jwt query cookie precedence coverage

* test(volume_server/grpc): add setstate version mismatch and nil-state coverage

* docs(volume_server): record setstate validation coverage

* test(volume_server/grpc): add readonly persist-true lifecycle coverage

* docs(volume_server): record readonly persist variation coverage

* test(volume_server/http): add options origin cors header coverage

* docs(volume_server): record options origin cors coverage

* test(volume_server/http): add trace unsupported-method parity coverage

* docs(volume_server): record trace method parity coverage

* test(volume_server/grpc): add batch delete cookie-check variation coverage

* docs(volume_server): record batch delete cookie-check coverage

* test(volume_server/grpc): add admin lifecycle missing and maintenance variants

* docs(volume_server): record admin lifecycle edge-case coverage

* test(volume_server/grpc): add mixed batch delete status matrix coverage

* docs(volume_server): record mixed batch delete matrix coverage

* test(volume_server/http): add jwt-profile ui access gating coverage

* docs(volume_server): record jwt ui-gating http coverage

* test(volume_server/http): add propfind unsupported-method parity coverage

* docs(volume_server): record propfind method parity coverage

* test(volume_server/grpc): add volume configure success and rollback-path coverage

* docs(volume_server): record volume configure branch coverage

* test(volume_server/grpc): add volume needle status missing-path coverage

* docs(volume_server): record volume needle status error-path coverage

* test(volume_server/http): add readDeleted query behavior coverage

* docs(volume_server): record readDeleted http behavior coverage

* test(volume_server/http): add delete ts override parity coverage

* docs(volume_server): record delete ts parity coverage

* test(volume_server/grpc): add invalid blob/meta offset coverage

* docs(volume_server): record invalid blob/meta offset coverage

* test(volume_server/grpc): add read-all mixed volume abort coverage

* docs(volume_server): record read-all mixed-volume abort coverage

* test(volume_server/http): assert head response body parity

* docs(volume_server): record head body parity assertion

* test(volume_server/grpc): assert status state and memory payload completeness

* docs(volume_server): record volume server status payload coverage

* test(volume_server/grpc): add batch delete chunk-manifest rejection coverage

* docs(volume_server): record batch delete chunk-manifest coverage

* test(volume_server/grpc): add query cookie-mismatch eof parity coverage

* docs(volume_server): record query cookie-mismatch parity coverage

* test(volume_server/grpc): add ping master success target coverage

* docs(volume_server): record ping master success coverage

* test(volume_server/http): add head if-none-match conditional parity

* docs(volume_server): record head if-none-match parity coverage

* test(volume_server/http): add head if-modified-since parity coverage

* docs(volume_server): record head if-modified-since parity coverage

* test(volume_server/http): add connect unsupported-method parity coverage

* docs(volume_server): record connect method parity coverage

* test(volume_server/http): assert options allow-headers cors parity

* docs(volume_server): record options allow-headers coverage

* test(volume_server/framework): add dual volume cluster integration harness

* test(volume_server/http): add missing-local read mode proxy redirect local coverage

* docs(volume_server): record read mode missing-local matrix coverage

* test(volume_server/http): add download over-limit replica proxy fallback coverage

* docs(volume_server): record download replica fallback coverage

* test(volume_server/http): add missing-local readDeleted proxy redirect parity coverage

* docs(volume_server): record missing-local readDeleted mode coverage

* test(volume_server/framework): add single-volume cluster with filer harness

* test(volume_server/grpc): add ping filer success target coverage

* docs(volume_server): record ping filer success coverage

* test(volume_server/http): add proxied-loop guard download timeout coverage

* docs(volume_server): record proxied-loop download coverage

* test(volume_server/http): add disabled upload and download limit coverage

* docs(volume_server): record disabled throttling path coverage

* test(volume_server/grpc): add idempotent volume server leave coverage

* docs(volume_server): record leave idempotence coverage

* test(volume_server/http): add redirect collection query preservation coverage

* docs(volume_server): record redirect collection query coverage

* test(volume_server/http): assert admin server headers on status and health

* docs(volume_server): record admin server header coverage

* test(volume_server/http): assert healthz request-id echo parity

* docs(volume_server): record healthz request-id parity coverage

* test(volume_server/http): add over-limit invalid-vid download branch coverage

* docs(volume_server): record over-limit invalid-vid branch coverage

* test(volume_server/http): add public-port static asset coverage

* docs(volume_server): record public static endpoint coverage

* test(volume_server/http): add public head method parity coverage

* docs(volume_server): record public head parity coverage

* test(volume_server/http): add throttling wait-then-proceed path coverage

* docs(volume_server): record throttling wait-then-proceed coverage

* test(volume_server/http): add read cookie-mismatch not-found coverage

* docs(volume_server): record read cookie-mismatch coverage

* test(volume_server/http): add throttling timeout-recovery coverage

* docs(volume_server): record throttling timeout-recovery coverage

* test(volume_server/grpc): add ec generate mount info unmount lifecycle coverage

* docs(volume_server): record ec positive lifecycle coverage

* test(volume_server/grpc): add ec shard read and blob delete lifecycle coverage

* docs(volume_server): record ec shard read/blob delete lifecycle coverage

* test(volume_server/grpc): add ec rebuild and to-volume error branch coverage

* docs(volume_server): record ec rebuild and to-volume branch coverage

* test(volume_server/grpc): add ec shards-to-volume success roundtrip coverage

* docs(volume_server): record ec shards-to-volume success coverage

* test(volume_server/grpc): add ec receive and copy-file missing-source coverage

* docs(volume_server): record ec receive and copy-file coverage

* test(volume_server/grpc): add ec last-shard delete cleanup coverage

* docs(volume_server): record ec last-shard delete cleanup coverage

* test(volume_server/grpc): add volume copy success path coverage

* docs(volume_server): record volume copy success coverage

* test(volume_server/grpc): add volume copy overwrite-destination coverage

* docs(volume_server): record volume copy overwrite coverage

* test(volume_server/http): add write error-path variant coverage

* docs(volume_server): record http write error-path coverage

* test(volume_server/http): add conditional header precedence coverage

* docs(volume_server): record conditional header precedence coverage

* test(volume_server/http): add oversized combined range guard coverage

* docs(volume_server): record oversized range guard coverage

* test(volume_server/http): add image resize and crop read coverage

* docs(volume_server): record image transform coverage

* test(volume_server/http): add chunk-manifest expansion and bypass coverage

* docs(volume_server): record chunk-manifest read coverage

* test(volume_server/http): add compressed read encoding matrix coverage

* docs(volume_server): record compressed read matrix coverage

* test(volume_server/grpc): add tail receiver source replication coverage

* docs(volume_server): record tail receiver replication coverage

* test(volume_server/grpc): add tail sender large-needle chunking coverage

* docs(volume_server): record tail sender chunking coverage

* test(volume_server/grpc): add ec-backed volume needle status coverage

* docs(volume_server): record ec-backed needle status coverage

* test(volume_server/grpc): add ec shard copy from peer success coverage

* docs(volume_server): record ec shard copy success coverage

* test(volume_server/http): add chunk-manifest delete child cleanup coverage

* docs(volume_server): record chunk-manifest delete cleanup coverage

* test(volume_server/http): add chunk-manifest delete failure-path coverage

* docs(volume_server): record chunk-manifest delete failure coverage

* test(volume_server/grpc): add ec shard copy source-unavailable coverage

* docs(volume_server): record ec shard copy source-unavailable coverage

* parallel
2026-02-13 00:40:56 -08:00

443 lines
11 KiB
Go

package framework
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)
// Shared tunables for the integration harness.
const (
// defaultWaitTimeout bounds how long the readiness probes (waitForHTTP and
// waitForTCP) keep polling before giving up.
defaultWaitTimeout = 30 * time.Second
// defaultWaitTick is the pause between two consecutive readiness probes.
defaultWaitTick = 200 * time.Millisecond
// testVolumeSizeLimitMB is the value passed to the master as -volumeSizeLimitMB.
testVolumeSizeLimitMB = 32
)
// Cluster is a lightweight SeaweedFS master + one volume server test harness.
//
// It records the paths, ports and child processes created by
// StartSingleVolumeCluster so tests can address the servers and so Stop can
// tear everything down again.
type Cluster struct {
// testingTB is the owning test; Stop consults it for failure state and logs through it.
testingTB testing.TB
// profile is the matrix profile the volume server flags are derived from.
profile matrix.Profile
// weedBinary is the path of the weed executable used for both processes.
weedBinary string
// baseDir is the work directory root; it is also the working directory of both processes.
baseDir string
// configDir is passed to weed as -config_dir and holds the generated security.toml.
configDir string
// logsDir holds master.log and volume.log.
logsDir string
// keepLogs, when true, makes Stop leave baseDir on disk even if the test passed.
keepLogs bool
// Listening ports handed to the master process.
masterPort int
masterGrpcPort int
// Listening ports handed to the volume server. volumePubPort equals
// volumePort unless the profile asks for a split public port.
volumePort int
volumeGrpcPort int
volumePubPort int
// Child process handles; nil until the corresponding start* method has run.
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
// cleanupOnce makes repeated Stop calls run the teardown only once.
cleanupOnce sync.Once
}
// StartSingleVolumeCluster boots one master and one volume server and returns
// once both answer their readiness probes.
//
// The weed binary is resolved via FindOrBuildWeedBinary, a fresh work
// directory is created with config/logs/master/volume sub-directories, a
// security.toml is generated from profile, and ports are reserved for both
// processes. The master is started first and probed on /dir/status; the
// volume server is then started and probed on /status and on its gRPC port.
//
// Any failure aborts the test through t.Fatalf. Teardown is registered with
// t.Cleanup as soon as the work directory exists, so it runs after the test
// has been marked as failed: Stop then sees Failed() == true and keeps the
// log directory instead of deleting the evidence, and the work directory is
// no longer leaked silently when an early setup step fails.
func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster {
	t.Helper()

	weedBinary, err := FindOrBuildWeedBinary()
	if err != nil {
		t.Fatalf("resolve weed binary: %v", err)
	}

	baseDir, keepLogs, err := newWorkDir()
	if err != nil {
		t.Fatalf("create temp test directory: %v", err)
	}

	c := &Cluster{
		testingTB:  t,
		profile:    profile,
		weedBinary: weedBinary,
		baseDir:    baseDir,
		configDir:  filepath.Join(baseDir, "config"),
		logsDir:    filepath.Join(baseDir, "logs"),
		keepLogs:   keepLogs,
	}
	// Registered before anything else can fail: Stop tolerates processes that
	// were never started and is guarded by sync.Once, so explicit Stop calls
	// from tests remain safe.
	t.Cleanup(c.Stop)

	masterDataDir := filepath.Join(baseDir, "master")
	volumeDataDir := filepath.Join(baseDir, "volume")
	for _, dir := range []string{c.configDir, c.logsDir, masterDataDir, volumeDataDir} {
		if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
			t.Fatalf("create %s: %v", dir, mkErr)
		}
	}

	if err = writeSecurityConfig(c.configDir, profile); err != nil {
		t.Fatalf("write security config: %v", err)
	}

	c.masterPort, c.masterGrpcPort, err = allocateMasterPortPair()
	if err != nil {
		t.Fatalf("allocate master port pair: %v", err)
	}

	ports, err := allocatePorts(3)
	if err != nil {
		t.Fatalf("allocate ports: %v", err)
	}
	c.volumePort = ports[0]
	c.volumeGrpcPort = ports[1]
	// The public port shares the admin port unless the profile splits them.
	c.volumePubPort = ports[0]
	if profile.SplitPublicPort {
		c.volumePubPort = ports[2]
	}

	if err = c.startMaster(masterDataDir); err != nil {
		t.Fatalf("start master: %v", err)
	}
	if err = c.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil {
		t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, c.tailLog("master.log"))
	}

	if err = c.startVolume(volumeDataDir); err != nil {
		t.Fatalf("start volume: %v\nmaster log tail:\n%s", err, c.tailLog("master.log"))
	}
	if err = c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil {
		t.Fatalf("wait for volume readiness: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}
	if err = c.waitForTCP(c.VolumeGRPCAddress()); err != nil {
		t.Fatalf("wait for volume grpc readiness: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}

	return c
}
// Stop shuts down the volume server and then the master, and afterwards
// either removes the work directory or logs where it was left.
//
// The directory is kept when keepLogs is set or when the owning test has
// already failed. Stop is a no-op on a nil receiver, and the teardown itself
// runs at most once however often Stop is called.
func (c *Cluster) Stop() {
	if c == nil {
		return
	}

	teardown := func() {
		stopProcess(c.volumeCmd)
		stopProcess(c.masterCmd)

		if c.keepLogs || c.testingTB.Failed() {
			if c.baseDir != "" {
				c.testingTB.Logf("volume server integration logs kept at %s", c.baseDir)
			}
			return
		}
		_ = os.RemoveAll(c.baseDir)
	}
	c.cleanupOnce.Do(teardown)
}
// startMaster launches the weed master process with its data in dataDir and
// its stdout/stderr redirected to logs/master.log.
//
// It returns the error from creating the log file or from starting the
// process; it does not wait for the master to become ready.
func (c *Cluster) startMaster(dataDir string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
	if err != nil {
		return err
	}
	// The child process gets its own handle to the file when it is started,
	// so the parent's copy is released on return. This also avoids leaking
	// the descriptor when Start fails. The close error is irrelevant here:
	// the parent never writes to the file.
	defer logFile.Close()

	args := []string{
		"-config_dir=" + c.configDir,
		"master",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.masterPort),
		"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
		"-mdir=" + dataDir,
		"-peers=none",
		"-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
		"-defaultReplication=000",
	}

	c.masterCmd = exec.Command(c.weedBinary, args...)
	c.masterCmd.Dir = c.baseDir
	c.masterCmd.Stdout = logFile
	c.masterCmd.Stderr = logFile
	return c.masterCmd.Start()
}
// startVolume launches the weed volume server with its data in dataDir and
// its stdout/stderr redirected to logs/volume.log.
//
// Read mode and the concurrent upload/download limits come from the profile;
// the inflight timeout flags are only passed when the profile sets a positive
// duration. It returns the error from creating the log file or from starting
// the process; it does not wait for the server to become ready.
func (c *Cluster) startVolume(dataDir string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "volume.log"))
	if err != nil {
		return err
	}
	// The child process gets its own handle to the file when it is started,
	// so the parent's copy is released on return. This also avoids leaking
	// the descriptor when Start fails. The close error is irrelevant here:
	// the parent never writes to the file.
	defer logFile.Close()

	args := []string{
		"-config_dir=" + c.configDir,
		"volume",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.volumePort),
		"-port.grpc=" + strconv.Itoa(c.volumeGrpcPort),
		"-port.public=" + strconv.Itoa(c.volumePubPort),
		"-dir=" + dataDir,
		"-max=16",
		"-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
		"-readMode=" + c.profile.ReadMode,
		"-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
		"-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
	}
	if c.profile.InflightUploadTimeout > 0 {
		args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String())
	}
	if c.profile.InflightDownloadTimeout > 0 {
		args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String())
	}

	c.volumeCmd = exec.Command(c.weedBinary, args...)
	c.volumeCmd.Dir = c.baseDir
	c.volumeCmd.Stdout = logFile
	c.volumeCmd.Stderr = logFile
	return c.volumeCmd.Start()
}
// waitForHTTP polls url with GET requests until a response with a status
// below 500 arrives or defaultWaitTimeout elapses.
//
// Each request is limited to one second and attempts are spaced by
// defaultWaitTick. Transport errors and 5xx answers count as "not ready yet".
func (c *Cluster) waitForHTTP(url string) error {
	probe := &http.Client{Timeout: 1 * time.Second}

	for giveUpAt := time.Now().Add(defaultWaitTimeout); time.Now().Before(giveUpAt); time.Sleep(defaultWaitTick) {
		resp, err := probe.Get(url)
		if err != nil {
			continue
		}
		// Drain and close the body before looking at the status code.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		if resp.StatusCode < 500 {
			return nil
		}
	}

	return fmt.Errorf("timed out waiting for %s", url)
}
// waitForTCP repeatedly dials addr until a TCP connection is accepted or
// defaultWaitTimeout elapses. Each dial is limited to one second and attempts
// are spaced by defaultWaitTick.
func (c *Cluster) waitForTCP(addr string) error {
	giveUpAt := time.Now().Add(defaultWaitTimeout)
	for {
		if !time.Now().Before(giveUpAt) {
			return fmt.Errorf("timed out waiting for tcp %s", addr)
		}
		if conn, err := net.DialTimeout("tcp", addr, time.Second); err == nil {
			_ = conn.Close()
			return nil
		}
		time.Sleep(defaultWaitTick)
	}
}
// stopProcess asks cmd's process to exit with os.Interrupt, waits up to ten
// seconds for it to finish, and kills it if it is still running after that.
//
// A nil cmd or a cmd that was never started is ignored. When the interrupt
// cannot be delivered at all (the os package documents os.Interrupt as not
// implemented on Windows), the process is killed right away instead of
// sitting out the full grace period for a signal it never received.
func stopProcess(cmd *exec.Cmd) {
	if cmd == nil || cmd.Process == nil {
		return
	}

	if err := cmd.Process.Signal(os.Interrupt); err != nil {
		// Best effort: Kill also fails if the process is already gone, in
		// which case Wait below returns promptly anyway.
		_ = cmd.Process.Kill()
	}

	// Buffered so the goroutine can always deliver its result and exit.
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	select {
	case <-done:
	case <-time.After(10 * time.Second):
		_ = cmd.Process.Kill()
		<-done
	}
}
// allocatePorts asks the OS for count free TCP ports on 127.0.0.1 and returns
// their numbers.
//
// All listeners are held open until every port has been obtained, so the
// returned ports are distinct, and are closed before returning. On a listen
// error the listeners opened so far are closed and the error is returned.
func allocatePorts(count int) ([]int, error) {
	held := make([]net.Listener, 0, count)
	// Release every reservation on the way out, on success and failure alike.
	defer func() {
		for _, l := range held {
			_ = l.Close()
		}
	}()

	ports := make([]int, 0, count)
	for len(held) < count {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			return nil, err
		}
		held = append(held, l)
		ports = append(ports, l.Addr().(*net.TCPAddr).Port)
	}
	return ports, nil
}
// allocateMasterPortPair scans 127.0.0.1 for the first HTTP port in
// 10000..55535 for which both that port and the port 10000 above it can be
// bound, and returns the two as (httpPort, grpcPort).
//
// Both probe listeners are closed before returning. An error is returned only
// when no pair in the range could be bound.
func allocateMasterPortPair() (int, int, error) {
	const (
		firstPort  = 10000
		lastPort   = 55535
		grpcOffset = 10000
	)
	listen := func(port int) (net.Listener, error) {
		return net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(port)))
	}

	for httpPort := firstPort; httpPort <= lastPort; httpPort++ {
		grpcPort := httpPort + grpcOffset

		httpListener, err := listen(httpPort)
		if err != nil {
			continue
		}
		grpcListener, err := listen(grpcPort)
		if err != nil {
			_ = httpListener.Close()
			continue
		}

		_ = grpcListener.Close()
		_ = httpListener.Close()
		return httpPort, grpcPort, nil
	}

	return 0, 0, errors.New("unable to find available master port pair")
}
// newWorkDir creates a fresh temporary directory for one cluster and reports
// whether logs should be retained, which is the case exactly when the
// VOLUME_SERVER_IT_KEEP_LOGS environment variable equals "1".
func newWorkDir() (dir string, keepLogs bool, err error) {
	tmp, mkErr := os.MkdirTemp("", "seaweedfs_volume_server_it_")
	retain := os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1"
	return tmp, retain, mkErr
}
// writeSecurityConfig writes security.toml into configDir.
//
// With JWT enabled in the profile the file contains a [jwt.signing] and a
// [jwt.signing.read] section carrying the profile's keys, each with a 60
// second expiry; both keys must be non-empty or an error is returned and
// nothing is written. Without JWT the file contains only a placeholder
// comment. Keys are inserted verbatim between double quotes.
func writeSecurityConfig(configDir string, profile matrix.Profile) error {
	content := "# optional security config generated for integration tests\n"

	if profile.EnableJWT {
		if profile.JWTSigningKey == "" || profile.JWTReadKey == "" {
			return errors.New("jwt profile requires both write and read keys")
		}
		content = "[jwt.signing]\n" +
			"key = \"" + profile.JWTSigningKey + "\"\n" +
			"expires_after_seconds = 60\n\n" +
			"[jwt.signing.read]\n" +
			"key = \"" + profile.JWTReadKey + "\"\n" +
			"expires_after_seconds = 60\n"
	}

	target := filepath.Join(configDir, "security.toml")
	return os.WriteFile(target, []byte(content), 0o644)
}
// FindOrBuildWeedBinary returns the path of an executable weed binary,
// building one when none is available.
//
// Lookup order:
//  1. the WEED_BINARY environment variable (an error if set but not executable),
//  2. weed/weed under the repository root, located three directories above this source file,
//  3. a binary previously built into seaweedfs_volume_server_it_bin under the OS temp directory,
//  4. a fresh "go build" of the weed package into that same location.
func FindOrBuildWeedBinary() (string, error) {
	if override := os.Getenv("WEED_BINARY"); override != "" {
		if !isExecutableFile(override) {
			return "", fmt.Errorf("WEED_BINARY is set but not executable: %s", override)
		}
		return override, nil
	}

	_, thisFile, _, ok := runtime.Caller(0)
	if !ok {
		return "", errors.New("unable to detect repository root")
	}
	repoRoot := filepath.Clean(filepath.Join(filepath.Dir(thisFile), "..", "..", ".."))
	if inTree := filepath.Join(repoRoot, "weed", "weed"); isExecutableFile(inTree) {
		return inTree, nil
	}

	cacheDir := filepath.Join(os.TempDir(), "seaweedfs_volume_server_it_bin")
	if err := os.MkdirAll(cacheDir, 0o755); err != nil {
		return "", fmt.Errorf("create binary directory %s: %w", cacheDir, err)
	}
	cached := filepath.Join(cacheDir, "weed")
	if isExecutableFile(cached) {
		return cached, nil
	}

	// Collect compiler output so a failed build can be reported in full.
	var buildLog bytes.Buffer
	build := exec.Command("go", "build", "-o", cached, ".")
	build.Dir = filepath.Join(repoRoot, "weed")
	build.Stdout, build.Stderr = &buildLog, &buildLog
	if err := build.Run(); err != nil {
		return "", fmt.Errorf("build weed binary: %w\n%s", err, buildLog.String())
	}
	if !isExecutableFile(cached) {
		return "", fmt.Errorf("built weed binary is not executable: %s", cached)
	}
	return cached, nil
}
// isExecutableFile reports whether path can be stat'ed, is not a directory,
// and has at least one execute permission bit set.
func isExecutableFile(path string) bool {
	info, err := os.Stat(path)
	switch {
	case err != nil:
		return false
	case info.IsDir():
		return false
	}
	return info.Mode().Perm()&0o111 != 0
}
// tailLog returns up to the last 40 lines of the named file in the logs
// directory, joined with newlines. It returns "" when the file cannot be
// opened.
//
// Lines of up to 1 MiB are accepted; the scanner's default limit would stop
// silently at the first longer line and present an earlier part of the log as
// its tail. If scanning still ends with an error, a bracketed note carrying
// that error is appended so the reader knows the tail is incomplete.
func (c *Cluster) tailLog(logName string) string {
	const (
		maxTailLines = 40
		maxLineBytes = 1 << 20
	)

	f, err := os.Open(filepath.Join(c.logsDir, logName))
	if err != nil {
		return ""
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	// One spare slot lets the window briefly hold maxTailLines+1 entries
	// without reallocating.
	lines := make([]string, 0, maxTailLines+1)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > maxTailLines {
			// Shift in place so the backing array is reused.
			copy(lines, lines[1:])
			lines = lines[:maxTailLines]
		}
	}
	if scanErr := scanner.Err(); scanErr != nil {
		lines = append(lines, fmt.Sprintf("[log tail incomplete: %v]", scanErr))
	}
	return strings.Join(lines, "\n")
}
// MasterAddress returns the master's HTTP endpoint as 127.0.0.1:port.
func (c *Cluster) MasterAddress() string {
	port := strconv.Itoa(c.masterPort)
	return net.JoinHostPort("127.0.0.1", port)
}
// VolumeAdminAddress returns the volume server's admin HTTP endpoint as
// 127.0.0.1:port.
func (c *Cluster) VolumeAdminAddress() string {
	port := strconv.Itoa(c.volumePort)
	return net.JoinHostPort("127.0.0.1", port)
}
// VolumePublicAddress returns the volume server's public HTTP endpoint as
// 127.0.0.1:port, using the port stored in volumePubPort.
func (c *Cluster) VolumePublicAddress() string {
	port := strconv.Itoa(c.volumePubPort)
	return net.JoinHostPort("127.0.0.1", port)
}
// VolumeGRPCAddress returns the volume server's gRPC endpoint as
// 127.0.0.1:port.
func (c *Cluster) VolumeGRPCAddress() string {
	port := strconv.Itoa(c.volumeGrpcPort)
	return net.JoinHostPort("127.0.0.1", port)
}
// VolumeServerAddress returns the volume server in SeaweedFS server address
// format, ip:httpPort.grpcPort, built from the admin address and the gRPC port.
func (c *Cluster) VolumeServerAddress() string {
	adminAddr := c.VolumeAdminAddress()
	grpcPort := c.volumeGrpcPort
	return fmt.Sprintf("%s.%d", adminAddr, grpcPort)
}
// MasterURL returns the master's HTTP base URL, i.e. MasterAddress with an
// http:// scheme and no trailing slash.
func (c *Cluster) MasterURL() string {
	addr := c.MasterAddress()
	return "http://" + addr
}
// VolumeAdminURL returns the volume server's admin HTTP base URL, i.e.
// VolumeAdminAddress with an http:// scheme and no trailing slash.
func (c *Cluster) VolumeAdminURL() string {
	addr := c.VolumeAdminAddress()
	return "http://" + addr
}
// VolumePublicURL returns the volume server's public HTTP base URL, i.e.
// VolumePublicAddress with an http:// scheme and no trailing slash.
func (c *Cluster) VolumePublicURL() string {
	addr := c.VolumePublicAddress()
	return "http://" + addr
}
// BaseDir returns the root work directory of this cluster, the directory that
// contains the config, logs, master and volume sub-directories.
func (c *Cluster) BaseDir() string {
return c.baseDir
}