redis3 adds distributed locking

This commit is contained in:
Chris Lu
2021-10-06 00:03:54 -07:00
parent f0d1e7bd05
commit 893f0587b1
8 changed files with 142 additions and 14 deletions

View File

@@ -10,7 +10,18 @@ import (
const maxNameBatchSizeLimit = 1000
func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
// lock and unlock
mutex := redisStore.redsync.NewMutex(key+"lock")
if err := mutex.Lock(); err != nil {
return fmt.Errorf("lock %s: %v", key, err)
}
defer func() {
mutex.Unlock()
}()
client := redisStore.Client
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
@@ -20,11 +31,11 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string,
store := newSkipListElementStore(key, client)
nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
// println("add", key, name)
if err := nameList.WriteName(name); err != nil {
glog.Errorf("add %s %s: %v", key, name, err)
return err
}
if !nameList.HasChanges() {
return nil
}
@@ -36,7 +47,16 @@ func insertChild(ctx context.Context, client redis.UniversalClient, key string,
return nil
}
func removeChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
// lock and unlock
mutex := redisStore.redsync.NewMutex(key+"lock")
if err := mutex.Lock(); err != nil {
return fmt.Errorf("lock %s: %v", key, err)
}
defer mutex.Unlock()
client := redisStore.Client
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
@@ -60,8 +80,16 @@ func removeChild(ctx context.Context, client redis.UniversalClient, key string,
return nil
}
func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error {
func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error {
// lock and unlock
mutex := redisStore.redsync.NewMutex(key+"lock")
if err := mutex.Lock(); err != nil {
return fmt.Errorf("lock %s: %v", key, err)
}
defer mutex.Unlock()
client := redisStore.Client
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
@@ -84,13 +112,13 @@ func removeChildren(ctx context.Context, client redis.UniversalClient, key strin
if err = nameList.RemoteAllListElement(); err != nil {
return err
}
return nil
}
func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error {
func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error {
client := redisStore.Client
data, err := client.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {