C#多线程与⾼并发处理并且具备暂停、继续、停⽌功能--近期有⼀个需要运⽤多线程的项⽬,会有并发概率,所以写了⼀份代码,可能有写地⽅还不完善,后续有需求在改
1///<summary>
2///并发对象
3///</summary>
4public class MeterAsyncQueue
5 {
6public MeterAsyncQueue()
7 {
8 MeterInfoTask = new MeterInfo();
9 }
10
11public MeterInfo MeterInfoTask { get; set; }
12 }
13public class MeterInfo
14 {
15public MeterInfo()
16 {
17
18 }
19public int Id { get; set; }
20
21 }
1///<summary>
2///线程通⽤类
3///</summary>
4public class TaskCommand
5 {
6 CancellationTokenSource tokenSource = new CancellationTokenSource();
7 ManualResetEvent resetEvent = new ManualResetEvent(true);
8 Thread thread = null;
9///<summary>
10///开始任务
11///</summary>
12public void StartData()
13 {
14 tokenSource = new CancellationTokenSource();
15 resetEvent = new ManualResetEvent(true);
16
17 List<int> Ids = new List<int>();
18for (int i = 0; i < 10000; i++)
19 {
20 Ids.Add(i);
21 }
22 thread = new Thread(new ThreadStart(() => StartTask(Ids)));
23 thread.Start();
24 }
25///<summary>
26///暂停任务
27///</summary>
28public void OutData()
29 {
30//task暂停
31 resetEvent.Reset();
32 }
33///<summary>
34///继续任务
35///</summary>
36public void ContinueData()
37 {
38//task继续
39 resetEvent.Set();
40 }
41///<summary>
writeline方法的作用42///取消任务
43///</summary>
44public void Cancel()
45 {
46//释放对象
47 resetEvent.Dispose();
48foreach (var CurrentTask in ParallelTasks)
49 {
50if (CurrentTask != null)
51 {
52if (CurrentTask.Status == TaskStatus.Running) { }
53 {
54//终⽌task线程
55 tokenSource.Cancel();
56 }
57 }
58 }
59 thread.Abort();
60 }
61///<summary>
62///执⾏数据
63///</summary>
64///<param name="Index"></param>
65public void Execute(int Index)
66 {
67//阻⽌当前线程
68 resetEvent.WaitOne();
69
70 Console.WriteLine("当前第" + Index + "个线程");
71
72 Thread.Sleep(1000);
73
74 }
75//队列对象
76private Queue<MeterAsyncQueue> AsyncQueues { get; set; }
77
78///<summary>
79///并发任务数
80///</summary>
81private int ParallelTaskCount { get; set; }
82
83
84///<summary>
85///并⾏任务集合
86///</summary>
87private List<Task> ParallelTasks { get; set; }
88//控制线程并⾏数量
89public void StartTask(List<int> Ids)
90 {
91 IsInitTask = true;
92 ParallelTasks = new List<Task>();
93 AsyncQueues = new Queue<MeterAsyncQueue>();
94//获取并发数
95 ParallelTaskCount = 5;
96
97//初始化异步队列
98 InitAsyncQueue(Ids);
99//开始执⾏队列任务
100 HandlingTask();
101
102 Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) }); 103 }
104///<summary>
105///初始化异步队列
106///</summary>
107private void InitAsyncQueue(List<int> Ids)
108 {
109foreach (var item in Ids)
110 {
111 MeterInfo info = new MeterInfo();
112 info.Id = item;
113 AsyncQueues.Enqueue(new MeterAsyncQueue()
114 {
115 MeterInfoTask = info
116 });
117 }
118 }
119///<summary>
120///是否⾸次执⾏任务
121///</summary>
122private bool IsInitTask { get; set; }
123//锁
124private readonly object _objLock = new object();
125
126///<summary>
127///开始执⾏队列任务
128///</summary>
129private void HandlingTask()
130 {
131lock (_objLock)
132 {
133if (AsyncQueues.Count <= 0)
134 {
135return;
136 }
137
138var loopCount = GetAvailableTaskCount();
139//并发处理队列
140for (int i = 0; i < loopCount; i++)
141 {
142 HandlingQueue();
143 }
144 IsInitTask = false;
145 }
146 }
147///<summary>
148///获取队列锁
149///</summary>
150private readonly object _queueLock = new object();
151
152///<summary>
153///处理队列
154///</summary>
155private void HandlingQueue()
156 {
157 CancellationToken token = tokenSource.Token;
158lock (_queueLock)
159 {
160if (AsyncQueues.Count > 0)
161 {
162var asyncQueue = AsyncQueues.Dequeue();
163
164if (asyncQueue == null) return;
165var task = Task.Factory.StartNew(() =>
166 {
167if (token.IsCancellationRequested)
168 {
169return;
170 }
171//阻⽌当前线程
172 resetEvent.WaitOne();
173//执⾏任务
174 Execute(asyncQueue.MeterInfoTask.Id);
175
176 }, token).ContinueWith(t =>
177 {
178 HandlingTask();
179 }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); 180 ParallelTasks.Add(task);
181 }
182 }
183 }
184///<summary>
185///获取当前有效并⾏的任务数
186///</summary>
187///<returns></returns>
188 [MethodImpl(MethodImplOptions.Synchronized)]
189private int GetAvailableTaskCount()
190 {
191if (IsInitTask)
192return ParallelTaskCount;
193return1;
194 }
195 }
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论