logpush-grafana

How to send logs to a Grafana Loki aggregation system, transforming incoming data from Logpush to a format Grafana Loki understands by using Cloudflare Workers.

Logpush

Logpush allows you to "push logs of Cloudflare's datasets to your cloud service in batches".

Today, we can send logs to any HTTP endpoint, choosing from a variety of log fields and zone-scoped datasets.

In this post, we will be taking a look at how to send logs to a Grafana Loki aggregation system, transforming incoming data from Logpush to a format Grafana Loki understands by using Cloudflare Workers to do the following:

  • Take incoming gzipped Logpush data, unpack it, and transform the data;
  • Merge all fields into the Loki API format (shown below) and send it off to the destination.
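
For reference, the payload the Worker builds for Loki's /loki/api/v1/push endpoint has this shape: a set of streams, each with a labels object and a list of [<unix nanoseconds>, <log line>] value pairs (the RayID below is a made-up example):

{
  "streams": [
    {
      "stream": { "job": "cloudflare_logpush" },
      "values": [
        ["1627898400123000000", "{\"RayID\":\"6793f0a1bc3e2f5a\",\"EdgeStartTimestamp\":1627898400123000000}"]
      ]
    }
  ]
}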


Create a Worker

Create a new Worker with Cloudflare Wrangler:

wrangler generate PROJECT_NAME

Update the wrangler.toml:

name = "PROJECT_NAME"
type = "webpack"
account_id = "YOUR_ACCOUNT_ID"
workers_dev = true
route = ""
zone_id = ""
usage_model = "unbound"

Dependencies

Depending on your system, you might have to install Node.js and the Xcode command-line tools, as well as (optionally) jq:

brew install node
xcode-select --install
brew install jq

Install the pako npm package to decompress incoming files:

npm i pako

Index.js

Edit the index.js file:

import { inflate } from 'pako'

addEventListener('fetch', event => {
  event.respondWith(handleRequest(event.request))
})

async function transformLogs(obj) {
  let encoding = obj.contentEncoding || undefined
  let payload = obj.payload
  let jobname = obj.job || 'cloudflare_logpush'
  let lokiFormat = {
    streams: [
      {
        stream: {
          job: jobname,
        },
        values: [],
      },
    ],
  }
  let log
  if (encoding === 'gzip') {
    payload = await payload.arrayBuffer()
    let data = inflate(payload)
    // pako's inflate returns a Uint8Array of raw bytes; decode it as UTF-8 text
    let logdata = new TextDecoder('utf-8').decode(data)
    // Logpush batches are newline-delimited JSON; drop any trailing empty line
    log = logdata.split('\n').filter(line => line.length > 0)
  } else {
    // Logpush sends a small, non-gzipped JSON payload when the job is first
    // created (job validation), so handle that case here as well
    let date = new Date().getTime() * 1000000
    log = await payload.json()
    lokiFormat.streams[0].values.push([date.toString(), JSON.stringify(log)])
    return lokiFormat
  }
  log.forEach(element => {
    // each element is a JSON-encoded log line; parse it to read the timestamp
    // (EdgeStartTimestamp arrives in unix nanoseconds, which is what Loki
    // expects, since the Logpush job below sets timestamps=unixnano)
    let date = JSON.parse(element).EdgeStartTimestamp || new Date().getTime() * 1000000
    lokiFormat.streams[0].values.push([date.toString(), element])
  })
  return lokiFormat
}

async function pushLogs(payload, credentials) {
  // `lokiHost` is an environment variable referencing the loki Server like so:
  // https://logs-prod-us-central1.grafana.net/loki/api/v1/push
  let lokiServer = lokiHost
  let req = await fetch(lokiServer, {
    body: JSON.stringify(payload),
    method: 'POST',
    headers: {
      Authorization: credentials,
      'Content-Type': 'application/json',
    },
  })
  return req
}

async function handleRequest(request) {
  const { searchParams } = new URL(request.url)
  let job = searchParams.get('job')
  const authHeader = request.headers.get('authorization')
  const contentEncoding = request.headers.get('content-encoding')
  if (request.method !== 'POST') {
    return new Response(
      JSON.stringify({ success: false, message: 'please authenticate and use POST requests' }),
      { status: 405, headers: { 'content-type': 'application/json' } },
    )
  }
  if (!authHeader) {
    return new Response(
      JSON.stringify({ success: false, message: 'please authenticate' }),
      { status: 401, headers: { 'content-type': 'application/json' } },
    )
  }
  let output = await transformLogs({ payload: request, contentEncoding, job })
  // TODO: some error checking might be good
  await pushLogs(output, authHeader)
  return new Response(JSON.stringify({ success: true }), {
    headers: { 'content-type': 'application/json' },
  })
}

Grafana Loki Endpoint

Create a secret (exposed to the Worker as the lokiHost variable) containing the URL of your Loki endpoint, for example https://logs-prod-eu-west-0.grafana.net/loki/api/v1/push:

wrangler secret put lokiHost

Upload the Worker:

wrangler publish

Make sure you are getting a "✨ Success!" message from the wrangler commands!
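
To smoke-test the deployed Worker, you can mimic the small JSON validation payload Logpush sends on job creation; replace the workers.dev URL and credentials below with your own:

curl -s -X POST 'https://PROJECT_NAME.YOUR_SUBDOMAIN.workers.dev/?job=test' \
    -H 'Authorization: Basic <YOUR_LOKI_USER_AND_PASSWORD_IN_BASE64>' \
    -H 'Content-Type: application/json' \
    -d '{"content":"test"}' | jq .

A {"success": true} response means the Worker accepted the payload and forwarded it to Loki.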

Log Retention

Check if Log Retention is enabled:

curl -s -H "X-Auth-Email:<YOUR_EMAIL>" -H "X-Auth-Key:<GLOBAL_API_KEY>" -X GET "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/control/retention/flag" | jq .
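
The Cloudflare API wraps results in a standard envelope; the response should look roughly like this, with result.flag reporting the current state:

{
  "errors": [],
  "messages": [],
  "result": {
    "flag": false
  },
  "success": true
}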

Enable Log Retention:

curl -s -H "X-Auth-Email:<YOUR_EMAIL>" -H "X-Auth-Key:<GLOBAL_API_KEY>" -X POST "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/control/retention/flag" -d '{"flag":true}' | jq .

Test:

curl -sv \
    -H 'X-Auth-Email:<YOUR_EMAIL>' \
    -H 'X-Auth-Key:<GLOBAL_API_KEY>' \
    "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/received?start=2021-08-02T10:00:00Z&end=2021-08-02T10:01:00Z&fields=RayID,EdgeStartTimestamp" | jq .

Logpush Job

Make sure to change the following details in the request below:

  • <ZONE_ID>, <YOUR_EMAIL> and <GLOBAL_API_KEY> for your Cloudflare zone and account;
  • <YOUR_WORKERS_URL> for the Worker you just published;
  • <YOUR_LOKI_USER_AND_PASSWORD_IN_BASE64> and <YOUR_LOKI_JOB_NAME> for your Loki setup.

The last two parameters can be found/edited on your Grafana Dashboard > Configuration > Data Sources > Your Loki > Settings.
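
To produce the Base64 credentials for the destination_conf below, encode your Loki user and password/API key joined by a colon (placeholder values shown):

echo -n '<YOUR_LOKI_USER>:<YOUR_LOKI_API_KEY>' | base64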

cURL request to create a Logpush Job with your choice of Log fields:

curl --location --request POST 'https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logpush/jobs' \
--header 'X-Auth-Email:<YOUR_EMAIL>' \
--header 'X-Auth-Key:<GLOBAL_API_KEY>' \
--header 'Content-Type:application/json' \
--data-raw '{
    "name": "http",
    "logpull_options": "fields=BotScore,BotScoreSrc,CacheCacheStatus,CacheResponseBytes,CacheResponseStatus,CacheTieredFill,ClientASN,ClientCountry,ClientDeviceType,ClientIP,ClientIPClass,ClientRequestBytes,ClientRequestHost,ClientRequestMethod,ClientRequestPath,ClientRequestProtocol,ClientRequestReferer,ClientRequestURI,ClientRequestUserAgent,ClientSSLCipher,ClientSSLProtocol,ClientSrcPort,ClientXRequestedWith,EdgeColoCode,EdgeColoID,EdgeEndTimestamp,EdgePathingOp,EdgePathingSrc,EdgePathingStatus,EdgeRateLimitAction,EdgeRateLimitID,EdgeRequestHost,EdgeResponseBytes,EdgeResponseCompressionRatio,EdgeResponseContentType,EdgeResponseStatus,EdgeServerIP,EdgeStartTimestamp,FirewallMatchesActions,FirewallMatchesRuleIDs,FirewallMatchesSources,OriginIP,OriginResponseBytes,OriginResponseHTTPExpires,OriginResponseHTTPLastModified,OriginResponseStatus,OriginResponseTime,OriginSSLProtocol,ParentRayID,RayID,SecurityLevel,WAFAction,WAFFlags,WAFMatchedVar,WAFProfile,WAFRuleID,WAFRuleMessage,WorkerCPUTime,WorkerStatus,WorkerSubrequest,WorkerSubrequestCount,ZoneID&timestamps=unixnano",
    "destination_conf": "<YOUR_WORKERS_URL>?header_Authorization=Basic%20<YOUR_LOKI_USER_AND_PASSWORD_IN_BASE64>=&job=<YOUR_LOKI_JOB_NAME>",
    "max_upload_bytes": 5000000,
    "max_upload_records": 1000,
    "dataset": "http_requests",
    "frequency": "high",
    "enabled": true
}' | jq .
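
To double-check via the API, you can list the zone's Logpush jobs and confirm the new job is present and enabled:

curl -s -H "X-Auth-Email:<YOUR_EMAIL>" -H "X-Auth-Key:<GLOBAL_API_KEY>" -X GET "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logpush/jobs" | jq .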

Cloudflare Dashboard

If everything worked, a new Logpush Job should appear on your Cloudflare Dashboard > Analytics > Logs tab, as well as in Grafana Loki.
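
On the Grafana side, a minimal LogQL query in the Explore view should show the incoming lines; the label value is whatever job name the Worker assigned (cloudflare_logpush by default, or the name passed via the ?job= parameter):

{job="<YOUR_LOKI_JOB_NAME>"}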


Analytics Integrations

Additionally, there is a variety of Analytics Integrations to use.


Disclaimer

This guide was inspired by a colleague – all credit goes to him. 🤓

Educational purposes only, and this blog post does not necessarily reflect the opinions of Cloudflare. There are many more aspects to Cloudflare and its products and services – this is merely a brief educational intro. Properly inform yourself, keep learning, keep testing, and feel free to share your learnings and experiences as I do. Hope it was helpful! Images are online and publicly accessible.