Use convenientemente o modo de consumidor do produtor use multi-threading + filas para oferecer suporte a um produtor de um produtor de um produtor vários modos de consumidor para suportar o modo de fila de consumo de prioridade para suportar o modo de trabalho limite do fluxo
pip install thread_worker
Worker o trabalho multithread comum
Trabalho de consumo prioritário PriorWorker
LimitWorker LIMITE TRABALHO
Inicialização, os seguintes parâmetros estão disponíveis
def __init__ ( self , consumer_func , consumer_count , logger , block , timeout ):
'''
@params consumer_func consumer_func default None
@params consumer_count Work quantity
@params logger You can customize the logger. The default is logging
@params block You can set whether the thread ends when the queue is empty
By default, the thread will end when the queue is empty and the thread task is completed
You can set it to true to enable work to monitor tasks all the time
@params timeout The timeout for obtaining data in the queue is 5S by default, that is, if the data is not obtained within 5S, it will end
''' Consumer_func Função do consumidor Padrões para nenhum. Se adicionado, considera -se que o consumidor é iniciado primeiro e depois a tarefa é gerada. Consumer_Count controla quantos consumidores estão lá.
Logger é definir a saída de log
O bloco é controlar se é o modo de bloqueio. Se definido como true, o tópico de trabalho continuará sendo executado até que o processo principal termine. Caso contrário, quando a fila estiver vazia e as tarefas do encadeamento serão concluídas, o trabalho terminará.
O tempo limite é o tempo limite para obter dados da fila. O tempo limite padrão é 5s. Se a junção for verdadeira, o trabalho levará a junção para False novamente e encerrará o trabalho atual.
Um é executar o consumidor primeiro e depois gerá -lo. Pode gerá -lo e consumi -lo continuamente. É adequado para tarefas de varredura de agentes passivos ou um processo principal controla o global e gera tarefas.
Uma é gerar todas as tarefas de uma só vez e entregá -las ao consumidor para executar a tarefa adequada para gerar testes de uma só vez e depois executá -la.
WorkWork put dados da instânciaWork.is_end() from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# 不需要阻塞
w = Worker ( consumer , consumer_count = 1 , block = False )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 ) Como o trabalho padrão, existe um parâmetro limit_time adicional ao criá -lo. Ao definir limit_time e consumer_count , ele controla quantos trabalhos para executar dentro de um horário unitário.
O padrão limit_time é 1s
from thread_worker import LimitWorker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# limit_time 是limit_time 秒内有 consumer_count个消费者
w = LimitWorker ( consumer , consumer_count = 1 , block = False , limit_time = 3 )
for iid in range ( 10 ):
w . put ( iid )
# 这里通过 is_end 方法来阻塞程序
while not w . is_end ():
time . sleep ( 3 )A saída é assim. Somente uma solicitação será enviada a cada 3 segundos, ou seja, a frequência será controlada.
send: 2022-04-08 10:13:04 904000
https://www.baidu.com/0
send: 2022-04-08 10:13:07 904000
https://www.baidu.com/1
send: 2022-04-08 10:13:10 904000
https://www.baidu.com/2
send: 2022-04-08 10:13:13 904000
https://www.baidu.com/3
send: 2022-04-08 10:13:16 904000
https://www.baidu.com/4
send: 2022-04-08 10:13:19 904000
https://www.baidu.com/5
send: 2022-04-08 10:13:22 904000
https://www.baidu.com/6
send: 2022-04-08 10:13:25 904000
https://www.baidu.com/7
send: 2022-04-08 10:13:28 904000
https://www.baidu.com/8
send: 2022-04-08 10:13:31 904000
https://www.baidu.com/9
Nesse cenário, agentes passivos ou alguns processos principais serão usados em programas que são executados por um longo tempo. A diferença é que o método work.is_end () não é necessário para bloquear o bloco ou definir o bloco. O padrão é verdadeiro.
from thread_worker import Worker
import requests
import time
def consumer ( iid ):
url = "https://www.baidu.com/{0}" . format ( iid )
resp = requests . get ( url = url )
print ( resp . request . url )
# block默认就是True的
w = Worker ( consumer , consumer_count = 1 )
for iid in range ( 10 ):
w . put ( iid )
# 手动阻塞
while True :
pass