Buffer de anel de consumidor único com vários produtores de bloqueio (MPSC), que suporta operações de intervalo contíguas e que podem ser convenientemente usadas para passagem de mensagens. A implementação é escrita em C11 e distribuída sob a licença BSD de 2 cláusulas.
int ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, size_t length)
rbuf é um ponteiro para o objeto de tampão de anel opaco; O chamador é responsável por alocar o espaço para esse objeto. Normalmente, o objeto seria alocado dinamicamente se o uso de threads ou reservado em uma memória compartilhada bloqueada se estiver usando processos. O tamanho da alocação para o objeto deve ser obtido usando a função ringbuf_get_sizes . Retorna 0 no sucesso e -1 na falha. void ringbuf_get_sizes(unsigned nworkers, size_t *ringbuf_obj_size, size_t *ringbuf_worker_size)
ringbuf_t opaco e, opcionalmente, ringbuf_worker_t estruturas. O tamanho da estrutura ringbuf_t depende do número de trabalhadores, especificado pelo parâmetro nworkers . ringbuf_worker_t *ringbuf_register(ringbuf_t *rbuf, unsigned i)
i é um número de trabalhador, a partir de zero (ou seja, deve ser o que nworkers usados na configuração). No sucesso, retorna um ponteiro para um toque opaco ringbuf_worker_t estruturado, que faz parte do bloco de memória ringbuf_t . Na falha, retorna NULL . void ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *worker)
ssize_t ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *worker, size_t len)
ringbuf_produce deve ser chamada para indicar isso. As chamadas adquiridas aninhadas não são permitidas. void ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *worker)
size_t ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
ringbuf_release deve ser chamada para indicar isso. void ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
O consumidor retornará um bloco contíguo de faixas produzidas, ou seja, a chamada ringbuf_consume não retornará intervalos parciais. Se você pensa no intervalo produzido como uma mensagem, o consumidor retornará um bloco de mensagens, sempre terminando no limite da mensagem. Esse comportamento nos permite usar essa implementação de buffer de anel como uma fila de mensagens.
A implementação foi extensivamente testada em uma máquina X86 de 24 núcleos, consulte o teste de estresse para obter os detalhes sobre a técnica. Ele também fornece um exemplo de como o mecanismo pode ser usado para passagem de mensagens.
Essa implementação do buffer de anel sempre fornece uma gama contígua de espaço para o produtor. É alcançado por um investimento inicial se o intervalo solicitado não puder caber no final. A implicação disso é que a chamada ringbuf_acquire pode falhar se o intervalo solicitado for maior que a metade do tamanho do buffer. Portanto, pode ser necessário garantir que o tamanho do buffer do anel seja pelo menos duas vezes maior que o tamanho máximo da unidade de produção.
Deve-se notar também que uma das trade-offs desse design é que o consumidor atualmente realiza uma varredura O (n) na lista de produtores.
Produtores:
if (( w = ringbuf_register ( r , worker_id )) == NULL )
err ( EXIT_FAILURE , "ringbuf_register" )
...
if (( off = ringbuf_acquire ( r , w , len )) != -1 ) {
memcpy ( & buf [ off ], payload , len );
ringbuf_produce ( r , tls );
}Consumidor:
if (( len = ringbuf_consume ( r , & off )) != 0 ) {
process ( & buf [ off ], len );
ringbuf_release ( r , len );
}