This commit is contained in:
Chris Lu
2019-11-27 03:09:42 -08:00
parent 67e5f5b558
commit 0f9ba84274
10 changed files with 571 additions and 176 deletions

View File

@@ -2,6 +2,7 @@ package backend
import (
"io"
"os"
"time"
)
@@ -12,6 +13,7 @@ type DataStorageBackend interface {
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
String() string
Instantiate(src *os.File) error
}
var (

View File

@@ -48,3 +48,7 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
func (df *DiskFile) String() string {
return df.fullFilePath
}
func (df *DiskFile) Instantiate(src *os.File) error {
panic("should not implement Instantiate for DiskFile")
}

View File

@@ -58,3 +58,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
func (mmf *MemoryMappedFile) String() string {
return mmf.mm.File.Name()
}
func (mmf *MemoryMappedFile) Instantiate(src *os.File) error {
panic("should not implement Instantiate for MemoryMappedFile")
}

View File

@@ -2,6 +2,7 @@ package s3_backend
import (
"fmt"
"os"
"strings"
"time"
@@ -25,7 +26,6 @@ type S3Backend struct {
conn s3iface.S3API
region string
bucket string
dir string
vid needle.VolumeId
key string
}
@@ -84,11 +84,11 @@ func (s3backend *S3Backend) GetName() string {
return "s3"
}
func (s3backend *S3Backend) GetSinkToDirectory() string {
return s3backend.dir
func (s3backend S3Backend) Instantiate(src *os.File) error {
panic("implement me")
}
func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
@@ -98,20 +98,19 @@ func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid nee
configuration.GetString("aws_secret_access_key"),
configuration.GetString("region"),
configuration.GetString("bucket"),
configuration.GetString("directory"),
prefix,
vid,
)
}
func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
vid needle.VolumeId) (err error) {
func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
prefix string, vid needle.VolumeId) (err error) {
s3backend.region = region
s3backend.bucket = bucket
s3backend.dir = dir
s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
s3backend.vid = vid
s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
if strings.HasPrefix(s3backend.key, "/") {
s3backend.key = s3backend.key[1:]
}

View File

@@ -0,0 +1,56 @@
package s3_backend
import (
"fmt"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string) error {
//open the file
f, err := os.Open(filename)
if err != nil {
return fmt.Errorf("failed to open file %q, %v", filename, err)
}
defer f.Close()
info, err := f.Stat()
if err != nil {
return fmt.Errorf("failed to stat file %q, %v", filename, err)
}
fileSize := info.Size()
partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
for partSize*1000 < fileSize {
partSize *= 4
}
// Create an uploader with the session and custom options
uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
u.PartSize = partSize
u.Concurrency = 15 // default is 15
})
// Upload the file to S3.
result, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(destBucket),
Key: aws.String(destKey),
Body: f,
ACL: aws.String("private"),
ServerSideEncryption: aws.String("AES256"),
StorageClass: aws.String("STANDARD_IA"),
})
//in case it fails to upload
if err != nil {
return fmt.Errorf("failed to upload file, %v", err)
}
fmt.Printf("file %s uploaded to %s\n", filename, result.Location)
return nil
}