
How to send logs to a Grafana Loki aggregation system, transforming incoming data from Logpush to a format Grafana Loki understands by using Cloudflare Workers.

Logpush allows you to "push logs of Cloudflare's datasets to your cloud service in batches".

Today, we can send logs to any HTTP endpoint, choosing a variety of Log fields or Zone-scoped datasets.

In this post, we will be taking a look at how to send logs to a Grafana Loki aggregation system, transforming incoming data from Logpush to a format Grafana Loki understands by using Cloudflare Workers to do the following:

  • Take incoming gzipped Logpush data, unpack it, transform the data;
  • Merge all fields into the Loki API format and send it off to the destination.

Create a Worker

Create a new Worker with Cloudflare Wrangler:

wrangler generate PROJECT_NAME

Update the wrangler.toml:

type = "webpack"
account_id = "YOUR_ACCOUNT_ID"
workers_dev = true
route = ""
zone_id = ""
usage_model = "unbound"


Depending on your system, you might have to install node and xcode, as well as (optionally) jq:

brew install node
xcode-select --install
brew install jq

Install the pako npm package to decompress incoming files:

npm i pako


Edit the index.js file:

import { inflate } from 'pako'

addEventListener('fetch', event => {

async function transformLogs(obj) {
  let encoding = obj.contentEncoding || undefined
  let payload = obj.payload
  let jobname = obj.job || 'cloudflare_logpush'
  let lokiFormat = {
    streams: [
stream: {
          job: jobname,
        values: [],
], }
let log
  if (encoding === 'gzip') {
    payload = await payload.arrayBuffer()
    let data = inflate(payload)
    let logdata = new Uint16Array(data).reduce(function(data, byte) {
      return data + String.fromCharCode(byte)
    }, '')
    log = logdata.split('\n')
  } else {
    // AFAIK this is just required for the very first time of setting up the logpush job since it's not gzipped data?
    let date = new Date().getTime() * 1000000
    log = await payload.json()
    lokiFormat.streams[0].values.push([date.toString(), JSON.stringify(log)])
    return lokiFormat
  log.forEach(element => {
    let date = element.EdgeStartTimestamp || new Date().getTime() * 1000000
    lokiFormat.streams[0].values.push([date.toString(), element])
  return lokiFormat

async function pushLogs(payload, credentials) {
  // `lokiHost` is an environment variable referencing the loki Server like so:
  // https://logs-prod-us-central1.grafana.net/loki/api/v1/push
  let lokiServer = lokiHost
  let req = await fetch(lokiServer, {
    body: JSON.stringify(payload),
    method: 'POST',
    headers: {
      Authorization: credentials,
      'Content-Type': 'application/json',
return req }
async function handleRequest(request) {
  const { searchParams } = new URL(request.url)
  let job = searchParams.get('job')
  const authHeader = request.headers.get('authorization')
  const contentEncoding = request.headers.get('content-encoding')
  if (request.method !== 'POST') {
    return new Response(
        { success: false, message: 'please authenticate and use POST requests' },
        { headers: { 'content-type': 'application/json' } },
), )
  if (!authHeader) {
    return new Response(
        { success: false, message: 'please authenticate' },
        { headers: { 'content-type': 'application/json' } },
), )
  let output = await transformLogs({ payload: await request, contentEncoding, job })
  // TODO: some error checking might be good
  await pushLogs(output, authHeader)
  return new Response(JSON.stringify({ success: true }), {
    headers: { 'content-type': 'application/json' },

Grafana Loki Endpoint

Create an environment variable with the URL – such as for example https://logs-prod-eu-west-0.grafana.net/loki/api/v1/push – to your Loki Endpoint:

wrangler secret put lokiHost

Upload the Worker:

wrangler publish

Make sure you are getting a "✨ Success!" message from the wrangler commands!

Log Retention

Check if Log Retention is enabled:

curl -s -H "X-Auth-Email:<YOUR_EMAIL>" -H "X-Auth-Key:<GLOBAL_API_KEY>" GET "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/control/retention/flag" | jq .

Enable Log Retention:

curl -s -H "X-Auth-Email:<YOUR_EMAIL>" -H "X-Auth-Key:<GLOBAL_API_KEY>" POST "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/control/retention/flag" -d'{"flag":true}' | jq .


curl -sv \
    -H 'X-Auth-Email:<YOUR_EMAIL>' \
    -H 'X-Auth-Key:<GLOBAL_API_KEY>' \
    "https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logs/received?start=2021-08-02T10:00:00Z&end=2021-08-02T10:01:00Z&fields=RayID,EdgeStartTimestamp" | jq .

Logpush Job

Make sure to change the following details:

The last two parameters can be found/edited on your Grafana Dashboard > Configuration > Data Sources > Your Loki > Settings.

cURL request to create a Logpush Job with your choice of Log fields:

curl --location --request POST 'https://api.cloudflare.com/client/v4/zones/<ZONE_ID>/logpush/jobs' \
--header 'X-Auth-Email:<YOUR_EMAIL>' \
--header 'X-Auth-Key:<GLOBAL_API_KEY>' \
--header 'Content-Type:application/json' \
--data-raw '{
    "name": "http",
    "logpull_options": "fields=BotScore,BotScoreSrc,CacheCacheStatus,CacheResponseBytes,CacheResponseStatus,CacheTieredFill,ClientASN,ClientCountry,ClientDeviceType,ClientIP,ClientIPClass,ClientRequestBytes,ClientRequestHost,ClientRequestMethod,ClientRequestPath,ClientRequestProtocol,ClientRequestReferer,ClientRequestURI,ClientRequestUserAgent,ClientSSLCipher,ClientSSLProtocol,ClientSrcPort,ClientXRequestedWith,EdgeColoCode,EdgeColoID,EdgeEndTimestamp,EdgePathingOp,EdgePathingSrc,EdgePathingStatus,EdgeRateLimitAction,EdgeRateLimitID,EdgeRequestHost,EdgeResponseBytes,EdgeResponseCompressionRatio,EdgeResponseContentType,EdgeResponseStatus,EdgeServerIP,EdgeStartTimestamp,FirewallMatchesActions,FirewallMatchesRuleIDs,FirewallMatchesSources,OriginIP,OriginResponseBytes,OriginResponseHTTPExpires,OriginResponseHTTPLastModified,OriginResponseStatus,OriginResponseTime,OriginSSLProtocol,ParentRayID,RayID,SecurityLevel,WAFAction,WAFFlags,WAFMatchedVar,WAFProfile,WAFRuleID,WAFRuleMessage,WorkerCPUTime,WorkerStatus,WorkerSubrequest,WorkerSubrequestCount,ZoneID&timestamps=unixnano",
    "destination_conf": "<YOUR_WORKERS_URL>?header_Authorization=Basic%20<YOUR_LOKI_USER_AND_PASSWORD_IN_BASE64>=&job=<YOUR_LOKI_JOB_NAME>",
    "max_upload_bytes": 5000000,
    "max_upload_records": 1000,
    "dataset": "http_requests",
    "frequency": "high",
    "enabled": true
}' | jq .

Cloudflare Dashboard

If everything worked, a new Logpush Job should appear on your Cloudflare Dashboard > Analytics > Logs tab, as well as in Grafana Loki.

Analytics Integrations

Additionally, there is a variety of Analytics Integrations to use.


This guide was inspired by a colleague – all crediting goes to him. 🤓

Educational purposes only, and this blog post does not necessarily reflect the opinions of Cloudflare. There are many more aspects to Cloudflare and its products and services – this is merely a brief educational intro. Properly inform yourself, keep learning, keep testing, and feel free to share your learnings and experiences as I do. Hope it was helpful! Images are online and publicly accessible.