Merge pull request #3 from chrislusf/master

This commit is contained in:
bingoohuang
2021-02-20 16:45:02 +08:00
committed by GitHub
20 changed files with 127 additions and 104 deletions

View File

@@ -52,7 +52,7 @@ type FilerOptions struct {
func init() {
cmdFiler.Run = runFiler // break init cycle
f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
@@ -83,6 +83,8 @@ func init() {
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port")
filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files")
filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files")
filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")

View File

@@ -121,6 +121,8 @@ func init() {
webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port")
webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files")
webdavOptions.replication = cmdServer.Flag.String("webdav.replication", "", "replication to create the files")
webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file")
webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")

View File

@@ -25,6 +25,8 @@ type WebDavOption struct {
filer *string
port *int
collection *string
replication *string
disk *string
tlsPrivateKey *string
tlsCertificate *string
cacheDir *string
@@ -36,6 +38,8 @@ func init() {
webDavStandaloneOptions.filer = cmdWebDav.Flag.String("filer", "localhost:8888", "filer server address")
webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port")
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.replication = cmdWebDav.Flag.String("replication", "", "replication to create the files")
webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
@@ -107,6 +111,8 @@ func (wo *WebDavOption) startWebDav() bool {
FilerGrpcAddress: filerGrpcAddress,
GrpcDialOption: grpcDialOption,
Collection: *wo.collection,
Replication: *wo.replication,
DiskType: *wo.disk,
Uid: uid,
Gid: gid,
Cipher: cipher,

View File

@@ -1,11 +0,0 @@
[elastic7]
enabled = true
servers = [
"http://localhost:9200",
]
username = ""
password = ""
sniff_enabled = false
healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000

View File

@@ -46,17 +46,19 @@ func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) {
return fc.LoadFromBytes(entry.Content)
}
return fc.loadFromChunks(filer, entry.Chunks)
return fc.loadFromChunks(filer, entry.Content, entry.Chunks)
}
func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) {
data, err := filer.readEntry(chunks)
if err != nil {
glog.Errorf("read filer conf content: %v", err)
return
func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*filer_pb.FileChunk) (err error) {
if len(content) == 0 {
content, err = filer.readEntry(chunks)
if err != nil {
glog.Errorf("read filer conf content: %v", err)
return
}
}
return fc.LoadFromBytes(data)
return fc.LoadFromBytes(content)
}
func (fc *FilerConf) LoadFromBytes(data []byte) (err error) {

View File

@@ -40,7 +40,7 @@ func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
fc := NewFilerConf()
err := fc.loadFromChunks(f, entry.Chunks)
err := fc.loadFromChunks(f, entry.Content, entry.Chunks)
if err != nil {
glog.Errorf("read filer conf chunks: %v", err)
return

View File

@@ -64,7 +64,6 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(Max_Message_Size),
grpc.MaxCallRecvMsgSize(Max_Message_Size),
grpc.WaitForReady(true),
),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long

View File

@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("No free volumes left!")
return nil, fmt.Errorf("no free volumes left for "+option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {

View File

@@ -113,7 +113,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
return
}
ms.vgLock.Lock()

View File

@@ -33,6 +33,7 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
Replication string
DiskType string
Uid uint32
Gid uint32
@@ -225,7 +226,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Collection: fs.option.Collection,
Replication: "000",
Replication: fs.option.Replication,
TtlSec: 0,
},
},
@@ -381,7 +382,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Replication: f.fs.option.Replication,
Collection: f.fs.option.Collection,
DiskType: f.fs.option.DiskType,
Path: name,

View File

@@ -20,11 +20,11 @@ type commandVolumeTierMove struct {
}
func (c *commandVolumeTierMove) Name() string {
return "volume.tier.upload"
return "volume.tier.move"
}
func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another
return `<WIP> change a volume from one disk type to another
volume.tier.move -source=hdd -target=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
volume.tier.move -target=hdd [-collection=""] -volumeId=<volume_id>

View File

@@ -92,7 +92,7 @@ func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool
func printGenericHelp() {
msg :=
`Type: "help <command>" for help on <command>
`Type: "help <command>" for help on <command>. Most commands support "<command> -h" also for options.
`
fmt.Print(msg)

View File

@@ -6,9 +6,9 @@ import (
)
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
DiffDataCenterCount int
SameRackCount int `json:"node,omitempty"`
DiffRackCount int `json:"rack,omitempty"`
DiffDataCenterCount int `json:"dc,omitempty"`
}
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {

View File

@@ -207,7 +207,26 @@ func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["PublicUrl"] = dn.PublicUrl
ret["Disks"] = dn.diskUsages.ToMap()
// aggregated volume info
var volumeCount, ecShardCount, maxVolumeCount int64
var volumeIds string
for _, diskUsage := range dn.diskUsages.usages {
volumeCount += diskUsage.volumeCount
ecShardCount += diskUsage.ecShardCount
maxVolumeCount += diskUsage.maxVolumeCount
}
for _, disk := range dn.Children() {
d := disk.(*Disk)
volumeIds += " " + d.GetVolumeIds()
}
ret["Volumes"] = volumeCount
ret["EcShards"] = ecShardCount
ret["Max"] = maxVolumeCount
ret["VolumeIds"] = volumeIds
return ret
}

View File

@@ -58,16 +58,6 @@ func (d *DiskUsages) negative() *DiskUsages {
return t
}
func (d *DiskUsages) ToMap() interface{} {
d.RLock()
defer d.RUnlock()
ret := make(map[string]interface{})
for diskType, diskUsage := range d.usages {
ret[diskType.String()] = diskUsage.ToMap()
}
return ret
}
func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
ret := make(map[string]*master_pb.DiskInfo)
for diskType, diskUsageCounts := range d.usages {
@@ -135,15 +125,6 @@ func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts {
}
}
func (diskUsage *DiskUsageCounts) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Volumes"] = diskUsage.volumeCount
ret["EcShards"] = diskUsage.ecShardCount
ret["Max"] = diskUsage.maxVolumeCount
ret["Free"] = diskUsage.FreeSpace()
return ret
}
func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
du.Lock()
defer du.Unlock()

View File

@@ -1,6 +1,7 @@
package topology
import (
"encoding/json"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
@@ -25,15 +26,15 @@ This package is created to resolve these replica placement issues:
*/
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
DiskType types.DiskType
Prealloacte int64
DataCenter string
Rack string
DataNode string
MemoryMapMaxSizeMb uint32
Collection string `json:"collection,omitempty"`
ReplicaPlacement *super_block.ReplicaPlacement `json:"replication,omitempty"`
Ttl *needle.TTL `json:"ttl,omitempty"`
DiskType types.DiskType `json:"disk,omitempty"`
Prealloacte int64 `json:"prealloacte,omitempty"`
DataCenter string `json:"dataCenter,omitempty"`
Rack string `json:"rack,omitempty"`
DataNode string `json:"dataNode,omitempty"`
MemoryMapMaxSizeMb uint32 `json:"memoryMapMaxSizeMb,omitempty"`
}
type VolumeGrowth struct {
@@ -41,7 +42,8 @@ type VolumeGrowth struct {
}
func (o *VolumeGrowOption) String() string {
return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
blob, _ := json.Marshal(o)
return string(blob)
}
func NewDefaultVolumeGrowth() *VolumeGrowth {