一、概念及介绍

        通常通过使用事件总线实现来执行此发布/订阅系统。 事件总线可以设计为包含 API 的接口,该 API 是订阅和取消订阅事件和发布事件所需的。 它还可以包含一个或多个基于跨进程或消息通信的实现,例如支持异步通信和发布/订阅模型的消息队列或服务总线。

        本问介绍如何使用RabbitMQ通用事件总线接口实现这种与 .NET 的通信,并结合项目代码实践演示。 存在多种可能的实现,每种实现使用不同的技术或基础结构,例如 RabbitMQ、Azure 服务总线或任何其他第三方开源或商用服务总线。

二、什么是观察者模式?

观察者模式又名发布-订阅模式,具体概念就是定义对象间的一种一对多的依赖关系,当一个对象的状态发生变化时,所有依赖的对象都得到通知并被自动更新。

三、创建下发订单的项目(发布服务)

        使用Visual Studio2022新建web api 项目, 该项目主要实现简易的订单和工单发布的测试样例

1、新增IntegrationEvent类
public abstract class IntegrationEvent
{
    public string? Status { get; set; }
    public DateTime OccurredOn { get; set; }
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口

参数说明:

        TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类

        TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口

public interface IEventBusServices
{
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
    /// <param name="event">事件</param>
    void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
    /// <summary>
    /// 订阅消息
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
    void Subscribe<TEvent, TEventHandler>() 
        where TEvent : IntegrationEvent 
        where TEventHandler: IIntegrationEventHandler<TEvent>;
    /// <summary>
    /// 取消订阅
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler"></typeparam>
    void Unsubscribe<TEvent,TEventHandler>() 
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>;
}
3、新建IIntegrationEventHandler.cs
 public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
 {
     Task Handle(TEvent @event);
 }
4、创建RabbitMQConfigs.cs 抽象基类

        通过实例化 ConnectionFactory 对象,并设置其属性,来创建一个 RabbitMQ 连接工厂。然后,将从配置中获取的相关信息分配给连接工厂的属性,例如主机名、端口、用户名和密码。最后,返回创建的连接工厂对象。

public abstract class RabbitMQConfigs
{
    protected ConnectionFactory GreateConnectionFactory(IConfiguration configuration)
    {
        // 在这里设置ConnectionFactory的属性
        var factory = new ConnectionFactory
        {
            // 设置连接属性
            HostName = configuration["RabbitMQ:HostName"],
            Port = int.Parse(configuration["RabbitMQ:Port"]),
            UserName = configuration["RabbitMQ:UserName"],
            Password = configuration["RabbitMQ:Password"]
        };
        return factory;
    }
}
5、新建EventBusServices.cs

        通过继承 RabbitMQConfigs 类,并调用 GreateConnectionFactory 方法,创建和配置 RabbitMQ 的连接工厂对象,并创建RabbitMQ通道,实现消息发布、订阅、取消订阅接口

public class EventBusServices : RabbitMQConfigs,IEventBusServices
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    public EventBusServices(IConfiguration configuration) {
        var factory = GreateConnectionFactory(configuration);
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }
    public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
    {
        var eventName = @event.GetType().Name;
        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.BasicPublish(exchange: "events",
                              routingKey: eventName,
                              basicProperties: null,
                              body: body);
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var @event = JsonSerializer.Deserialize<TEvent>(message);
            var handler = Activator.CreateInstance<TEventHandler>();
            if(@event != null)
            {
                await handler.Handle(@event);
            }                
        };

        _channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
    }

    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
    }
}
6、创建测试订单服务OrderServices.cs

        通过构造函数依赖注入IEventBusServices事件总线服务,使用事件总线实现订单消息的发布

 public class OrderServices: IOrderServices
 {
     private readonly IEventBusServices _eventBusServices;
     public OrderServices(IEventBusServices eventBusServices) 
     {
         _eventBusServices = eventBusServices;
     }

     public void PlaceOrder(OrderCreatedEvent order)
     {
         //处理订单
         order.OccurredOn = DateTime.Now;

         //发布订单消息
         _eventBusServices.Publish(order);
         
     }

     public void PlaceWorkOrder(WorkOrderCreatedEvent workOrder)
     {
         //处理
         workOrder.OccurredOn = DateTime.Now;

         //发布消息
         _eventBusServices.Publish(workOrder);
     }
 }

public class OrderCreatedEvent : IntegrationEvent
{
    public int OrderId { get; set; }

    public string? OrderName { get; set; }
}

public class WorkOrderCreatedEvent : IntegrationEvent
{
    public int WorkOrderId { get; set; }

    public string? WorkOrderName { get; set; }
}
7、实现模拟订单发送请求的代码
[Route("api/createOrder")]
[HttpPut]
public void PutOrder()
{
    //创建一个子线程:发布订单到MQ中(发布服务)
    Task.Run(() =>
    {
        // 调用订单服务的方法
        for (int i = 0; i< 8; i++)
        {
            var order = new OrderCreatedEvent { OrderId = i, OrderName =$"测试订单{i}", Status ="下单中" };
            _orderService.PlaceOrder(order);                    
            Console.WriteLine($"订单创建完成,订单ID:{order.OrderId};订单:{order.OrderName}{order.Status}");
            Thread.Sleep(1000);
        }
    });
    
    //创建一个子线程:发布工单到MQ中(发布服务)
    Task.Run(() =>
    {
        // 调用订单服务的方法
        for (int i = 0; i< 8; i++)
        {
            var workorder = new WorkOrderCreatedEvent { WorkOrderId = i, WorkOrderName =$"测试工单{i}", Status ="下单中" };
            _orderService.PlaceWorkOrder(workorder);                    
            Console.WriteLine($"工单创建完成,工单ID: {workorder.WorkOrderId} ;工单: {workorder.WorkOrderName}{workorder.Status}");
            Thread.Sleep(1000);
        }
    });
    
}

四、创建接收订单并处理的项目(订阅服务)

        使用Visual Studio2022新建控制台应用项目, 该项目主要实现订阅监听订单服务,接收订单服务发布的订单消息,并实现对订单的处理完成

1、新增EventBusClass类
public class EventBusClass
{
    public abstract class IntegrationEvent
    {
        public string? Status { get; set; }
        public DateTime OccurredOn { get; set; }
    }
    public class OrderCreatedEvent : IntegrationEvent
    {
        public int OrderId { get; set; }

        public string? OrderName { get; set; }
    }

    public class WorkOrderCreatedEvent : IntegrationEvent
    {
        public int WorkOrderId { get; set; }

        public string? WorkOrderName { get; set; }
    }
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口

参数说明:

        TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类

        TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口

public interface IEventBusServices
{
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
    /// <param name="event">事件</param>
    void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
    /// <summary>
    /// 订阅消息
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
    void Subscribe<TEvent, TEventHandler>() 
        where TEvent : IntegrationEvent 
        where TEventHandler: IIntegrationEventHandler<TEvent>;
    /// <summary>
    /// 取消订阅
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler"></typeparam>
    void Unsubscribe<TEvent,TEventHandler>() 
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>;
}
 3、新建IIntegrationEventHandler.cs
 public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
 {
     Task Handle(TEvent @event);
 }
4、新建EventBusServices.cs
public class EventBusServices : IEventBusServices
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    public EventBusServices() {
        var factory = new ConnectionFactory
        {
            // 设置连接属性
            HostName = "localhost",
            Port = 5672,
            UserName = "my",
            Password = "123456"
        }; ;
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }
    public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
    {
        var eventName = @event.GetType().Name;
        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.BasicPublish(exchange: "events",
                              routingKey: eventName,
                              basicProperties: null,
                              body: body);
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var @event = JsonSerializer.Deserialize<TEvent>(message);
            var handler = Activator.CreateInstance<TEventHandler>();
            if(@event != null)
            {
                await handler.Handle(@event);
            }                
        };

        _channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
    }

    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
    }
}
4、新建订单处理集成事件OrderHandlerServices.cs

        除了事件订阅逻辑外,还需要实现集成事件处理程序的内部代码(例如回调方法)。 在事件处理程序中,可指定接收和处理某种事件消息的位置

public class OrderCreatedEventHandler : IIntegrationEventHandler<OrderCreatedEvent>
{
    public async Task Handle(OrderCreatedEvent @event)
    {
        await Task.Run(() =>
        {
            // 处理订单创建事件逻辑...                
            @event.Status = "完成";
            Console.WriteLine($"订单创建事件处理完成,订单ID:{@event.OrderId};订单:{@event.OrderName}{@event.Status}");
        });
    }
}

public class WorkOrderCreatedEventHandler : IIntegrationEventHandler<WorkOrderCreatedEvent>
{
    public async Task Handle(WorkOrderCreatedEvent @event)
    {
        await Task.Run(() =>
        {
            // 处理订单创建事件逻辑...                
            @event.Status = "完成";
            Console.WriteLine($"工单创建事件处理完成,工单ID:{@event.WorkOrderId};工单:{@event.WorkOrderName}{@event.Status}");
        });
    }
}
5、修改Program.cs程序主入口,实现订阅
using rabbirmqtestReciver;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using study_project.IServices.EventBus;
using study_project.Services.EventBus;
using System.Text;

EventBusServices eventBusServices = new EventBusServices();


//订阅订单服务
eventBusServices.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
//订阅工单服务
eventBusServices.Subscribe<WorkOrderCreatedEvent, WorkOrderCreatedEventHandler>();

Console.WriteLine("按任意键退出...");
Console.ReadKey();

五、测试运行与结果

分别启动运行创建的api项目和控制台应用项目,请求api的发送订单接口api/createOrder

发布服务:

订阅服务:

结果对比:

六、结语

        至此,本文已经演示了使用基于RabbitMQ实现的事件总线,展示了2个实例:订单和工单发布的案例,通过使用EventBus实现发布与订阅,指定事件类型和集成事件,便可以实现发布和订阅,减少了使用RabbitMQ实现信息发布订阅的部分代码编程;如果是分布式应用中,可以使用EventBus实现应用的业务逻辑模块与RabbitMQ发布订阅模块的解耦。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐