Open-Source-Bibliothek für eine einfache, verteilte Hintergrundverarbeitung für .NET, betrieben von Rabbitmq Message Broker. Die Dokumentation, die nach Fertigstellung, wird auf der Website von Massivejobs.net verfügbar sein.
Wenn Sie keine vorhandene Installation von Rabbitmq haben, besteht die einfachste Möglichkeit, sie in einem Container zu starten. Der folgende Befehl startet Rabbitmq in einem Container , der beim Stoppen sofort entfernt wird .
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:managementJetzt sollten Sie in Ihrem Browser unter: http: // localhost: 15672 Adresse zugreifen können. Sie können sich mit dem Gast- und Passwort -Gast von Benutzername anmelden, wenn Sie die Verbindungen, Warteschlangen usw. überwachen möchten.
Wir werden .NET Core 3.1 CLI für diesen schnellen Start verwenden. Sie können dies jedoch auch in Visual Studio mit .NET Core oder mit .NET Framework 4.6.1 oder höher tun.
Erstellen Sie einen Ordner für das Projekt.
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStartErstellen Sie ein neues Konsolenantragsprojekt.
dotnet new consoleTesten Sie das Gerüstprojekt.
dotnet run Du solltest Hello World! Nach ein paar Sekunden.
Fügen Sie einen Paketverweis auf die MassiveJobs.RabbitMqBroker hinzu.
dotnet add package MassiveJobs.RabbitMqBrokerVerwenden Sie Ihren bevorzugten Editor, um Programm.cs zu öffnen und diesen Code einzugeben. Kommentare im Code sollten ausreichen, um Ihnen eine grundlegende Vorstellung davon zu geben, was los ist.
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}Starten Sie drei verschiedene Eingabeaufforderungen (oder Power Shells). Zwei werden als Arbeiter verwendet und einer wird als Verlag verwendet.
Um die Anwendung zu starten, gehen Sie zum Projektordner und führen Sie aus:
dotnet run Geben Sie 1 ein und drücken Sie die Eingabetaste, um einen Arbeiter zu starten, Typ 2 und drücken Sie die Eingabetaste, um den Verlag zu starten. Wenn Sie Nachrichten in der Publisher -Konsole eingeben, werden Sie feststellen, dass sie in dem einen oder anderen Arbeiter verarbeitet werden, aber nicht beides. Dies liegt daran, dass Jobs zwischen den Arbeitern verteilt sind.
Beachten Sie, dass Sie auch mehrere Verlage starten können.
Arbeiter und Verlage können sich auf verschiedenen Maschinen befinden, solange sie auf den Rabbitmq -Server zugreifen können.
Um die Arbeiter auf mehrere Maschinen zu verteilen, müssen Sie die Informationen über den Rabbitmq -Server konfigurieren. Zumindest bedeutet dies, dass Benutzername, Kennwort, Hostname (oder IP-Adresse) und die Portnummer (wenn Ihr RabbitMQ-Server so konfiguriert ist, dass sie auf Verbindungen auf einem nicht standardmäßigen Port anhören). Im obigen Beispiel haben wir keine davon konfiguriert, da die Standardeinstellungen ausreichend waren - Benutzername: guest , Passwort: guest , Hostname: localhost , Port: -1 (= Verwenden Sie den Standardport).
Wenn Ihr Rabbitmq -Server beispielsweise auf einer Maschine mit dem Hostnamen rabbit.example.local ausgeführt wird, die die Standard -Portnummer zuhört, und Sie einen Benutzer erstellt haben, massive im Rabbitmq mit dem Passwort: d0ntUseTh!sPass dann würden Sie RabbitMqJobs so initialisieren.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;Oder wenn Sie die Arbeiter -Threads nicht starten möchten (dh den Prozess nur für Veröffentlichungsjobs):
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;Jetzt können Sie Arbeitnehmer (und Verleger) auf mehreren Maschinen bereitstellen und diese ausführen. Wenn die Netzwerkkonnektivität funktioniert (Firewalls Open usw.), sollte alles funktionieren. Jobs würden rund-Robin-Weise an die Arbeiter weitergeleitet. Beachten Sie, dass jede massive Antrag auf zwei Arbeiterfäden standardmäßig jede massive Antragsbewerbung startet. Das heißt, wenn Sie 3 Maschinen haben, die jeweils eine massive Jobs -Anwendung ausführen, würde die Verteilung von Jobs ungefähr so aussehen:
Möglicherweise haben Sie im schnellen Beispiel bemerkt, als wir zwei Massivejobs-Anwendungen in zwei PoserShell-Fenstern ausgeführt hatten, gingen zwei der Nachrichten in ein Fenster, die nächsten zwei zum anderen Fenster und so weiter. Jetzt kennen Sie den Grund.
Überspringen Sie diesen Abschnitt, wenn Ihre Anwendung in einer .NET -Core -Hosted -Umgebung (ASP.NET Core -Webanwendung oder Worker -Dienst) ausgeführt wird.
Es ist sehr wichtig, die Protokollierung in Ihrer Anwendung zu konfigurieren, die Massivejobs ausführen, da dies die einzige Möglichkeit ist, Massivejobs-Laufzeitfehler in Ihrer Anwendung zu sehen. Es ist so einfach wie die Installation eines geeigneten Pakets und das Einstellen des JobLoggerFactory bei der Initialisierung, wenn Sie eine der folgenden Logger -Bibliotheken verwenden:
MassiveJobs.Logging.Log4Net )MassiveJobs.Logging.NLog )MassiveJobs.Logging.Serilog ) Wenn Sie beispielsweise Log4Net-Protokollierung zum Quick-Start-Beispiel hinzufügen möchten, installieren Sie zuerst das Paket MassiveJobs.Logging.Log4Net in Ihrem Projekt. Danach initialisieren Sie die log4net -Bibliothek und schließlich massive Jobs.
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}Sie müssen selbst "initializelogging" implementieren, da Sie normalerweise die Initialisierung für Ihre Protokollierungsbibliothek durchführen. Zum Beispiel für Log4NET würde dies nur Konsolen Appender konfigurieren.
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}Wenn Sie nun die Worker -Anwendung starten, sollten Sie Protokollierungsnachrichten in der Konsole sehen:
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.Sie werden feststellen, dass wenn Sie die Publisher -Anwendung starten, nicht versucht, eine Verbindung zu Rabbitmq herzustellen, bis Sie versuchen, die ersten Nachrichten zu senden. Dies liegt daran, dass jede massive Anwendung zwei Verbindungen zum Rabbitmq aufrechterhalten, eines für die Veröffentlichung und die andere für den Verzehr von Nachrichten. Beim Verlag starten wir keine Arbeitnehmer, sodass die Konsumierung der Verbindung nicht initialisiert wird.
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected Um MassiveJobs.RabbitMqBroker in einer .NET -Kern -gehosteten Umgebung (ASP.NET CORE, Worker Services) zu verwenden, installieren Sie das folgende Paket in Ihrer Anwendung:
dotnet add package MassiveJobs.RabbitMqBroker.Hosting In Ihrer Startkurs, bei der Konfiguration von Diensten, call services.AddMassiveJobs() .
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
} Dadurch werden die erforderlichen Massivejobs -Dienste registriert und einen von Hintergrund gehosteten Service für die Leitung der Arbeitnehmer beginnen. Jetzt können Sie Jobs von einem Controller veröffentlichen. Wenn Sie beispielsweise eine Customer haben und eine willkommene E -Mail an einen neu erstellten Kunden senden möchten, haben Sie möglicherweise so etwas wie folgt:
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
} Es ist sehr wichtig zu beachten, dass SendWelcomeEmailJob.Publish nicht an der Transaktion beteiligt ist . Rabbitmqbroker für Massivejobs unterstützt keine Transaktionen. Die Publish wird jedoch eine Ausnahme ausführen, wenn das Veröffentlichen fehlschlägt (nur Veröffentlichen - nicht tatsächlich die E -Mail senden, was asynrnisch gemacht wird). Wenn das Verlagswesen fehlschlägt, wird die Ausnahme ausgelöst, und trans.Commit() wird niemals aufgerufen, und die Transaktion wird auf Entsendung gerollt.
Im Begründung wird hier die Veröffentlichung eines Jobs als letzte engagierende Ressource genutzt.
Das SendWelcomeEmailJob könnte ungefähr so aussehen:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}Hier gibt es mehrere Dinge zu beachten:
customer.IsEmailSent überprüft, bevor Sie etwas tun. Wenn es auf wahr ist, tun wir nichts (es wird keine Ausnahme ausgelöst, da die Ausnahme die Massivejobs -Bibliothek den Auftrag für Wiederholungen planen lässt).SaveChanges() im DB -Kontext an , bevor wir die E -Mail tatsächlich senden, damit sie Ausnahmen von Parallelität veröffentlichen können, die den Job für später verschieben können (aber Sie müssen die Eigenschaften von Parallelität auf Ihrem Gebot konfigurieren, damit er funktioniert). In diesem speziellen Fall ist unsere Jobklasse jedoch nicht vollständig idempotent. Es kann immer noch passieren, dass die E -Mail zweimal gesendet wird, da der E -Mail -Server nicht an der Transaktion teilnimmt. Wenn client.Send die Ausnahme von Zeitüberschreitungen ausgelöst hat, ist es ungewiss, ob die E -Mail tatsächlich gesendet wurde oder nicht. Der Mailserver hat möglicherweise die Anfrage erhalten, die Nachricht zur Zustellung in die Warteschlange gestellt, aber wir haben die Antwort aufgrund eines temporären Netzwerkproblems nie erhalten. Mit anderen Worten, mindestens einmal wird die Lieferung in diesem Fall garantiert, nicht genau einmal .
Wenn nur Datenbankänderungen in den Job beteiligt wären, könnten wir genau einmal Garantien haben. Aber selbst dann kann die Perform des Jobs zweimal aufgerufen werden, sodass Sie sicherstellen müssen, dass der Job in der Methode Perform (ähnlich wie wir es mit IsEmailSent gemacht haben).