这是一个性能与易用性兼备的 C++ NIO 网络库,支持 C++14 及以上版本,并且跨三大主流平台.
底层的IO模型借鉴了 muduo 网络库的 one loop per thread 模型,该模型使得线程安全的API封装更为高效且简单.
上层提供的API接口借鉴了字节跳动开源的 go 语言 NIO 网络库 netpoll,抽象出 Listener 和 Dialer 最终通过 EventLoop 来提供服务.
关于本库的使用方式可以查看 快速开始.
具体的使用范例可以查看 examples.
已经支持:
windows/linux/macos 各个平台分别用该平台最高性能的多路复用实现(IOCP实现的epoll/epoll/kqueue).EventLoop 实现异步调用,如果所在操作系统支持 sendfile 调用,则会调用该零拷贝调用而不是调用C标准库.echo 服务器,只需要下面的代码:
struct server
{
NETPOLL_TCP_MESSAGE(conn, buffer){conn->send(buffer->readAll());}
};
int main()
{
auto loop = netpoll::NewEventLoop();
auto listener = netpoll::tcp::Listener::New({6666});
listener.bind<server>();
loop.serve(listener);
}100^8s,可能只需要100*8byte) 具体可以查看博客:详细介绍.one loop per thread 模型非常匹配.lockfree_queuefuture 用于同步,这在客户端编程中可能很有用.netpoll::SignalTask::Register({SIGINT,SIGTERM});
netpoll::SignalTask::Add([](){
//do something
});将来会支持:
性能极高,暂时测试了asio(C++)/netty(java)/netpoll(go语言).
在Windows系统下测试了 asio/netty ,而下列图表测试基于Linux,而我不太会在Linux上部署Java程序,所以下列图表没有netty的性能表现.
不同并发情况下单个连接echo服务的平均延迟如下(avg):
服务端的底层模型架构:
Listener 的封装简化了 TcpServer 的使用,所有的调用如下:
namespace netpoll::tcp{
class Listener{
template<typename T>
void bind(Args &&...args); // 用于绑定任意类的对应方法到回调
template <typename T>
std::shared_ptr<T> instance() // 返回内部帮你创建的实例
static Listener New(const InetAddress &address,
const StringView &name = "tcp_listener",
bool reUseAddr = true, bool reUsePort = true); // 建立Listener实例
void enableKickoffIdle(size_t timeout); // 用于开启剔除空闲连接
}
}Dialer 的封装简化了 TcpClient 的使用,所用调用如下:
namespace netpoll::tcp{
class Dialer{
template<typename T>
void bind(Args &&...args); // 用于绑定任意类的对应方法到回调
template <typename T>
std::shared_ptr<T> instance() // 返回内部帮你创建的实例
static Dialer New(const InetAddress &address,
const StringView &name = "tcp_dialer"); // 建立Listener实例
void enableRetry(); // 在连接失败后重试
// 以下调用方均是为了在没有C++17的if constexpr情况下的替代品,否则应该直接使用bind
template <typename T, typename... Args>
static std::shared_ptr<T> Register(Args &&...args);
void onMessage(netpoll::RecvMessageCallback const &cb);
void onConnection(netpoll::ConnectionCallback const &cb);
void onConnectionError(netpoll::ConnectionErrorCallback const &cb);
void onWriteComplete(netpoll::WriteCompleteCallback const &cb);
}
}EventLoop 的负载均衡策略没有单独设置选项,用的都是 Round Robin .
EventLoop相关的API如下:
NewEventLoop(size_t threadNum=2,const netpoll::StringView&name="eventloop") : 建立 EventLoop 实例,可以设置 EventLoop 的线程数量.
serve 方法: 当你 New 出 Dialer 或 Listener 后可以通过该方法为他们提供服务.
serveAsDaemon 方法: 与 serve 方法的效果是一样的,但是是开一个新的线程去 serve ,不会阻塞当前线程.
enableQuit 方法: 允许主动调用循环退出的方法,默认无法主动退出所有的循环线程,配合 QuitAllEventLoop 方法使用.
QuitAllEventLoop 方法:退出所有的循环.
MessageBuffer是用于读写内核缓冲区数据的中间缓存,其实就是一个可变的缓冲区,实现起来也很比较简单,关于不同种类的缓冲区实现可以看看我的这篇文章:可变长与不可变长buffer的实现
这里就不对各种调用进行描述了,想要了解的可以直接看对应的头文件: netpoll/util/message_buffer.h .
简单讲一下实现该缓冲区的亮点:
扩容时避免resize的副作用导致无意义的赋值,同时也可以简化内存开辟和复制的操作.
void MessageBuffer::ensureWritableBytes(size_t len)
{
if (writableBytes() >= len) return;
// move readable bytes
if (m_head + writableBytes() >= (len + kBufferOffset))
{
std::copy(begin() + m_head, begin() + m_tail, begin() + kBufferOffset);
m_tail = kBufferOffset + (m_tail - m_head);
m_head = kBufferOffset;
return;
}
// create new buffer
size_t newLen;
if ((m_buffer.size() * 2) > (kBufferOffset + readableBytes() + len))
newLen = m_buffer.size() * 2;
else newLen = kBufferOffset + readableBytes() + len;
// Avoid the inefficiency of using resize
MessageBuffer newbuffer(newLen);
newbuffer.pushBack(*this);
swap(newbuffer);
}提供 readFd 方法,该方法读取对应 fd 的读缓冲区的数据到 MessageBuffer ,每次读取的内容都足够的大,比如MessageBuffer可写区域如果小于8kb,那么就启用8kb的备用读取缓存.
ssize_t MessageBuffer::readFd(int fd, int *retErrno)
{
char extBuffer[8192];
struct iovec vec[2];
size_t writable = writableBytes();
vec[0].iov_base = begin() + m_tail;
vec[0].iov_len = static_cast<int>(writable);
vec[1].iov_base = extBuffer;
vec[1].iov_len = sizeof(extBuffer);
const int iovcnt = (writable < sizeof extBuffer) ? 2 : 1;
ssize_t n = ::readv(fd, vec, iovcnt);
if (n < 0) { *retErrno = errno; }
else if (static_cast<size_t>(n) <= writable) { m_tail += n; }
else
{
m_tail = m_buffer.size();
pushBack({extBuffer, n - writable});
}
return n;
}TcpConnection类是一个抽象类,在使用时都是通过智能指针来使用.
该接口规范了下列功能:
发送数据(包括string/buffer/file/stream).
/**
* @brief Send some data to the peer.
*
* @param msg
* @param len
*/
virtual void send(StringView const &msg) = 0;
virtual void send(const MessageBuffer &buffer) = 0;
virtual void send(MessageBuffer &&buffer) = 0;
virtual void send(const std::shared_ptr<MessageBuffer> &msgPtr) = 0;
/**
* @brief Send a file to the peer.
*
* @param fileName in UTF-8
* @param offset
* @param length
*/
virtual void sendFile(StringView const &fileName, size_t offset = 0,
size_t length = 0) = 0;
/**
* @brief Send a stream to the peer.
*
* @param callback function to retrieve the stream data (stream ends when a
* zero size is returned) the callback will be called with nullptr when the
* send is finished/interrupted, so that it cleans up any internal data (ex:
* close file).
* @warning The buffer size should be >= 10 to allow http chunked-encoding
* data stream
*/
// (buffer, buffer size) -> size
// of data put in buffer
virtual void sendStream(
std::function<std::size_t(char *, std::size_t)> callback) = 0;获得连接信息,如地址信息或连接状态或接收数据的Buffer.
/**
* @brief New the local address of the connection.
*
* @return const InetAddress&
*/
virtual const InetAddress &localAddr() const = 0;
/**
* @brief New the remote address of the connection.
*
* @return const InetAddress&
*/
virtual const InetAddress &peerAddr() const = 0;
/**
* @brief Return true if the connection is established.
*
* @return true
* @return false
*/
virtual bool connected() const = 0;
/**
* @brief Return false if the connection is established.
*
* @return true
* @return false
*/
virtual bool disconnected() const = 0;
/**
* @brief New the buffer in which the received data stored.
*
* @return MsgBuffer*
*/
virtual MessageBuffer *getRecvBuffer() = 0;设置连接回调或状态(断开连接或设置tcpNoDelay/keepAlive).
/**
* @brief Set the high water mark callback
*
* @param cb The callback is called when the data in sending buffer is
* larger than the water mark.
* @param markLen The water mark in bytes.
*/
virtual void setHighWaterMarkCallback(const HighWaterMarkCallback &cb,
size_t markLen) = 0;
/**
* @brief Set the TCP_NODELAY option to the socket.
*
* @param on
*/
virtual void setTcpNoDelay(bool on) = 0;
/**
* @brief Shutdown the connection.
* @note This method only closes the writing direction.
*/
virtual void shutdown() = 0;
/**
* @brief Close the connection forcefully.
*
*/
virtual void forceClose() = 0;
/**
* @brief Call this method to avoid being kicked off by TcpServer, refer to
* the kickoffIdleConnections method in the TcpServer class.
*
*/
virtual void keepAlive() = 0;
/**
* @brief Return true if the keepAlive() method is called.
*
* @return true
* @return false
*/
virtual bool isKeepAlive() = 0;设置该连接的上下文,用于处理该连接的专用业务逻辑.
/**
* @brief Set the custom data on the connection.
*
* @param context
*/
void setContext(const Any &context) { m_context = context; }
void setContext(Any &&context) { m_context = std::move(context); }
/**
* @brief New mutable context
*
* @return Any
*/
Any &getContextRefMut() { return m_context; }
/**
* @brief New unmutable context
*
* @return Any
*/
Any const &getContextRef() const { return m_context; }
/**
* @brief Return true if the custom data is set by user.
*
* @return true
* @return false
*/
bool hasContext() const
{
#if __cplusplus >= 201703L
return m_context.has_value();
#else
return m_context.empty();
#endif
}
/**
* @brief Clear the custom data.
*
*/
void clearContext()
{
#if __cplusplus >= 201703L
m_context.reset();
#else
m_context.clear();
#endif
}该连接累计收发的数据量.
/**
* @brief Return the number of bytes sent
*
* @return size_t
*/
virtual size_t bytesSent() const = 0;
/**
* @brief Return the number of bytes received.
*
* @return size_t
*/
virtual size_t bytesReceived() const = 0;获取该连接对应的Loop.
/**
* @brief New the event loop in which the connection I/O is handled.
*
* @return EventLoop*
*/
virtual EventLoop *getLoop() = 0;暂时提供了下面两个用例: