プロデューサーの消費者モードを使用して、マルチスレッド +キューを使用して通常の1つのプロデューサーの複数の消費者モードをサポートして、優先消費キューモードをサポートしてストリーム制限ワークモードをサポートします
pip install thread_worker
Worker通常のマルチスレッド作品PriorWorker優先順位消費キュー作業LimitWorker現在の制限作業
初期化、次のパラメーターが利用可能です
def __init__ ( self , consumer_func , consumer_count , logger , block , timeout ):
'''
@params consumer_func consumer_func default None
@params consumer_count Work quantity
@params logger You can customize the logger. The default is logging
@params block You can set whether the thread ends when the queue is empty
By default, the thread will end when the queue is empty and the thread task is completed
You can set it to true to enable work to monitor tasks all the time
@params timeout The timeout for obtaining data in the queue is 5S by default, that is, if the data is not obtained within 5S, it will end
''' Consumer_func Consumer機能はデフォルトではありません。追加すると、消費者が最初に開始され、その後タスクが生成されると見なされます。 Consumer_countは、何人の消費者がいるかを制御します。
ロガーは、ログ出力を設定することです
ブロックは、ブロッキングモードかどうかを制御することです。 trueに設定すると、メインプロセスが終了するまで作業スレッドは実行され続けます。それ以外の場合、キューが空で、スレッドタスクが完了すると、作業が終了します。
タイムアウトは、キューからデータを取得するためのタイムアウトです。デフォルトのタイムアウトは5秒です。結合が真の場合、作業は再びfalseに参加し、現在の作業を終了します。
1つは、最初に消費者を実行してから生成することです。継続的に生成して消費することができます。パッシブエージェントのスキャンタスクに適しているか、メインプロセスがグローバルを制御し、タスクを生成します。
1つは、すべてのタスクを一度に生成し、それらを消費者に引き渡して、一度にテストを生成するのに適したタスクを実行してから実行することです。
Work例Workを通じて消費者機能にインスタンスのデータputWork.is_end()メソッドを介してプログラムをブロックします from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# 不需要阻塞
w = Worker ( consumer , consumer_count = 1 , block = False )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )デフォルトの作業と同様に、作成時に追加のlimit_timeパラメーターがあります。 limit_timeとconsumer_count単位時間を設定することにより、単位時間内に実行する作業の数を制御します。
デフォルトのlimit_timeは1秒です
from thread_worker import LimitWorker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# limit_time 是limit_time 秒内有 consumer_count个消费者
w = LimitWorker ( consumer , consumer_count = 1 , block = False , limit_time = 3 )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )出力はこのようなものです。 3秒ごとにリクエストのみが送信されます。つまり、周波数は制御されます。
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
このシナリオでは、パッシブエージェントまたはいくつかの主要なプロセスが、長い間実行されるプログラムで使用されます。違いは、work.is_end()メソッドがブロックをブロックしたり、ブロックを設定したりする必要がないことです。デフォルトは真です。
from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# block默认就是True的
w = Worker ( consumer , consumer_count = 1 )
for iid in range ( 10 ):
w . put ( iid )
# 手动阻塞
while True :
pass