Merge branch 'master' into bptree
This commit is contained in:
@@ -62,7 +62,7 @@ func init() {
|
||||
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
|
||||
f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to")
|
||||
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
|
||||
f.portGrpc = cmdFiler.Flag.Int("port.grpc", 18888, "filer server grpc listen port")
|
||||
f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port")
|
||||
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
|
||||
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
|
||||
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
|
||||
@@ -87,7 +87,7 @@ func init() {
|
||||
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
|
||||
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
|
||||
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
|
||||
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
|
||||
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
|
||||
|
||||
// start webdav on filer
|
||||
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
|
||||
@@ -180,6 +180,9 @@ func (fo *FilerOptions) startFiler() {
|
||||
if *fo.publicPort != 0 {
|
||||
publicVolumeMux = http.NewServeMux()
|
||||
}
|
||||
if *fo.portGrpc == 0 {
|
||||
*fo.portGrpc = 10000 + *fo.port
|
||||
}
|
||||
|
||||
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
|
||||
|
||||
|
||||
@@ -71,12 +71,12 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
|
||||
}
|
||||
|
||||
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
|
||||
if filterFunc == nil {
|
||||
return true
|
||||
}
|
||||
if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
|
||||
return false
|
||||
}
|
||||
if filterFunc == nil {
|
||||
return true
|
||||
}
|
||||
if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ type MasterOptions struct {
|
||||
func init() {
|
||||
cmdMaster.Run = runMaster // break init cycle
|
||||
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
|
||||
m.portGrpc = cmdMaster.Flag.Int("port.grpc", 19333, "grpc listen port")
|
||||
m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port")
|
||||
m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier")
|
||||
m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to")
|
||||
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
|
||||
@@ -113,6 +113,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
|
||||
|
||||
backend.LoadConfiguration(util.GetViper())
|
||||
|
||||
if *masterOption.portGrpc == 0 {
|
||||
*masterOption.portGrpc = 10000 + *masterOption.port
|
||||
}
|
||||
|
||||
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
|
||||
|
||||
r := mux.NewRouter()
|
||||
|
||||
@@ -23,7 +23,7 @@ var (
|
||||
func init() {
|
||||
cmdMasterFollower.Run = runMasterFollower // break init cycle
|
||||
mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
|
||||
mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 19334, "grpc listen port")
|
||||
mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 0, "grpc listen port")
|
||||
mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to")
|
||||
mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
|
||||
|
||||
@@ -70,6 +70,10 @@ func runMasterFollower(cmd *Command, args []string) bool {
|
||||
util.LoadConfiguration("security", false)
|
||||
util.LoadConfiguration("master", false)
|
||||
|
||||
if *mf.portGrpc == 0 {
|
||||
*mf.portGrpc = 10000 + *mf.port
|
||||
}
|
||||
|
||||
startMasterFollower(mf)
|
||||
|
||||
return true
|
||||
|
||||
@@ -42,7 +42,7 @@ func init() {
|
||||
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
|
||||
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
|
||||
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
|
||||
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
|
||||
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
|
||||
}
|
||||
|
||||
var cmdS3 = &Command{
|
||||
|
||||
@@ -86,7 +86,7 @@ func init() {
|
||||
serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")
|
||||
|
||||
masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
|
||||
masterOptions.portGrpc = cmdServer.Flag.Int("master.port.grpc", 19333, "master server grpc listen port")
|
||||
masterOptions.portGrpc = cmdServer.Flag.Int("master.port.grpc", 0, "master server grpc listen port")
|
||||
masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
|
||||
masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
|
||||
masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
|
||||
@@ -99,7 +99,7 @@ func init() {
|
||||
|
||||
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
||||
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
||||
filerOptions.portGrpc = cmdServer.Flag.Int("filer.port.grpc", 18888, "filer server grpc listen port")
|
||||
filerOptions.portGrpc = cmdServer.Flag.Int("filer.port.grpc", 0, "filer server grpc listen port")
|
||||
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
|
||||
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
|
||||
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
|
||||
@@ -111,7 +111,7 @@ func init() {
|
||||
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
|
||||
|
||||
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
|
||||
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 18080, "volume server grpc listen port")
|
||||
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
|
||||
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
|
||||
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
|
||||
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
|
||||
@@ -132,7 +132,7 @@ func init() {
|
||||
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
|
||||
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
|
||||
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
|
||||
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
|
||||
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
|
||||
|
||||
webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port")
|
||||
webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files")
|
||||
|
||||
@@ -70,7 +70,7 @@ type VolumeServerOptions struct {
|
||||
func init() {
|
||||
cmdVolume.Run = runVolume // break init cycle
|
||||
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
|
||||
v.portGrpc = cmdVolume.Flag.Int("port.grpc", 18080, "grpc listen port")
|
||||
v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port")
|
||||
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
|
||||
v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
|
||||
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
|
||||
@@ -197,6 +197,9 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
|
||||
if *v.publicPort == 0 {
|
||||
*v.publicPort = *v.port
|
||||
}
|
||||
if *v.portGrpc == 0 {
|
||||
*v.portGrpc = 10000 + *v.port
|
||||
}
|
||||
if *v.publicUrl == "" {
|
||||
*v.publicUrl = util.JoinHostPort(*v.ip, *v.publicPort)
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ type Filer struct {
|
||||
Signature int32
|
||||
FilerConf *FilerConf
|
||||
RemoteStorage *FilerRemoteStorage
|
||||
UniqueFileId uint32
|
||||
}
|
||||
|
||||
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
|
||||
@@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
|
||||
GrpcDialOption: grpcDialOption,
|
||||
FilerConf: NewFilerConf(),
|
||||
RemoteStorage: NewFilerRemoteStorage(),
|
||||
UniqueFileId: uint32(util.RandomInt32()),
|
||||
}
|
||||
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
|
||||
f.metaLogCollection = collection
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
|
||||
|
||||
startTime, stopTime = startTime.UTC(), stopTime.UTC()
|
||||
|
||||
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
|
||||
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
|
||||
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
|
||||
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
|
||||
// startTime.Second(), startTime.Nanosecond(),
|
||||
)
|
||||
|
||||
@@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
|
||||
|
||||
startTime = startTime.UTC()
|
||||
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
|
||||
startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
|
||||
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
|
||||
|
||||
sizeBuf := make([]byte, 4)
|
||||
startTsNs := startTime.UnixNano()
|
||||
@@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
|
||||
}
|
||||
for _, dayEntry := range dayEntries {
|
||||
// println("checking day", dayEntry.FullPath)
|
||||
hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
|
||||
hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
|
||||
if listHourMinuteErr != nil {
|
||||
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
|
||||
}
|
||||
for _, hourMinuteEntry := range hourMinuteEntries {
|
||||
// println("checking hh-mm", hourMinuteEntry.FullPath)
|
||||
if dayEntry.Name() == startDate {
|
||||
if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
|
||||
hourMinute := util.FileNameBase(hourMinuteEntry.Name())
|
||||
if strings.Compare(hourMinute, startHourMinute) < 0 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,8 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
|
||||
Gid: OS_GID,
|
||||
},
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("find %s: %v", fullpath, err)
|
||||
} else {
|
||||
offset = int64(TotalSize(entry.Chunks))
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
|
||||
}
|
||||
dir := event.Directory
|
||||
// println("received meta change", dir, "size", len(data))
|
||||
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0)
|
||||
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
|
||||
if maybeReplicateMetadataChange != nil {
|
||||
maybeReplicateMetadataChange(event)
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/tecbot/gorocksdb"
|
||||
gorocksdb "github.com/linxGnu/grocksdb"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
|
||||
@@ -5,7 +5,7 @@ package rocksdb
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/tecbot/gorocksdb"
|
||||
gorocksdb "github.com/linxGnu/grocksdb"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
)
|
||||
@@ -38,3 +38,8 @@ func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []by
|
||||
func (t *TTLFilter) Name() string {
|
||||
return "TTLFilter"
|
||||
}
|
||||
func (t *TTLFilter) SetIgnoreSnapshots(value bool) {
|
||||
}
|
||||
|
||||
func (t *TTLFilter) Destroy() {
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||
"io"
|
||||
"strings"
|
||||
@@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
|
||||
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
|
||||
startTime = startTime.UTC()
|
||||
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
|
||||
startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
|
||||
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
|
||||
|
||||
sizeBuf := make([]byte, 4)
|
||||
startTsNs := startTime.UnixNano()
|
||||
@@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
|
||||
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
|
||||
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
|
||||
if dayEntry.Name == startDate {
|
||||
if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
|
||||
hourMinute := util.FileNameBase(hourMinuteEntry.Name)
|
||||
if strings.Compare(hourMinute, startHourMinute) < 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ type ServerAddress string
|
||||
type ServerAddresses string
|
||||
|
||||
func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
|
||||
if grpcPort == port+10000 {
|
||||
if grpcPort == 0 || grpcPort == port+10000 {
|
||||
return ServerAddress(util.JoinHostPort(host, port))
|
||||
}
|
||||
return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort))
|
||||
|
||||
@@ -58,6 +58,8 @@ service VolumeServer {
|
||||
}
|
||||
rpc WriteNeedleBlob (WriteNeedleBlobRequest) returns (WriteNeedleBlobResponse) {
|
||||
}
|
||||
rpc ReadAllNeedles (ReadAllNeedlesRequest) returns (stream ReadAllNeedlesResponse) {
|
||||
}
|
||||
|
||||
rpc VolumeTailSender (VolumeTailSenderRequest) returns (stream VolumeTailSenderResponse) {
|
||||
}
|
||||
@@ -284,6 +286,16 @@ message WriteNeedleBlobRequest {
|
||||
message WriteNeedleBlobResponse {
|
||||
}
|
||||
|
||||
message ReadAllNeedlesRequest {
|
||||
repeated uint32 volume_ids = 1;
|
||||
}
|
||||
message ReadAllNeedlesResponse {
|
||||
uint32 volume_id = 1;
|
||||
uint64 needle_id = 2;
|
||||
uint32 cookie = 3;
|
||||
bytes needle_blob = 5;
|
||||
}
|
||||
|
||||
message VolumeTailSenderRequest {
|
||||
uint32 volume_id = 1;
|
||||
uint64 since_ns = 2;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -49,11 +49,6 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||
|
||||
if processedTsNs != 0 {
|
||||
lastReadTime = time.Unix(0, processedTsNs)
|
||||
} else {
|
||||
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
||||
time.Sleep(1127 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||
@@ -66,6 +61,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||
}, eachLogEntryFn)
|
||||
if readInMemoryLogErr != nil {
|
||||
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
||||
time.Sleep(1127 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
|
||||
|
||||
36
weed/server/volume_grpc_read_all.go
Normal file
36
weed/server/volume_grpc_read_all.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
|
||||
|
||||
for _, vid := range req.VolumeIds {
|
||||
if err := vs.streaReadOneVolume(needle.VolumeId(vid), stream, err); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer, err error) error {
|
||||
v := vs.store.GetVolume(vid)
|
||||
if v == nil {
|
||||
return fmt.Errorf("not found volume id %d", vid)
|
||||
}
|
||||
|
||||
scanner := &storage.VolumeFileScanner4ReadAll{
|
||||
Stream: stream,
|
||||
V: v,
|
||||
}
|
||||
|
||||
offset := int64(v.SuperBlock.BlockSize())
|
||||
|
||||
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, offset, scanner)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -208,6 +208,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
|
||||
|
||||
}
|
||||
|
||||
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
|
||||
var resp *master_pb.LookupVolumeResponse
|
||||
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
|
||||
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.VolumeIdLocations, nil
|
||||
}
|
||||
|
||||
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
|
||||
|
||||
var resp *master_pb.VolumeListResponse
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
@@ -56,6 +58,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
|
||||
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
||||
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
|
||||
retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
|
||||
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
|
||||
|
||||
if err = volFixReplicationCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -66,44 +70,87 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
|
||||
|
||||
takeAction := !*skipChange
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
underReplicatedVolumeIdsCount := 1
|
||||
for underReplicatedVolumeIdsCount > 0 {
|
||||
fixedVolumeReplicas := map[string]int{}
|
||||
|
||||
// find all volumes that needs replication
|
||||
// collect all data nodes
|
||||
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(allLocations) == 0 {
|
||||
return fmt.Errorf("no data nodes at all")
|
||||
}
|
||||
// find all volumes that needs replication
|
||||
// collect all data nodes
|
||||
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
||||
|
||||
// find all under replicated volumes
|
||||
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
|
||||
for vid, replicas := range volumeReplicas {
|
||||
replica := replicas[0]
|
||||
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
||||
if replicaPlacement.GetCopyCount() > len(replicas) {
|
||||
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
|
||||
} else if replicaPlacement.GetCopyCount() < len(replicas) {
|
||||
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
|
||||
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
||||
if len(allLocations) == 0 {
|
||||
return fmt.Errorf("no data nodes at all")
|
||||
}
|
||||
|
||||
// find all under replicated volumes
|
||||
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
|
||||
for vid, replicas := range volumeReplicas {
|
||||
replica := replicas[0]
|
||||
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
||||
if replicaPlacement.GetCopyCount() > len(replicas) {
|
||||
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
|
||||
} else if replicaPlacement.GetCopyCount() < len(replicas) {
|
||||
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
|
||||
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
||||
}
|
||||
}
|
||||
|
||||
if len(overReplicatedVolumeIds) > 0 {
|
||||
if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
|
||||
if underReplicatedVolumeIdsCount > 0 {
|
||||
// find the most under populated data nodes
|
||||
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if *skipChange {
|
||||
break
|
||||
}
|
||||
|
||||
// check that the topology has been updated
|
||||
if len(fixedVolumeReplicas) > 0 {
|
||||
fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
|
||||
for k, _ := range fixedVolumeReplicas {
|
||||
fixedVolumes = append(fixedVolumes, k)
|
||||
}
|
||||
volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, volumeIdLocation := range volumeIdLocations {
|
||||
volumeId := volumeIdLocation.VolumeOrFileId
|
||||
volumeIdLocationCount := len(volumeIdLocation.Locations)
|
||||
i := 0
|
||||
for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
|
||||
fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
|
||||
time.Sleep(time.Duration(i+1) * time.Second * 7)
|
||||
volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
volumeIdLocationCount = len(volumeLocIds[0].Locations)
|
||||
if *retryCount > i {
|
||||
return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(overReplicatedVolumeIds) > 0 {
|
||||
return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
|
||||
}
|
||||
|
||||
if len(underReplicatedVolumeIds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// find the most under populated data nodes
|
||||
return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
|
||||
@@ -156,16 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
|
||||
|
||||
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
|
||||
fixedVolumes = map[string]int{}
|
||||
if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
|
||||
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
|
||||
}
|
||||
for _, vid := range underReplicatedVolumeIds {
|
||||
for i := 0; i < retryCount+1; i++ {
|
||||
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
|
||||
if takeAction {
|
||||
fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return fixedVolumes, nil
|
||||
}
|
||||
|
||||
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
|
||||
|
||||
@@ -47,7 +47,7 @@ func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string
|
||||
Key: aws.String(sourceKey),
|
||||
})
|
||||
if err != nil {
|
||||
return fileSize, fmt.Errorf("failed to download file %s: %v", destFileName, err)
|
||||
return fileSize, fmt.Errorf("failed to download /buckets/%s%s to %s: %v", sourceBucket, sourceKey, destFileName, err)
|
||||
}
|
||||
|
||||
glog.V(1).Infof("downloaded file %s\n", destFileName)
|
||||
|
||||
@@ -31,9 +31,9 @@ type Needle struct {
|
||||
Data []byte `comment:"The actual file data"`
|
||||
Flags byte `comment:"boolean flags"` //version2
|
||||
NameSize uint8 //version2
|
||||
Name []byte `comment:"maximum 256 characters"` //version2
|
||||
Name []byte `comment:"maximum 255 characters"` //version2
|
||||
MimeSize uint8 //version2
|
||||
Mime []byte `comment:"maximum 256 characters"` //version2
|
||||
Mime []byte `comment:"maximum 255 characters"` //version2
|
||||
PairsSize uint16 //version2
|
||||
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
|
||||
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
|
||||
|
||||
42
weed/storage/volume_read_all.go
Normal file
42
weed/storage/volume_read_all.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
|
||||
)
|
||||
|
||||
type VolumeFileScanner4ReadAll struct {
|
||||
Stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
|
||||
V *Volume
|
||||
}
|
||||
|
||||
func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
|
||||
return nil
|
||||
|
||||
}
|
||||
func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
|
||||
|
||||
nv, ok := scanner.V.nm.Get(n.Id)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if nv.Offset.ToActualOffset() != offset {
|
||||
return nil
|
||||
}
|
||||
|
||||
sendErr := scanner.Stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
|
||||
VolumeId: uint32(scanner.V.Id),
|
||||
NeedleId: uint64(n.Id),
|
||||
Cookie: uint32(n.Cookie),
|
||||
NeedleBlob: n.Data,
|
||||
})
|
||||
if sendErr != nil {
|
||||
return sendErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -302,7 +302,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
|
||||
}
|
||||
counter++
|
||||
if rand.Intn(counter) < 1 {
|
||||
vid, locationList = v, volumeLocationList
|
||||
vid, locationList = v, volumeLocationList.Copy()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.69)
|
||||
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.70)
|
||||
VERSION = sizeLimit + " " + VERSION_NUMBER
|
||||
COMMIT = ""
|
||||
)
|
||||
|
||||
@@ -87,3 +87,11 @@ func ResolvePath(path string) string {
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func FileNameBase(filename string) string {
|
||||
lastDotIndex := strings.LastIndex(filename, ".")
|
||||
if lastDotIndex < 0 {
|
||||
return filename
|
||||
}
|
||||
return filename[:lastDotIndex]
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi
|
||||
return lb
|
||||
}
|
||||
|
||||
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
|
||||
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
|
||||
|
||||
m.Lock()
|
||||
defer func() {
|
||||
@@ -68,20 +68,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
|
||||
|
||||
// need to put the timestamp inside the lock
|
||||
var ts time.Time
|
||||
if eventTsNs == 0 {
|
||||
if processingTsNs == 0 {
|
||||
ts = time.Now()
|
||||
eventTsNs = ts.UnixNano()
|
||||
processingTsNs = ts.UnixNano()
|
||||
} else {
|
||||
ts = time.Unix(0, eventTsNs)
|
||||
ts = time.Unix(0, processingTsNs)
|
||||
}
|
||||
if m.lastTsNs >= eventTsNs {
|
||||
if m.lastTsNs >= processingTsNs {
|
||||
// this is unlikely to happen, but just in case
|
||||
eventTsNs = m.lastTsNs + 1
|
||||
ts = time.Unix(0, eventTsNs)
|
||||
processingTsNs = m.lastTsNs + 1
|
||||
ts = time.Unix(0, processingTsNs)
|
||||
}
|
||||
m.lastTsNs = eventTsNs
|
||||
m.lastTsNs = processingTsNs
|
||||
logEntry := &filer_pb.LogEntry{
|
||||
TsNs: eventTsNs,
|
||||
TsNs: processingTsNs,
|
||||
PartitionKeyHash: util.HashToInt32(partitionKey),
|
||||
Data: data,
|
||||
}
|
||||
@@ -189,7 +189,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
|
||||
defer m.RUnlock()
|
||||
|
||||
if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
|
||||
return nil, ResumeFromDiskError
|
||||
if time.Now().Sub(m.lastFlushTime) < m.flushInterval*2 {
|
||||
diff := m.lastFlushTime.Sub(lastReadTime)
|
||||
glog.V(4).Infof("lastFlush:%v lastRead:%v diff:%v", m.lastFlushTime, lastReadTime, diff)
|
||||
return nil, ResumeFromDiskError
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user