filer refactoring: same auto chunking logic for POST and PUT, no size limit
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -22,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
|
||||
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) {
|
||||
|
||||
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
|
||||
query := r.URL.Query()
|
||||
@@ -32,28 +33,9 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
|
||||
if maxMB <= 0 && fs.option.MaxMB > 0 {
|
||||
maxMB = int32(fs.option.MaxMB)
|
||||
}
|
||||
if maxMB <= 0 {
|
||||
glog.V(4).Infoln("AutoChunking not enabled")
|
||||
return false
|
||||
}
|
||||
glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
|
||||
|
||||
chunkSize := 1024 * 1024 * maxMB
|
||||
|
||||
contentLength := int64(0)
|
||||
if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
|
||||
contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
|
||||
if contentLength <= int64(chunkSize) {
|
||||
glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if contentLength <= 0 {
|
||||
glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
|
||||
return false
|
||||
}
|
||||
|
||||
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -62,30 +44,32 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
|
||||
|
||||
var reply *FilerPostResult
|
||||
var err error
|
||||
var md5bytes []byte
|
||||
if r.Method == "POST" {
|
||||
reply, err = fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||
reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||
} else {
|
||||
reply, err = fs.doPutAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
|
||||
}
|
||||
if err != nil {
|
||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||
} else if reply != nil {
|
||||
if len(md5bytes) > 0 {
|
||||
w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
|
||||
}
|
||||
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
|
||||
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
|
||||
|
||||
multipartReader, multipartReaderErr := r.MultipartReader()
|
||||
if multipartReaderErr != nil {
|
||||
return nil, multipartReaderErr
|
||||
return nil, nil, multipartReaderErr
|
||||
}
|
||||
|
||||
part1, part1Err := multipartReader.NextPart()
|
||||
if part1Err != nil {
|
||||
return nil, part1Err
|
||||
return nil, nil, part1Err
|
||||
}
|
||||
|
||||
fileName := part1.FileName()
|
||||
@@ -97,9 +81,9 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
|
||||
contentType = ""
|
||||
}
|
||||
|
||||
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
|
||||
@@ -108,21 +92,20 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
|
||||
return
|
||||
}
|
||||
|
||||
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset)
|
||||
md5bytes = md5Hash.Sum(nil)
|
||||
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
|
||||
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
|
||||
|
||||
fileName := ""
|
||||
contentType := ""
|
||||
|
||||
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||
fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
|
||||
@@ -131,12 +114,26 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
|
||||
return
|
||||
}
|
||||
|
||||
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset)
|
||||
md5bytes = md5Hash.Sum(nil)
|
||||
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5Hash hash.Hash, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
|
||||
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
|
||||
|
||||
// detect file mode
|
||||
modeStr := r.URL.Query().Get("mode")
|
||||
if modeStr == "" {
|
||||
modeStr = "0660"
|
||||
}
|
||||
mode, err := strconv.ParseUint(modeStr, 8, 32)
|
||||
if err != nil {
|
||||
glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
|
||||
mode = 0660
|
||||
}
|
||||
|
||||
// fix the path
|
||||
path := r.URL.Path
|
||||
if strings.HasSuffix(path, "/") {
|
||||
if fileName != "" {
|
||||
@@ -144,20 +141,28 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
|
||||
}
|
||||
}
|
||||
|
||||
// fix the crTime
|
||||
existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
|
||||
crTime := time.Now()
|
||||
if err == nil && existingEntry != nil {
|
||||
crTime = existingEntry.Crtime
|
||||
}
|
||||
|
||||
|
||||
glog.V(4).Infoln("saving", path)
|
||||
entry := &filer2.Entry{
|
||||
FullPath: util.FullPath(path),
|
||||
Attr: filer2.Attr{
|
||||
Mtime: time.Now(),
|
||||
Crtime: time.Now(),
|
||||
Mode: 0660,
|
||||
Crtime: crTime,
|
||||
Mode: os.FileMode(mode),
|
||||
Uid: OS_UID,
|
||||
Gid: OS_GID,
|
||||
Replication: replication,
|
||||
Collection: collection,
|
||||
TtlSec: ttlSec,
|
||||
Mime: contentType,
|
||||
Md5: md5Hash.Sum(nil),
|
||||
Md5: md5bytes,
|
||||
},
|
||||
Chunks: fileChunks,
|
||||
}
|
||||
@@ -176,7 +181,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
|
||||
return filerResult, replyerr
|
||||
}
|
||||
|
||||
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
|
||||
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
|
||||
var fileChunks []*filer_pb.FileChunk
|
||||
|
||||
md5Hash := md5.New()
|
||||
@@ -184,7 +189,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
chunkOffset := int64(0)
|
||||
|
||||
for chunkOffset < contentLength {
|
||||
for {
|
||||
limitedReader := io.LimitReader(partReader, int64(chunkSize))
|
||||
|
||||
// assign one file id for one chunk
|
||||
@@ -207,7 +212,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
|
||||
// Save to chunk manifest structure
|
||||
fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
|
||||
|
||||
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
|
||||
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
|
||||
|
||||
// reset variables for the next chunk
|
||||
chunkOffset = chunkOffset + int64(uploadResult.Size)
|
||||
@@ -250,4 +255,3 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
|
||||
return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user