Merge branch 'master' into support_ssd_volume
This commit is contained in:
@@ -17,9 +17,10 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
|
||||
}
|
||||
|
||||
var writeErr error
|
||||
var shouldRetry bool
|
||||
|
||||
for _, fileUrl := range fileUrls {
|
||||
_, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
|
||||
shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
|
||||
writeErr = writeFunc(data)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -30,11 +31,12 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if shouldRetry && err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"google.golang.org/grpc"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
|
||||
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
|
||||
return nil
|
||||
}
|
||||
newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
|
||||
var dateKey string
|
||||
if r.sink.GetName() == "local_incremental" {
|
||||
var mTime int64
|
||||
if message.NewEntry != nil {
|
||||
mTime = message.NewEntry.Attributes.Mtime
|
||||
} else if message.OldEntry != nil {
|
||||
mTime = message.OldEntry.Attributes.Mtime
|
||||
}
|
||||
dateKey = time.Unix(mTime, 0).Format("2006-01-02")
|
||||
}
|
||||
newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
|
||||
glog.V(3).Infof("replicate %s => %s", key, newKey)
|
||||
key = newKey
|
||||
if message.OldEntry != nil && message.NewEntry == nil {
|
||||
|
||||
@@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
|
||||
replicatedChunk, e := fs.replicateOneChunk(chunk, path)
|
||||
if e != nil {
|
||||
err = e
|
||||
return
|
||||
}
|
||||
replicatedChunks[index] = replicatedChunk
|
||||
}(sourceChunk, chunkIndex)
|
||||
@@ -98,6 +99,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
|
||||
}
|
||||
|
||||
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
|
||||
if fs.writeChunkByFiler {
|
||||
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
|
||||
}
|
||||
|
||||
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ package filersink
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
@@ -17,15 +19,17 @@ import (
|
||||
)
|
||||
|
||||
type FilerSink struct {
|
||||
filerSource *source.FilerSource
|
||||
grpcAddress string
|
||||
dir string
|
||||
replication string
|
||||
collection string
|
||||
ttlSec int32
|
||||
filerSource *source.FilerSource
|
||||
grpcAddress string
|
||||
dir string
|
||||
replication string
|
||||
collection string
|
||||
ttlSec int32
|
||||
diskType string
|
||||
dataCenter string
|
||||
grpcDialOption grpc.DialOption
|
||||
dataCenter string
|
||||
grpcDialOption grpc.DialOption
|
||||
address string
|
||||
writeChunkByFiler bool
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -42,21 +46,27 @@ func (fs *FilerSink) GetSinkToDirectory() string {
|
||||
|
||||
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
|
||||
return fs.DoInitialize(
|
||||
"",
|
||||
configuration.GetString(prefix+"grpcAddress"),
|
||||
configuration.GetString(prefix+"directory"),
|
||||
configuration.GetString(prefix+"replication"),
|
||||
configuration.GetString(prefix+"collection"),
|
||||
configuration.GetInt(prefix+"ttlSec"),
|
||||
configuration.GetString(prefix+"disk"),
|
||||
security.LoadClientTLS(util.GetViper(), "grpc.client"))
|
||||
security.LoadClientTLS(util.GetViper(), "grpc.client"),
|
||||
false)
|
||||
}
|
||||
|
||||
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
|
||||
fs.filerSource = s
|
||||
}
|
||||
|
||||
func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
|
||||
replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption) (err error) {
|
||||
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
|
||||
replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
|
||||
fs.address = address
|
||||
if fs.address == "" {
|
||||
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
|
||||
}
|
||||
fs.grpcAddress = grpcAddress
|
||||
fs.dir = dir
|
||||
fs.replication = replication
|
||||
@@ -64,6 +74,7 @@ func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
|
||||
fs.ttlSec = int32(ttlSec)
|
||||
fs.diskType = diskType
|
||||
fs.grpcDialOption = grpcDialOption
|
||||
fs.writeChunkByFiler = writeChunkByFiler
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -209,7 +220,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
|
||||
})
|
||||
|
||||
}
|
||||
func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
|
||||
func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
|
||||
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
|
||||
if aErr != nil {
|
||||
return nil, nil, aErr
|
||||
|
||||
17
weed/replication/sink/localsink/local_incremental_sink.go
Normal file
17
weed/replication/sink/localsink/local_incremental_sink.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package localsink
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/replication/sink"
|
||||
)
|
||||
|
||||
type LocalIncSink struct {
|
||||
LocalSink
|
||||
}
|
||||
|
||||
func (localincsink *LocalIncSink) GetName() string {
|
||||
return "local_incremental"
|
||||
}
|
||||
|
||||
func init() {
|
||||
sink.Sinks = append(sink.Sinks, &LocalIncSink{})
|
||||
}
|
||||
101
weed/replication/sink/localsink/local_sink.go
Normal file
101
weed/replication/sink/localsink/local_sink.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package localsink
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
|
||||
"github.com/chrislusf/seaweedfs/weed/replication/sink"
|
||||
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type LocalSink struct {
|
||||
Dir string
|
||||
filerSource *source.FilerSource
|
||||
}
|
||||
|
||||
func init() {
|
||||
sink.Sinks = append(sink.Sinks, &LocalSink{})
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
|
||||
localsink.filerSource = s
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) GetName() string {
|
||||
return "local"
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) isMultiPartEntry(key string) bool {
|
||||
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) initialize(dir string) error {
|
||||
localsink.Dir = dir
|
||||
return nil
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
|
||||
dir := configuration.GetString(prefix + "directory")
|
||||
glog.V(4).Infof("sink.local.directory: %v", dir)
|
||||
return localsink.initialize(dir)
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) GetSinkToDirectory() string {
|
||||
return localsink.Dir
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
|
||||
if localsink.isMultiPartEntry(key) {
|
||||
return nil
|
||||
}
|
||||
glog.V(4).Infof("Delete Entry key: %s", key)
|
||||
if err := os.Remove(key); err != nil {
|
||||
glog.V(0).Infof("remove entry key %s: %s", key, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
|
||||
if entry.IsDirectory || localsink.isMultiPartEntry(key) {
|
||||
return nil
|
||||
}
|
||||
glog.V(4).Infof("Create Entry key: %s", key)
|
||||
|
||||
totalSize := filer.FileSize(entry)
|
||||
chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
|
||||
|
||||
dir := filepath.Dir(key)
|
||||
|
||||
if _, err := os.Stat(dir); os.IsNotExist(err) {
|
||||
glog.V(4).Infof("Create Direcotry key: %s", dir)
|
||||
if err = os.MkdirAll(dir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
writeFunc := func(data []byte) error {
|
||||
writeErr := ioutil.WriteFile(key, data, 0)
|
||||
return writeErr
|
||||
}
|
||||
|
||||
if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
|
||||
if localsink.isMultiPartEntry(key) {
|
||||
return true, nil
|
||||
}
|
||||
glog.V(4).Infof("Update Entry key: %s", key)
|
||||
// do delete and create
|
||||
return false, nil
|
||||
}
|
||||
@@ -25,19 +25,28 @@ type FilerSource struct {
|
||||
grpcAddress string
|
||||
grpcDialOption grpc.DialOption
|
||||
Dir string
|
||||
address string
|
||||
proxyByFiler bool
|
||||
}
|
||||
|
||||
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
|
||||
return fs.DoInitialize(
|
||||
"",
|
||||
configuration.GetString(prefix+"grpcAddress"),
|
||||
configuration.GetString(prefix+"directory"),
|
||||
false,
|
||||
)
|
||||
}
|
||||
|
||||
func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) {
|
||||
func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
|
||||
fs.address = address
|
||||
if fs.address == "" {
|
||||
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
|
||||
}
|
||||
fs.grpcAddress = grpcAddress
|
||||
fs.Dir = dir
|
||||
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
|
||||
fs.proxyByFiler = readChunkFromFiler
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
|
||||
return
|
||||
}
|
||||
|
||||
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
|
||||
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
|
||||
|
||||
fileUrls, err := fs.LookupFileId(part)
|
||||
if fs.proxyByFiler {
|
||||
return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId)
|
||||
}
|
||||
|
||||
fileUrls, err := fs.LookupFileId(fileId)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
|
||||
func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
||||
|
||||
// receive message
|
||||
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||||
|
||||
@@ -2,13 +2,17 @@ package sub
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/streadway/amqp"
|
||||
"gocloud.dev/pubsub"
|
||||
_ "gocloud.dev/pubsub/awssnssqs"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
// _ "gocloud.dev/pubsub/azuresb"
|
||||
_ "gocloud.dev/pubsub/gcppubsub"
|
||||
_ "gocloud.dev/pubsub/natspubsub"
|
||||
@@ -19,6 +23,55 @@ func init() {
|
||||
NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
|
||||
}
|
||||
|
||||
func getPath(rawUrl string) string {
|
||||
parsedUrl, _ := url.Parse(rawUrl)
|
||||
return path.Join(parsedUrl.Host, parsedUrl.Path)
|
||||
}
|
||||
|
||||
func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error {
|
||||
exchangeName := getPath(exchangeUrl)
|
||||
queueName := getPath(queueUrl)
|
||||
exchangeNameDLX := "DLX." + exchangeName
|
||||
queueNameDLX := "DLX." + queueName
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
if err := ch.ExchangeDeclare(
|
||||
exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
if err := ch.ExchangeDeclare(
|
||||
exchangeName, "fanout", false, false, false, false, nil); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
if _, err := ch.QueueDeclare(
|
||||
queueName, false, false, false, false,
|
||||
amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
if _, err := ch.QueueDeclare(
|
||||
queueNameDLX, false, false, false, false,
|
||||
amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
|
||||
glog.Error(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type GoCDKPubSubInput struct {
|
||||
sub *pubsub.Subscription
|
||||
}
|
||||
@@ -28,23 +81,65 @@ func (k *GoCDKPubSubInput) GetName() string {
|
||||
}
|
||||
|
||||
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
|
||||
topicUrl := configuration.GetString(prefix + "topic_url")
|
||||
subURL := configuration.GetString(prefix + "sub_url")
|
||||
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
|
||||
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var conn *amqp.Connection
|
||||
if sub.As(&conn) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
_, err = ch.QueueInspect(getPath(subURL))
|
||||
if err != nil {
|
||||
if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
|
||||
if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
k.sub = sub
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
|
||||
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
||||
msg, err := k.sub.Receive(context.Background())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
onFailureFn = func() {
|
||||
if msg.Nackable() {
|
||||
isRedelivered := false
|
||||
var delivery amqp.Delivery
|
||||
if msg.As(&delivery) {
|
||||
isRedelivered = delivery.Redelivered
|
||||
glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
|
||||
}
|
||||
if isRedelivered {
|
||||
if err := delivery.Nack(false, false); err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
} else {
|
||||
msg.Nack()
|
||||
}
|
||||
}
|
||||
}
|
||||
onSuccessFn = func() {
|
||||
msg.Ack()
|
||||
}
|
||||
key = msg.Metadata["key"]
|
||||
message = &filer_pb.EventNotification{}
|
||||
err = proto.Unmarshal(msg.Body, message)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return "", nil, onSuccessFn, onFailureFn, err
|
||||
}
|
||||
return key, message, nil
|
||||
return key, message, onSuccessFn, onFailureFn, nil
|
||||
}
|
||||
|
||||
@@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
|
||||
|
||||
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
|
||||
k.messageChan <- m
|
||||
m.Ack()
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
|
||||
func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
||||
|
||||
m := <-k.messageChan
|
||||
|
||||
onSuccessFn = func() {
|
||||
m.Ack()
|
||||
}
|
||||
onFailureFn = func() {
|
||||
m.Nack()
|
||||
}
|
||||
|
||||
// process the message
|
||||
key = m.Attributes["key"]
|
||||
message = &filer_pb.EventNotification{}
|
||||
|
||||
@@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
|
||||
func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
|
||||
|
||||
msg := <-k.messageChan
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ type NotificationInput interface {
|
||||
GetName() string
|
||||
// Initialize initializes the file store
|
||||
Initialize(configuration util.Configuration, prefix string) error
|
||||
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
|
||||
ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user