@@ -257,7 +257,3 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) {
|
||||
|
||||
}
|
||||
|
||||
69
weed/server/volume_grpc_query.go
Normal file
69
weed/server/volume_grpc_query.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package weed_server
|
||||
|
||||
import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/query/json"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
|
||||
|
||||
for _, fid := range req.FromFileIds {
|
||||
|
||||
vid, id_cookie, err := operation.ParseFileId(fid)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
|
||||
return err
|
||||
}
|
||||
|
||||
n := new(needle.Needle)
|
||||
volumeId, _ := needle.NewVolumeId(vid)
|
||||
n.ParsePath(id_cookie)
|
||||
|
||||
cookie := n.Cookie
|
||||
if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
|
||||
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if n.Cookie != cookie {
|
||||
glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if req.InputSerialization.CsvInput != nil {
|
||||
|
||||
}
|
||||
|
||||
if req.InputSerialization.JsonInput != nil {
|
||||
|
||||
stripe := &volume_server_pb.QueriedStripe{
|
||||
Records: nil,
|
||||
}
|
||||
|
||||
filter := json.Query{
|
||||
Field: req.Filter.Field,
|
||||
Op: req.Filter.Operand,
|
||||
Value: req.Filter.Value,
|
||||
}
|
||||
gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
|
||||
passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
|
||||
if !passedFilter {
|
||||
return true
|
||||
}
|
||||
stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
|
||||
return true
|
||||
})
|
||||
err = stream.Send(stripe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user