C# - Apresentando Async Streams


Hoje veremos o recurso async streams  introduzido na versão 8. da linguagem C#.

A programação assíncrona é uma forma de programação usada para evitar o bloqueio da thread principal evitando o congelamento da aplicação que esta em atendimento. Na linguagem C# a programação assíncrona é feita usando async e await.

Recordando IEnumerable

Hoje vamos tratar do recurso dos streams assíncronos que facilita a criação e o consumo de enumeráveis assíncronos, portanto, antes de entrar no novo recurso, você primeiro precisa entender a interface IEnumerable.

A interface IEnumerable<T> esta presente deste o .NET Framework 2 e fornece uma forma segura de iterar sobre qualquer coleção. A iteração esta baseada no tipo IEnumerator<T> implementado assim:

public interface IEnumerator<T> : IDisposable
{
    bool MoveNext();
    T Current;
    void Reset();
}

Destaques:

- Note que o método MoveNext nos move para o próximo elemento e retorna true quando existe um elemento, e a seguir Current retorna o elemento;

- O método Reset() fornece uma forma de redefinir o iterador para o início;

- O IEnumerable<T> é IDisposable, portanto, sua implementação pode realizar a limpeza de recursos. Observe que o parâmetro genérico é um parâmetro externo. Esta palavra-chave permite converter o IEnumerable <T> em um tipo base IEnumerable <TBase>.

A palavra chave foreach fornece uma forma bem simples de consumir coleções Enumeráveis:

foreach (var item in colecaoEnumerable)
         Console.WriteLine(item);

A palavra chave yield fornece uma forma simples de implementar um IEnumerable<T> com um método e permite ao compilador descobrir como implementar a interface:

IEnumerable<int> MeuEnumerable
{
    get
    {
        for (int i = 0; i < 3; i++)
            yield return i;

        yield return 100;
    }
}

Este código faz com que o compilador gere um tipo que implementa IEnumerable<int>, o qual rastreia informação suficiente para saber onde estamos na iteração [0,1,2,....100].

Ocorre que os métodos que produzem streams de dados como o IEnumerable<T> exigiam que um código mais elaborado fosse definido para realizar o tratamento assíncrono.

Por este motivo no C# 8.0 foi introduzido a interface IAsyncEnumerable.

Streams Assíncronos

Os streams assíncronos usam o tipo IAsyncEnumerator<T>. Este tipo é semelhante a IEnumerator<T>, mas com um método Move que é assíncrono (ele retorna um tipo semelhante a Task):

Graças ao método assíncrono Move, podemos agora esperar de forma assíncrona pelo próximo item. Isso significa que podemos esperar sem bloquear uma thread.

Observe que o método retorna um ValueTask, o que torna a chamada livre de alocação quando o próximo item já está disponível. (Para saber mais sobre ValueTask leia o meu artigo)

Dessa forma IAsyncEnumerable<T> é uma boa correspondência para eventos que acontecem com pouca frequência ou dados que são recebidos de forma assíncrona (por exemplo, pela rede). Como o IAsyncEnumerable sabe a que taxa estamos extraindo os itens, ele pode decidir sobre a quantidade de dados que armazena em buffer e quando pedir mais dados à fonte de envio de dados.

Da mesma forma que para IEnumerable<T>, a linguagem C# fornece suporte de primeira classe para a implementação de IAsyncEnumerables, e um conjunto de 3 interfaces foi introduzido para isso:

  1. IAsyncDisposable
  2. IAsyncEnumerable<T>
  3. IAsyncEnumerator<T>

Essas interfaces nos permitem representar uma versão assíncrona de IEnumerable<T> e, assim, consumir fluxos assíncronos.

Exemplo:

static async IAsyncEnumerable<string> GetResultados(string termo)
{
    using var client = new HttpClient();

    yield return await client.GetStringAsync($"https://www.google.com?q={termo}");

    yield return await client.GetStringAsync($"https://www.bing.com?q={termo}");
}

Para consumir o resultado podemos usar await foreach :

await foreach (var item in GetResultados("teste"))
{
    System.Console.WriteLine(item);
}

Realizando o Cancelamento

O padrão para cancelar métodos assíncronos é usar o método CancellationToken:

static async IAsyncEnumerable<string> GetResultados(string termo, [EnumeratorCancellation]CancellationToken ct = default)
{
    using var client = new HttpClient();
    using var ctr = ct.Register(s=> (HttpClient) s).Dispose(), client);
    yield return await client.GetStringAsync($"https://www.google.com?q={termo}");

    yield return await client.GetStringAsync($"https://www.bing.com?q={termo}");
}

Podemos passar CancellationToken como um argumento quando invocamos o método:


var cts = new CancellationTokenSource(millisecondsDelay: 1000);

await foreach (var resultado in GetResultados("dotnet", cts.Token))

Ou podemos usar o método WithCancellation, que faz com que o compilador passe o valor para o argumento com o atributo EnumeratorCancellation.

Exemplo:

var cts = new CancellationTokenSource(millisecondsDelay: 1000);

await foreach (var resultado in GetResultados("dotnet").WithCancellation(cts.Token))

Agora os dois tipos envolvidos FileStream e StreamWriter estão implementando IAsyncDisposable e, isso permite que todo o trabalho de descarte, como liberação de alterações no disco, seja feito de forma assíncrona.

E estamos conversados...

"Porque do céu se manifesta a ira de Deus sobre toda a impiedade e injustiça dos homens, que detêm a verdade em injustiça."
Romanos 1:18

Referências:


José Carlos Macoratti