rabbitmqメッセージブローカーを搭載した.netのシンプルで分散したバックグラウンド処理用のオープンソースライブラリ。ドキュメントは、終了したら、MassiveJobs.net Webサイトで入手できます。
rabbitmqの既存のインストールがない場合、最も簡単な方法はコンテナで開始することです。次のコマンドは、停止時にすぐに削除されるコンテナ内でRabbitMQを開始します。
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:managementこれで、ブラウザでrabbitmq管理UIにアクセスできるはずです:http:// localhost:15672アドレス。接続、キューなどを監視する場合は、ユーザー名ゲストおよびパスワードゲストでサインインできます。
このクイックスタートには.NET Core 3.1 CLIを使用しますが、Visual Studioで、.NET Coreまたは.NET Framework 4.6.1以降でそれを行うこともできます。
プロジェクトのフォルダーを作成します。
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStart新しいコンソールアプリケーションプロジェクトを作成します。
dotnet new console足場プロジェクトをテストします。
dotnet runあなたはHello World!数秒後。
MassiveJobs.RabbitMqBrokerにパッケージ参照を追加します。
dotnet add package MassiveJobs.RabbitMqBrokerお気に入りの編集者を使用してプログラムを開き、このコードを入力します。コードのコメントは、何が起こっているかについての基本的なアイデアを与えるのに十分なはずです。
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}3つの異なるコマンドプロンプト(またはパワーシェル)を開始します。 2つは労働者として使用され、1つは出版社として使用されます。
アプリケーションを開始するには、プロジェクトフォルダーに移動して実行します。
dotnet runタイプ1とEnterを押してワーカーを開始し、タイプ2を押し、Enterを押して出版社を開始します。パブリッシャーコンソールにメッセージを入力すると、どちらか一方のワーカーで処理されているが、両方ではないことに気付くでしょう。これは、仕事が労働者間で分配されるためです。
複数の出版社も始めることができることに注意してください。
労働者と出版社は、RabbitMQサーバーにアクセスできる限り、異なるマシンに載ることができます。
複数のマシンにワーカーを配布するには、RabbitMQサーバーに関する情報を構成する必要があります。少なくとも、ユーザー名、パスワード、ホスト名(またはIPアドレス)、およびポート番号(標準以外のポートの接続をリッスンするように構成されている場合)を意味します。上記の例では、デフォルトで十分だったため、いずれも構成しませんでした-USERNAME: guest 、password: guest 、hostname: localhost 、port: -1 (=デフォルトポートを使用)。
たとえば、rabbitmqサーバーがホスト名rabbit.example.localを使用してマシンで実行されている場合、標準ポート番号でリスニングd0ntUseTh!sPassれている場合、パスワードでRabbitMqJobsでmassiveユーザーを作成しました。
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;または、ワーカースレッドを起動したくない場合(つまり、ジョブの公開にのみプロセスを使用するために)。
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;これで、労働者(および出版社)を複数のマシンに展開して実行できます。ネットワークの接続性が機能している場合(ファイアウォールを開くなど)、すべてが機能するはずです。仕事は、ラウンドロビンの方法で労働者にルーティングされます。デフォルトでは、すべてのMassiveJobsアプリケーションが2つのワーカースレッドを開始していることに注意してください。つまり、3つのマシンがある場合、それぞれが1つの大規模なジョブアプリケーションを実行している場合、ジョブの分布は次のようになります。
クイックスタートの例では、2つのPoserShell Windowsで2つの大規模なジョブアプリケーションを実行していたときに、2つのメッセージが1つのウィンドウに、次の2つはもう1つのウィンドウなどになります。今、あなたはその理由を知っています。
アプリケーションが.NETコアホスト環境(ASP.NETコアWebアプリケーションまたはワーカーサービス)で実行されている場合は、このセクションをスキップします。
MassiveJobsを実行しているアプリケーションでのログを構成することは非常に重要です。これは、アプリケーションでMassiveJobsの実行時エラーを確認する唯一の方法だからです。次のロガーライブラリのいずれかを使用している場合、適切なパッケージをインストールし、初期化時にJobLoggerFactoryを設定するのと同じくらい簡単です。
MassiveJobs.Logging.Log4Netを使用)MassiveJobs.Logging.NLogを使用)MassiveJobs.Logging.Serilogを使用)たとえば、クイックスタートの例にlog4Netロギングを追加する場合は、最初にMassiveJobs.Logging.Log4Netパッケージをプロジェクトにインストールします。その後、log4netライブラリを初期化し、最後に大規模なジョブを開始します。
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}通常、ロギングライブラリの初期化を行うため、自分で「初期化」を実装する必要があります。たとえば、Log4Netの場合、これはコンソールAppenderのみを構成します。
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}ワーカーアプリケーションを開始すると、コンソールにロギングメッセージが表示されるはずです。
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.パブリッシャーアプリケーションを開始した場合、最初のメッセージを送信しようとするまでRabbitMQに接続しようとしないことに気付くでしょう。これは、すべてのMassiveJobsアプリケーションがRabbitMQへの2つの接続を維持しているためです。1つは公開用、もう1つはメッセージを消費するためです。出版社では、私たちは労働者を起動していないため、接続の消費は初期化されていません。
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - ConnectedMassiveJobs.RabbitMqBrokerを使用する.NETコアホスト環境(ASP.NETコア、ワーカーサービス)で、アプリケーションに次のパッケージをインストールします。
dotnet add package MassiveJobs.RabbitMqBroker.Hosting次に、スタートアップクラスで、サービスを構成するときに、 services.AddMassiveJobs()を呼び出します。
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
}これにより、必要なMassiveJobsサービスを登録し、ジョブワーカーを実行するためのバックグラウンドホストサービスを開始します。これで、コントローラーからジョブを公開できます。たとえば、 Customerエンティティがあり、新しく作成された顧客にウェルカムメールを送信したい場合は、次のようなものがある場合があります。
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
} SendWelcomeEmailJob.Publishトランザクション_に参加しないことに留意することは非常に重要です。 rabbitmqbroker for MassiveJobsはトランザクションをサポートしていません。ただし、 Publishメソッドは、公開が失敗した場合に例外をスローします(公開のみ - 実際にはメールを送信するのではなく、非同期に行われます)。公開が失敗した場合、例外がスローされ、 trans.Commit()が呼び出されることはなく、トランザクションは処分時に巻き戻されます。
耐久性は、ジョブを公開することは、ここで最後のコミットリソースとして使用されます。
SendWelcomeEmailJobは次のようになる可能性があります:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}ここに注意すべきことがいくつかあります:
customer.IsEmailSent何かをする前にチェックされます。それが真実に設定されている場合、私たちは何もしません(例外はスローされません。SaveChanges()呼び出して、後でジョブを再スケジュールする並行性例外をスローできるようにしています(ただし、機能するためには、エンティットの並行性プロパティを構成する必要があります)。ただし、この特定のケースでは、ジョブクラスは完全には慣習的ではありません。電子メールサーバーがトランザクションに参加しないため、電子メールが2回送信されることはまだ起こるかもしれません。 client.Sendがタイムアウトの例外をスローする場合、電子メールが実際に送信されたかどうかは不明です。メールサーバーはリクエストを受け取っている可能性があり、配信のためのメッセージをキードしましたが、一時的なネットワークの問題のために応答はありませんでした。別の言い方をすれば、この場合、少なくとも一度は配送が保証されますが、一度だけではありません。
データベースの変更のみがジョブに関与した場合、 1回保証することができます。しかし、それでも、ジョブのPerform方法は2回呼び出すことができるため、ジョブがPerform方法で等であることを確認する必要があります( IsEmailSentで行ったものと同様)。