/qpool

Autoscaling worker pool on a whole other level.

Primary LanguageGoMIT LicenseMIT

QPool - Quantum Worker Pool & Message Queue

Go Reference Go Report Card License: MIT

QPool is a high-performance, feature-rich worker pool implementation in Go that combines the capabilities of a traditional worker pool with a sophisticated message queue. It's designed to handle complex job dependencies, provide robust error handling, and scale automatically based on workload.

๐ŸŒŸ Key Features

  • Dynamic Worker Pool

    • Auto-scaling based on workload
    • Configurable min/max worker counts
    • Efficient worker management
    • Smart job distribution
  • Advanced Job Dependencies

    • Support for future dependencies
    • Dependency chain resolution
    • Circular dependency detection
    • Parent-child relationship tracking
  • Robust Error Handling

    • Circuit breaker pattern
    • Configurable retry policies
    • Exponential backoff
    • Timeout management
  • Performance Features

    • Non-blocking job scheduling
    • Efficient memory usage
    • Resource utilization tracking
    • Load-based auto-scaling
  • Monitoring & Metrics

    • Comprehensive metrics collection
    • Latency percentiles (p95, p99)
    • Success/failure rates
    • Resource utilization stats
    • Dependency resolution tracking

๐Ÿ“ฆ Installation

go get github.com/theapemachine/qpool

๐Ÿš€ Quick Start

Here's a simple example to get you started:

package main

import (
    "context"
    "time"
    "github.com/theapemachine/qpool"
)

func main() {
    // Create a new pool with min 2, max 5 workers
    ctx := context.Background()
    pool := qpool.NewQ(ctx, 2, 5, &qpool.Config{
        SchedulingTimeout: time.Second,
    })
    defer pool.Close()

    // Schedule a simple job
    result := pool.Schedule("job-1", func() (any, error) {
        return "Hello, World!", nil
    })

    // Wait for the result
    value := <-result
    if value.Error != nil {
        panic(value.Error)
    }
    println(value.Value.(string))
}

๐Ÿ”จ Advanced Usage

Job Dependencies

// Create jobs with dependencies
pool.Schedule("data-fetch", func() (any, error) {
    return fetchData()
}, qpool.WithTTL(time.Minute))

pool.Schedule("data-process", func() (any, error) {
    return processData()
}, qpool.WithDependencies([]string{"data-fetch"}))

Circuit Breaker

// Add circuit breaker to protect sensitive operations
pool.Schedule("api-call", func() (any, error) {
    return callExternalAPI()
}, qpool.WithCircuitBreaker("api", 5, time.Minute))

Retry Policy

// Configure retry behavior
pool.Schedule("flaky-operation", func() (any, error) {
    return flakyOperation()
}, qpool.WithRetry(3, &qpool.ExponentialBackoff{
    Initial: time.Second,
}))

Broadcast Groups

// Create a broadcast group for pub/sub functionality
group := pool.CreateBroadcastGroup("sensors", time.Minute)
subscriber := pool.Subscribe("sensors")

// Send updates to all subscribers
group.Send(qpool.QuantumValue{
    Value: "sensor-update",
    CreatedAt: time.Now(),
})

๐Ÿ“Š Metrics & Monitoring

QPool provides comprehensive metrics for monitoring:

// Get current metrics
metrics := pool.metrics.ExportMetrics()

fmt.Printf("Active Workers: %d\n", metrics["worker_count"])
fmt.Printf("Queue Size: %d\n", metrics["queue_size"])
fmt.Printf("Success Rate: %.2f%%\n", metrics["success_rate"]*100)
fmt.Printf("P95 Latency: %dms\n", metrics["p95_latency"])

๐Ÿ”ง Configuration

QPool can be configured through the Config struct:

config := &qpool.Config{
    SchedulingTimeout: time.Second * 5,
}

pool := qpool.NewQ(ctx, minWorkers, maxWorkers, config)

๐Ÿ—๏ธ Architecture

QPool consists of several key components:

  • Q (Pool): Main orchestrator managing workers and job scheduling
  • Worker: Handles job execution and resource management
  • QuantumSpace: Manages job results and dependencies
  • CircuitBreaker: Provides fault tolerance
  • Scaler: Handles dynamic worker pool sizing
  • Metrics: Collects and exposes performance data

๐Ÿ“ˆ Performance

QPool is designed for high performance:

  • Non-blocking job scheduling
  • Efficient memory usage
  • Smart resource allocation
  • Automatic scaling based on load
  • Optimized dependency resolution

๐Ÿงช Testing

Run the test suite:

go test -v ./...

Run with race detection:

go test -race -v ./...

๐Ÿค Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

  • Inspired by a conversation with Clause AI
  • Built with modern concurrency patterns and best practices
  • Designed for real-world production use cases

๐Ÿ“š Documentation

For detailed documentation, please visit our Go Docs.

๐Ÿ“ž Support

  • Create an issue for bug reports
  • Start a discussion for feature requests
  • Check existing issues before creating new ones

Made with โค๏ธ by Daniel Owen van Dommelen