/executorservices

Dart executer services.

Primary LanguageDartBSD 3-Clause "New" or "Revised" LicenseBSD-3-Clause

Pub.dev License

A library for Dart and Flutter developers.

license.

Description

It allows you to execute code in isolates or any executor currently supported.

Support concurrent execution of tasks or functions.

Support cleanup of unused isolates.

Support caching of isolate that allow you to reuse them (like thread).

It's extendable.

Usage

For tasks that return either a value or an error:

For submitting a top level or static function without arguments to the executorservice. For task that return a value or a future the executorService.submit* return a future, to track the progress of the task.

import "dart:math";
import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<int> futureResult = executorService.submitAction(_randomInt);

  futureResult.then((result) => print(result));
}

int _randomInt() {
  return Random.secure().nextInt(1000);
}

For submitting a top level or static function with one argument to the executorservice.

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submitCallable(expensiveHelloWorld, "Darel Bitsy");

  futureResult.then((result) => print(result));
}

Future<String> expensiveHelloWorld(final String name) async {
  await Future.delayed(Duration(seconds: 1));
  return "Hellow world $name";
}

For submitting a top level or static function with many other arguments to the executorservice. The executorservice provide method like submitFunction2,submitFunction3,submitFunction4.

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submitFunction2(
    expensiveHelloWorld,
    "Darel Bitsy",
    23,
  );

  futureResult.then((result) => print(result));
}

Future<String> expensiveHelloWorld(final String name, final int age) async {
  await Future.delayed(Duration(seconds: 1));
  if (age >= 80) {
    return "Hellow world elder $name";
  } else {
    return "Hellow world $name";
  }
}

If you want to run a instance method of a existing class you need to make it extend the Task class. If if you want to have more than five arguments function. If you don't want to have the code as top level or static function.

import "dart:async";
import "dart:isolate";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Future<String> futureResult = executorService.submit(
    GetPost("https://jsonplaceholder.typicode.com/posts", 5),
  );

  futureResult.then(
    (result) => print(
      "Recieved $result and it's running from ${Isolate.current.debugName}",
    ),
  );
}

class GetPost extends Task<String> {
  GetPost(this.apiUrl, this.index);

  final String apiUrl;
  final int index;

  @override
  FutureOr<String> execute() {
    return http
        .get("$apiUrl/$index")
        .then((postResponse) => postResponse.body)
        .then(
      (json) {
        print(
          "about to return $json and it's running from ${Isolate.current.debugName}",
        );
        return json;
      },
    );
  }
}

A example of chaining tasks.

import "dart:async";
import "dart:math";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  executorService
      .submitFunction2(_fullName, "Darel", "Bitsy")
      .then(
        (fullName) {
          print("Hi, $fullName");
          return executorService.submitAction(randomPostId);
        },
      )
      .then(
        (postId) => executorService.submit(
          GetPost("https://jsonplaceholder.typicode.com/posts", postId),
        ),
      )
      .then(print);
}

int randomPostId() {
  return Random.secure().nextInt(10);
}

Future<String> _fullName(final String firstName, final String lastName) async {
  await Future.delayed(Duration(milliseconds: 500));
  return "$firstName, $lastName";
}

class GetPost extends Task<String> {
  GetPost(this.apiUrl, this.index);

  final String apiUrl;
  final int index;

  @override
  FutureOr<String> execute() {
    return http.get("$apiUrl/$index").then((postResponse) => postResponse.body);
  }
}

For tasks that emit events (Code that return a stream):

For executing to a top level or static function without arguments that return a stream to the executorservice.

import "dart:async";
import "dart:isolate";
import "dart:math";

import "package:executorservices/executorservices.dart";

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Stream<int> streamOfRandomNumber =
      executorService.subscribeToAction(randomGenerator);

  streamOfRandomNumber.listen(
    (newData) => print(
      "Received $newData and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onError: (error) => print(
      "Received ${error.toString()} and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onDone: () => print(
      "Task's done and it's running on isolate: ${Isolate.current.debugName}",
    ),
  );
}

Stream<int> randomGenerator() async* {
  for (var i = 0; i < Random.secure().nextInt(10); i++) {
    await Future.delayed(Duration(seconds: 1));
    print(
      "about to yield $i and it's running from ${Isolate.current.debugName}",
    );
    yield i;
  }
}

Here are the analogical method for other type of top level, or static functions: subscribeToCallable, subscribeToFunction2, subscribeToFunction3, subscribeToFunction4

If you want to run a instance method of a existing class you need to make it extend the SubscribableTask class. If if you want to have more than five arguments function. If you don't want to have the code as top level or static function.

import "dart:async";
import "dart:isolate";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newSingleExecutor();

  final Stream<String> streamOfPosts = executorService.subscribe(
    GetPosts("https://jsonplaceholder.typicode.com/posts", 10),
  );

  streamOfPosts.listen(
    (newData) => print(
      "Received $newData and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onError: (error) => print(
      "Received ${error.toString()} and it's running on isolate: ${Isolate.current.debugName}",
    ),
    onDone: () => print(
      "Task's done and it's running on isolate: ${Isolate.current.debugName}",
    ),
  );
}

class GetPosts extends SubscribableTask<String> {
  GetPosts(this.apiUrl, this.maxPosts);

  final String apiUrl;
  final int maxPosts;

  @override
  Stream<String> execute() async* {
    for (var index = 0; index < maxPosts; index++) {
      final postJson = await http
          .get("$apiUrl/$index")
          .then((postResponse) => postResponse.body);

      print(
        "about to yield $postJson and "
        "it's running from ${Isolate.current.debugName}",
      );

      yield postJson;
    }
  }
}

A example of chaining tasks of different type together.

import "dart:async";
import "dart:io";

import "package:executorservices/executorservices.dart";
import "package:http/http.dart" as http;

void main() {
  final executorService = ExecutorService.newUnboundExecutor();

  executorService
      .subscribeToAction(randomPostIds)
      .asyncMap((randomPostId) => executorService.submit(GetPost(randomPostId)))
      .listen(
        (newData) => print("Received $newData"),
        onError: (error) => print("Received ${error.toString()}"),
        onDone: () {
          print("Task's done");
          exit(0);
        },
      );
}

Stream<int> randomPostIds() async* {
  for (var index = 0; index < 10; index++) {
    await Future.delayed(Duration(milliseconds: 100));
    yield index;
  }
}

class GetPost extends Task<String> {
  GetPost(this.postId);

  final int postId;

  @override
  Future<String> execute() {
    return http
        .get("https://jsonplaceholder.typicode.com/posts/$postId")
        .then((postResponse) => postResponse.body);
  }
}

By default you can't re-submit the same instance of a ongoing Task to a ExecutorService multiple times. Because the result of your submitted task is associated with the task instance identifier. So by default submitting the same instance of a task multiple times will result in unexpected behaviors.

For example, this won't work:

main() {
  final executors = ExecutorService.newUnboundExecutor();
  
  final task = SameInstanceTask();

  executors.submit(task);
  executors.submit(task);
  executors.submit(task);
  
  for (var index = 0; index < 10; index++) {
    executors.submit(task);
  }
}

class SameInstanceTask extends Task<String> {
  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(seconds: 5));
    return "Done executing same instance task";
  }
} 

But if you want to submit the same instance of a task multiple times you need to override the Task is clone method.

For example, this will now work:

main() {
  final executors = ExecutorService.newUnboundExecutor();
  
  final task = SameInstanceTask();

  for (var index = 0; index < 10; index++) {
    executors.submit(task);
  }
  
  final taskWithParams = SameInstanceTaskWithParams("Darel Bitsy");

  for (var index = 0; index < 10; index++) {
    executors.submit(taskWithParams);
  }
}

class SameInstanceTask extends Task<String> {
  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(minutes: 5));
    return "Done executing same instance task";
  }
  
  @override
  SameInstanceTask clone() {
    return SameInstanceTask();
  }
}

class SameInstanceTaskWithParams extends Task<String> {
  SameInstanceTaskWithParams(this.name);

  final String name;

  @override
  FutureOr<String> execute() async {
    await Future.delayed(Duration(minutes: 5));
    return "Done executing same instance task with name: $name";
  }
    
  @override
  SameInstanceTaskWithParams clone() {
    return SameInstanceTaskWithParams(name);
  }
}

Features and bugs

Please file feature requests and bugs at the issue tracker.