filer prefer volume server in same data center (#3405)

* initial prefer same data center
https://github.com/seaweedfs/seaweedfs/issues/3404

* GetDataCenter

* prefer same data center for ReplicationSource

* GetDataCenterId

* remove glog
This commit is contained in:
Konstantin Lebedev
2022-08-05 05:35:00 +05:00
committed by GitHub
parent 28a1f42962
commit 4d08393b7c
36 changed files with 925 additions and 795 deletions

View File

@@ -105,9 +105,10 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
}
for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
GrpcPort: uint32(loc.GrpcPort),
Url: loc.Url,
PublicUrl: loc.PublicUrl,
GrpcPort: uint32(loc.GrpcPort),
DataCenter: loc.DataCenter,
})
}
resp.LocationsMap[vidString] = &filer_pb.Locations{

View File

@@ -49,7 +49,17 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
return
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
assignUrl := assignResult.Url
// Prefer same data center
if fs.option.DataCenter != "" {
for _, repl := range assignResult.Replicas {
if repl.DataCenter == fs.option.DataCenter {
assignUrl = repl.Url
break
}
}
}
urlLocation = "http://" + assignUrl + "/" + assignResult.Fid
if so.Fsync {
urlLocation += "?fsync=true"
}

View File

@@ -180,7 +180,6 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
continue
}
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {

View File

@@ -137,14 +137,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
var dataCenter string
if dc := dn.GetDataCenter(); dc != nil {
dataCenter = string(dc.Id())
}
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
DataCenter: dataCenter,
DataCenter: dn.GetDataCenterId(),
}
if len(heartbeat.NewVolumes) > 0 {
stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()

View File

@@ -85,8 +85,9 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
var locations []*master_pb.Location
for _, loc := range result.Locations {
locations = append(locations, &master_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
Url: loc.Url,
PublicUrl: loc.PublicUrl,
DataCenter: loc.DataCenter,
})
}
var auth string
@@ -165,17 +166,19 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
var replicas []*master_pb.Location
for _, r := range dnList.Rest() {
replicas = append(replicas, &master_pb.Location{
Url: r.Url(),
PublicUrl: r.PublicUrl,
GrpcPort: uint32(r.GrpcPort),
Url: r.Url(),
PublicUrl: r.PublicUrl,
GrpcPort: uint32(r.GrpcPort),
DataCenter: r.GetDataCenterId(),
})
}
return &master_pb.AssignResponse{
Fid: fid,
Location: &master_pb.Location{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
GrpcPort: uint32(dn.GrpcPort),
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
GrpcPort: uint32(dn.GrpcPort),
DataCenter: dn.GetDataCenterId(),
},
Count: count,
Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
@@ -253,8 +256,9 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
var locations []*master_pb.Location
for _, dn := range shardLocations {
locations = append(locations, &master_pb.Location{
Url: string(dn.Id()),
PublicUrl: dn.PublicUrl,
Url: string(dn.Id()),
PublicUrl: dn.PublicUrl,
DataCenter: dn.GetDataCenterId(),
})
}
resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{

View File

@@ -72,13 +72,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
} else {
machines := ms.Topo.Lookup(collection, volumeId)
for _, loc := range machines {
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
locations = append(locations, operation.Location{
Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(),
})
}
}
} else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines {
locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
locations = append(locations, operation.Location{
Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter,
})
}
err = getVidLocationsErr
}

View File

@@ -137,6 +137,9 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func (fs *WebDavFileSystem) GetDataCenter() string {
return fs.filer.MasterClient.DataCenter
}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")