convert to parquet schema and value

This commit is contained in:
chrislu
2024-04-17 23:49:21 -07:00
parent ce2b2fa9b2
commit 0847a17484
7 changed files with 358 additions and 84 deletions

View File

@@ -1,32 +1,26 @@
package schema
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
type Schema struct {
RecordType *schema_pb.RecordType
indexedFields []*schema_pb.Field
fieldMap map[string]*schema_pb.Field
}
func NewSchema(recordType *schema_pb.RecordType) (*Schema, error) {
var indexedFields []*schema_pb.Field
var largestIndex int32
var fieldMap map[string]*schema_pb.Field
for _, field := range recordType.Fields {
if field.Index > largestIndex {
largestIndex = field.Index
}
if field.Index < 0 {
return nil, fmt.Errorf("field %s index %d is negative", field.Name, field.Index)
}
}
indexedFields = make([]*schema_pb.Field, largestIndex+1)
for _, field := range recordType.Fields {
indexedFields[field.Index] = field
fieldMap[field.Name] = field
}
return &Schema{
RecordType: recordType,
indexedFields: indexedFields,
fieldMap: fieldMap,
}, nil
}
func (s *Schema) GetField(name string) (*schema_pb.Field, bool) {
field, ok := s.fieldMap[name]
return field, ok
}

View File

@@ -0,0 +1,65 @@
package schema
import (
"fmt"
parquet "github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
rootNode, err := toParquetFieldTypeRecord(recordType)
if err != nil {
return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
}
return parquet.NewSchema(topicName, rootNode), nil
}
func toParquetFieldType(field *schema_pb.Field) (parquet.Node, error) {
var (
dataType parquet.Node
err error
)
switch field.Type.Kind.(type) {
case *schema_pb.Type_ScalarType:
dataType, err = toParquetFieldTypeScalar(field.Type.GetScalarType())
case *schema_pb.Type_RecordType:
dataType, err = toParquetFieldTypeRecord(field.Type.GetRecordType())
default:
return nil, fmt.Errorf("unknown field type: %T", field.Type.Kind)
}
return dataType, err
}
func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
switch scalarType {
case schema_pb.ScalarType_BOOLEAN:
return parquet.Leaf(parquet.BooleanType), nil
case schema_pb.ScalarType_INTEGER:
return parquet.Leaf(parquet.Int32Type), nil
case schema_pb.ScalarType_LONG:
return parquet.Leaf(parquet.Int64Type), nil
case schema_pb.ScalarType_FLOAT:
return parquet.Leaf(parquet.FloatType), nil
case schema_pb.ScalarType_DOUBLE:
return parquet.Leaf(parquet.DoubleType), nil
case schema_pb.ScalarType_BYTES:
return parquet.Leaf(parquet.ByteArrayType), nil
case schema_pb.ScalarType_STRING:
return parquet.String(), nil
default:
return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
}
}
func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
recordNode := parquet.Group{}
for _, field := range recordType.Fields {
parquetFieldType, err := toParquetFieldType(field)
if err != nil {
return nil, err
}
recordNode[field.Name] = parquetFieldType
}
return recordNode, nil
}

View File

@@ -0,0 +1,86 @@
package schema
import (
"fmt"
parquet "github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func AddRecordValue(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value) error {
visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
parquetValue, err := toParquetValue(fieldValue)
if err != nil {
return err
}
rowBuilder.Add(index, parquetValue)
}
return nil
}
return visitValue(fieldType, fieldValue, visitor)
}
// typeValueVisitor is a function that is called for each value in a schema_pb.Value
// Find the column index.
// intended to be used in RowBuilder.Add(columnIndex, value)
type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) error
func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
_, err = doVisitValue(fieldType, fieldValue, 0, visitor)
return
}
// endIndex is exclusive
// same logic as RowBuilder.configure in row_builder.go
func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
return columnIndex+1, visitor(fieldType, fieldValue, columnIndex)
case *schema_pb.Type_ListType:
for _, value := range fieldValue.GetListValue().Values {
err = visitor(fieldType, value, columnIndex)
if err != nil {
return
}
}
return columnIndex+1, nil
case *schema_pb.Type_RecordType:
for _, field := range fieldType.GetRecordType().Fields {
fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
if !found {
// TODO check this if no such field found
return columnIndex, nil
}
endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor)
if err != nil {
return
}
columnIndex = endIndex
}
return
}
return
}
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
switch value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return parquet.BooleanValue(value.GetBoolValue()), nil
case *schema_pb.Value_Int32Value:
return parquet.Int32Value(value.GetInt32Value()), nil
case *schema_pb.Value_Int64Value:
return parquet.Int64Value(value.GetInt64Value()), nil
case *schema_pb.Value_FloatValue:
return parquet.FloatValue(value.GetFloatValue()), nil
case *schema_pb.Value_DoubleValue:
return parquet.DoubleValue(value.GetDoubleValue()), nil
case *schema_pb.Value_BytesValue:
return parquet.ByteArrayValue(value.GetBytesValue()), nil
case *schema_pb.Value_StringValue:
return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
default:
return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
}
}