เสื้อคลุมที่สง่างามรอบ ๆ PHP-AMQPLIB ที่มีชื่อเสียงสำหรับกรณีใช้ 90%
การติดตั้ง
เกี่ยวกับ AMQP Agent
API
เอกสาร
การกำหนดค่า
ตัวอย่าง
ลิงค์
ใบอนุญาต
การเปลี่ยนแปลง
ลอง AMQP Agent ตอนนี้:
composer require marwanalsoltany/amqp-agent คัดลอกการกำหนดค่านี้ใน composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "วิ่ง:
composer update หมายเหตุ: AMQP Agent รองรับตอนนี้ PHP 7.1 โดยค่าเริ่มต้นเริ่มต้นจากเวอร์ชัน v1.1.1 หากคุณใช้สาขา php7.1-compatibility ในรุ่นเก่าให้อัปเดตนักแต่งเพลงของคุณ!
AMQP Agent พยายามทำให้การใช้งานของนายหน้าในโครงการ PHP ง่ายขึ้น มันใช้ค่าใช้จ่ายทั้งหมดของการสร้างและกำหนดค่าวัตถุหรือสร้างคลาสที่คุณต้องการเพื่อพูดคุยกับเซิร์ฟเวอร์ RabbitMQ (ผ่าน PHP-AMQPLIB ) และเปิดเผย API ที่ทดสอบได้อย่างเต็มที่และยืดหยุ่นซึ่งเหมาะกับโครงการเกือบทุกโครงการ
ห้องสมุด PHP-Amqplib นั้นยอดเยี่ยมและทำงานได้ดีมาก ปัญหาเดียวและเพียงอย่างเดียวคือมันค่อนข้างเปลือยเปล่าที่จะใช้ในโครงการโดยไม่ต้องสร้างชั้นเรียนห่อหุ้มของคุณเองแทบจะเป็นไปไม่ได้เลยที่จะไม่เขียนรหัสสปาเก็ตตี้ บวกกับฟังก์ชั่นวิธีการและการกำหนดค่าจำนวนมหาศาล (พารามิเตอร์) ที่มาพร้อมกับมันทำให้ยากที่จะใช้ API ที่เหมาะสมที่จะใช้ AMQP Agent แก้ปัญหานี้โดยการทำสิ่งที่เป็นนามธรรมให้มากที่สุดโดยไม่สูญเสียการควบคุมคนงานและนำคำศัพท์ที่เกี่ยวข้องกับโบรกเกอร์ข้อความกลับมาสำนักพิมพ์และผู้บริโภคเป็นสิ่งที่คุณต้องจัดการหากคุณเป็นผู้มาใหม่
ตามคำขวัญนี้ตัวแทน AMQP ทำให้การทำงานกับ RabbitMQ นั้นสนุกและสง่างามที่สุดเท่าที่จะเป็นไปได้โดยการเปิดเผยอินเทอร์เฟซที่คล่องแคล่วซึ่งนำไปใช้อย่างชาญฉลาดพอดีกับการพัฒนา PHP ที่ทันสมัยดีที่จะทำงานและใช้งานง่ายมาก ยังมีประสิทธิภาพมากและสามารถเขียนทับนิสัยใจคอที่เล็กที่สุดได้ทุกจุดของการทำงานกับคนงาน ด้วย AMQP Agent คุณสามารถเริ่มเผยแพร่และบริโภคข้อความด้วยรหัสเพียงไม่กี่บรรทัด!
AMQP Agent ไม่ได้เขียนทับสิ่งใด ๆ ของ PHP-AMQPLIB และไม่เปลี่ยนคำศัพท์ที่เกี่ยวข้องกับฟังก์ชั่นของมัน มันง่ายขึ้นเท่านั้น นำชื่อของฟังก์ชั่นออกมาและขยายออกไปในบางสถานที่ นอกจากนี้ยังเพิ่มคุณสมบัติที่ดีบางอย่างเช่นผู้บังคับบัญชาผู้บังคับบัญชาการรอช่องสัญญาณแบบไดนามิกและวิธีการอำนวยความสะดวก
AMQP Agent ยังเสนอไคลเอนต์ RPC ที่ใช้เหตุการณ์ที่มีประสิทธิภาพและเซิร์ฟเวอร์ RPC สำหรับโครงการ IoT ของคุณ
การทำงานกับ AMQP Agent นั้นง่ายอย่าง:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();AMQP Agent เปิดเผยจำนวนคลาสคอนกรีตที่สามารถใช้โดยตรงและคลาสนามธรรมอื่น ๆ ที่สามารถขยายได้ Variants ชั้นเรียนทั้งสองนี้ยังมีส่วนย่อยของผู้ช่วย
| ระดับ | คำอธิบาย | API |
|---|---|---|
Abstractworker *A | คลาสนามธรรมที่ใช้ฟังก์ชั่นพื้นฐานของคนงาน | เอกสาร |
สำนักพิมพ์ *C*S | ชั้นเรียนที่เชี่ยวชาญในการเผยแพร่ ใช้เฉพาะวิธีการที่จำเป็นสำหรับผู้เผยแพร่ | เอกสาร |
ผู้บริโภค *C*S | ชั้นเรียนที่เชี่ยวชาญในการบริโภค ใช้เฉพาะวิธีการที่จำเป็นสำหรับผู้บริโภค | เอกสาร |
AbstractEndpoint *A | คลาสนามธรรมที่ใช้ฟังก์ชั่นพื้นฐานของจุดสิ้นสุด | เอกสาร |
clientendpoint *C | ชั้นเรียนที่เชี่ยวชาญในการร้องขอ ใช้เฉพาะวิธีการที่จำเป็นสำหรับลูกค้า | เอกสาร |
ServerEndpoint *C | ชั้นเรียนที่เชี่ยวชาญในการตอบสนอง ใช้เฉพาะวิธีการที่จำเป็นสำหรับเซิร์ฟเวอร์ | เอกสาร |
AmqpagentParameters *C*H | คลาสที่มีพารามิเตอร์ตัวแทน AMQP ทั้งหมดเป็นค่าคงที่ | เอกสาร |
ยูทิลิตี้ *C*H | คลาสที่มีฟังก์ชั่นผู้ช่วยเบ็ดเตล็ด | เอกสาร |
เหตุการณ์ *C*H | คลาสง่าย ๆ สำหรับการจัดการกิจกรรม (การส่งและการฟัง) | เอกสาร |
arrayproxy *C*H | คลาสที่มีวิธีการสำหรับการจัดการและการทำงานอาร์เรย์ | เอกสาร |
classproxy *C*H | คลาสที่มีวิธีการสำหรับวิธีการพร็อกซีการเรียกการจัดการคุณสมบัติและยูทิลิตี้คลาส | เอกสาร |
idgenerator *C*H | คลาสที่มีฟังก์ชั่นสำหรับการสร้าง ID และโทเค็นที่ไม่ซ้ำกัน | เอกสาร |
serializer *C*H | serializer ที่ยืดหยุ่นที่จะใช้ร่วมกับคนงาน | เอกสาร |
Logger *C*H | คลาสที่จะเขียนบันทึกการเปิดเผยวิธีการที่ทำงานแบบคงที่และในการสร้างอินสแตนซ์ | เอกสาร |
ซิงเกิลตัน *A*H | คลาสนามธรรมที่ใช้ฟังก์ชั่นพื้นฐานของซิงเกิลตัน | เอกสาร |
config *C*R | คลาสที่เปลี่ยนไฟล์การกำหนดค่าเป็นวัตถุ | เอกสาร |
ไคลเอนต์ *C*R | ชั้นเรียนส่งคืนทุกอย่างที่ตัวแทน AMQP มีให้ คอนเทนเนอร์บริการง่ายๆเพื่อพูด | เอกสาร |
ตัวอย่าง *A*H | คลาสนามธรรมที่ใช้เป็นการโทรกลับเริ่มต้นสำหรับผู้บริโภค | เอกสาร |
ดูเพิ่มเติมที่: Abstractworkersingleton, Publishersingleton, Consumerersingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerCommandTrait
*C Concrete: คลาสนี้เป็นคลาสคอนกรีตและสามารถสร้างอินสแตนซ์โดยตรง*A บทคัดย่อ: คลาสนี้เป็นคลาสนามธรรมและไม่สามารถสร้างอินสแตนซ์โดยตรง*H Helper: คลาสนี้เป็นคลาสผู้ช่วย ทางเลือกของบุคคลที่สามสามารถใช้งานได้อย่างอิสระแทน*R แนะนำ: คลาสนี้แนะนำให้ใช้เมื่อทำงานกับ AMQP Agent (แนวทางปฏิบัติที่ดีที่สุด)*S Singleton: คลาสนี้มีเวอร์ชันซิงเกิลตันผ่านการต่อท้ายชื่อคลาสด้วย Singleton และสามารถเรียกคืนได้ผ่าน *Singleton::getInstance() , IE Publisher -> PublisherSingletonหมายเหตุ: ซิงเกิลตันถือเป็นต่อต้านรูปแบบลองหลีกเลี่ยงให้มากที่สุดเท่าที่จะเป็นไปได้แม้ว่าจะมีกรณีการใช้งาน ใช้ Singletons เฉพาะในกรณีที่คุณรู้ว่าคุณกำลังทำอะไรอยู่
หากคุณต้องการเผยแพร่และบริโภคข้อความอย่างรวดเร็วทุกอย่างพร้อมและกำหนดค่าแล้ว AMQP Agent จะถูกส่งไปพร้อมกับการกำหนดค่าที่ผ่านการทดสอบซึ่งเป็นไปตามแนวทางปฏิบัติที่ดีที่สุด คุณสามารถนำเข้าคลาส Publisher และ/หรือคลาส Consumer ในไฟล์ของคุณและเขียนทับพารามิเตอร์ที่คุณต้องการ (ข้อมูลรับรอง RABBITMQ) ในภายหลังในอินสแตนซ์
หากคุณต้องการปรับแต่งและปรับแต่งการกำหนดค่า AMQP Agent ตามความต้องการที่แน่นอนของคุณมีงานให้ทำ คุณต้องจัดหาไฟล์กำหนดค่า (ดู: MAKS-AMQP-Agent-Config.php และให้ความสนใจกับความคิดเห็น) คุณไม่จำเป็นต้องจัดหาทุกอย่างคุณสามารถเขียนพารามิเตอร์ที่คุณต้องการเขียนทับได้เท่านั้น AMQP Agent นั้นฉลาดพอที่จะต่อท้ายการขาด พารามิเตอร์เหล่านี้ยังสามารถเขียนทับในภายหลังผ่านโน้ตการมอบหมายสาธารณะหรือการโทรต่อวิธีการ
ความจริง: AMQP Agent ใช้ชื่อพารามิเตอร์เดียวกันกับ PHP-AMQPLIB ในไฟล์ config และในอาร์เรย์พารามิเตอร์ที่ส่งผ่านการเรียกเมธอด
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; หมายเหตุ: ชื่อคีย์ระดับแรกของอาร์เรย์ (คำต่อท้ายพร้อม Options ) มีความเฉพาะเจาะจงกับ AMQP Agent
ก่อนที่เราจะเริ่มต้นด้วยตัวอย่างเราต้องชี้แจงบางสิ่ง เป็นเรื่องที่ควรค่าแก่การกล่าวถึงตั้งแต่ต้นว่าด้วย AMQP Agent มีหลายวิธีในการเรียกคืนคนงานมีวิธีง่ายๆวิธีที่แนะนำและวิธีการขั้นสูงมากขึ้น หลังจากที่คุณเรียกคืนคนงานแล้วมันก็เหมือนดินคุณสามารถสร้างมันขึ้นมาในแบบที่คุณต้องการ การออกแบบแบบแยกส่วนนี้รองรับความต้องการของคุณอย่างสง่างามขับเคลื่อนไปยังรหัสฐานที่ปรับขนาดได้และทำให้ทุกคนมีความสุข
new วิธีนี้ต้องใช้พารามิเตอร์ผ่านผ่านตัวสร้างการโทรวิธีหรือการกำหนดทรัพย์สินสาธารณะPublisherSingleton::getInstance() วิธีนี้ต้องใช้พารามิเตอร์ผ่านผ่านวิธี getInstance() การโทรวิธีหรือการกำหนดทรัพย์สินสาธารณะClient วิธีนี้ยังทำให้รหัสอ่านได้มากขึ้นเนื่องจากพารามิเตอร์ถูกดึงออกมาจากการกำหนดค่าที่ผ่าน // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();ความจริง: เมื่อจัดหาพารามิเตอร์ให้เฉพาะพารามิเตอร์ที่คุณต้องการ AMQP Agent ฉลาดพอที่จะต่อท้ายการขาด
คำแนะนำ: คุณสามารถทำให้ตัวสร้างหนักที่เขียนในตัวอย่างข้างต้นง่ายขึ้นหากคุณใช้ get($className) ในอินสแตนซ์ของคลาส Client หลังจากให้ไฟล์กำหนดค่ากับพารามิเตอร์ที่คุณต้องการ
หมายเหตุ: ดูเอกสาร AMQP Agent สำหรับคำอธิบายทั้งหมดของวิธีการ อ้างถึงเอกสาร RabbitMQ และ PHP-AMQPLIB สำหรับคำอธิบายทั้งหมดของพารามิเตอร์
ในตัวอย่างเหล่านี้คุณจะเห็นว่าคุณจะทำงานกับตัวแทน AMQP ในสถานการณ์จริงได้อย่างไร
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();ความจริง: คุณสามารถสร้างรหัสในตัวอย่างขั้นสูงของผู้เผยแพร่/ผู้บริโภคได้มากขึ้นหากคุณทำการเปลี่ยนแปลงพารามิเตอร์ทั้งหมดในไฟล์กำหนดค่าและส่งผ่านไปยังไคลเอนต์แทนที่จะเป็นค่าเริ่มต้น
คำแนะนำ: ฐานรหัสตัวแทน AMQP ได้รับการบันทึกไว้อย่างดีโปรดดูลิงค์นี้เพื่อดูคลาสและวิธีการทั้งหมด
AMQP Agent เป็นแพคเกจที่ได้รับอนุญาตจากแหล่งที่มาภายใต้ GNU LGPL V2.1 เนื่องจากใบอนุญาต PHP-AMQPLIB
ลิขสิทธิ์ (c) 2020 Marwan al-Soltany สงวนลิขสิทธิ์