Concurrency
Doing more than one thing at a time.
Several forms of concurrent programming:
- Multithreading
- Parallel processing
- Asynchronous programming (the modern approach)
- Reactive programming
Multithreading and parallel processing
Multithreading
- A form of concurrency that uses multiple threads of execution.
Parallel processing
- Dividing a large amount of work into pieces and distributing them among multiple threads that run simultaneously. Parallel processing is one kind of multithreading.
Asynchronous programming
- A form of concurrency that uses futures or callbacks to avoid unnecessary threads.
Reactive programming
- A declarative style of programming in which the program reacts to events.
The difference between asynchronous programming and reactive programming
Asynchronous programming means the program starts an operation that will complete some time later. Reactive programming is very similar, but it is built around asynchronous events rather than asynchronous operations. Asynchronous events can occur at any time, without an actual "start", and can occur multiple times, such as user input.
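As a hedged illustration, Rx can wrap an ordinary .NET event into a stream of asynchronous events. This sketch assumes a WPF Button named button and the Rx library; the names are illustrative only:
// Wrap a .NET event in an observable stream (sketch; assumes Rx and WPF).
IObservable<EventPattern<RoutedEventArgs>> clicks =
    Observable.FromEventPattern<RoutedEventArgs>(button, "Click");
// Each click is one event in the stream; it can fire any number of times.
clicks.Subscribe(evt => Trace.WriteLine("Button clicked"));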
Asynchronous programming overview
Two benefits of asynchronous programming
The first benefit: for end-user GUI programs, asynchronous programming improves responsiveness. We have all used programs that temporarily lock up their interface while working; an asynchronous program can remain responsive to user input while it executes its tasks.
The second benefit: for server-side applications, asynchronous programming enables scalability. A server application can scale somewhat just by using the thread pool, but an asynchronous server can usually scale an order of magnitude better.
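A minimal sketch of the GUI benefit, assuming a WPF-style click handler and a hypothetical GetDataAsync helper (button, textBlock, and GetDataAsync are illustrative names, not from the original):
// The UI thread is free while the await is pending, so the window stays responsive.
async void Button_Click(object sender, RoutedEventArgs e)
{
    button.IsEnabled = false;
    string data = await GetDataAsync(); // hypothetical asynchronous operation
    textBlock.Text = data;
    button.IsEnabled = true;
}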
Overview of parallel programming
If a program has a large number of computational tasks that can be divided into independent chunks, parallel programming should be used. Parallel programming temporarily raises CPU usage to improve throughput. This is desirable on client systems, where the CPU is often idle, but it is usually not appropriate for server systems, since most servers already have built-in parallelism.
- There are two forms of parallelism: data parallelism and task parallelism. Data parallelism focuses on processing data, while task parallelism focuses on executing tasks.
- Data parallelism: a large amount of data needs to be processed, and the processing of each piece of data is mostly independent of the others.
- Task parallelism: a large set of tasks needs to be executed, and the execution of each task is mostly independent of the others. (Task parallelism can be dynamic: if executing one task produces additional tasks, the new tasks can be added to the task pool.)
- There are several different approaches to data parallelism:
- Use the Parallel.ForEach method, which is similar to a foreach loop; use it whenever possible.
- The Parallel class also provides a Parallel.For method, similar to a for loop. Use it when the data processing is based on an index.
- Use PLINQ (Parallel LINQ), which provides an AsParallel extension for LINQ queries. Compared with PLINQ, Parallel is more resource-friendly: Parallel cooperates well with other processes on the system, while PLINQ will try to use all the CPUs. The drawback of Parallel is that it is more explicit and verbose; in many cases PLINQ code is more elegant.
// Parallel.ForEach example
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

// PLINQ example (IsPrime is assumed to be defined elsewhere)
IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
    return values.AsParallel().Select(val => IsPrime(val));
}
- A very important rule in parallel processing: the chunks of work should be as independent of each other as possible.
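A hedged sketch of why independence matters (values is an illustrative collection, not from the original). The first loop races on shared state; the second keeps each iteration independent and synchronizes only at the end:
// BAD: iterations share mutable state, so the result is unpredictable.
int total = 0;
Parallel.ForEach(values, value => { total += value; }); // data race

// BETTER: each iteration works on its own local value; shared state is
// touched only inside the synchronized localFinally step (see the
// aggregation recipes later in these notes).
int safeTotal = 0;
object mutex = new object();
Parallel.ForEach(values,
    localInit: () => 0,
    body: (value, state, local) => local + value,
    localFinally: local => { lock (mutex) safeTotal += local; });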
Overview of reactive programming
Compared with the other forms of concurrent programming, reactive programming is harder to learn, and if you are not very familiar with it, the code is harder to maintain. Once you learn it, though, reactive programming is remarkably powerful: it lets you handle a stream of events just as you would handle a stream of data. As a rule of thumb, if an event carries arguments, you are better off with reactive programming than with a regular event handler.
Reactive programming is based on the concept of the "observable stream". Once you subscribe to an observable stream, you can receive any number of data items (OnNext), and then the stream may end with either an error (OnError) or a "stream ended" notification (OnCompleted). Some observable streams never end.
// The actual interfaces. Microsoft's Reactive Extensions (Rx) library implements both.
interface IObserver<in T>
{
    void OnNext(T item);
    void OnCompleted();
    void OnError(Exception error);
}
interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
- End-user reactive code looks very much like LINQ; you can think of it as "LINQ to events".
// Example ①: emit a value every second, keep the even values, and log their timestamps.
Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(x => x.Value % 2 == 0)
    .Select(x => x.Timestamp)
    .Subscribe(x => Trace.WriteLine(x));
For now, just remember that this is a LINQ query, very similar to the LINQ queries you have seen before. The main difference: LINQ to Objects and LINQ to Entities use a "pull" model, where enumerating the query pulls the data through it; LINQ to events (Rx) uses a "push" model, where events arrive and push themselves through the query.
- The definition of an observable stream is independent of its subscription.
// Equivalent to example ①
IObservable<DateTimeOffset> timestamps = Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(x => x.Value % 2 == 0)
    .Select(x => x.Timestamp);
timestamps.Subscribe(x => Trace.WriteLine(x));
- A common practice is to define an observable stream within a type and expose it as an IObservable<T> resource. Other types can then subscribe to the stream, or combine it with other operators to create another observable stream.
// Subscribe with an error handler as the second parameter.
Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(x => x.Value % 2 == 0)
    .Select(x => x.Timestamp)
    .Subscribe(x => Trace.WriteLine(x), ex => Trace.WriteLine(ex));
Overview of dataflow
- TPL Dataflow is interesting: it combines asynchronous programming and parallel programming. It is useful when you need to apply a series of processing steps to data.
- TPL Dataflow is usually used as a simple pipeline: data enters at one end, passes through the pipeline, and comes out the other end. However, TPL Dataflow is far more powerful than a simple pipeline: it can correctly handle meshes of all kinds, with forks, joins, and loops. Of course, most of the time a TPL Dataflow mesh is still used as a pipeline.
- The basic unit of a dataflow mesh is the dataflow block. A block can be a target block (receiving data), a source block (producing data), or both. Source blocks can be linked to target blocks to create a mesh.
- Dataflow blocks are semi-independent: when data arrives, a block attempts to process it and pushes the results along to the next block.
- The normal way to use TPL Dataflow is to create all the blocks, link them together, and then start putting data in one end; the data comes out the other end. Again, dataflow is more powerful than this: it is possible to break links, create new blocks, and add them to the mesh while data is flowing through, but that is a very advanced scenario.
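A minimal pipeline sketch under these assumptions: the Microsoft.Tpl.Dataflow package is installed and the code runs in an async context. Integers go in one end and come out doubled then decremented:
// Build the pipeline: multiply, then subtract.
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Push data in one end and read it from the other.
multiplyBlock.Post(5);
multiplyBlock.Complete();
int result = await subtractBlock.ReceiveAsync(); // (5 * 2) - 2 == 8
await subtractBlock.Completion;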
By default, a faulted block does not bring down the whole mesh. This gives the program the ability to rebuild part of the mesh or redirect data, but that is an advanced usage; normally you want errors propagated along the links to the target blocks, and dataflow provides that option. The only wrinkle is that when an exception is propagated along a link, it is wrapped in an AggregateException. So if the pipeline is long, the final exception can be nested many levels deep; the AggregateException.Flatten method helps:
try
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item => item - 2);
    multiplyBlock.LinkTo(subtractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    multiplyBlock.Post(1);
    subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{
    AggregateException ex = exception.Flatten();
    Trace.WriteLine(ex.InnerException);
}
- At first glance, a dataflow mesh seems very similar to an observable stream, and they do in fact have a lot in common.
Overview of multithreaded programming
- A thread is an independent unit of execution. Each process contains multiple threads, and each thread can be executing instructions at the same time as the others.
- Each thread has its own independent stack, but shares memory with other threads in the process
- In some programs, one thread is special. For example, a user interface program has a UI thread, and a console program has a main thread.
- The thread is a low-level abstraction; the thread pool is a slightly higher-level abstraction. When code queues work to the thread pool, the pool creates threads as necessary.
- Parallel processing and dataflow both queue work to the thread pool as needed. The higher-level abstractions make it easier to write correct code.
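A hedged sketch of those abstraction levels: queue work to the thread pool via Task.Run rather than creating raw threads:
// Low level: a raw thread (rarely needed in modern code).
var thread = new Thread(() => Trace.WriteLine("raw thread"));
thread.Start();
thread.Join();

// Higher level: let the thread pool manage threads for you.
await Task.Run(() => Trace.WriteLine("thread-pool work"));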
Collections for concurrent programming
- There are two kinds of collections used in concurrent programming: concurrent collections and immutable collections.
- Multiple threads can update concurrent collections simultaneously in a safe manner
- Most concurrent collections use snapshots: one thread can enumerate the data while another thread is adding or removing items.
- Using concurrent collections is usually much more efficient than protecting a regular collection with a lock.
- Immutable collections are a bit different: they genuinely never change.
To modify an immutable collection, you create a new collection that represents the modified one. This sounds very inefficient, but it is not as bad as you might expect, because different instances of an immutable collection share as much storage as possible. One advantage of immutable collections is that all their operations are pure, which makes them especially suitable for functional code.
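A short sketch of both kinds, assuming the System.Collections.Concurrent and System.Collections.Immutable namespaces are available:
// Concurrent collection: many threads can update it safely at once.
var counts = new ConcurrentDictionary<string, int>();
Parallel.ForEach(new[] { "a", "b", "a" },
    word => counts.AddOrUpdate(word, 1, (key, old) => old + 1));

// Immutable collection: "modifying" it returns a new instance;
// the original is untouched and needs no synchronization.
ImmutableList<int> original = ImmutableList.Create(1, 2, 3);
ImmutableList<int> extended = original.Add(4);
// original still has 3 items; extended has 4 and shares storage with original.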
Modern design
Most concurrent programming techniques have one thing in common: they are functional in nature. "Functional" here does not mean "practical, gets the job done"; it refers to a style of programming based on function composition. If you adopt a functional mindset, the design of concurrent programs becomes much simpler.
- One principle of functional programming is purity (in other words, freedom from side effects).
- Another principle of functional programming is immutability: a piece of data, once created, cannot be modified.
One reason to use immutable data in concurrent programming is that the program never needs to synchronize access to it: the fact that it cannot change makes synchronization unnecessary. Immutable data also helps you avoid side effects.
- Rx is in the NuGet package Rx-Main.
- The official release of TPL Dataflow is in the NuGet package Microsoft.Tpl.Dataflow.
- Immutable collections are in the NuGet package Microsoft.Bcl.Immutable.
Fundamentals of asynchronous programming (examples)
Pause for a period of time
- The program needs to wait (asynchronously) for a period of time. This is useful in unit tests or when implementing retry delays. It can also be used to implement a simple timeout.
static async Task<T> DelayResult<T>(T result, TimeSpan delay)
{
    await Task.Delay(delay);
    return result;
}
- A simple exponential backoff. Exponential backoff is a retry strategy in which the delay between retries increases progressively. When accessing a web service, the best practice is to use exponential backoff so that the server is not swamped by retries.
static async Task<string> DownloadStringWithRetries(string uri)
{
    using (var client = new HttpClient())
    {
        // Retry after 1 second, then 2 seconds, then 4 seconds.
        var nextDelay = TimeSpan.FromSeconds(1);
        for (int i = 0; i != 3; ++i)
        {
            try { return await client.GetStringAsync(uri); }
            catch { }
            await Task.Delay(nextDelay);
            nextDelay = nextDelay + nextDelay;
        }
        // Try one last time, letting any error propagate to the caller.
        return await client.GetStringAsync(uri);
    }
}
- Using Task.Delay to implement a simple timeout:
// Returns null if the service does not respond within 3 seconds.
static async Task<string> DownloadStringWithTimeout(string uri)
{
    using (var client = new HttpClient())
    {
        var downloadTask = client.GetStringAsync(uri);
        var timeoutTask = Task.Delay(3000);
        var completedTask = await Task.WhenAny(downloadTask, timeoutTask);
        if (completedTask == timeoutTask)
            return null;
        return await downloadTask;
    }
}
Task.Delay is good for unit testing asynchronous code and for implementing retry logic. To implement a timeout, however, a CancellationToken is usually the better choice.
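A hedged sketch of the CancellationToken approach. Note that, unlike the version above, this one throws on timeout rather than returning null:
static async Task<string> DownloadStringWithTimeout(string uri)
{
    // A CancellationTokenSource can cancel itself after a given delay.
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
    using (var client = new HttpClient())
    {
        // Throws an OperationCanceledException if 3 seconds elapse first.
        var response = await client.GetAsync(uri, cts.Token);
        return await response.Content.ReadAsStringAsync();
    }
}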
Returning a completed task
- Implement a synchronous method with an asynchronous signature.
interface IMyAsyncInterface
{
    Task<int> GetValueAsync();
}
class MySynchronousImplementation : IMyAsyncInterface
{
    public Task<int> GetValueAsync()
    {
        return Task.FromResult(13);
    }
}
// If you use Microsoft.Bcl.Async, the FromResult method is on the TaskEx class.
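Along the same lines, a hedged sketch for returning an already-failed task using TaskCompletionSource, useful when a method with an asynchronous signature needs to report an error synchronously (NotImplementedAsync is an illustrative name):
static Task<T> NotImplementedAsync<T>()
{
    var tcs = new TaskCompletionSource<T>();
    // Complete the task with an exception instead of a result.
    tcs.SetException(new NotImplementedException());
    return tcs.Task;
}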
Report progress
- During the execution of an asynchronous operation, you need to show the progress of the operation
// Use the IProgress<T> and Progress<T> types. The async method takes an
// IProgress<T> parameter, where T is the type of progress being reported:
static async Task MyMethodAsync(IProgress<double> progress = null)
{
    double percentComplete = 0;
    while (!done)
    {
        ...
        if (progress != null)
            progress.Report(percentComplete);
    }
}

// Calling code:
static async Task CallMyMethodAsync()
{
    var progress = new Progress<double>();
    progress.ProgressChanged += (sender, args) => { ... };
    await MyMethodAsync(progress);
}

// Note that IProgress<T>.Report can be asynchronous, which means MyMethodAsync
// may continue running before the progress is actually reported. For this reason,
// it is best to make T an immutable type, or at least a value type. If T is a
// mutable reference type, you must create a separate copy every time you call
// IProgress<T>.Report.
Wait for a set of tasks to complete
- Perform several tasks and wait for all of them to complete.
// Solution 1
Task<int> task1 = Task.FromResult(3);
Task<int> task2 = Task.FromResult(5);
Task<int> task3 = Task.FromResult(7);
int[] results = await Task.WhenAll(task1, task2, task3);
// "results" contains { 3, 5, 7 }

// Solution 2
// Task.WhenAll has an overload that takes an IEnumerable, but it is better not to
// use it. Whenever asynchronous code is combined with LINQ, explicitly
// "materializing" the sequence (evaluating it to create a collection) makes the
// code clearer:
static async Task<string> DownloadAllAsync(IEnumerable<string> urls)
{
    var httpClient = new HttpClient();
    // Define what to do for each url.
    var downloads = urls.Select(url => httpClient.GetStringAsync(url));
    // Note that the sequence has not been evaluated yet, so no tasks have started.
    // Now all the downloads start at the same time.
    Task<string>[] downloadTasks = downloads.ToArray();
    // All the tasks have now started.
    // Asynchronously wait for all the downloads to complete.
    string[] htmlPages = await Task.WhenAll(downloadTasks);
    return string.Concat(htmlPages);
}

// If you use the Microsoft.Bcl.Async NuGet library, WhenAll is a member of the
// TaskEx class, not the Task class.
If any task throws an exception, Task.WhenAll faults its returned task and places the exception on it. If multiple tasks throw exceptions, all of them are placed on the returned task. However, when that task is awaited, only one of the exceptions is thrown. To get every exception, examine the Exception property of the task returned by Task.WhenAll:
static async Task ThrowNotImplementedExceptionAsync()
{
    throw new NotImplementedException();
}
static async Task ThrowInvalidOperationExceptionAsync()
{
    throw new InvalidOperationException();
}
static async Task ObserveOneExceptionAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();
    try
    {
        await Task.WhenAll(task1, task2);
    }
    catch (Exception ex)
    {
        // "ex" is either NotImplementedException or InvalidOperationException.
        ...
    }
}
static async Task ObserveAllExceptionsAsync()
{
    var task1 = ThrowNotImplementedExceptionAsync();
    var task2 = ThrowInvalidOperationExceptionAsync();
    Task allTasks = Task.WhenAll(task1, task2);
    try
    {
        await allTasks;
    }
    catch
    {
        AggregateException allExceptions = allTasks.Exception;
        ...
    }
}
// When using Task.WhenAll, it is usually enough to handle only the first error;
// handling every error is rarely necessary.
Wait for any task to complete
- Perform several tasks and respond as soon as any one of them completes. The main use is making multiple independent attempts at an operation, where the first attempt to finish wins. For example, ask several web services for a stock price at the same time, but only care about the first response.
// Use the Task.WhenAny method. Its parameter is a set of tasks; it completes when
// any one of them completes, and its result is the task that completed.
// Returns the length of the data at the first URL to respond.
private static async Task<int> FirstRespondingUrlAsync(string urlA, string urlB)
{
    var httpClient = new HttpClient();
    // Start both downloads concurrently.
    Task<byte[]> downloadTaskA = httpClient.GetByteArrayAsync(urlA);
    Task<byte[]> downloadTaskB = httpClient.GetByteArrayAsync(urlB);
    // Wait for either task to complete.
    Task<byte[]> completedTask = await Task.WhenAny(downloadTaskA, downloadTaskB);
    // Return the length of the data retrieved from that URL.
    byte[] data = await completedTask;
    return data.Length;
}
Processing tasks as they complete
- We are awaiting a batch of tasks and want to process each result as soon as its task completes, without waiting for the other tasks.
static async Task<int> DelayAndReturnAsync(int val)
{
    await Task.Delay(TimeSpan.FromSeconds(val));
    return val;
}

// Currently, this method outputs "2", "3", "1".
// We want it to output "1", "2", "3".
static async Task ProcessTasksAsync()
{
    // Create a sequence of tasks.
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new[] { taskA, taskB, taskC };
    // Await each task in order.
    foreach (var task in tasks)
    {
        var result = await task;
        Trace.WriteLine(result);
    }
}

// Refactored so that each result is processed without waiting for the other
// tasks (not the only possible solution). Now this method outputs "1", "2", "3".
static async Task ProcessTasksAsync()
{
    // Create a sequence of tasks.
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new[] { taskA, taskB, taskC };
    var processingTasks = tasks.Select(async t =>
    {
        var result = await t;
        Trace.WriteLine(result);
    }).ToArray();
    // Wait for all the processing to complete.
    await Task.WhenAll(processingTasks);
}
Avoiding context in continuations
- By default, when an async method resumes after an await, it resumes in the original context. If that context is a UI context and a large number of async methods resume on it, performance problems can result.
// To avoid resuming on the context, await the result of ConfigureAwait with the
// continueOnCapturedContext parameter set to false:
async Task ResumeOnContextAsync()
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    // This method resumes in the same context.
}
async Task ResumeWithoutContextAsync()
{
    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
    // This method discards the context when it resumes.
}
Handling exceptions from async Task methods
static async Task ThrowExceptionAsync()
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    throw new InvalidOperationException("Test");
}
static async Task TestAsync()
{
    // The exception is thrown and placed on the task.
    Task task = ThrowExceptionAsync();
    try
    {
        // The task is awaited, and the exception is re-thrown here.
        await task;
    }
    catch (InvalidOperationException)
    {
        // The exception is correctly caught here.
    }
}
Handling exceptions from async void methods
- An async void method cannot be awaited, so its exceptions cannot be caught with try/await; the best practice is to make the async void method a thin wrapper around an async Task method that does the real work:
sealed class MyAsyncCommand : ICommand
{
    async void ICommand.Execute(object parameter)
    {
        await Execute(parameter);
    }
    public async Task Execute(object parameter)
    {
        ... // Asynchronous operation implemented here.
    }
    ... // Other members (CanExecute, etc.).
}
Fundamentals of parallel development
Parallel processing of data
- There is a collection of data, and the same operation needs to be performed on each element. The operation is compute-intensive and takes some time.
// The Parallel type has a ForEach method designed exactly for this purpose. The
// following example rotates each matrix:
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

// Break out of the loop when an invalid matrix is found:
void InvertMatrices(IEnumerable<Matrix> matrices)
{
    Parallel.ForEach(matrices, (matrix, state) =>
    {
        if (!matrix.IsInvertible)
            state.Stop();
        else
            matrix.Invert();
    });
}

// Parallel loops can be cancelled:
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}

// Invert each matrix, counting the ones that cannot be inverted:
int InvertMatrices(IEnumerable<Matrix> matrices)
{
    object mutex = new object();
    int nonInvertibleCount = 0;
    Parallel.ForEach(matrices, matrix =>
    {
        if (matrix.IsInvertible)
            matrix.Invert();
        else
            lock (mutex) { ++nonInvertibleCount; }
    });
    return nonInvertibleCount;
}

/*
Parallel.ForEach processes a sequence of values in parallel. A similar solution is
PLINQ (Parallel LINQ), which has most of the same capabilities using a LINQ-like
syntax. One difference between the Parallel class and PLINQ: PLINQ assumes it can
use all the CPU cores in the machine, while the Parallel class dynamically adjusts
to changing CPU conditions.
Parallel.ForEach is a parallel version of the foreach loop. The Parallel class
also provides a parallel version of the for loop, Parallel.For, which is
especially applicable when there are multiple arrays of data that share the same
index.
*/
Parallel aggregation
- At the end of a parallel operation, you need to aggregate results, including cumulative sums, averages, and so on.
The Parallel class supports aggregation through the concept of local values, which are variables that exist only inside a parallel loop. This means the code in the loop body can access its local value directly without worrying about synchronization. The localFinally delegate then aggregates the local values into the final result. Note that the localFinally delegate does need to access the result variable in a synchronized manner.
// Parallel summation.
// Note: this is not the most efficient implementation; it is just an example of
// using a lock to protect shared state.
static int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(
        source: values,
        localInit: () => 0,
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue => { lock (mutex) result += localValue; });
    return result;
}

// Parallel LINQ supports aggregation more naturally than the Parallel class:
static int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

// PLINQ can also express general-purpose aggregation through Aggregate:
static int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item
    );
}

/*
If you are already using the Parallel class in your program, you may as well use
its aggregation support. Otherwise, in most cases PLINQ's aggregation support is
more expressive and needs less code.
*/
Parallel invocation
- A number of methods need to be called in parallel, and these methods are (mostly) independent of each other.
// The Parallel class has a simple Invoke member for this situation. The following
// example splits an array in half and processes each half independently:
static void ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
    );
}
static void ProcessPartialArray(double[] array, int begin, int end)
{
    // Compute-intensive processing...
}

// If the number of invocations is not known until runtime, pass an array of
// delegates to Parallel.Invoke:
static void DoAction20Times(Action action)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(actions);
}

// Like the other members of the Parallel class, Parallel.Invoke supports
// cancellation:
static void DoAction20Times(Action action, CancellationToken token)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(new ParallelOptions { CancellationToken = token }, actions);
}

/*
Parallel.Invoke is a great solution for simple parallel invocation. However, it is
not a good fit in two cases: when you need to invoke an operation on each item of
input data (use Parallel.ForEach instead), or when each operation produces output
(use Parallel LINQ instead).
*/
Dynamic parallelism
- The structure and number of parallel tasks can only be determined at run time; this is a more complex kind of parallelism.
The Task Parallel Library (TPL) is built around the Task class, which is very powerful; the Parallel class and Parallel LINQ are just convenience wrappers over Task. The easiest way to get dynamic parallelism is to use the Task class directly.
/*
The following example does an expensive processing step on each node of a binary
tree. The tree's structure is only known at run time, so it is a good fit for
dynamic parallelism. The Traverse method processes the current node and then
creates two child tasks, one per child node (this example assumes a parent node
must be processed before its children). The ProcessTree method starts the
processing by creating the top-level parent task and waiting for it to complete:
*/
void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    if (current.Left != null)
    {
        Task.Factory.StartNew(() => Traverse(current.Left),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default);
    }
    if (current.Right != null)
    {
        Task.Factory.StartNew(() => Traverse(current.Right),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default);
    }
}
public void ProcessTree(Node root)
{
    var task = Task.Factory.StartNew(() => Traverse(root),
        CancellationToken.None,
        TaskCreationOptions.None,
        TaskScheduler.Default);
    task.Wait();
}

// If the tasks have no parent/child relationship, use a task continuation to
// schedule one task to run after another. Here, the continuation is an
// independent task that runs after the original task finishes:
Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.Default);
Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),
    CancellationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default);
// In the continuation, the parameter "t" is the same object as "task".
In concurrent programming, the Task class serves two purposes: as a parallel task or as an asynchronous task. Parallel tasks may use blocking members such as Task.Wait, Task.Result, Task.WaitAll, and Task.WaitAny; they also commonly use AttachedToParent to create parent/child relationships between tasks, and are created with Task.Run or Task.Factory.StartNew. In contrast, asynchronous tasks should avoid blocking members and use await, Task.WhenAll, and Task.WhenAny instead. Asynchronous tasks do not use AttachedToParent, but they can form an implicit parent/child relationship by awaiting another task.
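A small hedged sketch of that contrast; ExpensiveComputation is a hypothetical stand-in for any CPU-bound work:
// Parallel task: created with Task.Run, consumed with a blocking member.
Task<int> parallelTask = Task.Run(() => ExpensiveComputation());
int parallelResult = parallelTask.Result; // blocks the calling thread

// Asynchronous task: consumed with await, which never blocks the caller.
int asyncResult = await Task.Run(() => ExpensiveComputation());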
Parallel LINQ
- A batch of data needs to be processed in parallel, producing another batch of data or a summary statistic.
// Most developers are familiar with LINQ, which "pulls" data through a sequence
// of operations. Parallel LINQ (PLINQ) extends LINQ with parallel processing.
// PLINQ is well suited to streaming scenarios: one queue of data in, one queue of
// data out. The following simple example multiplies each element by 2 (real-world
// uses would be far more compute-intensive):
static IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(item => item * 2);
}

// By default, PLINQ does not preserve the order of the output. You can ask it to:
// the following example still runs in parallel but keeps the original order:
static IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().AsOrdered().Select(item => item * 2);
}

// Another common use of PLINQ is aggregating or summarizing data in parallel.
// The following code performs a parallel sum:
static int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}
The Parallel class is useful in many situations, but PLINQ code is more concise for aggregation or for transforming one sequence into another. Bear in mind that the Parallel class is friendlier to other processes on the system than PLINQ; this is especially important when doing parallel processing on a server. PLINQ provides parallel versions of a wide variety of operators, including Where, Select, and the aggregation operators such as Sum, Average, and the more general Aggregate. Generally speaking, everything you can do with regular LINQ can be done in parallel with PLINQ, which makes PLINQ a great choice when converting existing LINQ code to run in parallel.
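A hedged sketch of that conversion: parallelizing an existing LINQ query is often a single AsParallel call (source stands in for any IEnumerable<int>):
// Sequential LINQ:
var squaresOfEvens = source.Where(x => x % 2 == 0).Select(x => x * x);

// The same query in parallel with PLINQ:
var squaresOfEvensParallel = source.AsParallel()
    .Where(x => x % 2 == 0)
    .Select(x => x * x);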
Fundamentals of dataflow
The TPL Dataflow library is powerful; it can be used to create meshes and pipelines and to send data through them asynchronously. Dataflow code has a strong "declarative programming" flavor: generally, the mesh must be fully defined before data processing begins, after which the mesh becomes an architecture through which data flows.
Each mesh is composed of interconnected dataflow blocks. An individual block is fairly simple, responsible for a single step of the processing; when a block finishes processing its data, it passes the result to whichever blocks it is linked to.
- Before using TPL Dataflow, install the NuGet package Microsoft.Tpl.Dataflow in your program.
Linking dataflow blocks
- When creating a mesh, you need to link dataflow blocks to one another.
// The blocks in the TPL Dataflow library have only a few basic members; many of
// the practical methods are actually extension methods. Here is the LinkTo method:
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// Once the link is established, data flows from multiplyBlock into subtractBlock.
multiplyBlock.LinkTo(subtractBlock);

// By default, linked blocks propagate only data, not completion (or errors). If
// the dataflow is linear (a pipeline), you usually want completion propagated.
// To propagate completion (and errors), set PropagateCompletion on the link:
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);
...
// Completion of the first block is now automatically passed to the second.
multiplyBlock.Complete();
await subtractBlock.Completion;

/*
Once a link is established, data moves automatically from the source block to the
target block; if PropagateCompletion is set, completion moves along the link too.
At each node in the pipeline, a faulting block wraps its error in an
AggregateException when passing it to the next block, so if a long pipeline
propagates completion, the error can end up nested inside many AggregateException
instances. AggregateException has members (such as Flatten) to help handle this.
There are many ways to link dataflow blocks: meshes can have forks, joins, and
even loops, although in most cases a linear pipeline is enough. The
DataflowLinkOptions class lets you set several options on a link (such as the
PropagateCompletion option used above), and overloads of LinkTo accept a predicate
that acts as a filter on which data can pass over the link. Data rejected by the
filter is not dropped: it tries the other links, and stays in its originating
block if no link accepts it.
*/
Propagating errors
- Errors that occur in the dataflow mesh need to be handled.
// ① If a delegate inside a dataflow block throws an exception, the block enters a
// faulted state. A faulted block drops all of its data (and stops accepting new
// data), and it will never produce any more data. In the following code, the
// first value faults the block, and the second value is simply dropped:
var block = new TransformBlock<int, int>(item =>
{
    if (item == 1)
        throw new InvalidOperationException("Blech.");
    return item * 2;
});
block.Post(1);
block.Post(2);

// ② To catch a block's error, await its Completion property. Completion returns
// a task that completes when the block completes; if the block faults, the task
// faults too:
try
{
    var block = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    block.Post(1);
    await block.Completion;
}
catch (InvalidOperationException)
{
    // The exception is caught here.
}

// ③ If completion is propagated with PropagateCompletion, errors are propagated
// too, wrapped in an AggregateException as they pass to the next block. The
// following example catches the exception at the end of the pipeline, so it
// catches AggregateException, since the error was propagated from an earlier
// block:
try
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item => item - 2);
    multiplyBlock.LinkTo(subtractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    multiplyBlock.Post(1);
    await subtractBlock.Completion;
}
catch (AggregateException)
{
    // The exception is caught here.
}

/*
When a block receives an error that is already wrapped in an AggregateException,
it wraps it in another AggregateException. If an error occurs early in a pipeline
and travels through several links, the original error ends up wrapped in multiple
layers; AggregateException.Flatten() then simplifies error handling.
*/
Breaking links
- You need to modify the structure of the dataflow while it is processing data. This is an advanced scenario that is rarely needed.
// Dataflow blocks can be linked and unlinked at any time; data can be flowing
// freely through the mesh while you do it, and linking and unlinking are
// completely thread-safe. When creating a link between blocks, keep the
// IDisposable returned by the LinkTo method, and dispose it to break the link:
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
IDisposable link = multiplyBlock.LinkTo(subtractBlock);
multiplyBlock.Post(1);
multiplyBlock.Post(2);
// Unlink the blocks.
// At this point, the data may or may not have already passed over the link.
// In real code, consider using a using block rather than calling Dispose directly.
link.Dispose();

/*
Unless you can guarantee the link is idle, there is a race condition when you
break it, but these race conditions are usually nothing to worry about: the data
either crossed the link before it broke, or it never will; no data is duplicated
or lost. Unlinking is an advanced scenario, but it has its uses. For example, a
link's filter cannot be changed after the link is created; to change the filter
on an existing link, you must break the old link and create a new one with the new
filter (optionally setting DataflowLinkOptions.Append to false). Another example:
unlinking a critical link can pause the dataflow mesh.
*/
Throttling blocks
- There is a fork in the dataflow mesh, and the data should flow in a balanced way among its branches.
/*
By default, after a block produces output data, it examines each of its links (in
the order they were created) and tries to push the data over each link one at a
time. Also by default, every block maintains an input buffer that accepts any
amount of data before processing it. With a fork, where one source block links to
two target blocks, this causes a problem: the first target block keeps buffering
the data, and the second target block never gets a chance to take any. The fix is
to throttle the target blocks with the BoundedCapacity block option. The default
BoundedCapacity is DataflowBlockOptions.Unbounded, which is what lets the first
target buffer all the data before it has time to process it. BoundedCapacity can
be any value greater than 0 (or, of course, DataflowBlockOptions.Unbounded); as
long as the targets can keep up with the data coming from the source, setting it
to 1 is enough:
*/
var sourceBlock = new BufferBlock<int>();
var options = new DataflowBlockOptions { BoundedCapacity = 1 };
var targetBlockA = new BufferBlock<int>(options);
var targetBlockB = new BufferBlock<int>(options);
sourceBlock.LinkTo(targetBlockA);
sourceBlock.LinkTo(targetBlockB);

/*
Throttling is useful for load balancing across forks, but it can be used for any
throttling behavior. For example, when filling a dataflow mesh with data from I/O
operations, setting BoundedCapacity on the blocks keeps the mesh from reading too
much I/O data and buffering it all before it has had time to process it.
*/
Parallel processing in dataflow blocks
- You want parallel processing inside a dataflow mesh.
/*
By default, dataflow blocks are independent of one another; linking two blocks
together still leaves them running independently, so every dataflow mesh has some
built-in parallelism. To go further, for example when one particular block does a
large amount of computation, set the MaxDegreeOfParallelism option so that the
block processes its input in parallel. MaxDegreeOfParallelism defaults to 1, so
each block processes one item at a time. It can be set to
DataflowBlockOptions.Unbounded or to any value greater than 0. The following
example allows any number of tasks to multiply data simultaneously:
*/
var multiplyBlock = new TransformBlock<int, int>(
    item => item * 2,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    }
);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock);

/*
The MaxDegreeOfParallelism option makes parallelism inside a block easy; the hard
part is determining which blocks need it. One technique is to pause the dataflow
in the debugger and look at the number of waiting items (those not yet processed
by a block); a large backlog suggests the block should be restructured or
parallelized. MaxDegreeOfParallelism also works when a block processes its items
asynchronously: in that case it represents the level of concurrency, a certain
number of slots. Each item occupies a slot when the block starts processing it,
and the slot is released only when the entire asynchronous processing of that
item finishes.
*/
Creating custom dataflow blocks
- You have reusable logic that you want to embed in a custom dataflow block. This makes it possible to create larger meshes that contain complex logic.
/*
The Encapsulate method can extract any part of a mesh that has a single input
block and a single output block, creating a single dataflow block out of those two
endpoints. Propagating data and completion between the endpoints is the
developer's responsibility. The following code creates a custom dataflow block
out of a small pipeline of blocks, propagating data and completion:
*/
IPropagatorBlock<int, int> CreateMyCustomBlock()
{
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    var addBlock = new TransformBlock<int, int>(item => item + 2);
    var divideBlock = new TransformBlock<int, int>(item => item / 2);
    var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    multiplyBlock.LinkTo(addBlock, flowCompletion);
    addBlock.LinkTo(divideBlock, flowCompletion);
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}

/*
When encapsulating a mesh into a custom block, consider what options to expose to
callers and how each block option should (or should not) map onto the inner mesh;
in many cases some block options are inappropriate or meaningless. For this
reason, custom blocks usually define their own options rather than accepting a
DataflowBlockOptions parameter. DataflowBlock.Encapsulate only encapsulates a
mesh with one input block and one output block. A reusable mesh with multiple
inputs or outputs should be wrapped in a custom object that exposes the inputs as
properties of type ITargetBlock<T> and the outputs as properties of type
IReceivableSourceBlock<T>.
*/