Merge branch 'master' into mq-subscribe

This commit is contained in:
Chris Lu
2024-02-04 10:44:01 -08:00
38 changed files with 378 additions and 261 deletions

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"io"
"io/fs"
"mime/multipart"
@@ -18,6 +17,9 @@ import (
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -282,7 +284,7 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) error {
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
rangeReq := r.Header.Get("Range")
bufferedWriter := writePool.Get().(*bufio.Writer)
bufferedWriter.Reset(w)
@@ -293,7 +295,13 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
writeFn, err := prepareWriteFn(0, totalSize)
if err != nil {
glog.Errorf("processRangeRequest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("processRangeRequest: %v", err)
}
if err = writeFn(bufferedWriter); err != nil {
glog.Errorf("processRangeRequest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("processRangeRequest: %v", err)
@@ -335,8 +343,14 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil {
glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("processRangeRequest: %v", err)
}
w.WriteHeader(http.StatusPartialContent)
err = writeFn(bufferedWriter, ra.start, ra.length)
err = writeFn(bufferedWriter)
if err != nil {
glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -346,11 +360,20 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
}
// process multiple ranges
for _, ra := range ranges {
writeFnByRange := make(map[int](func(writer io.Writer) error))
for i, ra := range ranges {
if ra.start > totalSize {
http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("out of range: %v", err)
}
writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil {
glog.Errorf("processRangeRequest range[%d] err: %v", i, err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return fmt.Errorf("processRangeRequest range[%d] err: %v", i, err)
}
writeFnByRange[i] = writeFn
}
sendSize := rangesMIMESize(ranges, mimeType, totalSize)
pr, pw := io.Pipe()
@@ -359,13 +382,18 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
sendContent := pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
for i, ra := range ranges {
part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
if e != nil {
pw.CloseWithError(e)
return
}
if e = writeFn(part, ra.start, ra.length); e != nil {
writeFn := writeFnByRange[i]
if writeFn == nil {
pw.CloseWithError(e)
return
}
if e = writeFn(part); e != nil {
pw.CloseWithError(e)
return
}

View File

@@ -67,12 +67,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent
ifModifiedSinceHeader := r.Header.Get("If-Modified-Since")
if ifNoneMatchETagHeader != "" {
if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) {
setEtag(w, etag)
w.WriteHeader(http.StatusNotModified)
return true
}
} else if ifModifiedSinceHeader != "" {
if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil {
if !t.Before(entry.Attr.Mtime) {
setEtag(w, etag)
w.WriteHeader(http.StatusNotModified)
return true
}
@@ -147,11 +149,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
etag := filer.ETagEntry(entry)
if checkPreconditions(w, r, entry) {
return
}
etag := filer.ETagEntry(entry)
w.Header().Set("Accept-Ranges", "bytes")
// mime type
@@ -229,14 +231,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
glog.Errorf("failed to write entry content: %v", err)
}
return err
return func(writer io.Writer) error {
_, err := writer.Write(entry.Content[offset : offset+size])
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
glog.Errorf("failed to write entry content: %v", err)
}
return err
}, nil
}
chunks := entry.GetChunks()
if entry.IsInRemoteOnly() {
@@ -247,17 +251,25 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}); err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
return fmt.Errorf("cache %s: %v", entry.FullPath, err)
return nil, fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else {
chunks = resp.Entry.GetChunks()
}
}
err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err)
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
return nil, err
}
return err
return func(writer io.Writer) error {
err := streamFn(writer)
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err)
}
return err
}, nil
})
}

View File

@@ -15,6 +15,7 @@ import (
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
@@ -382,12 +383,14 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
return processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if _, e = rs.Seek(offset, 0); e != nil {
return processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
return func(writer io.Writer) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
_, e = io.CopyN(writer, rs, size)
return e
}
_, e = io.CopyN(writer, rs, size)
return e
}, nil
})
}
@@ -409,8 +412,10 @@ func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType str
return
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
return func(writer io.Writer) error {
return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
}, nil
})
}