RabbitMQ 메시지 브로커로 구동되는 .NET 용 간단하고 분산 배경 처리를위한 오픈 소스 라이브러리. 이 문서는 일단 완료되면 MassiveJobs.net 웹 사이트에서 제공됩니다.
RabbitMQ의 기존 설치가없는 경우 가장 간단한 방법은 컨테이너에서 시작하는 것입니다. 다음 명령은 컨테이너에서 RabbitMQ를 시작하여 중지되면 즉시 제거됩니다 .
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:management이제 브라우저에서 RabbitMQ Management UI에 액세스 할 수 있어야합니다. http : // localhost : 15672 주소. 연결, 대기열 등을 모니터링하려면 사용자 이름 게스트 및 비밀번호 게스트 로 로그인 할 수 있습니다.
이 빠른 시작에 .NET Core 3.1 CLI를 사용하지만 Visual Studio, .NET Core 또는 .NET Framework 4.6.1 이상으로 수행 할 수도 있습니다.
프로젝트 용 폴더를 만듭니다.
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStart새 콘솔 응용 프로그램 프로젝트를 만듭니다.
dotnet new console비계 프로젝트를 테스트하십시오.
dotnet run 당신은 Hello World! 몇 초 후.
MassiveJobs.RabbitMqBroker 에 대한 패키지 참조를 추가하십시오.
dotnet add package MassiveJobs.RabbitMqBroker좋아하는 편집기를 사용하여 Program.cs를 열고이 코드를 입력하십시오. 코드의 의견은 무슨 일이 일어나고 있는지에 대한 기본 아이디어를 제공하기에 충분해야합니다.
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}세 가지 다른 명령 프롬프트 (또는 파워 쉘)를 시작하십시오. 두 개는 근로자로 사용되며 하나는 출판사로 사용됩니다.
응용 프로그램을 시작하려면 프로젝트 폴더로 이동하여 실행합니다.
dotnet run 유형 1 과 Enter를 누르면 작업자를 시작하고 유형 2 시작하고 Enter를 누르려면 출판사를 시작하십시오. 게시자 콘솔에 메시지를 입력 할 때 하나 또는 다른 작업자에서 처리되는 것을 알 수 있습니다. 일자리가 근로자들 사이에 분배되기 때문입니다.
여러 게시자도 시작할 수 있습니다.
작업자와 게시자는 RabbitMQ 서버에 액세스 할 수있는 한 다른 기계에있을 수 있습니다.
여러 기계에 작업자를 배포하려면 RabbitMQ 서버에 대한 정보를 구성해야합니다. 이는 최소한 사용자 이름, 암호, 호스트 이름 (또는 IP 주소) 및 포트 번호 (RabbitMQ 서버가 비표준 포트에서 연결을 듣도록 구성된 경우)를 의미합니다. 위의 예에서는 기본값이 충분했기 때문에 - 사용자 이름 : guest , Password : guest , Hostname : localhost , 포트 : -1 (= 기본 포트 사용)이기 때문에 구성하지 않았습니다.
예를 들어, RabbitMQ 서버가 호스트 이름 rabbit.example.local 이있는 컴퓨터에서 실행중인 경우 표준 포트 번호를 듣고 RabbitMQ에서 d0ntUseTh!sPass 사용하여 RabbitMqJobs massive 합니다.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;또는 작업자 스레드를 시작하고 싶지 않은 경우 (예 : 작업을 위해 프로세스 만 사용하려면) :
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;이제 여러 기계에 작업자 (및 게시자)를 배치하여 실행할 수 있습니다. 네트워크 연결이 작동하는 경우 (방화벽이 열려 있음) 모든 것이 작동해야합니다. 일자리는 라운드 로빈 방식으로 노동자들에게 라우팅 될 것입니다. 기본적으로 모든 MassiveJobs 응용 프로그램은 2 개의 작업자 스레드를 시작합니다. 즉, 3 개의 기계가있는 경우 각각 하나의 대규모 작업 응용 프로그램을 실행하는 경우 작업 배포가 다음과 같이 보입니다.
빠른 시작 예에서 2 개의 Posershell Windows에서 두 개의 MassiveJobs 응용 프로그램을 실행했을 때 두 개의 메시지가 한 창으로, 다음 2 개는 다른 창 등으로 이동했습니다. 이제 그 이유를 알고 있습니다.
.NET Core 호스팅 환경 (ASP.NET Core Web Application 또는 Worker Service)에서 응용 프로그램이 실행중인 경우이 섹션을 건너 뜁니다.
MassiveJobs를 실행하는 응용 프로그램의 로깅을 구성하는 것이 매우 중요합니다. 이는 응용 프로그램에서 MassiveJobs 런타임 오류를 볼 수있는 유일한 방법이기 때문입니다. 다음 로거 라이브러리 중 하나를 사용하는 경우 적절한 패키지를 설치하고 초기화시 JobLoggerFactory 설정하는 것만 큼 간단합니다.
MassiveJobs.Logging.Log4Net 사용)MassiveJobs.Logging.NLog 사용)MassiveJobs.Logging.Serilog 사용) 예를 들어, 빠른 시작 예제에 log4net 로깅을 추가하려면 먼저 프로젝트에 MassiveJobs.Logging.Log4Net 패키지를 설치하십시오. 그 후 Log4net 라이브러리를 초기화하고 마지막으로 MassiveJobs를 초기화하십시오.
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}일반적으로 로깅 라이브러리의 초기화를 수행하므로 "InitializeLogging"을 직접 구현해야합니다. 예를 들어 Log4Net의 경우 콘솔 appender 만 구성합니다.
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}이제 작업자 응용 프로그램을 시작하면 콘솔에 로깅 메시지가 표시됩니다.
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.게시자 응용 프로그램을 시작하면 첫 번째 메시지를 보내려고 할 때까지 RabbitMQ에 연결하려고하지 않습니다. 이것은 모든 MassiveJobs 응용 프로그램이 RabbitMQ와 두 가지 연결을 유지하고 하나는 게시를위한 두 가지 연결을 유지하고 다른 하나는 메시지 소비를 유지하기 때문입니다. 출판사에서는 작업자를 시작하지 않으므로 연결 소비가 초기화되지 않습니다.
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected .NET Core 호스팅 환경 (ASP.NET Core, Worker Services)에서 MassiveJobs.RabbitMqBroker 사용하려면 응용 프로그램에 다음 패키지를 설치하십시오.
dotnet add package MassiveJobs.RabbitMqBroker.Hosting 그런 다음 스타트 업 클래스에서 서비스를 구성 할 때 services.AddMassiveJobs() 호출합니다.
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
} 이것은 필요한 대규모 조브 서비스를 등록하고 구직자를 운영하기위한 배경 호스팅 서비스를 시작합니다. 이제 컨트롤러에서 작업을 게시 할 수 있습니다. 예를 들어, Customer 엔티티가 있고 새로 생성 된 고객에게 환영 이메일을 보내려면 다음과 같은 것이있을 수 있습니다.
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
} SendWelcomeEmailJob.Publish 거래에 참여하지 않는다는 점을 명심해야합니다. MassiveJobs 용 RabbitMqbroker는 거래를 지원하지 않습니다. 그러나 Publish 방법은 출판이 실패하면 예외가 발생합니다 (출판 만 - 실제로 메일을 보내지 않고 비동기 적으로 수행됩니다). 출판이 실패하면 예외가 발생하고 trans.Commit() 절대 호출되지 않으며 거래는 폐기시 롤백됩니다.
본질적으로, 직업을 게시하는 것은 여기에서 마지막으로 커밋 된 자원 으로 사용됩니다.
SendWelcomeEmailJob 은 다음과 같이 보일 수 있습니다.
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}여기에는 몇 가지 사항이 있습니다.
customer.IsEmailSent 무엇이든하기 전에 메일을 확인합니다. 그것이 진실로 설정되면 우리는 아무것도하지 않습니다 (예외는 예외가 발생하지 않습니다. 예외는 MassiveJobs 라이브러리가 재시에 대한 작업을 일정으로 만들기 때문에).SaveChanges() 호출하여 동시성 예외를 던져 나중에 작업을 조정할 수 있습니다 (그러나 귀하는 Entites에서 동시성 속성을 구성해야합니다 ). 그러나이 특별한 경우, 우리의 구인 수업은 완전히 묘사되지 않습니다. 전자 메일 서버가 트랜잭션에 참여하지 않기 때문에 이메일이 두 번 전송 될 수 있습니다. client.Send 시간 초과 예외를 던지면 이메일이 실제로 전송되었는지 여부는 확실하지 않습니다. Mail Server는 요청을 받고 배송 메시지를 대기했을 수도 있지만 임시 네트워크 문제로 인해 응답을받지 못했습니다. 다른 말로,이 경우 적어도 한 번은 배달이 보장됩니다. 정확히 한 번은 아닙니다.
데이터베이스 변경 만 작업에 관련된 경우 정확히 한 번 보장 할 수 있습니다. 그러나 그럼에도 불구하고 작업의 Perform 방법은 두 번 호출 될 수 있으므로 작업이 Perform 방법 ( IsEmailSent 로 수행 한 작업과 유사)에서 작업이 효과적인지 확인해야합니다 .