parquet-go can read nested objects, but not parquet cli (Parquet/Avro schema mismatch)
Opened this issue · 1 comments
remiphilippe commented
Hello,
I'm seeing a weird situation that I can't explain. I created a parquet file in Go and can read it back in Go. I can also read its schema with the parquet CLI, but I get an error when I `cat` it.
Note: I've never used the parquet command before, so this could be user error. I'm using the parquet-cli installed via brew (`brew install parquet-cli`).
Note 2: If I remove the nested structure, it works fine.
code:
package main
import (
"bytes"
"flag"
"os"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"github.com/segmentio/parquet-go"
)
// ParquetOtherElements is the element type of ParquetStruct.OtherElements.
// The struct tags give each field its parquet column name and options.
type ParquetOtherElements struct {
ElementId string `parquet:"elementId"`
// Type is tagged as an enum column.
Type string `parquet:"type,enum"`
// RehomedFrom is a string slice tagged as a list column, i.e. a list nested
// inside each list element.
RehomedFrom []string `parquet:"rehomedFrom,list"`
}
// ParquetElement is the element type of the ParquetStruct.Nodes and
// ParquetStruct.Edges lists. The struct tags give each field its parquet
// column name and options.
type ParquetElement struct {
ElementId string `parquet:"elementId"`
// Type and Status are tagged as enum columns.
Type string `parquet:"type,enum"`
Label string `parquet:"label"`
Status string `parquet:"status,enum"`
// Element is tagged as a json column; the repro below stores the string "{}".
Element string `parquet:"element,json"`
}
// ParquetStruct is the top-level row type written to and read back from the
// parquet file in main. The struct tags give each field its parquet column
// name and options.
type ParquetStruct struct {
TenantId string `parquet:"tenantId"`
TriggerProviderId string `parquet:"triggerProviderId"`
TriggerEventId string `parquet:"triggerEventId"`
TriggerProcessorType string `parquet:"triggerProcessorType"`
// EventTime is an int64 tagged as a nanosecond timestamp; main fills it with
// time.Now().UnixNano().
EventTime int64 `parquet:"eventTime,timestamp(nanosecond)"`
EventFlagged bool `parquet:"eventFlagged"`
// Nodes and Edges are lists of nested ParquetElement records.
Nodes []ParquetElement `parquet:"nodes,list"`
Edges []ParquetElement `parquet:"edges,list"`
// OtherElements is the only field tagged optional; it is a list of nested
// ParquetOtherElements records.
OtherElements []ParquetOtherElements `parquet:"otherElements,optional,list"`
}
// main is the reproduction program for the issue. It encodes a single
// ParquetStruct row into an in-memory buffer, saves those bytes as
// myfile.parquet, then decodes the same bytes again and dumps both the input
// and the decoded rows so they can be compared.
func main() {
	// Configure glog through its registered flags before parsing.
	flag.Set("alsologtostderr", "true")
	flag.Set("v", "3")
	flag.Parse()

	content := new(bytes.Buffer)
	w := parquet.NewGenericWriter[ParquetStruct](content)

	// One row with a single nested node; Edges and OtherElements stay nil.
	p := []ParquetStruct{
		{
			TenantId:             "tenantIdV",
			TriggerProviderId:    "triggerProviderIdV",
			TriggerEventId:       "triggerEventIdV",
			TriggerProcessorType: "triggerProcessorTypeV",
			EventFlagged:         false,
			EventTime:            time.Now().UnixNano(),
			Nodes: []ParquetElement{
				{ElementId: "elementIdV", Type: "typeV", Label: "labelV", Status: "statusV", Element: "{}"},
			},
		},
	}
	spew.Dump(p)

	if _, err := w.Write(p); err != nil {
		glog.Fatal(err)
	}
	// Close must succeed before content.Bytes() is used as the encoded file.
	if err := w.Close(); err != nil {
		glog.Fatal(err)
	}

	// Write the file from the content buffer. os.WriteFile creates or
	// truncates the file (mode 0666, same as os.Create), closes it before
	// returning, and surfaces write and close errors instead of relying on a
	// deferred Close whose error would be dropped.
	if err := os.WriteFile("myfile.parquet", content.Bytes(), 0o666); err != nil {
		glog.Fatal(err)
	}

	glog.Infoln("reading buffer....")
	// Read back from the in-memory bytes rather than from the file on disk.
	file := bytes.NewReader(content.Bytes())
	rows, err := parquet.Read[ParquetStruct](file, file.Size())
	if err != nil {
		glog.Fatal(err)
	}
	spew.Dump(rows)
}
bob ~/scratch/parquet go run .
([]main.ParquetStruct) (len=1 cap=1) {
(main.ParquetStruct) {
TenantId: (string) (len=9) "tenantIdV",
TriggerProviderId: (string) (len=18) "triggerProviderIdV",
TriggerEventId: (string) (len=15) "triggerEventIdV",
TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
EventTime: (int64) 1678235792424865000,
EventFlagged: (bool) false,
Nodes: ([]main.ParquetElement) (len=1 cap=1) {
(main.ParquetElement) {
ElementId: (string) (len=10) "elementIdV",
Type: (string) (len=5) "typeV",
Label: (string) (len=6) "labelV",
Status: (string) (len=7) "statusV",
Element: (string) (len=2) "{}"
}
},
Edges: ([]main.ParquetElement) <nil>,
OtherElements: ([]main.ParquetOtherElements) <nil>
}
}
I0307 16:36:32.428573 62343 main.go:76] reading buffer....
([]main.ParquetStruct) (len=1 cap=1) {
(main.ParquetStruct) {
TenantId: (string) (len=9) "tenantIdV",
TriggerProviderId: (string) (len=18) "triggerProviderIdV",
TriggerEventId: (string) (len=15) "triggerEventIdV",
TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
EventTime: (int64) 1678235792424865000,
EventFlagged: (bool) false,
Nodes: ([]main.ParquetElement) (len=1 cap=1) {
(main.ParquetElement) {
ElementId: (string) (len=10) "elementIdV",
Type: (string) (len=5) "typeV",
Label: (string) (len=6) "labelV",
Status: (string) (len=7) "statusV",
Element: (string) (len=2) "{}"
}
},
Edges: ([]main.ParquetElement) {
},
OtherElements: ([]main.ParquetOtherElements) <nil>
}
}
bob ~/scratch/parquet parquet cat myfile.parquet
Unknown error
java.lang.RuntimeException: Failed on record 0
at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:86)
at org.apache.parquet.cli.Main.run(Main.java:157)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.parquet.cli.Main.main(Main.java:187)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'elementId' not found
at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
at org.apache.parquet.cli.BaseCommand$1$1.<init>(BaseCommand.java:344)
at org.apache.parquet.cli.BaseCommand$1.iterator(BaseCommand.java:342)
at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:73)
... 3 more
1 bob ~/scratch/parquet parquet schema myfile.parquet
{
"type" : "record",
"name" : "ParquetStruct",
"fields" : [ {
"name" : "tenantId",
"type" : "string"
}, {
"name" : "triggerProviderId",
"type" : "string"
}, {
"name" : "triggerEventId",
"type" : "string"
}, {
"name" : "triggerProcessorType",
"type" : "string"
}, {
"name" : "eventTime",
"type" : "long"
}, {
"name" : "eventFlagged",
"type" : "boolean"
}, {
"name" : "nodes",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "list",
"fields" : [ {
"name" : "element",
"type" : {
"type" : "record",
"name" : "element",
"fields" : [ {
"name" : "elementId",
"type" : "string"
}, {
"name" : "type",
"type" : "string"
}, {
"name" : "label",
"type" : "string"
}, {
"name" : "status",
"type" : "string"
}, {
"name" : "element",
"type" : "bytes"
} ]
}
} ]
}
}
}, {
"name" : "edges",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "list",
"namespace" : "list2",
"fields" : [ {
"name" : "element",
"type" : {
"type" : "record",
"name" : "element",
"namespace" : "element2",
"fields" : [ {
"name" : "elementId",
"type" : "string"
}, {
"name" : "type",
"type" : "string"
}, {
"name" : "label",
"type" : "string"
}, {
"name" : "status",
"type" : "string"
}, {
"name" : "element",
"type" : "bytes"
} ]
}
} ]
}
}
}, {
"name" : "otherElements",
"type" : [ "null", {
"type" : "array",
"items" : {
"type" : "record",
"name" : "list",
"namespace" : "list3",
"fields" : [ {
"name" : "element",
"type" : {
"type" : "record",
"name" : "element",
"namespace" : "element3",
"fields" : [ {
"name" : "elementId",
"type" : "string"
}, {
"name" : "type",
"type" : "string"
}, {
"name" : "rehomedFrom",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "list",
"namespace" : "list4",
"fields" : [ {
"name" : "element",
"type" : "string"
} ]
}
}
} ]
}
} ]
}
} ],
"default" : null
} ]
}
bob ~/scratch/parquet
remiphilippe commented
Am I the only one seeing this?