.NET Core实现RabbitMQ消息队列的示例代码

ASP.NET教程 2025-08-24

目录

  • 1. 安装和配置 RabbitMQ
    • 使用 Docker 安装 RabbitMQ
  • 2. 安装 RabbitMQ 客户端库
    • 3. 创建生产者(Producer)
      • 创建消息生产者代码
      • 参数说明:
    • 4. 创建消费者(Consumer)
      • 创建消息消费者代码
      • 参数说明:
    • 5. 持久化消息
      • 消息持久化设置
    • 6. 消息确认机制
      • 启用手动消息确认
    • 7. 运行和测试
      • 8. 总结

        RabbitMQ 是一个流行的消息队列中间件,它允许应用程序通过异步消息的方式进行通信。RabbitMQ 支持 AMQP 协议,可以通过多种方式与应用程序交互。在本教程中,我们将深入探讨如何在 .NET Core 环境中使用 RabbitMQ 来实现消息队列。我们将学习如何在生产者端发送消息,消费者端接收消息,并确保消息的可靠性。

        1. 安装和配置 RabbitMQ

        在开始使用 RabbitMQ 之前,首先需要确保你的机器上已经安装并运行 RabbitMQ。可以通过以下方式安装 RabbitMQ:

        使用 Docker 安装 RabbitMQ

        RabbitMQ 提供了官方的 Docker 镜像,这使得在本地机器上运行 RabbitMQ 非常简单。

        docker pull rabbitmq:management
        docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:management
        
        • 5672是 RabbitMQ 的默认消息队列端口。
        • 15672是 RabbitMQ 管理插件的 Web 界面端口。通过浏览器访问http://localhost:15672可以登录 RabbitMQ 管理界面,默认的用户名和密码都是guest。

        安装并启动 RabbitMQ 后,您可以继续进行开发。

        2. 安装 RabbitMQ 客户端库

        在 .NET Core 中与 RabbitMQ 进行交互,我们需要使用RabbitMQ.ClientNuGet 包。可以通过以下命令在项目中添加这个依赖:

        dotnet add package RabbitMQ.Client
        

        这个库提供了与 RabbitMQ 服务进行交互所需的所有工具。

        3. 创建生产者(Producer)

        生产者是负责将消息发送到 RabbitMQ 的应用程序。它通过连接到 RabbitMQ 服务器、创建一个队列和交换机,将消息发布到队列中。

        创建消息生产者代码

        下面是一个基本的生产者示例代码,展示了如何连接到 RabbitMQ,声明队列,并发送一条简单的消息:

        using RabbitMQ.Client;
        using System;
        using System.Text;
        
        class Program
        {
            static void Main(string[] args)
            {
                // 创建连接工厂
                var factory = new ConnectionFactory() { HostName = "localhost" };
                
                // 创建连接和通道
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // 声明一个队列(确保队列存在)
                    channel.QueueDeclare(queue: "hello_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
        
                    // 创建消息
                    string message = "Hello, RabbitMQ!";
                    var body = Encoding.UTF8.GetBytes(message);
        
                    // 发送消息到队列
                    channel.BasicPublish(exchange: "", routingKey: "hello_queue", basicProperties: null, body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                }
        
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
        

        在上面的代码中:

        • ConnectionFactory用来创建连接到 RabbitMQ 服务器的连接。
        • QueueDeclare用来声明一个队列,确保队列存在。如果队列已经存在,声明将被忽略。
        • BasicPublish用来将消息发送到队列。

        参数说明:

        • queue: 队列的名称(此例中是hello_queue)。
        • durable: 是否将队列标记为持久化。如果设置为true,即使 RabbitMQ 重启,队列也会存在。
        • exclusive: 是否使队列只对当前连接可用。
        • autoDelete: 是否在最后一个消费者断开连接时自动删除队列。

        4. 创建消费者(Consumer)

        消费者从队列中获取并处理消息。消费者通常是另一个应用程序,它会连接到 RabbitMQ,并持续地从队列中取出消息进行处理。

        创建消息消费者代码

        using RabbitMQ.Client;
        using RabbitMQ.Client.Events;
        using System;
        using System.Text;
        
        class Program
        {
            static void Main(string[] args)
            {
                // 创建连接工厂
                var factory = new ConnectionFactory() { HostName = "localhost" };
        
                // 创建连接和通道
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // 声明队列,确保消费者能够连接到相同的队列
                    channel.QueueDeclare(queue: "hello_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
        
                    // 创建消费者对象
                    var consumer = new EventingBasicConsumer(channel);
        
                    // 消息处理逻辑
                    consumer.Received += (model, ea) =
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                    };
        
                    // 开始消费消息
                    channel.BasicConsume(queue: "hello_queue", autoAck: true, consumer: consumer);
        
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
        

        在上面的代码中:

        • QueueDeclare用来确保消费者连接到相同的队列。
        • EventingBasicConsumer是消费者的实现,用于异步接收消息。
        • BasicConsume用于开始消费消息,autoAck设置为true,表示自动确认消息。

        参数说明:

        • autoAck: 如果设置为true,消费者会自动确认消息。如果设置为false,需要手动确认消息。

        5. 持久化消息

        如果您希望在 RabbitMQ 重启后保持消息的持久性,可以在生产者和消费者中启用消息的持久化。

        消息持久化设置

        在生产者端发送持久化消息:

        // 设置消息持久化
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true; // 设置消息为持久化
        
        channel.BasicPublish(exchange: "", routingKey: "hello_queue", basicProperties: properties, body: body);
        

        此外,声明队列时也需要设置durable: true,确保队列本身是持久化的。

        channel.QueueDeclare(queue: "hello_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
        

        6. 消息确认机制

        在消息传递过程中,为了确保消息被成功消费并避免丢失,可以启用消息确认机制。在这种情况下,消费者需要显式确认消息。

        启用手动消息确认

        在消费者端禁用自动确认,并手动确认每条已成功处理的消息:

        channel.BasicConsume(queue: "hello_queue", autoAck: false, consumer: consumer);
        
        consumer.Received += (model, ea) =
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);
            
            // 手动确认消息
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        

        BasicAck用于确认消息已经被处理。deliveryTag是消息的标识符,multiple参数表示是否确认多个消息。

        7. 运行和测试

        • 启动消费者应用程序,确保它可以连接到 RabbitMQ 并等待消息。
        • 启动生产者应用程序,它将发送消息到 RabbitMQ 队列。
        • 消费者将从队列中接收到消息,并进行处理。

        如果一切配置正确,您将在控制台中看到生产者发送的消息以及消费者处理的消息。

        8. 总结

        通过本教程,我们学习了如何在 .NET Core 中使用 RabbitMQ 实现一个简单的消息队列系统。关键步骤包括:

        • 安装 RabbitMQ 客户端库。
        • 在生产者中声明队列并发送消息。
        • 在消费者中声明队列并处理消息。
        • 配置消息持久化和确认机制,确保消息的可靠性。

        RabbitMQ 是一个强大的消息队列中间件,适用于各种需要解耦和异步通信的应用程序。通过灵活的交换机和队列配置,您可以实现不同的消息传递模式,以满足不同的业务需求。

        到此这篇关于.NET Core实现RabbitMQ消息队列的示例代码的文章就介绍到这了,更多相关.NET Core RabbitMQ消息队列内容请搜索本站以前的文章或继续浏览下面的相关文章希望大家以后多多支持本站!

        您可能感兴趣的文章:
        • 如何在一个.NET Core项目中使用RabbitMQ进行即时消息管理
        • .NETCore中RabbitMQ使用死信队列的实现
        • .Net Core和RabbitMQ限制循环消费的方法
        • .NETCore基于RabbitMQ实现延时队列的两方法
        • 运用.net core中实例讲解RabbitMQ高可用集群构建
        • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列
        • 运用.net core中实例讲解RabbitMQ