Conveniently use the producer-consumer pattern. Built on multiple threads plus queues, it supports the normal one-producer, multiple-consumer mode, a priority consumption queue mode, and a rate-limited work mode.
pip install thread_worker
Worker: ordinary multi-threaded work
PriorWorker: work consumed from a priority queue
LimitWorker: rate-limited work
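The README gives no PriorWorker example. The sketch below uses only the standard library, not thread_worker's API, to illustrate what "priority consumption" means. It assumes that tasks are `(priority, payload)` tuples and that lower numbers are consumed first, which is how Python's `queue.PriorityQueue` behaves. Whether PriorWorker uses the same convention is an assumption. The helper `run_priority_consumer` is hypothetical.

```python
import queue
import threading

def run_priority_consumer(items, consumer):
    """Hypothetical helper: enqueue (priority, payload) pairs, then drain them
    with a single consumer thread. Lower priority numbers are consumed first."""
    q = queue.PriorityQueue()
    for priority, payload in items:
        q.put((priority, payload))

    def worker():
        while True:
            try:
                # Non-blocking get: the worker stops once the queue is drained
                priority, payload = q.get_nowait()
            except queue.Empty:
                return
            consumer(payload)
            q.task_done()

    t = threading.Thread(target=worker)
    t.start()
    t.join()

consumed = []
run_priority_consumer([(3, "low"), (1, "high"), (2, "medium")], consumed.append)
print(consumed)  # ['high', 'medium', 'low']
```

With a single consumer, the output order follows priority rather than insertion order. With several consumers, items are still dequeued in priority order, but they may finish in any order.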
Initialization accepts the following parameters:
def __init__(self, consumer_func, consumer_count, logger, block, timeout):
    '''
    @params consumer_func   consumer function, default None
    @params consumer_count  number of workers
    @params logger          a custom logger can be supplied; the default is logging
    @params block           whether the threads end when the queue is empty.
                            By default, the threads end once the queue is empty and
                            all thread tasks are complete. Set it to True to keep
                            the workers monitoring for tasks all the time.
    @params timeout         timeout for fetching data from the queue, 5 s by default:
                            if no data is obtained within 5 s, the fetch ends
    '''
consumer_func is the consumer function and defaults to None. If it is supplied, the consumers are started first and tasks are produced afterwards. consumer_count controls how many consumers there are.
logger sets the log output.
block controls blocking mode. If it is True, the worker threads keep running until the main process ends. Otherwise, the workers end once the queue is empty and all thread tasks are complete.
timeout is the timeout for fetching data from the queue, 5 s by default. When it expires, a worker with block=True tries to fetch again, while a worker with block=False ends its current work.
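The interaction of `block` and `timeout` described above can be sketched as a plain worker loop. This is a hypothetical illustration built on `queue.Queue.get(timeout=...)`, not thread_worker's actual implementation. The function name `consume_loop` and its `stop_event` parameter are inventions for the example.

```python
import queue
import threading

def consume_loop(q, consumer_func, block=False, timeout=5.0, stop_event=None):
    """Hypothetical worker loop showing how block and timeout could interact.

    block=False: return once get() times out on an empty queue.
    block=True:  keep waiting after each timeout (until stop_event, if given, is set).
    """
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            if not block:
                return  # nothing arrived within `timeout` seconds: end this worker
            if stop_event is not None and stop_event.is_set():
                return
            continue  # block=True: go back to waiting for data
        try:
            consumer_func(item)
        finally:
            q.task_done()

# Usage: generate everything first, then consume with block=False
q = queue.Queue()
for i in range(5):
    q.put(i)
results = []
t = threading.Thread(target=consume_loop, args=(q, results.append), kwargs={"timeout": 0.1})
t.start()
t.join()  # returns about 0.1 s after the last item, when get() times out
print(results)  # [0, 1, 2, 3, 4]
```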
There are two ways to use it:
In the first, the consumers run first and tasks are produced afterwards, so production and consumption can continue indefinitely. This suits passive proxy scanning tasks, or a main process that controls the overall flow and generates tasks as it goes.
In the second, all tasks are generated at once and handed to the consumers to run. This suits cases where the test tasks are generated up front and then executed.
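The first mode (consumers started before any task exists) can be sketched with the standard library alone. This is an illustration of the pattern, not thread_worker's code. The helper `start_consumers` is hypothetical. Here `Queue.join()` plays the role of "wait until every produced task has been consumed".

```python
import queue
import threading

def start_consumers(q, consumer_func, count):
    """Hypothetical helper: start `count` daemon consumer threads before any task exists."""
    def worker():
        while True:
            item = q.get()  # blocks until the producer supplies a task
            try:
                consumer_func(item)
            finally:
                q.task_done()  # lets q.join() know this task is finished

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(count)]
    for t in threads:
        t.start()
    return threads

results = []
lock = threading.Lock()

def consumer(item):
    with lock:
        results.append(item * item)

q = queue.Queue()
start_consumers(q, consumer, count=3)  # consumers first...
for i in range(10):                    # ...then tasks are produced
    q.put(i)
q.join()  # returns once every queued task has been marked done
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The consumers are daemon threads, so they are discarded when the main program exits. Completion order across three consumers is not deterministic, which is why the results are sorted before printing.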
Worker example: add data with the instance's put() method and wait for completion with the is_end() method

from thread_worker import Worker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# No blocking needed
w = Worker(consumer, consumer_count=1, block=False)
for iid in range(10):
    w.put(iid)

# Block the program here with is_end() until all work is finished
while not w.is_end():
    time.sleep(3)

LimitWorker works like the default Worker, but it takes an additional limit_time parameter when it is created. limit_time and consumer_count together control how many tasks run per unit of time: at most consumer_count tasks every limit_time seconds.
The default limit_time is 1 s.
from thread_worker import LimitWorker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# Within every limit_time seconds, at most consumer_count consumers run
w = LimitWorker(consumer, consumer_count=1, block=False, limit_time=3)
for iid in range(10):
    w.put(iid)

# Block the program here with is_end() until all work is finished
while not w.is_end():
    time.sleep(3)

The output looks like this. Only one request is sent every 3 seconds, so the request rate is controlled.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
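In the output above, consecutive requests are spaced 3 seconds apart, which matches consumer_count=1 and limit_time=3. One way to get this behaviour is a sliding-window limiter that allows at most `count` starts in any `limit_time`-second window. The class below is a hypothetical sketch of that technique. It is not necessarily how LimitWorker is implemented, and the names `RateLimiter` and `acquire` are invented for the example.

```python
import threading
import time

class RateLimiter:
    """Hypothetical sliding-window limiter: at most `count` calls may start
    within any `limit_time`-second window."""

    def __init__(self, count=1, limit_time=1.0):
        self.count = count
        self.limit_time = limit_time
        self._starts = []  # monotonic timestamps of recent starts, oldest first
        self._lock = threading.Lock()

    def acquire(self):
        """Block until another call may start, then record its start time."""
        while True:
            with self._lock:
                now = time.monotonic()
                # Drop timestamps that have left the sliding window
                self._starts = [t for t in self._starts if now - t < self.limit_time]
                if len(self._starts) < self.count:
                    self._starts.append(now)
                    return
                # Sleep until the oldest start leaves the window
                wait = self.limit_time - (now - self._starts[0])
            time.sleep(wait)

# Usage: one call per 0.05 s (scaled down from the 3 s in the example)
limiter = RateLimiter(count=1, limit_time=0.05)
stamps = []
for _ in range(4):
    limiter.acquire()
    stamps.append(time.monotonic())
print([round(b - a, 2) for a, b in zip(stamps, stamps[1:])])
```

Each worker thread would call `acquire()` before handling a task. The lock makes the limiter safe to share between threads, and the sleep happens outside the lock so other threads are not blocked while one waits.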
This scenario suits long-running programs, such as passive proxies or a main process that controls the overall flow. The difference is that you neither need the Work.is_end() method to block nor need to set block, because block defaults to True.
from thread_worker import Worker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# block is True by default
w = Worker(consumer, consumer_count=1)
for iid in range(10):
    w.put(iid)

# Block manually to keep the main process alive; sleeping avoids a CPU-burning busy loop
while True:
    time.sleep(1)
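The example above keeps the main process alive with an endless loop, so it can only be stopped by killing the process. A hypothetical alternative is to wait on a `threading.Event` and also return on Ctrl+C. The sketch assumes that the block-mode worker threads are daemon threads that stop when the main thread exits, as the description of block suggests. The helper `keep_alive` is invented for the example.

```python
import threading

def keep_alive(stop_event, poll_interval=1.0):
    """Hypothetical helper: block the main thread until stop_event is set or
    Ctrl+C is pressed. Assumes the worker threads are daemon threads."""
    try:
        # Event.wait returns False on timeout and True once the event is set;
        # waking up periodically lets KeyboardInterrupt be handled promptly.
        while not stop_event.wait(poll_interval):
            pass
    except KeyboardInterrupt:
        pass

# Usage: a timer stands in for whatever would request shutdown
stop = threading.Event()
threading.Timer(0.2, stop.set).start()
keep_alive(stop, poll_interval=0.05)
print("main thread released")
```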