utilities for async processing in Go

Primary language: Go. License: MIT.

async

import "github.com/autom8ter/async"

Package async is a package for asynchronous programming in Go.

It provides a set of primitives for safely building concurrent applications that don't leak resources or deadlock.

type Borrower

Borrower is a thread-safe object that can be borrowed and returned.

type Borrower[T any] struct {
    // contains filtered or unexported fields
}

func NewBorrower[T any](value T) *Borrower[T]

NewBorrower returns a new Borrower with the provided value.

func (*Borrower[T]) Borrow

func (b *Borrower[T]) Borrow() *T

Borrow returns the value of the Borrower. If the value is not available, it will block until it is.

func (*Borrower[T]) BorrowContext

func (b *Borrower[T]) BorrowContext(ctx context.Context) (*T, error)

BorrowContext returns the value of the Borrower. If the value is not available, it will block until it is or the context is canceled.
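The "block until available or the context is canceled" behavior can be pictured with a plain `select`. This is a minimal standard-library sketch, not the package's implementation; `takeCtx` and `takeWithin` are hypothetical helpers, and a one-slot channel stands in for the Borrower's internal state.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// takeCtx is a hypothetical helper: it waits for a pointer on slot and
// gives up with the context's error once ctx is done.
func takeCtx[T any](ctx context.Context, slot chan *T) (*T, error) {
	select {
	case p := <-slot:
		return p, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// takeWithin reports whether a value could be taken within ms milliseconds.
func takeWithin(slot chan *int, ms int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	_, err := takeCtx(ctx, slot)
	return err == nil
}

func main() {
	slot := make(chan *int, 1)
	v := 7
	slot <- &v

	fmt.Println(takeWithin(slot, 50)) // the value is waiting in the slot
	fmt.Println(takeWithin(slot, 50)) // nothing was put back, so the deadline ends the wait
}
```

With the real type, the equivalent call would presumably be `BorrowContext(ctx)` with a deadline or cancelable context, checking the returned error before using the pointer.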

func (*Borrower[T]) Close

func (b *Borrower[T]) Close() error

Close closes the Borrower and prevents it from being borrowed again. If the Borrower is still borrowed, it will return an error. Close is idempotent.

func (*Borrower[T]) Do

func (b *Borrower[T]) Do(fn func(*T)) error

Do borrows the value, calls the provided function with it, and then returns the value to the Borrower.

func (*Borrower[T]) Return

func (b *Borrower[T]) Return(obj *T) error

Return returns the value to the Borrower so it can be borrowed again. If the value is not a pointer to the value that was borrowed, it will return an error. If the value has already been returned, it will return an error.
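To make the borrow/return contract concrete, here is a small standard-library sketch of the same idea. It is an illustration only and makes no claim about how the package is implemented; `slotBorrower`, `borrow`, and `giveBack` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// slotBorrower is a hypothetical stand-in for a borrowable value.
// A one-slot buffered channel holds the pointer while it is available.
type slotBorrower[T any] struct {
	slot chan *T
	ptr  *T
}

func newSlotBorrower[T any](value T) *slotBorrower[T] {
	b := &slotBorrower[T]{slot: make(chan *T, 1), ptr: &value}
	b.slot <- b.ptr
	return b
}

// borrow blocks until the value is available.
func (b *slotBorrower[T]) borrow() *T { return <-b.slot }

// giveBack rejects foreign pointers and double returns.
func (b *slotBorrower[T]) giveBack(p *T) error {
	if p != b.ptr {
		return errors.New("not the borrowed pointer")
	}
	select {
	case b.slot <- p:
		return nil
	default:
		return errors.New("already returned")
	}
}

func main() {
	b := newSlotBorrower(1)
	p := b.borrow()
	*p++
	fmt.Println(b.giveBack(p) == nil) // first return succeeds
	fmt.Println(b.giveBack(p) != nil) // second return is rejected
}
```

The pointer comparison is what lets the sketch detect a caller handing back something other than what was borrowed, which mirrors the error cases described for Return.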

func (*Borrower[T]) Swap

func (b *Borrower[T]) Swap(value T) error

Swap borrows the value, swaps it with the provided value, and returns the value to the Borrower.

func (*Borrower[T]) TryBorrow

func (b *Borrower[T]) TryBorrow() (*T, bool)

TryBorrow returns the value of the Borrower if it is available. If the value is not available, it will return false.
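A non-blocking attempt like this is commonly written in Go as a `select` with a `default` branch. The sketch below shows that pattern with the standard library only; `tryTake` is a hypothetical helper and the one-slot channel is an assumed stand-in for the Borrower's state.

```go
package main

import "fmt"

// tryTake returns immediately: the pointer and true if one is waiting,
// or nil and false otherwise. Hypothetical helper for illustration.
func tryTake[T any](slot chan *T) (*T, bool) {
	select {
	case p := <-slot:
		return p, true
	default:
		return nil, false
	}
}

func main() {
	slot := make(chan *string, 1)
	s := "config"
	slot <- &s

	_, ok := tryTake(slot)
	fmt.Println(ok) // a value was available
	_, ok = tryTake(slot)
	fmt.Println(ok) // already taken, so the call returns without blocking
}
```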

func (*Borrower[T]) Value

func (b *Borrower[T]) Value() T

Value returns a copy of the Borrower's value. This is a non-blocking operation because the value is not borrowed (it is returned by value, not as a pointer).

type Channel

Channel is a safer wrapper around a Go channel. It can be closed, and it carries a context that prevents sending or receiving once the context is canceled.

type Channel[T any] struct {
    // contains filtered or unexported fields
}

func NewChannel[T any](ctx context.Context, opts ...ChannelOpt[T]) *Channel[T]

NewChannel returns a new Channel with the provided options. The channel will be closed when the context is cancelled.

func (*Channel[T]) Context

func (c *Channel[T]) Context() *MultiContext

Context returns the context of the channel.

func (*Channel[T]) ProxyFrom

func (c *Channel[T]) ProxyFrom(ctx context.Context, ch *Channel[T])

ProxyFrom proxies values from the given channel to this channel. This is a non-blocking call.

func (*Channel[T]) Send

func (c *Channel[T]) Send(ctx context.Context, value T) bool

Send sends a value to the channel. If the channel is closed or the context is cancelled, it will return false. If the value is sent, it will return true. This is a blocking call.
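The blocking, context-aware send described above corresponds to a two-way `select` in plain Go. This is a sketch under stated assumptions, not the package's code: `sendCtx` and `sendWithin` are hypothetical helpers, and the closed-channel case that the real Send reports as false is not handled here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendCtx is a hypothetical helper: it blocks until value is accepted by ch
// or ctx is done, and reports which happened. It does not guard against
// sending on a closed channel, which panics in Go.
func sendCtx[T any](ctx context.Context, ch chan T, value T) bool {
	select {
	case ch <- value:
		return true
	case <-ctx.Done():
		return false
	}
}

// sendWithin tries to send v on ch for at most ms milliseconds.
func sendWithin(ch chan int, v int, ms int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return sendCtx(ctx, ch, v)
}

func main() {
	buffered := make(chan int, 1)
	fmt.Println(sendWithin(buffered, 1, 50)) // the buffer has room
	fmt.Println(sendWithin(buffered, 2, 50)) // the buffer is full and nobody receives
}
```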

func (*Channel[T]) SendAsync

func (c *Channel[T]) SendAsync(ctx context.Context, value T) chan bool

SendAsync sends a value to the channel in a goroutine and returns a channel that reports the outcome: true if the value was sent, false if the channel is closed or the context is canceled. This is a non-blocking call.
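The asynchronous variant can be sketched as a goroutine that performs the send and reports on a result channel. The names `sendAsync` and `deliver` below are hypothetical, and the code uses only the standard library rather than the package's types.

```go
package main

import (
	"context"
	"fmt"
)

// sendAsync starts the send in a goroutine and returns at once.
// The result channel is buffered so the goroutine can always report
// and exit, even if the caller never reads the result.
func sendAsync[T any](ctx context.Context, ch chan T, value T) chan bool {
	result := make(chan bool, 1)
	go func() {
		select {
		case ch <- value:
			result <- true
		case <-ctx.Done():
			result <- false
		}
	}()
	return result
}

// deliver runs sendAsync against an unbuffered channel and returns the
// reported outcome.
func deliver(withReceiver bool) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := make(chan int)
	done := sendAsync(ctx, ch, 42)
	if withReceiver {
		<-ch // accept the value
	} else {
		cancel() // nobody will receive, so cancel instead
	}
	return <-done
}

func main() {
	fmt.Println(deliver(true))  // a receiver took the value
	fmt.Println(deliver(false)) // the context was canceled first
}
```

Buffering the result channel is a deliberate choice in this sketch: it avoids leaking the goroutine when the caller ignores the outcome.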

type ChannelBroadcast

ChannelBroadcast is a thread-safe group of channels. It is useful for broadcasting a value to multiple channels at once.

type ChannelBroadcast[T any] struct {
    // contains filtered or unexported fields
}

func NewChannelBroadcast[T any](ctx context.Context) *ChannelBroadcast[T]

NewChannelBroadcast returns a new ChannelBroadcast. All subscribers are canceled when the provided context is canceled.

func (*ChannelBroadcast[T]) Channel

func (b *ChannelBroadcast[T]) Channel(ctx context.Context, opts ...ChannelOpt[T]) *ChannelReceiver[T]

Channel returns a receiver that will receive every value broadcast after it subscribes. The receiver is closed when the context is canceled. This is a non-blocking operation.

func (*ChannelBroadcast[T]) Close

func (b *ChannelBroadcast[T]) Close()

Close blocks until all subscribers have been removed and then closes the broadcast.

func (*ChannelBroadcast[T]) Len

func (c *ChannelBroadcast[T]) Len() int

Len returns the number of subscribers.

func (*ChannelBroadcast[T]) Send

func (cg *ChannelBroadcast[T]) Send(ctx context.Context, val T)

Send sends a value to all channels in the group.

func (*ChannelBroadcast[T]) SendAsync

func (cg *ChannelBroadcast[T]) SendAsync(ctx context.Context, val T) chan bool

SendAsync sends a value to all channels in the group asynchronously. This is a non-blocking operation.
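Broadcasting amounts to fanning one value out to every subscriber's channel. The following sketch shows that idea with a mutex-guarded subscriber list; `broadcast`, `subscribe`, `send`, and `fanOut` are hypothetical names, and the real ChannelBroadcast may differ in how it handles slow or closed subscribers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// broadcast is a hypothetical fan-out hub guarded by a mutex.
type broadcast[T any] struct {
	mu   sync.Mutex
	subs []chan T
}

// subscribe registers a new subscriber channel with the given buffer size.
func (b *broadcast[T]) subscribe(buffer int) chan T {
	ch := make(chan T, buffer)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

// send delivers val to every subscriber in turn, stopping early if ctx is done.
func (b *broadcast[T]) send(ctx context.Context, val T) {
	b.mu.Lock()
	subs := append([]chan T(nil), b.subs...) // copy so the lock is not held while sending
	b.mu.Unlock()
	for _, ch := range subs {
		select {
		case ch <- val:
		case <-ctx.Done():
			return
		}
	}
}

// fanOut subscribes n receivers, broadcasts val once, and collects what each got.
func fanOut(n int, val string) []string {
	b := &broadcast[string]{}
	subs := make([]chan string, 0, n)
	for i := 0; i < n; i++ {
		subs = append(subs, b.subscribe(1))
	}
	b.send(context.Background(), val)
	got := make([]string, 0, n)
	for _, ch := range subs {
		got = append(got, <-ch)
	}
	return got
}

func main() {
	fmt.Println(fanOut(3, "hello"))
}
```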

type ChannelOpt

ChannelOpt is an option for creating a new Channel.

type ChannelOpt[T any] func(*channelOpts[T])

func WithBufferSize[T any](bufferSize int) ChannelOpt[T]

WithBufferSize sets the buffer size of the channel.

func WithOnClose[T any](fn func(ctx context.Context)) ChannelOpt[T]

WithOnClose adds a function to be called before the channel is closed.

func WithOnRcv[T any](fn func(context.Context, T) T) ChannelOpt[T]

WithOnRcv adds a function to be called before receiving a value.

func WithOnSend[T any](fn func(context.Context, T) T) ChannelOpt[T]

WithOnSend adds a function to be called before sending a value.

func WithWhere[T any](fn func(context.Context, T) bool) ChannelOpt[T]

WithWhere adds a function to be called before sending a value to determine if the value should be sent.
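These options follow Go's functional-options pattern. The sketch below illustrates how such options might compose; it is not the package's code. All names are hypothetical, the hooks omit the `context.Context` parameter for brevity, and running filters before transforms is an assumption rather than documented behavior.

```go
package main

import "fmt"

// chanOpts collects settings; chanOpt mutates it. Both are hypothetical.
type chanOpts[T any] struct {
	bufferSize int
	where      []func(T) bool
	onSend     []func(T) T
}

type chanOpt[T any] func(*chanOpts[T])

func withBuffer[T any](n int) chanOpt[T] {
	return func(o *chanOpts[T]) { o.bufferSize = n }
}

func withWhere[T any](fn func(T) bool) chanOpt[T] {
	return func(o *chanOpts[T]) { o.where = append(o.where, fn) }
}

func withOnSend[T any](fn func(T) T) chanOpt[T] {
	return func(o *chanOpts[T]) { o.onSend = append(o.onSend, fn) }
}

// build applies each option to a zero-valued settings struct.
func build[T any](opts ...chanOpt[T]) *chanOpts[T] {
	o := &chanOpts[T]{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// prepare runs the filters, then the transforms (assumed order).
// ok is false when a filter rejects the value.
func (o *chanOpts[T]) prepare(v T) (T, bool) {
	for _, keep := range o.where {
		if !keep(v) {
			return v, false
		}
	}
	for _, fn := range o.onSend {
		v = fn(v)
	}
	return v, true
}

// evensDoubled sends only even inputs, doubled, through a buffered channel.
func evensDoubled(in []int) []int {
	o := build(
		withBuffer[int](len(in)),
		withWhere(func(v int) bool { return v%2 == 0 }),
		withOnSend(func(v int) int { return v * 2 }),
	)
	ch := make(chan int, o.bufferSize)
	for _, v := range in {
		if out, ok := o.prepare(v); ok {
			ch <- out
		}
	}
	close(ch)
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(evensDoubled([]int{1, 2, 3, 4}))
}
```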

type ChannelReceiver

ChannelReceiver is a receiver for a channel.

type ChannelReceiver[T any] struct {
    // contains filtered or unexported fields
}

func (*ChannelReceiver[T]) Close

func (c *ChannelReceiver[T]) Close(ctx context.Context)

Close closes the channel. It will call the OnClose functions and wait for all goroutines to finish. If the context is cancelled, waiting for goroutines to finish will be cancelled.

func (*ChannelReceiver[T]) ForEach

func (c *ChannelReceiver[T]) ForEach(ctx context.Context, fn func(context.Context, T) bool)

ForEach calls the given function for each value in the channel until the channel is closed, the context is cancelled, or the function returns false.
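The three stop conditions (channel closed, context canceled, callback returns false) map onto a receive loop like the one below. This is a standard-library sketch of the synchronous form only; `forEach` and `sumUntil` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// forEach calls fn for each value until ch is closed, ctx is done,
// or fn returns false. Hypothetical helper for illustration.
func forEach[T any](ctx context.Context, ch chan T, fn func(context.Context, T) bool) {
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-ch:
			if !ok {
				return // channel closed
			}
			if !fn(ctx, v) {
				return // callback asked to stop
			}
		}
	}
}

// sumUntil adds values in order and stops after adding stopAt.
func sumUntil(values []int, stopAt int) int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)

	sum := 0
	forEach(context.Background(), ch, func(_ context.Context, v int) bool {
		sum += v
		return v != stopAt
	})
	return sum
}

func main() {
	fmt.Println(sumUntil([]int{1, 2, 3, 4}, 3)) // stops after 3, so 4 is never added
	fmt.Println(sumUntil([]int{1, 2, 3, 4}, 0)) // runs until the channel is closed
}
```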

func (*ChannelReceiver[T]) ForEachAsync

func (c *ChannelReceiver[T]) ForEachAsync(ctx context.Context, fn func(context.Context, T) bool)

ForEachAsync calls the given function for each value in the channel until the channel is closed, the context is cancelled, or the function returns false. It will call the function in a new goroutine for each value.

func (*ChannelReceiver[T]) Len

func (c *ChannelReceiver[T]) Len() int

Len returns the number of values in the channel.

func (*ChannelReceiver[T]) ProxyTo

func (c *ChannelReceiver[T]) ProxyTo(ctx context.Context, ch *Channel[T])

ProxyTo proxies values from this channel to the given channel. This is a non-blocking call.

func (*ChannelReceiver[T]) Recv

func (c *ChannelReceiver[T]) Recv(ctx context.Context) (T, bool)

Recv returns the next value from the channel. If the channel is closed, it will return false.

type IOFunc

IOFunc is a function that takes an Input (which carries a context and a value) and returns a pointer to an Output.

type IOFunc[I any, O any] func(input Input[I]) *Output[O]

func (IOFunc[I, O]) WrapInput

func (fn IOFunc[I, O]) WrapInput(wrappers ...InputWrapper[I]) IOFunc[I, O]

WrapInput wraps the input of the IO function with the given input wrappers before calling it.

func (IOFunc[I, O]) WrapOutput

func (fn IOFunc[I, O]) WrapOutput(wrappers ...OutputWrapper[O]) IOFunc[I, O]

WrapOutput wraps the output of the IO function with the given output wrappers before returning it.
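Wrapping in this sense is function decoration: each wrapper transforms the input before the call or the output after it. The sketch below shows one way this could work. The types `input`, `output`, and `ioFunc` are simplified, hypothetical stand-ins (no context field), the input wrapper signature is an assumption, and applying wrappers in argument order is also an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Simplified, hypothetical stand-ins for the documented types.
type input[T any] struct {
	ID    string
	Value T
}

type output[T any] struct {
	Value T
	Err   error
}

type ioFunc[I any, O any] func(input[I]) *output[O]

// wrapInput returns a function that runs the wrappers on the input, then calls fn.
func (fn ioFunc[I, O]) wrapInput(wrappers ...func(input[I]) input[I]) ioFunc[I, O] {
	return func(in input[I]) *output[O] {
		for _, w := range wrappers {
			in = w(in)
		}
		return fn(in)
	}
}

// wrapOutput returns a function that calls fn, then runs the wrappers on its output.
func (fn ioFunc[I, O]) wrapOutput(wrappers ...func(*output[O]) *output[O]) ioFunc[I, O] {
	return func(in input[I]) *output[O] {
		out := fn(in)
		for _, w := range wrappers {
			out = w(out)
		}
		return out
	}
}

// shout trims the input, upper-cases it, and appends an exclamation mark.
func shout(s string) string {
	var upper ioFunc[string, string] = func(in input[string]) *output[string] {
		return &output[string]{Value: strings.ToUpper(in.Value)}
	}
	trim := func(in input[string]) input[string] {
		in.Value = strings.TrimSpace(in.Value)
		return in
	}
	exclaim := func(out *output[string]) *output[string] {
		out.Value += "!"
		return out
	}
	wrapped := upper.wrapInput(trim).wrapOutput(exclaim)
	return wrapped(input[string]{ID: "1", Value: s}).Value
}

func main() {
	fmt.Println(shout("  hi "))
}
```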

type IOHandler

IOHandler runs functions asynchronously.

type IOHandler[I any, O any] struct {
    // contains filtered or unexported fields
}

func NewIOHandler[I any, O any](ctx context.Context, fn IOFunc[I, O]) *IOHandler[I, O]

NewIOHandler creates a new IOHandler instance.

func (*IOHandler[I, O]) Close

func (a *IOHandler[I, O]) Close()

Close cancels the root context and waits for all goroutines to finish.

func (*IOHandler[I, O]) Process

func (a *IOHandler[I, O]) Process(input Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]

Process calls the given function in a new goroutine and returns a channel that will receive the result of the function.

func (*IOHandler[I, O]) ProcessStream

func (a *IOHandler[I, O]) ProcessStream(streamCtx context.Context, inputs chan Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]

ProcessStream calls the given function for each value in the channel until the channel is closed, or the context is cancelled.
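Stream processing of this kind is a pipeline stage: read inputs until the source is closed or the context is canceled, apply the function, and emit results. The sketch below is a sequential, standard-library illustration; `processStream` and `squares` are hypothetical, and the real ProcessStream may well process inputs concurrently and wraps values in Input and Output.

```go
package main

import (
	"context"
	"fmt"
)

// processStream applies fn to each input and emits the results on the
// returned channel, which is closed when inputs is closed or ctx is done.
func processStream[I any, O any](ctx context.Context, inputs chan I, fn func(I) O) chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case in, ok := <-inputs:
				if !ok {
					return
				}
				select {
				case out <- fn(in):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// squares feeds nums through the stage and collects the results in order.
func squares(nums []int) []int {
	inputs := make(chan int, len(nums))
	for _, n := range nums {
		inputs <- n
	}
	close(inputs)

	var got []int
	results := processStream(context.Background(), inputs, func(n int) int { return n * n })
	for v := range results {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(squares([]int{1, 2, 3}))
}
```

The inner `select` matters: without it, a canceled consumer would leave the goroutine blocked forever on the send.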

func (*IOHandler[I, O]) ProcessSync

func (a *IOHandler[I, O]) ProcessSync(input Input[I]) *Output[O]

ProcessSync calls the given function and returns the result.

func (*IOHandler[I, O]) Wait

func (a *IOHandler[I, O]) Wait()

Wait waits for all goroutines to finish.

func (*IOHandler[I, O]) WithInputWrappers

func (a *IOHandler[I, O]) WithInputWrappers(wrappers ...InputWrapper[I]) *IOHandler[I, O]

WithInputWrappers adds the given input wrappers to the IOHandler instance. This method is not thread-safe.

func (*IOHandler[I, O]) WithOutputWrappers

func (a *IOHandler[I, O]) WithOutputWrappers(wrappers ...OutputWrapper[O]) *IOHandler[I, O]

WithOutputWrappers adds the given output wrappers to the IOHandler instance. This method is not thread-safe.

type IOMiddleware

IOMiddleware is a struct that contains an input wrapper and an output wrapper. It can be used to wrap the input and output of an IO function.

type IOMiddleware[I any, O any] struct {
    Input  func(Input[I]) Input[I]
    Output func(*Output[O]) *Output[O]
}

type Input

Input is a struct that contains a context, an ID, and a value.

type Input[T any] struct {
    Ctx   context.Context
    ID    string
    Value T
}

type InputWrapper

InputWrapper is a function that takes an input and returns an input.

type InputWrapper[I any] func(I) I

type MultiContext

MultiContext is a context that can be used to combine contexts with a root context so they can be cancelled together.

type MultiContext struct {
    context.Context
    // contains filtered or unexported fields
}

func NewMultiContext(ctx context.Context) *MultiContext

NewMultiContext returns a new MultiContext.

func (*MultiContext) Cancel

func (m *MultiContext) Cancel()

Cancel cancels all child contexts.

func (*MultiContext) WithCloser

func (m *MultiContext) WithCloser(fn func())

WithCloser adds a function to be called when the multi context is cancelled.

func (*MultiContext) WithContext

func (m *MultiContext) WithContext(ctx context.Context) context.Context

WithContext returns a new context that is a child of the root context. This context will be cancelled when the multi context is cancelled.
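One way to picture "cancelled together" is a root context whose cancellation is forwarded to every derived context. The following is a standard-library sketch of that idea, not the package's implementation; `multiCtx`, `newMultiCtx`, `withContext`, and `canceledTogether` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// multiCtx is a hypothetical root context that can cancel every context
// derived through withContext.
type multiCtx struct {
	context.Context
	cancel context.CancelFunc
}

func newMultiCtx(parent context.Context) *multiCtx {
	ctx, cancel := context.WithCancel(parent)
	return &multiCtx{Context: ctx, cancel: cancel}
}

// withContext derives a context from ctx that is also canceled when the
// root is canceled. The goroutine exits as soon as either side is done.
func (m *multiCtx) withContext(ctx context.Context) context.Context {
	child, cancel := context.WithCancel(ctx)
	go func() {
		defer cancel()
		select {
		case <-m.Done():
		case <-child.Done():
		}
	}()
	return child
}

// canceledTogether derives n contexts from unrelated parents, cancels the
// root, and waits until every derived context is done.
func canceledTogether(n int) bool {
	root := newMultiCtx(context.Background())
	children := make([]context.Context, 0, n)
	for i := 0; i < n; i++ {
		children = append(children, root.withContext(context.Background()))
	}
	for _, c := range children {
		if c.Err() != nil {
			return false // nothing should be canceled yet
		}
	}
	root.cancel()
	for _, c := range children {
		<-c.Done()
	}
	return true
}

func main() {
	fmt.Println(canceledTogether(3))
}
```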

type Output

Output is a struct that contains a context, a value, and an error.

type Output[T any] struct {
    Ctx   context.Context
    Value T
    Err   error
}

type OutputWrapper

OutputWrapper is a function that takes an output and returns an output.

type OutputWrapper[O any] func(*Output[O]) *Output[O]

Generated by gomarkdoc