filer: add username and keyPrefix support for Redis stores (#7591)
* filer: add username and keyPrefix support for Redis stores Addresses https://github.com/seaweedfs/seaweedfs/issues/7299 - Add username config option to redis2, redis_cluster2, redis_lua, and redis_lua_cluster stores (sentinel stores already had it) - Add keyPrefix config option to all Redis stores to prefix all keys, useful for Envoy Redis Proxy or multi-tenant Redis setups * refactor: reduce duplication in redis.NewClient creation Address code review feedback by defining redis.Options once and conditionally setting TLSConfig instead of duplicating the entire NewClient call. * filer.toml: add username and keyPrefix to redis2.tmp example
This commit is contained in:
@@ -201,8 +201,11 @@ table = "seaweedfs"
|
|||||||
[redis2]
|
[redis2]
|
||||||
enabled = false
|
enabled = false
|
||||||
address = "localhost:6379"
|
address = "localhost:6379"
|
||||||
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
database = 0
|
database = 0
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -217,6 +220,8 @@ masterName = "master"
|
|||||||
username = ""
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
database = 0
|
database = 0
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -232,7 +237,10 @@ addresses = [
|
|||||||
"localhost:30005",
|
"localhost:30005",
|
||||||
"localhost:30006",
|
"localhost:30006",
|
||||||
]
|
]
|
||||||
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -248,8 +256,11 @@ superLargeDirectories = []
|
|||||||
[redis_lua]
|
[redis_lua]
|
||||||
enabled = false
|
enabled = false
|
||||||
address = "localhost:6379"
|
address = "localhost:6379"
|
||||||
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
database = 0
|
database = 0
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -264,6 +275,8 @@ masterName = "master"
|
|||||||
username = ""
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
database = 0
|
database = 0
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -279,7 +292,10 @@ addresses = [
|
|||||||
"localhost:30005",
|
"localhost:30005",
|
||||||
"localhost:30006",
|
"localhost:30006",
|
||||||
]
|
]
|
||||||
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
|
# prefix for filer redis keys
|
||||||
|
keyPrefix = ""
|
||||||
enable_tls = false
|
enable_tls = false
|
||||||
ca_cert_path = ""
|
ca_cert_path = ""
|
||||||
client_cert_path = ""
|
client_cert_path = ""
|
||||||
@@ -373,8 +389,10 @@ dialTimeOut = 10
|
|||||||
enabled = false
|
enabled = false
|
||||||
location = "/tmp/"
|
location = "/tmp/"
|
||||||
address = "localhost:6379"
|
address = "localhost:6379"
|
||||||
|
username = ""
|
||||||
password = ""
|
password = ""
|
||||||
database = 1
|
database = 1
|
||||||
|
keyPrefix = ""
|
||||||
|
|
||||||
[tikv]
|
[tikv]
|
||||||
enabled = false
|
enabled = false
|
||||||
|
|||||||
@@ -25,20 +25,24 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
|
|||||||
|
|
||||||
return store.initialize(
|
return store.initialize(
|
||||||
configuration.GetStringSlice(prefix+"addresses"),
|
configuration.GetStringSlice(prefix+"addresses"),
|
||||||
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
configuration.GetBool(prefix+"useReadOnly"),
|
configuration.GetBool(prefix+"useReadOnly"),
|
||||||
configuration.GetBool(prefix+"routeByLatency"),
|
configuration.GetBool(prefix+"routeByLatency"),
|
||||||
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
|
func (store *RedisCluster2Store) initialize(addresses []string, username string, password string, keyPrefix string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
|
||||||
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
|
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||||
Addrs: addresses,
|
Addrs: addresses,
|
||||||
|
Username: username,
|
||||||
Password: password,
|
Password: password,
|
||||||
ReadOnly: readOnly,
|
ReadOnly: readOnly,
|
||||||
RouteByLatency: routeByLatency,
|
RouteByLatency: routeByLatency,
|
||||||
})
|
})
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
store.loadSuperLargeDirectories(superLargeDirectories)
|
store.loadSuperLargeDirectories(superLargeDirectories)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,10 +26,11 @@ func (store *Redis2SentinelStore) Initialize(configuration util.Configuration, p
|
|||||||
configuration.GetString(prefix+"username"),
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
configuration.GetInt(prefix+"database"),
|
configuration.GetInt(prefix+"database"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
|
func (store *Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int, keyPrefix string) (err error) {
|
||||||
store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
|
store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
MasterName: masterName,
|
MasterName: masterName,
|
||||||
SentinelAddrs: addresses,
|
SentinelAddrs: addresses,
|
||||||
@@ -41,5 +42,6 @@ func (store *Redis2SentinelStore) initialize(addresses []string, masterName stri
|
|||||||
ReadTimeout: time.Second * 30,
|
ReadTimeout: time.Second * 30,
|
||||||
WriteTimeout: time.Second * 5,
|
WriteTimeout: time.Second * 5,
|
||||||
})
|
})
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,8 +27,10 @@ func (store *Redis2Store) GetName() string {
|
|||||||
func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
|
func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
|
||||||
return store.initialize(
|
return store.initialize(
|
||||||
configuration.GetString(prefix+"address"),
|
configuration.GetString(prefix+"address"),
|
||||||
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
configuration.GetInt(prefix+"database"),
|
configuration.GetInt(prefix+"database"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
||||||
configuration.GetBool(prefix+"enable_mtls"),
|
configuration.GetBool(prefix+"enable_mtls"),
|
||||||
configuration.GetString(prefix+"ca_cert_path"),
|
configuration.GetString(prefix+"ca_cert_path"),
|
||||||
@@ -37,7 +39,13 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string, enableMtls bool, caCertPath string, clientCertPath string, clientKeyPath string) (err error) {
|
func (store *Redis2Store) initialize(hostPort string, username string, password string, database int, keyPrefix string, superLargeDirectories []string, enableMtls bool, caCertPath string, clientCertPath string, clientKeyPath string) (err error) {
|
||||||
|
opt := &redis.Options{
|
||||||
|
Addr: hostPort,
|
||||||
|
Username: username,
|
||||||
|
Password: password,
|
||||||
|
DB: database,
|
||||||
|
}
|
||||||
if enableMtls {
|
if enableMtls {
|
||||||
clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
|
clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -59,25 +67,15 @@ func (store *Redis2Store) initialize(hostPort string, password string, database
|
|||||||
glog.Fatalf("Error parsing redis host and port from %s: %v", hostPort, err)
|
glog.Fatalf("Error parsing redis host and port from %s: %v", hostPort, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tlsConfig := &tls.Config{
|
opt.TLSConfig = &tls.Config{
|
||||||
Certificates: []tls.Certificate{clientCert},
|
Certificates: []tls.Certificate{clientCert},
|
||||||
RootCAs: caCertPool,
|
RootCAs: caCertPool,
|
||||||
ServerName: redisHost,
|
ServerName: redisHost,
|
||||||
MinVersion: tls.VersionTLS12,
|
MinVersion: tls.VersionTLS12,
|
||||||
}
|
}
|
||||||
store.Client = redis.NewClient(&redis.Options{
|
|
||||||
Addr: hostPort,
|
|
||||||
Password: password,
|
|
||||||
DB: database,
|
|
||||||
TLSConfig: tlsConfig,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
store.Client = redis.NewClient(&redis.Options{
|
|
||||||
Addr: hostPort,
|
|
||||||
Password: password,
|
|
||||||
DB: database,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
store.Client = redis.NewClient(opt)
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
store.loadSuperLargeDirectories(superLargeDirectories)
|
store.loadSuperLargeDirectories(superLargeDirectories)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ const (
|
|||||||
|
|
||||||
type UniversalRedis2Store struct {
|
type UniversalRedis2Store struct {
|
||||||
Client redis.UniversalClient
|
Client redis.UniversalClient
|
||||||
|
keyPrefix string
|
||||||
superLargeDirectoryHash map[string]bool
|
superLargeDirectoryHash map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -35,6 +36,13 @@ func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectori
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (store *UniversalRedis2Store) getKey(key string) string {
|
||||||
|
if store.keyPrefix == "" {
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
return store.keyPrefix + key
|
||||||
|
}
|
||||||
|
|
||||||
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
|
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||||
return ctx, nil
|
return ctx, nil
|
||||||
}
|
}
|
||||||
@@ -57,7 +65,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
|
|||||||
}
|
}
|
||||||
|
|
||||||
if name != "" {
|
if name != "" {
|
||||||
if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
|
if err = store.Client.ZAddNX(ctx, store.getKey(genDirectoryListKey(dir)), redis.Z{Score: 0, Member: name}).Err(); err != nil {
|
||||||
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
|
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -75,7 +83,7 @@ func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *fil
|
|||||||
value = util.MaybeGzipData(value)
|
value = util.MaybeGzipData(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
|
if err = store.Client.Set(ctx, store.getKey(string(entry.FullPath)), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
|
||||||
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
|
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -88,7 +96,7 @@ func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer
|
|||||||
|
|
||||||
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
|
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
|
||||||
|
|
||||||
data, err := store.Client.Get(ctx, string(fullpath)).Result()
|
data, err := store.Client.Get(ctx, store.getKey(string(fullpath))).Result()
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return nil, filer_pb.ErrNotFound
|
return nil, filer_pb.ErrNotFound
|
||||||
}
|
}
|
||||||
@@ -110,12 +118,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
|
|||||||
|
|
||||||
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
|
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
|
||||||
|
|
||||||
_, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
|
_, err = store.Client.Del(ctx, store.getKey(genDirectoryListKey(string(fullpath)))).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
|
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = store.Client.Del(ctx, string(fullpath)).Result()
|
_, err = store.Client.Del(ctx, store.getKey(string(fullpath))).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("delete %s : %v", fullpath, err)
|
return fmt.Errorf("delete %s : %v", fullpath, err)
|
||||||
}
|
}
|
||||||
@@ -125,7 +133,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if name != "" {
|
if name != "" {
|
||||||
_, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
|
_, err = store.Client.ZRem(ctx, store.getKey(genDirectoryListKey(dir)), name).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
|
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
|
||||||
}
|
}
|
||||||
@@ -140,7 +148,7 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
members, err := store.Client.ZRangeByLex(ctx, genDirectoryListKey(string(fullpath)), &redis.ZRangeBy{
|
members, err := store.Client.ZRangeByLex(ctx, store.getKey(genDirectoryListKey(string(fullpath))), &redis.ZRangeBy{
|
||||||
Min: "-",
|
Min: "-",
|
||||||
Max: "+",
|
Max: "+",
|
||||||
}).Result()
|
}).Result()
|
||||||
@@ -150,12 +158,12 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
|
|||||||
|
|
||||||
for _, fileName := range members {
|
for _, fileName := range members {
|
||||||
path := util.NewFullPath(string(fullpath), fileName)
|
path := util.NewFullPath(string(fullpath), fileName)
|
||||||
_, err = store.Client.Del(ctx, string(path)).Result()
|
_, err = store.Client.Del(ctx, store.getKey(string(path))).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
|
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
|
||||||
}
|
}
|
||||||
// not efficient, but need to remove if it is a directory
|
// not efficient, but need to remove if it is a directory
|
||||||
store.Client.Del(ctx, genDirectoryListKey(string(path)))
|
store.Client.Del(ctx, store.getKey(genDirectoryListKey(string(path))))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -167,7 +175,7 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont
|
|||||||
|
|
||||||
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
||||||
|
|
||||||
dirListKey := genDirectoryListKey(string(dirPath))
|
dirListKey := store.getKey(genDirectoryListKey(string(dirPath)))
|
||||||
|
|
||||||
min := "-"
|
min := "-"
|
||||||
if startFileName != "" {
|
if startFileName != "" {
|
||||||
@@ -201,7 +209,7 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
|
|||||||
} else {
|
} else {
|
||||||
if entry.TtlSec > 0 {
|
if entry.TtlSec > 0 {
|
||||||
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
|
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
|
||||||
store.Client.Del(ctx, string(path)).Result()
|
store.Client.Del(ctx, store.getKey(string(path))).Result()
|
||||||
store.Client.ZRem(ctx, dirListKey, fileName).Result()
|
store.Client.ZRem(ctx, dirListKey, fileName).Result()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,20 +25,24 @@ func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration,
|
|||||||
|
|
||||||
return store.initialize(
|
return store.initialize(
|
||||||
configuration.GetStringSlice(prefix+"addresses"),
|
configuration.GetStringSlice(prefix+"addresses"),
|
||||||
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
configuration.GetBool(prefix+"useReadOnly"),
|
configuration.GetBool(prefix+"useReadOnly"),
|
||||||
configuration.GetBool(prefix+"routeByLatency"),
|
configuration.GetBool(prefix+"routeByLatency"),
|
||||||
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
|
func (store *RedisLuaClusterStore) initialize(addresses []string, username string, password string, keyPrefix string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
|
||||||
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
|
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||||
Addrs: addresses,
|
Addrs: addresses,
|
||||||
|
Username: username,
|
||||||
Password: password,
|
Password: password,
|
||||||
ReadOnly: readOnly,
|
ReadOnly: readOnly,
|
||||||
RouteByLatency: routeByLatency,
|
RouteByLatency: routeByLatency,
|
||||||
})
|
})
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
store.loadSuperLargeDirectories(superLargeDirectories)
|
store.loadSuperLargeDirectories(superLargeDirectories)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,10 +26,11 @@ func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration,
|
|||||||
configuration.GetString(prefix+"username"),
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
configuration.GetInt(prefix+"database"),
|
configuration.GetInt(prefix+"database"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
|
func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int, keyPrefix string) (err error) {
|
||||||
store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
|
store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
MasterName: masterName,
|
MasterName: masterName,
|
||||||
SentinelAddrs: addresses,
|
SentinelAddrs: addresses,
|
||||||
@@ -41,5 +42,6 @@ func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName st
|
|||||||
ReadTimeout: time.Second * 30,
|
ReadTimeout: time.Second * 30,
|
||||||
WriteTimeout: time.Second * 5,
|
WriteTimeout: time.Second * 5,
|
||||||
})
|
})
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,18 +21,22 @@ func (store *RedisLuaStore) GetName() string {
|
|||||||
func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) {
|
func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) {
|
||||||
return store.initialize(
|
return store.initialize(
|
||||||
configuration.GetString(prefix+"address"),
|
configuration.GetString(prefix+"address"),
|
||||||
|
configuration.GetString(prefix+"username"),
|
||||||
configuration.GetString(prefix+"password"),
|
configuration.GetString(prefix+"password"),
|
||||||
configuration.GetInt(prefix+"database"),
|
configuration.GetInt(prefix+"database"),
|
||||||
|
configuration.GetString(prefix+"keyPrefix"),
|
||||||
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
|
func (store *RedisLuaStore) initialize(hostPort string, username string, password string, database int, keyPrefix string, superLargeDirectories []string) (err error) {
|
||||||
store.Client = redis.NewClient(&redis.Options{
|
store.Client = redis.NewClient(&redis.Options{
|
||||||
Addr: hostPort,
|
Addr: hostPort,
|
||||||
|
Username: username,
|
||||||
Password: password,
|
Password: password,
|
||||||
DB: database,
|
DB: database,
|
||||||
})
|
})
|
||||||
|
store.keyPrefix = keyPrefix
|
||||||
store.loadSuperLargeDirectories(superLargeDirectories)
|
store.loadSuperLargeDirectories(superLargeDirectories)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ const (
|
|||||||
|
|
||||||
type UniversalRedisLuaStore struct {
|
type UniversalRedisLuaStore struct {
|
||||||
Client redis.UniversalClient
|
Client redis.UniversalClient
|
||||||
|
keyPrefix string
|
||||||
superLargeDirectoryHash map[string]bool
|
superLargeDirectoryHash map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,6 +37,13 @@ func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirecto
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (store *UniversalRedisLuaStore) getKey(key string) string {
|
||||||
|
if store.keyPrefix == "" {
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
return store.keyPrefix + key
|
||||||
|
}
|
||||||
|
|
||||||
func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
|
||||||
return ctx, nil
|
return ctx, nil
|
||||||
}
|
}
|
||||||
@@ -60,7 +68,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil
|
|||||||
dir, name := entry.FullPath.DirAndName()
|
dir, name := entry.FullPath.DirAndName()
|
||||||
|
|
||||||
err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
|
err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
|
||||||
[]string{string(entry.FullPath), genDirectoryListKey(dir)},
|
[]string{store.getKey(string(entry.FullPath)), store.getKey(genDirectoryListKey(dir))},
|
||||||
value, entry.TtlSec,
|
value, entry.TtlSec,
|
||||||
store.isSuperLargeDirectory(dir), 0, name).Err()
|
store.isSuperLargeDirectory(dir), 0, name).Err()
|
||||||
|
|
||||||
@@ -78,7 +86,7 @@ func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *fil
|
|||||||
|
|
||||||
func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
|
func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
|
||||||
|
|
||||||
data, err := store.Client.Get(ctx, string(fullpath)).Result()
|
data, err := store.Client.Get(ctx, store.getKey(string(fullpath))).Result()
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return nil, filer_pb.ErrNotFound
|
return nil, filer_pb.ErrNotFound
|
||||||
}
|
}
|
||||||
@@ -103,7 +111,7 @@ func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath u
|
|||||||
dir, name := fullpath.DirAndName()
|
dir, name := fullpath.DirAndName()
|
||||||
|
|
||||||
err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
|
err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
|
||||||
[]string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
|
[]string{store.getKey(string(fullpath)), store.getKey(genDirectoryListKey(string(fullpath))), store.getKey(genDirectoryListKey(dir))},
|
||||||
store.isSuperLargeDirectory(dir), name).Err()
|
store.isSuperLargeDirectory(dir), name).Err()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -120,7 +128,7 @@ func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, f
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
|
err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
|
||||||
[]string{string(fullpath)}).Err()
|
[]string{store.getKey(string(fullpath))}).Err()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
|
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
|
||||||
@@ -135,7 +143,7 @@ func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Co
|
|||||||
|
|
||||||
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
|
||||||
|
|
||||||
dirListKey := genDirectoryListKey(string(dirPath))
|
dirListKey := store.getKey(genDirectoryListKey(string(dirPath)))
|
||||||
|
|
||||||
min := "-"
|
min := "-"
|
||||||
if startFileName != "" {
|
if startFileName != "" {
|
||||||
|
|||||||
Reference in New Issue
Block a user