/go-beanstalk

Primary language: Go. License: MIT.

Go Beanstalk

Build Status codecov Go Reference

Go client for beanstalkd.

Installation

go get github.com/IvanLutokhin/go-beanstalk

Quick Start

Producer

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
if err != nil {
	panic(err)
}

id, err := c.Put(1, 0, 5*time.Second, []byte("example"))
if err != nil {
	panic(err)
}

fmt.Println(id) // output job id

Consumer

c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
if err != nil {
	panic(err)
}

job, err := c.Reserve()
if err != nil {
	panic(err)
}

fmt.Println(job.ID) // output job id
fmt.Println(job.Data) // output job data

Pool

// Configure the client pool.
p := beanstalk.NewPool(&beanstalk.PoolOptions{
	// NOTE(review): the Producer and Consumer snippets call Dial with a
	// network argument ("tcp", "127.0.0.1:11300") — confirm which form
	// matches the Dial signature.
	Dialer:      func() (*beanstalk.Client, error) { return beanstalk.Dial("127.0.0.1:11300") },
	Logger:      beanstalk.NopLogger,
	Capacity:    5,
	MaxAge:      0,
	IdleTimeout: 0,
})

// establish connections
// (err is declared here with := because nothing above this line declares it)
if err := p.Open(); err != nil {
	panic(err)
}

// retrieve client
c, err := p.Get()
if err != nil {
	panic(err)
}

// use client
stats, err := c.Stats()
if err != nil {
	panic(err)
}

fmt.Println(stats) // output server stats

// return client
if err = p.Put(c); err != nil {
	panic(err)
}

// close connections
if err = p.Close(); err != nil {
	panic(err)
}

HTTP Handler

// Handler is the counterpart of http.Handler whose ServeHTTP method receives
// a *beanstalk.Client in addition to the response writer and the request.
type Handler interface {
    // ServeHTTP handles request and writes the reply to writer; client is
    // supplied by the caller for use while handling the request.
    ServeHTTP(client *beanstalk.Client, writer http.ResponseWriter, request *http.Request)
}

// HandlerFunc is a function type that lets an ordinary function with the
// matching signature be used as a Handler.
type HandlerFunc func(client *beanstalk.Client, writer http.ResponseWriter, request *http.Request)

// ServeHTTP calls f(c, w, r).
func (f HandlerFunc) ServeHTTP(c *beanstalk.Client, w http.ResponseWriter, r *http.Request) {
	f(c, w, r)
}

// HTTPHandlerAdapter pairs a client pool with a Handler so that the Handler
// can be served through the standard net/http ServeHTTP signature.
type HTTPHandlerAdapter struct {
    // pool supplies the client handed to handler for each request.
    pool    *beanstalk.Pool
    // handler is the wrapped Handler that does the actual work.
    handler Handler
}

// NewHTTPHandlerAdapter returns an adapter that serves handler with clients
// taken from pool.
func NewHTTPHandlerAdapter(pool *beanstalk.Pool, handler Handler) *HTTPHandlerAdapter {
	adapter := new(HTTPHandlerAdapter)
	adapter.pool = pool
	adapter.handler = handler

	return adapter
}

// ServeHTTP implements http.Handler. It takes a client from the pool, passes
// it to the wrapped handler together with writer and request, and puts the
// client back into the pool once the handler has returned.
//
// When no client can be obtained the request is answered with
// 503 Service Unavailable and the error is logged, instead of panicking.
func (a HTTPHandlerAdapter) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
	client, err := a.pool.Get()
	if err != nil {
		log.Printf("beanstalk: getting client from pool: %v", err)
		http.Error(writer, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)

		return
	}

	// Deferred so the client goes back even if the handler panics. The
	// response may already have been written by then, so a failed Put can
	// only be logged.
	defer func() {
		if err := a.pool.Put(client); err != nil {
			log.Printf("beanstalk: returning client to pool: %v", err)
		}
	}()

	a.handler.ServeHTTP(client, writer, request)
}

// GetServerStats returns a Handler that calls Stats on the supplied client
// and writes the result to the response as JSON.
//
// A failure to fetch or encode the statistics is reported to the caller as
// 500 Internal Server Error.
func GetServerStats() Handler {
	return HandlerFunc(func(c *beanstalk.Client, w http.ResponseWriter, r *http.Request) {
		stats, err := c.Stats()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)

			return
		}

		// Encode before touching the response so that an encoding failure
		// can still be reported with an error status.
		body, err := json.Marshal(stats)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)

			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)

		// The header has already been sent, so a failed write cannot be
		// reported to the client; the error is deliberately ignored.
		_, _ = w.Write(body)
	})
}

// main opens the client pool, registers the /stats endpoint and serves HTTP
// on port 8090.
func main() {
	p := beanstalk.NewPool(&beanstalk.PoolOptions{
		// NOTE(review): the Producer and Consumer snippets call Dial with a
		// network argument ("tcp", "127.0.0.1:11300") — confirm which form
		// matches the Dial signature.
		Dialer:      func() (*beanstalk.Client, error) { return beanstalk.Dial("127.0.0.1:11300") },
		Logger:      beanstalk.NopLogger,
		Capacity:    5,
		MaxAge:      0,
		IdleTimeout: 0,
	})

	if err := p.Open(); err != nil {
		panic(err)
	}

	http.Handle("/stats", NewHTTPHandlerAdapter(p, GetServerStats()))

	// ListenAndServe only returns on failure; surface the error instead of
	// letting the program exit silently.
	if err := http.ListenAndServe(":8090", nil); err != nil {
		panic(err)
	}
}

License

The MIT License (MIT)