편리하게 생산자 소비자 모드를 사용하여 멀티 스레딩 + 대기열을 사용하여 일반 1 생산자 다중 소비자 모드를 지원하여 우선 순위 소비 대기열 모드를 지원하여 스트림 제한 작업 모드를 지원합니다.
pip install thread_worker
Worker 평범한 멀티 스레드 작업
PriorWorker 우선 순위 소비 대기열 작업
LimitWorker Current Limit Work
초기화, 다음 매개 변수를 사용할 수 있습니다
def __init__ ( self , consumer_func , consumer_count , logger , block , timeout ):
'''
@params consumer_func consumer_func default None
@params consumer_count Work quantity
@params logger You can customize the logger. The default is logging
@params block You can set whether the thread ends when the queue is empty
By default, the thread will end when the queue is empty and the thread task is completed
You can set it to true to enable work to monitor tasks all the time
@params timeout The timeout for obtaining data in the queue is 5S by default, that is, if the data is not obtained within 5S, it will end
''' Consumer_Func 소비자 기능 기본값은 없음. 추가하면 소비자가 먼저 시작된 다음 작업이 생성되는 것으로 간주됩니다. Consumer_Count는 소비자 수를 제어합니다.
로거는 로그 출력을 설정하는 것입니다
블록은 차단 모드인지 여부를 제어하는 것입니다. True로 설정되면 기본 프로세스가 끝날 때까지 작업 스레드가 계속 실행됩니다. 그렇지 않으면 큐가 비어 있고 스레드 작업이 완료되면 작업이 종료됩니다.
타임 아웃은 큐에서 데이터를 얻는 시간 초과입니다. 기본 시간 초과는 5S입니다. 가입이 사실이라면, 작업은 가입을 다시 거짓으로 가져 와서 현재 작업을 종료합니다.
하나는 먼저 소비자를 실행 한 다음 생성하는 것입니다. 지속적으로 생성하고 소비 할 수 있습니다. 수동 에이전트 스캔 작업에 적합하거나 주요 프로세스는 전역을 제어 한 다음 작업을 생성합니다.
하나는 한 번에 모든 작업을 생성하고 소비자에게 넘겨서 한 번에 테스트를 생성하는 데 적합한 작업을 실행 한 다음 실행하는 것입니다.
WorkWork 통해 소비자 기능에 인스턴스의 데이터를 put .Work.is_end() 메소드를 통해 프로그램 차단 from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# 不需要阻塞
w = Worker ( consumer , consumer_count = 1 , block = False )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 ) 기본 작업과 마찬가지로 생성 할 때 추가 limit_time 매개 변수가 있습니다. limit_time 및 consumer_count Unit Time을 설정하면 단위 시간 내에 실행할 작업의 수를 제어합니다.
기본값 limit_time 은 1s입니다
from thread_worker import LimitWorker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# limit_time 是limit_time 秒内有 consumer_count个消费者
w = LimitWorker ( consumer , consumer_count = 1 , block = False , limit_time = 3 )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )출력은 다음과 같습니다. 3 초마다 요청 만 전송됩니다. 즉, 주파수는 제어됩니다.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
이 시나리오에서는 수동 에이전트 또는 일부 주요 프로세스가 오랫동안 실행되는 프로그램에 사용됩니다. 차이점은 Work.is_end () 메소드가 블록을 차단하거나 블록을 설정하는 데 필요하지 않다는 것입니다. 기본값은 사실입니다.
from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# block默认就是True的
w = Worker ( consumer , consumer_count = 1 )
for iid in range ( 10 ):
w . put ( iid )
# 手动阻塞
while True :
pass