Merge remote-tracking branch 'origin/master' into filer_mongodb

# Conflicts:
#	go.mod
#	go.sum
#	weed/server/filer_server.go
This commit is contained in:
bukton
2020-04-19 00:21:45 +07:00
101 changed files with 3019 additions and 1123 deletions

View File

@@ -21,6 +21,7 @@ type Attr struct {
UserName string
GroupNames []string
SymlinkTarget string
Md5 []byte
}
func (attr Attr) IsDirectory() bool {

View File

@@ -52,6 +52,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
Md5: entry.Attr.Md5,
}
}
@@ -71,6 +72,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
t.Md5 = attr.Md5
return t
}
@@ -93,6 +95,10 @@ func EqualEntry(a, b *Entry) bool {
return false
}
if !bytes.Equal(a.Md5, b.Md5) {
return false
}
for i := 0; i < len(a.Chunks); i++ {
if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
return false

View File

@@ -20,7 +20,21 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
return
}
func ETag(chunks []*filer_pb.FileChunk) (etag string) {
func ETag(entry *filer_pb.Entry) (etag string) {
if entry.Attributes == nil || entry.Attributes.Md5 == nil {
return ETagChunks(entry.Chunks)
}
return fmt.Sprintf("%x", entry.Attributes.Md5)
}
func ETagEntry(entry *Entry) (etag string) {
if entry.Attr.Md5 == nil {
return ETagChunks(entry.Chunks)
}
return fmt.Sprintf("%x", entry.Attr.Md5)
}
func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
if len(chunks) == 1 {
return chunks[0].ETag
}
@@ -71,11 +85,15 @@ type ChunkView struct {
Offset int64
Size uint64
LogicOffset int64
IsFullChunk bool
ChunkSize uint64
CipherKey []byte
IsGzipped bool
}
func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles := NonOverlappingVisibleIntervals(chunks)
@@ -97,13 +115,12 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
for _, chunk := range visibles {
if chunk.start <= offset && offset < chunk.stop && offset < stop {
isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop
views = append(views, &ChunkView{
FileId: chunk.fileId,
Offset: offset - chunk.start, // offset is the data starting location in this file id
Size: uint64(min(chunk.stop, stop) - offset),
LogicOffset: offset,
IsFullChunk: isFullChunk,
ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
})
@@ -132,7 +149,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, true, chunk.CipherKey, chunk.IsGzipped)
newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsGzipped)
length := len(visibles)
if length == 0 {
@@ -146,11 +163,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
logPrintf(" before", visibles)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, false, v.cipherKey, v.isGzipped))
newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -202,18 +219,18 @@ type VisibleInterval struct {
stop int64
modifiedTime int64
fileId string
isFullChunk bool
chunkSize uint64
cipherKey []byte
isGzipped bool
}
func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool, cipherKey []byte, isGzipped bool) VisibleInterval {
func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
isFullChunk: isFullChunk,
chunkSize: chunkSize,
cipherKey: cipherKey,
isGzipped: isGzipped,
}

View File

@@ -13,8 +13,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/queue"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -32,20 +32,24 @@ type Filer struct {
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
DirQueuesPath string
FsyncBuckets []string
buckets *FilerBuckets
Cipher bool
metaLogBuffer *queue.LogBuffer
metaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, notifyFn func()) *Filer {
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerGrpcPort, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}
f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.metaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
go f.loopProcessingDeletion()

View File

@@ -13,6 +13,7 @@ type BucketName string
type BucketOption struct {
Name BucketName
Replication string
fsync bool
}
type FilerBuckets struct {
dirBucketsPath string
@@ -20,36 +21,42 @@ type FilerBuckets struct {
sync.RWMutex
}
func (f *Filer) LoadBuckets(dirBucketsPath string) {
func (f *Filer) LoadBuckets() {
f.buckets = &FilerBuckets{
buckets: make(map[BucketName]*BucketOption),
}
f.DirBucketsPath = dirBucketsPath
limit := math.MaxInt32
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(dirBucketsPath), "", false, limit)
entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit)
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
return
}
shouldFsyncMap := make(map[string]bool)
for _, bucket := range f.FsyncBuckets {
shouldFsyncMap[bucket] = true
}
glog.V(1).Infof("buckets found: %d", len(entries))
f.buckets.Lock()
for _, entry := range entries {
_, shouldFsnyc := shouldFsyncMap[entry.Name()]
f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
Name: BucketName(entry.Name()),
Replication: entry.Replication,
fsync: shouldFsnyc,
}
}
f.buckets.Unlock()
}
func (f *Filer) ReadBucketOption(buketName string) (replication string) {
func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
f.buckets.RLock()
defer f.buckets.RUnlock()
@@ -57,9 +64,9 @@ func (f *Filer) ReadBucketOption(buketName string) (replication string) {
option, found := f.buckets.buckets[BucketName(buketName)]
if !found {
return ""
return "", false
}
return option.Replication
return option.Replication, option.fsync
}

View File

@@ -25,7 +25,7 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
// println("fullpath:", fullpath)
if strings.HasPrefix(fullpath, "/.meta") {
if strings.HasPrefix(fullpath, SystemLogDir) {
return
}
@@ -45,32 +45,34 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage(fullpath, eventNotification)
}
f.logMetaEvent(time.Now(), fullpath, eventNotification)
f.logMetaEvent(fullpath, eventNotification)
}
func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) {
func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) {
dir, _ := util.FullPath(fullpath).DirAndName()
event := &filer_pb.FullEventNotification{
event := &filer_pb.SubscribeMetadataResponse{
Directory: dir,
EventNotification: eventNotification,
}
data, err := proto.Marshal(event)
if err != nil {
glog.Errorf("failed to marshal filer_pb.FullEventNotification %+v: %v", event, err)
glog.Errorf("failed to marshal filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
return
}
f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data)
f.metaLogBuffer.AddToBuffer([]byte(dir), data)
}
func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
targetFile := fmt.Sprintf("/.meta/log/%04d/%02d/%02d/%02d/%02d/%02d.%09d.log",
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
startTime.Second(), startTime.Nanosecond())
// startTime.Second(), startTime.Nanosecond(),
)
if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
@@ -95,11 +97,11 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
}
event := &filer_pb.FullEventNotification{}
event := &filer_pb.SubscribeMetadataResponse{}
err = proto.Unmarshal(logEntry.Data, event)
if err != nil {
glog.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
err = eachEventFn(event.Directory, event.EventNotification)

View File

@@ -13,25 +13,10 @@ import (
func (f *Filer) appendToFile(targetFile string, data []byte) error {
// assign a volume location
assignRequest := &operation.VolumeAssignRequest{
Count: 1,
assignResult, uploadResult, err2 := f.assignAndUpload(data)
if err2 != nil {
return err2
}
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
if err != nil {
return fmt.Errorf("AssignVolume: %v", err)
}
if assignResult.Error != "" {
return fmt.Errorf("AssignVolume error: %v", assignResult.Error)
}
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.UploadData(targetUrl, "", false, data, false, "", nil, assignResult.Auth)
if err != nil {
return fmt.Errorf("upload data %s: %v", targetUrl, err)
}
// println("uploaded to", targetUrl)
// find out existing entry
fullpath := util.FullPath(targetFile)
@@ -68,3 +53,29 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
return err
}
func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
// assign a volume location
assignRequest := &operation.VolumeAssignRequest{
Count: 1,
Collection: f.metaLogCollection,
Replication: f.metaLogReplication,
WritableVolumeCount: 1,
}
assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
}
if assignResult.Error != "" {
return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
}
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
// println("uploaded to", targetUrl)
return assignResult, uploadResult, nil
}

View File

@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil)
filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil)
filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}

View File

@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil)
filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}
@@ -66,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
filer := filer2.NewFiler(nil, nil, 0, nil)
filer := filer2.NewFiler(nil, nil, 0, "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDB2Store{}

View File

@@ -9,8 +9,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -22,12 +22,12 @@ type ChunkReadAt struct {
lookupFileId func(fileId string) (targetUrl string, err error)
readerLock sync.Mutex
chunkCache *pb_cache.ChunkCache
chunkCache *chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *pb_cache.ChunkCache) *ChunkReadAt {
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
@@ -105,9 +105,11 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
// fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
chunkData := c.chunkCache.GetChunk(chunkView.FileId)
hasDataInCache := false
chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
if chunkData != nil {
glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
hasDataInCache = true
} else {
chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
if err != nil {
@@ -121,7 +123,9 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
c.chunkCache.SetChunk(chunkView.FileId, chunkData)
if !hasDataInCache {
c.chunkCache.SetChunk(chunkView.FileId, chunkData)
}
return data, nil
}

View File

@@ -0,0 +1,42 @@
package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
)
func init() {
filer2.Stores = append(filer2.Stores, &RedisCluster2Store{})
}
type RedisCluster2Store struct {
UniversalRedis2Store
}
func (store *RedisCluster2Store) GetName() string {
return "redis_cluster2"
}
func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
configuration.SetDefault(prefix+"useReadOnly", true)
configuration.SetDefault(prefix+"routeByLatency", true)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
)
}
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
return
}

View File

@@ -0,0 +1,36 @@
package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/go-redis/redis"
)
func init() {
filer2.Stores = append(filer2.Stores, &Redis2Store{})
}
type Redis2Store struct {
UniversalRedis2Store
}
func (store *Redis2Store) GetName() string {
return "redis2"
}
func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
)
}
func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
return
}

View File

@@ -0,0 +1,162 @@
package redis2
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
const (
DIR_LIST_MARKER = "\x00"
)
type UniversalRedis2Store struct {
Client redis.UniversalClient
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
func (store *UniversalRedis2Store) CommitTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) error {
return nil
}
func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer2.Entry, err error) {
data, err := store.Client.Get(string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer2.Entry{
FullPath: fullpath,
}
err = entry.DecodeAttributesAndChunks([]byte(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
return entry, nil
}
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
_, err = store.Client.Del(string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
dir, name := fullpath.DirAndName()
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
}
return nil
}
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("delete folder %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
}
return nil
}
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
dirListKey := genDirectoryListKey(string(fullpath))
start := int64(0)
if startFileName != "" {
start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
if !inclusive {
start++
}
}
members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
// fetch entry meta
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
entry, err := store.FindEntry(ctx, path)
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
store.Client.Del(string(path)).Result()
store.Client.ZRem(dirListKey, fileName).Result()
continue
}
}
entries = append(entries, entry)
}
}
return entries, err
}
func genDirectoryListKey(dir string) (dirList string) {
return dir + DIR_LIST_MARKER
}
func (store *UniversalRedis2Store) Shutdown() {
store.Client.Close()
}

View File

@@ -31,7 +31,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
for _, chunkView := range chunkViews {
urlString := fileId2Url[chunkView.FileId]
err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@@ -128,7 +128,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {

6
weed/filer2/topics.go Normal file
View File

@@ -0,0 +1,6 @@
package filer2
const (
TopicsDir = "/topics"
SystemLogDir = TopicsDir + "/.system/log"
)