亚洲乱色熟女一区二区三区丝袜,天堂√中文最新版在线,亚洲精品乱码久久久久久蜜桃图片,香蕉久久久久久av成人,欧美丰满熟妇bbb久久久

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

C#編程中并行與并發(fā)的簡單理解

freeflydom
2025年9月9日 10:41 本文熱度 192

1.簡述

并發(fā)通過管理多個(gè)任務(wù)的執(zhí)行順序,確保系統(tǒng)在高負(fù)載下仍能保持響應(yīng)性;并行則利用多處理器或多核心硬件,真正同時(shí)執(zhí)行任務(wù),以加速計(jì)算。這兩者在高性能計(jì)算、實(shí)時(shí)系統(tǒng)和用戶交互應(yīng)用中發(fā)揮著不可替代的作用。

在多核處理器時(shí)代,傳統(tǒng)串行編程已無法充分利用硬件潛力。并行計(jì)算通過將任務(wù)分解到多個(gè)核心執(zhí)行,顯著縮短了計(jì)算時(shí)間。然而,并發(fā)與并行的實(shí)現(xiàn)并非沒有代價(jià),它們引入了諸如競爭條件、死鎖和負(fù)載均衡等復(fù)雜問題,需要開發(fā)者具備深厚的理論基礎(chǔ)和實(shí)踐經(jīng)驗(yàn)。

2.并發(fā)與并行

2.1 定義

  • 并發(fā)(Concurrency):

    • 指系統(tǒng)在一段時(shí)間內(nèi)管理多個(gè)任務(wù)的能力。并發(fā)關(guān)注任務(wù)的協(xié)調(diào)與交錯(cuò)執(zhí)行,通過時(shí)間分片等技術(shù)在一個(gè)或多個(gè)處理器上實(shí)現(xiàn),因此并發(fā)看似同時(shí)進(jìn)行,但不一定在同一時(shí)刻執(zhí)行。
    • 并發(fā)強(qiáng)調(diào)任務(wù)的邏輯組織和協(xié)調(diào)。
    • 舉例:一個(gè)Web服務(wù)器可以并發(fā)處理多個(gè)客戶端請求,通過快速切換任務(wù)確保每個(gè)請求都能及時(shí)響應(yīng)。
  • 并行(Parallelism):

    • 指多個(gè)任務(wù)在同一時(shí)刻真正同時(shí)執(zhí)行,通常依賴于多核處理器或分布式系統(tǒng)。其核心目標(biāo)是提升計(jì)算速度,通過將問題分解為獨(dú)立的子任務(wù)并同時(shí)處理。并行適用于計(jì)算密集型任務(wù)。
    • 并行關(guān)注物理執(zhí)行的并行性。
    • 舉例:在并行矩陣乘法中,不同的核心可以同時(shí)計(jì)算矩陣的不同部分,從而顯著縮短總計(jì)算時(shí)間;科學(xué)模擬或圖像處理,其效果依賴于多核處理器、GPU或分布式計(jì)算系統(tǒng)的硬件支持。

2.2 區(qū)別

并發(fā)與并行的根本區(qū)別在于執(zhí)行的時(shí)間性資源依賴性

  • 執(zhí)行模式:并行強(qiáng)調(diào)真正的同時(shí)執(zhí)行,而并發(fā)通過任務(wù)切換營造同時(shí)進(jìn)行的假象。
  • 硬件依賴:并行需要多處理器或多核心支持,而并發(fā)在單核系統(tǒng)上即可實(shí)現(xiàn)。
  • 目標(biāo):并行旨在加速計(jì)算,而并發(fā)注重系統(tǒng)響應(yīng)性和多任務(wù)處理能力。

例如,在單核系統(tǒng)中,操作系統(tǒng)通過時(shí)間片輪轉(zhuǎn)調(diào)度多個(gè)線程;而多核系統(tǒng)中,線程可以分配到不同核心并行運(yùn)行。

3.實(shí)現(xiàn)并發(fā)

3.1 并行實(shí)現(xiàn)并發(fā)

在多核處理器上,任務(wù)可以分配到不同核心并行執(zhí)行,從而實(shí)現(xiàn)高效并發(fā)。例如,Web服務(wù)器通過多線程并行處理客戶端請求。

代碼示例:多線程并行處理 :System.Threading.ThreadPool來創(chuàng)建和管理線程池,并使用ManualResetEventSlim來等待所有任務(wù)完成。

	using System;
	using System.Collections.Generic;
	using System.Threading;
	class Program
	{
		static void Main(string[] args)
		{
			List<Request> requests = new List<Request>
			{
				new Request { Data = "Request1" },
				new Request { Data = "Request2" },
				new Request { Data = "Request3" }
				// 添加更多請求
			};
			process_requests(requests);
			Console.WriteLine("All requests processed.");
		}
		static void process_requests(List<Request> requests)
		{
			int num_cores = Environment.ProcessorCount;  // 獲取處理器核心數(shù)
			ManualResetEventSlim[] mres = new ManualResetEventSlim[requests.Count];  // 創(chuàng)建信號量數(shù)組
			for (int i = 0; i < requests.Count; i++)
			{
				int index = i;
				mres[index] = new ManualResetEventSlim(false);  // 初始化信號量
				ThreadPool.QueueUserWorkItem((state) =>
				{
					handle_request(requests[index]);
					mres[index].Set();  // 任務(wù)完成時(shí)設(shè)置信號量
				});
			}
			// 等待所有任務(wù)完成
			ManualResetEventSlim.WaitAll(mres);
		}
		static void handle_request(Request request)
		{
			Response response = process(request);  // 處理請求
			send_response(response);  // 發(fā)送響應(yīng)
		}
		static Response process(Request request)
		{
			// 模擬請求處理邏輯
			Console.WriteLine($"Processing request: {request.Data}");
			Thread.Sleep(1000);  // 模擬耗時(shí)操作
			return new Response { Data = $"Response for {request.Data}" };
		}
		static void send_response(Response response)
		{
			// 模擬發(fā)送響應(yīng)邏輯
			Console.WriteLine($"Sending response: {response.Data}");
		}
	}
	class Request
	{
		public string Data { get; set; }
	}
	class Response
	{
		public string Data { get; set; }
	}

======================================================================================================================
使用Task.Run和Task.WhenAll來實(shí)現(xiàn)

	using System;
	using System.Collections.Generic;
	using System.Threading.Tasks;
	class Program
	{
		static async Task Main(string[] args)
		{
			List<Request> requests = new List<Request>
			{
				new Request { Data = "Request1" },
				new Request { Data = "Request2" },
				new Request { Data = "Request3" }
				// 添加更多請求
			};
			await process_requests(requests);
			Console.WriteLine("All requests processed.");
		}
		static async Task process_requests(List<Request> requests)
		{
			List<Task> tasks = new List<Task>();
			foreach (Request request in requests)
			{
				Task task = Task.Run(() => handle_request(request));
				tasks.Add(task);
			}
			// 等待所有任務(wù)完成
			await Task.WhenAll(tasks);
		}
		static async Task handle_request(Request request)
		{
			Response response = await process(request);  // 處理請求
			send_response(response);  // 發(fā)送響應(yīng)
		}
		static async Task<Response> process(Request request)
		{
			// 模擬請求處理邏輯
			Console.WriteLine($"Processing request: {request.Data}");
			await Task.Delay(1000);  // 模擬耗時(shí)操作
			return new Response { Data = $"Response for {request.Data}" };
		}
		static void send_response(Response response)
		{
			// 模擬發(fā)送響應(yīng)邏輯
			Console.WriteLine($"Sending response: {response.Data}");
		}
	}
	class Request
	{
		public string Data { get; set; }
	}
	class Response
	{
		public string Data { get; set; }
	}

3.2 任務(wù)調(diào)度

在單核處理器上,通過時(shí)間片輪轉(zhuǎn)等調(diào)度算法實(shí)現(xiàn)并發(fā)。操作系統(tǒng)在任務(wù)間快速切換,營造同時(shí)執(zhí)行的假象。

代碼示例:時(shí)間片輪轉(zhuǎn)調(diào)度 :示例使用了Task和CancellationToken來管理任務(wù)的時(shí)間片輪轉(zhuǎn)調(diào)度。

	using System;
	using System.Collections.Generic;
	using System.Threading;
	using System.Threading.Tasks;
	class Program
	{
		static async Task Main(string[] args)
		{
			List<Task> tasks = new List<Task>
			{
				run_task("Task1", 5000),  // 創(chuàng)建一個(gè)任務(wù),模擬總時(shí)間為5秒
				run_task("Task2", 3000),  // 創(chuàng)建一個(gè)任務(wù),模擬總時(shí)間為3秒
				run_task("Task3", 7000)   // 創(chuàng)建一個(gè)任務(wù),模擬總時(shí)間為7秒
			};
			int time_slice = 1000;  // 設(shè)置時(shí)間片為1秒
			await scheduler(tasks, time_slice);
			Console.WriteLine("All tasks processed.");
		}
		static async Task scheduler(List<Task> tasks, int time_slice)
		{
			List<Task> runningTasks = new List<Task>();
			List<Task> remainingTasks = new List<Task>(tasks);
			while (remainingTasks.Count > 0 || runningTasks.Count > 0)
			{
				// 將剩余任務(wù)中的第一個(gè)任務(wù)移到運(yùn)行列表
				if (remainingTasks.Count > 0)
				{
					runningTasks.Add(remainingTasks[0]);
					remainingTasks.RemoveAt(0);
				}
				// 復(fù)制運(yùn)行任務(wù)列表以避免在遍歷過程中修改列表
				List<Task> currentRunningTasks = new List<Task>(runningTasks);
				foreach (Task task in currentRunningTasks)
				{
					if (!task.IsCompleted)
					{
						await run_task_for_time_slice(task, time_slice);
						if (task.IsCompleted)
						{
							runningTasks.Remove(task);
						}
						else
						{
							remainingTasks.Add(task);
							runningTasks.Remove(task);
						}
					}
				}
			}
		}
		static async Task run_task_for_time_slice(Task task, int time_slice)
		{
			// 創(chuàng)建一個(gè)取消令牌源
			CancellationTokenSource cts = new CancellationTokenSource(time_slice);
			try
			{
				// 等待任務(wù)完成或時(shí)間片用完
				await task.WaitAsync(cts.Token);
			}
			catch (TaskCanceledException)
			{
				// 時(shí)間片用完,任務(wù)未完成
				Console.WriteLine($"Task {task.Id} preempted after {time_slice} ms");
			}
		}
		static Task run_task(string taskName, int total_time)
		{
			return Task.Run(async () =>
			{
				int elapsedTime = 0;
				int time_slice = 1000;  // 模擬內(nèi)部時(shí)間片
				while (elapsedTime < total_time)
				{
					Console.WriteLine($"{taskName} is running. Elapsed time: {elapsedTime} ms");
					await Task.Delay(time_slice);  // 模擬任務(wù)運(yùn)行一段時(shí)間
					elapsedTime += time_slice;
				}
				Console.WriteLine($"{taskName} is completed.");
			});
		}
	}

3.3 多線程

多線程通過創(chuàng)建多個(gè)執(zhí)行單元實(shí)現(xiàn)并發(fā)。線程共享進(jìn)程資源,通過同步機(jī)制(如互斥鎖)協(xié)調(diào)訪問。

代碼示例:多線程同步

	using System;
	using System.Collections.Generic;
	using System.Threading;
	using System.Threading.Tasks;
	namespace Test.EventBus
	{
		public class DemoB
		{
			private static Mutex mutex = new Mutex();  // 創(chuàng)建互斥鎖
			private static StringBuilder sharedData = new StringBuilder();  // 初始化共享數(shù)據(jù)
			public void ShowMsg(string name, string msg)
			{
				Console.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
				var eventMsg = new EventMessage
				{
					Name = name,
					Msg = msg,
					CreatedDate = DateTime.Now
				};
				EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
			}
			public static void RunDemo()
			{
				List<Task> tasks = new List<Task>();
				for (int i = 0; i < 10; i++)
				{
					int taskId = i;
					tasks.Add(Task.Run(() => thread_function($"Task{taskId}")));
				}
				// 等待所有任務(wù)完成
				Task.WaitAll(tasks);
			}
			static void thread_function(string name)
			{
				for (int i = 0; i < 5; i++)
				{
					modify_shared_data(name, i);
				}
			}
			static void modify_shared_data(string name, int iteration)
			{
				mutex.WaitOne();  // 加鎖
				try
				{
					// 修改共享數(shù)據(jù)
					sharedData.AppendLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
					Console.WriteLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
				}
				finally
				{
					mutex.ReleaseMutex();  // 解鎖
				}
			}
		}
		public class EventMessage
		{
			public string Name { get; set; }
			public string Msg { get; set; }
			public DateTime CreatedDate { get; set; }
		}
		public static class EventPublishSubscribeUtils
		{
			public static void PublishEvent(EventMessage eventMsg, string eventName)
			{
				Console.WriteLine($"Published event: {eventName} => Name: {eventMsg.Name}, Msg: {eventMsg.Msg}, CreatedDate: {eventMsg.CreatedDate}");
			}
		}
		class Program
		{
			static void Main(string[] args)
			{
				DemoB.RunDemo();
				Console.WriteLine("All threads completed.");
			}
		}
	}

3.4 異步編程

異步編程通過事件循環(huán)和回調(diào)函數(shù)處理I/O密集型任務(wù),避免阻塞主線程。

代碼示例:異步I/O

using System;
using System.IO;
using System.Threading.Tasks;
namespace AsyncIOExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			string filePath1 = "example1.txt";
			string filePath2 = "example2.txt";
			// 創(chuàng)建模擬文件
			File.WriteAllText(filePath1, "Data from example1.txt");
			File.WriteAllText(filePath2, "Data from example2.txt");
			// 異步讀取文件并使用回調(diào)函數(shù)處理數(shù)據(jù)
			await async_read(filePath1, data => callback(data, filePath1));
			await async_read(filePath2, data => callback(data, filePath2));
			Console.WriteLine("All asynchronous read operations completed.");
		}
		static async Task async_read(string file, Action<string> callback)
		{
			// 模擬事件循環(huán)添加任務(wù)
			Console.WriteLine($"Starting asynchronous read for file: {file}");
			string data = await read_file(file);
			callback(data);
		}
		static async Task<string> read_file(string file)
		{
			// 模擬從磁盤讀取文件
			using (StreamReader reader = new StreamReader(file))
			{
				string data = await reader.ReadToEndAsync();
				return data;
			}
		}
		static void callback(string data, string file)
		{
			// 處理讀取后的數(shù)據(jù)
			Console.WriteLine($"Data read from {file}: {data}");
		}
	}
}

3.5 協(xié)程

協(xié)程通過yield和resume機(jī)制在單線程內(nèi)實(shí)現(xiàn)并發(fā),適用于I/O密集型任務(wù),具有低開銷優(yōu)勢。

代碼示例:協(xié)程

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CoroutineExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			IAsyncEnumerable<string> coroutine = coroutine_example();
			// 創(chuàng)建一個(gè)異步枚舉器
			IAsyncEnumerator<string> enumerator = coroutine.GetAsyncEnumerator();
			// 啟動(dòng)協(xié)程
			if (await enumerator.MoveNextAsync())
			{
				Console.WriteLine("Coroutine started.");
				// 發(fā)送數(shù)據(jù)并恢復(fù)執(zhí)行
				await enumerator.MoveNextAsync();
				enumerator.Current = "Data1";
				await enumerator.MoveNextAsync();
				enumerator.Current = "Data2";
				await enumerator.MoveNextAsync();
				enumerator.Current = "Data3";
				// 結(jié)束協(xié)程
				await enumerator.DisposeAsync();
			}
		}
		static async IAsyncEnumerable<string> coroutine_example()
		{
			string data = null;
			while (true)
			{
				// 暫停并接收數(shù)據(jù)
				await Task.Delay(100);  // 模擬等待
				data = yield return data;
				// 處理數(shù)據(jù)
				process(data);
			}
		}
		static void process(string data)
		{
			if (data != null)
			{
				Console.WriteLine($"Processed data: {data}");
			}
			else
			{
				Console.WriteLine("No data to process.");
			}
		}
	}
}

3.6 事件驅(qū)動(dòng)

事件驅(qū)動(dòng)編程通過事件循環(huán)監(jiān)聽和處理事件,適用于GUI和網(wǎng)絡(luò)應(yīng)用。

代碼示例:事件驅(qū)動(dòng)

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace EventDrivenExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			// 創(chuàng)建事件循環(huán)
			EventLoop eventLoop = new EventLoop();
			// 注冊事件處理函數(shù)
			eventLoop.RegisterHandler("Event1", Event1Handler);
			eventLoop.RegisterHandler("Event2", Event2Handler);
			// 模擬事件觸發(fā)
			eventLoop.TriggerEvent(new Event { Type = "Event1", Data = "Data for Event1" });
			eventLoop.TriggerEvent(new Event { Type = "Event2", Data = "Data for Event2" });
			// 啟動(dòng)事件循環(huán)
			await eventLoop.Start();
			Console.WriteLine("Event loop completed.");
		}
		static void Event1Handler(Event e)
		{
			Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
		}
		static void Event2Handler(Event e)
		{
			Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
		}
	}
	public class Event
	{
		public string Type { get; set; }
		public string Data { get; set; }
	}
	public class EventLoop
	{
		private Queue<Event> _eventQueue = new Queue<Event>();
		private Dictionary<string, Action<Event>> _handlers = new Dictionary<string, Action<Event>>();
		private bool _running = false;
		public void RegisterHandler(string eventType, Action<Event> handler)
		{
			if (_handlers.ContainsKey(eventType))
			{
				_handlers[eventType] += handler;
			}
			else
			{
				_handlers[eventType] = handler;
			}
		}
		public void TriggerEvent(Event e)
		{
			lock (_eventQueue)
			{
				_eventQueue.Enqueue(e);
			}
		}
		public async Task Start()
		{
			_running = true;
			while (_running)
			{
				Event e = null;
				lock (_eventQueue)
				{
					if (_eventQueue.Count > 0)
					{
						e = _eventQueue.Dequeue();
					}
				}
				if (e != null)
				{
					if (_handlers.TryGetValue(e.Type, out Action<Event> handler))
					{
						handler(e);
					}
					else
					{
						Console.WriteLine($"No handler registered for event type: {e.Type}");
					}
				}
				else
				{
					// 模擬等待事件
					await Task.Delay(100);  // 等待100毫秒
				}
			}
		}
		public void Stop()
		{
			_running = false;
		}
	}
}

3.7 多進(jìn)程

多進(jìn)程通過創(chuàng)建獨(dú)立進(jìn)程實(shí)現(xiàn)并發(fā),進(jìn)程間通過IPC(如管道或消息隊(duì)列)通信,適用于CPU密集型任務(wù)

在C#中,多進(jìn)程可以通過使用 System.Diagnostics.Process 類來創(chuàng)建和管理獨(dú)立進(jìn)程。進(jìn)程間通信(IPC)可以通過多種方式實(shí)現(xiàn),例如使用命名管道(System.IO.Pipes)或內(nèi)存映射文件(System.IO.MemoryMappedFiles)。在這個(gè)示例中,我們將使用命名管道來進(jìn)行進(jìn)程間通信。

代碼示例:多進(jìn)程

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;
namespace MultiProcessExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			int num_processes = 3;  // 設(shè)置進(jìn)程數(shù)量
			List<Process> processes = new List<Process>();
			List<Task<string>> readTasks = new List<Task<string>>();
			// 創(chuàng)建命名管道服務(wù)器
			var server = new NamedPipeServerStream("testpipe", PipeDirection.In, num_processes, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
			// 創(chuàng)建并啟動(dòng)進(jìn)程
			for (int i = 0; i < num_processes; i++)
			{
				Process p = new Process();
				p.StartInfo.FileName = "dotnet";
				p.StartInfo.Arguments = $"MultiProcessExample.dll worker {i}";
				p.StartInfo.UseShellExecute = false;
				p.StartInfo.RedirectStandardOutput = true;
				p.StartInfo.CreateNoWindow = true;
				p.Start();
				processes.Add(p);
				// 讀取子進(jìn)程的輸出
				readTasks.Add(Task.Run(() => read_from_process(p)));
			}
			// 等待所有進(jìn)程結(jié)束
			foreach (var process in processes)
			{
				process.WaitForExit();
			}
			// 等待所有讀取任務(wù)完成
			string[] results = await Task.WhenAll(readTasks);
			// 輸出所有結(jié)果
			foreach (var result in results)
			{
				Console.WriteLine($"Received result: {result}");
			}
			// 關(guān)閉命名管道服務(wù)器
			server.Close();
		}
		static string read_from_process(Process process)
		{
			// 讀取子進(jìn)程的標(biāo)準(zhǔn)輸出
			string result = process.StandardOutput.ReadToEnd();
			return result;
		}
	}
	class Worker
	{
		static async Task Main(string[] args)
		{
			if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
			{
				Console.WriteLine("Invalid arguments.");
				return;
			}
			// 創(chuàng)建命名管道客戶端
			using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
			{
				try
				{
					// 連接到命名管道服務(wù)器
					await pipeClient.ConnectAsync();
					// 執(zhí)行計(jì)算任務(wù)
					string result = compute(id);
					// 發(fā)送結(jié)果
					send_result(pipeClient, result);
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Error: {ex.Message}");
				}
			}
		}
		static string compute(int id)
		{
			// 模擬計(jì)算任務(wù)
			Console.WriteLine($"Worker {id} is computing...");
			Task.Delay(1000).Wait();  // 模擬耗時(shí)操作
			return $"Result from Worker {id}";
		}
		static void send_result(NamedPipeClientStream pipeClient, string result)
		{
			try
			{
				// 將結(jié)果發(fā)送到命名管道
				byte[] resultBytes = Encoding.UTF8.GetBytes(result);
				pipeClient.Write(resultBytes, 0, resultBytes.Length);
				pipeClient.Flush();
			}
			catch (Exception ex)
			{
				Console.WriteLine($"Error sending result: {ex.Message}");
			}
		}
	}
}

4.實(shí)現(xiàn)并行的技術(shù)

4.1 多線程(Multithreading)

多線程通過在單個(gè)或多個(gè)處理器核心上運(yùn)行多個(gè)線程來實(shí)現(xiàn)并行。在多核處理器上,線程可以真正并行執(zhí)行;在單核處理器上,通過時(shí)間片切換實(shí)現(xiàn)偽并行。多線程適用于I/O密集型和計(jì)算密集型任務(wù),能提高資源利用率和程序響應(yīng)速度。

代碼示例::使用了System.Threading.Thread來創(chuàng)建和管理多個(gè)線程,并使用Task來提交和等待任務(wù)的完成。

	using System;
	using System.Collections.Generic;
	using System.Threading;
	using System.Threading.Tasks;
	namespace MultiThreadExample
	{
		class Program
		{
			static void Main(string[] args)
			{
				int N = 3;  // 設(shè)置線程數(shù)量
				List<Thread> threads = new List<Thread>();
				List<string> results = new List<string>();
				object lockObject = new object();  // 同步鎖
				// 創(chuàng)建并啟動(dòng)多個(gè)線程
				for (int i = 0; i < N; i++)
				{
					int id = i;
					Thread thread = new Thread(() => task_function(id, results, lockObject));
					threads.Add(thread);
					thread.Start();
				}
				// 等待所有線程完成
				foreach (Thread thread in threads)
				{
					thread.Join();
				}
				// 輸出所有結(jié)果
				foreach (string result in results)
				{
					Console.WriteLine($"Result from thread: {result}");
				}
				Console.WriteLine("All threads completed.");
			}
			static void task_function(int id, List<string> results, object lockObject)
			{
				string result = perform_task(id);  // 執(zhí)行任務(wù)
				lock (lockObject)
				{
					results.Add(result);  // 將結(jié)果添加到共享列表并加鎖
				}
			}
			static string perform_task(int id)
			{
				// 模擬任務(wù)執(zhí)行
				Console.WriteLine($"Thread {id} is processing.");
				Thread.Sleep(1000);  // 模擬耗時(shí)操作
				return $"Result from Thread {id}";
			}
		}
	}

使用 Task 和 async/await 實(shí)現(xiàn)

	using System;
	using System.Collections.Generic;
	using System.Threading.Tasks;
	namespace MultiThreadExample
	{
		class Program
		{
			static async Task Main(string[] args)
			{
				int N = 3;  // 設(shè)置線程數(shù)量
				List<Task<string>> tasks = new List<Task<string>>();
				// 創(chuàng)建并啟動(dòng)多個(gè)線程
				for (int i = 0; i < N; i++)
				{
					int id = i;
					Task<string> task = Task.Run(() => task_function(id));
					tasks.Add(task);
				}
				// 等待所有線程完成
				string[] results = await Task.WhenAll(tasks);
				// 輸出所有結(jié)果
				foreach (string result in results)
				{
					Console.WriteLine($"Result from task: {result}");
				}
				Console.WriteLine("All tasks completed.");
			}
			static string task_function(int id)
			{
				string result = perform_task(id);  // 執(zhí)行任務(wù)
				return result;
			}
			static string perform_task(int id)
			{
				// 模擬任務(wù)執(zhí)行
				Console.WriteLine($"Task {id} is processing.");
				Task.Delay(1000).Wait();  // 模擬耗時(shí)操作
				return $"Result from Task {id}";
			}
		}
	}

4.2 多進(jìn)程(Multiprocessing)

多進(jìn)程通過創(chuàng)建多個(gè)獨(dú)立進(jìn)程實(shí)現(xiàn)并行,每個(gè)進(jìn)程運(yùn)行在不同的處理器核心上。進(jìn)程間通過管道或消息隊(duì)列等通信機(jī)制協(xié)調(diào)工作。多進(jìn)程適用于需要高隔離性和安全性的任務(wù),如科學(xué)計(jì)算和服務(wù)器應(yīng)用。

代碼示例:

	using System;
	using System.Collections.Generic;
	using System.Diagnostics;
	using System.IO.Pipes;
	using System.Text;
	using System.Threading.Tasks;
	namespace MultiProcessExample
	{
		class Program
		{
			static async Task Main(string[] args)
			{
				int N = 3;  // 設(shè)置進(jìn)程數(shù)量
				List<Process> processes = new List<Process>();
				List<Task<string>> readTasks = new List<Task<string>>();
				// 創(chuàng)建命名管道服務(wù)器
				using (NamedPipeServerStream pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.In, N, PipeTransmissionMode.Message, PipeOptions.Asynchronous))
				{
					// 創(chuàng)建并啟動(dòng)多個(gè)進(jìn)程
					for (int i = 0; i < N; i++)
					{
						Process process = create_process(i);
						processes.Add(process);
						process.Start();
						// 創(chuàng)建一個(gè)任務(wù)來讀取子進(jìn)程的結(jié)果
						readTasks.Add(Task.Run(() => read_from_pipe(pipeServer)));
					}
					// 等待所有進(jìn)程完成
					foreach (var process in processes)
					{
						process.WaitForExit();
					}
					// 等待所有讀取任務(wù)完成
					string[] results = await Task.WhenAll(readTasks);
					// 輸出所有結(jié)果
					foreach (var result in results)
					{
						Console.WriteLine($"Received result: {result}");
					}
					// 關(guān)閉命名管道服務(wù)器
					pipeServer.Close();
				}
				Console.WriteLine("All processes completed.");
			}
			static Process create_process(int id)
			{
				Process process = new Process();
				process.StartInfo.FileName = "dotnet";
				process.StartInfo.Arguments = $"MultiProcessExample.dll worker {id}";
				process.StartInfo.UseShellExecute = false;
				process.StartInfo.RedirectStandardOutput = true;
				process.StartInfo.CreateNoWindow = true;
				return process;
			}
			static async Task<string> read_from_pipe(NamedPipeServerStream pipeServer)
			{
				// 等待客戶端連接
				await pipeServer.WaitForConnectionAsync();
				// 創(chuàng)建字節(jié)數(shù)組來接收數(shù)據(jù)
				byte[] buffer = new byte[1024];
				int bytesRead = await pipeServer.ReadAsync(buffer, 0, buffer.Length);
				string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
				// 斷開連接
				pipeServer.Disconnect();
				return result;
			}
		}
	}

**使用Worker類:
**

	using System;
	using System.IO.Pipes;
	using System.Threading.Tasks;
	namespace MultiProcessExample
	{
		class Worker
		{
			static async Task Main(string[] args)
			{
				if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
				{
					Console.WriteLine("Invalid arguments.");
					return;
				}
				// 創(chuàng)建命名管道客戶端
				using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
				{
					try
					{
						// 連接到命名管道服務(wù)器
						await pipeClient.ConnectAsync();
						// 執(zhí)行計(jì)算任務(wù)
						string result = compute(id);
						// 發(fā)送結(jié)果
						send_result(pipeClient, result);
					}
					catch (Exception ex)
					{
						Console.WriteLine($"Error: {ex.Message}");
					}
				}
			}
			static string compute(int id)
			{
				// 模擬計(jì)算任務(wù)
				Console.WriteLine($"Worker {id} is computing...");
				Task.Delay(1000).Wait();  // 模擬耗時(shí)操作
				return $"Result from Worker {id}";
			}
			static void send_result(NamedPipeClientStream pipeClient, string result)
			{
				try
				{
					// 將結(jié)果發(fā)送到命名管道
					byte[] resultBytes = Encoding.UTF8.GetBytes(result);
					pipeClient.Write(resultBytes, 0, resultBytes.Length);
					pipeClient.Flush();
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Error sending result: {ex.Message}");
				}
			}
		}
	}

4.3 分布式計(jì)算(Distributed Computing)

分布式計(jì)算將任務(wù)分配到網(wǎng)絡(luò)中的多臺計(jì)算機(jī)上并行執(zhí)行,通常使用消息傳遞接口(MPI)進(jìn)行通信。適用于大規(guī)模數(shù)據(jù)處理和復(fù)雜計(jì)算任務(wù),如天氣預(yù)報(bào)和分布式數(shù)據(jù)庫。

為了簡化實(shí)現(xiàn),我們可以使用一個(gè)簡單的消息傳遞庫,例如 NamedPipes 和 Task 來模擬MPI的行為。這里我們使用 NamedPipes 來進(jìn)行進(jìn)程間通信,并模擬主節(jié)點(diǎn)和工作節(jié)點(diǎn)之間的數(shù)據(jù)交換。

代碼示例:

	using System;
	using System.Collections.Generic;
	using System.Diagnostics;
	using System.IO.Pipes;
	using System.Text;
	using System.Threading.Tasks;
	namespace DistributedComputingExample
	{
		class Program
		{
			static async Task Main(string[] args)
			{
				int num_workers = 3;  // 設(shè)置工作節(jié)點(diǎn)數(shù)量
				List<Process> workers = new List<Process>();
				List<Task<string>> readTasks = new List<Task<string>>();
				// 創(chuàng)建和啟動(dòng)工作節(jié)點(diǎn)
				for (int i = 1; i <= num_workers; i++)
				{
					Process worker = create_worker_process(i);
					workers.Add(worker);
					worker.Start();
				}
				// 模擬主節(jié)點(diǎn)
				if (args.Length == 0 || args[0] != "worker")
				{
					// 主節(jié)點(diǎn)邏輯
					string data = load_data(num_workers);
					Console.WriteLine("Data loaded.");
					// 創(chuàng)建命名管道服務(wù)器來發(fā)送數(shù)據(jù)
					List<NamedPipeServerStream> sendPipes = new List<NamedPipeServerStream>();
					for (int i = 1; i <= num_workers; i++)
					{
						NamedPipeServerStream sendPipe = new NamedPipeServerStream($"sendpipe_{i}", PipeDirection.Out, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
						sendPipes.Add(sendPipe);
					}
					// 發(fā)送數(shù)據(jù)到每個(gè)工作節(jié)點(diǎn)
					for (int i = 1; i <= num_workers; i++)
					{
						string data_chunk = data.Split('|')[i - 1];
						send_data(sendPipes[i - 1], data_chunk);
					}
					// 創(chuàng)建命名管道服務(wù)器來接收結(jié)果
					List<NamedPipeServerStream> receivePipes = new List<NamedPipeServerStream>();
					for (int i = 1; i <= num_workers; i++)
					{
						NamedPipeServerStream receivePipe = new NamedPipeServerStream($"receivepipe_{i}", PipeDirection.In, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
						receivePipes.Add(receivePipe);
					}
					// 讀取每個(gè)工作節(jié)點(diǎn)的結(jié)果
					for (int i = 1; i <= num_workers; i++)
					{
						readTasks.Add(Task.Run(() => read_from_pipe(receivePipes[i - 1])));
					}
					// 等待所有工作節(jié)點(diǎn)完成
					foreach (var worker in workers)
					{
						worker.WaitForExit();
					}
					// 等待所有讀取任務(wù)完成
					string[] results = await Task.WhenAll(readTasks);
					// 聚合結(jié)果
					string final_result = aggregate(results);
					Console.WriteLine($"Final result: {final_result}");
				}
				else
				{
					// 工作節(jié)點(diǎn)邏輯
					int id = int.Parse(args[1]);
					Console.WriteLine($"Worker {id} started.");
					// 創(chuàng)建命名管道客戶端來接收數(shù)據(jù)
					using (NamedPipeClientStream receivePipe = new NamedPipeClientStream(".", $"sendpipe_{id}", PipeDirection.In, PipeOptions.Asynchronous))
					{
						await receivePipe.ConnectAsync();
						string data_chunk = receive_data(receivePipe);
						Console.WriteLine($"Worker {id} received data: {data_chunk}");
						// 處理數(shù)據(jù)
						string result = process(data_chunk);
						Console.WriteLine($"Worker {id} processed data: {result}");
						// 創(chuàng)建命名管道客戶端來發(fā)送結(jié)果
						using (NamedPipeClientStream sendPipe = new NamedPipeClientStream(".", $"receivepipe_{id}", PipeDirection.Out, PipeOptions.Asynchronous))
						{
							await sendPipe.ConnectAsync();
							send_result(sendPipe, result);
						}
					}
				}
			}
			static Process create_worker_process(int id)
			{
				Process process = new Process();
				process.StartInfo.FileName = "dotnet";
				process.StartInfo.Arguments = $"DistributedComputingExample.dll worker {id}";
				process.StartInfo.UseShellExecute = false;
				process.StartInfo.RedirectStandardOutput = true;
				process.StartInfo.CreateNoWindow = true;
				return process;
			}
			static string load_data(int num_chunks)
			{
				// 模擬加載數(shù)據(jù)
				string data = "DataChunk1|DataChunk2|DataChunk3";
				return data;
			}
			static void send_data(NamedPipeServerStream pipe, string data)
			{
				try
				{
					byte[] dataBytes = Encoding.UTF8.GetBytes(data);
					pipe.Write(dataBytes, 0, dataBytes.Length);
					pipe.Flush();
					pipe.Disconnect();
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Error sending data: {ex.Message}");
				}
			}
			static string receive_data(NamedPipeClientStream pipe)
			{
				try
				{
					byte[] buffer = new byte[1024];
					int bytesRead = pipe.Read(buffer, 0, buffer.Length);
					return Encoding.UTF8.GetString(buffer, 0, bytesRead);
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Error receiving data: {ex.Message}");
					return null;
				}
			}
			static string process(string data_chunk)
			{
				// 模擬任務(wù)處理
				Console.WriteLine($"Processing data chunk: {data_chunk}");
				Task.Delay(1000).Wait();  // 模擬耗時(shí)操作
				return $"Processed {data_chunk}";
			}
			static void send_result(NamedPipeClientStream pipe, string result)
			{
				try
				{
					byte[] resultBytes = Encoding.UTF8.GetBytes(result);
					pipe.Write(resultBytes, 0, resultBytes.Length);
					pipe.Flush();
				}
				catch (Exception ex)
				{
					Console.WriteLine($"Error sending result: {ex.Message}");
				}
			}
			static async Task<string> read_from_pipe(NamedPipeServerStream pipe)
			{
				await pipe.WaitForConnectionAsync();
				byte[] buffer = new byte[1024];
				int bytesRead = await pipe.ReadAsync(buffer, 0, buffer.Length);
				string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
				pipe.Disconnect();
				return result;
			}
			static string aggregate(string[] results)
			{
				// 聚合結(jié)果
				StringBuilder finalResult = new StringBuilder();
				foreach (string result in results)
				{
					finalResult.AppendLine(result);
				}
				return finalResult.ToString();
			}
		}
	}

===================================================================
使用第三方MPI庫
安裝MPI庫:
安裝 OpenMPI 或 Microsoft MPI。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MPI;
namespace DistributedComputingExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			await MPI.StartMain(DistributedMain, args);
		}
		static async Task DistributedMain(string[] args)
		{
			int rank = MPI.Communicator.world.Rank;
			int size = MPI.Communicator.world.Size;
			if (rank == 0)
			{
				// 主節(jié)點(diǎn)邏輯
				string data = load_data(size);
				Console.WriteLine("Data loaded.");
				// 分配數(shù)據(jù)塊給每個(gè)工作節(jié)點(diǎn)
				for (int i = 1; i < size; i++)
				{
					string data_chunk = data.Split('|')[i - 1];
					send_data(data_chunk, i);
				}
				// 接收每個(gè)工作節(jié)點(diǎn)的結(jié)果
				List<string> results = new List<string>();
				for (int i = 1; i < size; i++)
				{
					string result = receive_result(i);
					results.Add(result);
				}
				// 聚合結(jié)果
				string final_result = aggregate(results);
				Console.WriteLine($"Final result: {final_result}");
			}
			else
			{
				// 工作節(jié)點(diǎn)邏輯
				string data_chunk = receive_data(0);
				Console.WriteLine($"Worker {rank} received data: {data_chunk}");
				// 處理數(shù)據(jù)
				string result = process(data_chunk);
				Console.WriteLine($"Worker {rank} processed data: {result}");
				// 發(fā)送結(jié)果到主節(jié)點(diǎn)
				send_result(result, 0);
			}
		}
		static string load_data(int num_chunks)
		{
			// 模擬加載數(shù)據(jù)
			string data = "DataChunk1|DataChunk2|DataChunk3";
			return data;
		}
		static void send_data(string data, int destination)
		{
			byte[] dataBytes = Encoding.UTF8.GetBytes(data);
			MPI.Communicator.world.Send(dataBytes, dataBytes.Length, destination, 0);
		}
		static string receive_data(int source)
		{
			int msgSize = MPI.Communicator.world.Receive<int>(source, 0);
			byte[] buffer = new byte[msgSize];
			MPI.Communicator.world.Receive(buffer, msgSize, source, 0);
			return Encoding.UTF8.GetString(buffer);
		}
		static string process(string data_chunk)
		{
			// 模擬任務(wù)處理
			Console.WriteLine($"Processing data chunk: {data_chunk}");
			Task.Delay(1000).Wait();  // 模擬耗時(shí)操作
			return $"Processed {data_chunk}";
		}
		static void send_result(string result, int destination)
		{
			byte[] resultBytes = Encoding.UTF8.GetBytes(result);
			MPI.Communicator.world.Send(resultBytes.Length, destination, 0);
			MPI.Communicator.world.Send(resultBytes, resultBytes.Length, destination, 0);
		}
		static string receive_result(int source)
		{
			int msgSize = MPI.Communicator.world.Receive<int>(source, 0);
			byte[] buffer = new byte[msgSize];
			MPI.Communicator.world.Receive(buffer, msgSize, source, 0);
			return Encoding.UTF8.GetString(buffer);
		}
		static string aggregate(List<string> results)
		{
			// 聚合結(jié)果
			StringBuilder finalResult = new StringBuilder();
			foreach (string result in results)
			{
				finalResult.AppendLine(result);
			}
			return finalResult.ToString();
		}
	}
}

4.4 GPU并行計(jì)算

GPU并行計(jì)算利用圖形處理單元(GPU)的多核心架構(gòu),通過CUDA或OpenCL等技術(shù)實(shí)現(xiàn)高度并行。適用于數(shù)據(jù)密集型任務(wù),如圖像處理和機(jī)器學(xué)習(xí)。

代碼示例:

使用 CUDAfy.NET 實(shí)現(xiàn)GPU并行計(jì)算的示例。假設(shè)我們有一個(gè)簡單的計(jì)算任務(wù),每個(gè)線程處理一個(gè)輸入元素并生成相應(yīng)的輸出元素。
安裝 CUDAfy.NET

using System;
using Cudafy;
using Cudafy.Host;
using Cudafy.Translator;
namespace GpuParallelComputingExample
{
	class Program
	{
		static void Main(string[] args)
		{
			// 加載輸入數(shù)據(jù)
			int[] input = { 1, 2, 3, 4, 5 };
			int[] output = new int[input.Length];
			// 獲取GPU設(shè)備
			GPGPU gpu = CudafyHost.GetDevice(eGPUType.Cuda);
			// 加載CUDA代碼
			gpu.LoadModule(typeof(Program));
			// 分配內(nèi)存并復(fù)制數(shù)據(jù)到GPU
			GPGPUDeviceVariable<int> d_input = gpu.Allocate(input);
			GPGPUDeviceVariable<int> d_output = gpu.Allocate(output);
			// 復(fù)制輸入數(shù)據(jù)到GPU
			gpu.CopyToDevice(input, d_input);
			gpu.CopyToDevice(output, d_output);
			// 啟動(dòng)CUDA內(nèi)核
			gpu.LaunchNewKernel(d_input.Size, 1, gpu_kernel, d_input, d_output);
			// 從GPU復(fù)制結(jié)果到主機(jī)
			gpu.CopyFromDevice(d_output, output);
			// 同步GPU操作
			gpu.Synchronize();
			// 輸出結(jié)果
			Console.WriteLine("Input: " + string.Join(", ", input));
			Console.WriteLine("Output: " + string.Join(", ", output));
			// 釋放GPU內(nèi)存
			d_input.Free();
			d_output.Free();
			gpu.FreeAll();
		}
		[Cudafy]
		public static void gpu_kernel(GPGPUThread thread, int[] input, int[] output)
		{
			int tid = thread.threadIdx.x;  // 獲取線程ID
			if (tid < input.Length)
			{
				output[tid] = compute(input[tid]);
			}
		}
		[Cudafy]
		public static int compute(int value)
		{
			// 模擬計(jì)算任務(wù),例如簡單的平方計(jì)算
			return value * value;
		}
	}
}

如果你更傾向于使用OpenCL而不是CUDA,可以使用 Managed OpenCL 庫。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ManagedOpenCL;
namespace GpuParallelComputingExample
{
	class Program
	{
		static void Main(string[] args)
		{
			// 加載輸入數(shù)據(jù)
			int[] input = { 1, 2, 3, 4, 5 };
			int[] output = new int[input.Length];
			// 獲取GPU設(shè)備
			CLPlatform platform = CLPlatform.GetPlatformIDs()[0];
			CLDevice device = platform.GetDeviceIDs()[0];
			CLContext context = CLContext.CreateContext(new[] { device });
			CLCommandQueue queue = context.CreateCommandQueue(device, CLCommandQueueProperties.None);
			// 創(chuàng)建內(nèi)核
			string kernelCode = @"
				__kernel void gpu_kernel(__global const int* input, __global int* output)
				{
					int tid = get_global_id(0);
					if (tid < get_global_size(0))
					{
						output[tid] = compute(input[tid]);
					}
				}
				int compute(int value)
				{
					return value * value;
				}
			";
			// 編譯內(nèi)核
			CLProgram program = context.CreateProgramWithSource(new[] { kernelCode });
			program.BuildProgram(new[] { device }, null, null, null);
			// 創(chuàng)建內(nèi)存緩沖區(qū)
			CLMemoryBuffer<int> d_input = context.CreateBuffer(CLMemoryFlags.CopyHostPtr, input);
			CLMemoryBuffer<int> d_output = context.CreateBuffer(CLMemoryFlags.WriteOnly, output.Length);
			// 設(shè)置內(nèi)核參數(shù)
			CLKernel kernel = program.CreateKernel("gpu_kernel");
			kernel.SetMemoryArgument(0, d_input);
			kernel.SetMemoryArgument(1, d_output);
			// 啟動(dòng)內(nèi)核
			queue.EnqueueNDRangeKernel(kernel, null, new[] { (long)input.Length }, null);
			// 從GPU復(fù)制結(jié)果到主機(jī)
			queue.EnqueueReadBuffer(d_output, true, 0, output.Length, output);
			// 同步GPU操作
			queue.Finish();
			// 輸出結(jié)果
			Console.WriteLine("Input: " + string.Join(", ", input));
			Console.WriteLine("Output: " + string.Join(", ", output));
			// 釋放資源
			d_input.Dispose();
			d_output.Dispose();
			queue.Dispose();
			program.Dispose();
			context.Dispose();
		}
	}
}

4.5 任務(wù)并行(Task Parallelism)

任務(wù)并行將一個(gè)大任務(wù)分解為多個(gè)獨(dú)立子任務(wù),并行執(zhí)行這些子任務(wù)。適用于任務(wù)間依賴較少的場景,如編譯器并行處理多個(gè)文件。

代碼示例:

	using System;
	using System.Collections.Generic;
	using System.Threading.Tasks;
	namespace TaskParallelExample
	{
		class Program
		{
			static async Task Main(string[] args)
			{
				// 創(chuàng)建任務(wù)列表
				List<Task> tasks = new List<Task>
				{
					task1(),
					task2(),
					task3()
				};
				// 等待所有任務(wù)完成
				await Task.WhenAll(tasks);
				Console.WriteLine("All tasks completed.");
			}
			static async Task task1()
			{
				Console.WriteLine($"Task1 started on thread: {Thread.CurrentThread.ManagedThreadId}");
				// 模擬耗時(shí)操作
				await Task.Delay(1000);
				Console.WriteLine($"Task1 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
			}
			static async Task task2()
			{
				Console.WriteLine($"Task2 started on thread: {Thread.CurrentThread.ManagedThreadId}");
				// 模擬耗時(shí)操作
				await Task.Delay(1000);
				Console.WriteLine($"Task2 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
			}
			static async Task task3()
			{
				Console.WriteLine($"Task3 started on thread: {Thread.CurrentThread.ManagedThreadId}");
				// 模擬耗時(shí)操作
				await Task.Delay(1000);
				Console.WriteLine($"Task3 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
			}
		}
	}

4.6 數(shù)據(jù)并行(Data Parallelism)

數(shù)據(jù)并行將數(shù)據(jù)分割成多個(gè)部分,每個(gè)部分由不同的處理器或線程并行處理。適用于矩陣運(yùn)算和圖像處理等數(shù)據(jù)密集型任務(wù)。

代碼示例:

	using System;
	using System.Collections.Generic;
	using System.Linq;
	using System.Threading.Tasks;
	namespace DataParallelExample
	{
		class Program
		{
			static void Main(string[] args)
			{
				int N = 10;  // 設(shè)置數(shù)據(jù)數(shù)量
				List<int> input = Enumerable.Range(0, N).ToList();
				int[] output = new int[N];
				// 使用 Parallel.ForEach 實(shí)現(xiàn)數(shù)據(jù)并行
				Parallel.ForEach(input, (i, loopState) =>
				{
					output[i] = compute(i);
					Console.WriteLine($"Processed element {i} on thread: {Task.CurrentId}");
				});
				// 輸出結(jié)果
				Console.WriteLine("Input: " + string.Join(", ", input));
				Console.WriteLine("Output: " + string.Join(", ", output));
			}
			static int compute(int value)
			{
				// 模擬計(jì)算任務(wù),例如簡單的平方計(jì)算
				Console.WriteLine($"Computing value: {value} on thread: {Task.CurrentId}");
				Task.Delay(100).Wait();  // 模擬耗時(shí)操作
				return value * value;
			}
		}
	}

4.7 流水線并行(Pipeline Parallelism)

流水線并行將任務(wù)分解為一系列階段,每個(gè)階段由不同處理器或線程處理,形成處理流水線。適用于數(shù)據(jù)流處理和視頻編碼等場景。

代碼示例:

using System;
using System.Threading.Tasks;
namespace PipelineParallelExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			string input = "Initial Data";
			// 啟動(dòng)流水線并行
			string finalOutput = await StartPipeline(input);
			// 輸出最終結(jié)果
			Console.WriteLine($"Final Output: {finalOutput}");
		}
		static async Task<string> StartPipeline(string input)
		{
			// 第一階段
			string stage1Output = await stage1(input);
			// 第二階段
			string stage2Output = await stage2(stage1Output);
			// 第三階段
			string finalOutput = await stage3(stage2Output);
			return finalOutput;
		}
		static async Task<string> stage1(string input)
		{
			Console.WriteLine($"Stage 1 started with input: {input}");
			// 模擬耗時(shí)操作
			await Task.Delay(1000);
			string intermediate1 = $"Stage1: Processed {input}";
			Console.WriteLine($"Stage 1 completed with output: {intermediate1}");
			return intermediate1;
		}
		static async Task<string> stage2(string intermediate1)
		{
			Console.WriteLine($"Stage 2 started with input: {intermediate1}");
			// 模擬耗時(shí)操作
			await Task.Delay(1000);
			string intermediate2 = $"Stage2: Processed {intermediate1}";
			Console.WriteLine($"Stage 2 completed with output: {intermediate2}");
			return intermediate2;
		}
		static async Task<string> stage3(string intermediate2)
		{
			Console.WriteLine($"Stage 3 started with input: {intermediate2}");
			// 模擬耗時(shí)操作
			await Task.Delay(1000);
			string output = $"Stage3: Processed {intermediate2}";
			Console.WriteLine($"Stage 3 completed with output: {output}");
			return output;
		}
	}
}

4.8 Actor模型

Actor模型是一種并發(fā)計(jì)算模型,通過將系統(tǒng)分解為獨(dú)立執(zhí)行的Actor來實(shí)現(xiàn)并發(fā)和并行。每個(gè)Actor可以通過消息傳遞與其他演員通信,避免共享內(nèi)存和鎖的使用。常見的Actor模型有Orleans、Akka、Erlang等。

代碼示例:

using System;
using System.Threading.Tasks;
using Akka.Actor;
namespace ActorModelExample
{
	class Program
	{
		static async Task Main(string[] args)
		{
			// 創(chuàng)建Actor系統(tǒng)
			var system = ActorSystem.Create("ActorSystem");
			// 創(chuàng)建Actor1和Actor2
			var actor1 = system.ActorOf(Props.Create(() => new Actor1(system)), "Actor1");
			var actor2 = system.ActorOf(Props.Create(() => new Actor2(system)), "Actor2");
			// Actor1 發(fā)送 Ping 消息給 Actor2
			actor1.Tell(new PingMessage(actor2));
			// 等待一段時(shí)間以確保消息處理完成
			await Task.Delay(2000);
			// 停止兩個(gè)Actor
			actor1.Tell(new StopMessage());
			actor2.Tell(new StopMessage());
			// 等待一段時(shí)間以確保Actor停止完成
			await Task.Delay(1000);
			// 關(guān)閉Actor系統(tǒng)
			await system.Terminate();
		}
	}
	public class PingMessage
	{
		public IActorRef TargetActor { get; }
		public PingMessage(IActorRef targetActor)
		{
			TargetActor = targetActor;
		}
	}
	public class PongMessage
	{
		public IActorRef TargetActor { get; }
		public PongMessage(IActorRef targetActor)
		{
			TargetActor = targetActor;
		}
	}
	public class StopMessage { }
	public class Actor1 : ReceiveActor
	{
		private readonly ActorSystem _system;
		public Actor1(ActorSystem system)
		{
			_system = system;
			Receive<PingMessage>(ping =>
			{
				Console.WriteLine($"Actor1 received Ping from Actor {Sender.Path}");
				ping.TargetActor.Tell(new PongMessage(Self));
			});
			Receive<PongMessage>(pong =>
			{
				Console.WriteLine($"Actor1 received Pong from Actor {pong.TargetActor.Path}");
			});
			Receive<StopMessage>(_ =>
			{
				Console.WriteLine("Actor1 stopping.");
				Context.Stop(Self);
			});
		}
	}
	public class Actor2 : ReceiveActor
	{
		private readonly ActorSystem _system;
		public Actor2(ActorSystem system)
		{
			_system = system;
			Receive<PingMessage>(ping =>
			{
				Console.WriteLine($"Actor2 received Ping from Actor {Sender.Path}");
				ping.TargetActor.Tell(new PongMessage(Self));
			});
			Receive<StopMessage>(_ =>
			{
				Console.WriteLine("Actor2 stopping.");
				Context.Stop(Self);
			});
		}
	}
}

5 實(shí)踐運(yùn)用

5.1 軟件開發(fā)中的并行應(yīng)用

并行廣泛應(yīng)用于需要高計(jì)算能力的場景,包括:

  • 科學(xué)模擬:天氣預(yù)報(bào)、分子動(dòng)力學(xué)等任務(wù)涉及大量方程求解,可通過并行化顯著加速。
  • 機(jī)器學(xué)習(xí):深度神經(jīng)網(wǎng)絡(luò)訓(xùn)練依賴矩陣運(yùn)算,TensorFlow和PyTorch等框架利用GPU并行性加速訓(xùn)練過程。
  • 圖像與視頻處理:如3D渲染或視頻濾鏡應(yīng)用,可將任務(wù)分配到多核或GPU上并行執(zhí)行。

常見的并行編程模型包括:

  • T- PL:TPL是.NET中用于并行編程的一個(gè)強(qiáng)大庫
  • OpenMP:基于指令的共享內(nèi)存并行API,適用于C/C++和Fortran。
  • MPI(消息傳遞接口):分布式內(nèi)存并行的標(biāo)準(zhǔn),用于高性能計(jì)算集群。
  • CUDA:NVIDIA的并行計(jì)算平臺,支持GPU上的細(xì)粒度并行。

5.2 軟件開發(fā)中的并發(fā)應(yīng)用

并發(fā)在需要處理多任務(wù)或事件的系統(tǒng)中至關(guān)重要,例如:

  • Web服務(wù)器:如Apache和Nginx,通過多線程、多進(jìn)程或事件驅(qū)動(dòng)架構(gòu)并發(fā)處理大量客戶端請求。
  • 圖形用戶界面(GUI):并發(fā)確保界面在執(zhí)行后臺任務(wù)(如數(shù)據(jù)加載)時(shí)仍能響應(yīng)用戶輸入。
  • 數(shù)據(jù)庫系統(tǒng):通過鎖和事務(wù)等并發(fā)控制機(jī)制,管理多用戶對數(shù)據(jù)的并發(fā)訪問。

常見的并發(fā)模型包括:

  • 多線程:C#、Java和C++提供線程庫(如System.Thread、java.lang.Thread、std::thread)實(shí)現(xiàn)并發(fā)。
  • 異步編程:Node.js和Python的asyncio支持非阻塞代碼,適用于I/O密集型任務(wù)。
  • Actor模型:Erlang和Akka框架通過獨(dú)立的Actor單元和消息傳遞實(shí)現(xiàn)并發(fā),避免共享內(nèi)存問題。

6. 并發(fā)與并行編程的挑戰(zhàn)

6.1 并發(fā)挑戰(zhàn)

并發(fā)引入了多個(gè)復(fù)雜問題:

  • 競爭條件(Race Conditions):多個(gè)線程同時(shí)訪問共享資源,可能導(dǎo)致不可預(yù)測的結(jié)果。例如,未同步的計(jì)數(shù)器遞增可能丟失更新。
  • 死鎖(Deadlocks):線程間相互等待對方釋放資源,導(dǎo)致永久阻塞。例如,兩個(gè)線程各自持有對方需要的鎖。
  • 活鎖(Livelocks):線程不斷嘗試解決問題但無進(jìn)展,如反復(fù)讓出資源。
  • 饑餓(Starvation):某些線程因調(diào)度不公而無法獲得資源。

解決這些問題通常依賴同步原語(如互斥鎖、信號量),但過度同步可能降低性能。

6.2 并行挑戰(zhàn)

并行計(jì)算也有其難點(diǎn):

  • 負(fù)載均衡:確保所有處理器或核心均勻分擔(dān)工作量,避免部分核心空閑。
  • 通信開銷:分布式系統(tǒng)中,節(jié)點(diǎn)間通信成本可能抵消并行收益。
  • 可擴(kuò)展性:隨著處理器數(shù)量增加,同步開銷或串行部分可能導(dǎo)致收益遞減。

并行算法需精心設(shè)計(jì),采用動(dòng)態(tài)負(fù)載均衡或工作竊取等技術(shù)應(yīng)對這些挑戰(zhàn)。

7. 管理并行與并發(fā)的工具與技術(shù)

7.1 調(diào)試與測試

并發(fā)與并行程序的非確定性使其調(diào)試異常困難,常用工具包括:

  • 靜態(tài)分析:如Intel Inspector或FindBugs,可在不運(yùn)行代碼的情況下檢測潛在問題。
  • 運(yùn)行時(shí)驗(yàn)證:Valgrind的Helgrind等工具在程序運(yùn)行時(shí)監(jiān)控同步錯(cuò)誤。
  • 測試框架:JUnit或pytest可擴(kuò)展用于并發(fā)測試,模擬多線程場景。

7.2 設(shè)計(jì)模式

設(shè)計(jì)模式為常見問題提供解決方案:

  • 線程池:管理固定數(shù)量的線程執(zhí)行任務(wù),減少創(chuàng)建和銷毀開銷。
  • 生產(chǎn)者-消費(fèi)者:生產(chǎn)者生成數(shù)據(jù),消費(fèi)者處理數(shù)據(jù),通過同步隊(duì)列協(xié)調(diào)。
  • Map-Reduce:將任務(wù)映射到數(shù)據(jù)分片并歸約結(jié)果,適用于大數(shù)據(jù)處理。

7.3 編程語言支持

現(xiàn)代語言內(nèi)置了對并行與并發(fā)的支持:

  • CSharp:通過TPL和System.Collections.Concurrent等庫簡化并發(fā)和并行編程。
  • Go:通過goroutines和通道簡化并發(fā)編程。
  • Rust:通過所有權(quán)模型在編譯時(shí)防止數(shù)據(jù)競爭。
  • Java:提供java.util.concurrent包,包括線程池、并發(fā)集合等高級工具。

8.并行與并發(fā)的權(quán)衡

8.1 復(fù)雜度與性能

并行與并發(fā)提升性能的同時(shí)增加了代碼復(fù)雜度:

  • 多線程:提供細(xì)粒度控制,但易引入競爭條件。
  • 異步編程:避免線程開銷,但可能導(dǎo)致回調(diào)地獄或復(fù)雜邏輯。

8.2 共享內(nèi)存與消息傳遞

并發(fā)模型分為兩種:

  • 共享內(nèi)存:線程共享數(shù)據(jù),需同步以避免沖突,效率高但易出錯(cuò)。
  • 消息傳遞:通過消息通信避免共享狀態(tài),安全性高但可能引入延遲。

如何選擇取決于性能、安全性和應(yīng)用需求。

轉(zhuǎn)自https://www.cnblogs.com/chenshibao/p/18865227


該文章在 2025/9/9 10:42:22 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點(diǎn)晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場、車隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉儲(chǔ)管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲(chǔ)管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved