thread_worker
1.0.0
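A convenient way to use the producer-consumer pattern with multiple threads and a queue. It supports the usual one-producer, multiple-consumer mode, a priority consumption queue mode, and a rate-limited work mode.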
pip install thread_worker
Worker: plain multithreaded work
PriorWorker: priority consumption queue work
LimitWorker: rate-limited work
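The priority consumption idea can be illustrated with the standard library alone. The sketch below does not use PriorWorker, whose interface is not shown in this document; the helper name run_priority_consumers and the (priority, item) tuple convention are assumptions made for illustration, with lower numbers consumed first.

```python
import queue
import threading


def run_priority_consumers(tasks, consumer_func, consumer_count=1):
    """Hypothetical helper: consume (priority, item) pairs, lowest number first."""
    q = queue.PriorityQueue()
    for priority, item in tasks:
        q.put((priority, item))

    def loop():
        while True:
            try:
                _, item = q.get_nowait()
            except queue.Empty:
                return  # queue drained, this consumer thread ends
            consumer_func(item)

    threads = [threading.Thread(target=loop) for _ in range(consumer_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


handled = []
run_priority_consumers([(3, "low"), (1, "urgent"), (2, "normal")], handled.append)
print(handled)  # ['urgent', 'normal', 'low']
```

With a single consumer the handling order is deterministic. With several consumers, items are still dequeued in priority order, but they may finish out of order.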
Initialization takes the following parameters:
def __init__(self, consumer_func, consumer_count, logger, block, timeout):
    '''
    @params consumer_func  Consumer function, default None
    @params consumer_count Number of consumers
    @params logger         You can customize the logger. The default is logging
    @params block          You can set whether the thread ends when the queue is empty
                           By default, the thread will end when the queue is empty and the thread task is completed
                           You can set it to True to make the work keep monitoring for tasks
    @params timeout        Timeout for getting data from the queue, 5s by default; if no data is obtained within 5s, the thread ends
    '''

consumer_func is the consumer function and defaults to None. If it is supplied, the consumers are started first and tasks are generated afterwards.
consumer_count controls how many consumers there are.
logger sets the log output.
block controls blocking mode. If it is set to True, the work threads keep running until the main process exits; otherwise the work ends once the queue is empty and all thread tasks have completed.
timeout is the timeout for getting data from the queue, 5s by default. When it expires, the work tries to fetch again if `join` (apparently the block setting) is True, and the current work ends if it is False.
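How block and timeout interact can be sketched with the standard library. MiniWorker below is a hypothetical stand-in written for this explanation, not thread_worker's actual implementation; it only assumes the behaviour described above (retry on timeout when blocking, end the consumer otherwise).

```python
import queue
import threading
import time


class MiniWorker:
    """Hypothetical stand-in for illustration; not the library's real code."""

    def __init__(self, consumer_func, consumer_count=1, block=True, timeout=5):
        self.consumer_func = consumer_func
        self.block = block
        self.timeout = timeout
        self.queue = queue.Queue()
        # Consumers start immediately, before any task has been put.
        self.threads = [
            threading.Thread(target=self._run, daemon=True)
            for _ in range(consumer_count)
        ]
        for t in self.threads:
            t.start()

    def put(self, item):
        self.queue.put(item)

    def _run(self):
        while True:
            try:
                item = self.queue.get(timeout=self.timeout)
            except queue.Empty:
                if self.block:
                    continue  # blocking mode: keep waiting for new tasks
                return  # non-blocking mode: this consumer ends
            self.consumer_func(item)

    def is_end(self):
        # True once every consumer thread has finished.
        return not any(t.is_alive() for t in self.threads)


results = []
w = MiniWorker(results.append, consumer_count=2, block=False, timeout=0.2)
for iid in range(5):
    w.put(iid)
while not w.is_end():
    time.sleep(0.05)
print(sorted(results))  # [0, 1, 2, 3, 4]
```

In this sketch a non-blocking consumer always waits one full timeout after its last task before ending, so a smaller timeout means a faster shutdown.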
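The first way is to start the consumers and then generate tasks, so tasks can be produced and consumed continuously. This suits passive proxy scanning tasks, or a main process that controls everything and generates tasks as it goes.
The second way is to generate all the tasks up front and hand them to the consumers. This suits one-off runs where the test tasks are generated once and then simply executed.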
Work
Use put to hand data to the consumer function, and the is_end() method to block the program.
from thread_worker import Worker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# no blocking needed
w = Worker(consumer, consumer_count=1, block=False)
for iid in range(10):
    w.put(iid)
# block the program here by polling is_end
while not w.is_end():
    time.sleep(3)

LimitWorker is the same as the default Work, except that it takes an extra limit_time parameter at creation. The unit of time limit_time and consumer_count together control how many works execute per unit of time.
limit_time defaults to 1s.
from thread_worker import LimitWorker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# within every limit_time seconds, consumer_count consumers run
w = LimitWorker(consumer, consumer_count=1, block=False, limit_time=3)
for iid in range(10):
    w.put(iid)
# block the program here by polling is_end
while not w.is_end():
    time.sleep(3)

The output looks like this. A request is sent only once every 3s, which is how the frequency is controlled.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
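One way such pacing could be achieved is for each consumer to take at least limit_time per task, so that consumer_count consumers handle at most consumer_count tasks per limit_time. The sketch below is an assumption about the technique, not LimitWorker's actual code; rate_limited_consume is a hypothetical helper, and a short limit_time is used so it runs quickly.

```python
import queue
import threading
import time


def rate_limited_consume(items, consumer_func, consumer_count=1, limit_time=1.0):
    """Hypothetical helper: each consumer spends at least limit_time per task."""
    q = queue.Queue()
    for item in items:
        q.put(item)

    def loop():
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                return  # nothing left to do
            started = time.monotonic()
            consumer_func(item)
            # Sleep for whatever remains of this task's time slot.
            remaining = limit_time - (time.monotonic() - started)
            if remaining > 0:
                time.sleep(remaining)

    threads = [threading.Thread(target=loop) for _ in range(consumer_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


stamps = []
rate_limited_consume(
    range(3),
    lambda _: stamps.append(time.monotonic()),
    consumer_count=1,
    limit_time=0.1,
)
gaps = [b - a for a, b in zip(stamps, stamps[1:])]
print(["{:.2f}".format(g) for g in gaps])  # each gap is roughly limit_time
```

Sleeping only for the remainder of the slot keeps the rate steady even when the consumer function itself takes a noticeable amount of time.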
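This scenario is for passive proxies or other programs whose main process runs for a long time. Unlike the examples above, there is no need to block with the is_end() method, and no need to set block, because block defaults to True.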
from thread_worker import Worker
import requests
import time

def consumer(iid):
    url = "https://www.baidu.com/{0}".format(iid)
    resp = requests.get(url=url)
    print(resp.request.url)

# block is True by default
w = Worker(consumer, consumer_count=1)
for iid in range(10):
    w.put(iid)
# block manually; sleeping avoids spinning the CPU in a busy loop
while True:
    time.sleep(1)