remove ctx if possible
This commit is contained in:
@@ -232,9 +232,8 @@ func (fs *FilerServer) StreamDeleteEntries(stream filer_pb.SeaweedFiler_StreamDe
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive delete entry request: %v", err)
|
||||
}
|
||||
ctx := context.Background()
|
||||
fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))
|
||||
err = fs.filer.DeleteEntryMetaAndData(ctx, fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
|
||||
err = fs.filer.DeleteEntryMetaAndData(context.Background(), fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
|
||||
resp := &filer_pb.DeleteEntryResponse{}
|
||||
if err != nil {
|
||||
resp.Error = err.Error()
|
||||
|
||||
@@ -42,7 +42,7 @@ func (vs *VolumeServer) heartbeat() {
|
||||
continue
|
||||
}
|
||||
vs.store.MasterAddress = master
|
||||
newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
|
||||
newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("heartbeat error: %v", err)
|
||||
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
|
||||
@@ -53,16 +53,16 @@ func (vs *VolumeServer) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
|
||||
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
|
||||
|
||||
grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
|
||||
grpcConection, err := util.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
|
||||
}
|
||||
defer grpcConection.Close()
|
||||
|
||||
client := master_pb.NewSeaweedClient(grpcConection)
|
||||
stream, err := client.SendHeartbeat(ctx)
|
||||
stream, err := client.SendHeartbeat(context.Background())
|
||||
if err != nil {
|
||||
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
|
||||
return "", err
|
||||
|
||||
@@ -55,15 +55,15 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
|
||||
// println("source:", volFileInfoResp.String())
|
||||
// copy ecx file
|
||||
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
|
||||
if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
|
||||
if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
|
||||
if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -95,10 +95,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
|
||||
}, err
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
|
||||
compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
|
||||
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
|
||||
|
||||
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
|
||||
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
|
||||
VolumeId: vid,
|
||||
Ext: ext,
|
||||
CompactionRevision: compactRevision,
|
||||
|
||||
@@ -110,7 +110,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||
|
||||
// copy ec data slices
|
||||
for _, shardId := range req.ShardIds {
|
||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
|
||||
if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -118,7 +118,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||
if req.CopyEcxFile {
|
||||
|
||||
// copy ecx file
|
||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
|
||||
if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -126,14 +126,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
|
||||
|
||||
if req.CopyEcjFile {
|
||||
// copy ecj file
|
||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
|
||||
if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if req.CopyVifFile {
|
||||
// copy vif file
|
||||
if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
|
||||
if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@@ -98,7 +97,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
|
||||
|
||||
if hasEcVolume {
|
||||
count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
|
||||
count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
|
||||
writeDeleteResult(err, count, w, r)
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user