adding grpc mutual tls

This commit is contained in:
Chris Lu
2019-02-18 12:11:52 -08:00
parent 55761ae806
commit 77b9af531d
53 changed files with 382 additions and 188 deletions

View File

@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
"google.golang.org/grpc"
"strings"
"time"
@@ -30,7 +31,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -44,7 +45,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
continue
}
lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
lastError = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"google.golang.org/grpc"
"io"
"net/http"
"sort"
@@ -69,12 +70,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
func (cm *ChunkManifest) DeleteChunks(master string) error {
func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
results, err := DeleteFiles(master, fileIds)
results, err := DeleteFiles(master, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -28,17 +29,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
return LookupVolumeIds(master, vids)
return LookupVolumeIds(master, grpcDialOption, vids)
}
return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -92,7 +93,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
go func(server string, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
} else {
ret = append(ret, deleteResults...)
@@ -106,9 +107,9 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

View File

@@ -18,7 +18,7 @@ var (
grpcClientsLock sync.Mutex
)
func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
@@ -28,7 +28,7 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, grpcAddress)
}, grpcAddress, grpcDialOption)
}
@@ -42,7 +42,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
func withMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0)
if parseErr != nil {
@@ -52,6 +52,6 @@ func withMasterServerClient(masterServer string, fn func(masterClient master_pb.
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress)
}, masterGrpcAddress, grpcDialOption)
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
@@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -98,7 +99,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
//only query unknown_vids
err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
err := withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

View File

@@ -2,14 +2,15 @@ package operation
import (
"context"
"google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
err = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()

View File

@@ -2,6 +2,7 @@ package operation
import (
"bytes"
"google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -36,7 +37,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
func SubmitFiles(master string, files []FilePart,
func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
@@ -49,7 +50,7 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
ret, err := Assign(master, ar)
ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -65,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
results[index].Size, err = file.Upload(maxMB, master, ret.Auth)
results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -108,7 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (retSize uint32, err error) {
func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -136,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
ret, err = Assign(master, ar)
ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -149,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
ret, err = Assign(master, ar)
ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -170,7 +171,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
ret.Auth)
if e != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -185,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
cm.DeleteChunks(master, grpcDialOption)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)

View File

@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
"time"
@@ -11,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -26,9 +27,9 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
return
}
func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})