linkedin/parseq

Issue in parallel execution

Anmol-Singh-Jaggi opened this issue · 8 comments

I was trying to concurrently update a HashMap using parseq Task.par(...), expecting that, due to race conditions, I would see an anomaly.
However, I ran the parallel task multiple times and no race condition ever occurred.
Also, parseq ran considerably (about 20x) slower than a raw ExecutorService.
This suggests that either the parallel tasks are not actually running in parallel, or I am using the parseq API incorrectly.

Consider the following comparison of using parseq Task.par() versus a raw ExecutorService:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.ParTask;
import com.linkedin.parseq.Task;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestingParseqParallelTasks {
  private final static int expectedCount = 5000000;

  private static ExecutorService getExecutorService(){
    int numCores = Runtime.getRuntime().availableProcessors(); // 16
    return Executors.newFixedThreadPool(numCores + 1);
  }

  private static Engine getEngine() {
    final ExecutorService taskScheduler = getExecutorService();
    final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
    return new EngineBuilder().setTaskExecutor(taskScheduler).setTimerScheduler(timerScheduler).build();
  }

  private static ParTask<Void> parallelUpdateTask(Map<String, Integer> map, int count) throws InterruptedException {
    List<Task<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      tasks.add(Task.action(() -> map.put("abcd", map.get("abcd") + 1)));
    }
    return Task.par(tasks);
  }

  public static int testParseq() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    map.put("abcd", 0);
    Engine engine = getEngine();
    Task<List<Void>> t1 = parallelUpdateTask(map, expectedCount);
    engine.run(t1);
    t1.await();
    return map.get("abcd");
  }

  public static int testExecutorService() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    map.put("abcd", 0);
    ExecutorService executor = getExecutorService();
    for (int i = 0; i < expectedCount; i++) {
      executor.execute(() -> map.put("abcd", map.get("abcd") + 1));
    }
    executor.shutdown();
    boolean done = executor.awaitTermination(60, TimeUnit.SECONDS);
    if (!done) {
      throw new Exception("Taking too much time!");
    }
    return map.get("abcd");
  }

  public static void main(String[] args) throws Exception {
    long before, after;
    before = System.nanoTime();
    final int observedCountFromExecutor = testExecutorService();
    after = System.nanoTime();
    System.out.println(String.format("Executor returned in %s nanoseconds and returned %s", (after-before), observedCountFromExecutor));
    // This runs way faster and is always less than expectedCount

    before = System.nanoTime();
    final int observedCountFromParseq = testParseq();
    after = System.nanoTime();
    System.out.println(String.format("Parseq returned in %s nanoseconds and returned %s", (after-before), observedCountFromParseq));
    // This runs considerably slower and is always equal to expectedCount

    System.exit(0);
    
    /*
    Output:
        Executor returned in 1616721786 nanoseconds and returned 4980309
        Parseq returned in   21630724933 nanoseconds and returned 5000000
    */
  }
}

Another more basic example demonstrating the problem:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class TestingParseqParallelTasksBasic {
  private static Engine getEngine() {
    int numCores = Runtime.getRuntime().availableProcessors(); // 16
    final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
    final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
    return new EngineBuilder().setTaskExecutor(taskScheduler).setTimerScheduler(timerScheduler).build();
  }

  public static void main(String[] args) throws Exception {
    Task<Void> task1 = Task.action(() -> {
      System.out.println("task1");
      Thread.sleep(10000);
    });

    Task<Void> task2 = Task.action(() -> {
      System.out.println("task2");
      Thread.sleep(10000);
    });

    Engine engine = getEngine();
    engine.run(Task.par(task1, task2));

    /*
    Output:
       task1
       <Nothing for 10 seconds>
       task2
    */
  }
}

I was expecting both of them to start and end almost together.

Another example with chained tasks, demonstrating that one chain is completely executed before the other one starts:

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class TestingParseqParallelTasksBasic {
  private static Engine getEngine() {
    int numCores = Runtime.getRuntime().availableProcessors(); // 16
    final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
    final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
    return new EngineBuilder().setTaskExecutor(taskScheduler).setTimerScheduler(timerScheduler).build();
  }

  public static void main(String[] args) throws Exception {
    Task<Void> task1 = Task.action(() -> System.out.println("task0"));
    for (int i = 0; i < 9; i++) {
      final int j = i + 1;
      task1 = task1.map((Void) -> {
        Thread.sleep(500);
        System.out.println("task" + String.valueOf(j));
        return null;
      });
    }

    Task<Void> task2 = Task.action(() -> System.out.println("task100"));
    for (int i = 0; i < 9; i++) {
      final int j = i + 101;
      task2 = task2.map((Void) -> {
        Thread.sleep(500);
        System.out.println("task" + String.valueOf(j));
        return null;
      });
    }

    Engine engine = getEngine();
    engine.run(Task.par(task1, task2));
  }
}

/* OUTPUT:

task0
task100
task1
task2
task3
task4
task5
task6
task7
task8
task9
task101
task102
task103
task104
task105
task106
task107
task108
task109

*/

If execution were truly parallel, we should have seen interleaved output alternating between the two chains.

UPDATE:
One of my team members discovered that using FIFOPriorityQueue and replacing map with flatMap makes it run concurrently.
However, all the tasks are being run on the same thread.
They don't seem to run truly in parallel across multiple threads.
Can someone please confirm whether this is the expected behaviour?
Also, what is the significance of using FIFOPriorityQueue?

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.internal.FIFOPriorityQueue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class TestingParseqParallelTasksBasic {
  private static Engine getEngine() {
    int numCores = Runtime.getRuntime().availableProcessors(); // 16
    final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
    final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
    return new EngineBuilder().setTaskExecutor(taskScheduler).setTimerScheduler(timerScheduler)
        .setTaskQueueFactory(FIFOPriorityQueue::new).build();
  }

  public static void main(String[] args) throws Exception {
    Task<Void> task1 = Task.action(() -> System.out.println("task0"));
    for (int i = 0; i < 9; i++) {
      final int j = i + 1;
      task1 = task1.flatMap(x -> Task.action(() -> {
        Thread.sleep(100);
        System.out.println("task" + String.valueOf(j));
      }));
    }

    Task<Void> task2 = Task.action(() -> System.out.println("task100"));
    for (int i = 0; i < 9; i++) {
      final int j = i + 101;
      task2 = task2.flatMap(x -> Task.action(() -> {
        Thread.sleep(100);
        System.out.println("task" + String.valueOf(j));
      }));
    }

    Engine engine = getEngine();
    engine.run(Task.par(task1, task2));
  }
}

/* OUTPUT:
task0
task100
task1 Thread[pool-1-thread-1,5,main]
task101 Thread[pool-1-thread-1,5,main]
task2 Thread[pool-1-thread-1,5,main]
task102 Thread[pool-1-thread-1,5,main]
task3 Thread[pool-1-thread-1,5,main]
task103 Thread[pool-1-thread-1,5,main]
task4 Thread[pool-1-thread-1,5,main]
task104 Thread[pool-1-thread-1,5,main]
task5 Thread[pool-1-thread-1,5,main]
task105 Thread[pool-1-thread-1,5,main]
task6 Thread[pool-1-thread-1,5,main]
task106 Thread[pool-1-thread-1,5,main]
task7 Thread[pool-1-thread-1,5,main]
task107 Thread[pool-1-thread-1,5,main]
task8 Thread[pool-1-thread-1,5,main]
task108 Thread[pool-1-thread-1,5,main]
task9 Thread[pool-1-thread-1,5,main]
task109 Thread[pool-1-thread-1,5,main]
*/

Thanks for your interest in parseq. The question you raised is related to the ParSeq execution model and how it makes async coding easier for developers. One important feature of parseq is its serialized execution of non-blocking computations. This presentation I gave at the SATURN conference may give you some hints on this: https://www.youtube.com/watch?v=BkJyXtyLL3Y&t=4s (around the 24-25 minute mark). If you look at the code in https://github.com/linkedin/parseq/blob/master/subprojects/parseq/src/main/java/com/linkedin/parseq/internal/SerialExecutor.java, you will see that tasks within a plan run sequentially to guarantee "happens-before" semantics.
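The serialized execution that SerialExecutor provides can be sketched in plain JDK terms. This is a simplified model, not ParSeq's actual implementation: tasks submitted to the wrapper run one at a time, in submission order, even though the backing pool has many threads, which is what establishes happens-before between tasks of one plan.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of serialized execution: at most one queued task runs
// at any moment, in FIFO order, even though the delegate pool has many
// threads. Class and field names here are illustrative, not ParSeq's.
public class SerialExecutorSketch implements Executor {
  private final Executor delegate;
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean running = new AtomicBoolean(false);

  public SerialExecutorSketch(Executor delegate) {
    this.delegate = delegate;
  }

  @Override
  public void execute(Runnable task) {
    queue.add(task);
    tryDrain();
  }

  private void tryDrain() {
    // Only one drain loop is active at a time, so queued tasks never overlap.
    if (running.compareAndSet(false, true)) {
      delegate.execute(() -> {
        Runnable next;
        while ((next = queue.poll()) != null) {
          next.run(); // tasks run one at a time, in submission order
        }
        running.set(false);
        // Re-check in case a task arrived after the queue looked empty.
        if (!queue.isEmpty()) {
          tryDrain();
        }
      });
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    SerialExecutorSketch serial = new SerialExecutorSketch(pool);
    StringBuilder order = new StringBuilder(); // safe: tasks never run concurrently
    CountDownLatch done = new CountDownLatch(3);
    for (int i = 0; i < 3; i++) {
      final int j = i;
      serial.execute(() -> { order.append(j); done.countDown(); });
    }
    done.await();
    pool.shutdown();
    if (!order.toString().equals("012")) throw new AssertionError(order);
    System.out.println("serialized order: " + order);
  }
}
```

Despite the 4-thread pool, the output order is deterministic because the wrapper serializes execution; this is the property that lets ParSeq tasks share state within one plan without locks.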

Hello Anmol,

We are working on an execution wiki; hopefully, once it is published, you will get a better idea.

To answer your specific question about the thread executions, let me try to explain:
map and flatMap did not cause the difference, because your task1 and task2 are 10-fold nested either way. You can think of them as 10 tasks nested in cascade into one task:

Task<Task<Task.... 10 .... times<Task<Void>>....> task1
Task<Task<Task.... 10 .... times<Task<Void>>....> task2

Now if you run Task.par(task1, task2), you put them into one single task plan. One rule to remember is that there is no parallelism within a single task plan: you don't get parallelism, but you are supposed to get concurrency. So:

(1) If you run task1 and task2 in the same task plan with the default task queue, using Task.par(task1, task2), you get concurrency between the two tasks. Here the 10-fold nested task is the minimum unit of concurrency, so you will see either all of task1 before task2, or all of task2 before task1. I agree it is arguable whether sequential execution of task1 and task2 is truly "concurrent", because there are just two tasks; but think of any two CPU-intensive tasks running on a single processor: whether they are interleaved really doesn't matter for throughput, and we think not interleaving gives better performance because of better cache use.

You might be wondering why I say the minimum unit of concurrency is that 10-fold nested task. The answer is twofold:

  • (i) The tasks you created are not async, i.e. they are not promise/future based, so the 20 tasks are treated the same by the scheduler with regard to their expected running time.
  • (ii) The second reason lies in LIFOBiPriorityQueue, which is the default task-queue implementation if you don't specify FIFOPriorityQueue. A LIFO queue is used for its temporal cache locality and potential spatial cache locality. That is why the 10 tasks associated with task1 are treated as a single chunk and finish together, and similarly for task2.
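A toy model can show why the queue discipline matters. This is plain JDK code, not ParSeq's LIFOBiPriorityQueue or FIFOPriorityQueue: each chain's step enqueues its continuation, and a single-threaded loop drains the queue. With LIFO, the continuation just pushed is popped next, so one chain runs to completion before the other; with FIFO, continuations go to the back and the chains interleave.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy single-threaded scheduler illustrating LIFO vs FIFO task queues.
// Not ParSeq code: just a model of how queue discipline decides whether
// two task chains interleave or run back to back.
public class QueueDisciplineDemo {
  static List<String> run(boolean lifo) {
    Deque<Runnable> queue = new ArrayDeque<>();
    List<String> log = new ArrayList<>();
    // Two chains ("A" and "B") of 3 steps each; every step enqueues
    // its continuation, mimicking a chain of mapped tasks.
    for (String chain : new String[] {"A", "B"}) {
      queue.addLast(step(queue, log, chain, 0));
    }
    while (!queue.isEmpty()) {
      // LIFO pops the continuation that was just pushed; FIFO takes the
      // oldest entry, letting the other chain's step run in between.
      Runnable next = lifo ? queue.pollLast() : queue.pollFirst();
      next.run();
    }
    return log;
  }

  static Runnable step(Deque<Runnable> queue, List<String> log, String chain, int i) {
    return () -> {
      log.add(chain + i);
      if (i < 2) {
        queue.addLast(step(queue, log, chain, i + 1));
      }
    };
  }

  public static void main(String[] args) {
    System.out.println("LIFO: " + run(true));  // one chain finishes first
    System.out.println("FIFO: " + run(false)); // chains interleave
  }
}
```

With LIFO the log is [B0, B1, B2, A0, A1, A2]; with FIFO it is [A0, B0, A1, B1, A2, B2], matching the behaviour observed with the two ParSeq queue implementations above.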

(2) So with no other change, if you want to run them with real parallelism, meaning you want to utilize more processors (or more hardware threads, if your processor uses Hyper-Threading), you should submit two task plans to the engine:

List<Task<Void>> taskList_1 = new ArrayList<>();
taskList_1.add(task1);

List<Task<Void>> taskList_2 = new ArrayList<>();
taskList_2.add(task2);

engine.run(Task.par(taskList_1));
engine.run(Task.par(taskList_2));

This will run on two threads in parallel, and from the point of view of task1 or task2 themselves, their 10 tasks still run sequentially. This should take roughly half the time that Task.par(task1, task2) took.

(3) Alternatively, if we don't submit two plans, we can still achieve finer-grained concurrency by interleaving these CPU-intensive tasks (with no parallelism, so this won't be faster; it just gives an impression of concurrency). You already posted the answer: if you use FIFOPriorityQueue, the minimum unit of concurrency becomes a single task, and here there are 20 tasks. So in Task.par(task1, task2) you have 20 tasks added to the plan, the scheduler runs them one by one, and task1 and task2 genuinely run concurrently. You can think of the actual sequence of adding those 20 tasks into the FIFOPriorityQueue as a level-order traversal of a tree:

  Par2Task
   /   \
Task1  Task2
 |        |
Task1  Task2
...     ...
...     ...
...     ...
 |        |
Task1  Task2

Summary:
As I see it, your confusion came from the fact that the tasks you run are actually CPU-intensive (rather than IO-intensive), and you seem to think they should run concurrently, i.e. interleaved.

To begin with, there is really little point in optimizing CPU-intensive tasks with concurrency. Async and concurrency are meant to optimize IO reads or other async operations that have detectable and avoidable busy-waiting. In your example you let the thread sleep for 500 ms, but the thread has to hold a core for those 500 ms; that is different from an IO operation that takes 500 ms to buffer data in. Waiting for IO is detectable and avoidable because the kernel provides good support for efficient IO, such as Linux's epoll, macOS's kqueue, or Windows's IOCP.

In terms of scheduling, parseq's execution model is actually similar to Netty's multi-threaded event loop, or the Go runtime scheduler (minus the communication-channel part). My personal recommendation is to remember these two rules:
(1) For a single task plan, its tasks may run on different threads, but during any period of real time only one task of that plan is running, on only one thread (thus one core). This is somewhat similar to Node.js/Python/Redis.
(2) But multiple task plans can run in parallel, which makes ParSeq more efficient than single-threaded event loops (Node.js, Python, Redis).

Thanks for the reply @junchuanwang.
But I didn't get how Thread.sleep() makes it less optimal.
If instead of sleeping we were reading from a big file, how would it be any different?

@Anmol-Singh-Jaggi

Hi Anmol,
It is not the best analogy, because you can read a file either synchronously or asynchronously.

If you do it synchronously, it is just a blocking operation, the same as Thread.sleep; in ParSeq you have a chance to wrap it in Task.blocking() so it is executed asynchronously.

If you directly use an asynchronous library to read the file, you can check this section on integrating it with a ParSeq task. In that case it is genuinely asynchronous and will be optimal.

As for Thread.sleep() (if you don't use Task.blocking() to run it in ParSeq), it will actually be considered a synchronous operation and will block the task engine from picking up other tasks.
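The offloading idea behind Task.blocking() can be sketched in plain JDK terms, using CompletableFuture rather than ParSeq itself (so this is an analogy, not ParSeq's API): blocking work is handed to a dedicated executor, so the caller's (or scheduler's) thread stays free, and two blocking operations overlap instead of running back to back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the idea behind Task.blocking(): hand blocking work to a
// dedicated executor so the scheduling thread is never blocked.
public class BlockingOffloadSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService blockingPool = Executors.newFixedThreadPool(2);

    // Two "blocking" operations (stand-ins for Thread.sleep or a
    // synchronous file read) run on the blocking pool concurrently.
    CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
      sleep(200);
      return "a-done";
    }, blockingPool);
    CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
      sleep(200);
      return "b-done";
    }, blockingPool);

    long start = System.nanoTime();
    String result = a.get() + "," + b.get(); // both ran at the same time
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

    blockingPool.shutdown();
    // Elapsed time is ~200 ms rather than ~400 ms, because the two
    // blocking calls overlapped on the dedicated pool.
    System.out.println(result + " in ~" + elapsedMs + " ms");
    if (!result.equals("a-done,b-done")) throw new AssertionError(result);
  }

  private static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
```

ParSeq's Task.blocking() applies the same pattern inside a plan: the blocking callable runs on a separate executor and completes the task asynchronously, so the engine can keep picking up other tasks in the meantime.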

Thanks for the reply @junchuanwang
Closing the issue for now.