用於.NET的簡單,分佈式背景處理的開源庫,由RabbitMQ Message Broker提供支持。該文檔一旦完成,將在MassiveJobs.net網站上找到。
如果您沒有現有的兔子安裝,則最簡單的方法是在容器中啟動它。以下命令將在一個容器中啟動RabbitMQ ,該容器將在停止時立即刪除。
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:management現在,您應該能夠在瀏覽器中訪問RabbitMQ Management UI:http:// localhost:15672地址。如果您想監視連接,隊列等,則可以與用戶名訪客和密碼來賓登錄。
我們將使用.NET Core 3.1 CLI進行此快速啟動,但是您也可以在Visual Studio,使用.NET Core或使用.NET Framework 4.6.1或更高版本中進行此操作。
為項目創建一個文件夾。
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStart創建一個新的控制台應用程序項目。
dotnet new console測試腳手架項目。
dotnet run您應該看到Hello World!幾秒鐘後。
將包裝參考添加到MassiveJobs.RabbitMqBroker 。
dotnet add package MassiveJobs.RabbitMqBroker使用您喜歡的編輯器打開program.cs並輸入此代碼。代碼中的評論應該足以讓您對正在發生的事情有一個基本思路。
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}啟動三個不同的命令提示(或電源外殼)。兩個將用作工人,一個將用作發布者。
要啟動應用程序,請轉到項目文件夾並運行:
dotnet run鍵入1並命中Enter以啟動一個工人,鍵入2並擊中Enter以啟動發布者。當您在發布者控制台中輸入消息時,您會注意到它們是在一個或另一個工人中進行處理的,但並非兩者兼而有之。這是因為工作是在工人之間分配的。
請注意,您也可以啟動多個發布者。
只要工人和出版商可以訪問RabbitMQ服務器,他們就可以在不同的機器上。
要在幾台計算機上分配工人,您將必須配置有關RabbitMQ服務器的信息。至少,這意味著用戶名,密碼,主機名(或IP地址)和端口號(如果您的RabbitMQ服務器配置為在非標準端口上偵聽連接)。在上面的示例中,我們沒有配置任何一個,因為默認值足夠 - 用戶名: guest ,密碼: guest ,主機名: localhost ,端口: -1 (=使用默認端口)。
例如,如果您的RabbitMQ服務器正在使用HostName rabbit.example.local在計算機上運行,請在標準端口號上聆聽,並且您已經在RabbitMQ中massive創建了一個用戶,請使用密碼: d0ntUseTh!sPass您將初始化RabbitMqJobs 。
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;或者,如果您不想啟動工作人員線程(即僅將過程用於發布作業):
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;現在,您可以在多台機器上部署工人(和發布者)並運行它們。如果網絡連接正在工作(防火牆打開等),則一切都應該起作用。工作將以旋轉方式向工人路由。請記住,默認情況下,每個MassiveJobs應用程序都在啟動兩個工作線程。這意味著,如果您有3台機器,則每台運行一個MassiveJobs應用程序,那麼作業的分佈將看起來像這樣:
在快速啟動的示例中,您可能已經註意到,當我們在兩個Posershell Windows中運行了兩個MassiveJobs應用程序時,其中兩條消息將轉到一個窗口,接下來兩個窗口到另一個窗口,依此類推。現在您知道原因。
如果您的應用程序在.NET核心託管環境(ASP.NET Core Web應用程序或Worker Service)中運行,則跳過本節。
在運行MassiveJobs的應用程序中配置日誌記錄非常重要,因為這是查看應用程序中大量運行時錯誤的唯一方法。這與安裝合適的軟件包並在初始化時設置JobLoggerFactory一樣簡單,如果您使用以下記錄器庫之一:
MassiveJobs.Logging.Log4Net )MassiveJobs.Logging.NLog )MassiveJobs.Logging.Serilog )例如,如果要將log4net記錄添加到快速啟動示例中,請首先在項目中安裝MassiveJobs.Logging.Log4Net軟件包。之後,初始化log4net庫,最後是MassiveJobs。
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}您必須自己實現“初始化”,就像通常為記錄庫進行初始化一樣。例如,對於log4net,這只會配置控制台appender。
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}現在,當您啟動Worker應用程序時,您應該在控制台中查看記錄消息:
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.您會注意到,如果啟動發布者應用程序,則直到嘗試發送第一條消息之前,它不會嘗試連接到RabbitMQ。這是因為每個大型jobs應用程序都保持了與兔子的兩個連接,一個連接用於發布,另一個用於食用消息。在發布者中,我們不是在開始工人,因此消耗連接不會初始化。
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected在.NET核心託管環境(ASP.NET Core,Worker Services)中使用MassiveJobs.RabbitMqBroker在您的應用程序中安裝以下包:
dotnet add package MassiveJobs.RabbitMqBroker.Hosting然後,在您的啟動類中,配置服務時,請致電services.AddMassiveJobs() 。
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
}這將註冊所需的MassiveJobs服務,並為運營工作人員啟動背景託管服務。現在,您可以從控制器中發布作業。例如,如果您有一個Customer實體,並且想向新創建的客戶發送歡迎電子郵件,則可能有這樣的東西:
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
}請記住, SendWelcomeEmailJob.Publish不參與交易_非常重要。大規模job的RabbitMQBroker不支持交易。但是,如果發布失敗(僅出版 - 實際上是發送郵件,則Publish方式都會拋出異常,這是異步完成的)。如果發布失敗,則將拋出異常,而trans.Commit()將永遠不會被調用,並且交易將在處置時滾動。
相當地,在這裡發布作業作為最後一個承諾資源。
SendWelcomeEmailJob可以看起來像這樣:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}這裡有幾件事要注意:
customer.IsEmailSent 。如果將其設置為真,我們什麼也不做(沒有例外,因為例外會使MassiveJobs庫安排工作的工作)SaveChanges() ,以便可以拋出並發異常,以稍後重新安排工作(但是您必須在您的Entites上配置並發屬性以使其正常工作)。但是,在這種特殊情況下,我們的工作類並沒有完全掌握。由於電子郵件服務器未參與交易,因此仍可能會遇到兩次電子郵件。如果client.Send拋出超時例外,則不確定該電子郵件是否是實際發送的。郵件服務器可能已經收到了請求,排隊了郵件以進行交付,但是由於臨時網絡問題,我們從未得到響應。換句話說,在這種情況下,至少保證了一次交貨,而不是完全。
如果僅涉及數據庫更改,那麼我們可以完全保證。但是即使到那時,作業的Perform方法也可以被調用兩次,因此您必須確保該作業在Perform方法中是不掌握的(類似於我們使用IsEmailSent功能)。