Merge branch 'master' into mq-subscribe

This commit is contained in:
chrislu
2024-02-25 08:00:37 -08:00
41 changed files with 299 additions and 266 deletions

View File

@@ -21,6 +21,7 @@ type ReaderCache struct {
}
type SingleChunkCacher struct {
completedTimeNew int64
sync.Mutex
parent *ReaderCache
chunkFileId string
@@ -32,7 +33,6 @@ type SingleChunkCacher struct {
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
completedTimeNew int64
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {

View File

@@ -123,13 +123,6 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
return
}
if s3a.iam.isEnabled() {
if _, errCode = s3a.iam.authRequest(r, s3_constants.ACTION_ADMIN); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
}
fn := func(entry *filer_pb.Entry) {
if identityId := r.Header.Get(s3_constants.AmzIdentityId); identityId != "" {
if entry.Extended == nil {

View File

@@ -15,6 +15,7 @@ import (
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/policy"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
@@ -123,6 +124,18 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
r.Header.Set("Content-Type", contentType)
// Add s3 postpolicy support header
for k, _ := range formValues {
if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
r.Header.Set(k, formValues.Get(k))
continue
}
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
r.Header.Set(k, formValues.Get(k))
}
}
etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket)
if errCode != s3err.ErrNone {

View File

@@ -276,9 +276,10 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
// PutBucket
bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT"))
bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketHandler, ACTION_ADMIN)), "PUT"))
// DeleteBucket
bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE"))
bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_ADMIN)), "DELETE"))
// ListObjectsV1 (Legacy)
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))

View File

@@ -213,14 +213,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
if err != nil {
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
httpStatus := http.StatusInternalServerError
if err == filer_pb.ErrNotFound {
httpStatus = http.StatusNoContent
writeJsonQuiet(w, r, httpStatus, nil)
writeJsonQuiet(w, r, http.StatusNoContent, nil)
return
}
writeJsonError(w, r, httpStatus, err)
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}

View File

@@ -50,7 +50,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
if err != nil {
if strings.HasPrefix(err.Error(), "read input:") || err.Error() == io.ErrUnexpectedEOF.Error() {
writeJsonError(w, r, 499, err)
writeJsonError(w, r, util.HttpStatusCancelled, err)
} else if strings.HasSuffix(err.Error(), "is a file") || strings.HasSuffix(err.Error(), "already exists") {
writeJsonError(w, r, http.StatusConflict, err)
} else {

View File

@@ -53,7 +53,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
select {
case <-r.Context().Done():
glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
w.WriteHeader(http.StatusInternalServerError)
w.WriteHeader(util.HttpStatusCancelled)
vs.inFlightDownloadDataLimitCond.L.Unlock()
return
default:

View File

@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"io"
"math"
@@ -137,7 +138,7 @@ type ItemEntry struct {
path util.FullPath
}
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
timeNowAtSec := time.Now().Unix()
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
@@ -160,19 +161,24 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
return nil
},
func(outputChan chan interface{}) {
var wg sync.WaitGroup
itemErrCount := atomic.NewUint64(0)
for itemEntry := range outputChan {
i := itemEntry.(*ItemEntry)
itemPath := string(i.path)
fileMsg := fmt.Sprintf("file:%s", itemPath)
errItem := make(map[string]error)
errItemLock := sync.RWMutex{}
itemIsVerifed := atomic.NewBool(true)
for _, chunk := range i.chunks {
if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
for _, volumeServer := range volumeIds {
if *c.concurrency == 0 {
if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil {
fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
}
continue
}
@@ -180,43 +186,48 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
waitChan, ok := c.waitChan[string(volumeServer)]
c.waitChanLock.RUnlock()
if !ok {
fmt.Fprintf(c.writer, "%s failed to get channel for %s chunk: %d:%d: %+v\n",
string(volumeServer), fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n",
string(volumeServer), fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
continue
}
wg.Add(1)
waitChan <- struct{}{}
go func(fId *filer_pb.FileId, path string, volumeServer pb.ServerAddress, msg string) {
if err = c.verifyEntry(volumeServer, fId); err != nil {
errItemLock.Lock()
errItem[path] = err
fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
msg, fId.VolumeId, fId.FileKey, err)
errItemLock.Unlock()
go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
defer wg.Done()
if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil {
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
msg, fChunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
}
<-waitChan
}(chunk.Fid, itemPath, volumeServer, fileMsg)
}(chunk, itemPath, volumeServer, fileMsg)
}
} else {
err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
fmt.Fprintf(c.writer, "%s %d:%d: %+v\n",
fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
fileMsg, chunk.GetFileIdString(), err)
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
itemErrCount.Add(1)
}
break
}
}
errItemLock.RLock()
err, _ = errItem[itemPath]
errItemLock.RUnlock()
if err != nil {
errCount++
continue
if itemIsVerifed.Load() {
if *c.verbose {
fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
}
fileCount++
}
if *c.verbose {
fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
}
fileCount++
}
wg.Wait()
errCount = itemErrCount.Load()
})
}

File diff suppressed because one or more lines are too long

View File

@@ -301,7 +301,7 @@ func getSystemInfo() (_SYSTEM_INFO, error) {
// PSIZE_T lpMinimumWorkingSetSize,
// PSIZE_T lpMaximumWorkingSetSize
// );
// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessworkingsetsize
// https://learn.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-getprocessworkingsetsize
func getProcessWorkingSetSize(process uintptr, dwMinWorkingSet *uint64, dwMaxWorkingSet *uint64) error {
r1, _, err := syscall.Syscall(procGetProcessWorkingSetSize.Addr(), 3, process, uintptr(unsafe.Pointer(dwMinWorkingSet)), uintptr(unsafe.Pointer(dwMaxWorkingSet)))
@@ -318,7 +318,7 @@ func getProcessWorkingSetSize(process uintptr, dwMinWorkingSet *uint64, dwMaxWor
// SIZE_T dwMinimumWorkingSetSize,
// SIZE_T dwMaximumWorkingSetSize
// );
// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setprocessworkingsetsize
// https://learn.microsoft.com/en-us/windows/win32/api/memoryapi/nf-memoryapi-setprocessworkingsetsize
func setProcessWorkingSetSize(process uintptr, dwMinWorkingSet uint64, dwMaxWorkingSet uint64) error {
r1, _, err := syscall.Syscall(procSetProcessWorkingSetSize.Addr(), 3, process, uintptr(dwMinWorkingSet), uintptr(dwMaxWorkingSet))

View File

@@ -118,12 +118,8 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
return n.AppendAtNs, nil
}
if fileSize > fileTailOffset {
glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
err = datFile.Truncate(fileTailOffset)
if err == nil {
return n.AppendAtNs, nil
}
return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
glog.Warningf("data file %s actual %d bytes expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
return n.AppendAtNs, fmt.Errorf("data file %s actual %d bytes expected %d bytes", datFile.Name(), fileSize, fileTailOffset)
}
glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
}

View File

@@ -423,7 +423,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil {
return err
}
defer dstDatBackend.Close()
defer func() {
dstDatBackend.Sync()
dstDatBackend.Close()
}()
oldNm := needle_map.NewMemDb()
defer oldNm.Close()
@@ -484,7 +487,20 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
if err != nil {
return err
}
dstDatSize, _, err := dstDatBackend.GetStat()
if err != nil {
return err
}
if v.nm.ContentSize() > v.nm.DeletedSize() {
expectedContentSize := v.nm.ContentSize() - v.nm.DeletedSize()
if expectedContentSize > uint64(dstDatSize) {
return fmt.Errorf("volume %s unexpected new data size: %d does not match size of content minus deleted: %d",
v.Id.String(), dstDatSize, expectedContentSize)
}
} else {
glog.Warningf("volume %s content size: %d less deleted size: %d, new size: %d",
v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize)
}
err = newNm.SaveToIdx(datIdxName)
if err != nil {
return err

View File

@@ -4,6 +4,8 @@ import (
"fmt"
)
const HttpStatusCancelled = 499
var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.62)
VERSION = sizeLimit + " " + VERSION_NUMBER