amibar/SmartThreadPool

New dotnet libraries and mechanisms

Opened this issue · 27 comments

Hello Ami,
I'd like to take this chance to thank you for your great work!
I use STP a lot, and it is definitely my favorite library!

Would you consider supporting some of the newer .NET libraries and mechanisms in future releases?

  • async/await?
  • ValueTask?
  • System.Threading.Channels as the base queue?

Thanks!

Hi Yoav,

Can you please give examples of how you wish to use these features with STP?

Regards,
Ami

Hi Ami,
I just thought it would be a nice improvement to replace some of the blocking code with async methods (such as 'await Enqueue' or 'await WaitForIdle').

The idea of combining it with ValueTask is just to reduce allocations compared to a regular Task.

Regarding 'System.Threading.Channels', one thing I like is the bounded-queue backpressure mechanism: when the queue is full, it automatically waits until there is room for more items (instead of throwing a 'queue is full' exception).

Yoav

Hi @yoav-melamed,

By async/await I thought you meant that when you enqueue an async method, it would continue to run on the STP.

For example, take this async method:

public async Task DoIt()
{ 
   // Do first thing
   await Task.Delay(10);
   // Do second thing
}

Today, if you enqueue it into the STP, the work item completes as soon as "first thing" is done (at the first await), and "second thing" runs on the .NET ThreadPool, outside the STP threads.

With an async/await feature, the STP could run "second thing" on its own threads, and only then would the work item complete.
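The thread switch described above can be reproduced without STP. In this minimal sketch a plain dedicated `Thread` stands in for an STP worker thread (an assumption; no STP API is used), and `DoIt` records whether each half runs on a .NET ThreadPool thread:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

bool firstPartOnPool = true;
bool secondPartOnPool = false;
Task pending = Task.CompletedTask;

// A dedicated (non-pool) thread stands in for an STP worker thread.
var worker = new Thread(() => pending = DoIt());
worker.Start();
worker.Join();  // the "work item" has already returned here, right after the first await
pending.Wait(); // the rest of DoIt ran somewhere else

Console.WriteLine($"First part on a ThreadPool thread:  {firstPartOnPool}");
Console.WriteLine($"Second part on a ThreadPool thread: {secondPartOnPool}");

async Task DoIt()
{
    // Do first thing: runs synchronously on the calling (worker) thread
    firstPartOnPool = Thread.CurrentThread.IsThreadPoolThread;
    await Task.Delay(10);
    // Do second thing: with no SynchronizationContext, the continuation
    // is scheduled on the .NET ThreadPool, not on the worker thread
    secondPartOnPool = Thread.CurrentThread.IsThreadPoolThread;
}
```

The worker thread is free again as soon as `DoIt` hits its first `await`, which is why the STP considers the work item complete at that point.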

Is this a required feature? Or do you just need Enqueue and WaitForIdle to be awaitable?

Ami

Hi @amibar
Sorry for the super-late response - I missed your comment :/
Yes, having an awaitable task in the queue will definitely be a big step for STP.
It will allow each thread to be more efficient and will definitely help with writing async code.

In addition, since WaitForIdle is an expensive blocking method, having an async version of it would be great.

Oh yeah, an async/await feature so that you can do QueueWorkItem(async () => await xxx())

Hi @yoav-melamed & @MichelZ,

I started working on async/await support a while ago, and I am still figuring out how to handle some scenarios.
However, why do you want this feature in STP instead of just using async/await directly?
I just want to know that my work will be useful for you.

Regards,
Ami

Hi @amibar
My reasons are about making every STP thread as efficient as possible:

  • Having an awaitable task, so each STP thread can perform several async operations
  • Having an awaitable 'WaitForIdle', so while the main thread waits for the STP to complete, we can render the UI or do other things (this can take some time, and in the meantime the thread could do other work)
  • Being able to stop or cancel the STP asynchronously (this can also take some time, and in the meantime the calling thread could do other work)

That is what I had in mind
Thank you for your hard work!

Hi @yoav-melamed,

Thanks.

The STP threads are supposed to be worker threads, so they are meant to do CPU-bound work.
When your work item on the STP awaits, it just delegates the rest of the work to another arbitrary thread over which the STP has no control.

One of the ideas behind STP is to limit concurrency.
For example, say you limit the concurrency to 1 (max threads) and send 10 work items, each of which awaits a file download. You'll end up with 10 concurrent downloads whose continuations run on .NET ThreadPool threads, while the STP thread sits idle.
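The effect can be measured with a minimal sketch in which a single dedicated thread plays the role of an STP limited to one thread (an assumption; no STP API is used) and `Task.Delay` stands in for a real download:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int inFlight = 0;
int peak = 0;
var downloads = new Task[10];

// One dedicated thread plays the role of an STP limited to a single thread.
var worker = new Thread(() =>
{
    for (int n = 0; n < downloads.Length; n++)
        downloads[n] = DownloadAsync(); // returns as soon as DownloadAsync reaches its await
});
worker.Start();
worker.Join();           // the lone worker thread is already free...
Task.WaitAll(downloads); // ...while all the downloads were in flight at once

Console.WriteLine($"Peak concurrent downloads: {peak}");

// Hypothetical download: Task.Delay stands in for real network I/O.
async Task DownloadAsync()
{
    int now = Interlocked.Increment(ref inFlight); // runs on the worker thread
    peak = Math.Max(peak, now);                    // only the worker thread writes peak
    await Task.Delay(200);
    Interlocked.Decrement(ref inFlight);           // runs on a .NET ThreadPool thread
}
```

Limiting threads to 1 did not limit the downloads: all 10 were outstanding at the same time.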

As for WaitForIdleAsync() and GetResultAsync(), they are quite reasonable to implement.
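One way such an awaitable idle signal could work is sketched below: count outstanding work items and complete a `TaskCompletionSource` whenever the count drops back to zero. This is a hypothetical illustration, not STP's implementation; `OnQueued`, `OnCompleted` and this `WaitForIdleAsync` are made-up local functions, and `Task.Run` stands in for the pool's worker threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical idle tracker (not STP's code): counts outstanding work items and
// completes a TaskCompletionSource when the count returns to zero.
object sync = new object();
int pendingCount = 0;
var idle = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
idle.SetResult(true); // nothing queued yet, so we start out idle

void OnQueued()
{
    lock (sync)
    {
        // Leaving the idle state: hand out a fresh, incomplete task.
        if (pendingCount++ == 0)
            idle = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }
}

void OnCompleted()
{
    lock (sync)
    {
        if (--pendingCount == 0)
            idle.TrySetResult(true);
    }
}

Task WaitForIdleAsync()
{
    lock (sync) return idle.Task;
}

// Usage: five simulated work items that finish at different times.
for (int n = 1; n <= 5; n++)
{
    OnQueued();
    int delay = n * 10;
    _ = Task.Run(async () => { await Task.Delay(delay); OnCompleted(); });
}

await WaitForIdleAsync(); // the caller's thread is not blocked while waiting
Console.WriteLine("Idle.");
```

Because the task is awaited rather than waited on, a UI thread could keep rendering while the work items finish.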

What do you think?

Regards,
Ami

I often use it to limit concurrency, yes.
But sometimes I also need to call other APIs in the worker threads, and more and more of them are async. So it's more a matter of convenience (.GetAwaiter(false).GetResult() anyone?)

Did you mean .ConfigureAwait(false) ?

Yeah, that one :)
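For reference, here is a minimal sketch of that blocking pattern inside a synchronous work item body. `GetValueAsync` is a hypothetical async API. It also shows one practical reason to prefer `GetAwaiter().GetResult()` over `.Result`: the former rethrows the original exception, while the latter wraps it in an `AggregateException`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical async API that a synchronous work item needs to call.
async Task<int> GetValueAsync(bool fail)
{
    await Task.Delay(20).ConfigureAwait(false);
    if (fail) throw new InvalidOperationException("API failed");
    return 42;
}

// Synchronous work item body: blocks its worker thread until the result is ready.
int CallBlocking(bool fail) =>
    GetValueAsync(fail).ConfigureAwait(false).GetAwaiter().GetResult();

int value = CallBlocking(fail: false);
Console.WriteLine(value); // 42

// GetAwaiter().GetResult() rethrows the original exception...
Exception? direct = null;
try { CallBlocking(fail: true); }
catch (Exception ex) { direct = ex; }
Console.WriteLine(direct?.GetType().Name); // InvalidOperationException

// ...whereas .Result wraps it in an AggregateException.
Exception? wrapped = null;
try { _ = GetValueAsync(fail: true).Result; }
catch (Exception ex) { wrapped = ex; }
Console.WriteLine(wrapped?.GetType().Name); // AggregateException
```

Either way, the worker thread stays blocked for the whole duration of the async call.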

Do you use the WorkItemGroups to limit concurrency or use the STP directly?


Both. We use STP in a few projects.

I have some dilemmas regarding async methods and the WorkItemGroup.

  1. Let's say you enqueue 2 async work items into a WorkItemGroup with a concurrency of 1. The 1st starts executing and then awaits, so the thread is released. Should the 2nd work item start executing, or should it wait until the 1st work item completes?
    (by "complete" I mean the async method returns rather than awaits)

  2. Another case is when several work items are enqueued to a WorkItemGroup and after a while they are all in an awaiting state. Does that mean the WorkItemGroup is idle? Or is it idle only when all these work items complete?
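For question 1, the second option ("wait until the 1st work item completes") amounts to holding a concurrency slot until the async method's Task completes, not just until its first await. A minimal sketch using `SemaphoreSlim` as the gate; this is an illustration of the semantics, not how STP or the async branch implements it, and `RunGatedAsync` is a hypothetical helper:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int Concurrency = 1;
var gate = new SemaphoreSlim(Concurrency, Concurrency);
object sync = new object();
int running = 0, peak = 0;

// The slot is held from start until the async work *completes*,
// even while the work is awaiting.
async Task RunGatedAsync(Func<Task> work)
{
    await gate.WaitAsync();
    try
    {
        lock (sync) { running++; peak = Math.Max(peak, running); }
        await work();
    }
    finally
    {
        lock (sync) { running--; }
        gate.Release();
    }
}

var tasks = Enumerable.Range(1, 5)
    .Select(_ => RunGatedAsync(() => Task.Delay(20)))
    .ToArray();
await Task.WhenAll(tasks);

Console.WriteLine($"Peak concurrent work items: {peak}");
```

With this choice, "idle" in question 2 would naturally mean that every gated Task has completed, not merely that all of them are awaiting.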

Hi Ami,
Regarding your question:

Do you use the WorkItemGroups to limit concurrency or use the STP directly?

Like @MichelZ, I'm using STP in several projects, and it depends on the project. Mostly I use the STP directly, but in one project I need 4 workers, each collecting data from a different source and adding it to a centralized queue to be processed (and each worker has a different priority), so I use a WIG there.

When your work item on the STP awaits, it just delegates the work to another arbitrary thread over which the STP has no control

I thought a lot about what you said, and you are right: executing an async task lets the .NET ThreadPool control the thread life cycle, which is the opposite of what we want.
For calling async APIs, I think it's better to use the old-fashioned blocking way (and NOT use ConfigureAwait(false), because we do want the results to be returned to the caller thread [which is the STP work item thread]).

Here is a sample I wrote to demonstrate what I mean:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
    stp.QueueWorkItem(Executor, i);

stp.WaitForIdle(); // would like to have `await stp.WaitForIdleAsync()` here

Console.WriteLine("All Done.");

void Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    // Executing some async API; .Result blocks this STP thread until it completes
    var task = Task.Run(async () =>
    {
        await Task.Delay(1000);
        return new Random().Next(1, 1000);
    });
    var results = task.Result;

    Console.WriteLine($"Item {ItemNumber} is done with result {results}.");
}

Hi @yoav-melamed,

I looked at your sample and realized that using QueueWorkItem for a Task is not the best solution.

When this project started (.net 1.1) the Task type didn’t exist yet, so I had to wrap delegates with objects (WorkItem) and return promises (IWorkItemResult). A Task is a promise, so I can harness its methods, instead of treating it like a delegate.

So for a Task I prefer using RunTask, similar to Task.Run(). Rewritten with it, your sample looks like this:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new SmartThreadPool();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    _ = stp.RunTask(() => Executor(i));
}

// The tasks are converted to work items internally, so you can wait for idle on the stp as usual
await stp.WaitForIdleAsync(); 

Console.WriteLine("All Done.");

// Executor is an async method
async Task<int> Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    await Task.Delay(1000);

    var results = new Random().Next(1, 1000);

    Console.WriteLine($"Item {ItemNumber} is done with result {results}.");

    return results;
}

RunTask also supports a cancellation token and priority.

I could still implement QueueWorkItem for tasks, but I don't think it's necessary.

What’s your opinion?

I pushed a branch named async with what I did so far.

Ami

Hi @amibar
That looks like a great idea! I think that's exactly what @MichelZ and I are looking for.
I'm going to pull the async branch and take it for a spin; this will definitely help us with everything related to async operations.

Does the RunTask() method execute the task directly, or queue it on the STP (like .NET's Task.Run() does with the ThreadPool behind the scenes)?

If so, would you be able to queue tasks on WorkItemGroups as well, or only on the STP itself for now?

Hi @amibar
After pulling the async branch and trying it myself, I found an issue with how the code runs:
stp.Concurrency = 1; has no effect on the current code. All the items fired simultaneously rather than one after the other.
Changing it to stp.Concurrency = 5; (for example) has no effect either (compared to the original code), and if I'm getting it right, the reason is that we are not actually queueing the items, just firing tasks. Am I right?
Did you notice this behavior on your side as well?


Ignore my previous comment... I found the code:

if (cancellationToken?.IsCancellationRequested ?? false)
    return Task.FromCanceled(cancellationToken.Value);

PreQueueWorkItem();
WorkItem workItem = WorkItemFactory.CreateWorkItem(
    this,
    WIGStartInfo,
    _ =>
    {
        action();
        return null;
    },
    priority);
Enqueue(workItem);

var wir = workItem.GetWorkItemResult();
cancellationToken?.Register(() => wir.Cancel());

return wir.GetResultAsync();

You queue the task, convert it to a WorkItem, and return a promise (as you explained in your previous comment).
I am still trying to figure out why it's not working as expected: all the tasks run in parallel, ignoring the concurrency parameter.
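The "return a promise" idea can be illustrated with a minimal sketch: run a delegate on a dedicated thread and expose its outcome as a `Task<T>` through `TaskCompletionSource`, registering a `CancellationToken` in the spirit of the `cancellationToken?.Register(...)` line in the snippet above. `RunAsPromise` is hypothetical and is not STP's `GetResultAsync`; note that its cancellation only cancels the returned Task and does not stop a delegate that is already running.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: run a delegate on its own (non-pool) thread and expose
// the outcome as a Task<T> "promise" via TaskCompletionSource.
Task<T> RunAsPromise<T>(Func<T> func, CancellationToken cancellationToken = default)
{
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<T>(cancellationToken);

    var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
    // Cancelling only completes the promise; the delegate keeps running if it already started.
    var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

    var thread = new Thread(() =>
    {
        try { tcs.TrySetResult(func()); }
        catch (Exception ex) { tcs.TrySetException(ex); }
        finally { registration.Dispose(); }
    });
    thread.IsBackground = true;
    thread.Start();
    return tcs.Task;
}

int result = await RunAsPromise(() => 6 * 7);
Console.WriteLine(result); // 42

var alreadyCanceled = RunAsPromise(() => 1, new CancellationToken(canceled: true));
Console.WriteLine(alreadyCanceled.IsCanceled); // True

using var cts = new CancellationTokenSource();
var slow = RunAsPromise(() => { Thread.Sleep(500); return 2; }, cts.Token);
cts.Cancel(); // the registration callback cancels the promise immediately
Console.WriteLine(slow.IsCanceled); // True
```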

Hi @yoav-melamed,

The concurrency limits the threads, not the tasks.
When a work item awaits, it releases its thread to another work item.

Ami

Hi @amibar
Sorry, maybe I got confused. What use cases do you think RunTask will solve? I thought each task would be executed from the queue as a work item, just in an awaitable way; probably my misunderstanding.

Hi @yoav-melamed,

I pushed another commit to the async branch.

With this change, a WorkItemsGroup cannot have more than Concurrency work items executing at the same time.

Please check if it works for you.

Thanks,
Ami

Hi @amibar
Thanks for your feedback!
I tried to re-run the sample above with the updated async branch, but the results were the same:
With stp.Concurrency = 1, all the tasks were executed simultaneously and returned their results after 1 second (the delay time in the Executor).

@yoav-melamed, check the WorkItemsGroup, I didn't update the STP itself.

Hi @amibar
I made the following changes in my code:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));

    _ = wig.RunTask(() => Executor(i));
}

Now the execution runs as expected (with a concurrency of 1 the items execute one after the other; with 5, for example, only 5 run in parallel).

The issue I noticed now is that, since RunTask returns a Task that we don't await, all the items passed to the Executor() function have the value 11, so the Executor runs with the wrong values. We cannot await RunTask because it executes the task rather than just adding it to the queue. Can you think of any way to work around this so it executes with the correct values?

@yoav-melamed,

Check https://stackoverflow.com/questions/271440/captured-variable-in-a-loop-in-c-sharp to solve this.

You should copy i to another local variable before passing it to the Executor.
Something like:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));
    var i2 = i;
    _ = wig.RunTask(() => Executor(i2));
}


Hi @amibar
That's working great now - thank you!
Is there a more elegant way to handle the 'copy to another variable' step, or is that just something we have to do for it to work?