import "github.com/autom8ter/async"
Package async is a package for asynchronous programming in Go.
It provides a set of primitives for safely building concurrent applications that don't leak resources or deadlock.
- type Borrower
- func NewBorrower[T any](value T) *Borrower[T]
- func (b *Borrower[T]) Borrow() *T
- func (b *Borrower[T]) BorrowContext(ctx context.Context) (*T, error)
- func (b *Borrower[T]) Close() error
- func (b *Borrower[T]) Do(fn func(*T)) error
- func (b *Borrower[T]) Return(obj *T) error
- func (b *Borrower[T]) Swap(value T) error
- func (b *Borrower[T]) TryBorrow() (*T, bool)
- func (b *Borrower[T]) Value() T
- type Channel
- func NewChannel[T any](ctx context.Context, opts ...ChannelOpt[T]) *Channel[T]
- func (c *Channel[T]) Context() *MultiContext
- func (c *Channel[T]) ProxyFrom(ctx context.Context, ch *Channel[T])
- func (c *Channel[T]) Send(ctx context.Context, value T) bool
- func (c *Channel[T]) SendAsync(ctx context.Context, value T) chan bool
- type ChannelBroadcast
- func NewChannelBroadcast[T any](ctx context.Context) *ChannelBroadcast[T]
- func (b *ChannelBroadcast[T]) Channel(ctx context.Context, opts ...ChannelOpt[T]) *ChannelReceiver[T]
- func (b *ChannelBroadcast[T]) Close()
- func (c *ChannelBroadcast[T]) Len() int
- func (cg *ChannelBroadcast[T]) Send(ctx context.Context, val T)
- func (cg *ChannelBroadcast[T]) SendAsync(ctx context.Context, val T) chan bool
- type ChannelOpt
- func WithBufferSize[T any](bufferSize int) ChannelOpt[T]
- func WithOnClose[T any](fn func(ctx context.Context)) ChannelOpt[T]
- func WithOnRcv[T any](fn func(context.Context, T) T) ChannelOpt[T]
- func WithOnSend[T any](fn func(context.Context, T) T) ChannelOpt[T]
- func WithWhere[T any](fn func(context.Context, T) bool) ChannelOpt[T]
- type ChannelReceiver
- func (c *ChannelReceiver[T]) Close(ctx context.Context)
- func (c *ChannelReceiver[T]) ForEach(ctx context.Context, fn func(context.Context, T) bool)
- func (c *ChannelReceiver[T]) ForEachAsync(ctx context.Context, fn func(context.Context, T) bool)
- func (c *ChannelReceiver[T]) Len() int
- func (c *ChannelReceiver[T]) ProxyTo(ctx context.Context, ch *Channel[T])
- func (c *ChannelReceiver[T]) Recv(ctx context.Context) (T, bool)
- type IOFunc
- type IOHandler
- func NewIOHandler[I any, O any](ctx context.Context, fn IOFunc[I, O]) *IOHandler[I, O]
- func (a *IOHandler[I, O]) Close()
- func (a *IOHandler[I, O]) Process(input Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]
- func (a *IOHandler[I, O]) ProcessStream(streamCtx context.Context, inputs chan Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]
- func (a *IOHandler[I, O]) ProcessSync(input Input[I]) *Output[O]
- func (a *IOHandler[I, O]) Wait()
- func (a *IOHandler[I, O]) WithInputWrappers(wrappers ...InputWrapper[I]) *IOHandler[I, O]
- func (a *IOHandler[I, O]) WithOutputWrappers(wrappers ...OutputWrapper[O]) *IOHandler[I, O]
- type IOMiddleware
- type Input
- type InputWrapper
- type MultiContext
- type Output
- type OutputWrapper
type Borrower
Borrower is a thread-safe object that can be borrowed and returned.
type Borrower[T any] struct {
// contains filtered or unexported fields
}
func NewBorrower
func NewBorrower[T any](value T) *Borrower[T]
NewBorrower returns a new Borrower with the provided value.
func (*Borrower[T]) Borrow
func (b *Borrower[T]) Borrow() *T
Borrow returns the value of the Borrower. If the value is not available, it will block until it is.
func (*Borrower[T]) BorrowContext
func (b *Borrower[T]) BorrowContext(ctx context.Context) (*T, error)
BorrowContext returns the value of the Borrower. If the value is not available, it will block until it is or the context is canceled.
func (*Borrower[T]) Close
func (b *Borrower[T]) Close() error
Close closes the Borrower and prevents it from being borrowed again. If the Borrower is still borrowed, it will return an error. Close is idempotent.
func (*Borrower[T]) Do
func (b *Borrower[T]) Do(fn func(*T)) error
Do borrows the value, calls the provided function, and returns the value.
func (*Borrower[T]) Return
func (b *Borrower[T]) Return(obj *T) error
Return returns the value to the Borrower so it can be borrowed again. If the value is not a pointer to the value that was borrowed, it will return an error. If the value has already been returned, it will return an error.
func (*Borrower[T]) Swap
func (b *Borrower[T]) Swap(value T) error
Swap borrows the value, swaps it with the provided value, and returns the value to the Borrower.
func (*Borrower[T]) TryBorrow
func (b *Borrower[T]) TryBorrow() (*T, bool)
TryBorrow returns the value of the Borrower if it is available. If the value is not available, it will return false.
func (*Borrower[T]) Value
func (b *Borrower[T]) Value() T
Value returns the value of the Borrower. This is a non-blocking operation since the value is not borrowed (it is returned by value rather than as a pointer).
type Channel
Channel is a safer version of a channel that can be closed and has a context to prevent sending or receiving when the context is canceled.
type Channel[T any] struct {
// contains filtered or unexported fields
}
func NewChannel
func NewChannel[T any](ctx context.Context, opts ...ChannelOpt[T]) *Channel[T]
NewChannel returns a new Channel with the provided options. The channel will be closed when the context is cancelled.
func (*Channel[T]) Context
func (c *Channel[T]) Context() *MultiContext
Context returns the context of the channel.
func (*Channel[T]) ProxyFrom
func (c *Channel[T]) ProxyFrom(ctx context.Context, ch *Channel[T])
ProxyFrom proxies values from the given channel to this channel. This is a non-blocking call.
func (*Channel[T]) Send
func (c *Channel[T]) Send(ctx context.Context, value T) bool
Send sends a value to the channel. If the channel is closed or the context is cancelled, it will return false. If the value is sent, it will return true. This is a blocking call.
func (*Channel[T]) SendAsync
func (c *Channel[T]) SendAsync(ctx context.Context, value T) chan bool
SendAsync sends a value to the channel in a goroutine. If the channel is closed, it will return false to the channel returned by this function. If the context is canceled, it will return false to the channel returned by this function. If the value is sent, it will return true to the channel returned by this function. This is a non-blocking call.
type ChannelBroadcast
ChannelBroadcast is a thread-safe group of channels. It is useful for broadcasting a value to multiple channels at once.
type ChannelBroadcast[T any] struct {
// contains filtered or unexported fields
}
func NewChannelBroadcast
func NewChannelBroadcast[T any](ctx context.Context) *ChannelBroadcast[T]
NewChannelBroadcast returns a new ChannelBroadcast. The context is used to cancel all subscribers when the context is canceled. A ChannelBroadcast is useful for broadcasting a value to multiple subscribers.
func (*ChannelBroadcast[T]) Channel
func (b *ChannelBroadcast[T]) Channel(ctx context.Context, opts ...ChannelOpt[T]) *ChannelReceiver[T]
Channel returns a receiver that will receive broadcasted values. The channel will be closed when the context is canceled. This is a non-blocking operation.
func (*ChannelBroadcast[T]) Close
func (b *ChannelBroadcast[T]) Close()
Close blocks until all subscribers have been removed and then closes the broadcast.
func (*ChannelBroadcast[T]) Len
func (c *ChannelBroadcast[T]) Len() int
Len returns the number of subscribers.
func (*ChannelBroadcast[T]) Send
func (cg *ChannelBroadcast[T]) Send(ctx context.Context, val T)
Send sends a value to all channels in the group. Unlike SendAsync, it does not return a channel reporting the result.
func (*ChannelBroadcast[T]) SendAsync
func (cg *ChannelBroadcast[T]) SendAsync(ctx context.Context, val T) chan bool
SendAsync sends a value to all channels in the group asynchronously. This is a non-blocking operation.
type ChannelOpt
ChannelOpt is an option for creating a new Channel.
type ChannelOpt[T any] func(*channelOpts[T])
func WithBufferSize
func WithBufferSize[T any](bufferSize int) ChannelOpt[T]
WithBufferSize sets the buffer size of the channel.
func WithOnClose
func WithOnClose[T any](fn func(ctx context.Context)) ChannelOpt[T]
WithOnClose adds a function to be called before the channel is closed.
func WithOnRcv
func WithOnRcv[T any](fn func(context.Context, T) T) ChannelOpt[T]
WithOnRcv adds a function to be called before receiving a value.
func WithOnSend
func WithOnSend[T any](fn func(context.Context, T) T) ChannelOpt[T]
WithOnSend adds a function to be called before sending a value.
func WithWhere
func WithWhere[T any](fn func(context.Context, T) bool) ChannelOpt[T]
WithWhere adds a function to be called before sending a value to determine if the value should be sent.
type ChannelReceiver
ChannelReceiver is a receiver for a channel.
type ChannelReceiver[T any] struct {
// contains filtered or unexported fields
}
func (*ChannelReceiver[T]) Close
func (c *ChannelReceiver[T]) Close(ctx context.Context)
Close closes the channel. It will call the OnClose functions and wait for all goroutines to finish. If the context is cancelled, waiting for goroutines to finish will be cancelled.
func (*ChannelReceiver[T]) ForEach
func (c *ChannelReceiver[T]) ForEach(ctx context.Context, fn func(context.Context, T) bool)
ForEach calls the given function for each value in the channel until the channel is closed, the context is cancelled, or the function returns false.
func (*ChannelReceiver[T]) ForEachAsync
func (c *ChannelReceiver[T]) ForEachAsync(ctx context.Context, fn func(context.Context, T) bool)
ForEachAsync calls the given function for each value in the channel until the channel is closed, the context is cancelled, or the function returns false. It will call the function in a new goroutine for each value.
func (*ChannelReceiver[T]) Len
func (c *ChannelReceiver[T]) Len() int
Len returns the number of values in the channel.
func (*ChannelReceiver[T]) ProxyTo
func (c *ChannelReceiver[T]) ProxyTo(ctx context.Context, ch *Channel[T])
ProxyTo proxies values from this channel to the given channel. This is a non-blocking call.
func (*ChannelReceiver[T]) Recv
func (c *ChannelReceiver[T]) Recv(ctx context.Context) (T, bool)
Recv returns the next value from the channel. If the channel is closed, it will return false.
type IOFunc
IOFunc is a function that takes an input (which carries its own context) and returns an output.
type IOFunc[I any, O any] func(input Input[I]) *Output[O]
func (IOFunc[I, O]) WrapInput
func (fn IOFunc[I, O]) WrapInput(wrappers ...InputWrapper[I]) IOFunc[I, O]
WrapInput wraps the input of the IO function with the given input wrappers before calling it.
func (IOFunc[I, O]) WrapOutput
func (fn IOFunc[I, O]) WrapOutput(wrappers ...OutputWrapper[O]) IOFunc[I, O]
WrapOutput wraps the output of the IO function with the given output wrappers before returning it.
type IOHandler
IOHandler runs functions asynchronously.
type IOHandler[I any, O any] struct {
// contains filtered or unexported fields
}
func NewIOHandler
func NewIOHandler[I any, O any](ctx context.Context, fn IOFunc[I, O]) *IOHandler[I, O]
NewIOHandler creates a new IOHandler instance.
func (*IOHandler[I, O]) Close
func (a *IOHandler[I, O]) Close()
Close cancels the root context and waits for all goroutines to finish.
func (*IOHandler[I, O]) Process
func (a *IOHandler[I, O]) Process(input Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]
Process calls the given function in a new goroutine and returns a channel that will receive the result of the function.
func (*IOHandler[I, O]) ProcessStream
func (a *IOHandler[I, O]) ProcessStream(streamCtx context.Context, inputs chan Input[I], opts ...ChannelOpt[*Output[O]]) *Channel[*Output[O]]
ProcessStream calls the given function for each value in the channel until the channel is closed, or the context is cancelled.
func (*IOHandler[I, O]) ProcessSync
func (a *IOHandler[I, O]) ProcessSync(input Input[I]) *Output[O]
ProcessSync calls the given function and returns the result.
func (*IOHandler[I, O]) Wait
func (a *IOHandler[I, O]) Wait()
Wait waits for all goroutines to finish.
func (*IOHandler[I, O]) WithInputWrappers
func (a *IOHandler[I, O]) WithInputWrappers(wrappers ...InputWrapper[I]) *IOHandler[I, O]
WithInputWrappers adds the given input wrappers to the IOHandler instance. This method is not thread-safe.
func (*IOHandler[I, O]) WithOutputWrappers
func (a *IOHandler[I, O]) WithOutputWrappers(wrappers ...OutputWrapper[O]) *IOHandler[I, O]
WithOutputWrappers adds the given output wrappers to the IOHandler instance. This method is not thread-safe.
type IOMiddleware
IOMiddleware is a struct that contains an input wrapper and an output wrapper. It can be used to wrap the input and output of an IO function.
type IOMiddleware[I any, O any] struct {
Input func(Input[I]) Input[I]
Output func(*Output[O]) *Output[O]
}
type Input
Input is a struct that contains a context, an ID, and a value.
type Input[T any] struct {
Ctx context.Context
ID string
Value T
}
type InputWrapper
InputWrapper is a function that takes an input and returns an input.
type InputWrapper[I any] func(I) I
type MultiContext
MultiContext is a context that can be used to combine contexts with a root context so they can be cancelled together.
type MultiContext struct {
context.Context
// contains filtered or unexported fields
}
func NewMultiContext
func NewMultiContext(ctx context.Context) *MultiContext
NewMultiContext returns a new MultiContext.
func (*MultiContext) Cancel
func (m *MultiContext) Cancel()
Cancel cancels all child contexts.
func (*MultiContext) WithCloser
func (m *MultiContext) WithCloser(fn func())
WithCloser adds a function to be called when the multi context is cancelled.
func (*MultiContext) WithContext
func (m *MultiContext) WithContext(ctx context.Context) context.Context
WithContext returns a new context that is a child of the root context. This context will be cancelled when the multi context is cancelled.
type Output
Output is a struct that contains a context, a value, and an error.
type Output[T any] struct {
Ctx context.Context
Value T
Err error
}
type OutputWrapper
OutputWrapper is a function that takes an output and returns an output.
type OutputWrapper[O any] func(*Output[O]) *Output[O]
Generated by gomarkdoc