Apache Pinot Golang Client managed by StarTree

Pinot Client GO

Applications can use this golang client library to query Apache Pinot.

Examples

Local Pinot test

Please follow this Pinot Quickstart link to install and start Pinot batch quickstart locally.

bin/quick-start-batch.sh

Check out the client library GitHub repo

git clone git@github.com:startreedata/pinot-client-go.git
cd pinot-client-go

Build and run the example application to query from Pinot Batch Quickstart

go build ./examples/batch-quickstart
./batch-quickstart

Pinot JSON Index Quickstart

Please follow this Pinot Quickstart link to install and start the Pinot JSON batch quickstart locally.

bin/quick-start-json-index-batch.sh

Check out the client library GitHub repo

git clone git@github.com:startreedata/pinot-client-go.git
cd pinot-client-go

Build and run the example application to query from Pinot JSON Batch Quickstart

go build ./examples/json-batch-quickstart
./json-batch-quickstart

Pinot Live Demo cluster

Build and run the example application to query the Pinot Live Demo cluster

go build ./examples/pinot-live-demo
./pinot-live-demo

Usage

Create a Pinot Connection

The Pinot client can be initialized in any of the following ways:

  1. Zookeeper path.
pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
  2. Controller address.
pinotClient, err := pinot.NewFromController("localhost:9000")

When the controller-based broker selector is used, the client periodically fetches the table-to-broker mapping from the controller API. For plain HTTP, the http:// prefix on the controller address is optional.

  3. A list of broker addresses.
  • For HTTP: the scheme defaults to HTTP if not specified.
pinotClient, err := pinot.NewFromBrokerList([]string{"localhost:8000"})
  • For HTTPS: the scheme must be part of the URI.
pinotClient, err := pinot.NewFromBrokerList([]string{"https://pinot-broker.pinot.live"})
  4. ClientConfig

Via Zookeeper path:

pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{
	ZkConfig: &pinot.ZookeeperConfig{
		ZookeeperPath:     zkPath,
		PathPrefix:        strings.Join([]string{zkPathPrefix, pinotCluster}, "/"),
		SessionTimeoutSec: defaultZkSessionTimeoutSec,
	},
	// additional header added to Broker Query API requests
	ExtraHTTPHeader: map[string]string{
		"extra-header": "value",
	},
})

Via controller address:

pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{
	ControllerConfig: &pinot.ControllerConfig{
		ControllerAddress: "localhost:9000",
		// Frequency of broker data refresh in milliseconds via controller API - defaults to 1000ms
		UpdateFreqMs: 500,
		// Additional HTTP headers to include in the controller API request
		ExtraControllerAPIHeaders: map[string]string{
			"header": "val",
		},
	},
	// additional header added to Broker Query API requests
	ExtraHTTPHeader: map[string]string{
		"extra-header": "value",
	},
})
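The scheme rules described earlier for controller and broker addresses (a bare host:port is treated as HTTP, while HTTPS must be spelled out in the URI) can be illustrated with a small sketch. The helper withDefaultScheme below is hypothetical and written only for this example; it is not part of pinot-client-go and does not claim to match how the library normalizes addresses internally.

```go
package main

import (
	"fmt"
	"strings"
)

// withDefaultScheme is a hypothetical helper, not part of pinot-client-go.
// It prepends "http://" when the address carries no scheme and leaves
// addresses that already start with "http://" or "https://" untouched.
func withDefaultScheme(addr string) string {
	if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
		return addr
	}
	return "http://" + addr
}

func main() {
	for _, addr := range []string{
		"localhost:8000",
		"http://localhost:9000",
		"https://pinot-broker.pinot.live",
	} {
		fmt.Printf("%-35s -> %s\n", addr, withDefaultScheme(addr))
	}
}
```

The practical consequence is that an HTTPS endpoint passed without its scheme would be contacted over plain HTTP, which is why the scheme has to be explicit for HTTPS.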

Add HTTP timeout for Pinot Queries

By default this client uses Go's default HTTP timeout, which is no timeout. If you want Pinot queries to time out within a given duration, set HTTPTimeout in ClientConfig:

pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{
	ZkConfig: &pinot.ZookeeperConfig{
		ZookeeperPath:     zkPath,
		PathPrefix:        strings.Join([]string{zkPathPrefix, pinotCluster}, "/"),
		SessionTimeoutSec: defaultZkSessionTimeoutSec,
	},
	// additional header added to Broker Query API requests
	ExtraHTTPHeader: map[string]string{
		"extra-header": "value",
	},
	// optional HTTP timeout parameter for Pinot Queries.
	HTTPTimeout: 300 * time.Millisecond,
})

Query Pinot

Please see this example for your reference.

Code snippet:

pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
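if err != nil {
    // Stop here: pinotClient is not usable when initialization fails.
    log.Fatal(err)
}
brokerResp, err := pinotClient.ExecuteSQL("baseballStats", "select count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10")
if err != nil {
    // Stop here: brokerResp is not usable when the query fails.
    log.Fatal(err)
}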
log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs)

Response Format

The query response is defined by the following struct:

type BrokerResponse struct {
	AggregationResults          []*AggregationResult `json:"aggregationResults,omitempty"`
	SelectionResults            *SelectionResults    `json:"SelectionResults,omitempty"`
	ResultTable                 *ResultTable         `json:"resultTable,omitempty"`
	Exceptions                  []Exception          `json:"exceptions"`
	TraceInfo                   map[string]string    `json:"traceInfo,omitempty"`
	NumServersQueried           int                  `json:"numServersQueried"`
	NumServersResponded         int                  `json:"numServersResponded"`
	NumSegmentsQueried          int                  `json:"numSegmentsQueried"`
	NumSegmentsProcessed        int                  `json:"numSegmentsProcessed"`
	NumSegmentsMatched          int                  `json:"numSegmentsMatched"`
	NumConsumingSegmentsQueried int                  `json:"numConsumingSegmentsQueried"`
	NumDocsScanned              int64                `json:"numDocsScanned"`
	NumEntriesScannedInFilter   int64                `json:"numEntriesScannedInFilter"`
	NumEntriesScannedPostFilter int64                `json:"numEntriesScannedPostFilter"`
	NumGroupsLimitReached       bool                 `json:"numGroupsLimitReached"`
	TotalDocs                   int64                `json:"totalDocs"`
	TimeUsedMs                  int                  `json:"timeUsedMs"`
	MinConsumingFreshnessTimeMs int64                `json:"minConsumingFreshnessTimeMs"`
}
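The statistics fields in this struct can be used for simple sanity checks on a response. The sketch below is self-contained: brokerStats is a local stand-in that copies a few of the fields and JSON tags shown above, and the helpers isPartial, scanRatio and parseStats are hypothetical names introduced for illustration. The payload is made up, not captured from a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// brokerStats mirrors a subset of the BrokerResponse fields shown above.
// It is a local stand-in so the example runs without the client library.
type brokerStats struct {
	NumServersQueried   int   `json:"numServersQueried"`
	NumServersResponded int   `json:"numServersResponded"`
	NumDocsScanned      int64 `json:"numDocsScanned"`
	TotalDocs           int64 `json:"totalDocs"`
	TimeUsedMs          int   `json:"timeUsedMs"`
}

// isPartial reports whether fewer servers responded than were queried.
func (s brokerStats) isPartial() bool {
	return s.NumServersResponded < s.NumServersQueried
}

// scanRatio returns the fraction of documents scanned, or 0 for an empty table.
func (s brokerStats) scanRatio() float64 {
	if s.TotalDocs == 0 {
		return 0
	}
	return float64(s.NumDocsScanned) / float64(s.TotalDocs)
}

// parseStats decodes the statistics fields from a JSON broker response.
func parseStats(payload []byte) (brokerStats, error) {
	var s brokerStats
	err := json.Unmarshal(payload, &s)
	return s, err
}

func main() {
	// Made-up payload for illustration only.
	payload := []byte(`{"numServersQueried":4,"numServersResponded":3,"numDocsScanned":250,"totalDocs":1000,"timeUsedMs":12}`)
	s, err := parseStats(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("partial=%t scanRatio=%.2f timeUsedMs=%d\n", s.isPartial(), s.scanRatio(), s.TimeUsedMs)
}
```

The same comparisons can be made directly on the fields of a real BrokerResponse value, since the field names used here are the ones listed in the struct.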

Note that AggregationResults and SelectionResults hold the results of PQL queries.

ResultTable holds the results of SQL queries. It is defined as:

// ResultTable holds the schema and rows of a SQL query result
type ResultTable struct {
	DataSchema RespSchema      `json:"dataSchema"`
	Rows       [][]interface{} `json:"rows"`
}

RespSchema is defined as:

// RespSchema is response schema
type RespSchema struct {
	ColumnDataTypes []string `json:"columnDataTypes"`
	ColumnNames     []string `json:"columnNames"`
}

Several accessor methods are defined on ResultTable, for example:

func (r ResultTable) GetRowCount() int
func (r ResultTable) GetColumnCount() int
func (r ResultTable) GetColumnName(columnIndex int) string
func (r ResultTable) GetColumnDataType(columnIndex int) string
func (r ResultTable) Get(rowIndex int, columnIndex int) interface{}
func (r ResultTable) GetString(rowIndex int, columnIndex int) string
func (r ResultTable) GetInt(rowIndex int, columnIndex int) int
func (r ResultTable) GetLong(rowIndex int, columnIndex int) int64
func (r ResultTable) GetFloat(rowIndex int, columnIndex int) float32
func (r ResultTable) GetDouble(rowIndex int, columnIndex int) float64

Sample Usage is here
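As a rough illustration of how a result table pairs its schema with its rows, the sketch below decodes a made-up JSON payload into local copies of the two structs shown above and walks the rows by column name. The types resultTable and respSchema and the helpers decodeTable, rowsAsMaps and longAt are stand-ins written for this example so that it runs with the standard library alone. Decoding numbers as json.Number is a choice made in this sketch, not a statement about how the library stores cells.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// respSchema and resultTable are local copies of the structs shown above.
type respSchema struct {
	ColumnDataTypes []string `json:"columnDataTypes"`
	ColumnNames     []string `json:"columnNames"`
}

type resultTable struct {
	DataSchema respSchema      `json:"dataSchema"`
	Rows       [][]interface{} `json:"rows"`
}

// decodeTable parses a JSON result table, keeping numbers as json.Number
// so that large LONG values are not rounded through float64.
func decodeTable(payload []byte) (resultTable, error) {
	var t resultTable
	dec := json.NewDecoder(bytes.NewReader(payload))
	dec.UseNumber()
	err := dec.Decode(&t)
	return t, err
}

// rowsAsMaps converts each row into a map from column name to cell text.
func rowsAsMaps(t resultTable) []map[string]string {
	out := make([]map[string]string, 0, len(t.Rows))
	for _, row := range t.Rows {
		m := make(map[string]string, len(row))
		for i, cell := range row {
			if i < len(t.DataSchema.ColumnNames) {
				m[t.DataSchema.ColumnNames[i]] = fmt.Sprint(cell)
			}
		}
		out = append(out, m)
	}
	return out
}

// longAt returns the cell at (row, col) as int64, assuming it was decoded
// as json.Number by decodeTable.
func longAt(t resultTable, row, col int) (int64, error) {
	n, ok := t.Rows[row][col].(json.Number)
	if !ok {
		return 0, fmt.Errorf("cell (%d,%d) is not a number", row, col)
	}
	return n.Int64()
}

func main() {
	// Made-up payload shaped like the resultTable part of a SQL response.
	payload := []byte(`{"dataSchema":{"columnDataTypes":["LONG","DOUBLE"],"columnNames":["cnt","sum_homeRuns"]},"rows":[[97889,1234.5]]}`)
	tbl, err := decodeTable(payload)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for i, row := range rowsAsMaps(tbl) {
		fmt.Println(i, row)
	}
	cnt, err := longAt(tbl, 0, 0)
	fmt.Println(cnt, err)
}
```

With the real client, the listed accessors such as GetColumnName, GetLong and GetDouble take the place of these local helpers.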