/fx

a function extension package, with synchronous and asynchronous support

Primary LanguageGoMIT LicenseMIT

fx

a function extension package, with synchronous and asynchronous support.

Using type parameters as method receiver is unsupported, liking func [T] (T) Xxx(...), and type's method must have no type parameters, so i can only use the fucking interface{}.

Usage

map and reduce

  sum, err := From(Range(1, 100)).
    Map(func(v Any) (Any, error) {
      return v.(int) * 2, nil
    }).
    Reduce(0, func(sum, v Any) (Any, error) {
      return sum.(int) + v.(int), nil
    })

async and spawn

  list, err := From(Infinite()).
    Map(func(v Any) (Any, error) {
      return v.(uint64) * v.(uint64), nil
    }).
    Async(10).                        // async the previous map iter, and the size of chan buf is 10
    Spawn(10, func(s Stream) Stream { // spawn 10 goroutines to cousume the iter
      return s.
        Map(func(v Any) (Any, error) {
          time.Sleep(time.Millisecond * 100)
          return v, nil
        }).
        Filter(func(v Any) (bool, error) {
          return v.(uint64)%10 == 4, nil
        }).
        FlatMap(func(v Any) ([]Any, error) {
          iv := v.(uint64)
          return []Any{iv, iv + 1, iv + 2}, nil
        })
    }).
    OnError(func(error) error {
      // log...
      return nil // skip the err
    }).
    Take(100)
  • async: make the previous iter execute asynchronously
  • spawn: dispatch to multiple groutines and fan-in