This commit is contained in:
chrislu
2024-05-20 11:03:56 -07:00
parent d3032d1e80
commit d218fe54fa
26 changed files with 92 additions and 95 deletions

View File

@@ -6,17 +6,17 @@ import (
type Schema struct {
RecordType *schema_pb.RecordType
fieldMap map[string]*schema_pb.Field
fieldMap map[string]*schema_pb.Field
}
func NewSchema(recordType *schema_pb.RecordType) (*Schema, error) {
fieldMap := make( map[string]*schema_pb.Field)
fieldMap := make(map[string]*schema_pb.Field)
for _, field := range recordType.Fields {
fieldMap[field.Name] = field
}
return &Schema{
RecordType: recordType,
fieldMap: fieldMap,
fieldMap: fieldMap,
}, nil
}

View File

@@ -8,9 +8,9 @@ import (
var (
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
)

View File

@@ -32,10 +32,10 @@ func TestEnumScalarType(t *testing.T) {
func TestField(t *testing.T) {
field := &Field{
Name: "field_name",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_name",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
}
assert.NotNil(t, field)
}
@@ -44,32 +44,32 @@ func TestRecordType(t *testing.T) {
subRecord := &RecordType{
Fields: []*Field{
{
Name: "field_1",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_1",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
},
{
Name: "field_2",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
FieldIndex: 2,
IsRepeated: false,
Name: "field_2",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_STRING}},
FieldIndex: 2,
IsRepeated: false,
},
},
}
record := &RecordType{
Fields: []*Field{
{
Name: "field_key",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
Name: "field_key",
Type: &Type{Kind: &Type_ScalarType{ScalarType: ScalarType_INT32}},
FieldIndex: 1,
IsRepeated: false,
},
{
Name: "field_record",
Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
FieldIndex: 2,
IsRepeated: false,
Name: "field_record",
Type: &Type{Kind: &Type_RecordType{RecordType: subRecord}},
FieldIndex: 2,
IsRepeated: false,
},
},
}

View File

@@ -76,7 +76,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin().
WithField("Field3", TypeString).
WithField("Field4", TypeInt32).
RecordTypeEnd(),
RecordTypeEnd(),
).
RecordTypeEnd(),
},
@@ -104,7 +104,7 @@ func TestStructToSchema(t *testing.T) {
RecordTypeBegin().
WithField("Field6", TypeString).
WithField("Field7", TypeBytes).
RecordTypeEnd(),
RecordTypeEnd(),
).RecordTypeEnd(),
).
RecordTypeEnd(),

View File

@@ -7,9 +7,9 @@ import (
type ParquetLevels struct {
startColumnIndex int
endColumnIndex int
definitionDepth int
levels map[string]*ParquetLevels
endColumnIndex int
definitionDepth int
levels map[string]*ParquetLevels
}
func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
@@ -19,7 +19,7 @@ func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_RecordType:
return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth)
case *schema_pb.Type_ListType:
@@ -35,15 +35,15 @@ func toFieldTypeListLevels(listType *schema_pb.ListType, startColumnIndex, defin
func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
return &ParquetLevels{
startColumnIndex: startColumnIndex,
endColumnIndex: startColumnIndex + 1,
definitionDepth: definitionDepth,
endColumnIndex: startColumnIndex + 1,
definitionDepth: definitionDepth,
}, nil
}
func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
recordTypeLevels := &ParquetLevels{
startColumnIndex: startColumnIndex,
definitionDepth: definitionDepth,
levels: make(map[string]*ParquetLevels),
definitionDepth: definitionDepth,
levels: make(map[string]*ParquetLevels),
}
for _, field := range recordType.Fields {
fieldTypeLevels, err := toFieldTypeLevels(field.Type, startColumnIndex, definitionDepth+1)

View File

@@ -11,9 +11,9 @@ func TestToParquetLevels(t *testing.T) {
recordType *schema_pb.RecordType
}
tests := []struct {
name string
args args
want *ParquetLevels
name string
args args
want *ParquetLevels
}{
{
name: "nested type",
@@ -25,13 +25,13 @@ func TestToParquetLevels(t *testing.T) {
RecordTypeBegin().
WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)).
RecordTypeEnd()).
RecordTypeEnd()).
WithField("Company", TypeString).
WithRecordField("Address",
RecordTypeBegin().
WithField("Street", TypeString).
WithField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd()).
RecordTypeEnd(),
},
want: &ParquetLevels{

View File

@@ -31,7 +31,6 @@ func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err e
return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
}
return dataType, err
}

View File

@@ -70,7 +70,7 @@ func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *
return
}
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
switch value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return parquet.BooleanValue(value.GetBoolValue()), nil

View File

@@ -47,7 +47,7 @@ func toRecordValue(recordType *schema_pb.RecordType, levels *ParquetLevels, valu
func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) {
listValues := make([]*schema_pb.Value, 0)
var value *schema_pb.Value
for ;valueIndex < len(values); {
for valueIndex < len(values) {
if values[valueIndex].Column() != levels.startColumnIndex {
break
}
@@ -67,19 +67,19 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
}
switch scalarType {
case schema_pb.ScalarType_BOOL:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex + 1, nil
case schema_pb.ScalarType_INT64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex + 1, nil
case schema_pb.ScalarType_FLOAT:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex + 1, nil
case schema_pb.ScalarType_DOUBLE:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex + 1, nil
case schema_pb.ScalarType_STRING:
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex+1, nil
return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex + 1, nil
}
return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
}

View File

@@ -19,13 +19,13 @@ func TestWriteReadParquet(t *testing.T) {
RecordTypeBegin().
WithField("zName", TypeString).
WithField("emails", ListOf(TypeString)).
RecordTypeEnd()).
RecordTypeEnd()).
WithField("Company", TypeString).
WithRecordField("Address",
RecordTypeBegin().
WithField("Street", TypeString).
WithField("City", TypeString).
RecordTypeEnd()).
RecordTypeEnd()).
RecordTypeEnd()
fmt.Printf("RecordType: %v\n", recordType)
@@ -85,9 +85,9 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
fmt.Sprintf("john_%d@c.com", i),
fmt.Sprintf("john_%d@d.com", i),
fmt.Sprintf("john_%d@e.com", i)).
RecordEnd()).
RecordEnd()).
SetString("Company", fmt.Sprintf("company_%d", i)).
RecordEnd()
RecordEnd()
AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
if count < 10 {