Prometheus stats are incompatible with the prometheus sdk
Opened this issue · 2 comments
Hi!
We are encountering a problem integrating the segmentio stats for our Kafka consumers into the rest of our Prometheus metrics. The issue arises because the stats lib from segmentio does not use the SDK that Prometheus provides and instead rolls its own collector and publisher. This means that the two libraries are fundamentally incompatible when it comes to serving them under the same '/metrics' path.
Is there a way that someone has worked around this? If not, could some form of adapter be added to the lib?
I don't want to be the guy that asks for a large scale rewrite, but it would be nice to see this awesome lib use the sdk provided by prometheus for golang.
Hey @jdeal-mediamath, I think it's unlikely for us to use the prometheus library in this module for various reasons. However, we don't register the metrics URL automatically so you should be able to host the metrics exporter at a separate path from the Prometheus package. It would mean that you would have to scrape 2 endpoints from the same service, but I don't really see a way around that.
What are you looking to integrate with? If it's an internal package, why not use segmentio/stats? If it's a third-party library, perhaps you could suggest they add an interface for exporting stats rather than using prometheus directly?
Here is the WIP I'm currently working on in my consumer app project:
usage :
// defaultKafkaCollector is the package-level collector that
// ObserveKafkaReader adds readers to. It is built once, at package
// initialisation, from the package's namespace value.
var defaultKafkaCollector = newKafkaCollector(namespace)
// ObserveKafkaReader hands reader to the package's default Kafka collector so
// that the reader's stats are included whenever that collector is collected.
func ObserveKafkaReader(reader *kafka.Reader) {
	defaultKafkaCollector.InstrumentReader(reader)
}
"glue code"
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/kafka-go"
)
// labelNames lists the label dimensions shared by every counter and gauge
// vector created in newKafkaCollector. The order matters: label values are
// passed positionally (topic first, then partition) to WithLabelValues.
var labelNames = []string{"topic", "partition"}
// counterAdder pairs a Prometheus counter vector with the function that
// extracts the matching value from a kafka.ReaderStats snapshot.
type counterAdder struct {
	// counter is the vector that receives the extracted value.
	counter *prometheus.CounterVec
	// getter reads the value to add from a stats snapshot.
	getter func(*kafka.ReaderStats) float64
}
// Add increments the counter labelled with r's topic and partition by the
// value that the getter extracts from r.
func (ca *counterAdder) Add(r *kafka.ReaderStats) {
	// Resolve the labelled child first, then evaluate the getter, matching
	// the left-to-right order of the single-expression form.
	child := ca.counter.WithLabelValues(r.Topic, r.Partition)
	child.Add(ca.getter(r))
}
// gaugeSetter pairs a Prometheus gauge vector with the function that extracts
// the matching value from a kafka.ReaderStats snapshot.
type gaugeSetter struct {
	// gauge is the vector that receives the extracted value.
	gauge *prometheus.GaugeVec
	// getter reads the value to store from a stats snapshot.
	getter func(*kafka.ReaderStats) float64
}
// Set overwrites the gauge labelled with r's topic and partition with the
// value that the getter extracts from r.
func (gs *gaugeSetter) Set(r *kafka.ReaderStats) {
	// Resolve the labelled child first, then evaluate the getter, matching
	// the left-to-right order of the single-expression form.
	child := gs.gauge.WithLabelValues(r.Topic, r.Partition)
	child.Set(gs.getter(r))
}
// kafkaCollector exposes the stats of a set of kafka readers as Prometheus
// metrics. It provides Collect and Describe and is registered with Prometheus
// by newKafkaCollector.
type kafkaCollector struct {
	// readersLock guards readers: Collect takes it for reading and
	// InstrumentReader takes it for writing.
	readersLock sync.RWMutex
	// readers holds every reader added through InstrumentReader.
	readers []*kafka.Reader

	// counters and gauges are filled in by newKafkaCollector and are only
	// read afterwards.
	counters []*counterAdder
	gauges   []*gaugeSetter
}
// newKafkaCollector builds a kafkaCollector whose metrics use the given
// namespace and the "kafka" subsystem, labelled by labelNames, and registers
// it with the default Prometheus registry before returning it.
//
// Registration goes through prometheus.MustRegister, so a failed registration
// (for example, calling this twice with the same namespace) panics.
//
// Every metric carries a Help string so the exposition output is
// self-describing.
// NOTE(review): older client_golang releases are believed to reject metrics
// with an empty help string at registration time — confirm against the
// version pinned by the project.
func newKafkaCollector(namespace string) *kafkaCollector {
	collector := new(kafkaCollector)

	// addCounter appends a counter vector fed by get.
	addCounter := func(name, help string, get func(*kafka.ReaderStats) float64) {
		opts := prometheus.CounterOpts{Namespace: namespace, Subsystem: "kafka", Name: name, Help: help}
		collector.counters = append(collector.counters, &counterAdder{
			counter: prometheus.NewCounterVec(opts, labelNames),
			getter:  get,
		})
	}

	// addGauge appends a gauge vector fed by get.
	addGauge := func(name, help string, get func(*kafka.ReaderStats) float64) {
		opts := prometheus.GaugeOpts{Namespace: namespace, Subsystem: "kafka", Name: name, Help: help}
		collector.gauges = append(collector.gauges, &gaugeSetter{
			gauge:  prometheus.NewGaugeVec(opts, labelNames),
			getter: get,
		})
	}

	addCounter("dials_total", "Number of connection dials performed by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Dials) })
	addCounter("fetches_total", "Number of fetches performed by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Fetches) })
	addCounter("messages_total", "Number of messages read by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Messages) })
	addCounter("bytes_total", "Number of message bytes read by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Bytes) })
	addCounter("rebalances_total", "Number of rebalances seen by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Rebalances) })
	addCounter("timeouts_total", "Number of timeouts encountered by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Timeouts) })
	addCounter("errors_total", "Number of errors encountered by the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Errors) })

	addGauge("offset", "Current offset of the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Offset) })
	addGauge("lag", "Current lag of the reader.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.Lag) })
	addGauge("queue_length", "Current length of the reader's message queue.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.QueueLength) })
	addGauge("queue_capacity", "Capacity of the reader's message queue.",
		func(rs *kafka.ReaderStats) float64 { return float64(rs.QueueCapacity) })

	// Register only after all vectors exist: Describe walks counters and
	// gauges to report the collector's descriptors.
	prometheus.MustRegister(collector)
	return collector
}
// Collect pulls a stats snapshot from every instrumented reader, folds it into
// the counter and gauge vectors, and then sends all resulting metrics on m.
//
// The read lock is held for the whole call, so InstrumentReader blocks until
// collection has finished.
//
// NOTE(review): counter values are added on every call, which presumably
// relies on Reader.Stats reporting deltas since the previous call — confirm
// against the kafka-go documentation.
func (c *kafkaCollector) Collect(m chan<- prometheus.Metric) {
	c.readersLock.RLock()
	defer c.readersLock.RUnlock()

	// Phase 1: update the vectors from each reader's snapshot.
	for i := range c.readers {
		snapshot := c.readers[i].Stats()
		for _, ca := range c.counters {
			ca.Add(&snapshot)
		}
		for _, gs := range c.gauges {
			gs.Set(&snapshot)
		}
	}

	// Phase 2: emit everything the vectors now hold.
	for _, ca := range c.counters {
		ca.counter.Collect(m)
	}
	for _, gs := range c.gauges {
		gs.gauge.Collect(m)
	}
}
// Describe forwards the descriptors of every counter vector, then every gauge
// vector, owned by the collector to d.
func (c *kafkaCollector) Describe(d chan<- *prometheus.Desc) {
	for i := range c.counters {
		c.counters[i].counter.Describe(d)
	}
	for i := range c.gauges {
		c.gauges[i].gauge.Describe(d)
	}
}
// InstrumentReader adds r to the set of readers whose stats are folded into
// the collector's metrics on every Collect call.
//
// A nil reader is ignored: Collect calls Stats on every stored reader, so a
// stored nil would presumably panic on each collection. Adding a reader that
// is already instrumented is a no-op, so its stats are pulled only once per
// collection.
func (c *kafkaCollector) InstrumentReader(r *kafka.Reader) {
	if r == nil {
		return
	}

	c.readersLock.Lock()
	defer c.readersLock.Unlock()

	for _, known := range c.readers {
		if known == r {
			return
		}
	}
	c.readers = append(c.readers, r)
}