Admin UI: replace gin with mux (#8420)
* Replace admin gin router with mux * Update layout_templ.go * Harden admin handlers * Add login CSRF handling * Fix filer copy naming conflict * address comments * address comments
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/seaweedfs/seaweedfs/weed/admin/plugin"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
||||
@@ -32,17 +32,17 @@ const (
|
||||
)
|
||||
|
||||
// GetPluginStatusAPI returns plugin status.
|
||||
func (s *AdminServer) GetPluginStatusAPI(c *gin.Context) {
|
||||
func (s *AdminServer) GetPluginStatusAPI(w http.ResponseWriter, r *http.Request) {
|
||||
plugin := s.GetPlugin()
|
||||
if plugin == nil {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"enabled": false,
|
||||
"worker_grpc_port": s.GetWorkerGrpcPort(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"enabled": true,
|
||||
"configured": plugin.IsConfigured(),
|
||||
"base_dir": plugin.BaseDir(),
|
||||
@@ -52,101 +52,104 @@ func (s *AdminServer) GetPluginStatusAPI(c *gin.Context) {
|
||||
}
|
||||
|
||||
// GetPluginWorkersAPI returns currently connected plugin workers.
|
||||
func (s *AdminServer) GetPluginWorkersAPI(c *gin.Context) {
|
||||
func (s *AdminServer) GetPluginWorkersAPI(w http.ResponseWriter, r *http.Request) {
|
||||
workers := s.GetPluginWorkers()
|
||||
if workers == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
writeJSON(w, http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, workers)
|
||||
writeJSON(w, http.StatusOK, workers)
|
||||
}
|
||||
|
||||
// GetPluginJobTypesAPI returns known plugin job types from workers and persisted data.
|
||||
func (s *AdminServer) GetPluginJobTypesAPI(c *gin.Context) {
|
||||
func (s *AdminServer) GetPluginJobTypesAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobTypes, err := s.ListPluginJobTypes()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if jobTypes == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
writeJSON(w, http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, jobTypes)
|
||||
writeJSON(w, http.StatusOK, jobTypes)
|
||||
}
|
||||
|
||||
// GetPluginJobsAPI returns tracked jobs for monitoring.
|
||||
func (s *AdminServer) GetPluginJobsAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Query("job_type"))
|
||||
state := strings.TrimSpace(c.Query("state"))
|
||||
limit := parsePositiveInt(c.Query("limit"), 200)
|
||||
func (s *AdminServer) GetPluginJobsAPI(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
jobType := strings.TrimSpace(query.Get("job_type"))
|
||||
state := strings.TrimSpace(query.Get("state"))
|
||||
limit := parsePositiveInt(query.Get("limit"), 200)
|
||||
jobs := s.ListPluginJobs(jobType, state, limit)
|
||||
if jobs == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
writeJSON(w, http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, jobs)
|
||||
writeJSON(w, http.StatusOK, jobs)
|
||||
}
|
||||
|
||||
// GetPluginJobAPI returns one tracked job.
|
||||
func (s *AdminServer) GetPluginJobAPI(c *gin.Context) {
|
||||
jobID := strings.TrimSpace(c.Param("jobId"))
|
||||
func (s *AdminServer) GetPluginJobAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobID := strings.TrimSpace(mux.Vars(r)["jobId"])
|
||||
if jobID == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobId is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobId is required")
|
||||
return
|
||||
}
|
||||
|
||||
job, found := s.GetPluginJob(jobID)
|
||||
if !found {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "job not found"})
|
||||
writeJSONError(w, http.StatusNotFound, "job not found")
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, job)
|
||||
writeJSON(w, http.StatusOK, job)
|
||||
}
|
||||
|
||||
// GetPluginJobDetailAPI returns detailed information for one tracked plugin job.
|
||||
func (s *AdminServer) GetPluginJobDetailAPI(c *gin.Context) {
|
||||
jobID := strings.TrimSpace(c.Param("jobId"))
|
||||
func (s *AdminServer) GetPluginJobDetailAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobID := strings.TrimSpace(mux.Vars(r)["jobId"])
|
||||
if jobID == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobId is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobId is required")
|
||||
return
|
||||
}
|
||||
|
||||
activityLimit := parsePositiveInt(c.Query("activity_limit"), 500)
|
||||
relatedLimit := parsePositiveInt(c.Query("related_limit"), 20)
|
||||
query := r.URL.Query()
|
||||
activityLimit := parsePositiveInt(query.Get("activity_limit"), 500)
|
||||
relatedLimit := parsePositiveInt(query.Get("related_limit"), 20)
|
||||
|
||||
detail, found, err := s.GetPluginJobDetail(jobID, activityLimit, relatedLimit)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if !found || detail == nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "job detail not found"})
|
||||
writeJSONError(w, http.StatusNotFound, "job detail not found")
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, detail)
|
||||
writeJSON(w, http.StatusOK, detail)
|
||||
}
|
||||
|
||||
// GetPluginActivitiesAPI returns recent plugin activities.
|
||||
func (s *AdminServer) GetPluginActivitiesAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Query("job_type"))
|
||||
limit := parsePositiveInt(c.Query("limit"), 500)
|
||||
func (s *AdminServer) GetPluginActivitiesAPI(w http.ResponseWriter, r *http.Request) {
|
||||
query := r.URL.Query()
|
||||
jobType := strings.TrimSpace(query.Get("job_type"))
|
||||
limit := parsePositiveInt(query.Get("limit"), 500)
|
||||
activities := s.ListPluginActivities(jobType, limit)
|
||||
if activities == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
writeJSON(w, http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, activities)
|
||||
writeJSON(w, http.StatusOK, activities)
|
||||
}
|
||||
|
||||
// GetPluginSchedulerStatesAPI returns per-job-type scheduler status for monitoring.
|
||||
func (s *AdminServer) GetPluginSchedulerStatesAPI(c *gin.Context) {
|
||||
jobTypeFilter := strings.TrimSpace(c.Query("job_type"))
|
||||
func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobTypeFilter := strings.TrimSpace(r.URL.Query().Get("job_type"))
|
||||
|
||||
states, err := s.ListPluginSchedulerStates()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -157,71 +160,71 @@ func (s *AdminServer) GetPluginSchedulerStatesAPI(c *gin.Context) {
|
||||
filtered = append(filtered, state)
|
||||
}
|
||||
}
|
||||
c.JSON(http.StatusOK, filtered)
|
||||
writeJSON(w, http.StatusOK, filtered)
|
||||
return
|
||||
}
|
||||
|
||||
if states == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
writeJSON(w, http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, states)
|
||||
writeJSON(w, http.StatusOK, states)
|
||||
}
|
||||
|
||||
// RequestPluginJobTypeSchemaAPI asks a worker for one job type schema.
|
||||
func (s *AdminServer) RequestPluginJobTypeSchemaAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
forceRefresh := c.DefaultQuery("force_refresh", "false") == "true"
|
||||
forceRefresh := strings.EqualFold(r.URL.Query().Get("force_refresh"), "true")
|
||||
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), defaultPluginDetectionTimeout)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), defaultPluginDetectionTimeout)
|
||||
defer cancel()
|
||||
descriptor, err := s.RequestPluginJobTypeDescriptor(ctx, jobType, forceRefresh)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
renderProtoJSON(c, http.StatusOK, descriptor)
|
||||
renderProtoJSON(w, http.StatusOK, descriptor)
|
||||
}
|
||||
|
||||
// GetPluginJobTypeDescriptorAPI returns persisted descriptor for a job type.
|
||||
func (s *AdminServer) GetPluginJobTypeDescriptorAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) GetPluginJobTypeDescriptorAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
descriptor, err := s.LoadPluginJobTypeDescriptor(jobType)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if descriptor == nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "descriptor not found"})
|
||||
writeJSONError(w, http.StatusNotFound, "descriptor not found")
|
||||
return
|
||||
}
|
||||
|
||||
renderProtoJSON(c, http.StatusOK, descriptor)
|
||||
renderProtoJSON(w, http.StatusOK, descriptor)
|
||||
}
|
||||
|
||||
// GetPluginJobTypeConfigAPI loads persisted config for a job type.
|
||||
func (s *AdminServer) GetPluginJobTypeConfigAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) GetPluginJobTypeConfigAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
config, err := s.LoadPluginJobTypeConfig(jobType)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if config == nil {
|
||||
@@ -233,20 +236,20 @@ func (s *AdminServer) GetPluginJobTypeConfigAPI(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
renderProtoJSON(c, http.StatusOK, config)
|
||||
renderProtoJSON(w, http.StatusOK, config)
|
||||
}
|
||||
|
||||
// UpdatePluginJobTypeConfigAPI stores persisted config for a job type.
|
||||
func (s *AdminServer) UpdatePluginJobTypeConfigAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) UpdatePluginJobTypeConfigAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
config := &plugin_pb.PersistedJobTypeConfig{}
|
||||
if err := parseProtoJSONBody(c, config); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
if err := parseProtoJSONBody(w, r, config); err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -264,35 +267,35 @@ func (s *AdminServer) UpdatePluginJobTypeConfigAPI(c *gin.Context) {
|
||||
config.WorkerConfigValues = map[string]*plugin_pb.ConfigValue{}
|
||||
}
|
||||
|
||||
username := c.GetString("username")
|
||||
username := UsernameFromContext(r.Context())
|
||||
if username == "" {
|
||||
username = "admin"
|
||||
}
|
||||
config.UpdatedBy = username
|
||||
|
||||
if err := s.SavePluginJobTypeConfig(config); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
renderProtoJSON(c, http.StatusOK, config)
|
||||
renderProtoJSON(w, http.StatusOK, config)
|
||||
}
|
||||
|
||||
// GetPluginRunHistoryAPI returns bounded run history for a job type.
|
||||
func (s *AdminServer) GetPluginRunHistoryAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) GetPluginRunHistoryAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
history, err := s.GetPluginRunHistory(jobType)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if history == nil {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"job_type": jobType,
|
||||
"successful_runs": []interface{}{},
|
||||
"error_runs": []interface{}{},
|
||||
@@ -301,14 +304,14 @@ func (s *AdminServer) GetPluginRunHistoryAPI(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, history)
|
||||
writeJSON(w, http.StatusOK, history)
|
||||
}
|
||||
|
||||
// TriggerPluginDetectionAPI runs one detector for this job type and returns proposals.
|
||||
func (s *AdminServer) TriggerPluginDetectionAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) TriggerPluginDetectionAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -318,19 +321,19 @@ func (s *AdminServer) TriggerPluginDetectionAPI(c *gin.Context) {
|
||||
TimeoutSeconds int `json:"timeout_seconds"`
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil && err != io.EOF {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body: " + err.Error()})
|
||||
if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil && err != io.EOF {
|
||||
writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
clusterContext, err := s.parseOrBuildClusterContext(req.ClusterContext)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
timeout := normalizeTimeout(req.TimeoutSeconds, defaultPluginDetectionTimeout, maxPluginDetectionTimeout)
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
||||
defer cancel()
|
||||
|
||||
report, err := s.RunPluginDetectionWithReport(ctx, jobType, clusterContext, req.MaxResults)
|
||||
@@ -384,7 +387,7 @@ func (s *AdminServer) TriggerPluginDetectionAPI(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
response := gin.H{
|
||||
response := map[string]interface{}{
|
||||
"job_type": jobType,
|
||||
"request_id": requestID,
|
||||
"detector_worker_id": detectorWorkerID,
|
||||
@@ -396,18 +399,18 @@ func (s *AdminServer) TriggerPluginDetectionAPI(c *gin.Context) {
|
||||
|
||||
if err != nil {
|
||||
response["error"] = err.Error()
|
||||
c.JSON(http.StatusInternalServerError, response)
|
||||
writeJSON(w, http.StatusInternalServerError, response)
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, response)
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// RunPluginJobTypeAPI runs full workflow for one job type: detect then dispatch detected jobs.
|
||||
func (s *AdminServer) RunPluginJobTypeAPI(c *gin.Context) {
|
||||
jobType := strings.TrimSpace(c.Param("jobType"))
|
||||
func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
if jobType == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "jobType is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -418,8 +421,8 @@ func (s *AdminServer) RunPluginJobTypeAPI(c *gin.Context) {
|
||||
Attempt int32 `json:"attempt"`
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil && err != io.EOF {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body: " + err.Error()})
|
||||
if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil && err != io.EOF {
|
||||
writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
|
||||
return
|
||||
}
|
||||
if req.Attempt < 1 {
|
||||
@@ -428,24 +431,24 @@ func (s *AdminServer) RunPluginJobTypeAPI(c *gin.Context) {
|
||||
|
||||
clusterContext, err := s.parseOrBuildClusterContext(req.ClusterContext)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
timeout := normalizeTimeout(req.TimeoutSeconds, defaultPluginRunTimeout, maxPluginRunTimeout)
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
||||
defer cancel()
|
||||
|
||||
proposals, err := s.RunPluginDetection(ctx, jobType, clusterContext, req.MaxResults)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
detectedCount := len(proposals)
|
||||
|
||||
filteredProposals, skippedActiveCount, err := s.FilterPluginProposalsWithActiveJobs(jobType, proposals)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -485,7 +488,7 @@ func (s *AdminServer) RunPluginJobTypeAPI(c *gin.Context) {
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"job_type": jobType,
|
||||
"detected_count": detectedCount,
|
||||
"ready_to_execute_count": len(filteredProposals),
|
||||
@@ -498,7 +501,7 @@ func (s *AdminServer) RunPluginJobTypeAPI(c *gin.Context) {
|
||||
}
|
||||
|
||||
// ExecutePluginJobAPI executes one job on a capable worker and waits for completion.
|
||||
func (s *AdminServer) ExecutePluginJobAPI(c *gin.Context) {
|
||||
func (s *AdminServer) ExecutePluginJobAPI(w http.ResponseWriter, r *http.Request) {
|
||||
var req struct {
|
||||
Job json.RawMessage `json:"job"`
|
||||
ClusterContext json.RawMessage `json:"cluster_context"`
|
||||
@@ -506,24 +509,24 @@ func (s *AdminServer) ExecutePluginJobAPI(c *gin.Context) {
|
||||
TimeoutSeconds int `json:"timeout_seconds"`
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body: " + err.Error()})
|
||||
if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
|
||||
return
|
||||
}
|
||||
if len(req.Job) == 0 {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "job is required"})
|
||||
writeJSONError(w, http.StatusBadRequest, "job is required")
|
||||
return
|
||||
}
|
||||
|
||||
job := &plugin_pb.JobSpec{}
|
||||
if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(req.Job, job); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid job payload: " + err.Error()})
|
||||
writeJSONError(w, http.StatusBadRequest, "invalid job payload: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
clusterContext, err := s.parseOrBuildClusterContext(req.ClusterContext)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -532,7 +535,7 @@ func (s *AdminServer) ExecutePluginJobAPI(c *gin.Context) {
|
||||
}
|
||||
|
||||
timeout := normalizeTimeout(req.TimeoutSeconds, defaultPluginExecutionTimeout, maxPluginExecutionTimeout)
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
||||
defer cancel()
|
||||
|
||||
completed, err := s.ExecutePluginJob(ctx, job, clusterContext, req.Attempt)
|
||||
@@ -540,15 +543,15 @@ func (s *AdminServer) ExecutePluginJobAPI(c *gin.Context) {
|
||||
if completed != nil {
|
||||
payload, marshalErr := protoMessageToMap(completed)
|
||||
if marshalErr == nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error(), "completion": payload})
|
||||
writeJSON(w, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "completion": payload})
|
||||
return
|
||||
}
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
renderProtoJSON(c, http.StatusOK, completed)
|
||||
renderProtoJSON(w, http.StatusOK, completed)
|
||||
}
|
||||
|
||||
func (s *AdminServer) parseOrBuildClusterContext(raw json.RawMessage) (*plugin_pb.ClusterContext, error) {
|
||||
@@ -636,8 +639,8 @@ func (s *AdminServer) buildDefaultPluginClusterContext() *plugin_pb.ClusterConte
|
||||
|
||||
const parseProtoJSONBodyMaxBytes = 1 << 20 // 1 MB
|
||||
|
||||
func parseProtoJSONBody(c *gin.Context, message proto.Message) error {
|
||||
limitedBody := http.MaxBytesReader(c.Writer, c.Request.Body, parseProtoJSONBodyMaxBytes)
|
||||
func parseProtoJSONBody(w http.ResponseWriter, r *http.Request, message proto.Message) error {
|
||||
limitedBody := http.MaxBytesReader(w, r.Body, parseProtoJSONBodyMaxBytes)
|
||||
data, err := io.ReadAll(limitedBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read request body: %w", err)
|
||||
@@ -651,17 +654,19 @@ func parseProtoJSONBody(c *gin.Context, message proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func renderProtoJSON(c *gin.Context, statusCode int, message proto.Message) {
|
||||
func renderProtoJSON(w http.ResponseWriter, statusCode int, message proto.Message) {
|
||||
payload, err := protojson.MarshalOptions{
|
||||
UseProtoNames: true,
|
||||
EmitUnpopulated: true,
|
||||
}.Marshal(message)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to encode response: " + err.Error()})
|
||||
writeJSONError(w, http.StatusInternalServerError, "failed to encode response: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
c.Data(statusCode, "application/json", payload)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(statusCode)
|
||||
_, _ = w.Write(payload)
|
||||
}
|
||||
|
||||
func protoMessageToMap(message proto.Message) (map[string]interface{}, error) {
|
||||
|
||||
Reference in New Issue
Block a user