Biblioteca de código abierto para un procesamiento de fondo simple y distribuido para .NET, propulsado por el corredor de mensajes de RabbitMQ. La documentación, una vez terminada, estará disponible en el sitio web de MassiveJobs.net.
Si no tiene una instalación existente de RabbitMQ, la forma más simple es iniciarla en un contenedor. El siguiente comando comenzará con RabbitMQ en un contenedor que se eliminará inmediatamente cuando se detenga .
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:managementAhora, debería poder acceder a la interfaz de usuario de gestión de RabbitMQ en su navegador en: http: // localhost: 15672 dirección. Puede iniciar sesión con el invitado de usuario y la contraseña de usuario , si desea monitorear las conexiones, colas, etc.
Usaremos .NET Core 3.1 CLI para este inicio rápido, pero también puede hacerlo en Visual Studio, con .NET Core o con .NET Framework 4.6.1 o posterior.
Crea una carpeta para el proyecto.
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStartCree un nuevo proyecto de aplicación de consola.
dotnet new consolePrueba el proyecto andamio.
dotnet run ¡Deberías ver Hello World! después de un par de segundos.
Agregue una referencia de paquete a MassiveJobs.RabbitMqBroker .
dotnet add package MassiveJobs.RabbitMqBrokerUse su editor favorito para abrir Program.cs e ingrese este código. Los comentarios en el código deberían ser suficientes para darle una idea básica de lo que está sucediendo.
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}Inicie tres informes de sistema diferentes (o conchas de alimentación). Se utilizarán dos como trabajadores, y uno se utilizará como editor.
Para iniciar la aplicación, vaya a la carpeta del proyecto y ejecute:
dotnet run Escriba 1 y presione Enter para iniciar un trabajador, escriba 2 y presione Enter para iniciar el editor. A medida que ingrese mensajes en la consola del editor, notará que se procesan en uno u otro trabajador, pero no en ambos. Esto se debe a que los trabajos se distribuyen entre los trabajadores.
Tenga en cuenta que también puede iniciar múltiples editores.
Los trabajadores y editores pueden estar en diferentes máquinas, siempre que puedan acceder al servidor RabbitMQ.
Para distribuir a los trabajadores en varias máquinas, deberá configurar la información sobre el servidor RabbitMQ. Como mínimo, eso significa nombre de usuario, contraseña, nombre de host (o dirección IP) y el número de puerto (si su servidor RabbitMQ está configurado para escuchar las conexiones en un puerto no estándar). En el ejemplo anterior, no configuramos nada de eso porque los valores predeterminados eran suficientes: nombre de usuario: guest , contraseña: guest , nombre de host: localhost , puerto: -1 (= usar el puerto predeterminado).
Por ejemplo, si su servidor RabbitMQ se está ejecutando en una máquina con el nombre de host rabbit.example.local , escuchando en el número de puerto estándar, y ha creado un usuario massive en el conejo con la contraseña: d0ntUseTh!sPass entonces inicializaría RabbitMqJobs como este.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;O, si no desea iniciar los hilos de trabajadores (es decir, usar el proceso solo para publicar trabajos):
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;Ahora puede implementar trabajadores (y editores) en múltiples máquinas y ejecutarlas. Si la conectividad de red está funcionando (firewalls abiertos, etc.) todo debería funcionar. Los trabajos serían enrutados a los trabajadores de forma redonda. Tenga en cuenta que, por defecto, cada aplicación MassiveJobs está iniciando dos hilos de trabajadores. Eso significa que, si tiene 3 máquinas, cada una con una aplicación MassiveJobs, entonces la distribución de trabajos se vería así:
Es posible que haya notado, en el ejemplo de inicio rápido, cuando teníamos dos aplicaciones MassiveJobs en dos ventanas PoserShell, dos de los mensajes irían a una ventana, las siguientes dos a la otra ventana, etc. Ahora sabes la razón.
Omita esta sección si su aplicación se ejecuta en un entorno alojado de .NET Core (ASP.NET Core Web Application o Worker Service).
Es muy importante configurar el registro en su aplicación que ejecuta MassiveJobs porque esa es la única forma de ver los errores de tiempo de ejecución de MassiveJobs en su aplicación. Es tan simple como instalar un paquete adecuado y configurar la JobLoggerFactory en la inicialización, si está utilizando una de las siguientes bibliotecas de registrador:
MassiveJobs.Logging.Log4Net )MassiveJobs.Logging.NLog )MassiveJobs.Logging.Serilog ) Por ejemplo, si desea agregar registro de Log4Net al ejemplo de inicio rápido, primero instale el paquete MassiveJobs.Logging.Log4Net en su proyecto. Después de eso, inicialice la biblioteca LOG4NET y finalmente MassiveJobs.
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}Debe implementar "InitializElogging" usted mismo, ya que normalmente realiza una inicialización para su biblioteca de registro. Por ejemplo, para LOG4Net esto solo configuraría la Appender de la consola.
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}Ahora, cuando inicia la aplicación de trabajadores, debería ver mensajes de registro en la consola:
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.Notará que si inicia la aplicación del editor, no intenta conectarse a RabbitMQ hasta que intente enviar los primeros mensajes. Esto se debe a que cada aplicación MassiveJobs mantiene dos conexiones con el RabbitMQ, una para publicar y la otra para consumir mensajes. En el editor, no estamos comenzando trabajadores, por lo que no se inicializa consumir conexión.
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected Para usar MassiveJobs.RabbitMqBroker en un entorno alojado .NET Core (ASP.NET Core, Worker Services) Instale el siguiente paquete en su aplicación:
dotnet add package MassiveJobs.RabbitMqBroker.Hosting Luego, en su clase de inicio, al configurar los servicios, llamar a services.AddMassiveJobs() .
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
} Esto registrará los servicios MassiveJobs requeridos e iniciará un servicio alojado de antecedentes para ejecutar los trabajadores de trabajo. Ahora puede publicar trabajos desde un controlador. Por ejemplo, si tiene una entidad Customer y desea enviar un correo electrónico de bienvenida a un cliente recién creado, es posible que tenga algo como esto:
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
} Es muy importante tener en cuenta que SendWelcomeEmailJob.Publish no participa en la transacción _. Rabbitmqbroker para MassiveJobs no admite las transacciones. Pero, el método Publish lanzará una excepción si la publicación falla (solo la publicación, no envía el correo, lo que se realiza de manera asycronamente). Si la publicación falla, se lanzará la excepción, y trans.Commit() nunca se llamará, y la transacción se lanzará en deseche.
Esencialmente, publicar un trabajo se usa aquí como un último recurso comprometido .
El SendWelcomeEmailJob podría verse algo así:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}Hay varias cosas a tener en cuenta aquí:
customer.IsEmailSent se verifica antes de hacer cualquier cosa. Si está configurado en verdadero, no hacemos nada (no se lanza la excepción, porque la excepción haría que el programa de la biblioteca MassiveJobs sea el trabajo para reintentos)SaveChanges() en el contexto de DB antes de enviar el correo electrónico para que pueda lanzar excepciones de concurrencia que reprogramarán el trabajo para más adelante (pero debe configurar las propiedades de concurrencia en sus intereses para que funcione). Sin embargo, en este caso particular, nuestra clase de trabajo no es totalmente ideempotente. Todavía puede suceder que el correo electrónico se envíe dos veces porque el servidor de correo electrónico no participa en la transacción. Si client.Send lanza la excepción del tiempo de espera , no está claro si el correo electrónico se envió o no. El servidor de correo podría haber recibido la solicitud, colocó el mensaje para la entrega, pero nunca recibimos la respuesta debido a un problema de red temporal. En otras palabras, al menos una vez la entrega está garantizada en este caso, no exactamente una vez .
Si solo los cambios en la base de datos estuvieran involucrados en el trabajo, entonces podríamos tener exactamente garantías. Pero incluso entonces, el método Perform del trabajo se puede llamar dos veces, por lo que debe asegurarse de que el trabajo sea ideMPOTENT en el método Perform (similar a lo que hicimos con IsEmailSent ).