ไลบรารีโอเพนซอร์ซสำหรับการประมวลผลพื้นหลังแบบง่ายกระจายสำหรับ. NET ขับเคลื่อนโดยนายหน้าข้อความ RabbitMQ เอกสารประกอบเมื่อเสร็จแล้วจะมีอยู่ที่เว็บไซต์ MassiveJobs.net
หากคุณไม่มีการติดตั้ง RabbitMQ ที่มีอยู่วิธีที่ง่ายที่สุดคือการเริ่มต้นในคอนเทนเนอร์ คำสั่งต่อไปนี้จะเริ่ม RabbitMQ ในคอนเทนเนอร์ ที่จะถูกลบออกทันทีเมื่อหยุด
docker run -- rm -- hostname rabbit - test -- name rabbit - test - d - p 15672 : 15672 - p 5672 : 5672 rabbitmq:managementตอนนี้คุณควรจะสามารถเข้าถึง UI การจัดการ RabbitMQ ในเบราว์เซอร์ของคุณได้ที่: http: // localhost: 15672 ที่อยู่ คุณสามารถลงชื่อเข้าใช้กับ แขกรับ เชิญและรหัสผ่านของ ผู้ใช้ หากคุณต้องการตรวจสอบการเชื่อมต่อคิว ฯลฯ
เราจะใช้. NET Core 3.1 CLI สำหรับการเริ่มต้นอย่างรวดเร็วนี้ แต่คุณสามารถทำได้ใน Visual Studio ด้วย. NET Core หรือด้วย. NET Framework 4.6.1 หรือใหม่กว่า
สร้างโฟลเดอร์สำหรับโครงการ
mkdir MassiveJobs.QuickStart
cd MassiveJobs.QuickStartสร้างโครงการแอปพลิเคชันคอนโซลใหม่
dotnet new consoleทดสอบโครงการนั่งร้าน
dotnet run คุณควรเห็น Hello World! หลังจากสองสามวินาที
เพิ่มการอ้างอิงแพ็คเกจไปยัง MassiveJobs.RabbitMqBroker
dotnet add package MassiveJobs.RabbitMqBrokerใช้โปรแกรมแก้ไขที่คุณชื่นชอบเพื่อเปิด program.cs และป้อนรหัสนี้ ความคิดเห็นในรหัสควรเพียงพอที่จะให้แนวคิดพื้นฐานเกี่ยวกับสิ่งที่เกิดขึ้น
using System ;
using MassiveJobs . Core ;
using MassiveJobs . RabbitMqBroker ;
namespace MassiveJobs . QuickStart
{
/// <summary>
/// This is a "job" class.
/// It will be instantiated every time a message is received, and Perform will be called.
/// It inherits from Job<TJob, TArgs> generic class, where TJob specifies the type of the job,
/// and TArgs specifies the type of the parameter expected by the Perform method.
///
/// In the example below, TArgs is a string, but it can be a custom class with multiple properties.
/// TArgs instances will be serialized (System.Text.Json by default) as a part of the job,
/// before it gets sent to the RabbitMQ.
/// </summary>
public class MessageReceiver : Job < MessageReceiver , string >
{
public override void Perform ( string message )
{
Console . WriteLine ( "Job performed: " + message ) ;
}
}
class Program
{
private static void Main ( )
{
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
JobsBuilder . DisposeJobs ( ) ;
}
private static void RunWorker ( )
{
Console . WriteLine ( "Initialized job worker." ) ;
Console . WriteLine ( "Press Enter to end the application." ) ;
Console . ReadLine ( ) ;
}
private static void RunPublisher ( )
{
Console . WriteLine ( "Initialized job publisher" ) ;
Console . WriteLine ( "Write the job name and press Enter to publish it (empty job name to end)." ) ;
while ( true )
{
Console . Write ( "> " ) ;
var message = Console . ReadLine ( ) ;
if ( string . IsNullOrWhiteSpace ( message ) ) break ;
// notice that Publish is a static method on our MessageReceiver class
// it is available because MessageReceiver inherits from Job<TJob, TArgs>
MessageReceiver . Publish ( message ) ;
}
}
}
}เริ่มต้นคำสั่งสามคำสั่งที่แตกต่างกัน (หรือเชลล์พลังงาน) สองจะถูกใช้เป็นคนงานและจะใช้เป็นสำนักพิมพ์
ในการเริ่มต้นแอปพลิเคชันไปที่โฟลเดอร์โครงการและเรียกใช้:
dotnet run Type 1 และ Hit Enter เพื่อเริ่มงาน Type 2 และ Hit Enter เพื่อเริ่มต้นผู้เผยแพร่ เมื่อคุณป้อนข้อความในคอนโซลผู้เผยแพร่คุณจะสังเกตเห็นว่าพวกเขากำลังดำเนินการในหนึ่งหรือคนอื่น ๆ แต่ไม่ใช่ทั้งสองอย่าง นี่เป็นเพราะมีการแจกจ่ายงานระหว่างคนงาน
โปรดทราบว่าคุณสามารถเริ่มต้นผู้เผยแพร่หลายรายได้เช่นกัน
คนงานและผู้เผยแพร่สามารถอยู่ในเครื่องที่แตกต่างกันตราบใดที่พวกเขาสามารถเข้าถึงเซิร์ฟเวอร์ RabbitMQ
ในการแจกจ่ายคนงานข้ามเครื่องหลายเครื่องคุณจะต้องกำหนดค่าข้อมูลเกี่ยวกับเซิร์ฟเวอร์ RabbitMQ อย่างน้อยที่สุดนั่นหมายถึงชื่อผู้ใช้รหัสผ่านชื่อโฮสต์ (หรือที่อยู่ IP) และหมายเลขพอร์ต (หากเซิร์ฟเวอร์ RabbitMQ ของคุณได้รับการกำหนดค่าให้ฟังการเชื่อมต่อบนพอร์ตที่ไม่ได้มาตรฐาน) ในตัวอย่างข้างต้นเราไม่ได้กำหนดค่าใด ๆ เพราะค่าเริ่มต้นเพียงพอ - ชื่อผู้ใช้: guest , รหัสผ่าน: guest , ชื่อโฮสต์: localhost , พอร์ต: -1 (= ใช้พอร์ตเริ่มต้น)
ตัวอย่างเช่นหากเซิร์ฟเวอร์ RabbitMQ ของคุณทำงานบนเครื่องด้วย HostName rabbit.example.local การฟังหมายเลขพอร์ตมาตรฐานและคุณได้สร้างผู้ใช้ massive ใน RabbitMqJobs ด้วยรหัสผ่าน: d0ntUseTh!sPass
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
s . HostNames = new [ ] { "rabbit.example.com" } ;
s . Username = "massive" ;
s . Password = "d0ntUseTh!sPass" ;
} )
. Build ( ) ;หรือถ้าคุณไม่ต้องการเริ่มเธรดคนงาน (เช่นการใช้กระบวนการสำหรับการเผยแพร่งานเท่านั้น):
JobsBuilder . Configure ( )
. WithRabbitMqBroker ( s =>
{
//...
} )
. Build ( false ) ;ตอนนี้คุณสามารถปรับใช้คนงาน (และผู้เผยแพร่) ในหลายเครื่องและเรียกใช้ หากการเชื่อมต่อเครือข่ายใช้งานได้ (ไฟร์วอลล์เปิด ฯลฯ ) ทุกอย่างควรทำงาน งานจะถูกส่งไปยังคนงานในรูปแบบรอบโรบิน โปรดทราบว่าโดยค่าเริ่มต้นแอปพลิเคชัน MassiveJobs ทุกตัวจะเริ่มต้นเธรดคนงานสองคน นั่นหมายความว่าถ้าคุณมี 3 เครื่องแต่ละเครื่องใช้แอปพลิเคชัน MassiveJobs หนึ่งเครื่องการกระจายงานจะมีลักษณะเช่นนี้:
คุณอาจสังเกตเห็นในตัวอย่างที่เริ่มต้นอย่างรวดเร็วเมื่อเราใช้แอพพลิเคชั่น MassiveJobs สองตัวในสองหน้าต่างโพสท่าสองข้อความข้อความสองข้อความจะไปที่หน้าต่างหนึ่งสองรายการถัดไปไปยังหน้าต่างอื่นและอื่น ๆ ตอนนี้คุณรู้เหตุผล
ข้ามส่วนนี้หากแอปพลิเคชันของคุณทำงานในสภาพแวดล้อมที่โฮสต์. NET Core (ASP.NET Core Web Application หรือบริการผู้ปฏิบัติงาน)
เป็นสิ่งสำคัญมากในการกำหนดค่าการบันทึกในแอปพลิเคชันของคุณที่ใช้ MassiveJobs เพราะนั่นเป็นวิธีเดียวที่จะเห็นข้อผิดพลาดในการรันเวลา MassiveJobs ในแอปพลิเคชันของคุณ มันง่ายพอ ๆ กับการติดตั้งแพ็คเกจที่เหมาะสมและการตั้งค่า JobLoggerFactory ในการเริ่มต้นหากคุณใช้หนึ่งในไลบรารี logger ต่อไปนี้:
MassiveJobs.Logging.Log4Net )MassiveJobs.Logging.NLog )MassiveJobs.Logging.Serilog ) ตัวอย่างเช่นหากคุณต้องการเพิ่มการบันทึก log4Net ในตัวอย่างที่เริ่มต้นอย่างรวดเร็วให้ติดตั้งแพ็คเกจ MassiveJobs.Logging.Log4Net ก่อนอื่นในโครงการของคุณ หลังจากนั้นให้เริ่มต้นไลบรารี log4net และในที่สุด MassiveJobs
//...
using MassiveJobs . Logging . Log4Net ;
//...
private static void Main ( )
{
InitializeLogging ( ) ;
Console . WriteLine ( "1: Worker" ) ;
Console . WriteLine ( "2: Publisher" ) ;
Console . Write ( "Choose 1 or 2 -> " ) ;
var startWorkers = Console . ReadLine ( ) != "2" ;
// We are not starting job workers if '2' is selected.
// This is not mandatory, an application can run job workers
// and publish jobs using the same MassiveJobs instance.
JobsBuilder . Configure ( )
. WithLog4Net ( )
. WithRabbitMqBroker ( )
. Build ( startWorkers ) ;
if ( startWorkers )
{
RunWorker ( ) ;
}
else
{
RunPublisher ( ) ;
}
}คุณต้องใช้ "initializelogging" ด้วยตัวเองตามปกติคุณจะเริ่มต้นสำหรับไลบรารีการบันทึก ตัวอย่างเช่นสำหรับ log4Net สิ่งนี้จะกำหนดค่าคอนโซล Appender เท่านั้น
private static void InitializeLogging ( )
{
var patternLayout = new PatternLayout ( ) ;
patternLayout . ConversionPattern = "%date [%thread] %-5level %logger - %message%newline" ;
patternLayout . ActivateOptions ( ) ;
var hierarchy = ( Hierarchy ) LogManager . GetRepository ( Assembly . GetExecutingAssembly ( ) ) ;
hierarchy . Root . AddAppender ( new ConsoleAppender { Layout = patternLayout } ) ;
hierarchy . Root . Level = Level . Warn ;
hierarchy . Configured = true ;
}ตอนนี้เมื่อคุณเริ่มแอปพลิเคชันคนงานคุณควรเห็นข้อความบันทึกในคอนโซล:
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 1
2020 - 11 - 10 10 : 25 : 22 , 251 [ 1 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessageConsumer - Connected
Initialized job worker.
Press Enter to end the application.คุณจะสังเกตเห็นว่าถ้าคุณเริ่มแอปพลิเคชันผู้เผยแพร่จะไม่พยายามเชื่อมต่อกับ RabbitMQ จนกว่าคุณจะพยายามส่งข้อความแรก นี่เป็นเพราะแอปพลิเคชัน MassiveJobs ทุกตัวจะมีการเชื่อมต่อสองครั้งกับ RabbitMQ หนึ่งรายการสำหรับการเผยแพร่และอื่น ๆ สำหรับการบริโภคข้อความ ในสำนักพิมพ์เราไม่ได้เริ่มทำงานดังนั้นการใช้การเชื่อมต่อจึงไม่ได้เริ่มต้น
PS > dotnet run
1 : Worker
2 : Publisher
Choose 1 or 2 - > 2
Initialized job publisher
Write the job name and press Enter to publish it (empty job name to end ).
> Hello
2020 - 11 - 10 10 : 27 : 22 , 954 [ 4 ] WARN MassiveJobs.RabbitMqBroker.RabbitMqMessagePublisher - Connected หากต้องการใช้ MassiveJobs.RabbitMqBroker ในสภาพแวดล้อมที่โฮสต์. NET Core (ASP.NET Core, บริการผู้ปฏิบัติงาน) ติดตั้งแพ็คเกจต่อไปนี้ในแอปพลิเคชันของคุณ:
dotnet add package MassiveJobs.RabbitMqBroker.Hosting จากนั้นในชั้นเรียนเริ่มต้นของคุณเมื่อกำหนดค่าบริการโทร services.AddMassiveJobs()
//...
using MassiveJobs . RabbitMqBroker . Hosting ;
namespace MassiveJobs . Examples . Api
{
public class Startup
{
public Startup ( IConfiguration configuration )
{
Configuration = configuration ;
}
public IConfiguration Configuration { get ; }
public void ConfigureServices ( IServiceCollection services )
{
//...
services . AddMassiveJobs ( )
. UseRabbitMqBroker ( ) ;
}
//...
}
} สิ่งนี้จะลงทะเบียนบริการ MassiveJobs ที่จำเป็นและเริ่มบริการโฮสต์พื้นหลังสำหรับการทำงาน ตอนนี้คุณสามารถเผยแพร่งานจากคอนโทรลเลอร์ ตัวอย่างเช่นหากคุณมีนิติบุคคล Customer และต้องการส่งอีเมลต้อนรับไปยังลูกค้าที่สร้างขึ้นใหม่คุณอาจมีอะไรเช่นนี้:
// POST: api/Customers
[ HttpPost ]
public ActionResult < Customer > PostCustomer ( Customer customer )
{
using var trans = _context . Database . BeginTransaction ( ) ;
_context . Customers . Add ( customer ) ;
_context . SaveChanges ( ) ;
if ( ! string . IsNullOrWhiteSpace ( customer . Email ) )
{
// send a welcome email after 5 seconds
SendWelcomeEmailJob . Publish ( customer . Id , TimeSpan . FromSeconds ( 5 ) ) ;
}
// do this last. If Job publishing to RabbitMq fails, we will rollback
trans . Commit ( ) ;
return CreatedAtAction ( "GetCustomer" , new { id = customer . Id } , customer ) ;
} เป็นสิ่งสำคัญมากที่จะต้องจำไว้ว่า SendWelcomeEmailJob.Publish ไม่ได้มีส่วนร่วมในการทำธุรกรรม _ Rabbitmqbroker สำหรับ MassiveJobs ไม่สนับสนุนการทำธุรกรรม แต่วิธี Publish จะทำให้เกิดข้อยกเว้นหากการเผยแพร่ล้มเหลว (เฉพาะการเผยแพร่ - ไม่ได้ส่งอีเมลซึ่งทำแบบอะซิงโครไนซ์) หากการเผยแพร่ล้มเหลวข้อยกเว้นจะถูกโยนและ trans.Commit() จะไม่ถูกเรียกและการทำธุรกรรมจะถูกรีดกลับในการกำจัด
การเผยแพร่งานนั้นใช้เป็น ทรัพยากรที่กระทำครั้งสุดท้าย
SendWelcomeEmailJob อาจมีลักษณะเช่นนี้:
public class SendWelcomeEmailJob : Job < SendWelcomeEmailJob , int >
{
private readonly ExamplesDbContext _context ;
public SendWelcomeEmailJob ( ExamplesDbContext context )
{
_context = context ;
}
public override void Perform ( int customerId )
{
using var trans = _context . Database . BeginTransaction ( ) ;
var customer = _context . Customers . Find ( customerId ) ;
if ( customer . IsEmailSent ) return ; // make the job idempotent
customer . IsEmailSent = true ;
// Do this before sending email, to lessen the chance of an exception on commit.
// Also, if optimistic concurrency is enabled, we will fail here, before sending the email.
// This way we avoid sending the email to the customer twice.
_context . SaveChanges ( ) ;
SendEmail ( customer ) ;
// Do this last. In case the SendEmail method fails, the transaction will be rolled back.
trans . Commit ( ) ;
}
private static void SendEmail ( Customer customer )
{
var mailMessage = new MailMessage
{
From = new MailAddress ( "[email protected]" ) ,
Body = $ "Welcome customer { customer . FirstName } { customer . LastName } " ,
Subject = "Welcome to examples.com"
} ;
mailMessage . To . Add ( customer . Email ) ;
using ( var client = new SmtpClient ( "smtp.examples.com" ) )
{
client . UseDefaultCredentials = false ;
client . Credentials = new NetworkCredential ( "username" , "password" ) ;
client . Send ( mailMessage ) ;
}
}
}มีหลายสิ่งที่ควรทราบที่นี่:
customer.IsEmailSent มีการตรวจสอบก่อนที่จะทำอะไร หากถูกตั้งค่าเป็นจริงเราจะไม่ทำอะไรเลย (ไม่มีข้อยกเว้นใด ๆSaveChanges() ในบริบท DB ก่อนที่ จะส่งอีเมลจริง ๆ เพื่อให้สามารถ โยนข้อยกเว้นพร้อมกัน ซึ่งจะจัดตารางงานใหม่ในภายหลัง (แต่ คุณต้องกำหนดค่าคุณสมบัติพร้อมกันในการเข้าร่วมของคุณ เพื่อให้ทำงาน) อย่างไรก็ตามในกรณีนี้คลาสงานของเราไม่ได้เป็น idempotent อย่างสมบูรณ์ อาจเกิดขึ้นได้ว่าอีเมลจะถูกส่งสองครั้งเนื่องจากเซิร์ฟเวอร์อีเมลไม่ได้เข้าร่วมในการทำธุรกรรม หาก client.Send โยนข้อยกเว้น หมดเวลา จะไม่แน่ใจว่าอีเมลถูกส่งจริงหรือไม่ เซิร์ฟเวอร์เมลอาจได้รับคำขอเข้าคิวข้อความสำหรับการจัดส่ง แต่เราไม่เคยได้รับการตอบกลับเนื่องจากปัญหาเครือข่ายชั่วคราว กล่าวอีกนัยหนึ่ง อย่างน้อยหนึ่งครั้ง รับประกันการจัดส่งในกรณีนี้ไม่ใช่ ครั้งเดียว
หากมี เพียงการเปลี่ยนแปลงฐานข้อมูล ที่เกี่ยวข้องกับงานเราก็สามารถรับประกันได้ ทันที แต่ถึงอย่างนั้นวิธี Perform ของงานสามารถเรียกได้สองครั้งดังนั้นคุณ ต้องตรวจสอบให้แน่ใจว่างานนั้นเป็น idempotent ในวิธี Perform (คล้ายกับที่เราทำกับ IsEmailSent )