找回密码
 立即注册
首页 资源区 代码 C#编程中并行与并发的简单理解

C#编程中并行与并发的简单理解

吕梓美 2025-5-29 10:53:11
1.简述

并发通过管理多个任务的执行顺序,确保系统在高负载下仍能保持响应性;并行则利用多处理器或多核心硬件,真正同时执行任务,以加速计算。这两者在高性能计算、实时系统和用户交互应用中发挥着不可替代的作用。
在多核处理器时代,传统串行编程已无法充分利用硬件潜力。并行计算通过将任务分解到多个核心执行,显著缩短了计算时间。然而,并发与并行的实现并非没有代价,它们引入了诸如竞争条件、死锁和负载均衡等复杂问题,需要开发者具备深厚的理论基础和实践经验。
2.并发与并行

2.1 定义


  • 并发(Concurrency):

    • 指系统在一段时间内管理多个任务的能力。并发关注任务的协调与交错执行,通过时间分片等技术在一个或多个处理器上实现,因此并发看似同时进行,但不一定在同一时刻执行。
    • 并发强调任务的逻辑组织和协调。
    • 举例:一个Web服务器可以并发处理多个客户端请求,通过快速切换任务确保每个请求都能及时响应。

  • 并行(Parallelism):

    • 指多个任务在同一时刻真正同时执行,通常依赖于多核处理器或分布式系统。其核心目标是提升计算速度,通过将问题分解为独立的子任务并同时处理。并行适用于计算密集型任务。
    • 并行关注物理执行的并行性。
    • 举例:在并行矩阵乘法中,不同的核心可以同时计算矩阵的不同部分,从而显著缩短总计算时间;科学模拟或图像处理,其效果依赖于多核处理器、GPU或分布式计算系统的硬件支持。

2.2 区别

1.png

并发与并行的根本区别在于执行的时间性资源依赖性

  • 执行模式:并行强调真正的同时执行,而并发通过任务切换营造同时进行的假象。
  • 硬件依赖:并行需要多处理器或多核心支持,而并发在单核系统上即可实现。
  • 目标:并行旨在加速计算,而并发注重系统响应性和多任务处理能力。
例如,在单核系统中,操作系统通过时间片轮转调度多个线程;而多核系统中,线程可以分配到不同核心并行运行。
3.实现并发

3.1 并行实现并发

在多核处理器上,任务可以分配到不同核心并行执行,从而实现高效并发。例如,Web服务器通过多线程并行处理客户端请求。
代码示例:多线程并行处理 :System.Threading.ThreadPool来创建和管理线程池,并使用ManualResetEventSlim来等待所有任务完成。
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading;
  4.         class Program
  5.         {
  6.                 static void Main(string[] args)
  7.                 {
  8.                         List<Request> requests = new List<Request>
  9.                         {
  10.                                 new Request { Data = "Request1" },
  11.                                 new Request { Data = "Request2" },
  12.                                 new Request { Data = "Request3" }
  13.                                 // 添加更多请求
  14.                         };
  15.                         process_requests(requests);
  16.                         Console.WriteLine("All requests processed.");
  17.                 }
  18.                 static void process_requests(List<Request> requests)
  19.                 {
  20.                         int num_cores = Environment.ProcessorCount;  // 获取处理器核心数
  21.                         ManualResetEventSlim[] mres = new ManualResetEventSlim[requests.Count];  // 创建信号量数组
  22.                         for (int i = 0; i < requests.Count; i++)
  23.                         {
  24.                                 int index = i;
  25.                                 mres[index] = new ManualResetEventSlim(false);  // 初始化信号量
  26.                                 ThreadPool.QueueUserWorkItem((state) =>
  27.                                 {
  28.                                         handle_request(requests[index]);
  29.                                         mres[index].Set();  // 任务完成时设置信号量
  30.                                 });
  31.                         }
  32.                         // 等待所有任务完成
  33.                         ManualResetEventSlim.WaitAll(mres);
  34.                 }
  35.                 static void handle_request(Request request)
  36.                 {
  37.                         Response response = process(request);  // 处理请求
  38.                         send_response(response);  // 发送响应
  39.                 }
  40.                 static Response process(Request request)
  41.                 {
  42.                         // 模拟请求处理逻辑
  43.                         Console.WriteLine($"Processing request: {request.Data}");
  44.                         Thread.Sleep(1000);  // 模拟耗时操作
  45.                         return new Response { Data = $"Response for {request.Data}" };
  46.                 }
  47.                 static void send_response(Response response)
  48.                 {
  49.                         // 模拟发送响应逻辑
  50.                         Console.WriteLine($"Sending response: {response.Data}");
  51.                 }
  52.         }
  53.         class Request
  54.         {
  55.                 public string Data { get; set; }
  56.         }
  57.         class Response
  58.         {
  59.                 public string Data { get; set; }
  60.         }
复制代码
======================================================================================================================
使用Task.Run和Task.WhenAll来实现
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading.Tasks;
  4.         class Program
  5.         {
  6.                 static async Task Main(string[] args)
  7.                 {
  8.                         List<Request> requests = new List<Request>
  9.                         {
  10.                                 new Request { Data = "Request1" },
  11.                                 new Request { Data = "Request2" },
  12.                                 new Request { Data = "Request3" }
  13.                                 // 添加更多请求
  14.                         };
  15.                         await process_requests(requests);
  16.                         Console.WriteLine("All requests processed.");
  17.                 }
  18.                 static async Task process_requests(List<Request> requests)
  19.                 {
  20.                         List<Task> tasks = new List<Task>();
  21.                         foreach (Request request in requests)
  22.                         {
  23.                                 Task task = Task.Run(() => handle_request(request));
  24.                                 tasks.Add(task);
  25.                         }
  26.                         // 等待所有任务完成
  27.                         await Task.WhenAll(tasks);
  28.                 }
  29.                 static async Task handle_request(Request request)
  30.                 {
  31.                         Response response = await process(request);  // 处理请求
  32.                         send_response(response);  // 发送响应
  33.                 }
  34.                 static async Task<Response> process(Request request)
  35.                 {
  36.                         // 模拟请求处理逻辑
  37.                         Console.WriteLine($"Processing request: {request.Data}");
  38.                         await Task.Delay(1000);  // 模拟耗时操作
  39.                         return new Response { Data = $"Response for {request.Data}" };
  40.                 }
  41.                 static void send_response(Response response)
  42.                 {
  43.                         // 模拟发送响应逻辑
  44.                         Console.WriteLine($"Sending response: {response.Data}");
  45.                 }
  46.         }
  47.         class Request
  48.         {
  49.                 public string Data { get; set; }
  50.         }
  51.         class Response
  52.         {
  53.                 public string Data { get; set; }
  54.         }
复制代码
3.2 任务调度

在单核处理器上,通过时间片轮转等调度算法实现并发。操作系统在任务间快速切换,营造同时执行的假象。
代码示例:时间片轮转调度 :示例使用了Task和CancellationToken来管理任务的时间片轮转调度。
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading;
  4.         using System.Threading.Tasks;
  5.         class Program
  6.         {
  7.                 static async Task Main(string[] args)
  8.                 {
  9.                         List<Task> tasks = new List<Task>
  10.                         {
  11.                                 run_task("Task1", 5000),  // 创建一个任务,模拟总时间为5秒
  12.                                 run_task("Task2", 3000),  // 创建一个任务,模拟总时间为3秒
  13.                                 run_task("Task3", 7000)   // 创建一个任务,模拟总时间为7秒
  14.                         };
  15.                         int time_slice = 1000;  // 设置时间片为1秒
  16.                         await scheduler(tasks, time_slice);
  17.                         Console.WriteLine("All tasks processed.");
  18.                 }
  19.                 static async Task scheduler(List<Task> tasks, int time_slice)
  20.                 {
  21.                         List<Task> runningTasks = new List<Task>();
  22.                         List<Task> remainingTasks = new List<Task>(tasks);
  23.                         while (remainingTasks.Count > 0 || runningTasks.Count > 0)
  24.                         {
  25.                                 // 将剩余任务中的第一个任务移到运行列表
  26.                                 if (remainingTasks.Count > 0)
  27.                                 {
  28.                                         runningTasks.Add(remainingTasks[0]);
  29.                                         remainingTasks.RemoveAt(0);
  30.                                 }
  31.                                 // 复制运行任务列表以避免在遍历过程中修改列表
  32.                                 List<Task> currentRunningTasks = new List<Task>(runningTasks);
  33.                                 foreach (Task task in currentRunningTasks)
  34.                                 {
  35.                                         if (!task.IsCompleted)
  36.                                         {
  37.                                                 await run_task_for_time_slice(task, time_slice);
  38.                                                 if (task.IsCompleted)
  39.                                                 {
  40.                                                         runningTasks.Remove(task);
  41.                                                 }
  42.                                                 else
  43.                                                 {
  44.                                                         remainingTasks.Add(task);
  45.                                                         runningTasks.Remove(task);
  46.                                                 }
  47.                                         }
  48.                                 }
  49.                         }
  50.                 }
  51.                 static async Task run_task_for_time_slice(Task task, int time_slice)
  52.                 {
  53.                         // 创建一个取消令牌源
  54.                         CancellationTokenSource cts = new CancellationTokenSource(time_slice);
  55.                         try
  56.                         {
  57.                                 // 等待任务完成或时间片用完
  58.                                 await task.WaitAsync(cts.Token);
  59.                         }
  60.                         catch (TaskCanceledException)
  61.                         {
  62.                                 // 时间片用完,任务未完成
  63.                                 Console.WriteLine($"Task {task.Id} preempted after {time_slice} ms");
  64.                         }
  65.                 }
  66.                 static Task run_task(string taskName, int total_time)
  67.                 {
  68.                         return Task.Run(async () =>
  69.                         {
  70.                                 int elapsedTime = 0;
  71.                                 int time_slice = 1000;  // 模拟内部时间片
  72.                                 while (elapsedTime < total_time)
  73.                                 {
  74.                                         Console.WriteLine($"{taskName} is running. Elapsed time: {elapsedTime} ms");
  75.                                         await Task.Delay(time_slice);  // 模拟任务运行一段时间
  76.                                         elapsedTime += time_slice;
  77.                                 }
  78.                                 Console.WriteLine($"{taskName} is completed.");
  79.                         });
  80.                 }
  81.         }
复制代码
3.3 多线程

多线程通过创建多个执行单元实现并发。线程共享进程资源,通过同步机制(如互斥锁)协调访问。
代码示例:多线程同步
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading;
  4.         using System.Threading.Tasks;
  5.         namespace Test.EventBus
  6.         {
  7.                 public class DemoB
  8.                 {
  9.                         private static Mutex mutex = new Mutex();  // 创建互斥锁
  10.                         private static StringBuilder sharedData = new StringBuilder();  // 初始化共享数据
  11.                         public void ShowMsg(string name, string msg)
  12.                         {
  13.                                 Console.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
  14.                                 var eventMsg = new EventMessage
  15.                                 {
  16.                                         Name = name,
  17.                                         Msg = msg,
  18.                                         CreatedDate = DateTime.Now
  19.                                 };
  20.                                 EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
  21.                         }
  22.                         public static void RunDemo()
  23.                         {
  24.                                 List<Task> tasks = new List<Task>();
  25.                                 for (int i = 0; i < 10; i++)
  26.                                 {
  27.                                         int taskId = i;
  28.                                         tasks.Add(Task.Run(() => thread_function($"Task{taskId}")));
  29.                                 }
  30.                                 // 等待所有任务完成
  31.                                 Task.WaitAll(tasks);
  32.                         }
  33.                         static void thread_function(string name)
  34.                         {
  35.                                 for (int i = 0; i < 5; i++)
  36.                                 {
  37.                                         modify_shared_data(name, i);
  38.                                 }
  39.                         }
  40.                         static void modify_shared_data(string name, int iteration)
  41.                         {
  42.                                 mutex.WaitOne();  // 加锁
  43.                                 try
  44.                                 {
  45.                                         // 修改共享数据
  46.                                         sharedData.AppendLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
  47.                                         Console.WriteLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
  48.                                 }
  49.                                 finally
  50.                                 {
  51.                                         mutex.ReleaseMutex();  // 解锁
  52.                                 }
  53.                         }
  54.                 }
  55.                 public class EventMessage
  56.                 {
  57.                         public string Name { get; set; }
  58.                         public string Msg { get; set; }
  59.                         public DateTime CreatedDate { get; set; }
  60.                 }
  61.                 public static class EventPublishSubscribeUtils
  62.                 {
  63.                         public static void PublishEvent(EventMessage eventMsg, string eventName)
  64.                         {
  65.                                 Console.WriteLine($"Published event: {eventName} => Name: {eventMsg.Name}, Msg: {eventMsg.Msg}, CreatedDate: {eventMsg.CreatedDate}");
  66.                         }
  67.                 }
  68.                 class Program
  69.                 {
  70.                         static void Main(string[] args)
  71.                         {
  72.                                 DemoB.RunDemo();
  73.                                 Console.WriteLine("All threads completed.");
  74.                         }
  75.                 }
  76.         }
复制代码
3.4 异步编程

异步编程通过事件循环和回调函数处理I/O密集型任务,避免阻塞主线程。
代码示例:异步I/O
  1. using System;
  2. using System.IO;
  3. using System.Threading.Tasks;
  4. namespace AsyncIOExample
  5. {
  6.         class Program
  7.         {
  8.                 static async Task Main(string[] args)
  9.                 {
  10.                         string filePath1 = "example1.txt";
  11.                         string filePath2 = "example2.txt";
  12.                         // 创建模拟文件
  13.                         File.WriteAllText(filePath1, "Data from example1.txt");
  14.                         File.WriteAllText(filePath2, "Data from example2.txt");
  15.                         // 异步读取文件并使用回调函数处理数据
  16.                         await async_read(filePath1, data => callback(data, filePath1));
  17.                         await async_read(filePath2, data => callback(data, filePath2));
  18.                         Console.WriteLine("All asynchronous read operations completed.");
  19.                 }
  20.                 static async Task async_read(string file, Action<string> callback)
  21.                 {
  22.                         // 模拟事件循环添加任务
  23.                         Console.WriteLine($"Starting asynchronous read for file: {file}");
  24.                         string data = await read_file(file);
  25.                         callback(data);
  26.                 }
  27.                 static async Task<string> read_file(string file)
  28.                 {
  29.                         // 模拟从磁盘读取文件
  30.                         using (StreamReader reader = new StreamReader(file))
  31.                         {
  32.                                 string data = await reader.ReadToEndAsync();
  33.                                 return data;
  34.                         }
  35.                 }
  36.                 static void callback(string data, string file)
  37.                 {
  38.                         // 处理读取后的数据
  39.                         Console.WriteLine($"Data read from {file}: {data}");
  40.                 }
  41.         }
  42. }
复制代码
3.5 协程

协程通过yield和resume机制在单线程内实现并发,适用于I/O密集型任务,具有低开销优势。
代码示例:协程
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. namespace CoroutineExample
  5. {
  6.         class Program
  7.         {
  8.                 static async Task Main(string[] args)
  9.                 {
  10.                         IAsyncEnumerable<string> coroutine = coroutine_example();
  11.                         // 创建一个异步枚举器
  12.                         IAsyncEnumerator<string> enumerator = coroutine.GetAsyncEnumerator();
  13.                         // 启动协程
  14.                         if (await enumerator.MoveNextAsync())
  15.                         {
  16.                                 Console.WriteLine("Coroutine started.");
  17.                                 // 发送数据并恢复执行
  18.                                 await enumerator.MoveNextAsync();
  19.                                 enumerator.Current = "Data1";
  20.                                 await enumerator.MoveNextAsync();
  21.                                 enumerator.Current = "Data2";
  22.                                 await enumerator.MoveNextAsync();
  23.                                 enumerator.Current = "Data3";
  24.                                 // 结束协程
  25.                                 await enumerator.DisposeAsync();
  26.                         }
  27.                 }
  28.                 static async IAsyncEnumerable<string> coroutine_example()
  29.                 {
  30.                         string data = null;
  31.                         while (true)
  32.                         {
  33.                                 // 暂停并接收数据
  34.                                 await Task.Delay(100);  // 模拟等待
  35.                                 data = yield return data;
  36.                                 // 处理数据
  37.                                 process(data);
  38.                         }
  39.                 }
  40.                 static void process(string data)
  41.                 {
  42.                         if (data != null)
  43.                         {
  44.                                 Console.WriteLine($"Processed data: {data}");
  45.                         }
  46.                         else
  47.                         {
  48.                                 Console.WriteLine("No data to process.");
  49.                         }
  50.                 }
  51.         }
  52. }
复制代码
3.6 事件驱动

事件驱动编程通过事件循环监听和处理事件,适用于GUI和网络应用。
代码示例:事件驱动
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. namespace EventDrivenExample
  5. {
  6.         class Program
  7.         {
  8.                 static async Task Main(string[] args)
  9.                 {
  10.                         // 创建事件循环
  11.                         EventLoop eventLoop = new EventLoop();
  12.                         // 注册事件处理函数
  13.                         eventLoop.RegisterHandler("Event1", Event1Handler);
  14.                         eventLoop.RegisterHandler("Event2", Event2Handler);
  15.                         // 模拟事件触发
  16.                         eventLoop.TriggerEvent(new Event { Type = "Event1", Data = "Data for Event1" });
  17.                         eventLoop.TriggerEvent(new Event { Type = "Event2", Data = "Data for Event2" });
  18.                         // 启动事件循环
  19.                         await eventLoop.Start();
  20.                         Console.WriteLine("Event loop completed.");
  21.                 }
  22.                 static void Event1Handler(Event e)
  23.                 {
  24.                         Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
  25.                 }
  26.                 static void Event2Handler(Event e)
  27.                 {
  28.                         Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
  29.                 }
  30.         }
  31.         public class Event
  32.         {
  33.                 public string Type { get; set; }
  34.                 public string Data { get; set; }
  35.         }
  36.         public class EventLoop
  37.         {
  38.                 private Queue<Event> _eventQueue = new Queue<Event>();
  39.                 private Dictionary<string, Action<Event>> _handlers = new Dictionary<string, Action<Event>>();
  40.                 private bool _running = false;
  41.                 public void RegisterHandler(string eventType, Action<Event> handler)
  42.                 {
  43.                         if (_handlers.ContainsKey(eventType))
  44.                         {
  45.                                 _handlers[eventType] += handler;
  46.                         }
  47.                         else
  48.                         {
  49.                                 _handlers[eventType] = handler;
  50.                         }
  51.                 }
  52.                 public void TriggerEvent(Event e)
  53.                 {
  54.                         lock (_eventQueue)
  55.                         {
  56.                                 _eventQueue.Enqueue(e);
  57.                         }
  58.                 }
  59.                 public async Task Start()
  60.                 {
  61.                         _running = true;
  62.                         while (_running)
  63.                         {
  64.                                 Event e = null;
  65.                                 lock (_eventQueue)
  66.                                 {
  67.                                         if (_eventQueue.Count > 0)
  68.                                         {
  69.                                                 e = _eventQueue.Dequeue();
  70.                                         }
  71.                                 }
  72.                                 if (e != null)
  73.                                 {
  74.                                         if (_handlers.TryGetValue(e.Type, out Action<Event> handler))
  75.                                         {
  76.                                                 handler(e);
  77.                                         }
  78.                                         else
  79.                                         {
  80.                                                 Console.WriteLine($"No handler registered for event type: {e.Type}");
  81.                                         }
  82.                                 }
  83.                                 else
  84.                                 {
  85.                                         // 模拟等待事件
  86.                                         await Task.Delay(100);  // 等待100毫秒
  87.                                 }
  88.                         }
  89.                 }
  90.                 public void Stop()
  91.                 {
  92.                         _running = false;
  93.                 }
  94.         }
  95. }
复制代码
3.7 多进程

多进程通过创建独立进程实现并发,进程间通过IPC(如管道或消息队列)通信,适用于CPU密集型任务
在C#中,多进程可以通过使用 System.Diagnostics.Process 类来创建和管理独立进程。进程间通信(IPC)可以通过多种方式实现,例如使用命名管道(System.IO.Pipes)或内存映射文件(System.IO.MemoryMappedFiles)。在这个示例中,我们将使用命名管道来进行进程间通信。
代码示例:多进程
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.IO.Pipes;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. namespace MultiProcessExample
  8. {
  9.         class Program
  10.         {
  11.                 static async Task Main(string[] args)
  12.                 {
  13.                         int num_processes = 3;  // 设置进程数量
  14.                         List<Process> processes = new List<Process>();
  15.                         List<Task<string>> readTasks = new List<Task<string>>();
  16.                         // 创建命名管道服务器
  17.                         var server = new NamedPipeServerStream("testpipe", PipeDirection.In, num_processes, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
  18.                         // 创建并启动进程
  19.                         for (int i = 0; i < num_processes; i++)
  20.                         {
  21.                                 Process p = new Process();
  22.                                 p.StartInfo.FileName = "dotnet";
  23.                                 p.StartInfo.Arguments = $"MultiProcessExample.dll worker {i}";
  24.                                 p.StartInfo.UseShellExecute = false;
  25.                                 p.StartInfo.RedirectStandardOutput = true;
  26.                                 p.StartInfo.CreateNoWindow = true;
  27.                                 p.Start();
  28.                                 processes.Add(p);
  29.                                 // 读取子进程的输出
  30.                                 readTasks.Add(Task.Run(() => read_from_process(p)));
  31.                         }
  32.                         // 等待所有进程结束
  33.                         foreach (var process in processes)
  34.                         {
  35.                                 process.WaitForExit();
  36.                         }
  37.                         // 等待所有读取任务完成
  38.                         string[] results = await Task.WhenAll(readTasks);
  39.                         // 输出所有结果
  40.                         foreach (var result in results)
  41.                         {
  42.                                 Console.WriteLine($"Received result: {result}");
  43.                         }
  44.                         // 关闭命名管道服务器
  45.                         server.Close();
  46.                 }
  47.                 static string read_from_process(Process process)
  48.                 {
  49.                         // 读取子进程的标准输出
  50.                         string result = process.StandardOutput.ReadToEnd();
  51.                         return result;
  52.                 }
  53.         }
  54.         class Worker
  55.         {
  56.                 static async Task Main(string[] args)
  57.                 {
  58.                         if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
  59.                         {
  60.                                 Console.WriteLine("Invalid arguments.");
  61.                                 return;
  62.                         }
  63.                         // 创建命名管道客户端
  64.                         using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
  65.                         {
  66.                                 try
  67.                                 {
  68.                                         // 连接到命名管道服务器
  69.                                         await pipeClient.ConnectAsync();
  70.                                         // 执行计算任务
  71.                                         string result = compute(id);
  72.                                         // 发送结果
  73.                                         send_result(pipeClient, result);
  74.                                 }
  75.                                 catch (Exception ex)
  76.                                 {
  77.                                         Console.WriteLine($"Error: {ex.Message}");
  78.                                 }
  79.                         }
  80.                 }
  81.                 static string compute(int id)
  82.                 {
  83.                         // 模拟计算任务
  84.                         Console.WriteLine($"Worker {id} is computing...");
  85.                         Task.Delay(1000).Wait();  // 模拟耗时操作
  86.                         return $"Result from Worker {id}";
  87.                 }
  88.                 static void send_result(NamedPipeClientStream pipeClient, string result)
  89.                 {
  90.                         try
  91.                         {
  92.                                 // 将结果发送到命名管道
  93.                                 byte[] resultBytes = Encoding.UTF8.GetBytes(result);
  94.                                 pipeClient.Write(resultBytes, 0, resultBytes.Length);
  95.                                 pipeClient.Flush();
  96.                         }
  97.                         catch (Exception ex)
  98.                         {
  99.                                 Console.WriteLine($"Error sending result: {ex.Message}");
  100.                         }
  101.                 }
  102.         }
  103. }
复制代码
4.实现并行的技术

4.1 多线程(Multithreading)

多线程通过在单个或多个处理器核心上运行多个线程来实现并行。在多核处理器上,线程可以真正并行执行;在单核处理器上,通过时间片切换实现伪并行。多线程适用于I/O密集型和计算密集型任务,能提高资源利用率和程序响应速度。
代码示例::使用了System.Threading.Thread来创建和管理多个线程,并使用Task来提交和等待任务的完成。
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading;
  4.         using System.Threading.Tasks;
  5.         namespace MultiThreadExample
  6.         {
  7.                 class Program
  8.                 {
  9.                         static void Main(string[] args)
  10.                         {
  11.                                 int N = 3;  // 设置线程数量
  12.                                 List<Thread> threads = new List<Thread>();
  13.                                 List<string> results = new List<string>();
  14.                                 object lockObject = new object();  // 同步锁
  15.                                 // 创建并启动多个线程
  16.                                 for (int i = 0; i < N; i++)
  17.                                 {
  18.                                         int id = i;
  19.                                         Thread thread = new Thread(() => task_function(id, results, lockObject));
  20.                                         threads.Add(thread);
  21.                                         thread.Start();
  22.                                 }
  23.                                 // 等待所有线程完成
  24.                                 foreach (Thread thread in threads)
  25.                                 {
  26.                                         thread.Join();
  27.                                 }
  28.                                 // 输出所有结果
  29.                                 foreach (string result in results)
  30.                                 {
  31.                                         Console.WriteLine($"Result from thread: {result}");
  32.                                 }
  33.                                 Console.WriteLine("All threads completed.");
  34.                         }
  35.                         static void task_function(int id, List<string> results, object lockObject)
  36.                         {
  37.                                 string result = perform_task(id);  // 执行任务
  38.                                 lock (lockObject)
  39.                                 {
  40.                                         results.Add(result);  // 将结果添加到共享列表并加锁
  41.                                 }
  42.                         }
  43.                         static string perform_task(int id)
  44.                         {
  45.                                 // 模拟任务执行
  46.                                 Console.WriteLine($"Thread {id} is processing.");
  47.                                 Thread.Sleep(1000);  // 模拟耗时操作
  48.                                 return $"Result from Thread {id}";
  49.                         }
  50.                 }
  51.         }
复制代码
使用 Task 和 async/await 实现
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Threading.Tasks;
  4.         namespace MultiThreadExample
  5.         {
  6.                 class Program
  7.                 {
  8.                         static async Task Main(string[] args)
  9.                         {
  10.                                 int N = 3;  // 设置线程数量
  11.                                 List<Task<string>> tasks = new List<Task<string>>();
  12.                                 // 创建并启动多个线程
  13.                                 for (int i = 0; i < N; i++)
  14.                                 {
  15.                                         int id = i;
  16.                                         Task<string> task = Task.Run(() => task_function(id));
  17.                                         tasks.Add(task);
  18.                                 }
  19.                                 // 等待所有线程完成
  20.                                 string[] results = await Task.WhenAll(tasks);
  21.                                 // 输出所有结果
  22.                                 foreach (string result in results)
  23.                                 {
  24.                                         Console.WriteLine($"Result from task: {result}");
  25.                                 }
  26.                                 Console.WriteLine("All tasks completed.");
  27.                         }
  28.                         static string task_function(int id)
  29.                         {
  30.                                 string result = perform_task(id);  // 执行任务
  31.                                 return result;
  32.                         }
  33.                         static string perform_task(int id)
  34.                         {
  35.                                 // 模拟任务执行
  36.                                 Console.WriteLine($"Task {id} is processing.");
  37.                                 Task.Delay(1000).Wait();  // 模拟耗时操作
  38.                                 return $"Result from Task {id}";
  39.                         }
  40.                 }
  41.         }
复制代码
4.2 多进程(Multiprocessing)

多进程通过创建多个独立进程实现并行,每个进程运行在不同的处理器核心上。进程间通过管道或消息队列等通信机制协调工作。多进程适用于需要高隔离性和安全性的任务,如科学计算和服务器应用。
代码示例:
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Diagnostics;
  4.         using System.IO.Pipes;
  5.         using System.Text;
  6.         using System.Threading.Tasks;
  7.         namespace MultiProcessExample
  8.         {
  9.                 class Program
  10.                 {
  11.                         static async Task Main(string[] args)
  12.                         {
  13.                                 int N = 3;  // 设置进程数量
  14.                                 List<Process> processes = new List<Process>();
  15.                                 List<Task<string>> readTasks = new List<Task<string>>();
  16.                                 // 创建命名管道服务器
  17.                                 using (NamedPipeServerStream pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.In, N, PipeTransmissionMode.Message, PipeOptions.Asynchronous))
  18.                                 {
  19.                                         // 创建并启动多个进程
  20.                                         for (int i = 0; i < N; i++)
  21.                                         {
  22.                                                 Process process = create_process(i);
  23.                                                 processes.Add(process);
  24.                                                 process.Start();
  25.                                                 // 创建一个任务来读取子进程的结果
  26.                                                 readTasks.Add(Task.Run(() => read_from_pipe(pipeServer)));
  27.                                         }
  28.                                         // 等待所有进程完成
  29.                                         foreach (var process in processes)
  30.                                         {
  31.                                                 process.WaitForExit();
  32.                                         }
  33.                                         // 等待所有读取任务完成
  34.                                         string[] results = await Task.WhenAll(readTasks);
  35.                                         // 输出所有结果
  36.                                         foreach (var result in results)
  37.                                         {
  38.                                                 Console.WriteLine($"Received result: {result}");
  39.                                         }
  40.                                         // 关闭命名管道服务器
  41.                                         pipeServer.Close();
  42.                                 }
  43.                                 Console.WriteLine("All processes completed.");
  44.                         }
  45.                         static Process create_process(int id)
  46.                         {
  47.                                 Process process = new Process();
  48.                                 process.StartInfo.FileName = "dotnet";
  49.                                 process.StartInfo.Arguments = $"MultiProcessExample.dll worker {id}";
  50.                                 process.StartInfo.UseShellExecute = false;
  51.                                 process.StartInfo.RedirectStandardOutput = true;
  52.                                 process.StartInfo.CreateNoWindow = true;
  53.                                 return process;
  54.                         }
  55.                         static async Task<string> read_from_pipe(NamedPipeServerStream pipeServer)
  56.                         {
  57.                                 // 等待客户端连接
  58.                                 await pipeServer.WaitForConnectionAsync();
  59.                                 // 创建字节数组来接收数据
  60.                                 byte[] buffer = new byte[1024];
  61.                                 int bytesRead = await pipeServer.ReadAsync(buffer, 0, buffer.Length);
  62.                                 string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
  63.                                 // 断开连接
  64.                                 pipeServer.Disconnect();
  65.                                 return result;
  66.                         }
  67.                 }
  68.         }
复制代码
**使用Worker类:
**
  1.         using System;
  2.         using System.IO.Pipes;
  3.         using System.Threading.Tasks;
  4.         namespace MultiProcessExample
  5.         {
  6.                 class Worker
  7.                 {
  8.                         static async Task Main(string[] args)
  9.                         {
  10.                                 if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
  11.                                 {
  12.                                         Console.WriteLine("Invalid arguments.");
  13.                                         return;
  14.                                 }
  15.                                 // 创建命名管道客户端
  16.                                 using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
  17.                                 {
  18.                                         try
  19.                                         {
  20.                                                 // 连接到命名管道服务器
  21.                                                 await pipeClient.ConnectAsync();
  22.                                                 // 执行计算任务
  23.                                                 string result = compute(id);
  24.                                                 // 发送结果
  25.                                                 send_result(pipeClient, result);
  26.                                         }
  27.                                         catch (Exception ex)
  28.                                         {
  29.                                                 Console.WriteLine($"Error: {ex.Message}");
  30.                                         }
  31.                                 }
  32.                         }
  33.                         static string compute(int id)
  34.                         {
  35.                                 // 模拟计算任务
  36.                                 Console.WriteLine($"Worker {id} is computing...");
  37.                                 Task.Delay(1000).Wait();  // 模拟耗时操作
  38.                                 return $"Result from Worker {id}";
  39.                         }
  40.                         static void send_result(NamedPipeClientStream pipeClient, string result)
  41.                         {
  42.                                 try
  43.                                 {
  44.                                         // 将结果发送到命名管道
  45.                                         byte[] resultBytes = Encoding.UTF8.GetBytes(result);
  46.                                         pipeClient.Write(resultBytes, 0, resultBytes.Length);
  47.                                         pipeClient.Flush();
  48.                                 }
  49.                                 catch (Exception ex)
  50.                                 {
  51.                                         Console.WriteLine($"Error sending result: {ex.Message}");
  52.                                 }
  53.                         }
  54.                 }
  55.         }
复制代码
4.3 分布式计算(Distributed Computing)

分布式计算将任务分配到网络中的多台计算机上并行执行,通常使用消息传递接口(MPI)进行通信。适用于大规模数据处理和复杂计算任务,如天气预报和分布式数据库。
为了简化实现,我们可以使用一个简单的消息传递库,例如 NamedPipes 和 Task 来模拟MPI的行为。这里我们使用 NamedPipes 来进行进程间通信,并模拟主节点和工作节点之间的数据交换。
代码示例:
  1.         using System;
  2.         using System.Collections.Generic;
  3.         using System.Diagnostics;
  4.         using System.IO.Pipes;
  5.         using System.Text;
  6.         using System.Threading.Tasks;
  7.         namespace DistributedComputingExample
  8.         {
  9.                 class Program
  10.                 {
  11.                         static async Task Main(string[] args)
  12.                         {
  13.                                 int num_workers = 3;  // 设置工作节点数量
  14.                                 List<Process> workers = new List<Process>();
  15.                                 List<Task<string>> readTasks = new List<Task<string>>();
  16.                                 // 创建和启动工作节点
  17.                                 for (int i = 1; i <= num_workers; i++)
  18.                                 {
  19.                                         Process worker = create_worker_process(i);
  20.                                         workers.Add(worker);
  21.                                         worker.Start();
  22.                                 }
  23.                                 // 模拟主节点
  24.                                 if (args.Length == 0 || args[0] != "worker")
  25.                                 {
  26.                                         // 主节点逻辑
  27.                                         string data = load_data(num_workers);
  28.                                         Console.WriteLine("Data loaded.");
  29.                                         // 创建命名管道服务器来发送数据
  30.                                         List<NamedPipeServerStream> sendPipes = new List<NamedPipeServerStream>();
  31.                                         for (int i = 1; i <= num_workers; i++)
  32.                                         {
  33.                                                 NamedPipeServerStream sendPipe = new NamedPipeServerStream($"sendpipe_{i}", PipeDirection.Out, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
  34.                                                 sendPipes.Add(sendPipe);
  35.                                         }
  36.                                         // 发送数据到每个工作节点
  37.                                         for (int i = 1; i <= num_workers; i++)
  38.                                         {
  39.                                                 string data_chunk = data.Split('|')[i - 1];
  40.                                                 send_data(sendPipes[i - 1], data_chunk);
  41.                                         }
  42.                                         // 创建命名管道服务器来接收结果
  43.                                         List<NamedPipeServerStream> receivePipes = new List<NamedPipeServerStream>();
  44.                                         for (int i = 1; i <= num_workers; i++)
  45.                                         {
  46.                                                 NamedPipeServerStream receivePipe = new NamedPipeServerStream($"receivepipe_{i}", PipeDirection.In, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
  47.                                                 receivePipes.Add(receivePipe);
  48.                                         }
  49.                                         // 读取每个工作节点的结果
  50.                                         for (int i = 1; i <= num_workers; i++)
  51.                                         {
  52.                                                 readTasks.Add(Task.Run(() => read_from_pipe(receivePipes[i - 1])));
  53.                                         }
  54.                                         // 等待所有工作节点完成
  55.                                         foreach (var worker in workers)
  56.                                         {
  57.                                                 worker.WaitForExit();
  58.                                         }
  59.                                         // 等待所有读取任务完成
  60.                                         string[] results = await Task.WhenAll(readTasks);
  61.                                         // 聚合结果
  62.                                         string final_result = aggregate(results);
  63.                                         Console.WriteLine($"Final result: {final_result}");
  64.                                 }
  65.                                 else
  66.                                 {
  67.                                         // 工作节点逻辑
  68.                                         int id = int.Parse(args[1]);
  69.                                         Console.WriteLine($"Worker {id} started.");
  70.                                         // 创建命名管道客户端来接收数据
  71.                                         using (NamedPipeClientStream receivePipe = new NamedPipeClientStream(".", $"sendpipe_{id}", PipeDirection.In, PipeOptions.Asynchronous))
  72.                                         {
  73.                                                 await receivePipe.ConnectAsync();
  74.                                                 string data_chunk = receive_data(receivePipe);
  75.                                                 Console.WriteLine($"Worker {id} received data: {data_chunk}");
  76.                                                 // 处理数据
  77.                                                 string result = process(data_chunk);
  78.                                                 Console.WriteLine($"Worker {id} processed data: {result}");
  79.                                                 // 创建命名管道客户端来发送结果
  80.                                                 using (NamedPipeClientStream sendPipe = new NamedPipeClientStream(".", $"receivepipe_{id}", PipeDirection.Out, PipeOptions.Asynchronous))
  81.                                                 {
  82.                                                         await sendPipe.ConnectAsync();
  83.                                                         send_result(sendPipe, result);
  84.                                                 }
  85.                                         }
  86.                                 }
  87.                         }
  88.                         static Process create_worker_process(int id)
  89.                         {
  90.                                 Process process = new Process();
  91.                                 process.StartInfo.FileName = "dotnet";
  92.                                 process.StartInfo.Arguments = $"DistributedComputingExample.dll worker {id}";
  93.                                 process.StartInfo.UseShellExecute = false;
  94.                                 process.StartInfo.RedirectStandardOutput = true;
  95.                                 process.StartInfo.CreateNoWindow = true;
  96.                                 return process;
  97.                         }
  98.                         static string load_data(int num_chunks)
  99.                         {
  100.                                 // 模拟加载数据
  101.                                 string data = "DataChunk1|DataChunk2|DataChunk3";
  102.                                 return data;
  103.                         }
  104.                         static void send_data(NamedPipeServerStream pipe, string data)
  105.                         {
  106.                                 try
  107.                                 {
  108.                                         byte[] dataBytes = Encoding.UTF8.GetBytes(data);
  109.                                         pipe.Write(dataBytes, 0, dataBytes.Length);
  110.                                         pipe.Flush();
  111.                                         pipe.Disconnect();
  112.                                 }
  113.                                 catch (Exception ex)
  114.                                 {
  115.                                         Console.WriteLine($"Error sending data: {ex.Message}");
  116.                                 }
  117.                         }
  118.                         static string receive_data(NamedPipeClientStream pipe)
  119.                         {
  120.                                 try
  121.                                 {
  122.                                         byte[] buffer = new byte[1024];
  123.                                         int bytesRead = pipe.Read(buffer, 0, buffer.Length);
  124.                                         return Encoding.UTF8.GetString(buffer, 0, bytesRead);
  125.                                 }
  126.                                 catch (Exception ex)
  127.                                 {
  128.                                         Console.WriteLine($"Error receiving data: {ex.Message}");
  129.                                         return null;
  130.                                 }
  131.                         }
  132.                         static string process(string data_chunk)
  133.                         {
  134.                                 // 模拟任务处理
  135.                                 Console.WriteLine($"Processing data chunk: {data_chunk}");
  136.                                 Task.Delay(1000).Wait();  // 模拟耗时操作
  137.                                 return $"Processed {data_chunk}";
  138.                         }
  139.                         static void send_result(NamedPipeClientStream pipe, string result)
  140.                         {
  141.                                 try
  142.                                 {
  143.                                         byte[] resultBytes = Encoding.UTF8.GetBytes(result);
  144.                                         pipe.Write(resultBytes, 0, resultBytes.Length);
  145.                                         pipe.Flush();
  146.                                 }
  147.                                 catch (Exception ex)
  148.                                 {
  149.                                         Console.WriteLine($"Error sending result: {ex.Message}");
  150.                                 }
  151.                         }
  152.                         static async Task<string> read_from_pipe(NamedPipeServerStream pipe)
  153.                         {
  154.                                 await pipe.WaitForConnectionAsync();
  155.                                 byte[] buffer = new byte[1024];
  156.                                 int bytesRead = await pipe.ReadAsync(buffer, 0, buffer.Length);
  157.                                 string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
  158.                                 pipe.Disconnect();
  159.                                 return result;
  160.                         }
  161.                         static string aggregate(string[] results)
  162.                         {
  163.                                 // 聚合结果
  164.                                 StringBuilder finalResult = new StringBuilder();
  165.                                 foreach (string result in results)
  166.                                 {
  167.                                         finalResult.AppendLine(result);
  168.                                 }
  169.                                 return finalResult.ToString();
  170.                         }
  171.                 }
  172.         }
复制代码
5 实践运用

5.1 软件开发中的并行应用

并行广泛应用于需要高计算能力的场景,包括:

  • 科学模拟:天气预报、分子动力学等任务涉及大量方程求解,可通过并行化显著加速。
  • 机器学习:深度神经网络训练依赖矩阵运算,TensorFlow和PyTorch等框架利用GPU并行性加速训练过程。
  • 图像与视频处理:如3D渲染或视频滤镜应用,可将任务分配到多核或GPU上并行执行。
常见的并行编程模型包括:

  • T- PL:TPL是.NET中用于并行编程的一个强大库
  • OpenMP:基于指令的共享内存并行API,适用于C/C++和Fortran。
  • MPI(消息传递接口):分布式内存并行的标准,用于高性能计算集群。
  • CUDA:NVIDIA的并行计算平台,支持GPU上的细粒度并行。
5.2 软件开发中的并发应用

并发在需要处理多任务或事件的系统中至关重要,例如:

  • Web服务器:如Apache和Nginx,通过多线程、多进程或事件驱动架构并发处理大量客户端请求。
  • 图形用户界面(GUI):并发确保界面在执行后台任务(如数据加载)时仍能响应用户输入。
  • 数据库系统:通过锁和事务等并发控制机制,管理多用户对数据的并发访问。
常见的并发模型包括:

  • 多线程:C#、Java和C++提供线程库(如System.Thread、java.lang.Thread、std::thread)实现并发。
  • 异步编程:Node.js和Python的asyncio支持非阻塞代码,适用于I/O密集型任务。
  • Actor模型:Erlang和Akka框架通过独立的Actor单元和消息传递实现并发,避免共享内存问题。
6. 并发与并行编程的挑战

6.1 并发挑战

并发引入了多个复杂问题:

  • 竞争条件(Race Conditions):多个线程同时访问共享资源,可能导致不可预测的结果。例如,未同步的计数器递增可能丢失更新。
  • 死锁(Deadlocks):线程间相互等待对方释放资源,导致永久阻塞。例如,两个线程各自持有对方需要的锁。
  • 活锁(Livelocks):线程不断尝试解决问题但无进展,如反复让出资源。
  • 饥饿(Starvation):某些线程因调度不公而无法获得资源。
解决这些问题通常依赖同步原语(如互斥锁、信号量),但过度同步可能降低性能。
6.2 并行挑战

并行计算也有其难点:

  • 负载均衡:确保所有处理器或核心均匀分担工作量,避免部分核心空闲。
  • 通信开销:分布式系统中,节点间通信成本可能抵消并行收益。
  • 可扩展性:随着处理器数量增加,同步开销或串行部分可能导致收益递减。
并行算法需精心设计,采用动态负载均衡或工作窃取等技术应对这些挑战。
7. 管理并行与并发的工具与技术

7.1 调试与测试

并发与并行程序的非确定性使其调试异常困难,常用工具包括:

  • 静态分析:如Intel Inspector或FindBugs,可在不运行代码的情况下检测潜在问题。
  • 运行时验证:Valgrind的Helgrind等工具在程序运行时监控同步错误。
  • 测试框架:JUnit或pytest可扩展用于并发测试,模拟多线程场景。
7.2 设计模式

设计模式为常见问题提供解决方案:

  • 线程池:管理固定数量的线程执行任务,减少创建和销毁开销。
  • 生产者-消费者:生产者生成数据,消费者处理数据,通过同步队列协调。
  • Map-Reduce:将任务映射到数据分片并归约结果,适用于大数据处理。
7.3 编程语言支持

现代语言内置了对并行与并发的支持:

  • CSharp:通过TPL和System.Collections.Concurrent等库简化并发和并行编程。
  • Go:通过goroutines和通道简化并发编程。
  • Rust:通过所有权模型在编译时防止数据竞争。
  • Java:提供java.util.concurrent包,包括线程池、并发集合等高级工具。
8.并行与并发的权衡

8.1 复杂度与性能

并行与并发提升性能的同时增加了代码复杂度:

  • 多线程:提供细粒度控制,但易引入竞争条件。
  • 异步编程:避免线程开销,但可能导致回调地狱或复杂逻辑。
8.2 共享内存与消息传递

并发模型分为两种:

  • 共享内存:线程共享数据,需同步以避免冲突,效率高但易出错。
  • 消息传递:通过消息通信避免共享状态,安全性高但可能引入延迟。
如何选择取决于性能、安全性和应用需求。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册