什么是异步

同步和异步主要用于修饰方法。当一个方法被调用时,调用者需要等待该方法执行完毕并返回才能继续执行,我们称这个方法是同步方法;当一个方法被调用时立即返回,并获取一个线程执行该方法内部的业务,调用者不用等待该方法执行完毕,我们称这个方法为异步方法。

异步的好处在于非阻塞(调用线程不会暂停执行去等待子线程完成),因此我们把一些不需要立即使用结果、较耗时的任务设为异步执行,可以提高程序的运行效率。net4.0在ThreadPool的基础上推出了Task类,微软极力推荐使用Task来执行异步任务,现在C#类库中的异步方法基本都用到了Task,这称为基于任务的异步模式(TAP)

异步编程模式 | Microsoft Docs

net5.0推出了async/await,让异步编程更为方便。本篇主要介绍Task、async/await相关的内容。

ThreadPool介绍

Task是在ThreadPool的基础上推出的,我们简单了解下ThreadPool。ThreadPool中有若干数量的线程,如果有任务需要处理时,会从线程池中获取一个空闲的线程来执行任务,任务执行完毕后线程不会销毁,而是被线程池回收以供后续任务使用。当线程池中所有的线程都在忙碌时,又有新任务要处理时,线程池才会新建一个线程来处理该任务,如果线程数量达到设置的最大值,任务会排队,等待其他任务释放线程后再执行。线程池能减少线程的创建,节省开销,看一个ThreadPool的栗子吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
using System;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
for (int i = 1; i <= 10; i++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback((obj) =>
{
Console.WriteLine($"第{obj}个执行任务");
}),i);
}
}
}
}

执行了10个任务

ThreadPool相对于Thread来说可以减少线程的频繁创建过程,有效减小系统开销;但是ThreadPool不能控制线程的执行顺序,我们也不能获取线程池内线程取消/异常/完成的通知,即我们不能有效监控和控制线程池中的线程。

ThreadPool原理简析

这里简要的分析下CLR线程池,其实线程池中有一个叫做“全局队列”的概念,每一次我们使用QueueUserWorkItem的使用都会产生一个“工作项”,然后“工作项”进入“全局队列”进行排队,最后线程池中的的工作线程以FIFO(First Input First Output)的形式取出,这里值得一提的是在.net 4.0之后“全局队列”采用了无锁算法,相比以前版本锁定“全局队列”带来的性能瓶颈有了很大的改观。那么任务委托(Task)的线程池不光有“全局队列”,而且每一个工作线程都有”局部队列“。我们的第一反应肯定就是“局部队列“有什么好处呢?这里暂且不说,我们先来看一下线程池中的任务分配,如下图:

ThreadPool原理

线程池的工作方式大致如下,线程池的最小线程数是6,线程1~3正在执行任务1~3,当有新的任务时,就会向线程池请求新的线程,线程池会将空闲线程分配出去,当线程不足时,线程池就会创建新的线程来执行任务,直到线程池达到最大线程数(线程池满)。总的来说,只有有任务就会分配一个线程去执行,当FIFO十分频繁时,会造成很大的线程管理开销。而task更进一步优化了

下面我们来看一下task中是怎么做的,当我们new一个task的时候“工作项”就会进去”全局队列”,如果我们的task执行的非常快,那么“全局队列“就会FIFO的非常频繁,那么有什么办法缓解呢?当我们的task在嵌套的场景下,“局部队列”就要产生效果了,比如我们一个task里面有3个task,那么这3个task就会存在于“局部队列”中,如下图的任务一,里面有三个任务要执行,也就是产生了所谓的”局部队列”,当任务三的线程执行完成时,就会从任务一种的队列中以FIFO的形式”窃取”任务执行,从而减少了线程管理的开销。这就相当于,有两个人,一个人干完了分配给自己的所有活,而另一个人却还有很多的活,闲的人应该接手点忙的人的活,一起快速完成。

Task局部队列

从上面种种情况我们看到,这些分流和负载都是普通ThreadPool.QueueUserWorkItem所不能办到的,而且ThreadPool不能控制线程的执行顺序,我们也不能获取线程池内线程取消/异常/完成的通知,即我们不能有效监控和控制线程池中的线程,所以说在.net 4.0之后,我们尽可能的使用TPL,抛弃ThreadPool

Task创建和运行

net4.0在ThreadPool的基础上推出了Task,Task拥有线程池的优点,同时也解决了使用线程池不易控制的弊端。

首先看一下怎么去创建并运行一个Task,Task的创建和执行方式有如下三种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
//1. new方式实例化一个Task,需要通过Start方法启动
Task task1 = new Task(() =>
{
Thread.Sleep(100);
Console.WriteLine($"Hello,task1的线程ID为{Thread.CurrentThread.ManagedThreadId}");
});
task1.Start();

//2. Task.Factory.StartNew(Action action)创建并直接启动一个Task
Task task2 = Task.Factory.StartNew(() =>
{
Thread.Sleep(100);
Console.WriteLine($"Hello,task2的线程ID为{Thread.CurrentThread.ManagedThreadId}");
});

//3. Task.Run(Action action)将任务放在线程池队列,返回并启动一个Task
Task task3 = Task.Run(() =>
{
Thread.Sleep(100);
Console.WriteLine($"Hello,task3的线程ID为{Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine("执行主线程!");
Console.ReadKey();
}
}
}

执行结果

我们看到先打印”执行主线程”,然后再打印各个任务,说明了Task不会阻塞主线程。上边的栗子Task都没有返回值,我们也可以创建有返回值的Task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
//1. new方式实例化一个Task<T>,需要通过Start方法启动
Task<string> task1 = new Task<string>(() =>
{
Thread.Sleep(100);
return $"Hello,task1的线程ID为{Thread.CurrentThread.ManagedThreadId}";
});
task1.Start();

//2.Task<T>.Factory.StartNew(Action action)创建并直接启动一个Task
Task<string> task2 = Task<string>.Factory.StartNew(() =>
{
Thread.Sleep(100);
return $"Hello,task2的线程ID为{Thread.CurrentThread.ManagedThreadId}";
});

//3.Task.Run<T>(Action action)将任务放在线程池队列,返回并启动一个Task

//注意这里的Run<string>完全可以不写<string>
Task<string> task3 = Task.Run<string>(() =>
{
Thread.Sleep(100);
return $"Hello,task3的线程ID为{Thread.CurrentThread.ManagedThreadId}";
});
Console.WriteLine("执行主线程!");
Console.WriteLine(task1.Result);
Console.WriteLine(task2.Result);
Console.WriteLine(task3.Result);
Console.ReadKey();
}
}
}

注意Task.Result获取结果时会阻塞线程,即如果task没有执行完成,会等待task执行完成获取到Result,然后再执行后面的代码

Task的属性IsCompleted, IsCanceled表示它是否完成和是否取消

如果我们想让Task同步执行,可以使用Task.RunSynchronously()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
Task task = new Task(() =>
{
Thread.Sleep(100);
Console.WriteLine("Task执行结束!");
});
task.RunSynchronously();
Console.WriteLine("主线程执行结束!");
Console.ReadKey();
}
}
}

Task同步运行

Task的阻塞方法(Wait、WaitAll、WaitAny)

使用Thread时,用Thread.Join()方法可以阻塞主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
using System;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
Thread th1 = new Thread(() =>
{
Thread.Sleep(500);
Console.WriteLine("Thread1 Complete");
});
th1.Start();
Thread th2 = new Thread(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Thread2 Complete");
});
th2.Start();
//阻塞主线程
th1.Join();
th2.Join();
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

阻塞后的结果

如果不添加Join方法则会变成以下结果

不阻塞的结果

Thread的Join方法可以阻塞调用线程,但是有一些弊端:

  1. 如果我们要实现很多线程的阻塞时,每个线程都要调用一次Join方法;
  2. 如果我们想让所有的线程执行完毕(或者任一线程执行完毕)时,立即解除阻塞,使用Join方法不容易实现。Task提供了 Wait/WaitAny/WaitAll 方法,可以更方便地控制线程阻塞。

Task.Wait()表示等待task执行完毕,功能类似与Thread.Join(),Task.WaitAll(params Task[] tasks)表示只有所有的task都执行完成了再解除阻塞;Task.WaitAny(params Task[] tasks)表示只要有一个task执行完毕就解除阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
Task task1 = new Task(() =>
{
Thread.Sleep(500);
Console.WriteLine("Task1 Complete");
});
task1.Start();
Task task2 = new Task(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Task2 Complete");
});
task2.Start();
//阻塞主线程。task1\task2都执行完毕再执行主线程
//执行task1.Wait(); task2.Wait();可以实现相似功能
Task.WaitAll(task1, task2);
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

阻塞后的结果

Task的延续操作(WhenAny、WhenAll、ContinueWith)

上边的Wait/WaitAny/WaitAll方法返回值为void,这些方法单纯的实现阻塞线程。我们现在想让所有task执行完毕(或者任一task执行完毕)后,开始执行后续操作,怎么实现呢?这时就可以用到WhenAny/WhenAll方法了,这些方法执行完成返回一个task实例。 task.WhenAll(Task[] tasks) 表示所有的task都执行完毕后再去执行后续的操作, task.WhenAny(Task[] tasks) 表示任一task执行完毕后就开始执行后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
Task task1 = new Task(() =>
{
Thread.Sleep(500);
Console.WriteLine("Task1 Complete");
});
task1.Start();
Task task2 = new Task(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Task2 Complete");
});
task2.Start();
//阻塞主线程。task1\task2都执行完毕再执行后续操作

Task.WhenAll(task1, task2).ContinueWith(t =>
{
Thread.Sleep(100);
Console.WriteLine("执行后续操作完毕");
});
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

WhenAll结果

我们看到WhenAll/WhenAny方法不会阻塞主线程,当使用WhenAll方法时所有的task都执行完毕才会执行后续操作;如果把栗子中的WhenAll替换成WhenAny,则只要有一个线程执行完毕就会开始执行后续操作,这里不再演示。

上边的栗子也可以通过 Task.Factory.ContinueWhenAll(Task[] tasks, Action continuationAction)Task.Factory.ContinueWhenAny(Task[] tasks, Action continuationAction) 来实现 ,修改上边栗子代码如下,执行结果不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
Task task1 = new Task(() =>
{
Thread.Sleep(500);
Console.WriteLine("Task1 Complete");
});
task1.Start();
Task task2 = new Task(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Task2 Complete");
});
task2.Start();
//通过factory实现
Task.Factory.ContinueWhenAll(new Task[] { task1, task2 }, t =>
{
Thread.Sleep(100);
Console.WriteLine("执行后续操作完毕");
});
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

Task任务取消(CancellationTokenSource)

Thread取消任务执行

在Task前我们执行任务采用的是Thread,Thread怎么取消任务呢?一般流程是:设置一个变量来控制任务是否停止,如设置一个变量isStop,然后线程轮询查看isStop,如果isStop为true就停止,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
using System;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
bool isStop = false;
int index = 0;
Thread thread = new Thread(() =>
{
while (!isStop)
{
Thread.Sleep(500);
Console.WriteLine($"第{++index}次执行,线程运行中");
}
});
thread.Start();

Thread.Sleep(5000);
isStop = true;
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

Task取消任务执行

Task中有一个专门的类 CancellationTokenSource 来取消任务执行,还是使用上边的例子,我们修改代码如下,程序运行的效果不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
CancellationTokenSource source = new CancellationTokenSource();//一般为readonly
int index = 0;
Task task = new Task(() =>
{
while (!source.IsCancellationRequested)
{
Thread.Sleep(500);
Console.WriteLine($"第{++index}次执行,线程运行中");
}
});
task.Start();

Thread.Sleep(3000);
source.Cancel();//请求取消任务,将IsCancellationRequested变为true
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

CancellationTokenSource的功能不仅仅是取消任务执行,我们可以使用 source.CancelAfter(5000)实现5秒后自动取消任务,也可以通过 source.Token.Register(Action action)注册取消任务触发的回调函数,即任务被取消时注册的action会被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using System;
using System.Threading.Tasks;
using System.Threading;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
CancellationTokenSource source = new CancellationTokenSource();
//注册任务取消事件
source.Token.Register(() =>
{
Console.WriteLine("任务被取消后执行xx操作");
});
int index = 0;
Task task = new Task(() =>
{
while (!source.IsCancellationRequested)
{
Thread.Sleep(500);
Console.WriteLine($"第{++index}次执行,线程运行中");
}
});
task.Start();

source.CancelAfter(3000);//延时取消
Console.WriteLine("主线程处理完毕");
Console.ReadKey();

}
}
}

执行结果

因为第六次任务执行时有Thread.Sleep(500),所以会比任务取消执行的方法慢。

异步方法(async、await)

在C#5.0中出现的async和await ,让异步编程变得更简单。我们看一个获取文件内容的栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
using System;
using System.Text;
using System.Threading.Tasks;
using System.IO;

namespace TaskLearningApp1
{
class Program
{
static void Main(string[] args)
{
string content = GetContentAsync(Environment.CurrentDirectory + @"\text.txt").Result;
//string content2 = GetContentSync(Environment.CurrentDirectory + @"\text.txt");
Console.WriteLine(content);
Console.ReadKey();

}
async static Task<string> GetContentAsync(string filename)
{
FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.ReadWrite);
var bytes = new byte[fs.Length];

//ReadAsync方法异步读取内容,不阻塞线程
Console.WriteLine("开始读取文件");
int len = await fs.ReadAsync(bytes, 0, bytes.Length);
string result = Encoding.UTF8.GetString(bytes);
return result;
}
static string GetContentSync(string filename)
{
FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.ReadWrite);
var bytes = new byte[fs.Length];

Console.WriteLine("开始读取文件");
int len = fs.Read(bytes, 0, bytes.Length);
string result = Encoding.UTF8.GetString(bytes);
return result;
}
}
}

上边的栗子也写出了同步读取的方式,将main函数中的注释去掉即可同步读取文件内容。我们可以看到异步读取代码和同步读取代码基本一致。async/await让异步编码变得更简单,我们可以像写同步代码一样去写异步代码。注意一个小问题:异步方法中方法签名(async/await)返回值为Task,代码中的返回值为T。上边栗子中GetContentAsync的签名返回值为Task,而代码中返回值为string。

异步方法签名的返回值有以下三种:

① Task<TResult>:如果调用方法想通过调用异步方法获取一个T类型的返回值,那么签名必须为Task<TResult>;

② Task:如果调用方法不想通过异步方法获取一个值,仅仅想追踪异步方法的执行状态,那么我们可以设置异步方法签名的返回值为Task;

③ void:如果调用方法仅仅只是调用一下异步方法,不和异步方法做其他交互,我们可以设置异步方法签名的返回值为void,这种形式也叫做“调用并忘记”。

C# 中的异步编程 | Microsoft Docs

我们从官方的文章中可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var breakfastTasks = new List<Task> { eggsTask, baconTask, toastTask };
while (breakfastTasks.Count > 0)
{
//Task.WhenAny可以接受任何IEnumrable<Task>的参数,list<Task>、Task[]都可以
//await Task Methods(),返回的就是Task
Task finishedTask = await Task.WhenAny(breakfastTasks);
if (finishedTask == eggsTask)
{
Console.WriteLine("Eggs are ready");
}
else if (finishedTask == baconTask)
{
Console.WriteLine("Bacon is ready");
}
else if (finishedTask == toastTask)
{
Console.WriteLine("Toast is ready");
}
breakfastTasks.Remove(finishedTask);
}

async修饰符会向编译器发出信号,说明此方法包含await语句,也包含异步操作,比如说明此方法是一个返回Task的异步方法。

await修饰符挂起它修饰的方法,这个方法可以返回Task、Task<TResult>或void,这个异步方法的控制权会一层层地交给最终调用方。

使用 Async和 Await 的任务异步编程 (TAP) 模型 (C#) | Microsoft Docs

微软的官网有详尽的解释,下面是官方示例的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public async Task<int> GetUrlContentLengthAsync()
{
var client = new HttpClient();

Task<string> getStringTask =
client.GetStringAsync("https://docs.microsoft.com/dotnet");

DoIndependentWork();

string contents = await getStringTask;

return contents.Length;
}

void DoIndependentWork()
{
Console.WriteLine("Working...");
}

下面是一篇详尽的异步编程博客

异步编程(async&await) - Jonins - 博客园 (cnblogs.com)