add leveldb store

1. switch to viper for filer store configuration
2. simplify FindEntry() return values, removing “found”
3. add leveldb store
This commit is contained in:
Chris Lu
2018-05-26 03:49:46 -07:00
parent c34feca59c
commit 9e77563c99
17 changed files with 382 additions and 193 deletions

View File

@@ -14,12 +14,9 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
found, entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
if err != nil {
return nil, err
}
if !found {
return nil, fmt.Errorf("%s not found under %s", req.Name, req.Directory)
return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
}
return &filer_pb.LookupDirectoryEntryResponse{
@@ -65,13 +62,10 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get
fullpath := filer2.NewFullPath(req.ParentDir, req.Name)
found, entry, err := fs.filer.FindEntry(fullpath)
entry, err := fs.filer.FindEntry(fullpath)
if err != nil {
return nil, err
}
if !found {
attributes.FileSize = 0
return nil, fmt.Errorf("file %s not found", fullpath)
return nil, fmt.Errorf("FindEntry %s: %v", fullpath, err)
}
attributes.FileSize = entry.Size()
@@ -138,12 +132,9 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
fullpath := filepath.Join(req.Directory, req.Entry.Name)
found, entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
if err != nil {
return &filer_pb.UpdateEntryResponse{}, err
}
if !found {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("file not found: %s", fullpath)
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
// remove old chunks if not included in the new ones

View File

@@ -1,10 +1,8 @@
package weed_server
import (
"encoding/json"
"math/rand"
"net/http"
"os"
"strconv"
"sync"
"time"
@@ -16,7 +14,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
)
type filerConf struct {
@@ -25,21 +24,6 @@ type filerConf struct {
PostgresConf *postgres_store.PostgresConf `json:"postgres"`
}
func parseConfFile(confPath string) (*filerConf, error) {
var setting filerConf
configFile, err := os.Open(confPath)
defer configFile.Close()
if err != nil {
return nil, err
}
jsonParser := json.NewDecoder(configFile)
if err = jsonParser.Decode(&setting); err != nil {
return nil, err
}
return &setting, nil
}
type FilerServer struct {
port string
master string
@@ -54,13 +38,10 @@ type FilerServer struct {
masterNodes *storage.MasterNodes
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, dir string, collection string,
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, collection string,
replication string, redirectOnRead bool, disableDirListing bool,
confFile string,
maxMB int,
secret string,
cassandra_server string, cassandra_keyspace string,
redis_server string, redis_password string, redis_database int,
) (fs *FilerServer, err error) {
fs = &FilerServer{
master: master,
@@ -71,42 +52,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int,
maxMB: maxMB,
port: ip + ":" + strconv.Itoa(port),
}
var setting *filerConf
if confFile != "" {
setting, err = parseConfFile(confFile)
if err != nil {
return nil, err
}
} else {
setting = new(filerConf)
}
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
// mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
// fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if setting.PostgresConf != nil {
// fs.filer = postgres_store.NewPostgresStore(master, *setting.PostgresConf)
} else if cassandra_server != "" {
// cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
// if err != nil {
// glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
// }
// fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
} else if redis_server != "" {
// redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
// fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
} else {
/*
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
return
}
*/
}
fs.filer = filer2.NewFiler(master)
fs.filer.SetStore(memdb.NewMemDbStore())
fs.filer.LoadConfiguration()
defaultMux.HandleFunc("/admin/register", fs.registerHandler)
defaultMux.HandleFunc("/", fs.filerHandler)

View File

@@ -49,7 +49,7 @@ func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("register %s to %s parse fileSize %s", fileId, path, r.FormValue("fileSize"))
err = fs.filer.CreateEntry(entry)
if err != nil {
glog.V(4).Infof("register %s to %s error: %v", fileId, path, err)
glog.V(0).Infof("register %s to %s error: %v", fileId, path, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
w.WriteHeader(http.StatusOK)

View File

@@ -79,9 +79,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
path = path[:len(path)-1]
}
found, entry, err := fs.filer.FindEntry(filer2.FullPath(path))
if !found || err != nil {
glog.V(3).Infof("Not found %s: %v", path, err)
entry, err := fs.filer.FindEntry(filer2.FullPath(path))
if err != nil {
glog.V(1).Infof("Not found %s: %v", path, err)
w.WriteHeader(http.StatusNotFound)
return
}
@@ -96,7 +96,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if len(entry.Chunks) == 0 {
glog.V(3).Infof("Empty %s: %v", path)
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
w.WriteHeader(http.StatusNoContent)
return
}

View File

@@ -76,20 +76,17 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
}
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
var found bool
var entry *filer2.Entry
if found, entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
if entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if found {
} else {
fileId = entry.Chunks[0].FileId
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
}
} else {
w.WriteHeader(http.StatusNotFound)
}
return
}
@@ -319,7 +316,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil && found {
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
oldFid := entry.Chunks[0].FileId
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
@@ -485,7 +482,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
path := r.URL.Path
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); found && err == nil {
if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
for _, chunk := range entry.Chunks {
oldFid := chunk.FileId
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))