.NET Core - Apresentando o MassTransit


Neste artigo vou apresentar o MassTransit, um framework open source para a plataforma .NET que ajuda o desenvolvedor a rotear mensagens para serviços de mensageria como RabbitMQ, Azure Service Bus, SQS e bus de serviço ActiveMQ.

O MassTransit é  é um service bus leve para criar aplicações na plataforma .NET distribuídas.

Ele não possui um implementação específica e funciona como uma interface, uma abstração sobre o conceito de barramento da mensagem fornecendo implementações do Messaging Bus para diversos serviços de mensageria como o RabbitMQ, Kafka, etc.

O objetivo principal do MassTransit é fornecer uma abstração consistente e amigável da .NET no transporte de mensagens.

Para cumprir esse objetivo, o MassTransit traz muito da lógica específica do aplicativo para mais perto do desenvolvedor de uma maneira fácil de configurar e entender.

Dentre os recursos disponibilizados pelo MassTransit cabe destacar :

A seguir veremos alguns conceitos importantes que são usados no contexto do MassTransit.

Transports

O MassTransit suporta múltiplos transportes incluindo :

- RabbitMQ
- Azure Service Bus
- ActiveMQ
- Amazon SQS
- In Memory

Messages

No MassTransit, um contrato de mensagem é definido criando um tipo .NET que pode ser uma classe ou uma interface. Isso resulta em um contrato fortemente tipado.  

As mensagens devem ser limitadas a propriedades somente leitura e não incluir métodos ou comportamento. (O MassTransit usa o nome completo do tipo, incluindo o namespace, para contratos de mensagem.)

Existem dois tipos principais de mensagens: Eventos e Comandos.

  1. Eventos - Um evento significa que algo aconteceu.
    Os eventos são publicados (usando Publish) por meio de IBus (standalone) ou ConsumeContext (dentro de um consumidor de mensagem). Um evento não deve ser enviado diretamente para um endpoint.
     
  2. Comandos - Um comando diz a um serviço para fazer algo.
    Os comandos são enviados (usando Send) para um endpoint, pois é esperado que uma única instância de serviço execute a ação do comando. Um comando nunca deve ser publicado.

Consumers

No MassTransit, um consumidor consome um ou mais tipos de mensagem quando configurado ou conectado a um endpoint de recebimento.  Ele inclui muitos tipos de consumidores, incluindo consumidores, sagas, máquinas de estado de saga, atividades de roteamento, manipuladores e consumidores de empregos.  O tipo de consumidor mais comum é uma classe que consome um ou mais tipos de mensagens.

Para cada tipo de mensagem, a interface IConsumer<T> é implementada, onde T é o tipo de mensagem consumida. A interface tem um método, Consume, conforme mostrado abaixo.

public interface IConsumer<in TMessage> :  IConsumer where TMessage : class
{
    Task Consume(ConsumeContext<TMessage> context);
}

Producers

Um aplicativo ou serviço pode produzir mensagens usando dois métodos diferentes.  Uma mensagem pode ser enviada ou publicada. O comportamento de cada método é muito diferente, mas é fácil de entender observando os tipos de mensagens envolvidas em cada método específico.

Quando uma mensagem é enviada, ela é entregue a um endpoint específico usando um DestinationAddress, já quando uma mensagem é publicada, ela não é enviada a um endpoint específico, mas sim transmitida a qualquer consumidor que tenha se inscrito no tipo de mensagem.

Com base nesses dois comportamentos temos que :

1- Mensagens enviadas são descritas como comandos
2- Mensagens publicadas são descritas como eventos.

Usando o MassTransit na plataforma .net

O MassTransit pode ser configurado na maioria dos tipos de aplicativos da plataforma .NET. Os principais pacotes Nuget usados para fazer a integração das aplicações .NET com o MassTransit são:

Dependendo do tipo da aplicação a ser criado podemos usar um ou mais pacotes e a definição dos recursos do MassTransit a ser usado vai depender de diversos fatores incluindo o tipo de aplicação.

A seguir vamos criar uma aplicação bem simples onde vamos simular a reserva de passagem onde teremos uma solução - OrderService e dois projetos do tipo ASP .NET Core Web API que se comunicam com o RabbitMQ e um projeto do tipo Class Library Shared.Model :

- O projeto OrderPublisher gera uma reserva de passagem enviando uma mensagem par ao RabbitMQ - seria o Producer;
- O projeto OrderConsumer - Consome a fila do RabbitMQ - seria o Consumer;
- O contrato da mensagem será definido no projeto compartilhado onde vamos criar a classe Ticket Share - seria a Message;

Os recursos usados são:

Então mãos à obra...

Criando a solução e projetos no VS 2019 Community

Abra o VS 2019 Community e clique em New Project selecionando o template Blank Solution:

Informe o nome OrderServer e clique em Create.

A seguir clique no menu File e selecione Add- New Project  e Selecione o template  ASP .NET Core Web API e clique em Next;

Informe o nome OrderPublisher e clique em Next;

A seguir selecione o Target Framework, Authentication Type e demais configurações conforme mostrada na figura:

Repita o procedimento acima e crie o projeto OrderConsumer com as mesmas configurações.

A seguir clique no menu File e selecione Add- New Project  e selecione o template  Class Library e clique em Next;

Informe o nome Shared.Model e a seguir selecione o Target Framework como .NET 5.0 (Current) conforme figura abaixo:

Aproveitando vamos criar a classe Ticket neste projeto que vai representar o nosso contrato de mensagem:

public class Ticket
{
   public string UserName { get; set; }
   public DateTime Booked { get; set; }
   public string Location { get; set; }
}

Ao final teremos a solução com 3 projetos :

Vamos incluir uma referência o projeto Shared.Models nos dois projetos Web API.

Para isso em cada projeto clique com o botão direito do mouse e selecione a opção Add-> Project Reference e da lista selecione o projeto conforme abaixo:

Aproveitando vamos incluir as referências nos projetos Web API aos pacotes do MassTransit.

Clique com o botão direito do mouse sobre cada projeto Web API e a seguir selecione Manage Nuget Packages.

Na próxima janela selecione a guia Browser e a seguir inclua cada um dos pacotes abaixo:

Selecionando a última versão estável e instalando o pacote no projeto.

Ao final teremos os dois projetos Web API contendo as 3 referências do MassTransit.

Definindo a publicação das mensagens no projeto OrderPublisher

Abra o projeto OrderPublisher e no arquivo Startup do projeto vamos definir a configuração do MassTransit incluindo o código abaixo no método ConfigureServices.

public void ConfigureServices(IServiceCollection services)
 {
            services.AddMassTransit(x =>
            {
                x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
                {
                    config.UseHealthCheck(provider);
                    config.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
                }));
            });
            services.AddMassTransitHostedService();
            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "OrderPublisher", Version = "v1" });
            });
}

Neste código estamos:

- Incluindo o serviço do MassTransit no contêiner  da ASP .NET Core
- Cria um novo service bus usando o RabbitMQ  local e definindo a conexão, passando os parâmetros para o usuário e senha padrão
- Incluindo o serviço hosted do MassTransit que inicia e para de forma automática o serviço de bus

Criando o controlador para enviar a mensagem

Vamos criar o controller OrderController na pasta Controllers e incluir o código abaixo:

using MassTransit;
using Microsoft.AspNetCore.Mvc;
using Shared.Model;
using System;
using System.Threading.Tasks;
namespace OrderPublisher.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        private readonly IBus _bus;
        public OrderController(IBus bus)
        {
            _bus = bus;
        }
        [HttpPost]
        public async Task<IActionResult> CreateTicket(Ticket ticket)
        {
            if (ticket != null)
            {
                ticket.Booked = DateTime.Now;
                Uri uri = new Uri("rabbitmq://localhost/orderTicketQueue");
                var endPoint = await _bus.GetSendEndpoint(uri);
                await endPoint.Send(ticket);
                return Ok();
            }
            return BadRequest();
        }
    }
}

Neste código temos :

  1. A utilização da interface IBus configurada no arquivo Startup e injetamos o objeto IBus no construtor do controlador

  2. A atribuição da data para a reserva (Booked)
  3. A definição do nome fila como orderTicketQueue e a criação de uma nova url : ‘rabbitmq: // localhost /orderTicketQueue’.

    Se a fila não existir o RabbitMQ vai criar para nós

  4. A obtenção de um endpoint para o qual vamos enviar o objeto Ticket (Shared.Model)

  5. O envio da mensagem para o RabbitMQ usando o método Send;

Definindo e configurando o Consumer da mensagem

Agora vamos abrir o projeto OrderConsumer e no arquivo Startup no método ConfigureServices vamos incluir o código abaixo para configurar o MassTransit e o Consumer que vai consumir a mensagem da fila.

      public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<TicketConsumer>();
                x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    cfg.UseHealthCheck(provider);
                    cfg.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
                    cfg.ReceiveEndpoint("orderTicketQueue", ep =>
                    {
                        ep.PrefetchCount = 10;
                        ep.UseMessageRetry(r => r.Interval(2, 100));
                        ep.ConfigureConsumer<TicketConsumer>(provider);
                    });
                }));
            });
            services.AddMassTransitHostedService();
            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "OrderConsumer", Version = "v1" });
            });
        }

Entendendo o código incluído:

Agora vamos definir o código da classe TicketConsumer<Ticket> que representa o contrato da mensagem e que vai permitir consumir as mensagens da fila orderTicketQueue.

using MassTransit;
using Microsoft.Extensions.Logging;
using Shared.Model;
using System;
using System.Threading.Tasks;
namespace OrderConsumer.Consumers
{
    public class TicketConsumer : IConsumer<Ticket>
    {
        private readonly ILogger<TicketConsumer> logger;
        public TicketConsumer(ILogger<TicketConsumer> logger)
        {
            this.logger = logger;
        }
        public async Task Consume(ConsumeContext<Ticket> context)
        {
            await Console.Out.WriteLineAsync(context.Message.UserName);
            logger.LogInformation($"Nova mensagem recebida :" +
                $" {context.Message.UserName} {context.Message.Location}");
        }
    }
}


Definimos um Consumer simples que implementa a interface  IConsumer da classe MassTransit, sendo que qualquer mensagem com a assinatura do Modelo Ticket que for enviada à fila orderTicketQueue será recebida por este consumidor.

 

Agora vamos definir nas propriedades da Solution a opção Mutiple startup projects marcando os projetos OrderConsumer e OrderPublisher para inicializarem juntos com a aplicação:
 

Executando o projeto teremos a API OrderPublisher apresentando o endpoint /api/Order na interface do Swagger:

Vamos enviar um POST informando o nome do usuário - Macoratti e a localização - Pindamonhangaba e clicar no botão Execute.

Com isso a mensagem será enviada para a fila do RabbitMQ que deve estar em execução.

Vemos que foram criadas connections, exchanges e 1 fila e 1 consumer:

Na aba Queues temos a fila - orderTickedQueue - criada e uma mensagem aguardando o recebimento pois colocamos um breakpoint na classe TicketConsumer:

Como enviamos várias vezes a mesma mensagem ao continuar a execução temos o recebimento das mensagens pelo Consumer criado conforme mostra a figura acima.

Assim, usando o MassTransit publicamos e consumimos mensagens em uma fila do RabbitMQ.

Pegue o código do projeto aqui :  OrderService.zip (sem as referências)

"Nisto consiste o amor: não em que nós tenhamos amado a Deus, mas em que ele nos amou e enviou o seu Filho como propiciação pelos nossos pecados."
1 João 4:10

Referências:


José Carlos Macoratti