Add dynamic timeouts to plugin worker vacuum gRPC calls (#8593)
* add dynamic timeouts to plugin worker vacuum gRPC calls

  All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup.

  Fixes #8591

* scale scheduler execution timeout by volume size

  The scheduler's per-job execution timeout (default 240s) would kill vacuum jobs on large volumes before they finish. Three changes:

  1. Vacuum detection now includes estimated_runtime_seconds in job proposals, computed as 5 min/GB of volume size.
  2. The scheduler checks for estimated_runtime_seconds in job parameters and uses it as the execution timeout when larger than the default — a generic mechanism any handler can use.
  3. Vacuum task gRPC calls now use the passed-in ctx as parent instead of context.Background(), so scheduler cancellation propagates to in-flight RPCs.

* extend job type runtime when proposals need more time

  The JobTypeMaxRuntime (default 30 min) wraps both detection and execution. Its context is the parent of all per-job execution contexts, so even with per-job estimated_runtime_seconds, jobCtx would cancel everything when it expires.

  After detection, scan proposals for the maximum estimated_runtime_seconds. If any proposal needs more time than the remaining JobTypeMaxRuntime, create a new execution context with enough headroom. This lets large vacuum jobs complete without being killed by the job type deadline while still respecting the configured limit for normal-sized jobs.

* log missing volume size metric, remove dead minimum runtime guard

  Add a debug log in vacuumTimeout when t.volumeSize is 0 so operators can investigate why metrics are missing for a volume.
  Remove the unreachable estimatedRuntimeSeconds < 180 check in buildVacuumProposal — volumeSizeGB is always >= 1 (due to the +1 floor), so estimatedRuntimeSeconds is always >= 300.

* cap estimated runtime and fix status check context

  - Cap maxEstimatedRuntime and per-job timeout overrides to 8 hours to prevent unbounded timeouts from bad metrics.
  - Check execCtx.Err() instead of jobCtx.Err() for status reporting, since dispatch runs under execCtx which may have a longer deadline. A successful dispatch under execCtx was misreported as "timeout" when jobCtx had expired.
This commit is contained in:
@@ -32,6 +32,7 @@ const (
|
||||
defaultClusterContextTimeout = 10 * time.Second
|
||||
defaultWaitingBacklogFloor = 8
|
||||
defaultWaitingBacklogMultiplier = 4
|
||||
maxEstimatedRuntimeCap = 8 * time.Hour
|
||||
)
|
||||
|
||||
type schedulerPolicy struct {
|
||||
@@ -293,6 +294,26 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
|
||||
|
||||
r.setSchedulerLoopState(jobType, "executing")
|
||||
|
||||
// Scan proposals for the maximum estimated_runtime_seconds so the
|
||||
// execution phase gets enough time for large jobs (e.g. vacuum on
|
||||
// big volumes). If any proposal needs more time than the remaining
|
||||
// JobTypeMaxRuntime, extend the execution context accordingly.
|
||||
var maxEstimatedRuntime time.Duration
|
||||
for _, p := range filtered {
|
||||
if p.Parameters != nil {
|
||||
if est, ok := p.Parameters["estimated_runtime_seconds"]; ok {
|
||||
if v := est.GetInt64Value(); v > 0 {
|
||||
if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime {
|
||||
maxEstimatedRuntime = d
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if maxEstimatedRuntime > maxEstimatedRuntimeCap {
|
||||
maxEstimatedRuntime = maxEstimatedRuntimeCap
|
||||
}
|
||||
|
||||
remaining = time.Until(start.Add(maxRuntime))
|
||||
if remaining <= 0 {
|
||||
r.appendActivity(JobActivity{
|
||||
@@ -306,6 +327,17 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
|
||||
return detected
|
||||
}
|
||||
|
||||
// If the longest estimated job exceeds the remaining JobTypeMaxRuntime,
|
||||
// create a new execution context with enough headroom instead of using
|
||||
// jobCtx which would cancel too early.
|
||||
execCtx := jobCtx
|
||||
execCancel := context.CancelFunc(func() {})
|
||||
if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining {
|
||||
execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime)
|
||||
remaining = maxEstimatedRuntime
|
||||
}
|
||||
defer execCancel()
|
||||
|
||||
execPolicy := policy
|
||||
if execPolicy.ExecutionTimeout <= 0 {
|
||||
execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout
|
||||
@@ -314,10 +346,10 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
|
||||
execPolicy.ExecutionTimeout = remaining
|
||||
}
|
||||
|
||||
successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy)
|
||||
successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy)
|
||||
|
||||
status := "success"
|
||||
if jobCtx.Err() != nil {
|
||||
if execCtx.Err() != nil {
|
||||
status = "timeout"
|
||||
} else if errorCount > 0 || canceledCount > 0 {
|
||||
status = "error"
|
||||
@@ -937,7 +969,24 @@ func (r *Plugin) executeScheduledJobWithExecutor(
|
||||
if parent == nil {
|
||||
parent = context.Background()
|
||||
}
|
||||
execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout)
|
||||
// Use the job's estimated runtime if provided and larger than the
|
||||
// default execution timeout. This lets handlers like vacuum scale
|
||||
// the timeout based on volume size so large volumes are not killed.
|
||||
timeout := policy.ExecutionTimeout
|
||||
if job.Parameters != nil {
|
||||
if est, ok := job.Parameters["estimated_runtime_seconds"]; ok {
|
||||
if v := est.GetInt64Value(); v > 0 {
|
||||
estimated := time.Duration(v) * time.Second
|
||||
if estimated > maxEstimatedRuntimeCap {
|
||||
estimated = maxEstimatedRuntimeCap
|
||||
}
|
||||
if estimated > timeout {
|
||||
timeout = estimated
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
execCtx, cancel := context.WithTimeout(parent, timeout)
|
||||
_, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt))
|
||||
cancel()
|
||||
if err == nil {
|
||||
|
||||
@@ -544,6 +544,10 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo
|
||||
summary = summary + " on " + result.Server
|
||||
}
|
||||
|
||||
// Estimate runtime: 5 min/GB (compact + commit + overhead)
|
||||
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
|
||||
estimatedRuntimeSeconds := volumeSizeGB * 5 * 60
|
||||
|
||||
return &plugin_pb.JobProposal{
|
||||
ProposalId: proposalID,
|
||||
DedupeKey: dedupeKey,
|
||||
@@ -564,6 +568,9 @@ func buildVacuumProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.Jo
|
||||
"collection": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
|
||||
},
|
||||
"estimated_runtime_seconds": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"task_type": "vacuum",
|
||||
|
||||
@@ -25,6 +25,7 @@ type VacuumTask struct {
|
||||
garbageThreshold float64
|
||||
progress float64
|
||||
grpcDialOption grpc.DialOption
|
||||
volumeSize uint64
|
||||
}
|
||||
|
||||
// NewVacuumTask creates a new unified vacuum task instance
|
||||
@@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
|
||||
}
|
||||
|
||||
t.garbageThreshold = vacuumParams.GarbageThreshold
|
||||
t.volumeSize = params.VolumeSize
|
||||
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
@@ -62,7 +64,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
|
||||
// Step 1: Check volume status and garbage ratio
|
||||
t.ReportProgress(10.0)
|
||||
t.GetLogger().Info("Checking volume status")
|
||||
eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
|
||||
eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check vacuum eligibility: %v", err)
|
||||
}
|
||||
@@ -83,14 +85,14 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
|
||||
"threshold": t.garbageThreshold,
|
||||
}).Info("Performing vacuum operation")
|
||||
|
||||
if err := t.performVacuum(); err != nil {
|
||||
if err := t.performVacuum(ctx); err != nil {
|
||||
return fmt.Errorf("failed to perform vacuum: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Verify vacuum results
|
||||
t.ReportProgress(90.0)
|
||||
t.GetLogger().Info("Verifying vacuum results")
|
||||
if err := t.verifyVacuumResults(); err != nil {
|
||||
if err := t.verifyVacuumResults(ctx); err != nil {
|
||||
glog.Warningf("Vacuum verification failed: %v", err)
|
||||
// Don't fail the task - vacuum operation itself succeeded
|
||||
}
|
||||
@@ -146,15 +148,28 @@ func (t *VacuumTask) GetProgress() float64 {
|
||||
return t.progress
|
||||
}
|
||||
|
||||
// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the
|
||||
// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for
|
||||
// check, 3 minutes for compact).
|
||||
func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration {
|
||||
if t.volumeSize == 0 {
|
||||
glog.V(1).Infof("volume %d has no size metric, using minimum timeout", t.volumeID)
|
||||
}
|
||||
sizeGB := int64(t.volumeSize/1024/1024/1024) + 1
|
||||
return base * time.Duration(sizeGB)
|
||||
}
|
||||
|
||||
// Helper methods for real vacuum operations
|
||||
|
||||
// checkVacuumEligibility checks if the volume meets vacuum criteria
|
||||
func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
|
||||
func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) {
|
||||
var garbageRatio float64
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
|
||||
defer cancel()
|
||||
resp, err := client.VacuumVolumeCheck(checkCtx, &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -178,12 +193,14 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
|
||||
}
|
||||
|
||||
// performVacuum executes the actual vacuum operation
|
||||
func (t *VacuumTask) performVacuum() error {
|
||||
func (t *VacuumTask) performVacuum(ctx context.Context) error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
// Step 1: Compact the volume
|
||||
// Step 1: Compact the volume (3 min per GB, matching topology vacuum)
|
||||
t.GetLogger().Info("Compacting volume")
|
||||
stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
|
||||
compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute))
|
||||
defer compactCancel()
|
||||
stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -202,18 +219,22 @@ func (t *VacuumTask) performVacuum() error {
|
||||
glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
|
||||
}
|
||||
|
||||
// Step 2: Commit the vacuum
|
||||
// Step 2: Commit the vacuum (1 min per GB)
|
||||
t.GetLogger().Info("Committing vacuum operation")
|
||||
_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
|
||||
commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
|
||||
defer commitCancel()
|
||||
_, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("vacuum commit failed: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Cleanup old files
|
||||
// Step 3: Cleanup old files (1 min per GB)
|
||||
t.GetLogger().Info("Cleaning up vacuum files")
|
||||
_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
|
||||
cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
|
||||
defer cleanupCancel()
|
||||
_, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -226,10 +247,12 @@ func (t *VacuumTask) performVacuum() error {
|
||||
}
|
||||
|
||||
// verifyVacuumResults checks the volume status after vacuum
|
||||
func (t *VacuumTask) verifyVacuumResults() error {
|
||||
func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute))
|
||||
defer cancel()
|
||||
resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user