This is a C++ NIO network library that combines performance with ease of use. It supports C++14 and above and covers the three major mainstream platforms.
The underlying IO model borrows the one loop per thread model of the muduo network library, which makes thread-safe API encapsulation more efficient and simpler.
The upper-layer API borrows from netpoll, ByteDance's open-source Go NIO network library: it abstracts a Listener and a Dialer, which are ultimately served through an EventLoop.
For how this library is used, see the Quick Start.
For concrete usage examples, see examples.
Already supported:

- windows/linux/macos, each implemented with the platform's highest-performance multiplexing mechanism (IOCP/epoll/kqueue respectively).
- Asynchronous calls through EventLoop.
- If the operating system supports the sendfile call, zero copy is used when sending files instead of going through the C standard library.
- A minimal echo server needs only the following code:

```cpp
struct server
{
   NETPOLL_TCP_MESSAGE(conn, buffer) { conn->send(buffer->readAll()); }
};

int main()
{
   auto loop     = netpoll::NewEventLoop();
   auto listener = netpoll::tcp::Listener::New({6666});
   listener.bind<server>();
   loop.serve(listener);
}
```

- Kicking off idle connections, with timeouts spanning up to 100^8 s while requiring only about 100*8 bytes. For details, see the blog: detailed introduction.
- A lockfree_queue that fits the one loop per thread model very well.
- future for synchronization, which may be useful in client programming.
- Signal handling:

```cpp
netpoll::SignalTask::Register({SIGINT, SIGTERM});
netpoll::SignalTask::Add([]() {
   // do something
});
```

Will support in the future:
Performance is extremely high. So far I have tested against asio (C++), netty (Java), and netpoll (Go).
I tested asio/netty on Windows. The chart below was measured on Linux, and since I am not very good at deploying Java programs on Linux, it does not include netty's numbers.
The average latency (avg) of a single-connection echo service under different concurrency levels is as follows:
The underlying model architecture of the server side:
The Listener encapsulation simplifies the use of TcpServer. All calls are as follows:

```cpp
namespace netpoll::tcp {
class Listener
{
   // Bind the matching methods of any class as callbacks
   template <typename T>
   void bind(Args &&...args);
   // Return the instance created internally for you
   template <typename T>
   std::shared_ptr<T> instance();
   // Create a Listener instance
   static Listener New(const InetAddress &address,
                       const StringView &name = "tcp_listener",
                       bool reUseAddr = true, bool reUsePort = true);
   // Enable kicking off idle connections
   void enableKickoffIdle(size_t timeout);
};
}   // namespace netpoll::tcp
```

The Dialer encapsulation simplifies the use of TcpClient. The calls are as follows:
```cpp
namespace netpoll::tcp {
class Dialer
{
   // Bind the matching methods of any class as callbacks
   template <typename T>
   void bind(Args &&...args);
   // Return the instance created internally for you
   template <typename T>
   std::shared_ptr<T> instance();
   // Create a Dialer instance
   static Dialer New(const InetAddress &address,
                     const StringView &name = "tcp_dialer");
   // Retry after a connection failure
   void enableRetry();
   // The following calls are substitutes for C++17's if constexpr when it is
   // unavailable; otherwise you should use bind directly
   template <typename T, typename... Args>
   static std::shared_ptr<T> Register(Args &&...args);
   void onMessage(netpoll::RecvMessageCallback const &cb);
   void onConnection(netpoll::ConnectionCallback const &cb);
   void onConnectionError(netpoll::ConnectionErrorCallback const &cb);
   void onWriteComplete(netpoll::WriteCompleteCallback const &cb);
};
}   // namespace netpoll::tcp
```

EventLoop's load-balancing strategy cannot be configured separately; Round Robin is always used.
The APIs related to EventLoop are as follows:

- NewEventLoop(size_t threadNum = 2, const netpoll::StringView &name = "eventloop"): creates an EventLoop instance and sets the number of threads for the EventLoop.
- serve method: after you New a Dialer or Listener, you can serve it through this method.
- serveAsDaemon method: the same effect as serve, but it serves on a new thread and does not block the current thread.
- enableQuit method: allows the loop exit methods to be called actively. By default the loop threads cannot be exited actively; use this together with the QuitAllEventLoop method.
- QuitAllEventLoop method: exits all loops.
MessageBuffer is an intermediate cache for reading and writing kernel buffer data. It is a variable-length buffer and is quite simple to implement. For different kinds of buffer implementations, you can check out my article: Implementation of variable-length and fixed-length buffers.
I will not describe every call here. If you want the details, look directly at the corresponding header file: netpoll/util/message_buffer.h.
Let me briefly point out the highlights of this buffer implementation:
When expanding, it avoids the side effects of resize and also simplifies memory allocation and copying:
```cpp
void MessageBuffer::ensureWritableBytes(size_t len)
{
   if (writableBytes() >= len) return;
   // Move the readable bytes to the front if that frees enough room
   if (m_head + writableBytes() >= (len + kBufferOffset))
   {
      std::copy(begin() + m_head, begin() + m_tail, begin() + kBufferOffset);
      m_tail = kBufferOffset + (m_tail - m_head);
      m_head = kBufferOffset;
      return;
   }
   // Otherwise create a new, larger buffer
   size_t newLen;
   if ((m_buffer.size() * 2) > (kBufferOffset + readableBytes() + len))
      newLen = m_buffer.size() * 2;
   else
      newLen = kBufferOffset + readableBytes() + len;
   // Avoid the inefficiency of using resize
   MessageBuffer newbuffer(newLen);
   newbuffer.pushBack(*this);
   swap(newbuffer);
}
```

The readFd method reads the data in the fd's kernel read buffer into the MessageBuffer. Each read fetches a large enough amount: if the MessageBuffer's writable area is smaller than 8 KB, an 8 KB spare read buffer is enabled as well:
```cpp
ssize_t MessageBuffer::readFd(int fd, int *retErrno)
{
   char extBuffer[8192];
   struct iovec vec[2];
   size_t writable = writableBytes();
   vec[0].iov_base = begin() + m_tail;
   vec[0].iov_len  = static_cast<int>(writable);
   vec[1].iov_base = extBuffer;
   vec[1].iov_len  = sizeof(extBuffer);
   // Use the spare buffer only when the writable area is small
   const int iovcnt = (writable < sizeof extBuffer) ? 2 : 1;
   ssize_t n = ::readv(fd, vec, iovcnt);
   if (n < 0) { *retErrno = errno; }
   else if (static_cast<size_t>(n) <= writable) { m_tail += n; }
   else
   {
      m_tail = m_buffer.size();
      pushBack({extBuffer, n - writable});
   }
   return n;
}
```

The TcpConnection class is an abstract class and is used through smart pointers.
This interface specifies the following functions:
Send data (strings/buffers/files/streams).

```cpp
/**
 * @brief Send some data to the peer.
 */
virtual void send(StringView const &msg) = 0;
virtual void send(const MessageBuffer &buffer) = 0;
virtual void send(MessageBuffer &&buffer) = 0;
virtual void send(const std::shared_ptr<MessageBuffer> &msgPtr) = 0;
/**
 * @brief Send a file to the peer.
 *
 * @param fileName in UTF-8
 * @param offset
 * @param length
 */
virtual void sendFile(StringView const &fileName, size_t offset = 0,
                      size_t length = 0) = 0;
/**
 * @brief Send a stream to the peer.
 *
 * @param callback function to retrieve the stream data (the stream ends when
 * a zero size is returned); the callback will be called with nullptr when the
 * send is finished/interrupted, so that it can clean up any internal data
 * (e.g. close a file).
 * @warning The buffer size should be >= 10 to allow http chunked-encoding
 * data streams
 */
// (buffer, buffer size) -> size of data put in buffer
virtual void sendStream(
    std::function<std::size_t(char *, std::size_t)> callback) = 0;
```

Get connection information, such as the addresses, the connection status, or the buffer in which received data is stored.
```cpp
/**
 * @brief Get the local address of the connection.
 *
 * @return const InetAddress&
 */
virtual const InetAddress &localAddr() const = 0;
/**
 * @brief Get the remote address of the connection.
 *
 * @return const InetAddress&
 */
virtual const InetAddress &peerAddr() const = 0;
/**
 * @brief Return true if the connection is established.
 */
virtual bool connected() const = 0;
/**
 * @brief Return true if the connection is disconnected.
 */
virtual bool disconnected() const = 0;
/**
 * @brief Get the buffer in which the received data is stored.
 *
 * @return MessageBuffer*
 */
virtual MessageBuffer *getRecvBuffer() = 0;
```

Set connection callbacks or status (shut down, force close, or set tcpNoDelay/keepAlive).
```cpp
/**
 * @brief Set the high water mark callback.
 *
 * @param cb The callback is called when the data in the sending buffer
 * exceeds the water mark.
 * @param markLen The water mark in bytes.
 */
virtual void setHighWaterMarkCallback(const HighWaterMarkCallback &cb,
                                      size_t markLen) = 0;
/**
 * @brief Set the TCP_NODELAY option on the socket.
 */
virtual void setTcpNoDelay(bool on) = 0;
/**
 * @brief Shut down the connection.
 * @note This method only closes the writing direction.
 */
virtual void shutdown() = 0;
/**
 * @brief Close the connection forcefully.
 */
virtual void forceClose() = 0;
/**
 * @brief Call this method to avoid being kicked off by TcpServer; refer to
 * the kickoffIdleConnections method in the TcpServer class.
 */
virtual void keepAlive() = 0;
/**
 * @brief Return true if the keepAlive() method has been called.
 */
virtual bool isKeepAlive() = 0;
```

Set a context on the connection to hold the connection's dedicated business data.
```cpp
/**
 * @brief Set the custom data on the connection.
 */
void setContext(const Any &context) { m_context = context; }
void setContext(Any &&context) { m_context = std::move(context); }
/**
 * @brief Get a mutable reference to the context.
 */
Any &getContextRefMut() { return m_context; }
/**
 * @brief Get an immutable reference to the context.
 */
Any const &getContextRef() const { return m_context; }
/**
 * @brief Return true if the custom data has been set by the user.
 */
bool hasContext() const
{
#if __cplusplus >= 201703L
   return m_context.has_value();
#else
   // Note the negation: empty() is true when no context is set
   return !m_context.empty();
#endif
}
/**
 * @brief Clear the custom data.
 */
void clearContext()
{
#if __cplusplus >= 201703L
   m_context.reset();
#else
   m_context.clear();
#endif
}
```

The connection also accumulates the amount of data sent and received.
```cpp
/**
 * @brief Return the number of bytes sent.
 *
 * @return size_t
 */
virtual size_t bytesSent() const = 0;
/**
 * @brief Return the number of bytes received.
 *
 * @return size_t
 */
virtual size_t bytesReceived() const = 0;
```

Get the Loop corresponding to this connection.
```cpp
/**
 * @brief Get the event loop in which the connection's I/O is handled.
 *
 * @return EventLoop*
 */
virtual EventLoop *getLoop() = 0;
```

Currently, the following two use cases are provided: