A modern, high-performance Go event bus implementation with type safety, async processing, priority handling, filters, and enterprise-grade features.
- Type Safety: Go generics ensure compile-time type safety
- Sync/Async: Support for both synchronous and asynchronous event processing
- Handle Pattern: Precise subscription management with unsubscribe handles
- Once Subscription: One-time event handlers that auto-remove after execution
- Priority Processing: 4-level priority system (Critical, High, Normal, Low)
- Event Filtering: Custom event filters for fine-grained control
- Context Support: Context-based cancellation and timeout handling
- Middleware: Event processing middleware chain
- Error Handling: Comprehensive error handling and recovery
- Monitoring: Built-in performance metrics and statistics
- Graceful Shutdown: Proper resource cleanup and graceful termination
- Thread Safe: Concurrent-safe design for multi-goroutine usage
- Panic Recovery: Automatic recovery from handler panics
- Resource Management: Automatic cleanup and memory management
go get github.com/PlutoWu-Cn/go-buspackage main
import (
"fmt"
"github.com/PlutoWu-Cn/go-bus"
)
type UserEvent struct {
UserID string
Action string
}
func main() {
// Create a type-safe event bus
eventBus := bus.NewTyped[UserEvent]()
defer eventBus.Close()
// Subscribe to events
handle := eventBus.SubscribeWithHandle("user.login", func(event UserEvent) {
fmt.Printf("User %s performed %s\n", event.UserID, event.Action)
})
defer handle.Unsubscribe()
// Publish events
eventBus.Publish("user.login", UserEvent{
UserID: "user123",
Action: "login",
})
}Handlers with different priorities execute in priority order:
// High priority - security checks
securityHandle := eventBus.SubscribeWithPriority("user.action", func(event UserEvent) {
fmt.Println("🔒 Security check")
}, bus.PriorityCritical)
// Normal priority - business logic
businessHandle := eventBus.SubscribeWithPriority("user.action", func(event UserEvent) {
fmt.Println("📋 Business processing")
}, bus.PriorityNormal)
// Low priority - analytics
analyticsHandle := eventBus.SubscribeWithPriority("user.action", func(event UserEvent) {
fmt.Println("📊 Analytics")
}, bus.PriorityLow)Process only events that match specific criteria:
// Only process admin user events
adminHandle := eventBus.SubscribeWithFilter("user.action", func(event UserEvent) {
fmt.Printf("Admin action: %s\n", event.UserID)
}, func(topic string, event UserEvent) bool {
return strings.HasPrefix(event.UserID, "admin_")
})
// Only process sensitive operations
sensitiveHandle := eventBus.SubscribeWithFilter("user.action", func(event UserEvent) {
fmt.Printf("Sensitive operation alert: %s\n", event.Action)
}, func(topic string, event UserEvent) bool {
sensitiveActions := []string{"delete", "modify_permissions"}
for _, action := range sensitiveActions {
if event.Action == action {
return true
}
}
return false
})Use context for cancellation and timeout control:
// Context cancellation
ctx, cancel := context.WithCancel(context.Background())
handle := eventBus.SubscribeWithContext(ctx, "user.session", func(event UserEvent) {
fmt.Printf("Session event: %s\n", event.UserID)
})
// Cancel subscription
cancel()
// Timeout publishing
err := eventBus.PublishWithTimeout("user.action", event, 5*time.Second)
if err != nil {
fmt.Printf("Publish timeout: %v\n", err)
}Set up global error handler:
eventBus.SetErrorHandler(func(err *bus.EventError) {
log.Printf("Event processing error - Topic: %s, Error: %v", err.Topic, err.Err)
})Add processing middleware:
// Logging middleware
eventBus.AddMiddleware(func(topic string, event interface{}, next func()) error {
start := time.Now()
log.Printf("Processing event: %s", topic)
next() // Execute handlers
log.Printf("Event processed: %s, Duration: %v", topic, time.Since(start))
return nil
})
// Rate limiting middleware
eventBus.AddMiddleware(func(topic string, event interface{}, next func()) error {
if rateLimiter.Allow() {
next()
return nil
}
return fmt.Errorf("rate limit exceeded")
})Get runtime metrics:
metrics := eventBus.GetMetrics()
published, processed, failed, subscribers := metrics.GetStats()
fmt.Printf("Published events: %d\n", published)
fmt.Printf("Processed events: %d\n", processed)
fmt.Printf("Failed events: %d\n", failed)
fmt.Printf("Active subscribers: %d\n", subscribers)
// Get topic information
topics := eventBus.GetTopics()
subscriberCount := eventBus.GetSubscriberCount("user.action")// Async processing, non-transactional (concurrent execution)
err := eventBus.SubscribeAsync("user.notification", func(event UserEvent) {
sendEmail(event.UserID)
}, false)
// Async processing, transactional (serial execution)
err := eventBus.SubscribeAsync("user.audit", func(event UserEvent) {
writeAuditLog(event)
}, true)The library is organized into separate modules for better maintainability:
types.go- Core type definitions (Priority, EventError, filters, middleware)interfaces.go- Interface definitions (BusSubscriber, BusPublisher, BusController, Bus)metrics.go- Monitoring and metrics functionalityhandle.go- Subscription handle management and internal handler structuresbus.go- Core EventBus implementation
// Subscriber interface
type BusSubscriber[T any] interface {
Subscribe(topic string, fn func(T)) error
SubscribeWithPriority(topic string, fn func(T), priority Priority) *Handle[T]
SubscribeWithFilter(topic string, fn func(T), filter EventFilter[T]) *Handle[T]
SubscribeWithContext(ctx context.Context, topic string, fn func(T)) *Handle[T]
// ...
}
// Publisher interface
type BusPublisher[T any] interface {
Publish(topic string, event T)
PublishWithContext(ctx context.Context, topic string, event T) error
PublishWithTimeout(topic string, event T, timeout time.Duration) error
}
// Controller interface
type BusController interface {
GetMetrics() *EventMetrics
SetErrorHandler(handler ErrorHandler)
AddMiddleware(middleware EventMiddleware[any])
Close() error
// ...
}// Event filter
type EventFilter[T any] func(topic string, event T) bool
// Event middleware
type EventMiddleware[T any] func(topic string, event T, next func()) error
// Error handler
type ErrorHandler func(err *EventError)
// Priority levels
type Priority int
const (
PriorityLow Priority = iota
PriorityNormal
PriorityHigh
PriorityCritical
)Following industry best practices, supports these event design patterns:
type UserCreatedEvent struct {
UserID string `json:"user_id"`
Timestamp time.Time `json:"timestamp"`
// Minimal data, subscribers fetch details themselves
}type UserUpdatedEvent struct {
UserID string `json:"user_id"`
Timestamp time.Time `json:"timestamp"`
OldState map[string]interface{} `json:"old_state"`
NewState map[string]interface{} `json:"new_state"`
ChangedFields []string `json:"changed_fields"`
}// Use dot-separated hierarchical naming
"user.created"
"user.updated"
"user.deleted"
"order.placed"
"order.cancelled"
"payment.processed"
"payment.failed"
// Or use namespaces
"ecommerce.order.created"
"auth.user.login"
"notification.email.sent"// Set up retry mechanism
eventBus.SetErrorHandler(func(err *EventError) {
switch err.Err.(type) {
case *TemporaryError:
// Temporary error, retry later
retryQueue.Add(err.Topic, err.Event)
case *PermanentError:
// Permanent error, log and alert
logger.Error("Permanent error", err)
alerting.Send(err)
default:
// Unknown error, log details
logger.Warn("Unknown error", err)
}
})// Use async for non-critical paths
eventBus.SubscribeAsync("analytics.track", func(event UserEvent) {
// Non-critical analytics
analytics.Track(event)
}, false)
// Use sync for critical paths
eventBus.Subscribe("payment.validate", func(event PaymentEvent) {
// Critical payment validation
validatePayment(event)
})
// Use filters to reduce unnecessary processing
eventBus.SubscribeWithFilter("user.activity", handler, func(topic string, event UserEvent) bool {
return event.IsImportant() // Only process important events
})| Feature | go-bus | Guava EventBus | RxJava | Node.js EventEmitter |
|---|---|---|---|---|
| Type Safety | ✅ Generics | ✅ | ✅ | ❌ |
| Async Processing | ✅ | ❌ | ✅ | ✅ |
| Priority | ✅ | ❌ | ❌ | ❌ |
| Filters | ✅ | ❌ | ✅ | ❌ |
| Middleware | ✅ | ❌ | ✅ | ❌ |
| Error Handling | ✅ | ✅ | ||
| Monitoring | ✅ | ❌ | ❌ | ❌ |
| Context Support | ✅ | ❌ | ❌ | ❌ |
Run the complete test suite:
go test -v ./...Generate test coverage report:
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.htmlCurrent test coverage: 92.2% - We maintain high test coverage to ensure reliability and stability.
Run performance benchmarks:
go test -bench=. -benchmemBenchmark results on Go 1.19+:
- Sync Publishing: ~2,000,000 events/sec
- Async Publishing: ~5,000,000 events/sec
- Memory Usage: Minimal GC pressure
- Concurrency: Excellent multi-core scaling
We welcome Issues and Pull Requests!
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
MIT License - see the LICENSE file for details
This project draws inspiration from these excellent open source projects and design patterns:
- Guava EventBus - The classic Java implementation
- MBassador - High-performance Java EventBus
- Node.js EventEmitter - JavaScript native event system
- Enterprise Integration Patterns - Enterprise integration patterns
