Menggunakan mode konsumen produsen dengan mudah menggunakan multi-threading + antrian untuk mendukung normal satu mode konsumen beberapa produsen untuk mendukung mode antrian konsumsi prioritas untuk mendukung mode kerja batas aliran aliran
pip install thread_worker
Pekerja Multi-Threaded Worker
Pekerjaan PriorWorker konsumsi prioritas prioritas
LimitWorker batas kerja saat ini
Inisialisasi, parameter berikut tersedia
def __init__ ( self , consumer_func , consumer_count , logger , block , timeout ):
'''
@params consumer_func consumer_func default None
@params consumer_count Work quantity
@params logger You can customize the logger. The default is logging
@params block You can set whether the thread ends when the queue is empty
By default, the thread will end when the queue is empty and the thread task is completed
You can set it to true to enable work to monitor tasks all the time
@params timeout The timeout for obtaining data in the queue is 5S by default, that is, if the data is not obtained within 5S, it will end
''' Fungsi konsumen konsumer_func Default untuk tidak ada. Jika ditambahkan, dianggap bahwa konsumen dimulai lebih dulu dan kemudian tugas dihasilkan. Consumer_count Mengontrol berapa banyak konsumen di sana.
Logger adalah mengatur output log
Blok adalah untuk mengontrol apakah itu mode pemblokiran. Jika diatur ke True, maka utas kerja akan terus mengeksekusi sampai proses utama berakhir. Kalau tidak, ketika antrian kosong dan tugas utas selesai, pekerjaan akan berakhir.
Timeout adalah batas waktu untuk mendapatkan data dari antrian. Batas waktu default adalah 5s. Jika gabungan itu benar, pekerjaan akan mengambil gabungan ke false lagi dan mengakhiri pekerjaan saat ini.
Salah satunya adalah menjalankan konsumen terlebih dahulu dan kemudian menghasilkannya. Itu dapat terus menghasilkan dan mengkonsumsinya. Ini cocok untuk tugas pemindaian agen pasif atau proses utama mengontrol global dan kemudian menghasilkan tugas.
Salah satunya adalah menghasilkan semua tugas sekaligus dan menyerahkannya kepada konsumen untuk menjalankan tugas yang cocok untuk menghasilkan tes sekaligus dan kemudian menjalankannya.
WorkWork put data instanceWork.is_end() metode from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# 不需要阻塞
w = Worker ( consumer , consumer_count = 1 , block = False )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 ) Seperti pekerjaan default, ada parameter limit_time tambahan saat membuatnya. Dengan mengatur waktu satuan limit_time dan consumer_count , ia mengontrol berapa banyak pekerjaan untuk dieksekusi dalam waktu satuan.
limit_time default adalah 1s
from thread_worker import LimitWorker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# limit_time 是limit_time 秒内有 consumer_count个消费者
w = LimitWorker ( consumer , consumer_count = 1 , block = False , limit_time = 3 )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )Outputnya seperti ini. Hanya permintaan yang akan dikirim setiap 3 detik, yaitu, frekuensi akan dikendalikan.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
Dalam skenario ini, agen pasif atau beberapa proses utama akan digunakan dalam program yang berjalan untuk waktu yang lama. Perbedaannya adalah bahwa metode work.is_end () tidak diperlukan untuk memblokir blok atau mengatur blok. Standarnya benar.
from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# block默认就是True的
w = Worker ( consumer , consumer_count = 1 )
for iid in range ( 10 ):
w . put ( iid )
# 手动阻塞
while True :
pass