feat: add in-flight metric for s3/file/volume-server (#6120)
This commit is contained in:
@@ -11,6 +11,10 @@ import (
|
|||||||
|
|
||||||
func track(f http.HandlerFunc, action string) http.HandlerFunc {
|
func track(f http.HandlerFunc, action string) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
inFlightGauge := stats_collect.S3InFlightRequestsGauge.WithLabelValues(action)
|
||||||
|
inFlightGauge.Inc()
|
||||||
|
defer inFlightGauge.Dec()
|
||||||
|
|
||||||
bucket, _ := s3_constants.GetBucketAndObject(r)
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
||||||
w.Header().Set("Server", "SeaweedFS "+util.VERSION)
|
w.Header().Set("Server", "SeaweedFS "+util.VERSION)
|
||||||
recorder := stats_collect.NewStatusResponseWriter(w)
|
recorder := stats_collect.NewStatusResponseWriter(w)
|
||||||
|
|||||||
@@ -21,6 +21,11 @@ import (
|
|||||||
|
|
||||||
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
|
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
inFlightGauge := stats.FilerInFlightRequestsGauge.WithLabelValues(r.Method)
|
||||||
|
inFlightGauge.Inc()
|
||||||
|
defer inFlightGauge.Dec()
|
||||||
|
|
||||||
statusRecorder := stats.NewStatusResponseWriter(w)
|
statusRecorder := stats.NewStatusResponseWriter(w)
|
||||||
w = statusRecorder
|
w = statusRecorder
|
||||||
origin := r.Header.Get("Origin")
|
origin := r.Header.Get("Origin")
|
||||||
|
|||||||
@@ -31,6 +31,10 @@ security settings:
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
|
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(r.Method)
|
||||||
|
inFlightGauge.Inc()
|
||||||
|
defer inFlightGauge.Dec()
|
||||||
|
|
||||||
statusRecorder := stats.NewStatusResponseWriter(w)
|
statusRecorder := stats.NewStatusResponseWriter(w)
|
||||||
w = statusRecorder
|
w = statusRecorder
|
||||||
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
|
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
|
||||||
|
|||||||
@@ -127,6 +127,14 @@ var (
|
|||||||
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
|
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
|
||||||
}, []string{"type"})
|
}, []string{"type"})
|
||||||
|
|
||||||
|
FilerInFlightRequestsGauge = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "filer",
|
||||||
|
Name: "in_flight_requests",
|
||||||
|
Help: "Current number of in-flight requests being handled by filer.",
|
||||||
|
}, []string{"type"})
|
||||||
|
|
||||||
FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec(
|
FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: Namespace,
|
Namespace: Namespace,
|
||||||
@@ -210,6 +218,14 @@ var (
|
|||||||
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
|
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
|
||||||
}, []string{"type"})
|
}, []string{"type"})
|
||||||
|
|
||||||
|
VolumeServerInFlightRequestsGauge = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "volumeServer",
|
||||||
|
Name: "in_flight_requests",
|
||||||
|
Help: "Current number of in-flight requests being handled by volume server.",
|
||||||
|
}, []string{"type"})
|
||||||
|
|
||||||
VolumeServerVolumeGauge = prometheus.NewGaugeVec(
|
VolumeServerVolumeGauge = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: Namespace,
|
Namespace: Namespace,
|
||||||
@@ -280,6 +296,13 @@ var (
|
|||||||
Help: "Bucketed histogram of s3 time to first byte request processing time.",
|
Help: "Bucketed histogram of s3 time to first byte request processing time.",
|
||||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 27),
|
Buckets: prometheus.ExponentialBuckets(0.001, 2, 27),
|
||||||
}, []string{"type", "bucket"})
|
}, []string{"type", "bucket"})
|
||||||
|
S3InFlightRequestsGauge = prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "s3",
|
||||||
|
Name: "in_flight_requests",
|
||||||
|
Help: "Current number of in-flight requests being handled by s3.",
|
||||||
|
}, []string{"type"})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -295,6 +318,7 @@ func init() {
|
|||||||
Gather.MustRegister(FilerRequestCounter)
|
Gather.MustRegister(FilerRequestCounter)
|
||||||
Gather.MustRegister(FilerHandlerCounter)
|
Gather.MustRegister(FilerHandlerCounter)
|
||||||
Gather.MustRegister(FilerRequestHistogram)
|
Gather.MustRegister(FilerRequestHistogram)
|
||||||
|
Gather.MustRegister(FilerInFlightRequestsGauge)
|
||||||
Gather.MustRegister(FilerStoreCounter)
|
Gather.MustRegister(FilerStoreCounter)
|
||||||
Gather.MustRegister(FilerStoreHistogram)
|
Gather.MustRegister(FilerStoreHistogram)
|
||||||
Gather.MustRegister(FilerSyncOffsetGauge)
|
Gather.MustRegister(FilerSyncOffsetGauge)
|
||||||
@@ -305,6 +329,7 @@ func init() {
|
|||||||
Gather.MustRegister(VolumeServerRequestCounter)
|
Gather.MustRegister(VolumeServerRequestCounter)
|
||||||
Gather.MustRegister(VolumeServerHandlerCounter)
|
Gather.MustRegister(VolumeServerHandlerCounter)
|
||||||
Gather.MustRegister(VolumeServerRequestHistogram)
|
Gather.MustRegister(VolumeServerRequestHistogram)
|
||||||
|
Gather.MustRegister(VolumeServerInFlightRequestsGauge)
|
||||||
Gather.MustRegister(VolumeServerVacuumingCompactCounter)
|
Gather.MustRegister(VolumeServerVacuumingCompactCounter)
|
||||||
Gather.MustRegister(VolumeServerVacuumingCommitCounter)
|
Gather.MustRegister(VolumeServerVacuumingCommitCounter)
|
||||||
Gather.MustRegister(VolumeServerVacuumingHistogram)
|
Gather.MustRegister(VolumeServerVacuumingHistogram)
|
||||||
@@ -317,6 +342,7 @@ func init() {
|
|||||||
Gather.MustRegister(S3RequestCounter)
|
Gather.MustRegister(S3RequestCounter)
|
||||||
Gather.MustRegister(S3HandlerCounter)
|
Gather.MustRegister(S3HandlerCounter)
|
||||||
Gather.MustRegister(S3RequestHistogram)
|
Gather.MustRegister(S3RequestHistogram)
|
||||||
|
Gather.MustRegister(S3InFlightRequestsGauge)
|
||||||
Gather.MustRegister(S3TimeToFirstByteHistogram)
|
Gather.MustRegister(S3TimeToFirstByteHistogram)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,11 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
|
|||||||
|
|
||||||
if s.GetVolume(volumeId) != nil {
|
if s.GetVolume(volumeId) != nil {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(stats.WriteToLocalDisk)
|
||||||
|
inFlightGauge.Inc()
|
||||||
|
defer inFlightGauge.Dec()
|
||||||
|
|
||||||
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
|
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
|
||||||
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
|
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -59,6 +64,11 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
|
|||||||
|
|
||||||
if len(remoteLocations) > 0 { //send to other replica locations
|
if len(remoteLocations) > 0 { //send to other replica locations
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(stats.WriteToReplicas)
|
||||||
|
inFlightGauge.Inc()
|
||||||
|
defer inFlightGauge.Dec()
|
||||||
|
|
||||||
err = DistributedOperation(remoteLocations, func(location operation.Location) error {
|
err = DistributedOperation(remoteLocations, func(location operation.Location) error {
|
||||||
u := url.URL{
|
u := url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
|
|||||||
Reference in New Issue
Block a user