master: followers can also lookup and redirect
improve scalability
This commit is contained in:
@@ -95,7 +95,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
|
||||
return nil, err
|
||||
}
|
||||
var locs []*filer_pb.Location
|
||||
for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
|
||||
locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
for _, loc := range locations {
|
||||
locs = append(locs, &filer_pb.Location{
|
||||
Url: loc.Url,
|
||||
PublicUrl: loc.PublicUrl,
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/shell"
|
||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||
"google.golang.org/grpc"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
@@ -56,9 +58,11 @@ type MasterServer struct {
|
||||
clientChans map[string]chan *master_pb.VolumeLocation
|
||||
|
||||
grpcDialOpiton grpc.DialOption
|
||||
|
||||
MasterClient *wdclient.MasterClient
|
||||
}
|
||||
|
||||
func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
|
||||
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
|
||||
|
||||
v := viper.GetViper()
|
||||
signingKey := v.GetString("jwt.signing.key")
|
||||
@@ -73,11 +77,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
|
||||
if option.VolumePreallocate {
|
||||
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
|
||||
}
|
||||
|
||||
grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
|
||||
ms := &MasterServer{
|
||||
option: option,
|
||||
preallocateSize: preallocateSize,
|
||||
clientChans: make(map[string]chan *master_pb.VolumeLocation),
|
||||
grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
|
||||
grpcDialOpiton: grpcDialOption,
|
||||
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
|
||||
}
|
||||
ms.bounedLeaderChan = make(chan int, 16)
|
||||
seq := sequence.NewMemorySequencer()
|
||||
@@ -92,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
|
||||
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
|
||||
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
|
||||
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
|
||||
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
|
||||
r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
|
||||
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
|
||||
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
|
||||
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
|
||||
@@ -102,7 +109,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
|
||||
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
|
||||
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
|
||||
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
|
||||
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
|
||||
r.HandleFunc("/{fileId}", ms.redirectHandler)
|
||||
}
|
||||
|
||||
ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
|
||||
|
||||
@@ -22,21 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
|
||||
if _, ok := volumeLocations[vid]; ok {
|
||||
continue
|
||||
}
|
||||
volumeId, err := needle.NewVolumeId(vid)
|
||||
if err == nil {
|
||||
machines := ms.Topo.Lookup(collection, volumeId)
|
||||
if machines != nil {
|
||||
var ret []operation.Location
|
||||
for _, dn := range machines {
|
||||
ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
|
||||
}
|
||||
volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}
|
||||
} else {
|
||||
volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)}
|
||||
}
|
||||
} else {
|
||||
volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)}
|
||||
}
|
||||
volumeLocations[vid] = ms.findVolumeLocation(collection, vid)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -59,10 +45,8 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
|
||||
vid = fileId[0:commaSep]
|
||||
}
|
||||
}
|
||||
vids := []string{vid}
|
||||
collection := r.FormValue("collection") //optional, but can be faster if too many collections
|
||||
volumeLocations := ms.lookupVolumeId(vids, collection)
|
||||
location := volumeLocations[vid]
|
||||
location := ms.findVolumeLocation(collection, vid)
|
||||
httpStatus := http.StatusOK
|
||||
if location.Error != "" {
|
||||
httpStatus = http.StatusNotFound
|
||||
@@ -74,6 +58,35 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
|
||||
writeJsonQuiet(w, r, httpStatus, location)
|
||||
}
|
||||
|
||||
// findVolumeLocation finds the volume location from master topo if it is leader,
|
||||
// or from master client if not leader
|
||||
func (ms *MasterServer) findVolumeLocation(collection string, vid string) operation.LookupResult {
|
||||
var locations []operation.Location
|
||||
var err error
|
||||
if ms.Topo.IsLeader() {
|
||||
volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
|
||||
machines := ms.Topo.Lookup(collection, volumeId)
|
||||
for _, loc := range machines {
|
||||
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
|
||||
}
|
||||
err = newVolumeIdErr
|
||||
} else {
|
||||
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
|
||||
for _, loc := range machines {
|
||||
locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
|
||||
}
|
||||
err = getVidLocationsErr
|
||||
}
|
||||
ret := operation.LookupResult{
|
||||
VolumeId: vid,
|
||||
Locations: locations,
|
||||
}
|
||||
if err != nil {
|
||||
ret.Error = err.Error()
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
|
||||
stats.AssignRequest()
|
||||
requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
|
||||
|
||||
@@ -95,23 +95,19 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
vid, _, _, _, _ := parseURLPath(r.URL.Path)
|
||||
volumeId, err := needle.NewVolumeId(vid)
|
||||
if err != nil {
|
||||
debug("parsing error:", err, r.URL.Path)
|
||||
return
|
||||
}
|
||||
collection := r.FormValue("collection")
|
||||
machines := ms.Topo.Lookup(collection, volumeId)
|
||||
if machines != nil && len(machines) > 0 {
|
||||
location := ms.findVolumeLocation(collection, vid)
|
||||
if location.Error == "" {
|
||||
loc := location.Locations[rand.Intn(len(location.Locations))]
|
||||
var url string
|
||||
if r.URL.RawQuery != "" {
|
||||
url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
|
||||
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
|
||||
} else {
|
||||
url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
|
||||
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
|
||||
}
|
||||
http.Redirect(w, r, url, http.StatusMovedPermanently)
|
||||
} else {
|
||||
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection))
|
||||
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user