binrun is a self-contained Go application that demonstrates advanced NATS JetStream patterns using an in-process, ephemeral NATS server. It supports two modes:
- Business Logic Mode: Runs a dynamic, auto-reconfiguring event system. Streams, key-value buckets, and consumers are created on startup. Session state is managed via JetStream KV, and the system adapts in real time to CLI-driven changes (e.g., new sessions, subjects, or commands).
- Simulation Mode: Runs a deterministic, side-effect-free scenario using a fresh, in-memory JetStream server. All state is discarded on exit, ensuring repeatable, isolated test runs.
The architecture is modular:
cmd/binrun/– main entry pointinternal/– core business logic, simulation, config, and embedded server helpersTaskfile.yml– hot-reload and build tasksdocs/– design notes, CLI/Go API reference, and how-tos
binrun is ideal for:
- Prototyping and learning JetStream (streams, KV, mirroring, transforms)
- Testing dynamic subscription and work-queue patterns
- Running repeatable, isolated simulations
- Interacting with a live JetStream system via the NATS CLI
cmd/
└─ binrun/ # main entry-point (see below)
internal/
├─ core.go # business logic + Sim implementation
├─ embedded_server.go # helper to start an embedded NATS server
├─ config.go # default configs + loader
└─ types.go # AppConfig, FlagsConfig, SimConfig, …
docs/ # design notes, scratch pads, how-tos
Taskfile.yml # build / hot-reload / utility tasks
// internal/types.go
// ... existing code ...Defaults are loaded from environment variables:
| ENV | Purpose | Default |
|---|---|---|
SIM |
true → run the deterministic simulation instead of business logic |
false |
All other settings (store dir, stream counts, etc.) are hard-coded defaults for now; extend LoadAppConfig() as needed.
| Task | What it does |
|---|---|
task -w hot |
Watches *.go/*.templ, rebuilds bin/binrun, kills the old process, runs the new one ( business logic mode ) |
task (default) |
Alias for task -w hot |
task tools |
Installs Task, templ, goimports, NATS CLI, and NATS server binaries |
task fmt, tidy, templ, build, kill |
Usual helpers |
To run the simulation once (no watch), set the env var:
SIM=true task build # or: SIM=true bin/binrun# 1. Install Task and Go 1.22+
go install github.com/go-task/task/v3/cmd/task@latest
# 2. Start hot-reload dev loop
task # or: task -w hot
# -> A NATS server starts in-process on each change.
# Streams: COMMAND, EVENT, COMMANDX_MIRROR
# KV bucket: sessions
# 3. Interact with it (in a separate shell)
nats pub event.orders.created '{"order_id":123,"status":"NEW"}'
nats kv put sessions Alice '{"subscriptions":["event.orders.created"]}'SIM=true bin/binrun
# or:
SIM=true task buildSimulation output looks like:
Sim: starting scenario with {NumSessions:10 ...}
Sim: environment reset complete
Sim: created 10 sessions
Sim: published 5 events per subject to 3 subjects
Sim: published 5 command.x messages
Sim: churned 2 sessions
Sim: mirror stream has correct message count: 5
Sim: session existence checks complete
Sim: scenario PASSED
Every run is isolated—streams and buckets live only in memory and are discarded when the program exits.
- More flags – add fields to
FlagsConfig, read fromos.GetenvindefaultFlagsCfg(). - Scenario files – parse YAML/JSON into
SimConfigand callcore.Sim(...). - Additional streams – edit
core.Run(prod logic) andcore.Sim(scenario) as desired. - Persistent store – set
EmbeddedServerConfig.StoreDirandEnableLogging. - CLI / HTTP metrics – enable the NATS monitoring port in
embedded_server.go.
- Create:
nats stream add <StreamName> --subjects "<subject.pattern>" [--storage file|memory] - List:
nats stream lsornats stream list -a - Info:
nats stream info <StreamName> - View messages:
nats stream view <StreamName>ornats stream get <StreamName> --seq <N> - Remove/Purge:
nats stream rm <StreamName>,nats stream purge <StreamName>
- Add:
nats consumer add <Stream> <ConsumerName> [--filter <subject>] [--ack explicit|none|all] [--pull] - List:
nats consumer ls <Stream> - Info:
nats consumer info <Stream> <ConsumerName> - Next message:
nats consumer next <Stream> <ConsumerName> [--batch <N>] - Delete:
nats consumer rm <Stream> <ConsumerName>
- Create bucket:
nats kv add <BucketName> [--history <N>] [--ttl <duration>] - List buckets:
nats kv ls - Put/Get:
nats kv put <Bucket> <Key> <Value>,nats kv get <Bucket> <Key> - Watch:
nats kv watch <Bucket> [<KeyPattern>] - History:
nats kv history <Bucket> <Key> - Delete:
nats kv del <Bucket> <Key>,nats kv rm <Bucket>
- Create bucket:
nats object add <BucketName> - List buckets/objects:
nats object ls,nats object ls <Bucket> - Put/Get:
nats object put <Bucket> <filePath>,nats object get <Bucket> <ObjectName> [--output <filePath>] - Delete:
nats object rm <Bucket> <ObjectName> - Info:
nats object info <Bucket> <ObjectName> - Watch:
nats object watch <Bucket>
- Account info:
nats account info - Stream/Consumer health:
nats server check stream,nats server check consumer - Server stats:
nats server info,nats server list,nats server report - Publish/Subscribe:
nats pub <subject> <message>,nats sub <subject>,nats sub --js <subject>
import (
"context"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
ctx := context.Background()
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := jetstream.New(nc)// Create
s, _ := js.CreateStream(ctx, jetstream.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
// Update
s, _ = js.UpdateStream(ctx, jetstream.StreamConfig{Name: "ORDERS", Subjects: []string{"ORDERS.*"}})
// Get handle
s, _ = js.Stream(ctx, "ORDERS")
// Delete
js.DeleteStream(ctx, "ORDERS")// Create durable
cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{Durable: "foo", AckPolicy: jetstream.AckExplicitPolicy})
// Create ephemeral
cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
// Update
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
// Get handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")
// Delete
js.DeleteConsumer(ctx, "ORDERS", "foo")// Fetch batch
msgs, _ := cons.Fetch(10)
for msg := range msgs.Messages() { msg.Ack() }
// Callback
consCtx, _ := cons.Consume(func(msg jetstream.Msg) { msg.Ack() })
defer consCtx.Stop()
// Iterator
it, _ := cons.Messages()
for i := 0; i < 10; i++ { msg, _ := it.Next(); msg.Ack() }
it.Stop()kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})
kv.Put(ctx, "sue.color", []byte("blue"))
entry, _ := kv.Get(ctx, "sue.color")
kv.Delete(ctx, "sue.color")
js.DeleteKeyValue(ctx, "profiles")
// Watch
watcher, _ := kv.Watch(ctx, "sue.*")
defer watcher.Stop()
entry = <-watcher.Updates()os, _ := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "configs"})
os.PutString(ctx, "config-1", "first config")
os.Get(ctx, "config-1")
os.Delete(ctx, "config-1")
js.DeleteObjectStore(ctx, "configs")
// Watch
watcher, _ := os.Watch(ctx)
defer watcher.Stop()
object := <-watcher.Updates()ack, _ := js.Publish(ctx, "ORDERS.new", []byte("hello"))
ackF, _ := js.PublishAsync("ORDERS.new", []byte("hello"))For more, see NATS JetStream Go API Reference and NATS CLI Reference.