C#多线程与⾼并发处理并且具备暂停、继续、停⽌功能--近期有⼀个需要运⽤多线程的项⽬,会有并发概率,所以写了⼀份代码,可能有写地⽅还不完善,后续有需求在改
1///<summary>
2///并发对象
3///</summary>
4public class MeterAsyncQueue
5    {
6public MeterAsyncQueue()
7        {
8            MeterInfoTask = new MeterInfo();
9        }
10
11public MeterInfo MeterInfoTask { get; set; }
12    }
13public class MeterInfo
14    {
15public MeterInfo()
16        {
17
18        }
19public int Id { get; set; }
20
21    }
1///<summary>
2///线程通⽤类
3///</summary>
4public class TaskCommand
5    {
6        CancellationTokenSource tokenSource = new CancellationTokenSource();
7        ManualResetEvent resetEvent = new ManualResetEvent(true);
8        Thread thread = null;
9///<summary>
10///开始任务
11///</summary>
12public void StartData()
13        {
14            tokenSource = new CancellationTokenSource();
15            resetEvent = new ManualResetEvent(true);
16
17            List<int> Ids = new List<int>();
18for (int i = 0; i < 10000; i++)
19            {
20                Ids.Add(i);
21            }
22            thread = new Thread(new ThreadStart(() => StartTask(Ids)));
23            thread.Start();
24        }
25///<summary>
26///暂停任务
27///</summary>
28public void OutData()
29        {
30//task暂停
31            resetEvent.Reset();
32        }
33///<summary>
34///继续任务
35///</summary>
36public void ContinueData()
37        {
38//task继续
39            resetEvent.Set();
40        }
41///<summary>
writeline方法的作用42///取消任务
43///</summary>
44public void Cancel()
45        {
46//释放对象
47            resetEvent.Dispose();
48foreach (var CurrentTask in ParallelTasks)
49            {
50if (CurrentTask != null)
51                {
52if (CurrentTask.Status == TaskStatus.Running) { }
53                    {
54//终⽌task线程
55                        tokenSource.Cancel();
56                    }
57                }
58            }
59            thread.Abort();
60        }
61///<summary>
62///执⾏数据
63///</summary>
64///<param name="Index"></param>
65public void Execute(int Index)
66        {
67//阻⽌当前线程
68            resetEvent.WaitOne();
69
70            Console.WriteLine("当前第" + Index + "个线程");
71
72            Thread.Sleep(1000);
73
74        }
75//队列对象
76private Queue<MeterAsyncQueue> AsyncQueues { get; set; }
77
78///<summary>
79///并发任务数
80///</summary>
81private int ParallelTaskCount { get; set; }
82
83
84///<summary>
85///并⾏任务集合
86///</summary>
87private List<Task> ParallelTasks { get; set; }
88//控制线程并⾏数量
89public void StartTask(List<int> Ids)
90        {
91            IsInitTask = true;
92            ParallelTasks = new List<Task>();
93            AsyncQueues = new Queue<MeterAsyncQueue>();
94//获取并发数
95            ParallelTaskCount = 5;
96
97//初始化异步队列
98            InitAsyncQueue(Ids);
99//开始执⾏队列任务
100            HandlingTask();
101
102            Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) }); 103        }
104///<summary>
105///初始化异步队列
106///</summary>
107private void InitAsyncQueue(List<int> Ids)
108        {
109foreach (var item in Ids)
110            {
111                MeterInfo info = new MeterInfo();
112                info.Id = item;
113                AsyncQueues.Enqueue(new MeterAsyncQueue()
114                {
115                    MeterInfoTask = info
116                });
117            }
118        }
119///<summary>
120///是否⾸次执⾏任务
121///</summary>
122private bool IsInitTask { get; set; }
123//锁
124private readonly object _objLock = new object();
125
126///<summary>
127///开始执⾏队列任务
128///</summary>
129private void HandlingTask()
130        {
131lock (_objLock)
132            {
133if (AsyncQueues.Count <= 0)
134                {
135return;
136                }
137
138var loopCount = GetAvailableTaskCount();
139//并发处理队列
140for (int i = 0; i < loopCount; i++)
141                {
142                    HandlingQueue();
143                }
144                IsInitTask = false;
145            }
146        }
147///<summary>
148///获取队列锁
149///</summary>
150private readonly object _queueLock = new object();
151
152///<summary>
153///处理队列
154///</summary>
155private void HandlingQueue()
156        {
157            CancellationToken token = tokenSource.Token;
158lock (_queueLock)
159            {
160if (AsyncQueues.Count > 0)
161                {
162var asyncQueue = AsyncQueues.Dequeue();
163
164if (asyncQueue == null) return;
165var task = Task.Factory.StartNew(() =>
166                    {
167if (token.IsCancellationRequested)
168                        {
169return;
170                        }
171//阻⽌当前线程
172                        resetEvent.WaitOne();
173//执⾏任务
174                        Execute(asyncQueue.MeterInfoTask.Id);
175
176                    }, token).ContinueWith(t =>
177                    {
178                        HandlingTask();
179                    }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); 180                    ParallelTasks.Add(task);
181                }
182            }
183        }
184///<summary>
185///获取当前有效并⾏的任务数
186///</summary>
187///<returns></returns>
188        [MethodImpl(MethodImplOptions.Synchronized)]
189private int GetAvailableTaskCount()
190        {
191if (IsInitTask)
192return ParallelTaskCount;
193return1;
194        }
195    }

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。