C# -  Iniciando com Reactive Extensions (Rx)


Neste post iremos apresentar os principais conceitos da Reactive Extensions (Rx) na plataforma .NET.

Ser reativo é um comportamento desejado tanto nas pessoas como nas aplicações. Assim criar uma aplicação reativa que responda ao usuário em tempo real pode fazer toda a diferença.

Para poder construir aplicações reativas elas devem reagir a eventos, a falhas e a reagir a interação do usuário, e, para nos ajudar a fazer isso podemos usar a biblioteca Reactive Extensions ou Rx.

A biblioteca Reactive Extensions (Rx) é uma biblioteca usada para compor programas assíncronos e baseados em eventos usando LINQ e coleções observáveis(Observable) que esta disponível via pacote Nuget.

Usando esta biblioteca podemos representar fluxos de dados assíncronos usando Observable, realizar consultas em streams assíncronas usando LINQ e podemos usar os Schedulers para controlar a subscrição e publicação  das notificações.

Assim a biblioteca Rx oferece um paradigma natural para lidar com sequências de eventos (Uma sequência pode conter zero ou mais eventos) e também para chamadas assíncronas.

Usando Rx, você pode representar vários fluxos de dados assíncronos (que vêm de diversas fontes, por exemplo, cotação de ações, tweets, eventos de computador, solicitações de serviço da web, etc.) e assinar o fluxo de eventos usando a interface IObserver<T>.

A interface IObservable<T> mantém uma lista de interfaces IObserver<T> dependentes e as notifica automaticamente sobre quaisquer alterações de estado. Você pode consultar sequências observáveis usando operadores de consulta LINQ padrão implementados pelo tipo Observable.

Dessa forma, você pode filtrar, projetar, agregar, compor e executar operações baseadas em tempo em vários eventos facilmente usando esses operadores LINQ estáticos. O cancelamento e as exceções também podem ser tratados normalmente usando métodos de extensão fornecidos pelo Rx.

Vamos criar um projeto do tipo Console (.NET 5.0) usando o VS 2019 Community chamado CShp_Rx e a seguir vamos instalar a biblioteca Reactive Extensions via Nuget Package Manager.

Selecionando a guia Browse e localizando a biblioteca System.Reactive:

Atualmente a versão estável e compatível com o .NET 5.0 é a versão 5.0.0. Note que temos outros pacotes disponíveis para:

Após instalar o pacote no projeto vamos e usando o Object Browser para visualizar os assemblies para o pacote System.Reactive obtemos o seguinte:

Em System.Reactive.Linq temos toda a funcionalidade de consulta da biblioteca Rx, em System.Reactive.Concurrency temos os Schedulers dentre outros recursos.

A seguir como exemplo vamos criar um evento bem simples e a seguir vamos dar um tratamento Rx par ao evento.

Vamos criar uma classe no projeto Console chamada PlataformaNET com uma propriedade do tipo string:

 public class PlataformaNET
 {
        public string TiposDadosDisponiveis { get; set; }
 }

Na classe Program vamos criar um novo evento do tipo Action estático chamado tipos que é basicamente um delegate que vai receber um valor, que no exemplo será um tipo existente na plataforma .NET.

A seguir, no método Main,  vamos criar uma lista chamada listaTipos onde vamos incluir alguns tipos mais usados na plataforma .NET como boolean, string, int, e decimal:

using System;
using System.Collections.Generic;

namespace CShp_Rx
{
    public class Program
    {
        static event Action<string> tipos;

        static void Main(string[] args)
        {
            List<PlataformaNET> listaTipos = new List<PlataformaNET>();

            PlataformaNET tipoBoolean = new PlataformaNET();
            tipoBoolean.TiposDadosDisponiveis = "bool";
            listaTipos.Add(tipoBoolean);

            PlataformaNET tipoString = new PlataformaNET();
            tipoString.TiposDadosDisponiveis = "string";
            listaTipos.Add(tipoString);           

            PlataformaNET tipoInt = new PlataformaNET();
            tipoInt.TiposDadosDisponiveis = "int";
            listaTipos.Add(tipoInt);           

            PlataformaNET tipoDecimal = new PlataformaNET();
            tipoDecimal.TiposDadosDisponiveis = "decimal";
            listaTipos.Add(tipoDecimal);           

            tipos += x =>
            {
                Console.WriteLine(x);
            };

            for (int i = 0; i <= listaTipos.Count - 1; i++)
            {
                tipos(listaTipos[i].TiposDadosDisponiveis);
            }

            Console.ReadLine();
        }
    }
}

Executando o projeto teremos a definição da lista com os valores atribuídos e em seguida o evento criado será disparado para enviar os valores da lista para o Console:

Usando a biblioteca Rx

Vamos agora ver o funcionamento do evento usando a biblioteca Rx.

Vamos agora substituir o código usado no exemplo anterior pelo código abaixo:

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

namespace Usando_Rx
{
    class Program
    {
        static Subject<string> obsTipos = new Subject<string>();

        static void Main(string[] args)
        {
           List<PlataformaNET> listaTipos = new List<PlataformaNET>();

           PlataformaNET tipoBoolean = new PlataformaNET();
           tipoBoolean.TiposDadosDisponiveis = "bool";
           listaTipos.Add(tipoBoolean);

           PlataformaNET tipoString = new PlataformaNET();
           tipoString.TiposDadosDisponiveis = "string";
           listaTipos.Add(tipoString);

           PlataformaNET tipoInt = new PlataformaNET();
           tipoInt.TiposDadosDisponiveis = "int";
           listaTipos.Add(tipoInt);

           PlataformaNET tipoDecimal = new PlataformaNET();
           tipoDecimal.TiposDadosDisponiveis = "decimal";
           listaTipos.Add(tipoDecimal);

           // IObservable
           obsTipos.Subscribe(x =>
           {
               Console.WriteLine(x);
           });

           // IObserver
           for (int i = 0; i <= listaTipos.Count - 1; i++)
           {
               obsTipos.
OnNext(listaTipos[i].TiposDadosDisponiveis);
           }

           Console.ReadLine();
        }
    }
}

Agora vejamos as diferenças em destaques no código e como a biblioteca Rx atua.

Iniciamos definindo uma variável obsTipos estática do tipo Subject<string> :

 static Subject<string> obsTipos = new Subject<string>();

Subject<T> Representa um objeto que pode ser tanto uma sequência observable como um observador e cada notificação é espalhada para todos os observadores inscritos.

Após o código que criou a lista com os valores adicionados estamos usando o método Subscribe para inscrever um manipulador do elemento para a sequência observável. Esta é a parte IObservable do código.

obsTipos.Subscribe(x =>
{
        Console.WriteLine(x);
});

Depois de incluir este código vamos acionar o evento usando a palavra-chave OnNext que notifica todos os observadores inscritos sobre a chegada de um elemento especificado na sequência:

 for (int i = 0; i <= listaTipos.Count - 1; i++)
 {
         obsTipos.
OnNext(listaTipos[i].TiposDadosDisponiveis);
}

Portanto, conforme percorremos nossa lista, chamaremos OnNext para enviar os valores para a interface IObservable assinada.

Dessa forma vimos que usando a biblioteca Rx podemos declarar um fluxo de eventos com a palavra-chave Subject.

Com isso temos uma uma fonte de eventos que podemos publicar usando OnNext. Para ver esses valores na janela do console, fizemos a inscrição no fluxo de eventos usando Subscribe.

A biblioteca Rx permite também que você tenha objetos que atuem apenas como Publishers ou apenas como Subscribers. Isto é porque as interfaces IObservable e IObserver são de fato separadas. Além disso, observe que em Rx, os observáveis podem ser passados como parâmetros, retornados como resultados e armazenados em variáveis, o que os torna de primeira classe.

Usando Rx podemos também especificar se o fluxo de eventos foi concluído ou se ocorreu um erro. Isso realmente diferencia o Rx dos eventos do .NET. Além disso, é importante observar que incluir o namespace System.Reactive.Linq em seu projeto permite que os desenvolvedores escrevam consultas sobre o tipo de Subject porque um Assunto é uma interface IObservable.

Em outro artigo sobre o assunto iremos mostrar como usar o LINQ para realizar consultas usando a biblioteca Rx.

Pegue o código do projeto aqui: CShp_Rx.zip

"E ouvi uma grande voz do céu, que dizia: Eis aqui o tabernáculo de Deus com os homens, pois com eles habitará, e eles serão o seu povo, e o mesmo Deus estará com eles, e será o seu Deus."
Apocalipse 21:3

Referências:


José Carlos Macoratti