Merge branch 'master' into mq-subscribe
This commit is contained in:
@@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
|
||||
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
|
||||
return err
|
||||
}
|
||||
err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
|
||||
err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
|
||||
return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
|
||||
}
|
||||
|
||||
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
|
||||
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
|
||||
|
||||
var shouldRetry bool
|
||||
var totalWritten int
|
||||
@@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
|
||||
for _, urlString := range urlStrings {
|
||||
var localProcessed int
|
||||
var writeErr error
|
||||
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
||||
shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
|
||||
if totalWritten > localProcessed {
|
||||
toBeSkipped := totalWritten - localProcessed
|
||||
if len(data) <= toBeSkipped {
|
||||
|
||||
@@ -303,7 +303,7 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
|
||||
}
|
||||
}
|
||||
}
|
||||
if count < limit && lastFileName <= prefix {
|
||||
if count < limit && lastFileName < prefix {
|
||||
notPrefixed = notPrefixed[:0]
|
||||
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
|
||||
notPrefixed = append(notPrefixed, entry)
|
||||
|
||||
@@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
|
||||
|
||||
type DoStreamContent func(writer io.Writer) error
|
||||
|
||||
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
|
||||
return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
|
||||
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
|
||||
return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
|
||||
}
|
||||
|
||||
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
|
||||
type VolumeServerJwtFunction func(fileId string) string
|
||||
|
||||
func noJwtFunc(string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
|
||||
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
|
||||
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
|
||||
|
||||
@@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
|
||||
}
|
||||
urlStrings := fileId2Url[chunkView.FileId]
|
||||
start := time.Now()
|
||||
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
|
||||
jwt := jwtFunc(chunkView.FileId)
|
||||
err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
|
||||
offset += int64(chunkView.ViewSize)
|
||||
remaining -= int64(chunkView.ViewSize)
|
||||
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
|
||||
@@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
|
||||
}
|
||||
|
||||
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
|
||||
streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
|
||||
streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ type SftpServer struct {
|
||||
|
||||
var _ = ftpserver.MainDriver(&SftpServer{})
|
||||
|
||||
// NewServer returns a new FTP server driver
|
||||
// NewFtpServer returns a new FTP server driver
|
||||
func NewFtpServer(ftpListener net.Listener, option *FtpServerOption) (*SftpServer, error) {
|
||||
var err error
|
||||
server := &SftpServer{
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
@@ -193,6 +194,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
|
||||
})
|
||||
|
||||
if lastError != nil {
|
||||
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc()
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -262,6 +264,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
|
||||
WritableVolumeCount: so.VolumeGrowthCount,
|
||||
}
|
||||
if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
|
||||
ar.WritableVolumeCount = uint32(count)
|
||||
altRequest = &VolumeAssignRequest{
|
||||
Count: uint64(count),
|
||||
Replication: so.Replication,
|
||||
|
||||
@@ -311,7 +311,7 @@ func parseSignature(signElement string) (string, s3err.ErrorCode) {
|
||||
return signature, s3err.ErrNone
|
||||
}
|
||||
|
||||
// doesPolicySignatureMatch - Verify query headers with post policy
|
||||
// doesPolicySignatureV4Match - Verify query headers with post policy
|
||||
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html
|
||||
//
|
||||
// returns ErrNone if the signature matches.
|
||||
|
||||
@@ -262,7 +262,7 @@ func getMD5HashBase64(data []byte) string {
|
||||
return base64.StdEncoding.EncodeToString(getMD5Sum(data))
|
||||
}
|
||||
|
||||
// getSHA256Hash returns SHA-256 sum of given data.
|
||||
// getSHA256Sum returns SHA-256 sum of given data.
|
||||
func getSHA256Sum(data []byte) []byte {
|
||||
hash := sha256.New()
|
||||
hash.Write(data)
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||
"golang.org/x/exp/slices"
|
||||
"math"
|
||||
"path/filepath"
|
||||
@@ -16,12 +17,19 @@ import (
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/google/uuid"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
)
|
||||
|
||||
const (
|
||||
multipartExt = ".part"
|
||||
multiPartMinSize = 5 * 1024 * 1024
|
||||
)
|
||||
|
||||
type InitiateMultipartUploadResult struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
|
||||
s3.CreateMultipartUploadOutput
|
||||
@@ -70,60 +78,128 @@ type CompleteMultipartUploadResult struct {
|
||||
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
|
||||
|
||||
glog.V(2).Infof("completeMultipartUpload input %v", input)
|
||||
|
||||
completedParts := parts.Parts
|
||||
slices.SortFunc(completedParts, func(a, b CompletedPart) int {
|
||||
return a.PartNumber - b.PartNumber
|
||||
})
|
||||
if len(parts.Parts) == 0 {
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||
return nil, s3err.ErrNoSuchUpload
|
||||
}
|
||||
completedPartNumbers := []int{}
|
||||
completedPartMap := make(map[int][]string)
|
||||
for _, part := range parts.Parts {
|
||||
if _, ok := completedPartMap[part.PartNumber]; !ok {
|
||||
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
|
||||
}
|
||||
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
|
||||
}
|
||||
sort.Ints(completedPartNumbers)
|
||||
|
||||
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
||||
|
||||
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
|
||||
if err != nil || len(entries) == 0 {
|
||||
if err != nil {
|
||||
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||
return nil, s3err.ErrNoSuchUpload
|
||||
}
|
||||
|
||||
if len(entries) == 0 {
|
||||
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
|
||||
if uploadId, ok := entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id]; ok && *input.UploadId == string(uploadId) {
|
||||
return &CompleteMultipartUploadResult{
|
||||
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
|
||||
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
|
||||
Bucket: input.Bucket,
|
||||
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
|
||||
Key: objectKey(input.Key),
|
||||
},
|
||||
}, s3err.ErrNone
|
||||
}
|
||||
}
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||
return nil, s3err.ErrNoSuchUpload
|
||||
}
|
||||
|
||||
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
|
||||
if err != nil {
|
||||
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
|
||||
return nil, s3err.ErrNoSuchUpload
|
||||
}
|
||||
|
||||
// check whether completedParts is more than received parts
|
||||
{
|
||||
partNumbers := make(map[int]struct{}, len(entries))
|
||||
for _, entry := range entries {
|
||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
||||
partNumberString := entry.Name[:len(entry.Name)-len(".part")]
|
||||
partNumber, err := strconv.Atoi(partNumberString)
|
||||
if err == nil {
|
||||
partNumbers[partNumber] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, part := range completedParts {
|
||||
if _, found := partNumbers[part.PartNumber]; !found {
|
||||
return nil, s3err.ErrInvalidPart
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mime := pentry.Attributes.Mime
|
||||
|
||||
var finalParts []*filer_pb.FileChunk
|
||||
var offset int64
|
||||
|
||||
deleteEntries := []*filer_pb.Entry{}
|
||||
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
|
||||
entityTooSmall := false
|
||||
for _, entry := range entries {
|
||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
||||
partETag, found := findByPartNumber(entry.Name, completedParts)
|
||||
if !found {
|
||||
foundEntry := false
|
||||
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
|
||||
if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
|
||||
continue
|
||||
}
|
||||
partNumber, err := parsePartNumber(entry.Name)
|
||||
if err != nil {
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
|
||||
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
|
||||
continue
|
||||
}
|
||||
completedPartsByNumber, ok := completedPartMap[partNumber]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, partETag := range completedPartsByNumber {
|
||||
partETag = strings.Trim(partETag, `"`)
|
||||
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
|
||||
if partETag != "" && len(partETag) == 32 && entryETag != "" {
|
||||
if entryETag != partETag {
|
||||
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
|
||||
}
|
||||
if len(entry.Chunks) == 0 {
|
||||
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
|
||||
continue
|
||||
}
|
||||
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
|
||||
if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
|
||||
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
|
||||
return nil, s3err.ErrInvalidPart
|
||||
//there maybe multi same part, because of client retry
|
||||
partEntries[partNumber] = append(partEntries[partNumber], entry)
|
||||
foundEntry = true
|
||||
}
|
||||
if foundEntry {
|
||||
if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] &&
|
||||
entry.Attributes.FileSize < multiPartMinSize {
|
||||
glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name)
|
||||
entityTooSmall = true
|
||||
}
|
||||
} else {
|
||||
deleteEntries = append(deleteEntries, entry)
|
||||
}
|
||||
}
|
||||
if entityTooSmall {
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc()
|
||||
return nil, s3err.ErrEntityTooSmall
|
||||
}
|
||||
mime := pentry.Attributes.Mime
|
||||
var finalParts []*filer_pb.FileChunk
|
||||
var offset int64
|
||||
for _, partNumber := range completedPartNumbers {
|
||||
partEntriesByNumber, ok := partEntries[partNumber]
|
||||
if !ok {
|
||||
glog.Errorf("part %d has no entry", partNumber)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
|
||||
return nil, s3err.ErrInvalidPart
|
||||
}
|
||||
found := false
|
||||
if len(partEntriesByNumber) > 1 {
|
||||
slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
|
||||
return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
|
||||
})
|
||||
}
|
||||
for _, entry := range partEntriesByNumber {
|
||||
if found {
|
||||
deleteEntries = append(deleteEntries, entry)
|
||||
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
|
||||
continue
|
||||
}
|
||||
for _, chunk := range entry.GetChunks() {
|
||||
p := &filer_pb.FileChunk{
|
||||
@@ -137,28 +213,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||
finalParts = append(finalParts, p)
|
||||
offset += int64(chunk.Size)
|
||||
}
|
||||
found = true
|
||||
}
|
||||
}
|
||||
|
||||
entryName := filepath.Base(*input.Key)
|
||||
dirName := filepath.ToSlash(filepath.Dir(*input.Key))
|
||||
if dirName == "." {
|
||||
dirName = ""
|
||||
}
|
||||
if strings.HasPrefix(dirName, "/") {
|
||||
dirName = dirName[1:]
|
||||
}
|
||||
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
||||
|
||||
// remove suffix '/'
|
||||
if strings.HasSuffix(dirName, "/") {
|
||||
dirName = dirName[:len(dirName)-1]
|
||||
}
|
||||
|
||||
entryName, dirName := s3a.getEntryNameAndDir(input)
|
||||
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id] = []byte(*input.UploadId)
|
||||
for k, v := range pentry.Extended {
|
||||
if k != "key" {
|
||||
entry.Extended[k] = v
|
||||
@@ -186,6 +250,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||
},
|
||||
}
|
||||
|
||||
for _, deleteEntry := range deleteEntries {
|
||||
//delete unused part data
|
||||
glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name)
|
||||
if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
|
||||
glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
|
||||
}
|
||||
}
|
||||
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
|
||||
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
|
||||
}
|
||||
@@ -193,29 +264,33 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
||||
return
|
||||
}
|
||||
|
||||
func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) {
|
||||
partNumber, formatErr := strconv.Atoi(fileName[:4])
|
||||
if formatErr != nil {
|
||||
return
|
||||
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
|
||||
entryName := filepath.Base(*input.Key)
|
||||
dirName := filepath.ToSlash(filepath.Dir(*input.Key))
|
||||
if dirName == "." {
|
||||
dirName = ""
|
||||
}
|
||||
x := sort.Search(len(parts), func(i int) bool {
|
||||
return parts[i].PartNumber >= partNumber
|
||||
})
|
||||
if x >= len(parts) {
|
||||
return
|
||||
if strings.HasPrefix(dirName, "/") {
|
||||
dirName = dirName[1:]
|
||||
}
|
||||
if parts[x].PartNumber != partNumber {
|
||||
return
|
||||
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
||||
|
||||
// remove suffix '/'
|
||||
if strings.HasSuffix(dirName, "/") {
|
||||
dirName = dirName[:len(dirName)-1]
|
||||
}
|
||||
y := 0
|
||||
for i, part := range parts[x:] {
|
||||
if part.PartNumber == partNumber {
|
||||
y = i
|
||||
} else {
|
||||
break
|
||||
}
|
||||
return entryName, dirName
|
||||
}
|
||||
|
||||
func parsePartNumber(fileName string) (int, error) {
|
||||
var partNumberString string
|
||||
index := strings.Index(fileName, "_")
|
||||
if index != -1 {
|
||||
partNumberString = fileName[:index]
|
||||
} else {
|
||||
partNumberString = fileName[:len(fileName)-len(multipartExt)]
|
||||
}
|
||||
return parts[x+y].ETag, true
|
||||
return strconv.Atoi(partNumberString)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
|
||||
@@ -331,7 +406,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
|
||||
StorageClass: aws.String("STANDARD"),
|
||||
}
|
||||
|
||||
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
|
||||
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
|
||||
if err != nil {
|
||||
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
|
||||
return nil, s3err.ErrNoSuchUpload
|
||||
@@ -343,9 +418,8 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
|
||||
output.IsTruncated = aws.Bool(!isLast)
|
||||
|
||||
for _, entry := range entries {
|
||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
||||
partNumberString := entry.Name[:len(entry.Name)-len(".part")]
|
||||
partNumber, err := strconv.Atoi(partNumberString)
|
||||
if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
|
||||
partNumber, err := parsePartNumber(entry.Name)
|
||||
if err != nil {
|
||||
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
|
||||
continue
|
||||
|
||||
@@ -50,88 +50,27 @@ func TestListPartsResult(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func Test_findByPartNumber(t *testing.T) {
|
||||
type args struct {
|
||||
fileName string
|
||||
parts []CompletedPart
|
||||
}
|
||||
|
||||
parts := []CompletedPart{
|
||||
{
|
||||
ETag: "xxx",
|
||||
PartNumber: 1,
|
||||
},
|
||||
{
|
||||
ETag: "lll",
|
||||
PartNumber: 1,
|
||||
},
|
||||
{
|
||||
ETag: "yyy",
|
||||
PartNumber: 3,
|
||||
},
|
||||
{
|
||||
ETag: "zzz",
|
||||
PartNumber: 5,
|
||||
},
|
||||
}
|
||||
|
||||
func Test_parsePartNumber(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantEtag string
|
||||
wantFound bool
|
||||
name string
|
||||
fileName string
|
||||
partNum int
|
||||
}{
|
||||
{
|
||||
"first",
|
||||
args{
|
||||
"0001.part",
|
||||
parts,
|
||||
},
|
||||
"lll",
|
||||
true,
|
||||
"0001_uuid.part",
|
||||
1,
|
||||
},
|
||||
{
|
||||
"second",
|
||||
args{
|
||||
"0002.part",
|
||||
parts,
|
||||
},
|
||||
"",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"third",
|
||||
args{
|
||||
"0003.part",
|
||||
parts,
|
||||
},
|
||||
"yyy",
|
||||
true,
|
||||
},
|
||||
{
|
||||
"fourth",
|
||||
args{
|
||||
"0004.part",
|
||||
parts,
|
||||
},
|
||||
"",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"fifth",
|
||||
args{
|
||||
"0005.part",
|
||||
parts,
|
||||
},
|
||||
"zzz",
|
||||
true,
|
||||
"0002.part",
|
||||
2,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts)
|
||||
assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
|
||||
assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
|
||||
partNumber, _ := parsePartNumber(tt.fileName)
|
||||
assert.Equalf(t, tt.partNum, partNumber, "parsePartNumber(%v)", tt.fileName)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ const (
|
||||
AmzTagCount = "x-amz-tagging-count"
|
||||
|
||||
X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key"
|
||||
X_SeaweedFS_Header_Upload_Id = "X-Seaweedfs-Upload-Id"
|
||||
|
||||
// S3 ACL headers
|
||||
AmzCannedAcl = "X-Amz-Acl"
|
||||
|
||||
@@ -351,7 +351,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
|
||||
|
||||
}
|
||||
|
||||
// DeleteBucketMetricsConfiguration Delete Bucket Lifecycle
|
||||
// DeleteBucketLifecycleHandler Delete Bucket Lifecycle
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
|
||||
func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
|
||||
@@ -2,16 +2,17 @@ package s3api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"modernc.org/strutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"modernc.org/strutil"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
@@ -170,8 +171,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
|
||||
|
||||
rangeHeader := r.Header.Get("x-amz-copy-source-range")
|
||||
|
||||
dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
|
||||
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
|
||||
dstUrl := s3a.genPartUploadUrl(dstBucket, uploadID, partID)
|
||||
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
|
||||
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
@@ -247,8 +248,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
|
||||
|
||||
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
|
||||
|
||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
|
||||
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
|
||||
uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
|
||||
|
||||
if partID == 1 && r.Header.Get("Content-Type") == "" {
|
||||
dataReader = mimeDetect(r, dataReader)
|
||||
@@ -271,6 +271,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
|
||||
return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
|
||||
return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
|
||||
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
|
||||
}
|
||||
|
||||
// Generate uploadID hash string from object
|
||||
func (s3a *S3ApiServer) generateUploadID(object string) string {
|
||||
if strings.HasPrefix(object, "/") {
|
||||
|
||||
@@ -374,7 +374,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
|
||||
}
|
||||
if cursor.maxKeys <= 0 {
|
||||
cursor.isTruncated = true
|
||||
return
|
||||
continue
|
||||
}
|
||||
entry := resp.Entry
|
||||
nextMarker = entry.Name
|
||||
|
||||
@@ -91,6 +91,7 @@ type FilerServer struct {
|
||||
secret security.SigningKey
|
||||
filer *filer.Filer
|
||||
filerGuard *security.Guard
|
||||
volumeGuard *security.Guard
|
||||
grpcDialOption grpc.DialOption
|
||||
|
||||
// metrics read from the master
|
||||
@@ -113,6 +114,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
||||
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
|
||||
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
|
||||
|
||||
volumeSigningKey := v.GetString("jwt.signing.key")
|
||||
v.SetDefault("jwt.signing.expires_after_seconds", 10)
|
||||
volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
|
||||
|
||||
volumeReadSigningKey := v.GetString("jwt.signing.read.key")
|
||||
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
|
||||
volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
|
||||
|
||||
v.SetDefault("cors.allowed_origins.values", "*")
|
||||
|
||||
allowedOrigins := v.GetString("cors.allowed_origins.values")
|
||||
@@ -145,6 +154,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
|
||||
fs.filer.Cipher = option.Cipher
|
||||
// we do not support IP whitelist right now
|
||||
fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
|
||||
fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec)
|
||||
|
||||
fs.checkWithMaster()
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package weed_server
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
"io"
|
||||
@@ -20,6 +21,26 @@ func init() {
|
||||
}}
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
|
||||
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
|
||||
|
||||
if encodedJwt == "" {
|
||||
return
|
||||
}
|
||||
|
||||
r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string {
|
||||
var encodedJwt security.EncodedJwt
|
||||
if isWrite {
|
||||
encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId)
|
||||
} else {
|
||||
encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)
|
||||
}
|
||||
return string(encodedJwt)
|
||||
}
|
||||
|
||||
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
|
||||
|
||||
urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/security"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
@@ -261,7 +262,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs)
|
||||
streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
|
||||
if err != nil {
|
||||
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
|
||||
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
|
||||
@@ -277,3 +278,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (fs *FilerServer) maybeGetVolumeReadJwtAuthorizationToken(fileId string) string {
|
||||
return string(security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId))
|
||||
}
|
||||
|
||||
@@ -97,6 +97,9 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
|
||||
continue
|
||||
}
|
||||
dn := dnList.Head()
|
||||
if dn == nil {
|
||||
continue
|
||||
}
|
||||
var replicas []*master_pb.Location
|
||||
for _, r := range dnList.Rest() {
|
||||
replicas = append(replicas, &master_pb.Location{
|
||||
|
||||
@@ -149,7 +149,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
|
||||
} else {
|
||||
ms.maybeAddJwtAuthorization(w, fid, true)
|
||||
dn := dnList.Head()
|
||||
|
||||
if dn == nil {
|
||||
continue
|
||||
}
|
||||
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -315,8 +315,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
||||
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
|
||||
|
||||
if !takeAction {
|
||||
// adjust free volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
|
||||
// adjust volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++
|
||||
break
|
||||
}
|
||||
|
||||
@@ -349,8 +349,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
||||
return err
|
||||
}
|
||||
|
||||
// adjust free volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
|
||||
// adjust volume count
|
||||
dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,7 +241,13 @@ var (
|
||||
Name: "request_total",
|
||||
Help: "Counter of s3 requests.",
|
||||
}, []string{"type", "code", "bucket"})
|
||||
|
||||
S3HandlerCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: "s3",
|
||||
Name: "handler_total",
|
||||
Help: "Counter of s3 server handlers.",
|
||||
}, []string{"type"})
|
||||
S3RequestHistogram = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: Namespace,
|
||||
@@ -292,6 +298,7 @@ func init() {
|
||||
Gather.MustRegister(VolumeServerResourceGauge)
|
||||
|
||||
Gather.MustRegister(S3RequestCounter)
|
||||
Gather.MustRegister(S3HandlerCounter)
|
||||
Gather.MustRegister(S3RequestHistogram)
|
||||
Gather.MustRegister(S3TimeToFirstByteHistogram)
|
||||
}
|
||||
|
||||
@@ -40,6 +40,17 @@ const (
|
||||
ErrorReadInternal = "read.internal.error"
|
||||
ErrorWriteEntry = "write.entry.failed"
|
||||
RepeatErrorUploadContent = "upload.content.repeat.failed"
|
||||
ErrorChunkAssign = "chunkAssign.failed"
|
||||
ErrorReadCache = "read.cache.failed"
|
||||
ErrorReadStream = "read.stream.failed"
|
||||
|
||||
// s3 handler
|
||||
ErrorCompletedNoSuchUpload = "errorCompletedNoSuchUpload"
|
||||
ErrorCompleteEntityTooSmall = "errorCompleteEntityTooSmall"
|
||||
ErrorCompletedPartEmpty = "errorCompletedPartEmpty"
|
||||
ErrorCompletedPartNumber = "errorCompletedPartNumber"
|
||||
ErrorCompletedPartNotFound = "errorCompletedPartNotFound"
|
||||
ErrorCompletedEtagInvalid = "errorCompletedEtagInvalid"
|
||||
ErrorCompletedEtagMismatch = "errorCompletedEtagMismatch"
|
||||
ErrorCompletedPartEntryMismatch = "errorCompletedPartEntryMismatch"
|
||||
)
|
||||
|
||||
@@ -81,13 +81,20 @@ func (df *DiskFile) Close() error {
|
||||
if df.File == nil {
|
||||
return nil
|
||||
}
|
||||
if err := df.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := df.File.Close(); err != nil {
|
||||
return err
|
||||
err := df.Sync()
|
||||
var err1 error
|
||||
if df.File != nil {
|
||||
// always try to close
|
||||
err1 = df.File.Close()
|
||||
}
|
||||
// assume closed
|
||||
df.File = nil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
||||
@@ -419,11 +420,13 @@ func TestPickForWrite(t *testing.T) {
|
||||
Rack: "",
|
||||
DataNode: "",
|
||||
}
|
||||
v := util.GetViper()
|
||||
v.Set("master.volume_growth.threshold", 0.9)
|
||||
for _, rpStr := range []string{"001", "010", "100"} {
|
||||
rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
|
||||
vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)
|
||||
volumeGrowOption.ReplicaPlacement = rp
|
||||
for _, dc := range []string{"", "dc1", "dc2", "dc3"} {
|
||||
for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} {
|
||||
volumeGrowOption.DataCenter = dc
|
||||
for _, r := range []string{""} {
|
||||
volumeGrowOption.Rack = r
|
||||
@@ -432,8 +435,13 @@ func TestPickForWrite(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
volumeGrowOption.DataNode = dn
|
||||
fileId, count, _, _, err := topo.PickForWrite(1, volumeGrowOption, vl)
|
||||
if err != nil {
|
||||
fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
|
||||
if dc == "dc0" {
|
||||
if err == nil || count != 0 || !shouldGrow {
|
||||
fmt.Println(dc, r, dn, "pick for write should be with error")
|
||||
t.Fail()
|
||||
}
|
||||
} else if err != nil {
|
||||
fmt.Println(dc, r, dn, "pick for write error :", err)
|
||||
t.Fail()
|
||||
} else if count == 0 {
|
||||
@@ -442,6 +450,9 @@ func TestPickForWrite(t *testing.T) {
|
||||
} else if len(fileId) == 0 {
|
||||
fmt.Println(dc, r, dn, "pick for write file id is empty")
|
||||
t.Fail()
|
||||
} else if shouldGrow {
|
||||
fmt.Println(dc, r, dn, "pick for write error : not should grow")
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,13 +234,18 @@ func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool {
|
||||
for _, dn := range vl.vid2location[vid].list {
|
||||
if v, getError := dn.GetVolumesById(vid); getError == nil {
|
||||
if v.ReadOnly {
|
||||
return false
|
||||
if location, ok := vl.vid2location[vid]; ok {
|
||||
for _, dn := range location.list {
|
||||
if v, getError := dn.GetVolumesById(vid); getError == nil {
|
||||
if v.ReadOnly {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -301,7 +306,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
|
||||
if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
|
||||
shouldGrow = true
|
||||
}
|
||||
return vid, count, locationList, shouldGrow, nil
|
||||
return vid, count, locationList.Copy(), shouldGrow, nil
|
||||
}
|
||||
return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!")
|
||||
}
|
||||
@@ -336,7 +341,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
|
||||
return
|
||||
}
|
||||
}
|
||||
return vid, count, locationList, shouldGrow, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode)
|
||||
return vid, count, locationList, true, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode)
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) HasGrowRequest() bool {
|
||||
|
||||
@@ -28,6 +28,9 @@ func (dnll *VolumeLocationList) Copy() *VolumeLocationList {
|
||||
|
||||
func (dnll *VolumeLocationList) Head() *DataNode {
|
||||
//mark first node as master volume
|
||||
if dnll.Length() == 0 {
|
||||
return nil
|
||||
}
|
||||
return dnll.list[0]
|
||||
}
|
||||
|
||||
|
||||
@@ -53,11 +53,15 @@ func Post(url string, values url.Values) ([]byte, error) {
|
||||
// github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
|
||||
// may need increasing http.Client.Timeout
|
||||
func Get(url string) ([]byte, bool, error) {
|
||||
return GetAuthenticated(url, "")
|
||||
}
|
||||
|
||||
func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
|
||||
request, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
maybeAddAuth(request, jwt)
|
||||
request.Header.Add("Accept-Encoding", "gzip")
|
||||
|
||||
response, err := client.Do(request)
|
||||
@@ -101,11 +105,15 @@ func Head(url string) (http.Header, error) {
|
||||
return r.Header, nil
|
||||
}
|
||||
|
||||
func Delete(url string, jwt string) error {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
func maybeAddAuth(req *http.Request, jwt string) {
|
||||
if jwt != "" {
|
||||
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
||||
}
|
||||
}
|
||||
|
||||
func Delete(url string, jwt string) error {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -133,9 +141,7 @@ func Delete(url string, jwt string) error {
|
||||
|
||||
func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
|
||||
req, err := http.NewRequest("DELETE", url, nil)
|
||||
if jwt != "" {
|
||||
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -193,9 +199,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
if len(jwt) > 0 {
|
||||
req.Header.Set("Authorization", "BEARER "+jwt)
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
|
||||
response, err := client.Do(req)
|
||||
if err != nil {
|
||||
@@ -229,7 +233,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
|
||||
|
||||
if cipherKey != nil {
|
||||
var n int
|
||||
_, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
|
||||
_, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
|
||||
n = copy(buf, data)
|
||||
})
|
||||
return int64(n), err
|
||||
@@ -298,11 +302,16 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
|
||||
}
|
||||
|
||||
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
|
||||
return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
}
|
||||
|
||||
func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
|
||||
if cipherKey != nil {
|
||||
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", fileUrl, nil)
|
||||
maybeAddAuth(req, jwt)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -354,8 +363,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
|
||||
|
||||
}
|
||||
|
||||
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
|
||||
encryptedData, retryable, err := Get(fileUrl)
|
||||
func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
|
||||
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
|
||||
if err != nil {
|
||||
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
|
||||
}
|
||||
@@ -392,9 +401,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt
|
||||
req.Header.Add("Accept-Encoding", "gzip")
|
||||
}
|
||||
|
||||
if len(jwt) > 0 {
|
||||
req.Header.Set("Authorization", "BEARER "+jwt)
|
||||
}
|
||||
maybeAddAuth(req, jwt)
|
||||
|
||||
r, err := client.Do(req)
|
||||
if err != nil {
|
||||
|
||||
@@ -26,7 +26,7 @@ type SkipList struct {
|
||||
// elementCount int
|
||||
}
|
||||
|
||||
// NewSeedEps returns a new empty, initialized Skiplist.
|
||||
// NewSeed returns a new empty, initialized Skiplist.
|
||||
// Given a seed, a deterministic height/list behaviour can be achieved.
|
||||
// Eps is used to compare keys given by the ExtractKey() function on equality.
|
||||
func NewSeed(seed int64, listStore ListStore) *SkipList {
|
||||
|
||||
Reference in New Issue
Block a user