use grpc to replace http APIs for batch volume id lookup and batch delete
1. remove batch volume id lookup http API /vol/lookup 2. remove batch delete http API /delete
This commit is contained in:
@@ -10,8 +10,8 @@ import (
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -70,16 +70,22 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
|
||||
}
|
||||
|
||||
func (cm *ChunkManifest) DeleteChunks(master string) error {
|
||||
deleteError := 0
|
||||
var fileIds []string
|
||||
for _, ci := range cm.Chunks {
|
||||
if e := DeleteFile(master, ci.Fid, ""); e != nil {
|
||||
deleteError++
|
||||
glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
|
||||
fileIds = append(fileIds, ci.Fid)
|
||||
}
|
||||
results, err := DeleteFiles(master, fileIds)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("delete %+v: %v", fileIds, err)
|
||||
return fmt.Errorf("chunk delete: %v", err)
|
||||
}
|
||||
for _, result := range results {
|
||||
if result.Error != "" {
|
||||
glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
|
||||
return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
|
||||
}
|
||||
}
|
||||
if deleteError > 0 {
|
||||
return errors.New("Not all chunks deleted.")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,15 @@
|
||||
package operation
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"net/http"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type DeleteResult struct {
|
||||
@@ -22,27 +19,6 @@ type DeleteResult struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error {
|
||||
err := util.Delete(fileUrlOnVolume, jwt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
|
||||
fileUrl, err := LookupFileId(master, fileId)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("Delete %s lookup: %v, master: %s", fileId, err, master)
|
||||
return nil
|
||||
}
|
||||
err = util.Delete(fileUrl, jwt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to delete %s:%v", fileUrl, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseFileId(fid string) (vid string, key_cookie string, err error) {
|
||||
commaIndex := strings.Index(fid, ",")
|
||||
if commaIndex <= 0 {
|
||||
@@ -51,20 +27,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
|
||||
return fid[:commaIndex], fid[commaIndex+1:], nil
|
||||
}
|
||||
|
||||
type DeleteFilesResult struct {
|
||||
Errors []string
|
||||
Results []DeleteResult
|
||||
}
|
||||
// DeleteFiles batch deletes a list of fileIds
|
||||
func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
|
||||
|
||||
var ret []*volume_server_pb.DeleteResult
|
||||
|
||||
func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
|
||||
vid_to_fileIds := make(map[string][]string)
|
||||
ret := &DeleteFilesResult{}
|
||||
var vids []string
|
||||
for _, fileId := range fileIds {
|
||||
vid, _, err := ParseFileId(fileId)
|
||||
if err != nil {
|
||||
ret.Results = append(ret.Results, DeleteResult{
|
||||
Fid: vid,
|
||||
ret = append(ret, &volume_server_pb.DeleteResult{
|
||||
FileId: vid,
|
||||
Status: http.StatusBadRequest,
|
||||
Error: err.Error()},
|
||||
)
|
||||
@@ -85,7 +59,11 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
|
||||
server_to_fileIds := make(map[string][]string)
|
||||
for vid, result := range lookupResults {
|
||||
if result.Error != "" {
|
||||
ret.Errors = append(ret.Errors, result.Error)
|
||||
ret = append(ret, &volume_server_pb.DeleteResult{
|
||||
FileId: vid,
|
||||
Status: http.StatusBadRequest,
|
||||
Error: err.Error()},
|
||||
)
|
||||
continue
|
||||
}
|
||||
for _, location := range result.Locations {
|
||||
@@ -103,25 +81,52 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
|
||||
wg.Add(1)
|
||||
go func(server string, fidList []string) {
|
||||
defer wg.Done()
|
||||
values := make(url.Values)
|
||||
for _, fid := range fidList {
|
||||
values.Add("fid", fid)
|
||||
|
||||
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
|
||||
err = deleteErr
|
||||
} else {
|
||||
ret = append(ret, deleteResults...)
|
||||
}
|
||||
jsonBlob, err := util.Post("http://"+server+"/delete", values)
|
||||
if err != nil {
|
||||
ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
|
||||
return
|
||||
}
|
||||
var result []DeleteResult
|
||||
err = json.Unmarshal(jsonBlob, &result)
|
||||
if err != nil {
|
||||
ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
|
||||
return
|
||||
}
|
||||
ret.Results = append(ret.Results, result...)
|
||||
|
||||
}(server, fidList)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return ret, nil
|
||||
return ret, err
|
||||
}
|
||||
|
||||
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
|
||||
func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
|
||||
|
||||
err = withVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
req := &volume_server_pb.BatchDeleteRequest{
|
||||
FileIds: fileIds,
|
||||
}
|
||||
|
||||
resp, err := volumeServerClient.BatchDelete(context.Background(), req)
|
||||
|
||||
fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ret = append(ret, resp.Results...)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, result := range ret {
|
||||
if result.Error != "" {
|
||||
return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
53
weed/operation/grpc_client.go
Normal file
53
weed/operation/grpc_client.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package operation
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"strconv"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func withVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
|
||||
|
||||
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
grpcConnection, err := util.GrpcDial(grpcAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to dial %s: %v", grpcAddress, err)
|
||||
}
|
||||
defer grpcConnection.Close()
|
||||
|
||||
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
|
||||
|
||||
return fn(client)
|
||||
}
|
||||
|
||||
func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
|
||||
sepIndex := strings.LastIndex(volumeServer, ":")
|
||||
port, err := strconv.Atoi(volumeServer[sepIndex+1:])
|
||||
if err != nil {
|
||||
glog.Errorf("failed to parse volume server address: %v", volumeServer)
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
|
||||
}
|
||||
|
||||
func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
|
||||
|
||||
grpcConnection, err := util.GrpcDial(masterServer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail to dial %s: %v", masterServer, err)
|
||||
}
|
||||
defer grpcConnection.Close()
|
||||
|
||||
client := master_pb.NewSeaweedClient(grpcConnection)
|
||||
|
||||
return fn(client)
|
||||
}
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"context"
|
||||
)
|
||||
|
||||
type Location struct {
|
||||
@@ -95,24 +97,41 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
|
||||
}
|
||||
|
||||
//only query unknown_vids
|
||||
values := make(url.Values)
|
||||
for _, vid := range unknown_vids {
|
||||
values.Add("volumeId", vid)
|
||||
}
|
||||
jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
|
||||
|
||||
err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
|
||||
req := &master_pb.LookupVolumeRequest{
|
||||
VolumeIds: unknown_vids,
|
||||
}
|
||||
resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
|
||||
if grpcErr != nil {
|
||||
return grpcErr
|
||||
}
|
||||
|
||||
//set newly checked vids to cache
|
||||
for _, vidLocations := range resp.VolumeIdLocations {
|
||||
var locations []Location
|
||||
for _, loc := range vidLocations.Locations {
|
||||
locations = append(locations, Location{
|
||||
Url: loc.Url,
|
||||
PublicUrl: loc.PublicUrl,
|
||||
})
|
||||
}
|
||||
if vidLocations.Error != "" {
|
||||
vc.Set(vidLocations.VolumeId, locations, 10*time.Minute)
|
||||
}
|
||||
ret[vidLocations.VolumeId] = LookupResult{
|
||||
VolumeId: vidLocations.VolumeId,
|
||||
Locations: locations,
|
||||
Error: vidLocations.Error,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.Unmarshal(jsonBlob, &ret)
|
||||
if err != nil {
|
||||
return nil, errors.New(err.Error() + " " + string(jsonBlob))
|
||||
}
|
||||
|
||||
//set newly checked vids to cache
|
||||
for _, vid := range unknown_vids {
|
||||
locations := ret[vid].Locations
|
||||
vc.Set(vid, locations, 10*time.Minute)
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user