Jetstream is a streaming service that consumes an ATProto com.atproto.sync.subscribeRepos stream and converts it into lightweight, friendly JSON.
Jetstream converts the CBOR-encoded MST blocks produced by the ATProto firehose into JSON objects that are easier to work with using the standard tooling available in most programming languages.
To run Jetstream, make sure you have Docker and Docker Compose installed, then run make up in the repo root.
This will pull the latest built image from GHCR and start a Jetstream instance at http://localhost:6008.
- To build Jetstream from source via Docker and start it up, run
make rebuild
Once started, you can connect to the event stream at: ws://localhost:6008/subscribe
Prometheus metrics are exposed at http://localhost:6008/metrics
A Grafana Dashboard for Jetstream is available at ./grafana-dashboard.json and should be easy to copy/paste into Grafana's dashboard import prompt.
- This dashboard has a few device-specific graphs for disk and network usage that require NodeExporter and may need to be tuned to your setup.
To consume Jetstream, you can use any WebSocket client.
Connect to ws://localhost:6008/subscribe to start the stream.
- A public instance of Jetstream is available at
wss://jetstream.atproto.tools/subscribe
The following Query Parameters are supported:
- wantedCollections - An array of Collection NSIDs to filter which records you receive on your stream (default empty = all collections)
  - wantedCollections supports NSID path prefixes, e.g. app.bsky.graph.* or app.bsky.*. The prefix before the .* must pass NSID validation, and Jetstream does not support incomplete prefixes such as app.bsky.graph.fo*.
  - Regardless of desired collections, all subscribers receive Account and Identity events.
  - You can specify at most 100 wanted collections/prefixes.
- wantedDids - An array of Repo DIDs to filter which records you receive on your stream (default empty = all repos)
  - You can specify at most 10,000 wanted DIDs.
- cursor - A Unix microseconds timestamp cursor to begin playback from
  - An absent cursor or a cursor from the future will result in live-tail operation.
  - When reconnecting, use the time_us from your most recently processed event, and consider subtracting a few seconds as a buffer to ensure gapless playback.
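The query parameters above can be assembled with the standard library alone. The following is a minimal sketch, not part of Jetstream itself: build_subscribe_url, its argument names, and the 5-second default rewind are hypothetical choices made for illustration. Array parameters are passed by repeating the key once per value, as in the websocat examples in this README, and the wildcard check is only a rough stand-in for the server's NSID validation.

```python
from urllib.parse import urlencode

MAX_WANTED_COLLECTIONS = 100   # limit stated in the parameter list
MAX_WANTED_DIDS = 10_000       # limit stated in the parameter list


def build_subscribe_url(base, wanted_collections=(), wanted_dids=(),
                        last_time_us=None, rewind_seconds=5):
    """Return a subscribe URL; array parameters repeat once per value."""
    if len(wanted_collections) > MAX_WANTED_COLLECTIONS:
        raise ValueError("at most 100 wanted collections/prefixes")
    if len(wanted_dids) > MAX_WANTED_DIDS:
        raise ValueError("at most 10,000 wanted DIDs")
    for nsid in wanted_collections:
        # Rough check only: a wildcard must be a whole trailing segment (".*").
        if "*" in nsid and not (nsid.endswith(".*") and nsid.count("*") == 1):
            raise ValueError(f"incomplete prefix: {nsid}")

    params = [("wantedCollections", c) for c in wanted_collections]
    params += [("wantedDids", d) for d in wanted_dids]
    if last_time_us is not None:
        # Rewind a little so playback overlaps events that were already seen.
        cursor = last_time_us - rewind_seconds * 1_000_000
        params.append(("cursor", str(cursor)))

    # Keep ':' and '*' readable instead of percent-encoding them.
    query = urlencode(params, safe=":*")
    return f"{base}?{query}" if query else base
```

Because the rewound cursor replays a few seconds of events, a consumer that uses this approach will see some events twice after a reconnect and should be prepared to ignore the duplicates.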
Jetstream supports zstd-based compression of messages. Jetstream uses a custom dictionary for compression that can be found in pkg/models/zstd_dictionary and is required to decode compressed messages from the server.
zstd-compressed Jetstream messages are ~56% smaller on average than the raw JSON version of the Jetstream firehose.
The provided client library uses compression by default, using an embedded copy of the dictionary from the models package.
To request a compressed stream, send the Socket-Encoding: zstd header when initiating the websocket connection.
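For clients that do not use the provided library, decoding might look roughly like the sketch below. It assumes the third-party zstandard Python package, a local copy of the dictionary at the path named above, and that each binary websocket message is one complete zstd frame; none of this is confirmed by this README. compression_headers and make_decoder are hypothetical helper names.

```python
def compression_headers():
    """Extra handshake headers that request a zstd-compressed stream."""
    return {"Socket-Encoding": "zstd"}


def make_decoder(dictionary_path="pkg/models/zstd_dictionary"):
    """Return a function that turns one compressed message into JSON text."""
    # Third-party dependency (assumption); imported lazily so that
    # compression_headers() works without it installed.
    import zstandard

    with open(dictionary_path, "rb") as fh:
        zdict = zstandard.ZstdCompressionDict(fh.read())
    dctx = zstandard.ZstdDecompressor(dict_data=zdict)

    def decode(frame: bytes) -> str:
        # A fresh decompression object per message copes with frames that
        # do not record their decompressed size in the frame header.
        return dctx.decompressobj().decompress(frame).decode("utf-8")

    return decode
```

The headers would be passed to whichever websocket client is in use when opening the connection, and decode would be applied to each binary message before JSON parsing.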
A simple example that hits the public instance looks like:
$ websocat wss://jetstream.atproto.tools/subscribe\?wantedCollections=app.bsky.feed.post

A maximal example using all parameters looks like:
$ websocat "ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow&wantedDids=did:plc:q6gjnaw2blty4crticxkmujt&cursor=1725519626134432"

Jetstream events have 3 types (so far):
- com: a Commit to a repo which involves either a create, update, or delete of a record
- id: an Identity update for a DID which indicates that you may want to purge an identity cache and revalidate the DID doc and handle
- acc: an Account event that indicates a change in account status i.e. from active to deactivated, or to takendown if the PDS has taken down the repo.
Jetstream Commits have 3 types:
- c: Create a new record with the contents provided
- u: Update an existing record and replace it with the contents provided
- d: Delete an existing record with the DID, Collection, and RKey provided
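A consumer typically branches on these two type fields. The sketch below shows one way to do that in Python; describe, EVENT_KINDS, and COMMIT_OPS are hypothetical names, and the "unknown" fallback is an assumption made because the list of event types is described as open-ended ("so far").

```python
EVENT_KINDS = {"com": "commit", "id": "identity", "acc": "account"}
COMMIT_OPS = {"c": "create", "u": "update", "d": "delete"}


def describe(event):
    """Map a decoded Jetstream event (a dict) to a short label."""
    kind = EVENT_KINDS.get(event.get("type"))
    if kind is None:
        # Tolerate event types added after this sketch was written.
        return "unknown"
    if kind != "commit":
        return kind
    op = COMMIT_OPS.get(event["commit"]["type"], "unknown")
    return f"commit/{op}"
```

For example, describe({"type": "com", "commit": {"type": "d"}}) returns "commit/delete", and describe({"type": "id"}) returns "identity".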
A like committed to a repo:

{
  "did": "did:plc:eygmaihciaxprqvxpfvl6flk",
  "time_us": 1725911162329308,
  "type": "com",
  "commit": {
    "rev": "3l3qo2vutsw2b",
    "type": "c",
    "collection": "app.bsky.feed.like",
    "rkey": "3l3qo2vuowo2b",
    "record": {
      "$type": "app.bsky.feed.like",
      "createdAt": "2024-09-09T19:46:02.102Z",
      "subject": {
        "cid": "bafyreidc6sydkkbchcyg62v77wbhzvb2mvytlmsychqgwf2xojjtirmzj4",
        "uri": "at://did:plc:wa7b35aakoll7hugkrjtf3xf/app.bsky.feed.post/3l3pte3p2e325"
      }
    },
    "cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
  }
}

A deleted follow record:

{
  "did": "did:plc:rfov6bpyztcnedeyyzgfq42k",
  "time_us": 1725516666833633,
  "type": "com",
  "commit": {
    "rev": "3l3f6nzl3cv2s",
    "type": "d",
    "collection": "app.bsky.graph.follow",
    "rkey": "3l3dn7tku762u"
  }
}

An identity update:

{
  "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
  "time_us": 1725516665234703,
  "type": "id",
  "identity": {
    "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
    "handle": "yohenrique.bsky.social",
    "seq": 1409752997,
    "time": "2024-09-05T06:11:04.870Z"
  }
}

An account event with active status:

{
  "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
  "time_us": 1725516665333808,
  "type": "acc",
  "account": {
    "active": true,
    "did": "did:plc:ufbl4k27gp6kzas5glhz7fim",
    "seq": 1409753013,
    "time": "2024-09-05T06:11:04.870Z"
  }
}
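As an illustration of working with these events, the sketch below parses the delete event shown above, rebuilds the AT URI of the affected record from the DID, collection, and rkey, and converts time_us to a timezone-aware datetime. record_uri and event_time are hypothetical helper names, not part of Jetstream or its client library.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def record_uri(event):
    """Build the at:// URI of the record a commit event refers to."""
    commit = event["commit"]
    return f"at://{event['did']}/{commit['collection']}/{commit['rkey']}"


def event_time(event):
    """Convert the microsecond time_us field to a UTC datetime."""
    # Integer microseconds via timedelta avoid float rounding.
    return EPOCH + timedelta(microseconds=event["time_us"])


raw = """
{
  "did": "did:plc:rfov6bpyztcnedeyyzgfq42k",
  "time_us": 1725516666833633,
  "type": "com",
  "commit": {
    "rev": "3l3f6nzl3cv2s",
    "type": "d",
    "collection": "app.bsky.graph.follow",
    "rkey": "3l3dn7tku762u"
  }
}
"""
event = json.loads(raw)
print(record_uri(event))
print(event_time(event).isoformat())
```

Run as is, this prints at://did:plc:rfov6bpyztcnedeyyzgfq42k/app.bsky.graph.follow/3l3dn7tku762u followed by 2024-09-05T06:11:06.833633+00:00.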