用于.NET的简单,分布式背景处理的开源库,由RabbitMQ Message Broker提供支持。该文档一旦完成,将在MassiveJobs.net网站上找到。
如果您没有现有的兔子安装,则最简单的方法是在容器中启动它。以下命令将在一个容器中启动RabbitMQ ,该容器将在停止时立即删除。
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:management现在,您应该能够在浏览器中访问RabbitMQ Management UI:http:// localhost:15672地址。如果您想监视连接,队列等,则可以与用户名访客和密码来宾登录。
我们将使用.NET Core 3.1 CLI进行此快速启动,但是您也可以在Visual Studio,使用.NET Core或使用.NET Framework 4.6.1或更高版本中进行此操作。
为项目创建一个文件夹。
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStart创建一个新的控制台应用程序项目。
dotnet new console测试脚手架项目。
dotnet run您应该看到Hello World!几秒钟后。
将包装参考添加到MassiveJobs.RabbitMqBroker 。
dotnet add package MassiveJobs.RabbitMqBroker使用您喜欢的编辑器打开program.cs并输入此代码。代码中的评论应该足以让您对正在发生的事情有一个基本思路。
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}启动三个不同的命令提示(或电源外壳)。两个将用作工人,一个将用作发布者。
要启动应用程序,请转到项目文件夹并运行:
dotnet run键入1并命中Enter以启动一个工人,键入2并击中Enter以启动发布者。当您在发布者控制台中输入消息时,您会注意到它们是在一个或另一个工人中进行处理的,但并非两者兼而有之。这是因为工作是在工人之间分配的。
请注意,您也可以启动多个发布者。
只要工人和出版商可以访问RabbitMQ服务器,他们就可以在不同的机器上。
要在几台计算机上分配工人,您将必须配置有关RabbitMQ服务器的信息。至少,这意味着用户名,密码,主机名(或IP地址)和端口号(如果您的RabbitMQ服务器配置为在非标准端口上侦听连接)。在上面的示例中,我们没有配置任何一个,因为默认值足够 - 用户名: guest ,密码: guest ,主机名: localhost ,端口: -1 (=使用默认端口)。
例如,如果您的RabbitMQ服务器正在使用HostName rabbit.example.local在计算机上运行,请在标准端口号上聆听,并且您已经在RabbitMQ中massive创建了一个用户,请使用密码: d0ntUseTh!sPass您将初始化RabbitMqJobs 。
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;或者,如果您不想启动工作人员线程(即仅将过程用于发布作业):
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;现在,您可以在多台机器上部署工人(和发布者)并运行它们。如果网络连接正在工作(防火墙打开等),则一切都应该起作用。工作将以旋转方式向工人路由。请记住,默认情况下,每个MassiveJobs应用程序都在启动两个工作线程。这意味着,如果您有3台机器,则每台运行一个MassiveJobs应用程序,那么作业的分布将看起来像这样:
在快速启动的示例中,您可能已经注意到,当我们在两个Posershell Windows中运行了两个MassiveJobs应用程序时,其中两条消息将转到一个窗口,接下来两个窗口到另一个窗口,依此类推。现在您知道原因。
如果您的应用程序在.NET核心托管环境(ASP.NET Core Web应用程序或Worker Service)中运行,则跳过本节。
在运行MassiveJobs的应用程序中配置日志记录非常重要,因为这是查看应用程序中大量运行时错误的唯一方法。这与安装合适的软件包并在初始化时设置JobLoggerFactory一样简单,如果您使用以下记录器库之一:
MassiveJobs.Logging.Log4Net )MassiveJobs.Logging.NLog )MassiveJobs.Logging.Serilog )例如,如果要将log4net记录添加到快速启动示例中,请首先在项目中安装MassiveJobs.Logging.Log4Net软件包。之后,初始化log4net库,最后是MassiveJobs。
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}您必须自己实现“初始化”,就像通常为记录库进行初始化一样。例如,对于log4net,这只会配置控制台appender。
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}现在,当您启动Worker应用程序时,您应该在控制台中查看记录消息:
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.您会注意到,如果启动发布者应用程序,则直到尝试发送第一条消息之前,它不会尝试连接到RabbitMQ。这是因为每个大型jobs应用程序都保持了与兔子的两个连接,一个连接用于发布,另一个用于食用消息。在发布者中,我们不是在开始工人,因此消耗连接不会初始化。
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected在.NET核心托管环境(ASP.NET Core,Worker Services)中使用MassiveJobs.RabbitMqBroker在您的应用程序中安装以下包:
dotnet add package MassiveJobs.RabbitMqBroker.Hosting然后,在您的启动类中,配置服务时,请致电services.AddMassiveJobs() 。
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
}这将注册所需的MassiveJobs服务,并为运营工作人员启动背景托管服务。现在,您可以从控制器中发布作业。例如,如果您有一个Customer实体,并且想向新创建的客户发送欢迎电子邮件,则可能有这样的东西:
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
}请记住, SendWelcomeEmailJob.Publish不参与交易_非常重要。大规模job的RabbitMQBroker不支持交易。但是,如果发布失败(仅出版 - 实际上是发送邮件,则Publish方式都会抛出异常,这是异步完成的)。如果发布失败,则将抛出异常,而trans.Commit()将永远不会被调用,并且交易将在处置时滚动。
相当地,在这里发布作业作为最后一个承诺资源。
SendWelcomeEmailJob可以看起来像这样:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}这里有几件事要注意:
customer.IsEmailSent 。如果将其设置为真,我们什么也不做(没有例外,因为例外会使MassiveJobs库安排工作的工作)SaveChanges() ,以便可以抛出并发异常,以稍后重新安排工作(但是您必须在您的Entites上配置并发属性以使其正常工作)。但是,在这种特殊情况下,我们的工作类并没有完全掌握。由于电子邮件服务器未参与交易,因此仍可能会遇到两次电子邮件。如果client.Send抛出超时例外,则不确定该电子邮件是否是实际发送的。邮件服务器可能已经收到了请求,排队了邮件以进行交付,但是由于临时网络问题,我们从未得到响应。换句话说,在这种情况下,至少保证了一次交货,而不是完全。
如果仅涉及数据库更改,那么我们可以完全保证。但是即使到那时,作业的Perform方法也可以被调用两次,因此您必须确保该作业在Perform方法中是不掌握的(类似于我们使用IsEmailSent功能)。