Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
This commit is contained in:
@@ -4,12 +4,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
|
||||
)
|
||||
|
||||
const (
|
||||
adminLockName = "shell"
|
||||
adminLockName = cluster.AdminShellLockName
|
||||
adminLockClientName = "admin-plugin"
|
||||
)
|
||||
|
||||
|
||||
61
weed/admin/dash/admin_presence_lock.go
Normal file
61
weed/admin/dash/admin_presence_lock.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package dash
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
|
||||
)
|
||||
|
||||
const adminPresenceClientName = "admin-server"
|
||||
|
||||
type adminPresenceLock struct {
|
||||
locker *exclusive_locks.ExclusiveLocker
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func newAdminPresenceLock(masterClient *wdclient.MasterClient) *adminPresenceLock {
|
||||
if masterClient == nil {
|
||||
return nil
|
||||
}
|
||||
return &adminPresenceLock{
|
||||
locker: exclusive_locks.NewExclusiveLocker(masterClient, cluster.AdminServerPresenceLockName),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *adminPresenceLock) Start() {
|
||||
if l == nil || l.locker == nil {
|
||||
return
|
||||
}
|
||||
l.locker.SetMessage("admin server connected")
|
||||
go func() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
if !l.locker.IsLocked() {
|
||||
l.locker.RequestLock(adminPresenceClientName)
|
||||
}
|
||||
select {
|
||||
case <-l.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (l *adminPresenceLock) Stop() {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-l.stopCh:
|
||||
default:
|
||||
close(l.stopCh)
|
||||
}
|
||||
if l.locker != nil {
|
||||
l.locker.ReleaseLock()
|
||||
}
|
||||
}
|
||||
@@ -99,6 +99,7 @@ type AdminServer struct {
|
||||
maintenanceManager *maintenance.MaintenanceManager
|
||||
plugin *adminplugin.Plugin
|
||||
pluginLock *AdminLockManager
|
||||
adminPresenceLock *adminPresenceLock
|
||||
expireJobHandler func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error)
|
||||
|
||||
// Topic retention purger
|
||||
@@ -137,6 +138,10 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
go masterClient.KeepConnectedToMaster(ctx)
|
||||
|
||||
lockManager := NewAdminLockManager(masterClient, adminLockClientName)
|
||||
presenceLock := newAdminPresenceLock(masterClient)
|
||||
if presenceLock != nil {
|
||||
presenceLock.Start()
|
||||
}
|
||||
|
||||
server := &AdminServer{
|
||||
masterClient: masterClient,
|
||||
@@ -150,6 +155,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
s3TablesManager: newS3TablesManager(),
|
||||
icebergPort: icebergPort,
|
||||
pluginLock: lockManager,
|
||||
adminPresenceLock: presenceLock,
|
||||
}
|
||||
|
||||
// Initialize topic retention purger
|
||||
@@ -1286,6 +1292,9 @@ func (s *AdminServer) Shutdown() {
|
||||
|
||||
// Stop maintenance manager
|
||||
s.StopMaintenanceManager()
|
||||
if s.adminPresenceLock != nil {
|
||||
s.adminPresenceLock.Stop()
|
||||
}
|
||||
|
||||
if s.plugin != nil {
|
||||
s.plugin.Shutdown()
|
||||
|
||||
6
weed/cluster/admin_locks.go
Normal file
6
weed/cluster/admin_locks.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package cluster
|
||||
|
||||
const (
|
||||
AdminShellLockName = "shell"
|
||||
AdminServerPresenceLockName = "admin-server"
|
||||
)
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
[master.maintenance]
|
||||
# periodically run these scripts are the same as running them from 'weed shell'
|
||||
# Scripts are skipped while an admin server is connected.
|
||||
scripts = """{{DEFAULT_MAINTENANCE_SCRIPTS}}"""
|
||||
sleep_minutes = 17 # sleep minutes between each script execution
|
||||
|
||||
|
||||
@@ -333,6 +333,14 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *MasterServer) isAdminServerConnected() bool {
|
||||
if ms == nil || ms.adminLocks == nil {
|
||||
return false
|
||||
}
|
||||
_, _, isLocked := ms.adminLocks.isLocked(cluster.AdminServerPresenceLockName)
|
||||
return isLocked
|
||||
}
|
||||
|
||||
func (ms *MasterServer) startAdminScripts() {
|
||||
v := util.GetViper()
|
||||
v.SetDefault("master.maintenance.scripts", maintenance.DefaultMasterMaintenanceScripts)
|
||||
@@ -371,6 +379,10 @@ func (ms *MasterServer) startAdminScripts() {
|
||||
for {
|
||||
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
|
||||
if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
|
||||
if ms.isAdminServerConnected() {
|
||||
glog.V(1).Infof("Skipping master maintenance scripts because admin server is connected")
|
||||
continue
|
||||
}
|
||||
shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
|
||||
if shellOptions.FilerAddress == "" {
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user