Merge branch 'master' into a

This commit is contained in:
eddy-gfx
2022-04-06 18:45:55 -05:00
committed by GitHub
65 changed files with 3144 additions and 1515 deletions

View File

@@ -9,9 +9,10 @@ import (
)
const (
MasterType = "master"
FilerType = "filer"
BrokerType = "broker"
MasterType = "master"
VolumeServerType = "volumeServer"
FilerType = "filer"
BrokerType = "broker"
)
type ClusterNode struct {

View File

@@ -89,6 +89,7 @@ func init() {
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
// start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")

View File

@@ -134,7 +134,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
masterPeers := make(map[string]pb.ServerAddress)
for _, peer := range peers {
masterPeers[peer.String()] = peer
masterPeers[string(peer)] = peer
}
r := mux.NewRouter()

View File

@@ -29,6 +29,7 @@ type MountOptions struct {
readOnly *bool
debug *bool
debugPort *int
localSocket *string
}
var (
@@ -63,6 +64,7 @@ func init() {
mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only")
mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging")
mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")

View File

@@ -12,9 +12,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/mount/unmount"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc/reflection"
"net"
"net/http"
"os"
"os/user"
@@ -98,6 +101,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
unmount.Unmount(dir)
// start on local unix socket
if *option.localSocket == "" {
mountDirHash := util.HashToInt32([]byte(dir))
if mountDirHash < 0 {
mountDirHash = -mountDirHash
}
*option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash)
if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
}
}
montSocketListener, err := net.Listen("unix", *option.localSocket)
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
}
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
@@ -229,6 +248,11 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
unmount.Unmount(dir)
})
grpcS := pb.NewGrpcServer()
mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
reflection.Register(grpcS)
go grpcS.Serve(montSocketListener)
seaweedFileSystem.StartBackgroundTasks()
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)

View File

@@ -24,17 +24,18 @@ var (
)
type S3Options struct {
filer *string
bindIp *string
port *int
config *string
domainName *string
tlsPrivateKey *string
tlsCertificate *string
metricsHttpPort *int
allowEmptyFolder *bool
auditLogConfig *string
localFilerSocket *string
filer *string
bindIp *string
port *int
config *string
domainName *string
tlsPrivateKey *string
tlsCertificate *string
metricsHttpPort *int
allowEmptyFolder *bool
allowDeleteBucketNotEmpty *bool
auditLogConfig *string
localFilerSocket *string
}
func init() {
@@ -49,6 +50,7 @@ func init() {
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
}
var cmdS3 = &Command{
@@ -178,14 +180,15 @@ func (s3opt *S3Options) startS3Server() bool {
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: filerAddress,
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
LocalFilerSocket: s3opt.localFilerSocket,
Filer: filerAddress,
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: s3opt.localFilerSocket,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

View File

@@ -138,6 +138,7 @@ func init() {
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port")

View File

@@ -23,6 +23,9 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
}
func FileSize(entry *filer_pb.Entry) (size uint64) {
if entry == nil || entry.Attributes == nil {
return 0
}
fileSize := entry.Attributes.FileSize
if entry.RemoteEntry != nil {
if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {

View File

@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"os"
"sync"
)
var (
@@ -14,11 +15,12 @@ var (
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
freeActualChunkList []ActualChunkIndex
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
logicToActualChunkIndexLock sync.Mutex
chunkSize int64
freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@@ -52,6 +54,8 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
return nil
}
}
sf.logicToActualChunkIndexLock.Lock()
defer sf.logicToActualChunkIndexLock.Unlock()
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
if len(sf.freeActualChunkList) > 0 {
@@ -72,6 +76,9 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
}
func (sc *SwapFileChunk) FreeResource() {
sc.swapfile.logicToActualChunkIndexLock.Lock()
defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
}

View File

@@ -187,6 +187,9 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
func (up *UploadPipeline) Shutdown() {
up.swapFile.FreeResource()
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
for logicChunkIndex, sealedChunk := range up.sealedChunks {
sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex))
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
@@ -59,6 +60,7 @@ type WFS struct {
// https://dl.acm.org/doi/fullHtml/10.1145/3310148
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
mount_pb.UnimplementedSeaweedMountServer
fs.Inode
option *Option
metaCache *meta_cache.MetaCache
@@ -129,6 +131,9 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
if fh.entry.Attributes == nil {
fh.entry.Attributes = &filer_pb.FuseAttributes{}
}
return path, fh, fh.entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)

View File

@@ -0,0 +1,17 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
)
func (wfs *WFS) Configure(ctx context.Context, request *mount_pb.ConfigureRequest) (*mount_pb.ConfigureResponse, error) {
if wfs.option.Collection == "" {
return nil, fmt.Errorf("mount quota only works when mounted to a new folder with a collection")
}
glog.V(0).Infof("quota changed from %d to %d", wfs.option.Quota, request.CollectionCapacity)
wfs.option.Quota = request.GetCollectionCapacity()
return &mount_pb.ConfigureResponse{}, nil
}

View File

@@ -10,12 +10,14 @@ import (
func (wfs *WFS) loopCheckQuota() {
if wfs.option.Quota <= 0 {
return
}
for {
time.Sleep(61 * time.Second)
if wfs.option.Quota <= 0 {
continue
}
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
@@ -47,7 +49,6 @@ func (wfs *WFS) loopCheckQuota() {
glog.Warningf("read quota usage: %v", err)
}
time.Sleep(61 * time.Second)
}
}

View File

@@ -8,6 +8,7 @@ gen:
protoc filer.proto --go_out=./filer_pb --go-grpc_out=./filer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc remote.proto --go_out=./remote_pb --go-grpc_out=./remote_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
# protoc filer.proto --java_out=../../other/java/client/src/main/java
cp filer.proto ../../other/java/client/src/main/proto

View File

@@ -48,6 +48,9 @@ service SeaweedFiler {
rpc Statistics (StatisticsRequest) returns (StatisticsResponse) {
}
rpc Ping (PingRequest) returns (PingResponse) {
}
rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) {
}
@@ -311,6 +314,13 @@ message StatisticsResponse {
uint64 file_count = 6;
}
message PingRequest {
string target = 1; // default to ping itself
string target_type = 2;
}
message PingResponse {
}
message GetFilerConfigurationRequest {
}
message GetFilerConfigurationResponse {

File diff suppressed because it is too large Load Diff

View File

@@ -31,6 +31,7 @@ type SeaweedFilerClient interface {
CollectionList(ctx context.Context, in *CollectionListRequest, opts ...grpc.CallOption) (*CollectionListResponse, error)
DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error)
Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
GetFilerConfiguration(ctx context.Context, in *GetFilerConfigurationRequest, opts ...grpc.CallOption) (*GetFilerConfigurationResponse, error)
SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error)
SubscribeLocalMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeLocalMetadataClient, error)
@@ -212,6 +213,15 @@ func (c *seaweedFilerClient) Statistics(ctx context.Context, in *StatisticsReque
return out, nil
}
func (c *seaweedFilerClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
out := new(PingResponse)
err := c.cc.Invoke(ctx, "/filer_pb.SeaweedFiler/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *seaweedFilerClient) GetFilerConfiguration(ctx context.Context, in *GetFilerConfigurationRequest, opts ...grpc.CallOption) (*GetFilerConfigurationResponse, error) {
out := new(GetFilerConfigurationResponse)
err := c.cc.Invoke(ctx, "/filer_pb.SeaweedFiler/GetFilerConfiguration", in, out, opts...)
@@ -369,6 +379,7 @@ type SeaweedFilerServer interface {
CollectionList(context.Context, *CollectionListRequest) (*CollectionListResponse, error)
DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error)
Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error)
SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error
SubscribeLocalMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeLocalMetadataServer) error
@@ -423,6 +434,9 @@ func (UnimplementedSeaweedFilerServer) DeleteCollection(context.Context, *Delete
func (UnimplementedSeaweedFilerServer) Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Statistics not implemented")
}
func (UnimplementedSeaweedFilerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedSeaweedFilerServer) GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetFilerConfiguration not implemented")
}
@@ -700,6 +714,24 @@ func _SeaweedFiler_Statistics_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedFilerServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/filer_pb.SeaweedFiler/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedFilerServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
func _SeaweedFiler_GetFilerConfiguration_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetFilerConfigurationRequest)
if err := dec(in); err != nil {
@@ -909,6 +941,10 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
MethodName: "Statistics",
Handler: _SeaweedFiler_Statistics_Handler,
},
{
MethodName: "Ping",
Handler: _SeaweedFiler_Ping_Handler,
},
{
MethodName: "GetFilerConfiguration",
Handler: _SeaweedFiler_GetFilerConfiguration_Handler,

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"math/rand"
"net/http"
@@ -206,6 +207,14 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g
}
func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpcDialOption grpc.DialOption, fn func(client volume_server_pb.VolumeServerClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, volumeServer.ToGrpcAddress(), grpcDialOption)
}
func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {

View File

@@ -35,7 +35,8 @@ service Seaweed {
}
rpc ReleaseAdminToken (ReleaseAdminTokenRequest) returns (ReleaseAdminTokenResponse) {
}
rpc Ping (PingRequest) returns (PingResponse) {
}
}
//////////////////////////////////////////////////
@@ -140,6 +141,8 @@ message VolumeLocation {
string leader = 5; // optional when leader is not itself
string data_center = 6; // optional when DataCenter is in use
uint32 grpc_port = 7;
repeated uint32 new_ec_vids = 8;
repeated uint32 deleted_ec_vids = 9;
}
message ClusterNodeUpdate {
@@ -328,3 +331,10 @@ message ReleaseAdminTokenRequest {
}
message ReleaseAdminTokenResponse {
}
message PingRequest {
string target = 1; // default to ping itself
string target_type = 2;
}
message PingResponse {
}

File diff suppressed because it is too large Load Diff

View File

@@ -32,6 +32,7 @@ type SeaweedClient interface {
ListClusterNodes(ctx context.Context, in *ListClusterNodesRequest, opts ...grpc.CallOption) (*ListClusterNodesResponse, error)
LeaseAdminToken(ctx context.Context, in *LeaseAdminTokenRequest, opts ...grpc.CallOption) (*LeaseAdminTokenResponse, error)
ReleaseAdminToken(ctx context.Context, in *ReleaseAdminTokenRequest, opts ...grpc.CallOption) (*ReleaseAdminTokenResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
}
type seaweedClient struct {
@@ -212,6 +213,15 @@ func (c *seaweedClient) ReleaseAdminToken(ctx context.Context, in *ReleaseAdminT
return out, nil
}
func (c *seaweedClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
out := new(PingResponse)
err := c.cc.Invoke(ctx, "/master_pb.Seaweed/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// SeaweedServer is the server API for Seaweed service.
// All implementations must embed UnimplementedSeaweedServer
// for forward compatibility
@@ -230,6 +240,7 @@ type SeaweedServer interface {
ListClusterNodes(context.Context, *ListClusterNodesRequest) (*ListClusterNodesResponse, error)
LeaseAdminToken(context.Context, *LeaseAdminTokenRequest) (*LeaseAdminTokenResponse, error)
ReleaseAdminToken(context.Context, *ReleaseAdminTokenRequest) (*ReleaseAdminTokenResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
mustEmbedUnimplementedSeaweedServer()
}
@@ -279,6 +290,9 @@ func (UnimplementedSeaweedServer) LeaseAdminToken(context.Context, *LeaseAdminTo
func (UnimplementedSeaweedServer) ReleaseAdminToken(context.Context, *ReleaseAdminTokenRequest) (*ReleaseAdminTokenResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReleaseAdminToken not implemented")
}
func (UnimplementedSeaweedServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedSeaweedServer) mustEmbedUnimplementedSeaweedServer() {}
// UnsafeSeaweedServer may be embedded to opt out of forward compatibility for this service.
@@ -560,6 +574,24 @@ func _Seaweed_ReleaseAdminToken_Handler(srv interface{}, ctx context.Context, de
return interceptor(ctx, in, info, handler)
}
func _Seaweed_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/master_pb.Seaweed/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
// Seaweed_ServiceDesc is the grpc.ServiceDesc for Seaweed service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -615,6 +647,10 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
MethodName: "ReleaseAdminToken",
Handler: _Seaweed_ReleaseAdminToken_Handler,
},
{
MethodName: "Ping",
Handler: _Seaweed_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{

25
weed/pb/mount.proto Normal file
View File

@@ -0,0 +1,25 @@
syntax = "proto3";
package messaging_pb;
option go_package = "github.com/chrislusf/seaweedfs/weed/pb/mount_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "MountProto";
//////////////////////////////////////////////////
service SeaweedMount {
rpc Configure (ConfigureRequest) returns (ConfigureResponse) {
}
}
//////////////////////////////////////////////////
message ConfigureRequest {
int64 collection_capacity = 1;
}
message ConfigureResponse {
}

View File

@@ -0,0 +1,208 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.17.3
// source: mount.proto
package mount_pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ConfigureRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
CollectionCapacity int64 `protobuf:"varint,1,opt,name=collection_capacity,json=collectionCapacity,proto3" json:"collection_capacity,omitempty"`
}
func (x *ConfigureRequest) Reset() {
*x = ConfigureRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_mount_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ConfigureRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConfigureRequest) ProtoMessage() {}
func (x *ConfigureRequest) ProtoReflect() protoreflect.Message {
mi := &file_mount_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConfigureRequest.ProtoReflect.Descriptor instead.
func (*ConfigureRequest) Descriptor() ([]byte, []int) {
return file_mount_proto_rawDescGZIP(), []int{0}
}
func (x *ConfigureRequest) GetCollectionCapacity() int64 {
if x != nil {
return x.CollectionCapacity
}
return 0
}
type ConfigureResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *ConfigureResponse) Reset() {
*x = ConfigureResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_mount_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ConfigureResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConfigureResponse) ProtoMessage() {}
func (x *ConfigureResponse) ProtoReflect() protoreflect.Message {
mi := &file_mount_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConfigureResponse.ProtoReflect.Descriptor instead.
func (*ConfigureResponse) Descriptor() ([]byte, []int) {
return file_mount_proto_rawDescGZIP(), []int{1}
}
var File_mount_proto protoreflect.FileDescriptor
var file_mount_proto_rawDesc = []byte{
0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x22, 0x43, 0x0a, 0x10, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x2f, 0x0a, 0x13, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x61,
0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x6f,
0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79,
0x22, 0x13, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x5e, 0x0a, 0x0c, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64,
0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x4e, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75,
0x72, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4f, 0x0a, 0x10, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64,
0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a, 0x4d, 0x6f, 0x75, 0x6e, 0x74,
0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77,
0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x6f,
0x75, 0x6e, 0x74, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_mount_proto_rawDescOnce sync.Once
file_mount_proto_rawDescData = file_mount_proto_rawDesc
)
func file_mount_proto_rawDescGZIP() []byte {
file_mount_proto_rawDescOnce.Do(func() {
file_mount_proto_rawDescData = protoimpl.X.CompressGZIP(file_mount_proto_rawDescData)
})
return file_mount_proto_rawDescData
}
var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_mount_proto_goTypes = []interface{}{
(*ConfigureRequest)(nil), // 0: messaging_pb.ConfigureRequest
(*ConfigureResponse)(nil), // 1: messaging_pb.ConfigureResponse
}
var file_mount_proto_depIdxs = []int32{
0, // 0: messaging_pb.SeaweedMount.Configure:input_type -> messaging_pb.ConfigureRequest
1, // 1: messaging_pb.SeaweedMount.Configure:output_type -> messaging_pb.ConfigureResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_mount_proto_init() }
func file_mount_proto_init() {
if File_mount_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_mount_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ConfigureRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_mount_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ConfigureResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mount_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_mount_proto_goTypes,
DependencyIndexes: file_mount_proto_depIdxs,
MessageInfos: file_mount_proto_msgTypes,
}.Build()
File_mount_proto = out.File
file_mount_proto_rawDesc = nil
file_mount_proto_goTypes = nil
file_mount_proto_depIdxs = nil
}

View File

@@ -0,0 +1,101 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
package mount_pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedMountClient is the client API for SeaweedMount service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type SeaweedMountClient interface {
Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error)
}
type seaweedMountClient struct {
cc grpc.ClientConnInterface
}
func NewSeaweedMountClient(cc grpc.ClientConnInterface) SeaweedMountClient {
return &seaweedMountClient{cc}
}
func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) {
out := new(ConfigureResponse)
err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMount/Configure", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// SeaweedMountServer is the server API for SeaweedMount service.
// All implementations must embed UnimplementedSeaweedMountServer
// for forward compatibility
type SeaweedMountServer interface {
Configure(context.Context, *ConfigureRequest) (*ConfigureResponse, error)
mustEmbedUnimplementedSeaweedMountServer()
}
// UnimplementedSeaweedMountServer must be embedded to have forward compatible implementations.
type UnimplementedSeaweedMountServer struct {
}
func (UnimplementedSeaweedMountServer) Configure(context.Context, *ConfigureRequest) (*ConfigureResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Configure not implemented")
}
func (UnimplementedSeaweedMountServer) mustEmbedUnimplementedSeaweedMountServer() {}
// UnsafeSeaweedMountServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to SeaweedMountServer will
// result in compilation errors.
type UnsafeSeaweedMountServer interface {
mustEmbedUnimplementedSeaweedMountServer()
}
func RegisterSeaweedMountServer(s grpc.ServiceRegistrar, srv SeaweedMountServer) {
s.RegisterService(&SeaweedMount_ServiceDesc, srv)
}
func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConfigureRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedMountServer).Configure(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/messaging_pb.SeaweedMount/Configure",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest))
}
return interceptor(ctx, in, info, handler)
}
// SeaweedMount_ServiceDesc is the grpc.ServiceDesc for SeaweedMount service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var SeaweedMount_ServiceDesc = grpc.ServiceDesc{
ServiceName: "messaging_pb.SeaweedMount",
HandlerType: (*SeaweedMountServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Configure",
Handler: _SeaweedMount_Configure_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "mount.proto",
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.6.1
// protoc v3.17.3
// source: remote.proto
package remote_pb

View File

@@ -89,7 +89,7 @@ func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
func (sa ServerAddresses) ToAddressMap() (addresses map[string]ServerAddress) {
addresses = make(map[string]ServerAddress)
for _, address := range sa.ToAddresses() {
addresses[address.String()] = address
addresses[string(address)] = address
}
return
}

View File

@@ -107,6 +107,10 @@ service VolumeServer {
rpc VolumeNeedleStatus (VolumeNeedleStatusRequest) returns (VolumeNeedleStatusResponse) {
}
rpc Ping (PingRequest) returns (PingResponse) {
}
}
//////////////////////////////////////////////////
@@ -573,3 +577,10 @@ message VolumeNeedleStatusResponse {
uint32 crc = 5;
string ttl = 6;
}
message PingRequest {
string target = 1; // default to ping itself
string target_type = 2;
}
message PingResponse {
}

File diff suppressed because it is too large Load Diff

View File

@@ -64,6 +64,7 @@ type VolumeServerClient interface {
// <experimental> query
Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (VolumeServer_QueryClient, error)
VolumeNeedleStatus(ctx context.Context, in *VolumeNeedleStatusRequest, opts ...grpc.CallOption) (*VolumeNeedleStatusResponse, error)
Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
}
type volumeServerClient struct {
@@ -664,6 +665,15 @@ func (c *volumeServerClient) VolumeNeedleStatus(ctx context.Context, in *VolumeN
return out, nil
}
func (c *volumeServerClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
out := new(PingResponse)
err := c.cc.Invoke(ctx, "/volume_server_pb.VolumeServer/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// VolumeServerServer is the server API for VolumeServer service.
// All implementations must embed UnimplementedVolumeServerServer
// for forward compatibility
@@ -714,6 +724,7 @@ type VolumeServerServer interface {
// <experimental> query
Query(*QueryRequest, VolumeServer_QueryServer) error
VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error)
Ping(context.Context, *PingRequest) (*PingResponse, error)
mustEmbedUnimplementedVolumeServerServer()
}
@@ -841,6 +852,9 @@ func (UnimplementedVolumeServerServer) Query(*QueryRequest, VolumeServer_QuerySe
func (UnimplementedVolumeServerServer) VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
}
func (UnimplementedVolumeServerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedVolumeServerServer) mustEmbedUnimplementedVolumeServerServer() {}
// UnsafeVolumeServerServer may be embedded to opt out of forward compatibility for this service.
@@ -1604,6 +1618,24 @@ func _VolumeServer_VolumeNeedleStatus_Handler(srv interface{}, ctx context.Conte
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/volume_server_pb.VolumeServer/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).Ping(ctx, req.(*PingRequest))
}
return interceptor(ctx, in, info, handler)
}
// VolumeServer_ServiceDesc is the grpc.ServiceDesc for VolumeServer service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -1731,6 +1763,10 @@ var VolumeServer_ServiceDesc = grpc.ServiceDesc{
MethodName: "VolumeNeedleStatus",
Handler: _VolumeServer_VolumeNeedleStatus_Handler,
},
{
MethodName: "Ping",
Handler: _VolumeServer_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{

View File

@@ -12,11 +12,11 @@ import (
"time"
)
const slash = "/"
func ParseLocationName(remote string) (locationName string) {
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 2)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 2)
if len(parts) >= 1 {
return parts[0]
}
@@ -25,35 +25,31 @@ func ParseLocationName(remote string) (locationName string) {
func parseBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
loc = &remote_pb.RemoteStorageLocation{}
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 3)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 3)
if len(parts) >= 1 {
loc.Name = parts[0]
}
if len(parts) >= 2 {
loc.Bucket = parts[1]
}
loc.Path = string(remote[len(loc.Name)+1+len(loc.Bucket):])
loc.Path = remote[len(loc.Name)+1+len(loc.Bucket):]
if loc.Path == "" {
loc.Path = "/"
loc.Path = slash
}
return
}
func parseNoBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
loc = &remote_pb.RemoteStorageLocation{}
if strings.HasSuffix(string(remote), "/") {
remote = remote[:len(remote)-1]
}
parts := strings.SplitN(string(remote), "/", 2)
remote = strings.TrimSuffix(remote, slash)
parts := strings.SplitN(remote, slash, 2)
if len(parts) >= 1 {
loc.Name = parts[0]
}
loc.Path = string(remote[len(loc.Name):])
loc.Path = remote[len(loc.Name):]
if loc.Path == "" {
loc.Path = "/"
loc.Path = slash
}
return
}

View File

@@ -3,6 +3,7 @@ package s3api
import (
"context"
"encoding/xml"
"errors"
"fmt"
"math"
"net/http"
@@ -134,6 +135,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
w.Header().Set("Location", "/" + bucket)
writeSuccessResponseEmpty(w, r)
}
@@ -148,6 +150,15 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
}
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if !s3a.option.AllowDeleteBucketNotEmpty {
entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 1)
if err != nil {
return fmt.Errorf("failed to list bucket %s: %v", bucket, err)
}
if len(entries) > 0 {
return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code)
}
}
// delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
@@ -162,6 +173,15 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
return nil
})
if err != nil {
s3ErrorCode := s3err.ErrInternalError
if err.Error() == s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code {
s3ErrorCode = s3err.ErrBucketNotEmpty
}
s3err.WriteErrorResponse(w, r, s3ErrorCode)
return
}
err = s3a.rm(s3a.option.BucketsPath, bucket, false, true)
if err != nil {

View File

@@ -63,7 +63,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
if r.Header.Get("Expires") != "" {
if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedExpires)
return
}
}

View File

@@ -19,14 +19,15 @@ import (
)
type S3ApiServerOption struct {
Filer pb.ServerAddress
Port int
Config string
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
LocalFilerSocket *string
Filer pb.ServerAddress
Port int
Config string
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
AllowDeleteBucketNotEmpty bool
LocalFilerSocket *string
}
type S3ApiServer struct {

View File

@@ -3,7 +3,6 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"os"
"path/filepath"
"strconv"
@@ -357,128 +356,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
return &filer_pb.DeleteCollectionResponse{}, err
}
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
var output *master_pb.StatisticsResponse
err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
DiskType: req.DiskType,
})
if grpcErr != nil {
return grpcErr
}
output = grpcResponse
return nil
})
if err != nil {
return nil, err
}
return &filer_pb.StatisticsResponse{
TotalSize: output.TotalSize,
UsedSize: output.UsedSize,
FileCount: output.FileCount,
}, nil
}
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
Signature: fs.filer.Signature,
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
ClusterId: string(clusterId),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
return t, nil
}
func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
m := make(map[string]bool)
for _, tp := range req.Resources {
m[tp] = true
}
fs.brokersLock.Lock()
fs.brokers[clientName] = m
glog.V(0).Infof("+ broker %v", clientName)
fs.brokersLock.Unlock()
defer func() {
fs.brokersLock.Lock()
delete(fs.brokers, clientName)
glog.V(0).Infof("- broker %v: %v", clientName, err)
fs.brokersLock.Unlock()
}()
for {
if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
glog.V(0).Infof("send broker %v: %+v", clientName, err)
return err
}
// println("replied")
if _, err := stream.Recv(); err != nil {
glog.V(0).Infof("recv broker %v: %v", clientName, err)
return err
}
// println("received")
}
}
func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
resp = &filer_pb.LocateBrokerResponse{}
fs.brokersLock.Lock()
defer fs.brokersLock.Unlock()
var localBrokers []*filer_pb.LocateBrokerResponse_Resource
for b, m := range fs.brokers {
if _, found := m[req.Resource]; found {
resp.Found = true
resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
{
GrpcAddresses: b,
ResourceCount: int32(len(m)),
},
}
return
}
localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
GrpcAddresses: b,
ResourceCount: int32(len(m)),
})
}
resp.Resources = localBrokers
return resp, nil
}

View File

@@ -0,0 +1,164 @@
package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
var output *master_pb.StatisticsResponse
err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
DiskType: req.DiskType,
})
if grpcErr != nil {
return grpcErr
}
output = grpcResponse
return nil
})
if err != nil {
return nil, err
}
return &filer_pb.StatisticsResponse{
TotalSize: output.TotalSize,
UsedSize: output.UsedSize,
FileCount: output.FileCount,
}, nil
}
func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
resp = &filer_pb.PingResponse{}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.VolumeServerType {
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.MasterType {
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(ctx, &master_pb.PingRequest{})
return err
})
}
if pingErr != nil {
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
}
return
}
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
Signature: fs.filer.Signature,
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
ClusterId: string(clusterId),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
return t, nil
}
func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
m := make(map[string]bool)
for _, tp := range req.Resources {
m[tp] = true
}
fs.brokersLock.Lock()
fs.brokers[clientName] = m
glog.V(0).Infof("+ broker %v", clientName)
fs.brokersLock.Unlock()
defer func() {
fs.brokersLock.Lock()
delete(fs.brokers, clientName)
glog.V(0).Infof("- broker %v: %v", clientName, err)
fs.brokersLock.Unlock()
}()
for {
if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
glog.V(0).Infof("send broker %v: %+v", clientName, err)
return err
}
// println("replied")
if _, err := stream.Recv(); err != nil {
glog.V(0).Infof("recv broker %v: %v", clientName, err)
return err
}
// println("received")
}
}
func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
resp = &filer_pb.LocateBrokerResponse{}
fs.brokersLock.Lock()
defer fs.brokersLock.Unlock()
var localBrokers []*filer_pb.LocateBrokerResponse_Resource
for b, m := range fs.brokers {
if _, found := m[req.Resource]; found {
resp.Found = true
resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
{
GrpcAddresses: b,
ResourceCount: int32(len(m)),
},
}
return
}
localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
GrpcAddresses: b,
ResourceCount: int32(len(m)),
})
}
resp.Resources = localBrokers
return resp, nil
}

View File

@@ -164,6 +164,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
var entry *filer.Entry
var newChunks []*filer_pb.FileChunk
var mergedChunks []*filer_pb.FileChunk
isAppend := isAppend(r)
@@ -186,7 +187,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
entry.FileSize += uint64(chunkOffset)
}
mergedChunks = append(entry.Chunks, fileChunks...)
newChunks = append(entry.Chunks, fileChunks...)
// TODO
if len(entry.Content) > 0 {
@@ -196,7 +197,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
} else {
glog.V(4).Infoln("saving", path)
mergedChunks = fileChunks
newChunks = fileChunks
entry = &filer.Entry{
FullPath: util.FullPath(path),
Attr: filer.Attr{
@@ -217,6 +218,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
// maybe concatenate small chunks into one whole chunk
mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
if replyerr != nil {
glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
mergedChunks = newChunks
}
// maybe compact entry chunks
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
if replyerr != nil {

View File

@@ -0,0 +1,11 @@
package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
return inputChunks, nil
}

View File

@@ -133,13 +133,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
message.NewEcVids = append(message.NewEcVids, s.Id)
}
for _, s := range heartbeat.DeletedEcShards {
if dn.HasVolumesById(needle.VolumeId(s.Id)) {
if dn.HasEcShards(needle.VolumeId(s.Id)) {
continue
}
message.DeletedVids = append(message.DeletedVids, s.Id)
message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
}
}
@@ -151,17 +151,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// broadcast the ec vid changes to master clients
for _, s := range newShards {
message.NewVids = append(message.NewVids, uint32(s.VolumeId))
message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
for _, s := range deletedShards {
if dn.HasVolumesById(s.VolumeId) {
continue
}
message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
}
}
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}

View File

@@ -3,7 +3,11 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"math/rand"
"sync"
"time"
@@ -142,3 +146,29 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
}
return resp, nil
}
func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
resp = &master_pb.PingResponse{}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.VolumeServerType {
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.MasterType {
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(ctx, &master_pb.PingRequest{})
return err
})
}
if pingErr != nil {
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
}
return
}

View File

@@ -3,6 +3,10 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -247,3 +251,29 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
return resp, nil
}
func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
resp = &volume_server_pb.PingResponse{}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(ctx, &filer_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.VolumeServerType {
pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
return err
})
}
if req.TargetType == cluster.MasterType {
pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(ctx, &master_pb.PingRequest{})
return err
})
}
if pingErr != nil {
pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
}
return
}

View File

@@ -0,0 +1,233 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"io"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func init() {
Commands = append(Commands, &commandClusterCheck{})
}
type commandClusterCheck struct {
}
func (c *commandClusterCheck) Name() string {
return "cluster.check"
}
func (c *commandClusterCheck) Help() string {
return `check current cluster network connectivity
cluster.check
`
}
func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
if err = clusterPsCommand.Parse(args); err != nil {
return nil
}
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos))
emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""]
hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"]
if !emptyDiskTypeFound && !hddDiskTypeFound || emptyDiskTypeDiskInfo.VolumeCount == 0 && hddDiskTypeDiskInfo.VolumeCount == 0 {
return fmt.Errorf("Need to a hdd disk type!")
}
// collect filers
var filers []pb.ServerAddress
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
for _, node := range resp.ClusterNodes {
filers = append(filers, pb.ServerAddress(node.Address))
}
return err
})
if err != nil {
return
}
fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers)
// collect volume servers
var volumeServers []pb.ServerAddress
t, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
for _, dc := range t.DataCenterInfos {
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
}
}
}
fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers)
// collect all masters
var masters []pb.ServerAddress
for _, master := range commandEnv.MasterClient.GetMasters() {
masters = append(masters, master)
}
// check from master to volume servers
for _, master := range masters {
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check between masters
for _, sourceMaster := range masters {
for _, targetMaster := range masters {
if sourceMaster == targetMaster {
continue
}
fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
_, err := client.Ping(context.Background(), &master_pb.PingRequest{
Target: string(targetMaster),
TargetType: cluster.MasterType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check from volume servers to masters
for _, volumeServer := range volumeServers {
for _, master := range masters {
fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
Target: string(master),
TargetType: cluster.MasterType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check from filers to masters
for _, filer := range filers {
for _, master := range masters {
fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(master),
TargetType: cluster.MasterType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check from filers to volume servers
for _, filer := range filers {
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check between volume servers
for _, sourceVolumeServer := range volumeServers {
for _, targetVolumeServer := range volumeServers {
if sourceVolumeServer == targetVolumeServer {
continue
}
fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
Target: string(targetVolumeServer),
TargetType: cluster.VolumeServerType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
// check between filers, and need to connect to itself
for _, sourceFiler := range filers {
for _, targetFiler := range filers {
fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(targetFiler),
TargetType: cluster.FilerType,
})
return err
})
if err == nil {
fmt.Fprintf(writer, "ok\n")
} else {
fmt.Fprintf(writer, "%v\n", err)
}
}
}
return nil
}

View File

@@ -95,7 +95,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
if !found && len(locations) > 0 {
return fmt.Errorf("volume %d not found", vid)
}

View File

@@ -0,0 +1,64 @@
package shell
import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/resolver/passthrough"
"io"
)
func init() {
Commands = append(Commands, &commandMountConfigure{})
}
type commandMountConfigure struct {
}
func (c *commandMountConfigure) Name() string {
return "mount.configure"
}
func (c *commandMountConfigure) Help() string {
return `configure the mount on current server
mount.configure -dir=<mount_directory>
This command connects with local mount via unix socket, so it can only run locally.
The "mount_directory" value needs to be exactly the same as how mount was started in "weed mount -dir=<mount_directory>"
`
}
func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
mountDir := mountConfigureCommand.String("dir", "", "the mount directory same as how \"weed mount -dir=<mount_directory>\" was started")
mountQuota := mountConfigureCommand.Int("quotaMB", 0, "the quota in MB")
if err = mountConfigureCommand.Parse(args); err != nil {
return nil
}
mountDirHash := util.HashToInt32([]byte(*mountDir))
if mountDirHash < 0 {
mountDirHash = -mountDirHash
}
localSocket := fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash)
clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
defer clientConn.Close()
client := mount_pb.NewSeaweedMountClient(clientConn)
_, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{
CollectionCapacity: int64(*mountQuota) * 1024 * 1024,
})
return
}

View File

@@ -12,6 +12,7 @@ import (
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -92,8 +93,27 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("failed to collect all volume locations: %v", err)
}
isBucketsPath := false
var fillerBucketsPath string
if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "" {
fillerBucketsPath, err = readFilerBucketsPath(commandEnv)
if err != nil {
return fmt.Errorf("read filer buckets path: %v", err)
}
if strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath) {
isBucketsPath = true
}
}
if err != nil {
return fmt.Errorf("read filer buckets path: %v", err)
}
// collect each volume file ids
for volumeId, vinfo := range volumeIdToVInfo {
if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) {
delete(volumeIdToVInfo, volumeId)
continue
}
err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)

View File

@@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
for _, ecShards := range actualShards {
if dn.hasEcShards(ecShards.VolumeId) {
if dn.HasEcShards(ecShards.VolumeId) {
continue
}
@@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
return
}
func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {

View File

@@ -5,7 +5,7 @@ import (
)
var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.96)
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.97)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)

View File

@@ -58,9 +58,13 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
var toFlush *dataToFlush
m.Lock()
defer func() {
m.Unlock()
if toFlush != nil {
m.flushChan <- toFlush
}
if m.notifyFn != nil {
m.notifyFn()
}
@@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64)
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
m.flushChan <- m.copyToFlush()
toFlush = m.copyToFlush()
m.startTime = ts
if len(m.buf) < size+4 {
m.buf = make([]byte, 2*size+4)
@@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() {
return
}
toFlush := m.copyToFlush()
m.flushChan <- toFlush
m.Unlock()
if toFlush != nil {
m.flushChan <- toFlush
}
}
}

View File

@@ -41,6 +41,11 @@ func (mc *MasterClient) GetMaster() pb.ServerAddress {
return mc.currentMaster
}
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
mc.WaitUntilConnected()
return mc.masters
}
func (mc *MasterClient) WaitUntilConnected() {
for mc.currentMaster == "" {
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@@ -154,6 +159,14 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
for _, newEcVid := range resp.VolumeLocation.NewEcVids {
glog.V(1).Infof("%s: %s masterClient adds ec volume %d", mc.clientType, loc.Url, newEcVid)
mc.addEcLocation(newEcVid, loc)
}
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
glog.V(1).Infof("%s: %s masterClient removes ec volume %d", mc.clientType, loc.Url, deletedEcVid)
mc.deleteEcLocation(deletedEcVid, loc)
}
}
if resp.ClusterNodeUpdate != nil {

View File

@@ -36,16 +36,18 @@ func (l Location) ServerAddress() pb.ServerAddress {
type vidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
DataCenter string
cursor int32
vid2Locations map[uint32][]Location
ecVid2Locations map[uint32][]Location
DataCenter string
cursor int32
}
func newVidMap(dataCenter string) vidMap {
return vidMap{
vid2Locations: make(map[uint32][]Location),
DataCenter: dataCenter,
cursor: -1,
vid2Locations: make(map[uint32][]Location),
ecVid2Locations: make(map[uint32][]Location),
DataCenter: dataCenter,
cursor: -1,
}
}
@@ -124,7 +126,13 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
vc.RLock()
defer vc.RUnlock()
glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
locations, found = vc.vid2Locations[vid]
if found && len(locations) > 0 {
return
}
locations, found = vc.ecVid2Locations[vid]
return
}
@@ -132,6 +140,8 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("+ volume id %d: %+v", vid, location)
locations, found := vc.vid2Locations[vid]
if !found {
vc.vid2Locations[vid] = []Location{location}
@@ -148,10 +158,34 @@ func (vc *vidMap) addLocation(vid uint32, location Location) {
}
func (vc *vidMap) addEcLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("+ ec volume id %d: %+v", vid, location)
locations, found := vc.ecVid2Locations[vid]
if !found {
vc.ecVid2Locations[vid] = []Location{location}
return
}
for _, loc := range locations {
if loc.Url == location.Url {
return
}
}
vc.ecVid2Locations[vid] = append(locations, location)
}
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("- volume id %d: %+v", vid, location)
locations, found := vc.vid2Locations[vid]
if !found {
return
@@ -165,3 +199,23 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
}
}
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()
glog.V(4).Infof("- ec volume id %d: %+v", vid, location)
locations, found := vc.ecVid2Locations[vid]
if !found {
return
}
for i, loc := range locations {
if loc.Url == location.Url {
vc.ecVid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
break
}
}
}