Add Redis support for Distributed Filer store.

This commit is contained in:
Chris Lu
2015-01-06 20:15:13 -08:00
parent d77f3120c3
commit e4531fc1e6
5 changed files with 87 additions and 17 deletions

View File

@@ -0,0 +1,48 @@
package redis_store
import (
redis "gopkg.in/redis.v2"
)
type RedisStore struct {
Client *redis.Client
}
func NewRedisStore(hostPort string, database int) *RedisStore {
client := redis.NewTCPClient(&redis.Options{
Addr: hostPort,
Password: "", // no password set
DB: int64(database),
})
return &RedisStore{Client: client}
}
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return fid, err
}
func (s *RedisStore) Put(fullFileName string, fid string) (err error) {
_, err = s.Client.Set(fullFileName, fid).Result()
if err == redis.Nil {
err = nil
}
return err
}
// Currently the fid is not returned
func (s *RedisStore) Delete(fullFileName string) (fid string, err error) {
_, err = s.Client.Del(fullFileName).Result()
if err == redis.Nil {
err = nil
}
return "", err
}
func (c *RedisStore) Close() {
if c.Client != nil {
c.Client.Close()
}
}

View File

@@ -24,6 +24,8 @@ type FilerOptions struct {
redirectOnRead *bool
cassandra_server *string
cassandra_keyspace *string
redis_server *string
redis_database *int
}
func init() {
@@ -36,6 +38,8 @@ func init() {
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the cassandra server, e.g., 127.0.0.1:6379")
f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
}
var cmdFiler = &Command{
@@ -70,6 +74,7 @@ func runFiler(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead,
*f.cassandra_server, *f.cassandra_keyspace,
*f.redis_server, *f.redis_database,
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())

View File

@@ -160,6 +160,7 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
"", "",
"", 0,
)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())

View File

@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/weed-fs/go/filer/cassandra_store"
"github.com/chrislusf/weed-fs/go/filer/embedded_filer"
"github.com/chrislusf/weed-fs/go/filer/flat_namespace"
"github.com/chrislusf/weed-fs/go/filer/redis_store"
"github.com/chrislusf/weed-fs/go/glog"
)
@@ -23,6 +24,7 @@ type FilerServer struct {
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
replication string, redirectOnRead bool,
cassandra_server string, cassandra_keyspace string,
redis_server string, redis_database int,
) (fs *FilerServer, err error) {
fs = &FilerServer{
master: master,
@@ -32,19 +34,22 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
port: ":" + strconv.Itoa(port),
}
if cassandra_server == "" {
if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
}
fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store)
} else if redis_server != "" {
redis_store := redis_store.NewRedisStore(redis_server, redis_database)
fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store)
} else {
if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
glog.Fatalf("Can not start filer in dir %s : %v", err)
return
}
r.HandleFunc("/admin/mv", fs.moveHandler)
} else {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
}
fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store)
}
r.HandleFunc("/", fs.filerHandler)