還不知道C#中的異步迭代器?
今天來寫寫C#中的異步迭代器 - 機(jī)制、概念和一些好用的特性
?
迭代器的概念
迭代器的概念在C#中出現(xiàn)的比較早,很多人可能已經(jīng)比較熟悉了。
通常迭代器會(huì)用在一些特定的場景中。
舉個(gè)例子:有一個(gè)foreach循環(huán):
foreach?(var?item?in?Sources)
{
????Console.WriteLine(item);
}
這個(gè)循環(huán)實(shí)現(xiàn)了一個(gè)簡單的功能:把Sources中的每一項(xiàng)在控制臺中打印出來。
有時(shí)候,Sources可能會(huì)是一組完全緩存的數(shù)據(jù),例如:List:
IEnumerable<string>?Sources(int?x)
{
????var?list?=?new?List<string>();
????for?(int?i?=?0;?i?5;?i++)
????????list.Add($"result?from?Sources,?x={x},?result?{i}");
????return?list;
}
這里會(huì)有一個(gè)小問題:在我們打印Sources的第一個(gè)的數(shù)據(jù)之前,要先運(yùn)行完整運(yùn)行Sources()方法來準(zhǔn)備數(shù)據(jù),在實(shí)際應(yīng)用中,這可能會(huì)花費(fèi)大量時(shí)間和內(nèi)存。更有甚者,Sources可能是一個(gè)無邊界的列表,或者不定長的開放式列表,比方一次只處理一個(gè)數(shù)據(jù)項(xiàng)目的隊(duì)列,或者本身沒有邏輯結(jié)束的隊(duì)列。
這種情況,C#給出了一個(gè)很好的迭代器解決:
IEnumerable<string>?Sources(int?x)
{
????for?(int?i?=?0;?i?5;?i++)
????????yield?return?$"result?from?Sources,?x={x},?result?{i}";
}
這個(gè)方式的工作原理與上一段代碼很像,但有一些根本的區(qū)別 - 我們沒有用緩存,而只是每次讓一個(gè)元素可用。
為了幫助理解,來看看foreach在編譯器中的解釋:
using?(var?iter?=?Sources.GetEnumerator())
{
????while?(iter.MoveNext())
????{
????????var?item?=?iter.Current;
????????Console.WriteLine(item);
????}
}
當(dāng)然,這個(gè)是省略掉很多東西后的概念解釋,我們不糾結(jié)這個(gè)細(xì)節(jié)。但大體的意思是這樣的:編譯器對傳遞給foreach的表達(dá)式調(diào)用GetEnumerator(),然后用一個(gè)循環(huán)去檢查是否有下一個(gè)數(shù)據(jù)(MoveNext()),在得到肯定答案后,前進(jìn)并訪問Current屬性。而這個(gè)屬性代表了前進(jìn)到的元素。
?
上面這個(gè)例子,我們通過MoveNext()/Current方式訪問了一個(gè)沒有大小限制的向前的列表。我們還用到了yield迭代器這個(gè)很復(fù)雜的東西 - 至少我是這么認(rèn)為的。
我們把上面的例子中的yield去掉,改寫一下看看:
IEnumerable<string>?Sources(int?x)?=>?new?GeneratedEnumerable(x);
class?GeneratedEnumerable?:?IEnumerable<string>
{
????private?int?x;
????public?GeneratedEnumerable(int?x)?=>?this.x?=?x;
????public?IEnumerator<string>?GetEnumerator()?=>?new?GeneratedEnumerator(x);
????IEnumerator?IEnumerable.GetEnumerator()?=>?GetEnumerator();
}
class?GeneratedEnumerator?:?IEnumerator<string>
{
????private?int?x,?i;
????public?GeneratedEnumerator(int?x)?=>?this.x?=?x;
????public?string?Current?{?get;?private?set;?}
????object?IEnumerator.Current?=>?Current;
????public?void?Dispose()?{?}
????public?bool?MoveNext()
????{
????????if?(i?5)
????????{
????????????Current?=?$"result?from?Sources,?x={x},?result?{i}";
????????????i++;
????????????return?true;
????????}
????????else
????????{
????????????return?false;
????????}
????}
????void?IEnumerator.Reset()?=>?throw?new?NotSupportedException();
}
這樣寫完,對照上面的yield迭代器,理解工作過程就比較容易了:
首先,我們給出一個(gè)對象
IEnumerable。注意,IEnumerable和IEnumerator是不同的。當(dāng)我們調(diào)用
Sources時(shí),就創(chuàng)建了GeneratedEnumerable。它存儲狀態(tài)參數(shù)x,并公開了需要的IEnumerable方法。后面,在需要
foreach迭代數(shù)據(jù)時(shí),會(huì)調(diào)用GetEnumerator(),而它又調(diào)用GeneratedEnumerator以充當(dāng)數(shù)據(jù)上的游標(biāo)。MoveNext()方法邏輯上實(shí)現(xiàn)了for循環(huán),只不過,每次調(diào)用MoveNext()只執(zhí)行一步。更多的數(shù)據(jù)會(huì)通過Current回傳過來。另外補(bǔ)充一點(diǎn):MoveNext()方法中的return false對應(yīng)于yield break關(guān)鍵字,用于終止迭代。
是不是好理解了?
?
下面說說異步中的迭代器。
異步中的迭代器
上面的迭代,是同步的過程。而現(xiàn)在Dotnet開發(fā)工作更傾向于異步,使用async/await來做,特別是在提高服務(wù)器的可伸縮性方面應(yīng)用特別多。
上面的代碼最大的問題,在于MoveNext()。很明顯,這是個(gè)同步的方法。如果它運(yùn)行需要一段時(shí)間,那線程就會(huì)被阻塞。這會(huì)讓代碼執(zhí)行過程變得不可接受。
我們能做得最接近的方法是異步獲取數(shù)據(jù):
async?Taskstring>>?Sources(int?x)?{...}
但是,異步獲取數(shù)據(jù)并不能解決數(shù)據(jù)緩存延遲的問題。
好在,C#為此特意增加了對異步迭代器的支持:
public?interface?IAsyncEnumerable
{
????IAsyncEnumerator?GetAsyncEnumerator(CancellationToken?cancellationToken?=?default);
}
public?interface?IAsyncEnumerator?:?IAsyncDisposable
{
????T?Current?{?get;?}
????ValueTask<bool>?MoveNextAsync();
}
public?interface?IAsyncDisposable
{
????ValueTask?DisposeAsync();
}
注意,從.NET Standard 2.1和.NET Core 3.0開始,異步迭代器已經(jīng)包含在框架中了。而在早期版本中,需要手動(dòng)引入:
#?dotnet?add?package?Microsoft.Bcl.AsyncInterfaces
目前這個(gè)包的版本號是5.0.0。
?
還是上面例子的邏輯:
IAsyncEnumerable<string>?Source(int?x)?=>?throw?new?NotImplementedException();
看看foreach可以await后的樣子:
await?foreach?(var?item?in?Sources)
{
????Console.WriteLine(item);
}
編譯器會(huì)將它解釋為:
await?using?(var?iter?=?Sources.GetAsyncEnumerator())
{
????while?(await?iter.MoveNextAsync())
????{
????????var?item?=?iter.Current;
????????Console.WriteLine(item);
????}
}
這兒有個(gè)新東西:await using。與using用法相同,但釋放時(shí)會(huì)調(diào)用DisposeAsync,而不是Dispose,包括回收清理也是異步的。
這段代碼其實(shí)跟前邊的同步版本非常相似,只是增加了await。但是,編譯器會(huì)分解并重寫異步狀態(tài)機(jī),它就變成異步的了。原理不細(xì)說了,不是本文關(guān)注的內(nèi)容。
那么,帶有yield的迭代器如何異步呢?看代碼:
async?IAsyncEnumerable<string>?Sources(int?x)
{
????for?(int?i?=?0;?i?5;?i++)
????{
????????await?Task.Delay(100);?//?這兒模擬異步延遲
????????yield?return?$"result?from?Sources,?x={x},?result?{i}";
????}
}
嗯,看著就舒服。
?
這就完了?圖樣圖森破。異步有一個(gè)很重要的特性:取消。
那么,怎么取消異步迭代?
異步迭代的取消
異步方法通過CancellationToken來支持取消。異步迭代也不例外??纯瓷厦?code style="font-size: inherit;line-height: inherit;word-wrap: break-word;padding: 2px 4px;border-top-left-radius: 4px;border-top-right-radius: 4px;border-bottom-right-radius: 4px;border-bottom-left-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(233, 105, 0);background-color: rgb(248, 248, 248);">IAsyncEnumeratorGetAsyncEnumerator()方法中。
那么,如果是手工循環(huán)呢?我們可以這樣寫:
await?foreach?(var?item?in?Sources.WithCancellation(cancellationToken).ConfigureAwait(false))
{
????Console.WriteLine(item);
}
這個(gè)寫法等同于:
var?iter?=?Sources.GetAsyncEnumerator(cancellationToken);
await?using?(iter.ConfigureAwait(false))
{
????while?(await?iter.MoveNextAsync().ConfigureAwait(false))
????{
????????var?item?=?iter.Current;
????????Console.WriteLine(item);
????}
}
沒錯(cuò),ConfigureAwait也適用于DisposeAsync()。所以最后就變成了:
await?iter.DisposeAsync().ConfigureAwait(false);
?
異步迭代的取消捕獲做完了,接下來怎么用呢?
看代碼:
IAsyncEnumerable<string>?Sources(int?x)?=>?new?SourcesEnumerable(x);
class?SourcesEnumerable?:?IAsyncEnumerable<string>
{
????private?int?x;
????public?SourcesEnumerable(int?x)?=>?this.x?=?x;
????public?async?IAsyncEnumerator<string>?GetAsyncEnumerator(CancellationToken?cancellationToken?=?default)
????{
????????for?(int?i?=?0;?i?5;?i++)
????????{
????????????await?Task.Delay(100,?cancellationToken);?//?模擬異步延遲
????????????yield?return?$"result?from?Sources,?x={x},?result?{i}";
????????}
????}
}
如果有CancellationToken通過WithCancellation傳過來,迭代器會(huì)在正確的時(shí)間被取消 - 包括異步獲取數(shù)據(jù)期間(例子中的Task.Delay期間)。當(dāng)然我們還可以在迭代器中任何一個(gè)位置檢查IsCancellationRequested或調(diào)用ThrowIfCancellationRequested()。
此外,編譯器也會(huì)通過[EnumeratorCancellation]來完成這個(gè)任務(wù),所以我們還可以這樣寫:
async?IAsyncEnumerable<string>?Sources(int?x,?[EnumeratorCancellation]?CancellationToken?cancellationToken?=?default)
{
????for?(int?i?=?0;?i?5;?i++)
????{
????????await?Task.Delay(100,?cancellationToken);?//?模擬異步延遲
????????yield?return?$"result?from?Sources,?x={x},?result?{i}";
????}
}
這個(gè)寫法與上面的代碼其實(shí)是一樣的,區(qū)別在于加了一個(gè)參數(shù)。
實(shí)際應(yīng)用中,我們有下面幾種寫法上的選擇:
//?不取消
await?foreach?(var?item?in?Sources)
//?通過WithCancellation取消
await?foreach?(var?item?in?Sources.WithCancellation(cancellationToken))
//?通過SourcesAsync取消
await?foreach?(var?item?in?SourcesAsync(cancellationToken))
//?通過SourcesAsync和WithCancellation取消
await?foreach?(var?item?in?SourcesAsync(cancellationToken).WithCancellation(cancellationToken))
//?通過不同的Token取消
await?foreach?(var?item?in?SourcesAsync(tokenA).WithCancellation(tokenB))
幾種方式區(qū)別于應(yīng)用場景,實(shí)質(zhì)上沒有區(qū)別。對兩個(gè)Token的方式,任何一個(gè)Token被取消時(shí),任務(wù)會(huì)被取消。
總結(jié)
同步迭代其實(shí)在各個(gè)代碼中用的都比較多,但異步迭代用得很好。一方面,這是個(gè)相對新的東西,另一方面,是會(huì)有點(diǎn)繞,所以很多人都不敢碰。
今天這個(gè),也是個(gè)人的一些經(jīng)驗(yàn)總結(jié),希望對大家理解迭代能有所幫助。


臥槽,又來一個(gè)神奇的網(wǎng)站!

GitHub上這個(gè)微信防撤回的開源項(xiàng)目,99%的程序員不知道
