admin: fix manual job run to use scheduler dispatch with capacity management and retry (#8720)
RunPluginJobTypeAPI previously executed proposals with a naive sequential loop that called ExecutePluginJob once per proposal. This had two bugs: (1) double-lock — RunPluginJobTypeAPI held pluginLock while calling ExecutePluginJob, which tried to re-acquire the same lock for every job in the loop; (2) no capacity management — proposals were fired directly at workers without reserveScheduledExecutor, so every job beyond the worker concurrency limit received an immediate at_capacity error with no retry or backoff. Fix: add Plugin.DispatchProposals, which reuses dispatchScheduledProposals — the same code path the scheduler loop uses — with executor reservation, configurable concurrency, and per-job retry with backoff. RunPluginJobTypeAPI now calls DispatchPluginProposals (a thin AdminServer wrapper) after taking pluginLock once. Co-authored-by: Anton Ustyugov <anton@devops>
This commit is contained in:
@@ -1284,6 +1284,23 @@ func (s *AdminServer) RunPluginDetectionWithReport(
|
|||||||
return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
|
return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DispatchPluginProposals dispatches a batch of proposals using the same
|
||||||
|
// capacity-aware dispatch logic as the scheduler loop (executor reservation with
|
||||||
|
// backoff, per-job retry on transient errors). The plugin lock must already be
|
||||||
|
// held by the caller.
|
||||||
|
func (s *AdminServer) DispatchPluginProposals(
|
||||||
|
ctx context.Context,
|
||||||
|
jobType string,
|
||||||
|
proposals []*plugin_pb.JobProposal,
|
||||||
|
clusterContext *plugin_pb.ClusterContext,
|
||||||
|
) (successCount, errorCount, canceledCount int, err error) {
|
||||||
|
if s.plugin == nil {
|
||||||
|
return 0, 0, 0, fmt.Errorf("plugin is not enabled")
|
||||||
|
}
|
||||||
|
sc, ec, cc := s.plugin.DispatchProposals(ctx, jobType, proposals, clusterContext)
|
||||||
|
return sc, ec, cc, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ExecutePluginJob dispatches one job to a capable worker and waits for completion.
|
// ExecutePluginJob dispatches one job to a capable worker and waits for completion.
|
||||||
func (s *AdminServer) ExecutePluginJob(
|
func (s *AdminServer) ExecutePluginJob(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|||||||
@@ -526,40 +526,10 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type executionResult struct {
|
successCount, errorCount, canceledCount, dispatchErr := s.DispatchPluginProposals(ctx, jobType, filteredProposals, clusterContext)
|
||||||
JobID string `json:"job_id"`
|
if dispatchErr != nil {
|
||||||
Success bool `json:"success"`
|
writeJSONError(w, http.StatusInternalServerError, dispatchErr.Error())
|
||||||
Error string `json:"error,omitempty"`
|
return
|
||||||
Completion map[string]interface{} `json:"completion,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
results := make([]executionResult, 0, len(filteredProposals))
|
|
||||||
successCount := 0
|
|
||||||
errorCount := 0
|
|
||||||
|
|
||||||
for index, proposal := range filteredProposals {
|
|
||||||
job := buildJobSpecFromProposal(jobType, proposal, index)
|
|
||||||
completed, execErr := s.ExecutePluginJob(ctx, job, clusterContext, req.Attempt)
|
|
||||||
|
|
||||||
result := executionResult{
|
|
||||||
JobID: job.JobId,
|
|
||||||
Success: execErr == nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
if completed != nil {
|
|
||||||
if payload, marshalErr := protoMessageToMap(completed); marshalErr == nil {
|
|
||||||
result.Completion = payload
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if execErr != nil {
|
|
||||||
result.Error = execErr.Error()
|
|
||||||
errorCount++
|
|
||||||
} else {
|
|
||||||
successCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
results = append(results, result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||||
@@ -567,10 +537,10 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request
|
|||||||
"detected_count": detectedCount,
|
"detected_count": detectedCount,
|
||||||
"ready_to_execute_count": len(filteredProposals),
|
"ready_to_execute_count": len(filteredProposals),
|
||||||
"skipped_active_count": skippedActiveCount,
|
"skipped_active_count": skippedActiveCount,
|
||||||
"executed_count": len(results),
|
"executed_count": successCount + errorCount + canceledCount,
|
||||||
"success_count": successCount,
|
"success_count": successCount,
|
||||||
"error_count": errorCount,
|
"error_count": errorCount,
|
||||||
"execution_results": results,
|
"canceled_count": canceledCount,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1287,6 +1287,35 @@ func isWaitingTrackedJobState(state string) bool {
|
|||||||
return normalized == "pending" || normalized == "job_state_pending"
|
return normalized == "pending" || normalized == "job_state_pending"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DispatchProposals dispatches a batch of proposals using the same capacity-aware
|
||||||
|
// dispatch logic as the scheduler loop: concurrent execution, executor reservation
|
||||||
|
// with backoff, and per-job retry on transient errors. The scheduler policy is
|
||||||
|
// loaded from the persisted job type config; if the job type has no config or is
|
||||||
|
// disabled a sensible default policy is used so manual runs always work.
|
||||||
|
func (r *Plugin) DispatchProposals(
|
||||||
|
ctx context.Context,
|
||||||
|
jobType string,
|
||||||
|
proposals []*plugin_pb.JobProposal,
|
||||||
|
clusterContext *plugin_pb.ClusterContext,
|
||||||
|
) (successCount, errorCount, canceledCount int) {
|
||||||
|
if len(proposals) == 0 {
|
||||||
|
return 0, 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
policy, enabled, err := r.loadSchedulerPolicy(jobType)
|
||||||
|
if err != nil || !enabled {
|
||||||
|
policy = schedulerPolicy{
|
||||||
|
ExecutionConcurrency: defaultScheduledExecutionConcurrency,
|
||||||
|
PerWorkerConcurrency: defaultScheduledPerWorkerConcurrency,
|
||||||
|
ExecutionTimeout: defaultScheduledExecutionTimeout,
|
||||||
|
RetryBackoff: defaultScheduledRetryBackoff,
|
||||||
|
ExecutorReserveBackoff: 200 * time.Millisecond,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.dispatchScheduledProposals(ctx, jobType, proposals, clusterContext, policy)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Plugin) filterScheduledProposals(proposals []*plugin_pb.JobProposal) []*plugin_pb.JobProposal {
|
func (r *Plugin) filterScheduledProposals(proposals []*plugin_pb.JobProposal) []*plugin_pb.JobProposal {
|
||||||
filtered := make([]*plugin_pb.JobProposal, 0, len(proposals))
|
filtered := make([]*plugin_pb.JobProposal, 0, len(proposals))
|
||||||
seenInRun := make(map[string]struct{}, len(proposals))
|
seenInRun := make(map[string]struct{}, len(proposals))
|
||||||
|
|||||||
Reference in New Issue
Block a user