[volume] refactor and add metrics for flight upload and download data limit condition (#6920)

* refactor concurrentDownloadLimit

* fix loop

* fix cmdServer

* fix: resolve conversation pr 6920

* Changes logging function (#6919)

* updated logging methods for stores

* updated logging methods for stores

* updated logging methods for filer

* updated logging methods for uploader and http_util

* updated logging methods for weed server

---------

Co-authored-by: akosov <a.kosov@kryptonite.ru>

* Improve lock ring (#6921)

* fix flaky lock ring test

* add more tests

* fix: build

* fix: rm import util/version

* fix: serverOptions

* refactoring

---------

Co-authored-by: Aleksey Kosov <rusyak777@list.ru>
Co-authored-by: akosov <a.kosov@kryptonite.ru>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
This commit is contained in:
Konstantin Lebedev
2025-07-03 06:03:49 +05:00
committed by GitHub
parent 1db7c2b8aa
commit 93007c1842
10 changed files with 410 additions and 134 deletions

View File

@@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"mime"
"net/http"
@@ -17,19 +19,18 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/images"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const reqIsProxied = "proxied"
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func NotFound(w http.ResponseWriter) {
@@ -42,6 +43,90 @@ func InternalError(w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)
}
func (vs *VolumeServer) proxyReqToTargetServer(w http.ResponseWriter, r *http.Request) {
vid, fid, _, _, _ := parseURLPath(r.URL.Path)
volumeId, err := needle.NewVolumeId(vid)
if err != nil {
glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err)
w.WriteHeader(http.StatusBadRequest)
return
}
lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
if err != nil || len(lookupResult.Locations) <= 0 {
glog.V(0).Infoln("lookup error:", err, r.URL.Path)
NotFound(w)
return
}
var tragetUrl *url.URL
location := fmt.Sprintf("%s:%d", vs.store.Ip, vs.store.Port)
for _, loc := range lookupResult.Locations {
if !strings.Contains(loc.Url, location) {
rawURL, _ := util_http.NormalizeUrl(loc.Url)
tragetUrl, _ = url.Parse(rawURL)
break
}
}
if tragetUrl == nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.EmptyReadProxyLoc).Inc()
glog.Errorf("failed lookup target host is empty locations: %+v, %s", lookupResult.Locations, location)
NotFound(w)
return
}
if vs.ReadMode == "proxy" {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadProxyReq).Inc()
// proxy client request to target server
r.URL.Host = tragetUrl.Host
r.URL.Scheme = tragetUrl.Scheme
r.URL.Query().Add(reqIsProxied, "true")
request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
if err != nil {
glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
for k, vv := range r.Header {
for _, v := range vv {
request.Header.Add(k, v)
}
}
response, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.FailedReadProxyReq).Inc()
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
defer util_http.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
if k == "Server" {
continue
}
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(response.StatusCode)
buf := mem.Allocate(128 * 1024)
defer mem.Free(buf)
io.CopyBuffer(w, response.Body, buf)
return
} else {
// redirect
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadRedirectReq).Inc()
tragetUrl.Path = fmt.Sprintf("%s/%s,%s", tragetUrl.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
arg.Set("collection", c)
}
arg.Set(reqIsProxied, "true")
tragetUrl.RawQuery = arg.Encode()
http.Redirect(w, r, tragetUrl.String(), http.StatusMovedPermanently)
return
}
}
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
@@ -73,62 +158,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
NotFound(w)
return
}
lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err != nil || len(lookupResult.Locations) <= 0 {
glog.V(0).Infoln("lookup error:", err, r.URL.Path)
NotFound(w)
return
}
if vs.ReadMode == "proxy" {
// proxy client request to target server
rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
u, _ := url.Parse(rawURL)
r.URL.Host = u.Host
r.URL.Scheme = u.Scheme
request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
if err != nil {
glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
for k, vv := range r.Header {
for _, v := range vv {
request.Header.Add(k, v)
}
}
response, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
defer util_http.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(response.StatusCode)
buf := mem.Allocate(128 * 1024)
defer mem.Free(buf)
io.CopyBuffer(w, response.Body, buf)
return
} else {
// redirect
rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
u, _ := url.Parse(rawURL)
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
arg.Set("collection", c)
}
u.RawQuery = arg.Encode()
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
return
}
vs.proxyReqToTargetServer(w, r)
return
}
cookie := n.Cookie
@@ -145,14 +176,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
memoryCost = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(memoryCost))
}
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn)
}
defer func() {
atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(memoryCost))
vs.inFlightDownloadDataLimitCond.Signal()
if vs.concurrentDownloadLimit != 0 {
vs.inFlightDownloadDataLimitCond.Broadcast()
}
}()
if err != nil && err != storage.ErrorDeleted && hasVolume {