a little bit more efficient
This commit is contained in:
@@ -30,11 +30,7 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
|
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
|
||||||
parquetLevels, err := ToParquetLevels(recordType)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
|
visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
|
||||||
return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
|
return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,11 +9,7 @@ import (
|
|||||||
// ToRecordValue converts a parquet.Row to a schema_pb.RecordValue
|
// ToRecordValue converts a parquet.Row to a schema_pb.RecordValue
|
||||||
// This does not work with, or has not been tested with, nested structures.
|
// This does not work with, or has not been tested with, nested structures.
|
||||||
// Using this may fail to convert the parquet.Row to schema_pb.RecordValue
|
// Using this may fail to convert the parquet.Row to schema_pb.RecordValue
|
||||||
func ToRecordValue(recordType *schema_pb.RecordType, row parquet.Row) (*schema_pb.RecordValue, error) {
|
func ToRecordValue(recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, row parquet.Row) (*schema_pb.RecordValue, error) {
|
||||||
parquetLevels, err := ToParquetLevels(recordType)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
values := []parquet.Value(row)
|
values := []parquet.Value(row)
|
||||||
recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0)
|
recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -52,6 +52,11 @@ func TestWriteReadParquet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
|
func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
|
||||||
|
parquetLevels, err := ToParquetLevels(recordType)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ToParquetLevels failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// create a parquet file
|
// create a parquet file
|
||||||
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
|
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -75,7 +80,7 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
|
|||||||
fmt.Sprintf("john_%d@d.com", i),
|
fmt.Sprintf("john_%d@d.com", i),
|
||||||
fmt.Sprintf("john_%d@e.com", i))).
|
fmt.Sprintf("john_%d@e.com", i))).
|
||||||
AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
|
AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
|
||||||
AddRecordValue(rowBuilder, recordType, recordValue)
|
AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
|
||||||
|
|
||||||
if count < 10 {
|
if count < 10 {
|
||||||
fmt.Printf("RecordValue: %v\n", recordValue)
|
fmt.Printf("RecordValue: %v\n", recordValue)
|
||||||
@@ -101,6 +106,11 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
|
func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
|
||||||
|
parquetLevels, err := ToParquetLevels(recordType)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ToParquetLevels failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// read the parquet file
|
// read the parquet file
|
||||||
file, err := os.Open(filename)
|
file, err := os.Open(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -120,7 +130,7 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
|
|||||||
for i := 0; i < rowCount; i++ {
|
for i := 0; i < rowCount; i++ {
|
||||||
row := rows[i]
|
row := rows[i]
|
||||||
// convert parquet row to schema_pb.RecordValue
|
// convert parquet row to schema_pb.RecordValue
|
||||||
recordValue, err := ToRecordValue(recordType, row)
|
recordValue, err := ToRecordValue(recordType, parquetLevels, row)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ToRecordValue failed: %v", err)
|
t.Fatalf("ToRecordValue failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user