segmentio/parquet-go

parquet-go can read a file with nested objects, but parquet-cli cannot (Parquet/Avro schema mismatch)

Opened this issue · 1 comment

Hello,
I'm seeing a weird situation that I can't explain. I've created a parquet file in Go. I can read it back in Go, and I can read its schema with the parquet CLI (`parquet schema`), but I get an error when I `parquet cat` it.

Note: I've never used the `parquet` command before, so this could be a user error. I'm using the parquet-cli installed via Homebrew (`brew install parquet-cli`).
Note 2: If I remove the nested structures, it works fine.

code:

package main

import (
	"bytes"
	"flag"
	"os"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/golang/glog"
	"github.com/segmentio/parquet-go"
)

// ParquetOtherElements is the element type of ParquetStruct.OtherElements.
//
// The parquet struct tags give each column its lower-camel-case name. Type is
// additionally tagged "enum", and RehomedFrom is a slice of strings tagged
// "list", so this type itself contains a nested list column.
type ParquetOtherElements struct {
	ElementId   string   `parquet:"elementId"`
	Type        string   `parquet:"type,enum"`
	RehomedFrom []string `parquet:"rehomedFrom,list"`
}

// ParquetElement is the element type of the ParquetStruct.Nodes and
// ParquetStruct.Edges lists.
//
// Type and Status are tagged "enum"; Element is a string tagged "json" and is
// stored under the column name "element". Because the slices holding these
// values are tagged "list", the resulting schema has a group named "element"
// that itself contains a field named "element".
type ParquetElement struct {
	ElementId string `parquet:"elementId"`
	Type      string `parquet:"type,enum"`
	Label     string `parquet:"label"`
	Status    string `parquet:"status,enum"`
	Element   string `parquet:"element,json"`
}

// ParquetStruct is the row type written to and read back from the parquet
// file by main.
//
// EventTime is an int64 tagged "timestamp(nanosecond)"; main fills it with
// time.Now().UnixNano(). Nodes and Edges are slices of ParquetElement tagged
// "list". OtherElements is a slice of ParquetOtherElements tagged both
// "optional" and "list".
type ParquetStruct struct {
	TenantId             string                 `parquet:"tenantId"`
	TriggerProviderId    string                 `parquet:"triggerProviderId"`
	TriggerEventId       string                 `parquet:"triggerEventId"`
	TriggerProcessorType string                 `parquet:"triggerProcessorType"`
	EventTime            int64                  `parquet:"eventTime,timestamp(nanosecond)"`
	EventFlagged         bool                   `parquet:"eventFlagged"`
	Nodes                []ParquetElement       `parquet:"nodes,list"`
	Edges                []ParquetElement       `parquet:"edges,list"`
	OtherElements        []ParquetOtherElements `parquet:"otherElements,optional,list"`
}

// main reproduces the issue end to end: it encodes a single ParquetStruct row
// into an in-memory parquet file, saves those bytes as myfile.parquet in the
// current directory, and then decodes the rows again from the same bytes. The
// row is dumped with spew both before writing and after reading so the two can
// be compared. Any error aborts the program via glog.Fatal.
func main() {
	// Configure logging through the flag package before parsing the command
	// line (presumably these are the flags glog registers — the returned
	// errors are not checked, as in the original reproduction).
	flag.Set("alsologtostderr", "true")
	flag.Set("v", "3")
	flag.Parse()

	content := new(bytes.Buffer)
	w := parquet.NewGenericWriter[ParquetStruct](content)
	p := []ParquetStruct{
		{
			TenantId:             "tenantIdV",
			TriggerProviderId:    "triggerProviderIdV",
			TriggerEventId:       "triggerEventIdV",
			TriggerProcessorType: "triggerProcessorTypeV",
			EventFlagged:         false,
			EventTime:            time.Now().UnixNano(),
			// Only Nodes is populated; Edges and OtherElements stay nil.
			Nodes: []ParquetElement{{ElementId: "elementIdV", Type: "typeV", Label: "labelV", Status: "statusV", Element: "{}"}},
		},
	}
	spew.Dump(p)
	if _, err := w.Write(p); err != nil {
		glog.Fatal(err)
	}
	// Close must succeed before content holds a complete file.
	if err := w.Close(); err != nil {
		glog.Fatal(err)
	}

	// Write the file from the content buffer. os.WriteFile creates or
	// truncates the file, writes the bytes and closes it, reporting any error
	// from each step. The previous os.Create + deferred Close dropped the
	// Close error and skipped Close entirely whenever glog.Fatal exited the
	// process. Mode 0o666 (before umask) matches what os.Create uses.
	if err := os.WriteFile("myfile.parquet", content.Bytes(), 0o666); err != nil {
		glog.Fatal(err)
	}

	glog.Infoln("reading buffer....")
	file := bytes.NewReader(content.Bytes())
	rows, err := parquet.Read[ParquetStruct](file, file.Size())
	if err != nil {
		glog.Fatal(err)
	}
	spew.Dump(rows)
}

bob ~/scratch/parquet $ go run .
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) <nil>,
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
I0307 16:36:32.428573   62343 main.go:76] reading buffer....
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) {
  },
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
bob ~/scratch/parquet $ parquet cat myfile.parquet
Unknown error
java.lang.RuntimeException: Failed on record 0
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:86)
        at org.apache.parquet.cli.Main.run(Main.java:157)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:187)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'elementId' not found
        at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
        at org.apache.parquet.cli.BaseCommand$1$1.<init>(BaseCommand.java:344)
        at org.apache.parquet.cli.BaseCommand$1.iterator(BaseCommand.java:342)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:73)
        ... 3 more
(exit status 1) bob ~/scratch/parquet $ parquet schema myfile.parquet
{
  "type" : "record",
  "name" : "ParquetStruct",
  "fields" : [ {
    "name" : "tenantId",
    "type" : "string"
  }, {
    "name" : "triggerProviderId",
    "type" : "string"
  }, {
    "name" : "triggerEventId",
    "type" : "string"
  }, {
    "name" : "triggerProcessorType",
    "type" : "string"
  }, {
    "name" : "eventTime",
    "type" : "long"
  }, {
    "name" : "eventFlagged",
    "type" : "boolean"
  }, {
    "name" : "nodes",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "edges",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list2",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element2",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "otherElements",
    "type" : [ "null", {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list3",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element3",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "rehomedFrom",
              "type" : {
                "type" : "array",
                "items" : {
                  "type" : "record",
                  "name" : "list",
                  "namespace" : "list4",
                  "fields" : [ {
                    "name" : "element",
                    "type" : "string"
                  } ]
                }
              }
            } ]
          }
        } ]
      }
    } ],
    "default" : null
  } ]
}
bob ~/scratch/parquet $

Am I the only one seeing this?