s3: implemented DeleteMultipleObjects

This commit is contained in:
Chris Lu
2020-02-25 14:38:36 -08:00
parent 35dde56711
commit bc38b72a20
7 changed files with 366 additions and 109 deletions

View File

@@ -139,6 +139,61 @@ func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entr
}
func (s3a *S3ApiServer) streamRemove(ctx context.Context, quiet bool,
fn func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool),
respFn func(err string)) error {
return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.StreamDeleteEntries(ctx)
if err != nil {
glog.V(0).Infof("stream delete entry: %v", err)
return fmt.Errorf("stream delete entry: %v", err)
}
waitc := make(chan struct{})
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
glog.V(0).Infof("streamRemove: %v", err)
return
}
respFn(resp.Error)
}
}()
for {
finished, parentDirectoryPath, entryName, isDeleteData, isRecursive := fn()
if finished {
break
}
err = stream.Send(&filer_pb.DeleteEntryRequest{
Directory: parentDirectoryPath,
Name: entryName,
IsDeleteData: isDeleteData,
IsRecursive: isRecursive,
IgnoreRecursiveError: quiet,
})
if err != nil {
glog.V(0).Infof("streamRemove: %v", err)
break
}
}
stream.CloseSend()
<-waitc
return err
})
}
func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {