/asyncp

The simple framework to build async task execution programs

Primary LanguageGoApache License 2.0Apache-2.0

Asyncp processing framework

Build Status Coverage Status Go Report Card GoDoc

License Apache 2.0

The simple framework to build async task execution programs.

Pipelines

Sequentially apply a list of tasks. Pipelines allow to split one big processing part for small simple steps to simplify the complex logic.

The pipeline is atomic, all tasks in the pipeline will successfully execute the whole task or all not.

Example program

import (
  ...
  "github.com/demdxx/asyncp/v2"
  "github.com/demdxx/asyncp/v2/streams"
  ...
)

func main() {
  taskQueueSub := nats.NewSubscriber(...)
  taskQueuePub := nats.NewPublisher(...)

  // Create new async multiplexer
  mx := asyncp.NewTaskMux(
    // Define default strem message queue
    asyncp.WithStreamResponseFactory(taskQueuePub),
    asyncp.WithPanicHandler(...),
    asyncp.WithErrorHandler(...),
    asyncp.WithCluster(...),
  )
  defer func() { _ = mx.Close() }()

  // Create new task handler to download articles by RSS
  mx.Handle("rss", downloadRSSList).
    Then(downloadRSSItem).
    Then(updateRSSArticles)

  // Create new task handler to process video files
  mx.Handle("video", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)

  // Send report to user (event contains login and email target)
  mx.Handle("email", pipeline.New(
    `userinfo`, assembleBasicInfo,
    `changes`, assembleAllChangesForUser,
    `template`, assembleEmailHTMLTemplate,
    pipeline.New(
      sendNotification,
      sendSendLogs,
    ),
  )).Then(sendEmailTask)

  // Retranslate all message to the queue if can`t process
  mx.Failver(asyncp.Retranslator(taskQueuePub))

  // Alternative:
  // taskQueueSub.Subscribe(context.Background(), mx)
  // taskQueueSub.Listen(context.Background())
  err = streams.ListenAndServe(context.Background(), mx,
    taskQueueSub, "nats://host:2222/group?topics=topicName")
}

Convert task to async executor.

atask := asyncp.WrapAsyncTask(task,
  WithWorkerCount(10),
  WithWorkerPoolSize(20),
  WithRecoverHandler(func(rec any) {
    // ...
  }))

// Or

asyncp.FuncTask(assembleBasicInfo).Async()

Cluster mode

The framework supports cluster task processing. Some of the servers can execute some specific tasks like video processing or windows specific stuff.

To extend cluster base functionality need to create in other applications (with the same sync options) linked task baseHandlerTaskName>myNewClusterTask.

func main() {
  ...
  // After RSS parsing we need to prcess video files from links if it's present
  mx.Handle("rss>videoExtraction", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)
  ...
}

Apmonitor tool

Displays state of the cluster and every task common state.

apmonitor tool