Поиск  
Always will be ready notify the world about expectations as easy as possible: job change page
Oct 27, 2023

.NET Task Parallel Library vs. System.Threading.Channels

.NET Task Parallel Library vs. System.Threading.Channels
Автор:
Источник:
Просмотров:
6261

A friend reached out recently regarding the usage of Task Parallel Library (TPL) and ConcurrentBag in some .NET code. I inquired what the code was doing and it turns out that for each entry in some workload, it was performing some database operations and API calls.

I immediately wondered if using .NET’s System.Threading.Channels (STC) wouldn’t be a better choice: likely higher throughput and easier to program.

Let’s take a look!

• • •

First Thoughts

Intuitively, the nuance with TPL is that constraining the degree of parallelism means that the maximum number of tasks waiting at any given time is governed by the configured degree of parallelism (more on this in the caveats at the end); the rest of the tasks will be queued. This means that throughput is strongly dependent on properly selecting a MaxDegreeOfParallelism.

For any workload where there’s a network dependency within the parallel execution, this means that there would be significant constraints on how quickly the workload would complete since much of the time is likely spent waiting for network I/O.

Furthermore, when using TPL, it is then necessary to use synchronized access to shared state, which would again increase wait times, especially as the degree of parallelism increases (we won’t explore this angle in this article).

So let’s build a test case.

• • •

Test Design

The test design is simple: we’ll create a workload with 100 items in it, each with a random delay between 10 and 50ms to simulate some I/O:

var workload = Enumerable
  .Range(0, 100)
  .Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
  .ToImmutableArray();

A helper method will wrap the execution of each of our test cases to provide some basic instrumentation:

async Task InstrumentedRun(string name, Func<Task> test) {
  var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
  var timer = new Stopwatch();
  timer.Start();
  await test(); // ⭐️ Actual test here.
  timer.Stop();
  Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
  Console.WriteLine($"  ⮑  {threadsAtStart} threads at start");
  Console.WriteLine($"  ⮑  {Process.GetCurrentProcess().Threads.Count} threads at end");
}

Now we can run a few cases and measure the results.

Using a Channel

First up is using a channel:

// Using System.Threading.Channels
await InstrumentedRun("Channel", async () => {
  var channel = Channel.CreateUnbounded<int>();

  async Task Run(ChannelWriter<int> writer, int id, int delay) {
    await Task.Delay(delay); // ⭐️ Simulate work
    await writer.WriteAsync(id);
  }

  async Task Receive(ChannelReader<int> reader) {
    while (await reader.WaitToReadAsync()) {
      if (reader.TryRead(out var id)) {
        // No work here.
      }
    }
  }

  var receiveTask = Receive(channel.Reader);
  var processingTasks = workload
    .AsParallel()
    .Select(e => Run(channel.Writer, e.Index, e.Delay));

  await Task
    .WhenAll(processingTasks)
    .ContinueWith(_ => channel.Writer.Complete());

  await receiveTask;
});

We start by creating a simple channel and the two methods that will hold on to each of the two ends of the channel (writer and reader). The Run method is invoked and returns a set of 100 Tasks which we’ll wait for and upon completion, notify the read end with a call to Complete().

Using Parallel.For @ 4

Next we’ll set up Parallel.For with a MaxDegreeOfParallelism of 4:

// Using Parallel.For with concurrency of 4
await InstrumentedRun("Parallel.For @ 4", () => {
  Parallel.For(0, 100,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    index => {
      Thread.Sleep(workload[index].Delay); // ⭐️ Simulate work
    }
  );

  return Task.CompletedTask;
});

Note that Parallel.For does not support async/await so we simply use Thread.Sleep here to suspend the current thread for the configured delay.

Using Parallel.ForEachAsync @ 4

Next, we’ll use an async version:

// Using Parallel.ForEachAsync with concurrency of 4
await InstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
  await Parallel.ForEachAsync(workload,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (item, cancel) => {
      await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
    }
  )
);

Using Parallel.ForEachAsync @ 40

Then we’ll try with the parallelism set to 40:

// Using Parallel.ForEachAsync with concurrency of 40
await InstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
  await Parallel.ForEachAsync(workload,
    new ParallelOptions { MaxDegreeOfParallelism = 40 },
    async (item, cancel) => {
      await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
    }
  )
);

Using Parallel.ForEachAsync with Defaults

And finally, we’ll try with the defaults:

// Using Parallel.ForEachAsync with concurrency unset
await InstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
  await Parallel.ForEachAsync(workload, async (item, cancel) => {
    await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
  })
);

Let’s take a look at the results and analysis:

• • •

Results

Each run will be a bit different since the initialization of the workload is random. Here’s the output from one run:

Results

Is this what you expected?

If not, then you may want to consider whether TPL or STC is the better concurrency paradigm to use if you’re after raw performance (unless you really know how to tune your MaxDegreeOfParallelism).

The STC implementation is almost 12x faster than the TPL implementation at 4 degrees of parallelism. Of course, intuitively, this makes sense: since only 4 tasks can be executing at once with MaxDegreeOfParallelism set to 4, any I/O in the task will cause queuing of the remaining 96 tasks. On the other hand, the STC implementation immediately executes the full workload until each task hits I/O.

Even at 40 degrees of parallelism, the TPL implementation is necessarily slower than the STC implementation. Since we have a total of 100 tasks to process, it means that after the first 40 enter a wait state, the remaining 60 are going to be queued. The STC implementation effectively executes all 100 tasks until they reach a waiting state.

What may be a bit surprising is the thread count: it never rises above 19. As the name “Task Parallel Library” suggests, it is in fact processing tasks using the process thread pool and even if we set the MaxDegreeOfParallelism to 40, the process does not spawn the corresponding number of threads. In other words, it’s a concurrent-parallel hybrid.

Difference between concurrent, parallel, and hybrid
Understanding the difference between concurrent, parallel, and hybrid.

(It’s worth running this code yourself and switching the order of the Channel case to the last one and see how that affects the thread count).

• • •

Caveats and Conclusion

One important caveat here is that the STC approach effectively runs everything. This means that it can be unsuitable for cases where the upstream system (an API or database) cannot handle the volume from a sudden burst of requests or there is a request quota or contention for some resource (e.g. write lock in the DB). In such cases, TPL may be a suitable way to implement a sort of “throttled” concurrent request processing.

Another factor to consider is synchronized access. The beauty of the STC approach is that it effectively serializes the data flow and it is possible to handle the output in the Receive loop as if it were single-threaded (because it is). I think that this simplifies the programming model (perhaps also easier to debug) and writing to synchronized state in the TPL case would probably introduce additional costs, especially as parallelism increases.

A throttled STC implementation is also possible by introducing two serial Channels so if there is some portion of the workload that should be throttled, it is possible to execute the non-throttled part of the workload immediately as fast as possible and then control the flow of the throttled part of the workload.

The conclusion is that if your workload is using Task Parallel Library, it’s important to understand the correct tuning of the MaxDegreeOfParallelism to achieve the target throttling behavior. Not explicitly setting the parallelism seems the worst of both worlds since you’re not explicitly throttling the workload but at the same time, constraining the throughput. In cases where there’s no throttling involved nor desired, using System.Threading.Channels would seemingly be a better choice providing both better performance and perhaps an easier, sans-synchronization programming model.

• • •

Addendum — Full Code Listing:

GitHub Gist

// Generate a set of 100 records, each with a random wait interval.
using System.Collections.Immutable;
using System.Diagnostics;
using System.Threading.Channels;

var log = (object msg) => Console.WriteLine(msg);

var workload = Enumerable
  .Range(0, 100)
  .Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
  .ToImmutableArray();

// Using System.Threading.Channels
await InstrumentedRun("Channel", async () => {
  var channel = Channel.CreateUnbounded<int>();
  async Task Run(ChannelWriter<int> writer, int id, int delay) {
    await Task.Delay(delay);
    await writer.WriteAsync(id);
  }
  async Task Receive(ChannelReader<int> reader) {
    while (await reader.WaitToReadAsync()) {
      if (reader.TryRead(out var id)) {
        // No work here.
        //log($"  Completed {id}");
      }
    }
  }
  var receiveTask = Receive(channel.Reader);
  var processingTasks = workload
    .AsParallel()
    .Select(e => Run(channel.Writer, e.Index, e.Delay));
  await Task
    .WhenAll(processingTasks)
    .ContinueWith(_ => channel.Writer.Complete());
  await receiveTask;
});

// Using Parallel.For with concurrency of 4
await InstrumentedRun("Parallel.For @ 4", () => {
  Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 }, (index) => {
    Thread.Sleep(workload[index].Delay);
  });
  return Task.CompletedTask;
});

// Using Parallel.ForEachAsync with concurrency of 4
await InstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
  await Parallel.ForEachAsync(workload, new ParallelOptions { MaxDegreeOfParallelism = 4 }, async (item, cancel) => {
    await Task.Delay(item.Delay, cancel);
  })
);

// Using Parallel.ForEachAsync with concurrency of 40
await InstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
  await Parallel.ForEachAsync(workload, new ParallelOptions { MaxDegreeOfParallelism = 40 }, async (item, cancel) => {
    await Task.Delay(item.Delay, cancel);
  })
);

// Using Parallel.ForEachAsync with concurrency unset
await InstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
  await Parallel.ForEachAsync(workload, async (item, cancel) => {
    await Task.Delay(item.Delay, cancel);
  })
);

/*-----------------------------------------------------------
* Supporting functions
---------------------------------------------------------*/
async Task InstrumentedRun(string name, Func<Task> test) {
  var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
  var timer = new Stopwatch();
  timer.Start();
  await test();
  timer.Stop();
  Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
  Console.WriteLine($"  ⮑  {threadsAtStart} threads at start");
  Console.WriteLine($"  ⮑  {Process.GetCurrentProcess().Threads.Count} threads at end");
}

/*
YMMV since each run uses a random workload.
[Channel] = 68ms
  ⮑  8 threads at start
  ⮑  19 threads at end
[Parallel.For @ 4] = 799ms
  ⮑  19 threads at start
  ⮑  19 threads at end
[Parallel.ForEachAsync @ 4] = 754ms
  ⮑  19 threads at start
  ⮑  19 threads at end
[Parallel.ForEachAsync @ 40] = 100ms
  ⮑  19 threads at start
  ⮑  19 threads at end
[Parallel.ForEachAsync (Default)] = 384ms
  ⮑  19 threads at start
  ⮑  19 threads at end
*/
 

 

Похожее
May 14, 2023
Author: Ravi Raghav
What is Kafka?Kafka is a distributed streaming platform developed by the Apache Software Foundation. It is designed to handle high-volume, real-time data streams and is commonly used for building data pipelines, stream processing applications, and real-time analytics.At its core, Kafka...
Jun 5, 2023
Author: Juan Alberto España Garcia
In this section, we’ll explore the world of unit testing in C# and .NET, learn what unit testing is, why it’s important, and the landscape of testing frameworks and tools available to developers.What is Unit Testing?Unit testing is the process...
Apr 19
Author: Rafael Timbó
Explore the benefits and risks of using AI-generated code in software development and when and how to leverage it to stay competitive.Artificial intelligence (AI) has shaped numerous industries over the past few years, from automation and minimizing errors to boosting...
Apr 11
Author: Tepes Alexandru
Say goodbye to the hassle of having to manually set a default filter for every queryEF Core provides a useful feature called Global Query Filters. It enables you to apply a filter to all queries sent to the database. Two...
Написать сообщение
Почта
Имя
*Сообщение


© 1999–2024 WebDynamics
1980–... Sergey Drozdov
Area of interests: .NET Framework | .NET Core | C# | ASP.NET | Windows Forms | WPF | HTML5 | CSS3 | jQuery | AJAX | Angular | React | MS SQL Server | Transact-SQL | ADO.NET | Entity Framework | IIS | OOP | OOA | OOD | WCF | WPF | MSMQ | MVC | MVP | MVVM | Design Patterns | Enterprise Architecture | Scrum | Kanban