works with single level of list

This commit is contained in:
chrislu
2024-04-24 23:04:47 -07:00
parent d7e5f6b2a5
commit d88c1872ac
4 changed files with 71 additions and 60 deletions

View File

@@ -6,28 +6,33 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) error {
func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
parquetValue, err := toParquetValue(fieldValue)
endIndex = columnIndex+1
var parquetValue parquet.Value
parquetValue, err = toParquetValue(fieldValue)
if err != nil {
return err
return
}
rowBuilder.Add(columnIndex, parquetValue)
// fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
case *schema_pb.Type_ListType:
rowBuilder.Next(columnIndex)
// fmt.Printf("rowBuilder.Next %d\n", columnIndex)
elementType := fieldType.GetListType().ElementType
for _, value := range fieldValue.GetListValue().Values {
if err := rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil {
return err
if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil {
return
}
}
rowBuilder.Next(columnIndex)
}
return nil
return
}
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error {
visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) {
return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index)
}
fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
@@ -38,7 +43,7 @@ func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.Record
// typeValueVisitor is a function that is called for each value in a schema_pb.Value
// Find the column index.
// intended to be used in RowBuilder.Add(columnIndex, value)
type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error
type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error)
func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
_, err = doVisitValue(fieldType, fieldValue, 0, visitor)
@@ -50,9 +55,9 @@ func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor
func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
return columnIndex+1, visitor(fieldType, fieldValue, columnIndex)
return visitor(fieldType, fieldValue, columnIndex)
case *schema_pb.Type_ListType:
return columnIndex+1, visitor(fieldType, fieldValue, columnIndex)
return visitor(fieldType, fieldValue, columnIndex)
case *schema_pb.Type_RecordType:
for _, field := range fieldType.GetRecordType().Fields {
fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]