mount: when outside cluster network, use filer as proxy to access volume servers
This commit is contained in:
@@ -147,7 +147,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
|
||||
}
|
||||
}
|
||||
file.entry.Chunks = chunks
|
||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
|
||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
|
||||
file.reader = nil
|
||||
file.wfs.deleteFileChunks(truncatedChunks)
|
||||
}
|
||||
@@ -329,7 +329,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
|
||||
|
||||
func (file *File) setEntry(entry *filer_pb.Entry) {
|
||||
file.entry = entry
|
||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
|
||||
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
|
||||
file.reader = nil
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||
|
||||
var chunkResolveErr error
|
||||
if fh.f.entryViewCache == nil {
|
||||
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
|
||||
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
|
||||
if chunkResolveErr != nil {
|
||||
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
|
||||
}
|
||||
@@ -128,7 +128,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
|
||||
|
||||
if fh.f.reader == nil {
|
||||
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
|
||||
fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
|
||||
fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
|
||||
}
|
||||
|
||||
totalRead, err := fh.f.reader.ReadAt(buff, offset)
|
||||
@@ -269,7 +269,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
|
||||
|
||||
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
|
||||
|
||||
chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
|
||||
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
|
||||
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
|
||||
if manifestErr != nil {
|
||||
// not good, but should be ok
|
||||
|
||||
@@ -3,6 +3,8 @@ package filesys
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
@@ -24,6 +26,7 @@ import (
|
||||
)
|
||||
|
||||
type Option struct {
|
||||
FilerAddress string
|
||||
FilerGrpcAddress string
|
||||
GrpcDialOption grpc.DialOption
|
||||
FilerMountRootPath string
|
||||
@@ -237,3 +240,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
|
||||
}
|
||||
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
|
||||
}
|
||||
|
||||
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
|
||||
if wfs.option.OutsideContainerClusterMode {
|
||||
return func(fileId string) (targetUrls []string, err error) {
|
||||
return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
|
||||
}
|
||||
}
|
||||
return filer.LookupFn(wfs)
|
||||
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
|
||||
fileIds = append(fileIds, chunk.GetFileIdString())
|
||||
continue
|
||||
}
|
||||
dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
|
||||
dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(wfs.LookupFn(), chunk)
|
||||
if manifestResolveErr != nil {
|
||||
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
|
||||
}
|
||||
@@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
|
||||
}
|
||||
for _, loc := range locations.Locations {
|
||||
lr.Locations = append(lr.Locations, operation.Location{
|
||||
Url: wfs.AdjustedUrl(loc),
|
||||
Url: loc.Url,
|
||||
PublicUrl: loc.PublicUrl,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -25,10 +25,3 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
|
||||
if wfs.option.OutsideContainerClusterMode {
|
||||
return location.PublicUrl
|
||||
}
|
||||
return location.Url
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
|
||||
Url: resp.Url,
|
||||
PublicUrl: resp.PublicUrl,
|
||||
}
|
||||
host = wfs.AdjustedUrl(loc)
|
||||
host = loc.Url
|
||||
collection, replication = resp.Collection, resp.Replication
|
||||
|
||||
return nil
|
||||
@@ -53,6 +53,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
|
||||
}
|
||||
|
||||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
||||
if wfs.option.OutsideContainerClusterMode {
|
||||
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
|
||||
}
|
||||
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
|
||||
|
||||
Reference in New Issue
Block a user