Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -199,8 +199,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
||||
return client.WriteDirectory(dest, message.NewEntry)
|
||||
}
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
||||
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
@@ -264,9 +263,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
||||
return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
|
||||
} else {
|
||||
newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
|
||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
|
||||
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
|
||||
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
@@ -303,9 +300,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
||||
if message.NewEntry.IsDirectory {
|
||||
return client.WriteDirectory(newDest, message.NewEntry)
|
||||
}
|
||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
|
||||
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
|
||||
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
|
||||
@@ -108,8 +108,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
||||
return client.WriteDirectory(dest, message.NewEntry)
|
||||
}
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
||||
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
@@ -146,9 +145,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
||||
if err := client.DeleteFile(oldDest); err != nil {
|
||||
return err
|
||||
}
|
||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
||||
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
@@ -160,6 +157,20 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
||||
return eachEntryFunc, nil
|
||||
}
|
||||
|
||||
func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
|
||||
var writeErr error
|
||||
err = util.Retry("writeFile", func() error {
|
||||
reader := filer.NewFileReader(filerSource, newEntry)
|
||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||
remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader)
|
||||
if writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
|
||||
// 1. specified by timeAgo
|
||||
// 2. last offset timestamp for this directory
|
||||
|
||||
@@ -61,12 +61,12 @@ connection_max_lifetime_seconds = 0
|
||||
interpolateParams = false
|
||||
# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
|
||||
enableUpsert = true
|
||||
upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
|
||||
upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
|
||||
|
||||
[mysql2] # or memsql, tidb
|
||||
enabled = false
|
||||
createTable = """
|
||||
CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
|
||||
CREATE TABLE IF NOT EXISTS `%s` (
|
||||
dirhash BIGINT,
|
||||
name VARCHAR(1000) BINARY,
|
||||
directory TEXT BINARY,
|
||||
@@ -85,7 +85,7 @@ connection_max_lifetime_seconds = 0
|
||||
interpolateParams = false
|
||||
# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
|
||||
enableUpsert = true
|
||||
upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
|
||||
upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
|
||||
|
||||
[postgres] # or cockroachdb, YugabyteDB
|
||||
# CREATE TABLE IF NOT EXISTS filemeta (
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -143,9 +142,6 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
|
||||
var shouldRetry bool
|
||||
var totalWritten int
|
||||
|
||||
rand.Shuffle(len(urlStrings), func(i, j int) {
|
||||
urlStrings[i], urlStrings[j] = urlStrings[j], urlStrings[i]
|
||||
})
|
||||
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
|
||||
for _, urlString := range urlStrings {
|
||||
var localProcesed int
|
||||
|
||||
@@ -177,8 +177,8 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
|
||||
bucket, _ := getBucketAndObject(r)
|
||||
glog.V(3).Infof("HeadBucketHandler %s", bucket)
|
||||
|
||||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||||
s3err.WriteErrorResponse(w, r, err)
|
||||
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || err == filer_pb.ErrNotFound {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
|
||||
if receiveErr == io.EOF {
|
||||
break
|
||||
}
|
||||
if resp.ModifiedTsNs != 0 {
|
||||
if resp!=nil && resp.ModifiedTsNs != 0 {
|
||||
modifiedTsNs = resp.ModifiedTsNs
|
||||
}
|
||||
if receiveErr != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
@@ -34,11 +35,12 @@ func (c *commandVolumeConfigureReplication) Help() string {
|
||||
`
|
||||
}
|
||||
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
||||
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
|
||||
|
||||
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
|
||||
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
|
||||
collectionPattern := configureReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
||||
if err = configureReplicationCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -55,7 +57,6 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
||||
if err != nil {
|
||||
return fmt.Errorf("replication format: %v", err)
|
||||
}
|
||||
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
|
||||
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv)
|
||||
@@ -64,6 +65,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
||||
}
|
||||
|
||||
vid := needle.VolumeId(*volumeIdInt)
|
||||
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
|
||||
|
||||
// find all data nodes with volumes that needs replication change
|
||||
var allLocations []location
|
||||
@@ -71,7 +73,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
||||
loc := newLocation(dc, string(rack), dn)
|
||||
for _, diskInfo := range dn.DiskInfos {
|
||||
for _, v := range diskInfo.VolumeInfos {
|
||||
if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 {
|
||||
if volumeFilter(v) {
|
||||
allLocations = append(allLocations, loc)
|
||||
continue
|
||||
}
|
||||
@@ -106,3 +108,19 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {
|
||||
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
|
||||
if volumeId > 0 {
|
||||
return func(v *master_pb.VolumeInformationMessage) bool {
|
||||
return v.Id == volumeId && v.ReplicaPlacement != replicaPlacementInt32
|
||||
}
|
||||
}
|
||||
return func(v *master_pb.VolumeInformationMessage) bool {
|
||||
matched, err := filepath.Match(collectionPattern, v.Collection)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return matched
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.77)
|
||||
VERSION_NUMBER = fmt.Sprintf("%.02f", 2.79)
|
||||
VERSION = sizeLimit + " " + VERSION_NUMBER
|
||||
COMMIT = ""
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -69,13 +70,21 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
|
||||
if !found {
|
||||
return nil, fmt.Errorf("volume %d not found", id)
|
||||
}
|
||||
var sameDcServers, otherDcServers []string
|
||||
for _, loc := range locations {
|
||||
if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
|
||||
serverUrls = append(serverUrls, loc.Url)
|
||||
otherDcServers = append(otherDcServers, loc.Url)
|
||||
} else {
|
||||
serverUrls = append([]string{loc.Url}, serverUrls...)
|
||||
sameDcServers = append(sameDcServers, loc.Url)
|
||||
}
|
||||
}
|
||||
rand.Shuffle(len(sameDcServers), func(i, j int) {
|
||||
sameDcServers[i], sameDcServers[j] = sameDcServers[j], sameDcServers[i]
|
||||
})
|
||||
rand.Shuffle(len(otherDcServers), func(i, j int) {
|
||||
otherDcServers[i], otherDcServers[j] = otherDcServers[j], otherDcServers[i]
|
||||
})
|
||||
serverUrls = append(sameDcServers, otherDcServers...)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user