filer source: support filerProxy mode

This commit is contained in:
Chris Lu
2021-02-28 16:19:47 -08:00
parent 678c54d705
commit 9abb041763
3 changed files with 11 additions and 6 deletions

View File

@@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer var buffer bytes.Buffer
var shouldRetry bool var shouldRetry bool
for _, urlString := range urlStrings { for _, urlString := range urlStrings {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data) buffer.Write(data)
}) })
if !shouldRetry { if !shouldRetry {

View File

@@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input) result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil { if err == nil {
glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) glog.V(1).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else { } else {
glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
} }
return err return nil
} }
// To upload a part // To upload a part
@@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
} }
buf := make([]byte, chunk.Size) buf := make([]byte, chunk.Size)
for _, fileUrl := range fileUrls { for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf) _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
if err != nil { if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err) glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else { } else {

View File

@@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
} }
for _, loc := range locations.Locations { if !fs.proxyByFiler {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part)) for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
}
} else {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
} }
return return