filer write request use context without cancellation (#7567)
* filer use context without cancellation * pass along context
This commit is contained in:
@@ -371,7 +371,7 @@ func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction
|
|||||||
} else {
|
} else {
|
||||||
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
|
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
|
||||||
}
|
}
|
||||||
req, postErr := http.NewRequest(http.MethodPost, option.UploadUrl, reqReader)
|
req, postErr := http.NewRequestWithContext(ctx, http.MethodPost, option.UploadUrl, reqReader)
|
||||||
if postErr != nil {
|
if postErr != nil {
|
||||||
glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
|
glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
|
||||||
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
|
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
|
||||||
|
|||||||
@@ -45,7 +45,10 @@ func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.Stor
|
|||||||
|
|
||||||
ar, altRequest := so.ToAssignRequests(1)
|
ar, altRequest := so.ToAssignRequests(1)
|
||||||
|
|
||||||
assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
|
// Use a context that ignores cancellation from the request context
|
||||||
|
assignCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
|
assignResult, ae := operation.Assign(assignCtx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
|
||||||
if ae != nil {
|
if ae != nil {
|
||||||
glog.ErrorfCtx(ctx, "failing to assign a file id: %v", ae)
|
glog.ErrorfCtx(ctx, "failing to assign a file id: %v", ae)
|
||||||
err = ae
|
err = ae
|
||||||
|
|||||||
@@ -339,7 +339,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength)
|
dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength)
|
||||||
if dbErr != nil {
|
if dbErr != nil {
|
||||||
replyerr = dbErr
|
replyerr = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
@@ -380,7 +380,8 @@ func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOpt
|
|||||||
}
|
}
|
||||||
|
|
||||||
var uploadErr error
|
var uploadErr error
|
||||||
uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption)
|
uploadCtx := context.WithoutCancel(ctx)
|
||||||
|
uploadResult, uploadErr, _ = uploader.Upload(uploadCtx, reader, uploadOption)
|
||||||
if uploadErr != nil {
|
if uploadErr != nil {
|
||||||
return uploadErr
|
return uploadErr
|
||||||
}
|
}
|
||||||
@@ -436,7 +437,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
|
|||||||
Name: util.FullPath(path).Name(),
|
Name: util.FullPath(path).Name(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
|
if dbErr := fs.filer.CreateEntry(context.WithoutCancel(ctx), entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
|
||||||
replyerr = dbErr
|
replyerr = dbErr
|
||||||
filerResult.Error = dbErr.Error()
|
filerResult.Error = dbErr.Error()
|
||||||
glog.V(0).InfofCtx(ctx, "failing to create dir %s on filer server : %v", path, dbErr)
|
glog.V(0).InfofCtx(ctx, "failing to create dir %s on filer server : %v", path, dbErr)
|
||||||
|
|||||||
@@ -185,7 +185,10 @@ func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limited
|
|||||||
return nil, err, []byte{}
|
return nil, err, []byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption)
|
// Use a context that ignores cancellation from the request context
|
||||||
|
uploadCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
|
uploadResult, err, data := uploader.Upload(uploadCtx, limitedReader, uploadOption)
|
||||||
if uploadResult != nil && uploadResult.RetryCount > 0 {
|
if uploadResult != nil && uploadResult.RetryCount > 0 {
|
||||||
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
|
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
|
||||||
}
|
}
|
||||||
@@ -244,7 +247,6 @@ func (fs *FilerServer) dataToChunkWithSSE(ctx context.Context, r *http.Request,
|
|||||||
var sseType filer_pb.SSEType = filer_pb.SSEType_NONE
|
var sseType filer_pb.SSEType = filer_pb.SSEType_NONE
|
||||||
var sseMetadata []byte
|
var sseMetadata []byte
|
||||||
|
|
||||||
|
|
||||||
// Create chunk with SSE metadata if available
|
// Create chunk with SSE metadata if available
|
||||||
var chunk *filer_pb.FileChunk
|
var chunk *filer_pb.FileChunk
|
||||||
if sseType != filer_pb.SSEType_NONE {
|
if sseType != filer_pb.SSEType_NONE {
|
||||||
|
|||||||
Reference in New Issue
Block a user