A lightweight Go package for distributed command execution and coordination across services
Note
This version is written by Claude after explaining the requirements and providing some code that I used somewhere else. I haven't tested it extensively, please use with caution. I will update once I get the chance to test it thoroughly.
cnc-go lets you:
-
β‘ Register commands with custom handlers
-
π‘ Trigger commands locally or across a cluster
-
π Propagate events using pluggable Pub/Sub backends (Redis/Valkey included, extendable to NATS, Kafka, etc.)
-
ποΈ Ensure synchronized actions across pods/nodes
-
π οΈ Gracefully start, run, and shutdown CNC workers
In distributed systems, some actions β like invalidating caches, reloading configs, or resetting feature flags β must be executed everywhere.
Instead of building ad-hoc RPCs or reinventing messaging, cnc-go provides a simple abstraction: publish a command once and have every node execute it reliably.
-
π Pluggable Pub/Sub transport (Redis included, extendable to NATS/Kafka/etc.)
-
π§© Simple API for registering and triggering commands
-
π‘οΈ Typed errors for safe error handling (
ErrHandlerNotFound,ErrHandlerAlreadyExists, β¦) -
π¦ Clean, idiomatic Go design (no external dependencies beyond the transport driver)
-
β Unit tested & extensible
go get github.com/TheAlpha16/cnc-gopackage main
import (
"context"
"fmt"
"log"
"time"
"github.com/TheAlpha16/cnc-go"
"github.com/valkey-io/valkey-go"
)
func main() {
// Create CNC instance with Valkey transport
cncInstance, err := cnc.NewCNCWithValkeyAddress("localhost:6379", "my-commands", []valkey.ClientOption{})
if err != nil {
log.Fatalf("Failed to create CNC: %v", err)
}
defer cncInstance.Shutdown()
// Register a simple command handler
err = cncInstance.RegisterHandler("hello", func(ctx context.Context, cmd cnc.Command) error {
name := "World"
if n, ok := cmd.Parameters["name"].(string); ok {
name = n
}
fmt.Printf("Hello, %s!\n", name)
return nil
})
if err != nil {
log.Fatalf("Failed to register handler: %v", err)
}
// Start the CNC instance
ctx := context.Background()
if err := cncInstance.Start(ctx); err != nil {
log.Fatalf("Failed to start CNC: %v", err)
}
fmt.Println("CNC started! Transport is now listening for commands...")
// Trigger a command after a short delay
time.Sleep(1 * time.Second)
command := cnc.Command{
Name: "hello",
Parameters: map[string]any{
"name": "CNC User",
},
}
if err := cncInstance.TriggerCommand(ctx, command); err != nil {
log.Printf("Failed to trigger command: %v", err)
}
// Keep the program running for a bit to see the result
time.Sleep(3 * time.Second)
fmt.Println("Quick start example completed!")
}The main interface for command and control operations:
type CNC interface {
RegisterHandler(commandName CommandName, handler Handler) error
TriggerCommand(ctx context.Context, command Command) error
Start(ctx context.Context) error
Shutdown() error
IsRunning() bool
}Pluggable transport layer for message passing:
type Transport interface {
Publish(ctx context.Context, command Command) error
Subscribe(ctx context.Context) error
Messages() <-chan Command // Returns channel of received commands
Close() error
IsConnected() bool
}client, err := cnc.NewValkeyClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
cncInstance := cnc.NewCNCWithValkey(client, "command-channel")cncInstance, err := cnc.NewCNCWithValkeyAddress("localhost:6379", "command-channel")
if err != nil {
log.Fatal(err)
}transport := NewMyCustomTransport()
cncInstance := cnc.NewCNC(transport)type Command struct {
Name CommandName `json:"type"`
Parameters map[string]any `json:"parameters,omitempty"`
}
type Handler func(ctx context.Context, command Command) errorThe package provides typed errors for better error handling:
var (
ErrInvalidCommand = errors.New("invalid command")
ErrHandlerAlreadyExists = errors.New("handler already exists")
ErrHandlerNotFound = errors.New("handler not found")
ErrPublishFailed = errors.New("failed to publish command")
ErrSubscribeFailed = errors.New("failed to subscribe to channel")
ErrTransportNotConnected = errors.New("transport not connected")
ErrTransportClosed = errors.New("transport is closed")
ErrCNCNotStarted = errors.New("cnc instance not started")
ErrCNCAlreadyStarted = errors.New("cnc instance already started")
)To implement a custom transport (e.g., for NATS, Kafka), implement the Transport interface:
type MyCustomTransport struct {
msgChan chan Command
// Your other transport fields
}
func (t *MyCustomTransport) Publish(ctx context.Context, command Command) error {
// Implement publishing logic
return nil
}
func (t *MyCustomTransport) Subscribe(ctx context.Context) error {
// Start listening for messages and feed them to msgChan
return nil
}
func (t *MyCustomTransport) Messages() <-chan Command {
// Return the channel that receives commands
return t.msgChan
}
func (t *MyCustomTransport) Close() error {
// Implement cleanup logic
close(t.msgChan)
return nil
}
func (t *MyCustomTransport) IsConnected() bool {
// Return connection status
return true
}Run the tests:
go test ./...Run tests with coverage:
go test -cover ./...- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
The package follows a clean separation of concerns:
- Transport Layer: Pure message passing (publish/subscribe) with channels
- Registry: Maps command names to handler functions
- CNC Core: Orchestrates transport and registry, manages lifecycle
Commands βββ
βΌ
βββββββββββββββ ββββββββββββββββ ββββββββββββββ
β Transport βββββΆβ Channel βββββΆβ CNC Core β
β β β <-chan Cmd β β β
βββββββββββββββ ββββββββββββββββ ββββββββββββββ
β
βΌ
βββββββββββββββ
β Registry β
β (Handlers) β
βββββββββββββββ
If you have any questions or need help:
- π§ Open an issue on GitHub
- π¬ Start a discussion in the repository
- β Star the repo if you find it useful!