refactoring

This commit is contained in:
Chris Lu
2020-05-26 00:03:31 -07:00
parent d4235afe4d
commit 5d3ec22975
2 changed files with 12 additions and 4 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks"
)
type ShellOptions struct {
@@ -28,7 +29,7 @@ type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option ShellOptions
locker *ExclusiveLocker
locker *exclusive_locks.ExclusiveLocker
}
type command interface {
@@ -47,7 +48,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv {
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
option: options,
}
ce.locker = NewExclusiveLocker(ce.MasterClient)
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
return ce
}
@@ -70,7 +71,7 @@ func (ce *CommandEnv) isDirectory(path string) bool {
func (ce *CommandEnv) confirmIsLocked() error {
if ce.locker.isLocking {
if ce.locker.IsLocking() {
return nil
}

View File

@@ -1,104 +0,0 @@
package shell
import (
"context"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
const (
RenewInteval = 4 * time.Second
SafeRenewInteval = 3 * time.Second
InitLockInteval = 1 * time.Second
AdminLockName = "admin"
)
type ExclusiveLocker struct {
masterClient *wdclient.MasterClient
token int64
lockTsNs int64
isLocking bool
}
func NewExclusiveLocker(masterClient *wdclient.MasterClient) *ExclusiveLocker {
return &ExclusiveLocker{
masterClient: masterClient,
}
}
func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) {
for time.Unix(0, atomic.LoadInt64(&l.lockTsNs)).Add(SafeRenewInteval).Before(time.Now()) {
// wait until now is within the safe lock period, no immediate renewal to change the token
time.Sleep(100 * time.Millisecond)
}
return atomic.LoadInt64(&l.token), atomic.LoadInt64(&l.lockTsNs)
}
func (l *ExclusiveLocker) RequestLock() {
// retry to get the lease
for {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
})
if err == nil {
atomic.StoreInt64(&l.token, resp.Token)
atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
}
return err
}); err != nil {
// println("leasing problem", err.Error())
time.Sleep(InitLockInteval)
} else {
break
}
}
l.isLocking = true
// start a goroutine to renew the lease
go func() {
for l.isLocking {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
})
if err == nil {
atomic.StoreInt64(&l.token, resp.Token)
atomic.StoreInt64(&l.lockTsNs, resp.LockTsNs)
// println("ts", l.lockTsNs, "token", l.token)
}
return err
}); err != nil {
glog.Errorf("failed to renew lock: %v", err)
return
} else {
time.Sleep(RenewInteval)
}
}
}()
}
func (l *ExclusiveLocker) ReleaseLock() {
l.isLocking = false
l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
})
return nil
})
atomic.StoreInt64(&l.token, 0)
atomic.StoreInt64(&l.lockTsNs, 0)
}