MassTransit 是Net下一个开源给予消息队列的ESB,其官方网址为 http://masstransit-project.com/,你可以在上面找到相关的源代码下载地址,nuget链接地址,以及开发文档。

本文采用的是当前最新的版本:3.2.4,如果你发现本文例子与官网不符,代表开发者已经修改了相关设计,所以此时以官方为准。

本文例子基于RabbitMQ,但本文的重点是MassTransit的使用入门,所以RabbitMQ相关的内容需自行查阅。

本文从四个方面讲解MassTransit

1、契约

数据契约是消息传递的基础,在MassTransit中,与WCF的数据契约不同,MassTransit的契约官方建议定义为interface,当然你定义成class也不会错,所以这里我们分别定义接口和类两种契约

public interface IMessageContract
{
    string Value { get; set; }
}
public class MessageContract
{
    public string Value { get; set; }
}

2、Bus

在这个版本里面,Bus都是统一通过Factory扩展来创建实际的IBusControl,简单的创建代码如下,注意这里并没有创建接收消息的代码

var rbBus = Bus.Factory.CreateUsingRabbitMq(configure =>
{
    var host = configure.Host(new Uri("rabbitmq://192.168.5.136/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
}
在创建完IBusControl之后,就可以通过Start方法来进行启动

3、Receive

MassTransit支持多种方式来进行消息的接收

a) Consumer,该方式需要实现接口IConsumer<T>

public class MessageContractConsumer : IConsumer<IMessageContract>
{
    public Task Consume(ConsumeContext<IMessageContract> context)
    {
        return Task.Run(() =>
        {
            Console.WriteLine($"Recevied By Consumer:{context.Message.Value}");
        });
    }
}
然后你就可以在CreateUsingRabbitMq方法里面通过IRabbitMqBusFactoryConfigurator的ReceiveEndpoint来从指定的消息队列获取消息,下面分别通过Consumer方法和Instance方法来实现消息接收

    configure.ReceiveEndpoint(host, "consumer", e =>
    {
        e.Consumer(() => new MessageContractConsumer());
    });
    //var consumer = new MessageContractConsumer();
    //configure.ReceiveEndpoint(host, "consumer", e =>
    //{
    //    e.Instance(consumer);
    //});

b) Observer,该方式需实现接口IObserver<ConsumeContext<T>>

public class MessageContractObserver : IObserver<ConsumeContext<IMessageContract>>
{
    public void OnCompleted()
    {
        throw new NotImplementedException();
    }

    public void OnError(Exception error)
    {
        throw new NotImplementedException();
    }

    public void OnNext(ConsumeContext<IMessageContract> value)
    {
        Console.WriteLine("Received By Observer:{0}", value.Message.Value);
    }
}
然后通过Observer来进行注册
    configure.ReceiveEndpoint(host, "observer", e =>
    {
        e.Observer(new MessageContractObserver());
    });
b) Handler,该方法是最简单的方式
    configure.ReceiveEndpoint(host, "handler", e =>
    {
        e.Handler<IMessageContract>(async context =>
        {
            await Task.Run(() =>
            {
                Console.WriteLine("Received By Handler:{0}", context.Message.Value);
            });
        });
    });

4、Send / Publish

发送消息分全部发送(不指定EndPoint)和指定Endpoint发送

a) 全部发送(不指定EndPoint)

        rbBus.Publish<IMessageContract>(new { Value = text });
b)指定EndPoint发送,这是只有指定的ReceiveEndpoint才能接收到消息
        var point = rbBus.GetSendEndpoint(new Uri("rabbitmq://192.168.5.136/handler")).Result;
        point.Send<IMessageContract>(new MessageContract { Value = $"Send To handler { text}" });

PS:因为消息是成对的,当你Send或者Publish时,指定的契约必须与接收方一致,否则接收方不会接收到消息,比如可以将上面方法中的rbBus.Publish<IMessageContract>改为rbBus.Publish<MessageContract>,这时候接收端会因为契约不一致而收不到消息

下面是除契约外的完整代码,注意这里用的是控制台程序做demo

var rbBus = Bus.Factory.CreateUsingRabbitMq(configure =>
{
    var host = configure.Host(new Uri("rabbitmq://192.168.5.136/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    configure.ReceiveEndpoint(host, "consumer", e =>
    {
        e.Consumer(() => new MessageContractConsumer());
    });
    //var consumer = new MessageContractConsumer();
    //configure.ReceiveEndpoint(host, "consumer", e =>
    //{
    //    e.Instance(consumer);
    //});
    configure.ReceiveEndpoint(host, "observer", e =>
    {
        e.Observer(new MessageContractObserver());
    });
    configure.ReceiveEndpoint(host, "handler", e =>
    {
        e.Handler<IMessageContract>(async context =>
        {
            await Task.Run(() =>
            {
                Console.WriteLine("Received By Handler:{0}", context.Message.Value);
            });
        });
    });
});
//Console.WriteLine(rbBus.Address);
using (rbBus.Start())
{
    string text = string.Empty;
    do
    {
        text = Console.ReadLine();
        var point = rbBus.GetSendEndpoint(new Uri("rabbitmq://192.168.5.136/handler")).Result;
        point.Send<IMessageContract>(new MessageContract { Value = $"Send To handler { text}" });
        rbBus.Publish<IMessageContract>(new { Value = text });
    }
    while (text != "E");
}

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐