Merge pull request #855 from chrislusf/add_jwt

Add jwt
This commit is contained in:
Chris Lu
2019-02-18 15:05:32 -08:00
committed by GitHub
70 changed files with 792 additions and 565 deletions

View File

@@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"sync"
"github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -164,6 +165,7 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
var fileId, host string
var auth security.EncodedJwt
if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
@@ -181,7 +183,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
return err
}
fileId, host = resp.FileId, resp.Url
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
return nil
}); err != nil {
@@ -190,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
bufReader := bytes.NewReader(buf)
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)

View File

@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -230,7 +231,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
})
}
func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error {
func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string
for _, fileId := range fileIds {
@@ -267,7 +268,7 @@ func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, file
return m, err
}
_, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
_, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
return err
}

View File

@@ -19,6 +19,7 @@ import (
type Option struct {
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
@@ -46,8 +47,6 @@ type WFS struct {
pathToHandleLock sync.Mutex
bufPool sync.Pool
fileIdsDeletionChan chan []string
stats statsCache
}
type statsCache struct {
@@ -65,11 +64,8 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return make([]byte, option.ChunkSizeLimit)
},
},
fileIdsDeletionChan: make(chan []string, 32),
}
go wfs.loopProcessingDeletion()
return wfs
}
@@ -82,7 +78,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
}

View File

@@ -2,39 +2,9 @@ package filesys
import (
"context"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (wfs *WFS) loopProcessingDeletion() {
ticker := time.NewTicker(2 * time.Second)
wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var fileIds []string
for {
select {
case fids := <-wfs.fileIdsDeletionChan:
fileIds = append(fileIds, fids...)
if len(fileIds) >= 1024 {
glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
deleteFileIds(context.Background(), client, fileIds)
fileIds = fileIds[:0]
}
case <-ticker.C:
if len(fileIds) > 0 {
glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
deleteFileIds(context.Background(), client, fileIds)
fileIds = fileIds[:0]
}
}
}
})
}
func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
@@ -45,14 +15,8 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
fileIds = append(fileIds, chunk.FileId)
}
var async = false
if async {
wfs.fileIdsDeletionChan <- fileIds
return
}
wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
deleteFileIds(context.Background(), client, fileIds)
deleteFileIds(context.Background(), wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}