Удобно использовать режим потребителя производителя. Используйте многопоточные очереди + очереди для поддержки нормального производителя нескольких режимов потребителей для поддержки режима очереди потребления приоритета для поддержки режима ограничения потока.
pip install thread_worker
Worker обычная многопоточная работа
Работа в очереди приоритета приоритета PriorWorker
LimitWorker ограничения
Инициализация, доступны следующие параметры
def __init__ ( self , consumer_func , consumer_count , logger , block , timeout ):
'''
@params consumer_func consumer_func default None
@params consumer_count Work quantity
@params logger You can customize the logger. The default is logging
@params block You can set whether the thread ends when the queue is empty
By default, the thread will end when the queue is empty and the thread task is completed
You can set it to true to enable work to monitor tasks all the time
@params timeout The timeout for obtaining data in the queue is 5S by default, that is, if the data is not obtained within 5S, it will end
''' Consumer_func Consumer Function Function по умолчанию никто. Если добавлено, считается, что потребитель запускается сначала, а затем задача генерируется. Consumer_Count контролирует, сколько там потребителей.
Регистратор должен установить вывод журнала
Блок должен контролировать, является ли он режимом блокировки. Если установить на TRUE, то рабочий поток будет продолжать выполняться, пока основной процесс не закончится. В противном случае, когда очередь пуста и задачи потока выполнены, работа закончится.
Тайм -аут - это тайм -аут для получения данных из очереди. Тайм -аут по умолчанию составляет 5 с. Если присоединение верно, работа снова примет присоединение к False и завершит текущую работу.
Один из них - сначала запустить потребителя, а затем генерировать его. Он может постоянно генерировать и потреблять его. Он подходит для задач сканирования пассивного агента или основного процесса управляет глобальным, а затем генерирует задачи.
Одним из них является генерация всех задач сразу и передать их потребителю, чтобы запустить задачу, которая подходит для создания тестов одновременно, а затем запустить ее.
WorkWork put данные экземпляраWork.is_end() from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# 不需要阻塞
w = Worker ( consumer , consumer_count = 1 , block = False )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 ) Как и в работе по умолчанию, при его создании есть дополнительный параметр limit_time . Установив время limit_time и consumer_count Unit, он контролирует, сколько работ для выполнения в течение всего времени.
limit_time по умолчанию - 1 с
from thread_worker import LimitWorker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# limit_time 是limit_time 秒内有 consumer_count个消费者
w = LimitWorker ( consumer , consumer_count = 1 , block = False , limit_time = 3 )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )Вывод такой. Только запрос будет отправляться каждые 3 секунды, то есть частота будет контролироваться.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
В этом сценарии пассивные агенты или некоторые основные процессы будут использоваться в программах, которые работают в течение длительного времени. Разница в том, что метод работы.is_end () не требуется для блокировки или установить блок. По умолчанию верно.
from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# block默认就是True的
w = Worker ( consumer , consumer_count = 1 )
for iid in range ( 10 ):
w . put ( iid )
# 手动阻塞
while True :
pass