如何在long-running task中调用async方法|世界新消息
long-running task 是指那些长时间运行的任务,比如在一个 while True 中执行耗时较长的同步处理。
下面的例子中,我们不断从队列中尝试取出数据,并对这些数据进行处理,这样的任务就适合交给一个 long-running task 来处理。
(资料图片)
var queue = new BlockingCollection();Task.Factory.StartNew(() =>{ while (true) { // BlockingCollection.Take() 方法会阻塞当前线程,直到队列中有数据可以取出。 var input = queue.Take(); Console.WriteLine($"You entered: {input}"); }}, TaskCreationOptions.LongRunning);while (true){ var input = Console.ReadLine(); queue.Add(input);}
在 .NET 中,我们可以使用 Task.Factory.StartNew 方法并传入 TaskCreationOptions.LongRunning 来创建一个 long-running task。
虽然这种方式创建的 long-running task 和默认创建的 task 一样,都是分配给 ThreadPoolTaskScheduler 来调度的, 但 long-running task 会被分配到一个新的 Background 线程上执行,而不是交给 ThreadPool 中的线程来执行。
class ThreadPoolTaskScheduler : TaskScheduler{ // ... protected internal override void QueueTask(Task task) { TaskCreationOptions options = task.Options; if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0) { // 在一个新的 Background 线程上执行 long-running task。 new Thread(s_longRunningThreadWork) { IsBackground = true, Name = ".NET Long Running Task" }.UnsafeStart(task); } else { // 非 long-running task 交给 ThreadPool 中的线程来执行。 ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0); } } // ...}
为什么long-running task要和普通的task分开调度如果一个task持续占用一个线程,那么这个线程就不能被其他的task使用,这和 ThreadPool 的设计初衷是相违背的。
如果在 ThreadPool 中创建了大量的 long-running task,那么就会导致ThreadPool 中的线程不够用,从而影响到其他的 task 的执行。
在 long-running task await 一个 async 方法后会发生什么有时候,我们需要在 long-running task 中调用一个 async 方法。比如下面的例子中,我们需要在 long-running task 中调用一个 async的方法来处理数据。
var queue = new BlockingCollection();Task.Factory.StartNew(async () =>{ while (true) { var input = queue.Take(); Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); await ProcessAsync(input); Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); }}, TaskCreationOptions.LongRunning);async Task ProcessAsync(string input){ // 模拟一个异步操作。 await Task.Delay(100); Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");}while (true){ var input = Console.ReadLine(); queue.Add(input);}TaskScheduler InternalCurrentTaskScheduler(){ var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic); return (TaskScheduler)propertyInfo.GetValue(null);}
连续输入 1、2、3、4,输出如下:
1Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: FalseYou entered: 1, thread id: 4, task scheduler: , thread pool: TrueAfter process: thread id: 4, task scheduler: , thread pool: True2Before process: thread id: 4, task scheduler: , thread pool: TrueYou entered: 2, thread id: 4, task scheduler: , thread pool: TrueAfter process: thread id: 4, task scheduler: , thread pool: True3Before process: thread id: 4, task scheduler: , thread pool: TrueYou entered: 3, thread id: 4, task scheduler: , thread pool: TrueAfter process: thread id: 4, task scheduler: , thread pool: True4Before process: thread id: 4, task scheduler: , thread pool: TrueYou entered: 4, thread id: 4, task scheduler: , thread pool: TrueAfter process: thread id: 4, task scheduler: , thread pool: True
从执行结果中可以看出,第一次 await 之前,当前线程是 long-running task 所在的线程(thread id: 9),此后就变成了 ThreadPool中的线程(thread id: 4)。
至于为什么之后一直是 ThreadPool 中的线程(thread id: 4),这边做一下简单的解释。在我以前一篇介绍 await 的文章中介绍了 await 的执行过程,以及 await 之后的代码会在哪个线程上执行。https://www.cnblogs.com/eventhorizon/p/15912383.html
第一次 await 前,当前线程是 long-running task 所在的线程(thread id: 9),绑定了 TaskScheduler(ThreadPoolTaskScheduler),也就是说 await 之后的代码会被调度到 ThreadPool 中执行。第一次 await 之后的代码被调度到 ThreadPool 中的线程(thread id: 4)上执行。ThreadPool 中的线程不会绑定 TaskScheduler,也就意味着之后的代码还是会在 ThreadPool 中的线程上执行,并且是本地队列优先,所以一直是 thread id: 4 这个线程在从本地队列中取出任务在执行。线程池的介绍请参考我另一篇博客https://www.cnblogs.com/eventhorizon/p/15316955.html
回到本文的主题,如果在 long-running task 使用了 await 调用一个 async 方法,就会导致为 long-running task 分配的独立线程提前退出,和我们的预期不符。
long-running task 中 调用 一个 async 方法的可能姿势使用 Task.Wait在 long-running task 中调用一个 async 方法,可以使用 Task.Wait 来阻塞当前线程,直到 async 方法执行完毕。对于 Task.Factory.StartNew 创建出来的 long-running task 来说,因为其绑定了 ThreadPoolTaskScheduler,就算是使用 Task.Wait阻塞了当前线程,也不会导致死锁。并且 Task.Wait 会把异常抛出来,所以我们可以在 catch 中处理异常。
// ...Task.Factory.StartNew( () =>{ while (true) { var input = queue.Take(); Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); ProcessAsync(input).Wait(); Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}"); }}, TaskCreationOptions.LongRunning);// ...
输出如下:
1Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: FalseYou entered: 1, thread id: 5, task scheduler: , thread pool: TrueAfter process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False2Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: FalseYou entered: 2, thread id: 5, task scheduler: , thread pool: TrueAfter process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False3Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: FalseYou entered: 3, thread id: 5, task scheduler: , thread pool: TrueAfter process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False4Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: FalseYou entered: 4, thread id: 5, task scheduler: , thread pool: TrueAfter process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
Task.Wait 并不会对 async 方法内部产生影响,所以 async 方法内部的代码还是按照正常的逻辑执行。这边 ProcessAsync 方法内部打印的thread id 没变纯粹是因为 ThreadPool 目前就只创建了一个线程,你可以疯狂输入看看结果。
关于 Task.Wait 的使用,可以参考我另一篇博客https://www.cnblogs.com/eventhorizon/p/17481757.html
使用自定义的 TaskScheduler 来创建 long-running taskTask.Factory.StartNew(async () =>{ while (true) { var input = queue.Take(); Console.WriteLine( $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); await ProcessAsync(input); Console.WriteLine( $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); }}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());class CustomerTaskScheduler : TaskScheduler{ // 这边的 BlockingCollection 只是举个例子,如果是普通的队列,配合锁也是可以的。 private readonly BlockingCollection _tasks = new BlockingCollection(); public CustomerTaskScheduler() { var thread = new Thread(() => { foreach (var task in _tasks.GetConsumingEnumerable()) { TryExecuteTask(task); } }) { IsBackground = true }; thread.Start(); } protected override IEnumerable GetScheduledTasks() { return _tasks; } protected override void QueueTask(Task task) { _tasks.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }}
输出如下:
1Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseYou entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseAfter process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False2Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseYou entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseAfter process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False3Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseYou entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseAfter process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False4Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseYou entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: FalseAfter process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
因为修改了上下文绑定的 TaskScheduler,会影响到 async 方法内部 await 回调的执行。
这种做法不推荐使用,因为可能会导致死锁。
如果我将 await 改成 Task.Wait,就会导致死锁。
Task.Factory.StartNew(() =>{ while (true) { var input = queue.Take(); Console.WriteLine( $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); ProcessAsync(input).Wait(); Console.WriteLine( $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}"); }}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
输出如下:
1Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False
后面就没有输出了,因为死锁了,除非我们在 ProcessAsync 方法内部每个 await 的 Task 后加上ConfigureAwait(false)。
同理,同学们也可以尝试用 SynchronizationContext 来实现类似的效果,同样有死锁的风险。
总结如果你想要在一个 long-running task 中执行 async 方法,使用 await 关键字会导致 long-running task 的独立线程提前退出。
比较推荐的做法是使用 Task.Wait。如果连续执行多个 async 方法,建议将这些 async 方法封装成一个新方法,然后只 Wait 这个新方法的 Task。
关键词:
相关阅读
-
如何在long-running task中调用async方...
什么是long-runningthreadlong-runningtask是指那些长时间运行的任务 -
环球资讯:五星苏烟价格多少钱一包_5星...
问题:我想问你一个问题,就是5星苏烟多少钱一包?最近,我从朋友那里 -
湖北咸安:夏至,小荷才露尖尖角
夏至,是二十四节气中的一个重要节点,标志着炎炎夏日的开始。随着夏至 -
中国海警舰艇编队在我钓鱼岛领海内巡航
6月21日,中国海警2502舰艇编队在我钓鱼岛领海内巡航。这是中国海警 -
朗来科技获5亿元融资,礼来亚洲基金首次...
朗来科技获5亿元融资,礼来亚洲基金首次投资湖北---朗来科技公司现有员 -
劳动法中试用期合同是怎样规定的?
在试用期期间是否需要签订劳动合同,很多人都不清楚这个问题,那么劳动 -
天津蓟州郭家沟:美丽乡村“蝶变记” ...
天津蓟州郭家沟:美丽乡村“蝶变记” -
“江”派潮生活|零距离感受湘江新区渔...
编者按:走过生机盎然的春天,迎来繁花如锦的夏天。随着四季的不断变化 -
中俄组合输球,所有数据全面落后,杨钊...
北京时间2023年6月21日23时52分,WTA500德国柏林站女双第1轮,中国金花 -
北向资金今日净买入宁德时代5.5亿元|世...
北向资金今日净卖出6 41亿元。中国平安、比亚迪、中国中免分别遭净卖出 -
太康县人民医院上班时间_太康县人民医院
1、打12580。2、从12580里找举报电话 告医院 。本文到此分享完毕,希望 -
环球聚焦:蔚来CEO李斌称新能源购置税减...
每经AI快讯,6月21日,针对新能源购置税减免政策,蔚来创始人、董事长 -
这篇不是来吵架的 快播
刺猬为何竖起了刺。 -
当前速读:塞尔达传说荒野之息依盖队基...
塞尔达传说荒野之息依盖队基地怎么过?不少玩家表示打雷神兽之前的依盖 -
2022年惠州五一周边游景点推荐(五一惠...
2022年的五一活动就要来临啦,在惠州想出个市外游玩的,可以看看这些地 -
董晓禾(关于董晓禾的简介)
大家好,董晓禾,关于董晓禾的简介很多人还不知道,现在让我们一起来看 -
全国移民管理机构半年口岸边境缉毒4.74吨
中国日报北京6月21日电截至6月20日,全国移民管理机构上半年共在口岸、 -
【世界报资讯】退避三舍的主人公叫什么...
1、退避三舍晋文公重耳为履行被楚王收留时的承诺,两军交战时先后退90 -
信息:竞争还是合作?国内快递业的“三...
图片来源@视觉中国文|旗帜财观团“竞争进入白热化阶段的快递行业似... -
“中国流动科技馆”西秀站巡展活动开始...
顺风耳一个有想法的公众号!近日,以“快乐科普进校园助推双减提素...