Merge branch 'master' into mq

This commit is contained in:
chrislu
2024-08-10 10:01:57 -07:00
243 changed files with 5782 additions and 2536 deletions

View File

@@ -66,7 +66,7 @@ var cmdBackup = &Command{
func runBackup(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if *s.volumeId == -1 {
@@ -138,7 +138,9 @@ func runBackup(cmd *Command, args []string) bool {
if datSize > stats.TailOffset {
// remove the old data
v.Destroy(false)
if err := v.Destroy(false); err != nil {
fmt.Printf("Error destroying volume: %v\n", err)
}
// recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
if err != nil {

View File

@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type BenchmarkOptions struct {
@@ -111,7 +112,7 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
@@ -214,7 +215,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if isSecure {
jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
} else {
s.failed++
@@ -295,7 +296,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
bytes, _, err = util.Get(url)
bytes, _, err = util_http.Get(url)
if err == nil {
break
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -47,7 +48,7 @@ var cmdDownload = &Command{
}
func runDownload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for _, fid := range args {
@@ -63,11 +64,11 @@ func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpti
if lookupError != nil {
return lookupError
}
filename, _, rc, err := util.DownloadFile(fileUrl, jwt)
filename, _, rc, err := util_http.DownloadFile(fileUrl, jwt)
if err != nil {
return err
}
defer util.CloseResponse(rc)
defer util_http.CloseResponse(rc)
if filename == "" {
filename = fileId
}
@@ -116,10 +117,10 @@ func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption
return "", nil, lookupError
}
var rc *http.Response
if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil {
if filename, _, rc, e = util_http.DownloadFile(fileUrl, jwt); e != nil {
return "", nil, e
}
defer util.CloseResponse(rc)
defer util_http.CloseResponse(rc)
content, e = io.ReadAll(rc.Body)
return
}

View File

@@ -1,6 +1,9 @@
package command
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
@@ -10,8 +13,6 @@ import (
"strings"
"time"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -20,6 +21,10 @@ import (
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/spf13/viper"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/credentials/tls/certprovider/pemfile"
"google.golang.org/grpc/reflection"
)
var (
@@ -52,6 +57,7 @@ type FilerOptions struct {
disableHttp *bool
cipher *bool
metricsHttpPort *int
metricsHttpIp *string
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
@@ -63,7 +69,7 @@ type FilerOptions struct {
diskType *string
allowedOrigins *string
exposeDirectoryData *bool
joinExistingFiler *bool
certProvider certprovider.Provider
}
func init() {
@@ -85,6 +91,7 @@ func init() {
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.metricsHttpIp = cmdFiler.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
@@ -96,7 +103,6 @@ func init() {
f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins")
f.exposeDirectoryData = cmdFiler.Flag.Bool("exposeDirectoryData", true, "whether to return directory metadata and content in Filer UI")
f.joinExistingFiler = cmdFiler.Flag.Bool("joinExistingFiler", false, "enable if new filer wants to join existing cluster")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -124,6 +130,7 @@ func init() {
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
filerWebDavOptions.maxMB = cmdFiler.Flag.Int("webdav.maxMB", 4, "split files larger than the limit")
filerWebDavOptions.filerRootPath = cmdFiler.Flag.String("webdav.filer.path", "/", "use this remote path from filer server")
// start iam on filer
@@ -172,9 +179,17 @@ func runFiler(cmd *Command, args []string) bool {
go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
}
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
go stats_collect.StartMetricsServer(*f.bindIp, *f.metricsHttpPort)
switch {
case *f.metricsHttpIp != "":
// noting to do, use f.metricsHttpIp
case *f.bindIp != "":
*f.metricsHttpIp = *f.bindIp
case *f.ip != "":
*f.metricsHttpIp = *f.ip
}
go stats_collect.StartMetricsServer(*f.metricsHttpIp, *f.metricsHttpPort)
filerAddress := pb.NewServerAddress(*f.ip, *f.port, *f.portGrpc).String()
startDelay := time.Duration(2)
@@ -222,6 +237,15 @@ func runFiler(cmd *Command, args []string) bool {
return true
}
// GetCertificateWithUpdate Auto refreshing TSL certificate
func (fo *FilerOptions) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
certs, err := fo.certProvider.KeyMaterial(context.Background())
if certs == nil {
return nil, err
}
return &certs.Certs[0], err
}
func (fo *FilerOptions) startFiler() {
defaultMux := http.NewServeMux()
@@ -264,7 +288,6 @@ func (fo *FilerOptions) startFiler() {
DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
DiskType: *fo.diskType,
AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
JoinExistingFiler: *fo.joinExistingFiler,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
@@ -332,15 +355,62 @@ func (fo *FilerOptions) startFiler() {
httpS.Serve(filerSocketListener)
}()
}
if filerLocalListener != nil {
go func() {
if err := httpS.Serve(filerLocalListener); err != nil {
glog.Errorf("Filer Fail to serve: %v", e)
}
}()
}
if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
if viper.GetString("https.filer.key") != "" {
certFile := viper.GetString("https.filer.cert")
keyFile := viper.GetString("https.filer.key")
caCertFile := viper.GetString("https.filer.ca")
disbaleTlsVerifyClientCert := viper.GetBool("https.filer.disable_tls_verify_client_cert")
pemfileOptions := pemfile.Options{
CertFile: certFile,
KeyFile: keyFile,
RefreshDuration: security.CredRefreshingInterval,
}
if fo.certProvider, err = pemfile.NewProvider(pemfileOptions); err != nil {
glog.Fatalf("pemfile.NewProvider(%v) failed: %v", pemfileOptions, err)
}
caCertPool := x509.NewCertPool()
if caCertFile != "" {
caCertFile, err := os.ReadFile(caCertFile)
if err != nil {
glog.Fatalf("error reading CA certificate: %v", err)
}
caCertPool.AppendCertsFromPEM(caCertFile)
}
clientAuth := tls.NoClientCert
if !disbaleTlsVerifyClientCert {
clientAuth = tls.RequireAndVerifyClientCert
}
httpS.TLSConfig = &tls.Config{
GetCertificate: fo.GetCertificateWithUpdate,
ClientAuth: clientAuth,
ClientCAs: caCertPool,
}
if filerLocalListener != nil {
go func() {
if err := httpS.ServeTLS(filerLocalListener, "", ""); err != nil {
glog.Errorf("Filer Fail to serve: %v", e)
}
}()
}
if err := httpS.ServeTLS(filerListener, "", ""); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
} else {
if filerLocalListener != nil {
go func() {
if err := httpS.Serve(filerLocalListener); err != nil {
glog.Errorf("Filer Fail to serve: %v", e)
}
}()
}
if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"regexp"
"strings"
"time"
)
@@ -58,7 +59,7 @@ var cmdFilerBackup = &Command{
func runFilerBackup(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
util.LoadConfiguration("replication", true)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
@@ -148,17 +149,22 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
}()
}
prefix := sourcePath
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "backup_" + dataSink.GetName(),
ClientId: clientId,
ClientEpoch: clientEpoch,
SelfSignature: 0,
PathPrefix: sourcePath,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: startFrom.UnixNano(),
StopTsNs: 0,
EventErrorType: pb.TrivialOnError,
EventErrorType: pb.RetryForeverOnError,
}
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)

View File

@@ -59,7 +59,7 @@ var cmdFilerCat = &Command{
func runFilerCat(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
if len(args) == 0 {
return false

View File

@@ -83,7 +83,7 @@ var cmdFilerCopy = &Command{
func runCopy(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
if len(args) <= 1 {
return false
@@ -344,7 +344,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err
}
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
uploader, uploaderErr := operation.NewUploader()
if uploaderErr != nil {
return uploaderErr
}
finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
@@ -423,7 +428,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
<-concurrentChunks
}()
fileId, uploadResult, err, _ := operation.UploadWithRetry(
uploader, err := operation.NewUploader()
if err != nil {
uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err)
return
}
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
@@ -472,7 +483,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
operation.DeleteFiles(func(_ context.Context) pb.ServerAddress {
operation.DeleteFileIds(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
@@ -535,8 +546,12 @@ func detectMimeType(f *os.File) string {
}
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
uploader, uploaderErr := operation.NewUploader()
if uploaderErr != nil {
return nil, fmt.Errorf("upload data: %v", uploaderErr)
}
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,

View File

@@ -8,6 +8,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
"reflect"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -55,7 +56,7 @@ The backup writes to another filer store specified in a backup_filer.toml.
func runFilerMetaBackup(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
// load backup_filer.toml
@@ -63,9 +64,9 @@ func runFilerMetaBackup(cmd *Command, args []string) bool {
v.SetConfigFile(*metaBackup.backupFilerConfig)
if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
glog.Fatalf("Failed to load %s file: %v\nPlease use this command to generate the a %s.toml file\n"+
" weed scaffold -config=%s -output=.\n\n\n",
*metaBackup.backupFilerConfig, "backup_filer", "filer")
*metaBackup.backupFilerConfig, err, "backup_filer", "filer")
}
if err := metaBackup.initStore(v); err != nil {
@@ -197,17 +198,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
metaBackup.clientEpoch++
prefix := *metaBackup.filerDirectory
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "meta_backup",
ClientId: metaBackup.clientId,
ClientEpoch: metaBackup.clientEpoch,
SelfSignature: 0,
PathPrefix: *metaBackup.filerDirectory,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: startTime.UnixNano(),
StopTsNs: 0,
EventErrorType: pb.TrivialOnError,
EventErrorType: pb.RetryForeverOnError,
}
return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset)

View File

@@ -45,7 +45,7 @@ var (
func runFilerMetaTail(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()

View File

@@ -78,7 +78,7 @@ var cmdFilerRemoteGateway = &Command{
func runFilerRemoteGateway(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteGatewayOptions.grpcDialOption = grpcDialOption

View File

@@ -55,12 +55,12 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
ClientId: option.clientId,
ClientEpoch: option.clientEpoch,
SelfSignature: 0,
PathPrefix: option.bucketsDir,
PathPrefix: option.bucketsDir + "/",
AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
DirectoriesToWatch: nil,
StartTsNs: lastOffsetTs.UnixNano(),
StopTsNs: 0,
EventErrorType: pb.TrivialOnError,
EventErrorType: pb.RetryForeverOnError,
}
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)

View File

@@ -73,7 +73,7 @@ var cmdFilerRemoteSynchronize = &Command{
func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteSyncOptions.grpcDialOption = grpcDialOption

View File

@@ -64,17 +64,22 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
option.clientEpoch++
prefix := mountedDir
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "filer.remote.sync",
ClientId: option.clientId,
ClientEpoch: option.clientEpoch,
SelfSignature: 0,
PathPrefix: mountedDir,
PathPrefix: prefix,
AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
DirectoriesToWatch: nil,
StartTsNs: lastOffsetTs.UnixNano(),
StopTsNs: 0,
EventErrorType: pb.TrivialOnError,
EventErrorType: pb.RetryForeverOnError,
}
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)

View File

@@ -30,7 +30,7 @@ var cmdFilerReplicate = &Command{
func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
config := util.GetViper()

View File

@@ -118,7 +118,7 @@ var cmdFilerSynchronize = &Command{
func runFilerSynchronize(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
@@ -296,12 +296,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
})
prefix := sourcePath
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: clientName,
ClientId: clientId,
ClientEpoch: clientEpoch,
SelfSignature: targetFilerSignature,
PathPrefix: sourcePath,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: sourceFilerOffsetTsNs,
@@ -469,14 +474,14 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
}
} else {
// new key is outside of the watched directory
// new key is outside the watched directory
if doDeleteFiles {
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
}
} else {
// old key is outside of the watched directory
// old key is outside the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
@@ -486,7 +491,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
return nil
}
} else {
// new key is also outside of the watched directory
// new key is also outside the watched directory
// skip
}
}

View File

@@ -47,7 +47,7 @@ func runIam(cmd *Command, args []string) bool {
func (iamopt *IamOptions) startIamServer() bool {
filerAddress := pb.ServerAddress(*iamopt.filer)
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

View File

@@ -53,6 +53,7 @@ type MasterOptions struct {
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
metricsHttpIp *string
heartbeatInterval *time.Duration
electionTimeout *time.Duration
raftHashicorp *bool
@@ -77,6 +78,7 @@ func init() {
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
m.metricsHttpIp = cmdMaster.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
@@ -103,7 +105,7 @@ var (
func runMaster(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
util.LoadConfiguration("master", false)
grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
@@ -121,7 +123,15 @@ func runMaster(cmd *Command, args []string) bool {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
go stats_collect.StartMetricsServer(*m.ipBind, *m.metricsHttpPort)
switch {
case *m.metricsHttpIp != "":
// noting to do, use m.metricsHttpIp
case *m.ipBind != "":
*m.metricsHttpIp = *m.ipBind
case *m.ip != "":
*m.metricsHttpIp = *m.ip
}
go stats_collect.StartMetricsServer(*m.metricsHttpIp, *m.metricsHttpPort)
startMaster(m, masterWhiteList)
return true
@@ -180,10 +190,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD")
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods(http.MethodGet)
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods(http.MethodGet, http.MethodHead)
if *masterOption.raftHashicorp {
r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods(http.MethodGet)
}
// starting grpc server
grpcPort := *masterOption.portGrpc

View File

@@ -68,7 +68,7 @@ var cmdMasterFollower = &Command{
func runMasterFollower(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
util.LoadConfiguration("master", false)
if *mf.portGrpc == 0 {

View File

@@ -66,7 +66,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// try to connect to filer
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
var err error

View File

@@ -54,7 +54,7 @@ var cmdMqBroker = &Command{
func runMqBroker(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap()

View File

@@ -49,6 +49,7 @@ type S3Options struct {
tlsCACertificate *string
tlsVerifyClientCert *bool
metricsHttpPort *int
metricsHttpIp *string
allowEmptyFolder *bool
allowDeleteBucketNotEmpty *bool
auditLogConfig *string
@@ -75,6 +76,7 @@ func init() {
s3StandaloneOptions.tlsCACertificate = cmdS3.Flag.String("cacert.file", "", "path to the TLS CA certificate file")
s3StandaloneOptions.tlsVerifyClientCert = cmdS3.Flag.Bool("tlsVerifyClientCert", false, "whether to verify the client's certificate")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
s3StandaloneOptions.metricsHttpIp = cmdS3.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
@@ -163,17 +165,26 @@ var cmdS3 = &Command{
func runS3(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
go stats_collect.StartMetricsServer(*s3StandaloneOptions.bindIp, *s3StandaloneOptions.metricsHttpPort)
switch {
case *s3StandaloneOptions.metricsHttpIp != "":
// noting to do, use s3StandaloneOptions.metricsHttpIp
case *s3StandaloneOptions.bindIp != "":
*s3StandaloneOptions.metricsHttpIp = *s3StandaloneOptions.bindIp
}
go stats_collect.StartMetricsServer(*s3StandaloneOptions.metricsHttpIp, *s3StandaloneOptions.metricsHttpPort)
return s3StandaloneOptions.startS3Server()
}
// GetCertificateWithUpdate Auto refreshing TSL certificate
func (S3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
certs, err := S3opt.certProvider.KeyMaterial(context.Background())
func (s3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
certs, err := s3opt.certProvider.KeyMaterial(context.Background())
if certs == nil {
return nil, err
}
return &certs.Certs[0], err
}
@@ -320,6 +331,10 @@ func (s3opt *S3Options) startS3Server() bool {
ClientAuth: clientAuth,
ClientCAs: caCertPool,
}
err = security.FixTlsConfig(util.GetViper(), httpS.TLSConfig)
if err != nil {
glog.Fatalf("error with tls config: %v", err)
}
if *s3opt.portHttps == 0 {
glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
if s3ApiLocalListener != nil {

View File

@@ -285,7 +285,7 @@ password = ""
ssl = false
ssl_ca_file = ""
ssl_cert_file = ""
ssl_key_file = "
ssl_key_file = ""
insecure_skip_verify = false
option_pool_size = 0
database = "seaweedfs"

View File

@@ -95,18 +95,29 @@ allowed_commonNames = "" # comma-separated SSL certificate common names
cert = ""
key = ""
# volume server https options
# Note: work in progress!
# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
# https client for master|volume|filer|etc connection
# It is necessary that the parameters [https.volume]|[https.master]|[https.filer] are set
[https.client]
enabled = true
cert = ""
key = ""
ca = ""
# volume server https options
[https.volume]
cert = ""
key = ""
ca = ""
# master server https options
[https.master]
cert = ""
key = ""
ca = ""
# filer server https options
[https.filer]
cert = ""
key = ""
ca = ""
# disable_tls_verify_client_cert = true|false (default: false)

View File

@@ -66,6 +66,7 @@ var (
volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).")
volumeMinFreeSpace = cmdServer.Flag.String("volume.minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). Low disk space will mark all volumes as ReadOnly.")
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
serverMetricsHttpIp = cmdServer.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server")
@@ -97,7 +98,7 @@ func init() {
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("master.metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
masterOptions.raftResumeState = cmdServer.Flag.Bool("master.resumeState", false, "resume previous state on start master server")
masterOptions.raftHashicorp = cmdServer.Flag.Bool("master.raftHashicorp", false, "use hashicorp raft")
masterOptions.raftBootstrap = cmdMaster.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster")
masterOptions.raftBootstrap = cmdServer.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster")
masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
@@ -119,7 +120,6 @@ func init() {
filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
filerOptions.diskType = cmdServer.Flag.String("filer.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
filerOptions.exposeDirectoryData = cmdServer.Flag.Bool("filer.exposeDirectoryData", true, "expose directory data via filer. If false, filer UI will be innaccessible.")
filerOptions.joinExistingFiler = cmdServer.Flag.Bool("filer.joinExistingFiler", false, "enable if new filer wants to join existing cluster")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
@@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool {
go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil)
}
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
util.LoadConfiguration("master", false)
grace.SetupProfiling(*serverOptions.cpuprofile, *serverOptions.memprofile)
@@ -207,6 +207,10 @@ func runServer(cmd *Command, args []string) bool {
serverBindIp = serverIp
}
if *serverMetricsHttpIp == "" {
*serverMetricsHttpIp = *serverBindIp
}
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
@@ -245,7 +249,7 @@ func runServer(cmd *Command, args []string) bool {
webdavOptions.filer = &filerAddress
mqBrokerOptions.filerGroup = filerOptions.filerGroup
go stats_collect.StartMetricsServer(*serverBindIp, *serverMetricsHttpPort)
go stats_collect.StartMetricsServer(*serverMetricsHttpIp, *serverMetricsHttpPort)
folders := strings.Split(*volumeDataFolders, ",")

View File

@@ -35,7 +35,7 @@ var cmdShell = &Command{
func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
shellOptions.Directory = "/"

View File

@@ -20,6 +20,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"golang.org/x/net/context/ctxhttp"
)
@@ -198,7 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
if err != nil {
return Release{}, err
}
defer util.CloseResponse(res)
defer util_http.CloseResponse(res)
if res.StatusCode != http.StatusOK {
content := res.Header.Get("Content-Type")
@@ -258,7 +259,7 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) {
if err != nil {
return nil, err
}
defer util.CloseResponse(res)
defer util_http.CloseResponse(res)
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
@@ -308,7 +309,12 @@ func extractToFile(buf []byte, filename, target string) error {
trd := tar.NewReader(gr)
hdr, terr := trd.Next()
if terr != nil {
glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr)
if hdr != nil {
glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr)
} else {
glog.Errorf("uncompress file is nil, failed:%s", terr)
}
return terr
}
rd = trd

View File

@@ -69,7 +69,7 @@ var cmdUpload = &Command{
func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
defaultReplication, err := readMasterConfiguration(grpcDialOption, pb.ServerAddress(*upload.master))

View File

@@ -65,6 +65,7 @@ type VolumeServerOptions struct {
pprof *bool
preStopSeconds *int
metricsHttpPort *int
metricsHttpIp *string
// pulseSeconds *int
inflightUploadDataTimeout *time.Duration
hasSlowRead *bool
@@ -99,6 +100,7 @@ func init() {
v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.metricsHttpIp = cmdVolume.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", true, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.")
@@ -123,7 +125,7 @@ var (
func runVolume(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
// If --pprof is set we assume the caller wants to be able to collect
// cpu and memory profiles via go tool pprof
@@ -131,7 +133,15 @@ func runVolume(cmd *Command, args []string) bool {
grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
}
go stats_collect.StartMetricsServer(*v.bindIp, *v.metricsHttpPort)
switch {
case *v.metricsHttpIp != "":
// noting to do, use v.metricsHttpIp
case *v.bindIp != "":
*v.metricsHttpIp = *v.bindIp
case *v.ip != "":
*v.metricsHttpIp = *v.ip
}
go stats_collect.StartMetricsServer(*v.metricsHttpIp, *v.metricsHttpPort)
minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent)
v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses()

View File

@@ -60,7 +60,7 @@ var cmdWebDav = &Command{
func runWebDav(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadSecurityConfiguration()
glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.Version(), *webDavStandaloneOptions.port)

View File

@@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/util"
"strings"
"sync"
@@ -140,6 +141,8 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full
}
}
} else {
err = fmt.Errorf("invalid bucket name %s", bucket)
}
return
@@ -340,6 +343,9 @@ func (store *AbstractSqlStore) Shutdown() {
}
func isValidBucket(bucket string) bool {
if s3bucket.VerifyS3BucketName(bucket) != nil {
return false
}
return bucket != DEFAULT_TABLE && bucket != ""
}

View File

@@ -1,10 +1,11 @@
package filer
import (
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"sync"
)
type ChunkGroup struct {
@@ -54,9 +55,11 @@ func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
if !found {
rangeStop = min(rangeStop, fileSize)
for i := rangeStart; i < rangeStop; i++ {
buff[i-offset] = 0
}
n = int(int64(n) + rangeStop - rangeStart)
continue
}
xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)

View File

@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -120,7 +121,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err
}
return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
@@ -132,7 +133,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri
for _, urlString := range urlStrings {
var localProcessed int
var writeErr error
shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped {

View File

@@ -78,7 +78,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
return f
}
func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
func (f *Filer) MaybeBootstrapFromOnePeer(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
if len(existingNodes) == 0 {
return
}
@@ -91,25 +91,13 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
}
glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId)
f.UniqueFilerEpoch++
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "bootstrap",
ClientId: f.UniqueFilerId,
ClientEpoch: f.UniqueFilerEpoch,
SelfSignature: f.Signature,
PathPrefix: "/",
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: 0,
StopTsNs: snapshotTime.UnixNano(),
EventErrorType: pb.FatalOnError,
}
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
return pb.WithFilerClient(false, f.UniqueFilerId, pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.StreamBfs(client, "/", snapshotTime.UnixNano(), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
return f.Store.InsertEntry(context.Background(), FromPbEntry(string(parentPath), entry))
})
})
return
}
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {

View File

@@ -102,7 +102,7 @@ func (fc *FilerConf) LoadFromBytes(data []byte) (err error) {
func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) {
for _, location := range conf.Locations {
err = fc.AddLocationConf(location)
err = fc.SetLocationConf(location)
if err != nil {
// this is not recoverable
return nil
@@ -111,7 +111,24 @@ func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) {
return nil
}
func (fc *FilerConf) GetLocationConf(locationPrefix string)(locConf *filer_pb.FilerConf_PathConf, found bool) {
return fc.rules.Get([]byte(locationPrefix))
}
func (fc *FilerConf) SetLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) {
err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf)
if err != nil {
glog.Errorf("put location prefix: %v", err)
}
return
}
func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) {
existingConf, found := fc.rules.Get([]byte(locConf.LocationPrefix))
if found {
mergePathConf(existingConf, locConf)
locConf = existingConf
}
err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf)
if err != nil {
glog.Errorf("put location prefix: %v", err)
@@ -170,6 +187,7 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.DataCenter = util.Nvl(b.DataCenter, a.DataCenter)
a.Rack = util.Nvl(b.Rack, a.Rack)
a.DataNode = util.Nvl(b.DataNode, a.DataNode)
a.DisableChunkDeletion = b.DisableChunkDeletion || a.DisableChunkDeletion
}
func (fc *FilerConf) ToProto() *filer_pb.FilerConf {

View File

@@ -17,7 +17,7 @@ const (
type OnChunksFunc func([]*filer_pb.FileChunk) error
type OnHardLinkIdsFunc func([]HardLinkId) error
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) (err error) {
if p == "/" {
return nil
}
@@ -26,6 +26,9 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if findErr != nil {
return findErr
}
if ifNotModifiedAfter > 0 && entry.Attr.Mtime.Unix() > ifNotModifiedAfter {
return nil
}
isDeleteCollection := f.isBucket(entry)
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
@@ -50,12 +53,12 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
}
if shouldDeleteChunks && !isDeleteCollection {
f.DirectDeleteChunks(entry.GetChunks())
f.DeleteChunks(p, entry.GetChunks())
}
if isDeleteCollection {
collectionName := entry.Name()
f.doDeleteCollection(collectionName)
f.DoDeleteCollection(collectionName)
}
return nil
@@ -133,7 +136,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
return nil
}
func (f *Filer) doDeleteCollection(collectionName string) (err error) {
func (f *Filer) DoDeleteCollection(collectionName string) (err error) {
return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{

View File

@@ -1,11 +1,12 @@
package filer
import (
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -22,6 +23,7 @@ func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []str
locations = append(locations, operation.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
GrpcPort: loc.GrpcPort,
})
}
m[vid] = &operation.LookupResult{
@@ -53,7 +55,7 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = fileIds[:0]
}
deletionCount = len(toDeleteFileIds)
_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
_, err := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
if err != nil {
if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
@@ -70,50 +72,6 @@ func (f *Filer) loopProcessingDeletion() {
}
}
func (f *Filer) doDeleteFileIds(fileIds []string) {
lookupFunc := LookupByMasterClientFn(f.MasterClient)
DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
for len(fileIds) > 0 {
var toDeleteFileIds []string
if len(fileIds) > DeletionBatchSize {
toDeleteFileIds = fileIds[:DeletionBatchSize]
fileIds = fileIds[DeletionBatchSize:]
} else {
toDeleteFileIds = fileIds
fileIds = fileIds[:0]
}
deletionCount := len(toDeleteFileIds)
_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
if err != nil {
if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
}
}
}
}
func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
var fileIdsToDelete []string
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
continue
}
dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
for _, dChunk := range dataChunks {
fileIdsToDelete = append(fileIdsToDelete, dChunk.GetFileIdString())
}
fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
}
f.doDeleteFileIds(fileIdsToDelete)
}
func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) {
f.doDeleteChunks(chunks)
}

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"io"
"math"
"regexp"
"strings"
"time"
@@ -116,101 +115,34 @@ var (
func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
var stopDate, stopHourMinute string
if stopTsNs != 0 {
stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
visitor, visitErr := f.collectPersistedLogBuffer(startPosition, stopTsNs)
if visitErr != nil {
if visitErr == io.EOF {
return
}
err = fmt.Errorf("reading from persisted logs: %v", visitErr)
return
}
sizeBuf := make([]byte, 4)
startTsNs := startPosition.UnixNano()
dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
if listDayErr != nil {
return lastTsNs, isDone, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
if stopDate != "" {
if strings.Compare(dayEntry.Name(), stopDate) > 0 {
var logEntry *filer_pb.LogEntry
for {
logEntry, visitErr = visitor.GetNext()
if visitErr != nil {
if visitErr == io.EOF {
break
}
err = fmt.Errorf("read next from persisted logs: %v", visitErr)
return
}
// println("checking day", dayEntry.FullPath)
hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil {
return lastTsNs, isDone, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
isDone, visitErr = eachLogEntryFn(logEntry)
if visitErr != nil {
err = fmt.Errorf("process persisted log entry: %v", visitErr)
return
}
for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath)
if dayEntry.Name() == startDate {
hourMinute := util.FileNameBase(hourMinuteEntry.Name())
if strings.Compare(hourMinute, startHourMinute) < 0 {
continue
}
}
if dayEntry.Name() == stopDate {
hourMinute := util.FileNameBase(hourMinuteEntry.Name())
if strings.Compare(hourMinute, stopHourMinute) > 0 {
break
}
}
// println("processing", hourMinuteEntry.FullPath)
chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.GetChunks())
if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, stopTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
continue
}
if VolumeNotFoundPattern.MatchString(err.Error()) {
glog.Warningf("skipping reading %s: %v", hourMinuteEntry.FullPath, err)
continue
}
return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
}
chunkedFileReader.Close()
lastTsNs = logEntry.TsNs
if isDone {
return
}
}
return lastTsNs, isDone, nil
}
func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) {
for {
n, err := r.Read(sizeBuf)
if err != nil {
return lastTsNs, err
}
if n != 4 {
return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n)
}
size := util.BytesToUint32(sizeBuf)
// println("entry size", size)
entryData := make([]byte, size)
n, err = r.Read(entryData)
if err != nil {
return lastTsNs, err
}
if n != int(size) {
return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
}
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
return lastTsNs, err
}
if logEntry.TsNs <= startTsNs {
continue
}
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
return lastTsNs, err
}
// println("each log: ", logEntry.TsNs)
if _, err := eachLogEntryFn(logEntry); err != nil {
return lastTsNs, err
} else {
lastTsNs = logEntry.TsNs
}
}
return
}

View File

@@ -77,7 +77,13 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
PairMap: nil,
Jwt: assignResult.Auth,
}
uploadResult, err := operation.UploadData(data, uploadOption)
uploader, err := operation.NewUploader()
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
uploadResult, err := uploader.UploadData(data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}

View File

@@ -0,0 +1,352 @@
package filer
import (
"container/heap"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/protobuf/proto"
"io"
"math"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)
type LogFileEntry struct {
TsNs int64
FileEntry *Entry
}
func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) {
if stopTsNs != 0 && startPosition.Time.UnixNano() > stopTsNs {
return nil, io.EOF
}
startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
if listDayErr != nil {
return nil, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
return NewOrderedLogVisitor(f, startPosition, stopTsNs, dayEntries)
}
// ----------
type LogEntryItem struct {
Entry *filer_pb.LogEntry
filer string
}
// LogEntryItemPriorityQueue a priority queue for LogEntry
type LogEntryItemPriorityQueue []*LogEntryItem
func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) }
func (pq LogEntryItemPriorityQueue) Less(i, j int) bool {
return pq[i].Entry.TsNs < pq[j].Entry.TsNs
}
func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *LogEntryItemPriorityQueue) Push(x any) {
item := x.(*LogEntryItem)
*pq = append(*pq, item)
}
func (pq *LogEntryItemPriorityQueue) Pop() any {
n := len(*pq)
item := (*pq)[n-1]
*pq = (*pq)[:n-1]
return item
}
// ----------
type OrderedLogVisitor struct {
perFilerIteratorMap map[string]*LogFileQueueIterator
pq *LogEntryItemPriorityQueue
logFileEntryCollector *LogFileEntryCollector
}
func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) {
perFilerQueueMap := make(map[string]*LogFileQueueIterator)
// initialize the priority queue
pq := &LogEntryItemPriorityQueue{}
heap.Init(pq)
t := &OrderedLogVisitor{
perFilerIteratorMap: perFilerQueueMap,
pq: pq,
logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries),
}
if err := t.logFileEntryCollector.collectMore(t); err != nil && err != io.EOF {
return nil, err
}
return t, nil
}
func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
if o.pq.Len() == 0 {
return nil, io.EOF
}
item := heap.Pop(o.pq).(*LogEntryItem)
filerId := item.filer
// fill the pq with the next log entry from the same filer
it := o.perFilerIteratorMap[filerId]
next, nextErr := it.getNext(o)
if nextErr != nil {
if nextErr == io.EOF {
// do nothing since the filer has no more log entries
}else {
return nil, fmt.Errorf("failed to get next log entry: %v", nextErr)
}
} else {
heap.Push(o.pq, &LogEntryItem{
Entry: next,
filer: filerId,
})
}
return item.Entry, nil
}
func getFilerId(name string) string {
idx := strings.LastIndex(name, ".")
if idx < 0 {
return ""
}
return name[idx+1:]
}
// ----------
type LogFileEntryCollector struct {
f *Filer
startTsNs int64
stopTsNs int64
dayEntryQueue *util.Queue[*Entry]
startDate string
startHourMinute string
stopDate string
stopHourMinute string
}
func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector {
dayEntryQueue := util.NewQueue[*Entry]()
for _, dayEntry := range dayEntries {
dayEntryQueue.Enqueue(dayEntry)
// println("enqueue day entry", dayEntry.Name())
}
startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
var stopDate, stopHourMinute string
if stopTsNs != 0 {
stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
}
return &LogFileEntryCollector{
f: f,
startTsNs: startPosition.UnixNano(),
stopTsNs: stopTsNs,
dayEntryQueue: dayEntryQueue,
startDate: startDate,
startHourMinute: startHourMinute,
stopDate: stopDate,
stopHourMinute: stopHourMinute,
}
}
func (c *LogFileEntryCollector) hasMore() bool {
return c.dayEntryQueue.Len() > 0
}
func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
dayEntry := c.dayEntryQueue.Dequeue()
if dayEntry == nil {
return io.EOF
}
// println("dequeue day entry", dayEntry.Name())
if c.stopDate != "" {
if strings.Compare(dayEntry.Name(), c.stopDate) > 0 {
return io.EOF
}
}
hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil {
return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
freshFilerIds := make(map[string]string)
for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath)
hourMinute := util.FileNameBase(hourMinuteEntry.Name())
if dayEntry.Name() == c.startDate {
if strings.Compare(hourMinute, c.startHourMinute) < 0 {
continue
}
}
if dayEntry.Name() == c.stopDate {
if strings.Compare(hourMinute, c.stopHourMinute) > 0 {
break
}
}
tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute)
// println(" enqueue", tsMinute)
t, parseErr := time.Parse("2006-01-02-15-04", tsMinute)
if parseErr != nil {
glog.Errorf("failed to parse %s: %v", tsMinute, parseErr)
continue
}
filerId := getFilerId(hourMinuteEntry.Name())
iter, found := v.perFilerIteratorMap[filerId]
if !found {
iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
v.perFilerIteratorMap[filerId] = iter
freshFilerIds[filerId] = hourMinuteEntry.Name()
}
iter.q.Enqueue(&LogFileEntry{
TsNs: t.UnixNano(),
FileEntry: hourMinuteEntry,
})
}
// fill the pq with the next log entry if it is a new filer
for filerId, entryName := range freshFilerIds {
iter, found := v.perFilerIteratorMap[filerId]
if !found {
glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId)
continue
}
next, nextErr := iter.getNext(v)
if nextErr != nil {
if nextErr == io.EOF {
// do nothing since the filer has no more log entries
}else {
return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err)
}
} else {
heap.Push(v.pq, &LogEntryItem{
Entry: next,
filer: filerId,
})
}
}
return nil
}
// ----------
type LogFileQueueIterator struct {
q *util.Queue[*LogFileEntry]
masterClient *wdclient.MasterClient
startTsNs int64
stopTsNs int64
currentFileIterator *LogFileIterator
}
func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
return &LogFileQueueIterator{
q: q,
masterClient: masterClient,
startTsNs: startTsNs,
stopTsNs: stopTsNs,
}
}
// getNext will return io.EOF when done
func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
for {
if iter.currentFileIterator != nil {
logEntry, err = iter.currentFileIterator.getNext()
if err != io.EOF {
return
}
}
// now either iter.currentFileIterator is nil or err is io.EOF
if iter.q.Len() == 0 {
return nil, io.EOF
}
t := iter.q.Dequeue()
if t == nil {
continue
}
// skip the file if it is after the stopTsNs
if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs {
return nil, io.EOF
}
next := iter.q.Peek()
if next == nil {
if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF {
return nil, collectErr
}
}
// skip the file if the next entry is before the startTsNs
if next != nil && next.TsNs <= iter.startTsNs {
continue
}
iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
}
}
// ----------
type LogFileIterator struct {
r io.Reader
sizeBuf []byte
startTsNs int64
stopTsNs int64
}
func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
return &LogFileIterator{
r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
sizeBuf: make([]byte, 4),
startTsNs: startTsNs,
stopTsNs: stopTsNs,
}
}
// getNext will return io.EOF when done
func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
var n int
for {
n, err = iter.r.Read(iter.sizeBuf)
if err != nil {
return
}
if n != 4 {
return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
}
size := util.BytesToUint32(iter.sizeBuf)
// println("entry size", size)
entryData := make([]byte, size)
n, err = iter.r.Read(entryData)
if err != nil {
return
}
if n != int(size) {
return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
}
logEntry = &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
return
}
if logEntry.TsNs <= iter.startTsNs {
continue
}
if iter.stopTsNs != 0 && logEntry.TsNs > iter.stopTsNs {
return nil, io.EOF
}
return
}
}

View File

@@ -85,7 +85,7 @@ func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string,
func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
store = fsw.defaultStore
if path == "/" {
if path == "/" || path == "//" {
return
}
var storeId string
@@ -182,7 +182,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
}()
existingEntry, findErr := fsw.FindEntry(ctx, fp)
if findErr == filer_pb.ErrNotFound {
if findErr == filer_pb.ErrNotFound || existingEntry == nil {
return nil
}
if len(existingEntry.HardLinkId) != 0 {

View File

@@ -25,8 +25,9 @@ func init() {
}
type LevelDB2Store struct {
dbs []*leveldb.DB
dbCount int
dbs []*leveldb.DB
dbCount int
ReadOnly bool
}
func (store *LevelDB2Store) GetName() string {
@@ -49,6 +50,7 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
Filter: filter.NewBloomFilter(8), // false positive rate 0.02
ReadOnly: store.ReadOnly,
}
for d := 0; d < dbCount; d++ {

View File

@@ -31,9 +31,10 @@ func init() {
}
type LevelDB3Store struct {
dir string
dbs map[string]*leveldb.DB
dbsLock sync.RWMutex
dir string
dbs map[string]*leveldb.DB
dbsLock sync.RWMutex
ReadOnly bool
}
func (store *LevelDB3Store) GetName() string {
@@ -69,12 +70,14 @@ func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
Filter: bloom,
ReadOnly: store.ReadOnly,
}
if name != DEFAULT {
opts = &opt.Options{
BlockCacheCapacity: 16 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 8 * 1024 * 1024, // default value is 4MiB
Filter: bloom,
ReadOnly: store.ReadOnly,
}
}

View File

@@ -2,6 +2,7 @@ package filer
import (
"context"
"sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,3 +36,40 @@ func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) err
return nil
}
// ParallelProcessDirectoryStructure processes each entry in parallel, and also ensure parent directories are processed first.
// This also assumes the parent directories are in the entryChan already.
func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry)(error)) (firstErr error) {
executors := util.NewLimitedConcurrentExecutor(concurrency)
var wg sync.WaitGroup
for entry := range entryChan {
wg.Add(1)
if entry.IsDirectory() {
func() {
defer wg.Done()
if err := eachEntryFn(entry); err != nil {
if firstErr == nil {
firstErr = err
}
}
}()
} else {
executors.Execute(func() {
defer wg.Done()
if err := eachEntryFn(entry); err != nil {
if firstErr == nil {
firstErr = err
}
}
})
}
if firstErr != nil {
break
}
}
wg.Wait()
return
}

View File

@@ -2,7 +2,6 @@ package filer
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
@@ -10,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type ReaderCache struct {
@@ -171,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize)
_, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
_, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil {
mem.Free(s.data)
s.data = nil

View File

@@ -1,7 +1,7 @@
package redis
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -1,7 +1,7 @@
package redis
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -7,7 +7,7 @@ import (
"strings"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
)

View File

@@ -1,7 +1,7 @@
package redis2
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -1,7 +1,7 @@
package redis2
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
"time"

View File

@@ -1,7 +1,7 @@
package redis2
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -57,7 +57,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
if name != "" {
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil {
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
)

View File

@@ -4,7 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/util/skiplist"
)
@@ -399,9 +399,9 @@ func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
var members []*redis.Z
var members []redis.Z
for _, name := range names {
members = append(members, &redis.Z{
members = append(members, redis.Z{
Score: 0,
Member: name,
})

View File

@@ -1,7 +1,7 @@
package redis3
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util/skiplist"
"google.golang.org/protobuf/proto"

View File

@@ -3,7 +3,7 @@ package redis3
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/glog"
)

View File

@@ -3,7 +3,7 @@ package redis3
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/stvp/tempredis"
"strconv"
"testing"
@@ -116,7 +116,7 @@ func BenchmarkRedis(b *testing.B) {
})
for i := 0; i < b.N; i++ {
client.ZAddNX(context.Background(), "/yyy/bin", &redis.Z{Score: 0, Member: strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx"})
client.ZAddNX(context.Background(), "/yyy/bin", redis.Z{Score: 0, Member: strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx"})
}
}
@@ -149,7 +149,7 @@ func xTestNameListAdd(t *testing.T) {
ts1 := time.Now()
for i := 0; i < N; i++ {
client.ZAddNX(context.Background(), "/x", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
client.ZAddNX(context.Background(), "/x", redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
}
ts2 := time.Now()
@@ -205,6 +205,6 @@ func xBenchmarkRedis(b *testing.B) {
})
for i := 0; i < b.N; i++ {
client.ZAddNX(context.Background(), "/xxx/bin", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
client.ZAddNX(context.Background(), "/xxx/bin", redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
}
}

View File

@@ -1,9 +1,9 @@
package redis3
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -3,9 +3,9 @@ package redis3
import (
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -1,9 +1,9 @@
package redis3
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -3,7 +3,7 @@ package redis3
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util/skiplist"
"google.golang.org/protobuf/proto"

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
redsync "github.com/go-redsync/redsync/v4"
"github.com/seaweedfs/seaweedfs/weed/filer"

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
)

View File

@@ -1,7 +1,7 @@
package redis_lua
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -1,7 +1,7 @@
package redis_lua
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
"time"

View File

@@ -1,7 +1,7 @@
package redis_lua
import (
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
)

View File

@@ -2,7 +2,7 @@ package stored_procedure
import (
_ "embed"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)
func init() {

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/redis_lua/stored_procedure"

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/seaweedfs/seaweedfs/weed/filer"
)

View File

@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var getLookupFileIdBackoffSchedule = []time.Duration{
@@ -194,7 +195,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
return err
}
n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -350,7 +351,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {

View File

@@ -72,7 +72,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
// ListBuckets
// apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST"))
apiRouter.Methods("POST").Path("/").HandlerFunc(iama.iam.Auth(iama.DoActions, ACTION_ADMIN))
apiRouter.Methods(http.MethodPost).Path("/").HandlerFunc(iama.iam.Auth(iama.DoActions, ACTION_ADMIN))
//
// NotFound
apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)

View File

@@ -189,7 +189,7 @@ func TestDeleteUser(t *testing.T) {
func executeRequest(req *http.Request, v interface{}) (*httptest.ResponseRecorder, error) {
rr := httptest.NewRecorder()
apiRouter := mux.NewRouter().SkipClean(true)
apiRouter.Path("/").Methods("POST").HandlerFunc(ias.DoActions)
apiRouter.Path("/").Methods(http.MethodPost).HandlerFunc(ias.DoActions)
apiRouter.ServeHTTP(rr, req)
return rr, xml.Unmarshal(rr.Body.Bytes(), &v)
}

View File

@@ -66,8 +66,8 @@ func (fh *FileHandle) FullPath() util.FullPath {
return fp
}
func (fh *FileHandle) GetEntry() *filer_pb.Entry {
return fh.entry.GetEntry()
func (fh *FileHandle) GetEntry() *LockedEntry {
return fh.entry
}
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
@@ -90,13 +90,6 @@ func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entr
}
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
if fh.entry == nil {
return
}
fh.entry.AppendChunks(chunks)
}
@@ -105,9 +98,6 @@ func (fh *FileHandle) ReleaseHandle() {
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.dirtyPages.Destroy()
if IsDebugFileReadWrite {
fh.mirrorFile.Close()

View File

@@ -1,6 +1,7 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -8,7 +9,6 @@ import (
type FileHandleToInode struct {
sync.RWMutex
nextFh FileHandleId
inode2fh map[uint64]*FileHandle
fh2inode map[FileHandleId]uint64
}
@@ -17,7 +17,6 @@ func NewFileHandleToInode() *FileHandleToInode {
return &FileHandleToInode{
inode2fh: make(map[uint64]*FileHandle),
fh2inode: make(map[FileHandleId]uint64),
nextFh: 0,
}
}
@@ -43,14 +42,13 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
defer i.Unlock()
fh, found := i.inode2fh[inode]
if !found {
fh = newFileHandle(wfs, i.nextFh, inode, entry)
i.nextFh++
fh = newFileHandle(wfs, FileHandleId(util.RandomUint64()), inode, entry)
i.inode2fh[inode] = fh
i.fh2inode[fh.fh] = inode
} else {
fh.counter++
}
if fh.GetEntry() != entry {
if fh.GetEntry().GetEntry() != entry {
fh.SetEntry(entry)
}
return fh

View File

@@ -29,23 +29,19 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e
fileFullPath := fh.FullPath()
entry := fh.GetEntry()
if entry == nil {
return 0, 0, io.EOF
}
if entry.IsInRemoteOnly() {
glog.V(4).Infof("download remote entry %s", fileFullPath)
newEntry, err := fh.downloadRemoteEntry(entry)
err := fh.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
return 0, 0, err
}
entry = newEntry
}
fileSize := int64(entry.Attributes.FileSize)
if fileSize == 0 {
fileSize = int64(filer.FileSize(entry))
fileSize = int64(filer.FileSize(entry.GetEntry()))
}
if fileSize == 0 {
@@ -70,7 +66,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e
return int64(totalRead), ts, err
}
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
func (fh *FileHandle) downloadRemoteEntry(entry *LockedEntry) (error) {
fileFullPath := fh.FullPath()
dir, _ := fileFullPath.DirAndName()
@@ -88,12 +84,12 @@ func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entr
return fmt.Errorf("CacheRemoteObjectToLocalCluster file %s: %v", fileFullPath, err)
}
entry = resp.Entry
entry.SetEntry(resp.Entry)
fh.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
return nil
})
return entry, err
return err
}

View File

@@ -16,6 +16,8 @@ func (le *LockedEntry) GetEntry() *filer_pb.Entry {
return le.Entry
}
// SetEntry sets the entry of the LockedEntry
// entry should never be nil
func (le *LockedEntry) SetEntry(entry *filer_pb.Entry) {
le.Lock()
defer le.Unlock()

View File

@@ -4,6 +4,7 @@ import (
"context"
"os"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
@@ -118,6 +119,9 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
if err != nil {
return nil, err
}
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)).Before(time.Now()) {
return nil, filer_pb.ErrNotFound
}
mc.mapIdFromFilerToLocal(entry)
return
}
@@ -143,6 +147,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)).Before(time.Now()) {
return true
}
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
})

View File

@@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"strings"
)
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
@@ -57,12 +58,17 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
prefix := dir
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "mount",
ClientId: selfSignature,
ClientEpoch: 1,
SelfSignature: selfSignature,
PathPrefix: dir,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: lastTsNs,

View File

@@ -112,9 +112,6 @@ func NewSeaweedFileSystem(option *Option) *WFS {
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
// Recreate dirty pages
fh.dirtyPages.Destroy()
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
@@ -122,7 +119,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
// Update handle entry
newentry, status := wfs.maybeLoadEntry(filePath)
if status == fuse.OK {
if fh.GetEntry() != newentry {
if fh.GetEntry().GetEntry() != newentry {
fh.SetEntry(newentry)
}
}

View File

@@ -59,7 +59,7 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
fh.entryLock.RLock()
if entry := fh.GetEntry(); entry != nil {
if entry := fh.GetEntry().GetEntry(); entry != nil {
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
localEntry = filer.FromPbEntry(string(dirPath), entry)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/util"
"math"
"sync"
)
@@ -43,10 +44,7 @@ func NewDirectoryHandleToInode() *DirectoryHandleToInode {
}
func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) {
wfs.fhmap.Lock()
fh := wfs.fhmap.nextFh
wfs.fhmap.nextFh++
wfs.fhmap.Unlock()
fh := FileHandleId(util.RandomUint64())
wfs.dhmap.Lock()
defer wfs.dhmap.Unlock()
@@ -173,7 +171,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
entry = filer.FromPbEntry(string(dirPath), fh.GetEntry())
entry = filer.FromPbEntry(string(dirPath), fh.GetEntry().GetEntry())
}
wfs.outputFilerEntry(entryOut, inode, entry)
}

View File

@@ -38,10 +38,8 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
// lock the file until the proper offset was calculated
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Lseek", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
fileSize := int64(filer.FileSize(fh.GetEntry()))
fileSize := int64(filer.FileSize(fh.GetEntry().GetEntry()))
offset := max(int64(in.Offset), 0)
glog.V(4).Infof(

View File

@@ -116,13 +116,8 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
entry := fh.GetEntry()
if entry == nil {
return nil
}
entry.Name = name // this flush may be just after a rename operation
if entry.Attributes != nil {
@@ -141,7 +136,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
request := &filer_pb.CreateEntryRequest{
Directory: string(dir),
Entry: entry,
Entry: entry.GetEntry(),
Signatures: []int32{wfs.signature},
SkipCheckParentDirectory: true,
}

View File

@@ -14,8 +14,12 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
uploader, err := operation.NewUploader()
if err != nil {
return
}
fileId, uploadResult, err, data := operation.UploadWithRetry(
fileId, uploadResult, err, data := uploader.UploadWithRetry(
wfs,
&filer_pb.AssignVolumeRequest{
Count: 1,

View File

@@ -13,6 +13,7 @@ import (
"math"
"sync/atomic"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
@@ -130,7 +131,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
for _, urlString := range urlStrings {
// TODO optimization opportunity: reuse the buffer
var data []byte
if data, _, err = util.Get(urlString); err == nil {
if data, _, err = util_http.Get(urlString); err == nil {
processed = true
if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
return

View File

@@ -55,7 +55,13 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
reader := util.NewBytesReader(data)
fileId, uploadResult, err, _ = operation.UploadWithRetry(
uploader, err := operation.NewUploader()
if err != nil {
return
}
fileId, uploadResult, err, _ = uploader.UploadWithRetry(
b,
&filer_pb.AssignVolumeRequest{
Count: 1,

View File

@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -45,6 +46,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
config := &pub_client.PublisherConfiguration{
Topic: topic.NewTopic(*namespace, *t),

View File

@@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -86,6 +87,7 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
recordType := schema.RecordTypeBegin().
WithField("key", schema.TypeBytes).

View File

@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"strings"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -25,6 +26,7 @@ var (
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
subscriberConfig := &sub_client.SubscriberConfiguration{
ConsumerGroup: "test",

View File

@@ -13,6 +13,7 @@ import (
"google.golang.org/protobuf/proto"
"strings"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -51,6 +52,7 @@ func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord {
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
subscriberConfig := &sub_client.SubscriberConfiguration{
ConsumerGroup: "test",

View File

@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -79,7 +80,7 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
results, err := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -95,7 +96,7 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g
}
func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) {
req, err := http.NewRequest("GET", fileUrl, nil)
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
if err != nil {
return written, err
}
@@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
}
resp, err := util.Do(req)
resp, err := util_http.Do(req)
if err != nil {
return written, err
}
defer util.CloseResponse(resp)
defer util_http.CloseResponse(resp)
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:

View File

@@ -28,8 +28,8 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
return fid[:commaIndex], fid[commaIndex+1:], nil
}
// DeleteFiles batch deletes a list of fileIds
func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
// DeleteFileIds batch deletes a list of fileIds
func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (results map[string]*LookupResult, err error) {
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
@@ -43,11 +43,11 @@ func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.Di
return
}
return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
return DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -102,7 +102,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
if deleteResults, deleteErr := DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
@@ -120,8 +120,8 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
return ret, err
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
// DeleteFileIdsAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

View File

@@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size.
*/
func TestCreateNeedleFromRequest(t *testing.T) {
mc := &MockClient{}
tmp := HttpClient
HttpClient = mc
defer func() {
HttpClient = tmp
}()
mockClient := &MockClient{}
uploader := newUploader(mockClient)
{
mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption)
uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
}
{
mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
Upload(bytes.NewReader(gzippedData), uploadOption)
uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
}
/*

View File

@@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
PairMap: nil,
Jwt: jwt,
}
ret, e, _ := Upload(fi.Reader, uploadOption)
uploader, e := NewUploader()
if e != nil {
return 0, e
}
ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
PairMap: nil,
Jwt: jwt,
}
uploadResult, uploadError, _ := Upload(reader, uploadOption)
uploader, uploaderError := NewUploader()
if uploaderError != nil {
return 0, uploaderError
}
uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
PairMap: nil,
Jwt: jwt,
}
_, e = UploadData(buf, uploadOption)
uploader, e := NewUploader()
if e != nil {
return e
}
_, e = uploader.UploadData(buf, uploadOption)
return e
}

View File

@@ -9,7 +9,7 @@ import (
"io"
"mime"
"mime/multipart"
"net"
"sync"
"net/http"
"net/textproto"
"path/filepath"
@@ -21,6 +21,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)
type UploadOption struct {
@@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN
}
}
var (
fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
uploader *Uploader
uploaderErr error
once sync.Once
)
// HTTPClient interface for testing
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
var (
HttpClient HTTPClient
)
// Uploader
type Uploader struct {
httpClient HTTPClient
}
func init() {
HttpClient = &http.Client{Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
func NewUploader() (*Uploader, error) {
once.Do(func () {
// With Dial context
var httpClient *util_http_client.HTTPClient
httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
if uploaderErr != nil {
uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
}
if httpClient != nil {
uploader = newUploader(httpClient)
}
})
return uploader, uploaderErr
}
func newUploader(httpClient HTTPClient) (*Uploader) {
return &Uploader{
httpClient: httpClient,
}
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
doUploadFunc := func() error {
var host string
@@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
uploadOption.Jwt = auth
var uploadErr error
uploadResult, uploadErr, data = doUpload(reader, uploadOption)
uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
return
}
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
uploadResult, err = retriedUploadData(data, option)
func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
uploadResult, err = uploader.retriedUploadData(data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
uploadResult, err, data = doUpload(reader, option)
func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
uploadResult, err, data = uploader.doUpload(reader, option)
return
}
func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul
return
}
}
uploadResult, uploadErr := retriedUploadData(data, option)
uploadResult, uploadErr := uploader.retriedUploadData(data, option)
return uploadResult, uploadErr, data
}
func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
uploadResult, err = doUploadData(data, option)
uploadResult, err = uploader.doUploadData(data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR
return
}
func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
}
// upload data
uploadResult, err = upload_content(func(w io.Writer) (err error) {
uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
uploadResult, err = upload_content(func(w io.Writer) (err error) {
uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
return uploadResult, err
}
func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -325,7 +343,7 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
} else {
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
}
req, postErr := http.NewRequest("POST", option.UploadUrl, reqReader)
req, postErr := http.NewRequest(http.MethodPost, option.UploadUrl, reqReader)
if postErr != nil {
glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr)
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
@@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
// print("+")
resp, post_err := HttpClient.Do(req)
defer util.CloseResponse(resp)
resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
resp, post_err = HttpClient.Do(req)
defer util.CloseResponse(resp)
resp, post_err = uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)
}
}
if post_err != nil {

View File

@@ -54,6 +54,9 @@ service SeaweedFiler {
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
rpc TraverseBfsMetadata (TraverseBfsMetadataRequest) returns (stream TraverseBfsMetadataResponse) {
}
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
@@ -218,6 +221,7 @@ message DeleteEntryRequest {
bool ignore_recursive_error = 6;
bool is_from_other_cluster = 7;
repeated int32 signatures = 8;
int64 if_not_modified_after = 9;
}
message DeleteEntryResponse {
@@ -341,6 +345,8 @@ message GetFilerConfigurationResponse {
string version = 11;
string cluster_id = 12;
string filer_group = 13;
int32 major_version = 14;
int32 minor_version = 15;
}
message SubscribeMetadataRequest {
@@ -360,6 +366,15 @@ message SubscribeMetadataResponse {
int64 ts_ns = 3;
}
message TraverseBfsMetadataRequest {
string directory = 1;
repeated string excluded_prefixes = 2;
}
message TraverseBfsMetadataResponse {
string directory = 1;
Entry entry = 2;
}
message LogEntry {
int64 ts_ns = 1;
int32 partition_key_hash = 2;

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,10 @@
package filer_pb
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"io"
"sync"
"time"
@@ -12,7 +15,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
K := 5
var jobQueueWg sync.WaitGroup
queue := util.NewQueue()
queue := util.NewQueue[util.FullPath]()
jobQueueWg.Add(1)
queue.Enqueue(parentPath)
terminates := make([]chan bool, K)
@@ -26,11 +29,11 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
return
default:
t := queue.Dequeue()
if t == nil {
if t == "" {
time.Sleep(329 * time.Millisecond)
continue
}
dir := t.(util.FullPath)
dir := t
processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
if processErr != nil {
err = processErr
@@ -47,7 +50,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
return
}
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
@@ -65,3 +68,28 @@ func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queu
})
}
func StreamBfs(client SeaweedFilerClient, dir util.FullPath, olderThanTsNs int64, fn func(parentPath util.FullPath, entry *Entry)error) (err error) {
glog.V(0).Infof("TraverseBfsMetadata %v if before %v", dir, time.Unix(0, olderThanTsNs))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.TraverseBfsMetadata(ctx, &TraverseBfsMetadataRequest{
Directory: string(dir),
})
if err != nil {
return fmt.Errorf("traverse bfs metadata: %v", err)
}
for {
resp, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("traverse bfs metadata: %v", err)
}
if err := fn(util.FullPath(resp.Directory), resp.Entry); err != nil {
return err
}
}
return nil
}

View File

@@ -34,6 +34,7 @@ const (
SeaweedFiler_Statistics_FullMethodName = "/filer_pb.SeaweedFiler/Statistics"
SeaweedFiler_Ping_FullMethodName = "/filer_pb.SeaweedFiler/Ping"
SeaweedFiler_GetFilerConfiguration_FullMethodName = "/filer_pb.SeaweedFiler/GetFilerConfiguration"
SeaweedFiler_TraverseBfsMetadata_FullMethodName = "/filer_pb.SeaweedFiler/TraverseBfsMetadata"
SeaweedFiler_SubscribeMetadata_FullMethodName = "/filer_pb.SeaweedFiler/SubscribeMetadata"
SeaweedFiler_SubscribeLocalMetadata_FullMethodName = "/filer_pb.SeaweedFiler/SubscribeLocalMetadata"
SeaweedFiler_KvGet_FullMethodName = "/filer_pb.SeaweedFiler/KvGet"
@@ -64,6 +65,7 @@ type SeaweedFilerClient interface {
Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
GetFilerConfiguration(ctx context.Context, in *GetFilerConfigurationRequest, opts ...grpc.CallOption) (*GetFilerConfigurationResponse, error)
TraverseBfsMetadata(ctx context.Context, in *TraverseBfsMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_TraverseBfsMetadataClient, error)
SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error)
SubscribeLocalMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeLocalMetadataClient, error)
KvGet(ctx context.Context, in *KvGetRequest, opts ...grpc.CallOption) (*KvGetResponse, error)
@@ -265,8 +267,40 @@ func (c *seaweedFilerClient) GetFilerConfiguration(ctx context.Context, in *GetF
return out, nil
}
func (c *seaweedFilerClient) TraverseBfsMetadata(ctx context.Context, in *TraverseBfsMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_TraverseBfsMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[2], SeaweedFiler_TraverseBfsMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &seaweedFilerTraverseBfsMetadataClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SeaweedFiler_TraverseBfsMetadataClient interface {
Recv() (*TraverseBfsMetadataResponse, error)
grpc.ClientStream
}
type seaweedFilerTraverseBfsMetadataClient struct {
grpc.ClientStream
}
func (x *seaweedFilerTraverseBfsMetadataClient) Recv() (*TraverseBfsMetadataResponse, error) {
m := new(TraverseBfsMetadataResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *seaweedFilerClient) SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[2], SeaweedFiler_SubscribeMetadata_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[3], SeaweedFiler_SubscribeMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
@@ -298,7 +332,7 @@ func (x *seaweedFilerSubscribeMetadataClient) Recv() (*SubscribeMetadataResponse
}
func (c *seaweedFilerClient) SubscribeLocalMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeLocalMetadataClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[3], SeaweedFiler_SubscribeLocalMetadata_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &SeaweedFiler_ServiceDesc.Streams[4], SeaweedFiler_SubscribeLocalMetadata_FullMethodName, opts...)
if err != nil {
return nil, err
}
@@ -411,6 +445,7 @@ type SeaweedFilerServer interface {
Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error)
TraverseBfsMetadata(*TraverseBfsMetadataRequest, SeaweedFiler_TraverseBfsMetadataServer) error
SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error
SubscribeLocalMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeLocalMetadataServer) error
KvGet(context.Context, *KvGetRequest) (*KvGetResponse, error)
@@ -473,6 +508,9 @@ func (UnimplementedSeaweedFilerServer) Ping(context.Context, *PingRequest) (*Pin
func (UnimplementedSeaweedFilerServer) GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFilerConfiguration not implemented")
}
func (UnimplementedSeaweedFilerServer) TraverseBfsMetadata(*TraverseBfsMetadataRequest, SeaweedFiler_TraverseBfsMetadataServer) error {
return status.Errorf(codes.Unimplemented, "method TraverseBfsMetadata not implemented")
}
func (UnimplementedSeaweedFilerServer) SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeMetadata not implemented")
}
@@ -789,6 +827,27 @@ func _SeaweedFiler_GetFilerConfiguration_Handler(srv interface{}, ctx context.Co
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_TraverseBfsMetadata_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TraverseBfsMetadataRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SeaweedFilerServer).TraverseBfsMetadata(m, &seaweedFilerTraverseBfsMetadataServer{stream})
}
type SeaweedFiler_TraverseBfsMetadataServer interface {
Send(*TraverseBfsMetadataResponse) error
grpc.ServerStream
}
type seaweedFilerTraverseBfsMetadataServer struct {
grpc.ServerStream
}
func (x *seaweedFilerTraverseBfsMetadataServer) Send(m *TraverseBfsMetadataResponse) error {
return x.ServerStream.SendMsg(m)
}
func _SeaweedFiler_SubscribeMetadata_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeMetadataRequest)
if err := stream.RecvMsg(m); err != nil {
@@ -1056,6 +1115,11 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
Handler: _SeaweedFiler_StreamRenameEntry_Handler,
ServerStreams: true,
},
{
StreamName: "TraverseBfsMetadata",
Handler: _SeaweedFiler_TraverseBfsMetadata_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeMetadata",
Handler: _SeaweedFiler_SubscribeMetadata_Handler,

View File

@@ -17,6 +17,7 @@ const (
TrivialOnError EventErrorType = iota
FatalOnError
RetryForeverOnError
DontLogError
)
// MetadataFollowOption is used to control the behavior of the metadata following
@@ -96,6 +97,8 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
glog.Errorf("process %v: %v", resp, err)
return true
})
case DontLogError:
// pass
default:
glog.Errorf("process %v: %v", resp, err)
}

Some files were not shown because too many files have changed in this diff Show More