include: co/co.h.
#基本概念
- 协程是运行于线程中的轻量级调度单位。
- 协程之于线程,类似于线程之于进程。
- 一个进程中可以存在多个线程,一个线程中可以存在多个协程。
- 协程所在的线程一般被称为调度线程。
- 协程发生 io 阻塞或调用 sleep 等操作时,调度线程会挂起此协程。
- 协程挂起时,调度线程会切换到其他等待中的协程运行。
- 协程的切换是在用户态进行的,比线程间的切换更快。
协程非常适合写网络程序,可以实现同步的编程方式,不需要异步回调,大大减轻了程序员的思想负担。
co 协程库实现的是一种类似 golang 的协程,有如下特性:
- 支持多线程调度,默认线程数为系统 CPU 核数。
- 共享栈,同一线程中的协程共用若干个栈(大小默认为 1MB),内存占用低,Linux 上的测试显示 1000 万协程只用了 2.8G 内存(仅供参考)。
- 协程创建后,始终在同一个线程中运行,而不会切换到其他线程。
- 各协程之间为平级关系,可以在任何地方(包括在协程中)创建新的协程。
co 协程库在 linux, mac, windows 等平台,分别基于 epoll, kqueue, iocp 实现。
co 协程库中 context 切换相关的代码,取自 ruki 的 tbox,而 tbox 则参考了 boost 的实现,在此表示感谢!
#协程 API
#v3.0 删除的 API
- co::init, v3.0 移除,从 co 3.0 开始,一般只需要在 main 函数开头调用
flag::init(argc, argv)
。 - co::exit, v3.0 移除。
- co::stop, v3.0 移除。
- co::all_schedulers, v3.0 重命名为 co::schedulers。
#go
void go(Closure* cb);
template<typename F> void go(F&& f);
template<typename F, typename P> void go(F&& f, P&& p);
template<typename F, typename T, typename P> void go(F&& f, T* t, P&& p);
-
此函数用于创建协程,与创建线程类似,需要指定一个协程函数。
-
第 1 个版本中,参数 cb 指向一个 Closure 对象,协程启动后会调用 Closure 中的
run()
方法。 -
第 2-4 个版本,将传入的参数打包成一个 Closure,然后调用第 1 个版本。
-
第 2 个版本中,参数 f 是任意可调用的对象,只要能调用
f()
或(*f)()
就行。 -
第 3 个版本中,参数 f 是任意可调用的对象,只要能调用
f(p)
,(*f)(p)
或(p->*f)()
就行。 -
第 4 个版本中,参数 f 是类中带一个参数的方法
void T::f(P)
,参数 t 是T
类型的指针,参数 p 是方法 f 的参数。 -
实际测试发现,创建
std::function
类型的对象开销较大,应尽量少用。 -
严格来说,go() 只是将 Closure 分配到一个调度线程中,真正创建协程是由调度线程完成的。但从用户的角度看,逻辑上可以认为 go() 创建了协程。
-
示例
go(f); // void f();
go(f, 7); // void f(int);
go(&T::f, &o); // void T::f(); T o;
go(&T::f, &o, 3); // void T::f(int); T o;
// lambda
go([](){
LOG << "hello co";
});
// std::function
std::function<void()> x(std::bind(f, 7));
go(x);
go(&x); // Ensure that x is alive when the coroutine is running.
#DEF_main
这个宏用于定义 main 函数,并将 main 函数中的代码也放到协程中运行。DEF_main 内部已经调用 flag::init(argc, argv)
进行初始化,用户无需再次调用。
- 示例
DEF_main(argc, argv) {
go([](){
LOG << "hello world";
});
co::sleep(100);
}
#co::coroutine
void* coroutine();
- 返回当前的 coroutine 指针,若在非协程中调用此函数,则返回值是 NULL。
- 此函数的返回值,可作为 co::resume() 的参数,用于唤醒协程。
#co::resume
void resume(void* p);
- 唤醒指定的协程,参数
p
是 co::coroutine() 的返回值。 - 此函数是线程安全的,可在任意地方调用。
#co::yield
void yield();
- 挂起当前协程,必须在协程中调用。
- 此函数配合 co::coroutine() 与 co::resume(),可以手动控制协程的调度,详情参考 test/yield.cc。
#co::scheduler
Scheduler* scheduler();
- 返回当前线程的 scheduler 指针,如果当前线程不是调度线程,返回值是 NULL。
- 此函数一般在协程中调用,用于获取当前协程所在的 scheduler。
#co::schedulers
const co::vector<Scheduler*>& schedulers();
- 返回 Scheduler 列表的引用,一个 Scheduler 对应一个调度线程。
#co::next_scheduler
Scheduler* next_scheduler();
-
此函数返回下一个 Scheduler 指针。
-
go(...)
实际上等价于co::next_scheduler()->go(...)
。 -
示例
// 创建在同一个线程中运行的协程
auto s = co::next_scheduler();
s->go(f1);
s->go(f2);
#co::scheduler_num
int scheduler_num();
-
返回 scheduler 的数量,此函数常用于实现一些协程安全的数据结构。
-
示例
co::vector<T> v(co::scheduler_num());
void f() {
// get object for the current scheduler
auto& t = v[co::scheduler_id()];
}
go(f);
#co::scheduler_id
int scheduler_id();
- 返回当前线程的 scheduler id,这个值是 0 到
co::scheduler_num()-1
之间的值。如果当前线程不是调度线程,返回值是 -1。 - 此函数一般在协程中调用,用于获取当前协程所在的 scheduler id。
#co::coroutine_id
int coroutine_id();
- 此函数返回当前协程的 id,不同协程有不同的 id。
- 此函数一般在协程中调用,在非协程中调用时,返回值是 -1。
- 协程 id 与 scheduler id 有着简单的线性对应关系。假设有 4 个 scheduler,id 分别是 0, 1, 2, 3,这些 scheduler 内的协程 id 分别是:
4k (0, 4, 8, ...)
4k + 1 (1, 5, 9, ...)
4k + 2 (2, 6, 10, ...)
4k + 3 (3, 7, 11, ...)
#co::sleep
void sleep(uint32 ms);
- 让当前协程睡一会儿,参数 ms 是时间,单位是毫秒。
- 此函数一般在协程中调用,在非协程中调用相当于
sleep::ms(ms)
。
#co::timeout
bool timeout();
- 此函数判断之前的 IO 操作是否超时。用户在调用
co::recv()
等带超时时间的函数后,可以调用此函数判断是否超时。 - 此函数必须在协程中调用。
#代码示例
// print scheduler id and coroutine id every 3 seconds
void f() {
while (true) {
LOG << "s: " << co::scheduler_id() << " c: " << co::coroutine_id();
co::sleep(3000);
}
}
int main(int argc, char** argv) {
flag::init(argc, argv);
FLG_cout = true; // also log to terminal
for (int i = 0; i < 32; ++i) go(f);
while (true) sleep::sec(1024);
return 0;
}
#协程化的 socket API
co 提供了常用的协程化的 socket API,以支持基于协程的网络编程。
大部分 API 形式上与原生的 socket API 保持一致,这样可以减轻用户的学习负担,熟悉 socket 编程的用户可以轻松上手。
这些 API 大部分需要在协程中使用,它们在 I/O 阻塞或调用 sleep 等操作时,调度线程会挂起当前协程,切换到其他等待中的协程运行,调度线程本身并不会阻塞。借助这些 API,用户可以轻松的实现高并发、高性能的网络程序。
#术语约定(必看)
阻塞
在描述 co 中的一些 socket API 时,会用到阻塞一词,如 accept, recv,文档中说它们会阻塞,是指当前的协程会阻塞,而当前的调度线程并不会阻塞(可以切换到其他协程运行)。用户看到的是协程,而不是调度线程,因此从用户的角度看,它们是阻塞的。实际上,这些 API 内部使用 non-blocking socket,并不会真的阻塞,只是在 socket 上没有数据可读或者无法立即写入数据时,调度线程会挂起当前进行 I/O 操作的协程,当 socket 变为可读或可写时,调度线程会重新唤起该协程,继续 I/O 操作。
non-blocking socket
co 中的 socket API 必须使用 non-blocking socket,在 windows 平台还要求 socket 支持 overlapped I/O,win32 API 创建的 socket 默认都支持 overlapped I/O,用户一般不需要担心这个问题。为了叙述方便,这里约定文档中说到 non-blocking socket 时,同时也表示它在 windows 上支持 overlapped I/O。
#co::socket
sock_t socket(int domain, int type, int proto);
sock_t tcp_socket(int domain=AF_INET);
sock_t udp_socket(int domain=AF_INET);
- 创建 socket。
- 第 1 个函数形式上与原生 API 完全一样,在 linux 系统可以用
man socket
查看参数详情。 - 第 2 个函数创建一个 TCP socket。
- 第 3 个函数创建一个 UDP socket。
- 参数 domain 一般是 AF_INET 或 AF_INET6,前者表示 ipv4,后者表示 ipv6。
- 这些函数返回一个 non-blocking socket。发生错误时,返回值是 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::accept
sock_t accept(sock_t fd, void* addr, int* addrlen);
- 在指定 socket 上接收客户端连接,参数 fd 是之前调用 listen() 监听的 non-blocking socket,参数 addr 与 addrlen 用于接收客户端的地址信息,
*addrlen
的初始值是 addr 所指向 buffer 的长度。如果用户不需要客户端地址信息,可以将 addr 与 addrlen 设置为 NULL。 - 此函数必须在协程中调用。
- 此函数会阻塞,直到有新的连接进来,或者发生错误。
- 此函数成功时返回一个 non-blocking socket,发生错误时返回 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::bind
int bind(sock_t fd, const void* addr, int addrlen);
- 给 socket 绑定 ip 地址,参数 addr 与 addrlen 是地址信息,与原生 API 相同。
- 此函数成功时返回 0,否则返回 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::close
int close(sock_t fd, int ms=0);
- 关闭 socket。
- 在 2.0.0 及之前的版本中,此函数必须在进行 I/O 操作的线程中调用。从 2.0.1 版本开始,此函数可以在协程或非协程中调用。
- 参数 ms > 0 时,先调用
co::sleep(ms)
将当前协程挂起一段时间,再关闭 socket。一般只在 server 端设置 > 0 的参数,可以在一定程度上缓解非法的网络攻击。 - 此函数内部已经处理了
EINTR
信号,用户无需考虑。 - 此函数成功时返回 0,否则返回 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::connect
int connect(sock_t fd, const void* addr, int addrlen, int ms=-1);
- 在指定 socket 上创建到指定地址的连接,参数 fd 必须是 non-blocking 的,参数 addr 与 addrlen 是地址信息,参数 ms 是超时时间,单位为毫秒,默认为 -1,永不超时。
- 此函数必须在协程中调用。
- 此函数会阻塞,直到连接完成,或者超时、发生错误。
- 此函数成功时返回 0,超时或发生错误返回 -1,用户可以调用 co::timeout() 判断是否超时,调用
co::error()
,co::strerror()
获取错误信息。
#co::listen
int listen(sock_t fd, int backlog=1024);
- 监听指定的 socket,参数 fd 是已经调用 bind() 绑定 ip 及端口的 socket。
- 此函数成功时返回 0,否则返回 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::recv
int recv(sock_t fd, void* buf, int n, int ms=-1);
- 在指定 socket 上接收数据,参数 fd 必须是 non-blocking 的,参数 buf 是用于接收数据的 buffer,参数 n 是 buffer 长度,参数 ms 是超时时间,单位为毫秒,默认为 -1,永不超时。
- 此函数必须在协程中调用。
- 在 Windows 平台,此函数只适用于 TCP 等 stream 类型的 socket。
- 此函数会阻塞,直到有数据进来,或者超时、发生错误。
- 此函数成功时返回接收的数据长度(可能小于 n),对端关闭连接时返回 0,超时或发生错误返回 -1,用户可以调用 co::timeout() 判断是否超时,调用
co::error()
,co::strerror()
获取错误信息。
#co::recvn
int recvn(sock_t fd, void* buf, int n, int ms=-1);
- 在指定 socket 上接收指定长度的数据,参数 fd 必须是 non-blocking 的,参数 buf 是用于接收数据的 buffer,参数 n 是要接收数据的长度,参数 ms 是超时时间,单位为毫秒,默认为 -1,永不超时。
- 此函数必须在协程中调用。
- 此函数会阻塞,直到 n 字节的数据全部接收完,或者超时、发生错误。
- 此函数成功时返回 n,对端关闭连接时返回 0,超时或发生错误返回 -1,用户可以调用 co::timeout() 检查是否超时,调用
co::error()
,co::strerror()
获取错误信息。
#co::recvfrom
int recvfrom(sock_t fd, void* buf, int n, void* src_addr, int* addrlen, int ms=-1);
- 与 recv() 类似,只是可以用参数 src_addr 与 addrlen 接收源地址信息,
*addrlen
的初始值是 src_addr 所指向 buffer 的长度,如果用户不需要源地址信息,可以将 addr 与 addrlen 设置为 NULL。 - 一般建议只用此函数接收 UDP 数据,对于 TCP 数据,建议用 recv() 或 recvn()。
#co::send
int send(sock_t fd, const void* buf, int n, int ms=-1);
- 向指定 socket 上发送数据,参数 fd 必须是 non-blocking 的,参数 buf 与 n 是要发送的数据及长度,参数 ms 是超时时间,单位为毫秒,默认为 -1,永不超时。
- 此函数必须在协程中调用。
- 在 Windows 平台,此函数只适用于 TCP 等 stream 类型的 socket。
- 此函数会阻塞,直到 n 字节的数据全部发送完,或者超时、发生错误。
- 此函数成功时返回 n,超时或发生错误返回 -1,用户可以调用 co::timeout() 检查是否超时,调用
co::error()
,co::strerror()
获取错误信息。
#co::sendto
int sendto(sock_t fd, const void* buf, int n, const void* dst_addr, int addrlen, int ms=-1);
- 向指定的地址发送数据,当 dst_addr 为 NULL,addrlen 为 0 时,与 send() 等价。
- 一般建议只用此函数发送 UDP 数据,对于 TCP 数据,建议用 send()。
- fd 是 UDP socket 时,n 最大是 65507。
#co::shutdown
int shutdown(sock_t fd, char c='b');
- 此函数一般用于半关闭 socket,参数 c 为
'r'
时表示关闭读,为'w'
时表示关闭写,默认为'b'
,关闭读与写。 - 一般建议在进行 IO 操作的线程中调用此函数。
- 此函数成功时返回 0,否则返回 -1,可以调用
co::error()
,co::strerror()
获取错误信息。
#co::error
int& error();
- 返回当前的错误码。
- CO 中的 socket API 返回 -1 时,可以调用此函数获取错误码。
#co::strerror
const char* strerror(int err);
const char* strerror();
- 获取错误码对应的描述信息。此函数是线程安全的。
- 第 2 个版本获取当前错误的描述信息,等价于
strerror(co::error())
。
#———————————
#co::getsockopt
int getsockopt(sock_t fd, int lv, int opt, void* optval, int* optlen);
- 获取 socket option 信息,与原生 API 完全一样,man getsockopt 看详情。
#co::setsockopt
int setsockopt(sock_t fd, int lv, int opt, const void* optval, int optlen);
- 设置 socket option 信息,与原生 API 完全一样,man setsockopt 看详情。
#co::set_nonblock
void set_nonblock(sock_t fd);
- 给 socket 设置 O_NONBLOCK 选项。
#co::set_reuseaddr
void set_reuseaddr(sock_t fd);
- 给 socket 设置 SO_REUSEADDR 选项,一般 server 端的 listening socket 需要设置这个选项,防止 server 重启后 bind 失败。
#co::set_recv_buffer_size
void set_recv_buffer_size(sock_t fd, int n);
- 设置 socket 的接收缓冲区大小,必须在 socket 连接前调用此函数。
#co::set_send_buffer_size
void set_send_buffer_size(sock_t fd, int n);
- 设置 socket 的发送缓冲区大小,必须在 socket 连接前调用此函数。
#co::set_tcp_keepalive
void set_tcp_keepalive(sock_t fd);
- 给 socket 设置 SO_KEEPALIVE 选项。
#co::set_tcp_nodelay
void set_tcp_nodelay(sock_t fd);
- 给 socket 设置 TCP_NODELAY 选项。
#co::reset_tcp_socket
int reset_tcp_socket(sock_t fd, int ms=0);
- 重置 TCP 连接,与 co::close() 类似,但主动调用方不会进入 TIME_WAIT 状态。
- 一般只有 server 端会调用此函数,用于主动关闭客户端连接,同时避免进入 TIME_WAIT 状态。
#———————————
#co::init_ip_addr
bool init_ip_addr(struct sockaddr_in* addr, const char* ip, int port);
bool init_ip_addr(struct sockaddr_in6* addr, const char* ip, int port);
-
用 ip 及 port 初始化 sockaddr 结构。
-
第 1 个版本用于 ipv4 地址,第 2 个版本用于 ipv6 地址。
-
示例
union {
struct sockaddr_in v4;
struct sockaddr_in6 v6;
} addr;
co::init_ip_addr(&addr.v4, "127.0.0.1", 7777);
co::init_ip_addr(&addr.v6, "::", 7777);
#co::ip_str
fastring ip_str(const struct sockaddr_in* addr);
fastring ip_str(const struct sockaddr_in6* addr);
-
从 sockaddr 结构中获取 ip 字符串。
-
第 1 个版本用于 ipv4 地址,第 2 个版本用于 ipv6 地址。
-
示例
struct sockaddr_in addr;
co::init_ip_addr(&addr, "127.0.0.1", 7777);
auto s = co::ip_str(&addr); // s -> "127.0.0.1"
#co::to_string
fastring to_string(const struct sockaddr_in* addr);
fastring to_string(const struct sockaddr_in6* addr);
fastring to_string(const void* addr, int addrlen);
-
将 sockaddr 地址转换成
"ip:port"
形式的字符串。 -
第 1 个版本用于 ipv4 地址,第 2 个版本用于 ipv6 地址。
-
第 3 个版本根据 addrlen 选择调用版本 1 或版本 2。
-
示例
struct sockaddr_in addr;
co::init_ip_addr(&addr, "127.0.0.1", 7777);
auto s = co::to_string(&addr); // s -> "127.0.0.1:7777"
#co::peer
fastring peer(sock_t fd);
- 获取 peer 端的地址信息,返回值是
"ip:port"
形式的字符串。
#channel(co::Chan)
co::Chan
是一个模板类,它类似于 golang 中的 channel,用于在协程之间传递数据。
template <typename T> class Chan;
co::Chan
内部基于内存拷贝实现,模板参数 T 可以是内置类型、指针类型,或者拷贝操作具有简单的内存拷贝语义的结构体类型。简而言之,T 必须满足下述条件:对于 T 类型的两个变量或对象 a 与 b,a = b
等价于memcpy(&a, &b, sizeof(T))
。- 像
std::string
或 STL 中的容器类型,拷贝操作不是简单的内存拷贝,因此不能直接在 channel 中传递。
#Chan::Chan
explicit Chan(uint32 cap=1, uint32 ms=(uint32)-1);
Chan(Chan&& c);
Chan(const Chan& c);
- 第 1 个构造函数中,参数 cap 是内部队列的最大容量,默认是 1,参数 ms 是读写操作的超时时间,单位为毫秒,默认为 -1,永不超时。
- 第 2 个是 move 构造函数,可以将
co::Chan
放入 STL 容器中。 - 第 3 个是拷贝构造函数,仅将内部引用计数加 1。
#operator«
template <typename T>
void operator<<(const T& x) const;
- 写入操作,必须在协程中进行。
- 此方法会阻塞,直到写入操作完成或超时。
- 构造函数中设置了超时时间时,可以用
co::timeout()
判断是否超时。
#operator»
template <typename T>
void operator>>(T& x) const;
- 读取操作,必须在协程中进行。
- 此方法会阻塞,直到读取操作完成或超时。
- 构造函数中设置了超时时间时,可以用
co::timeout()
判断是否超时。
#代码示例
#include "co/co.h"
void f() {
co::Chan<int> ch;
go([ch]() { ch << 7; });
int v = 0;
ch >> v;
LOG << "v: " << v;
}
void g() {
co::Chan<int> ch(32, 500);
go([ch]() {
ch << 7;
if (co::timeout()) LOG << "write to channel timeout..";
});
int v = 0;
ch >> v;
if (!co::timeout()) LOG << "v: " << v;
}
DEF_main(argc, argv) {
f();
g();
return 0;
}
上述代码中的 channel 对象在栈上,而 CO 采用的是共享栈实现方式,一个协程栈上的数据可能被其他协程覆盖,协程间一般不能直接通过栈上的数据通信,因此代码中的 lambda 采用了按值捕获的方式,将 channel 拷贝了一份,传递到新建的协程中。channel 的拷贝操作只是将内部引用计数加 1,几乎不会对性能造成影响。
#协程同步事件(co::Event)
co::Event
是协程间的一种同步机制,它与线程中的 SyncEvent
类似。从 co 2.0.1 版本开始,co::Event 可以在线程、协程环境中混用。
#Event::Event
Event();
Event(Event&& e);
Event(const Event& e);
- 第 1 个是默认构造函数。
- 第 2 个是 move 构造函数,支持将 co::Event 放入 STL 容器中。
- 第 3 个是拷贝构造函数,仅将内部引用计数加 1。
#Event::signal
void signal() const;
- 产生同步信号,co::Event 变成同步状态,所有 waiting 状态的协程会被唤醒。
- 若 co::Event 当前并没有 waiting 状态的协程,则下一个调用 wait() 方法的协程会立即返回。
- 此方法可以在任何地方调用。
#Event::wait
void wait() const;
bool wait(uint32 ms) const;
- 等待同步信号,若 co::Event 当前是未同步状态,则调用的协程会进入 waiting 状态。
- 在 co 2.0.0 及之前的版本,此方法必须在协程中调用。从 2.0.1 版本开始,此方法可以在任何地方调用。
- 第 1 个版本会阻塞,直到 co::Event 变为同步状态。
- 第 2 个版本会阻塞,直到 co::Event 变为同步状态或超时,参数 ms 是超时时间,单位为毫秒。超时返回 false,正常返回 true。
#代码示例
co::Event ev;
// capture by value, as data on stack may be overwritten by other coroutines.
go([ev](){
ev.signal();
});
ev.wait(100); // wait for 100 ms
#waitgroup(co::WaitGroup)
co::WaitGroup
类似于 golang 中的 sync.WaitGroup
,可用于等待协程或线程的退出。
#WaitGroup::WaitGroup
explicit WaitGroup(uint32 n);
WaitGroup();
WaitGroup(WaitGroup&& wg);
WaitGroup(const WaitGroup& wg);
- 第 1 个构造函数,将内部计数器初始化为
n
。 - 第 2 个是默认构造函数,将内部计数器初始化为 0。
- 第 3 个是 move 构造函数,支持将 co::WaitGroup 放入 STL 容器中。
- 第 4 个是拷贝构造函数,仅将内部引用计数加 1。
#WaitGroup::add
void add(uint32 n=1) const;
- 将内部计数器加 n,n 默认值是 1。
- 此方法是线程安全的,可在任何地方调用。
#WaitGroup::done
void done() const;
- 将内部计数器减 1。
- 此方法是线程安全的,可在任何地方调用。
- 此方法通常在协程或线程函数结束时调用。
#WaitGroup::wait
void wait() const;
- 等待直到内部计数器的值变为 0。
#代码示例
#include "co/co.h"
DEF_main(argc, argv) {
co::WaitGroup wg;
wg.add(8);
for (int i = 0; i < 8; ++i) {
go([wg]() {
LOG << "co: " << co::coroutine_id();
wg.done();
});
}
wg.wait();
return 0;
}
#协程锁(co::Mutex)
co::Mutex
是协程中的互斥锁,与线程中的 Mutex
类似,只是需要在协程环境中使用。
#Mutex::Mutex
Mutex();
Mutex(Mutex&& m);
Mutex(const Mutex& m);
- 第 1 个是默认构造函数。
- 第 2 个是 move 构造函数,可以将 co::Mutex 放入 STL 容器中。
- 第 3 个是拷贝构造函数,仅将内部引用计数加 1。
#Mutex::lock
void lock() const;
- 获取锁,必须在协程中调用。
- 阻塞直到获得锁为止。
#Mutex::try_lock
bool try_lock() const;
- 获取锁,不会阻塞,成功获取锁时返回 true,否则返回 false。
- 此方法可以在任何地方调用,但一般是在协程中调用。
#Mutex::unlock
void unlock() const;
- 释放锁,可以在任何地方调用,但设计良好的程序,通常是由之前获得锁的协程调用。
#co::MutexGuard
#MutexGuard::MutexGuard
explicit MutexGuard(co::Mutex& m);
explicit MutexGuard(co::Mutex* m);
- 构造函数,调用 m.lock() 获取锁,参数 m 是
co::Mutex
类的引用或指针。
#MutexGuard::~MutexGuard
~MutexGuard();
- 析构函数,释放构造函数中获得的锁。
#代码示例
co::Mutex mtx;
int v = 0;
void f1() {
co::MutexGuard g(mtx);
++v;
}
void f2() {
co::MutexGuard g(mtx);
--v;
}
go(f1);
go(f2);
#协程池(co::Pool)
co::Pool
是一种通用的协程池,它是协程安全的,内部存储 void*
类型的指针,可以用作连接池、内存池或其他用途的缓存。
#Pool::Pool
Pool();
Pool(Pool&& p);
Pool(const Pool& p);
Pool(std::function<void*()>&& ccb, std::function<void(void*)>&& dcb, size_t cap=(size_t)-1);
-
第 1 个是默认构造函数,与第 4 个相比,ccb 与 dcb 为 NULL。
-
第 2 个是 move 构造函数,可以将 co::Pool 放入 STL 容器中。
-
第 3 个是拷贝构造函数,仅将内部引用计数加 1。
-
第 4 个构造函数中,参数 ccb 用于创建元素,参数 dcb 用于销毁元素,参数 cap 指定 pool 的最大容量,默认为 -1 不限容量。
-
注意参数 cap 并不是总容量,它是对单个线程而言,在 co::Pool 内部实现中,每个线程都有自己的 pool,如 cap 设置为 1024,调度线程有 8 个,则总容量是 8192。
-
当 dcb 为 NULL 时,cap 参数会被忽略,这是因为当元素个数超过最大容量时,co::Pool 需要用 dcb 销毁多余的元素。
-
示例
class T;
co::Pool p(
[]() { return (void*) new T; }, // ccb
[](void* p) { delete (T*) p; }, // dcb
);
#Pool::clear
void clear() const;
- 清空整个 co::Pool,可以在任何地方调用。
- 如果设置了 dcb,会用 dcb 销毁 pool 中的元素。
#Pool::pop
void* pop() const;
- 从 co::Pool 中取出一个元素,必须在协程中调用。
- co::Pool 为空时,若 ccb 不是 NULL,则调用 ccb 创建一个元素并返回,否则返回 NULL。
- 此方法是协程安全的,不需要加锁。
#Pool::push
void push(void* e) const;
-
将元素放回 co::Pool 中,必须在协程中调用。
-
参数 e 为 NULL 时,直接忽略。
-
由于每个线程在内部拥有自己的 pool,push() 与 pop() 方法应该在同一个线程中调用。
-
若 co::Pool 已经达到最大容量,且 dcb 不为 NULL,则直接调用
dcb(e)
销毁该元素。 -
此方法是协程安全的,不需要加锁。
-
示例
class Redis; // assume class Redis is a connection to the redis server
co::Pool p;
void f {
Redis* rds = (Redis*) p.pop(); // pop a redis connection
if (rds == NULL) rds = new Redis;
rds->get("xx"); // call get() method of redis
p.push(rds); // push rds back to co::Pool
}
go(f);
#Pool::size
size_t size() const;
- 返回当前线程的 pool 大小,必须在协程中调用。
#co::PoolGuard
co::PoolGuard
在构造时自动从 co::Pool 取出元素,析构时自动将元素放回 co::Pool。同时,它还重载了 operator->
,可以像智能指针一样使用它。
template<typename T, typename D=std::default_delete<T>>
class PoolGuard;
- 参数 T 是 co::Pool 中指针所指向的实际类型,参数 D 是 deleter,用于 delete
T*
类型的指针。
#PoolGuard::PoolGuard
explicit PoolGuard(co::Pool& p);
explicit PoolGuard(co::Pool* p);
- 构造函数,从 co::Pool 中取出一个元素,参数 p 是 co::Pool 类的引用或指针。
#PoolGuard::~PoolGuard
~PoolGuard();
- 析构函数,将构造函数中获取的元素,放回 co::Pool 中。
#PoolGuard::get
T* get() const;
- 获取从 co::Pool 中取出的指针。
#PoolGuard::operator->
T* operator->() const;
- 重载
operator->
,返回从 co::Pool 中取出的元素。
#PoolGuard::operator*
T& operator*() const;
- 重载
operator*
,返回 T 类的引用。
#PoolGuard::operator bool
explicit operator bool() const;
- 将 co::PoolGuard 转换为 bool 类型,若内部指针不是 NULL,返回 true,否则返回 false。
#PoolGuard::operator!
bool operator!() const;
- 判断内部指针是否为 NULL,为 NULL 时返回 true,否则返回 false。
#PoolGuard::operator==
bool operator==(T* p) const;
- 判断内部指针是否等于 p。
#PoolGuard::operator!=
bool operator!=(T* p) const;
- 判断内部指针是否不等于 p。
#PoolGuard::operator=
void operator=(T* p);
- 赋值操作,等价于
reset(p)
。
#PoolGuard::reset
void reset(T* p = 0);
- 重置内部指针,并调用
D()(x)
删除原先的指针。
#代码示例
co::Pool p(
[]() { return (void*) new string; }, // ccb
[](void* p) { delete (string*)p; } // dcb
);
void fs() {
co::PoolGuard<string> rds(p); // now rds can be used like a Redis* pointer.
rds->append("xx");
}
go(fs);
class Redis; // assume class Redis is a connection to the redis server
co::Pool p(
[]() { return (void*) new Redis; }, // ccb
[](void* p) { delete (Redis*) p; } // dcb
);
void f() {
co::PoolGuard<Redis> rds(p); // now rds can be used like a Redis* pointer.
rds->get("xx");//redis的方法
}
go(f);
上面的例子中,co::Pool 相当于 redis 连接池。如果使用 CLS 机制,一个协程一个连接,则 100 万协程需要建立 100 万连接,消耗较大。但使用 pool 机制,100 万协程可能只需要共用少量的连接。pool 机制比 CLS 更高效、更合理,这也是 CO 不支持 CLS 的原因。
#I/O 事件(co::IoEvent)
co::IoEvent
用于将非阻塞 I/O 转换为同步方式。用户在协程中对一个 non-blocking socket 进行 I/O 操作,当 socket 不可读或不可写时,用户调用 co::IoEvent 的 wait()
方法挂起协程,等待 I/O 事件;当 socket 变为可读或可写时,调度线程重新唤醒该协程,继续 I/O 操作。
co 1.x 版本并没有公开 co::IoEvent 类,只是在 co 内部使用,co 2.0 中将这个类公开,方便用户将三方网络库协程化。
#co::io_event_t
enum io_event_t {
ev_read = 1,
ev_write = 2,
};
- enum 类型,表示 I/O 事件类型,co::ev_read 表示读,co::ev_write 表示写。
#IoEvent::IoEvent
IoEvent(sock_t fd, io_event_t ev);
IoEvent(sock_t fd, int n=0); // for windows only
- 构造函数,linux 与 mac 平台只提供第 1 个版本,windows 平台还提供第 2 个版本。
- 第 1 个版本中,参数 fd 是一个 non-blocking socket,参数 ev 是 I/O 事件类型,只能是 co::ev_read 或 co::ev_write 中的一种。调用 wait() 方法会在 socket 上等待 ev 指定的 I/O 事件,wait() 成功返回时,需要用户调用 recv, send 等函数完成 I/O 操作。在 windows 平台,fd 必须是 TCP socket(对于 UDP,很难用 IOCP 模拟 epoll 或 kqueue 的行为)。
- 第 2 个版本仅适用于 windows,与第 1 个版本不同,fd 可以是 UDP socket,但用户需要手动调用 WSARecvFrom, WSASendTo 等函数向 IOCP 发送 overlapped I/O 请求,然后调用 wait() 方法,当 wait() 成功返回时,表示 IOCP 已经帮用户完成了 I/O 操作。具体的用法此处不详述,代码中有详细的注释,建议直接参考 co::IoEvent 的源码,以及 windows 上 co::accept, co::connect, co::recvfrom, co::sendto 的实现。
#IoEvent::~IoEvent
~IoEvent();
- 析构函数,从 epoll 或 kqueue 中移除之前注册的 I/O 事件。
#IoEvent::wait
bool wait(int ms=-1);
- 此方法等待 socket 上的 I/O 事件,参数 ms 是超时时间,单位为毫秒,默认为 -1,永不超时。
- 此方法阻塞,直到 I/O 事件到来,或者超时、发生错误。
- 此方法成功时返回 true,超时或发生错误时返回 false。用户可以用
co::timeout()
判断是否超时。
#代码示例
int recv(sock_t fd, void* buf, int n, int ms) {
CHECK(gSched) << "must be called in coroutine..";
co::IoEvent ev(fd, co::ev_read);
do {
int r = (int) CO_RAW_API(recv)(fd, buf, n, 0);
if (r != -1) return r;
if (errno == EWOULDBLOCK || errno == EAGAIN) {
if (!ev.wait(ms)) return -1;
} else if (errno != EINTR) {
return -1;
}
} while (true);
}
上面的例子是 co::recv
的实现,调用原生 recv 方法产生 EWOULDBLOCK 或 EAGAIN 错误时,用 co::IoEvent 等待读事件,wait() 正常返回时表示 socket 可读,继续调用原生 recv 方法完成读操作。
#协程中使用三方网络库
在协程中直接使用三方网络库时,有可能阻塞调度线程,导致调度线程无法正常工作。解决这个问题有两种方法,第一种是将三方库协程化,第二种是 hook 系统中的 socket API,下面分别介绍。
#协程化
协程化需要三方库提供非阻塞 API,利用 co::IoEvent 可以轻松将这些 API 转换为协程同步方式。
int recv(SSL* s, void* buf, int n, int ms) {
CHECK(co::scheduler()) << "must be called in coroutine..";
int r, e;
int fd = SSL_get_fd(s);
if (fd < 0) return -1;
do {
ERR_clear_error();
r = SSL_read(s, buf, n);
if (r > 0) return r; // success
if (r == 0) {
DLOG << "SSL_read return 0, error: " << SSL_get_error(s, 0);
return 0;
}
e = SSL_get_error(s, r);
if (e == SSL_ERROR_WANT_READ) {
co::IoEvent ev(fd, co::ev_read);
if (!ev.wait(ms)) return -1;
} else if (e == SSL_ERROR_WANT_WRITE) {
co::IoEvent ev(fd, co::ev_write);
if (!ev.wait(ms)) return -1;
} else {
DLOG << "SSL_read return " << r << ", error: " << e;
return r;
}
} while (true);
}
上面是将 openssl 中的 SSL_read 协程化的例子,整个过程比较简单,底层使用 non-blocking socket,在 SSL_read 产生 SSL_ERROR_WANT_READ
错误时,用 co::IoEvent 等待读事件,产生 SSL_ERROR_WANT_WRITE
错误时,用 co::IoEvent 等待写事件,wait() 正常返回时,表示 socket 可读或可写,继续调用 SSL_read 完成 I/O 操作。
目前,CO 已经成功将 openssl, libcurl 协程化。理论上,支持非阻塞 I/O 操作的三方网络库,都可以用与上面类似的方法协程化。
#系统 API hook
API hook 简单来说就是拦截系统 API 请求,如果发现该请求是在协程中,且使用 blocking socket,就将 socket 修改成 non-blocking 模式,然后利用 co::IoEvent 或 CO 中更底层的接口,等待 socket 上的 I/O 事件,I/O 事件到来时,再唤醒协程,调用原生的 socket api 完成 I/O 操作。
从 CO 2.0.1 开始,在 linux, mac 与 windows 平台均已支持 hook。
API hook 与协程化的区别在于:前者是将阻塞 API 转换成协程同步方式,后者是将非阻塞 API 转换成协程同步方式。协程同步方式是指协程可能会阻塞,从协程的角度看是同步的,但调度线程不会阻塞,它可以切换到其他协程运行。另外,它们的使用方式也是不同的,前者需要在协程中用阻塞的方式调用原生 API,后者则直接在协程中调用协程化的 API。
API hook 的好处在于,只需要 hook 系统中的少量 socket API,就可以在协程中使用所有提供阻塞 API 的三方库。协程化则需要为每个三方库各提供一套协程化的 API,但它比 API hook 性能更好,且更安全,可以避免由三方库的复杂性引起的一些问题。
#基于协程的网络编程模式
协程可以用同步的方式,实现高并发、高性能的网络程序。协程虽然会阻塞,但调度线程可以在大量的协程间快速切换,因此要实现高并发,只需要创建更多的协程即可。
以 TCP 程序为例,服务端一般采用一个连接一个协程的模式,为每个客户端连接创建新的协程,在协程中处理连接上的数据。客户端没必要一个连接一个协程,一般使用连接池,多个协程共用连接池中的连接。
#服务端网络模型
// recv or send data on the connection
void on_connection(int fd) {
while (true) {
co::recv(fd, ...); // recv request from client
process(...); // process the request
co::send(fd, ...); // send response to client
}
}
void server_fun() {
while (true) {
int fd = co::accept(...);
if (fd != -1) go(on_connection, fd);
}
}
go(server_fun);
- 服务端采用一个连接一个协程的模型。
- 在一个协程中,调用
co::accept()
接受客户端连接。 - 有连接到来时,创建一个新的协程,在协程中处理连接上的数据。
on_connection()
是处理连接的协程函数,接收、处理与发送数据,在该协程中以完全同步的方式进行,不需要任何异步回调。- 完整的实现可以参考 co 中的测试代码。
#客户端网络模型
void client_fun() {
while true {
if (!connected) co::connect(...); // connect to the server
co::send(...); // send request to the server
co::recv(...); // recv response from the server
process(...); // process the response
if (over) co::close(...); // close the connection
}
}
go(client_fun);
- 建立连接,发送、接收、处理数据,在协程中以完全同步的方式进行。
- 完整的实现可以参考 co 中的测试代码。
实际应用中,一般使用 co::Pool 作为连接池,以避免创建过多的连接:
co::Pool pool;
void client_fun() {
while true {
co::PoolGuard<Connection> conn(pool); // get a idle connection from the pool
conn->send(...); // send request to the server
conn->recv(...); // recv response from the server
process(...); // process the response
if (over) conn->close(...); // close the connection
}
}
go(client_fun);
- co::PoolGuard 构造时自动从 co::Pool 中获取一个空闲连接,析构时自动将该连接放加 co::Pool 中。
#配置
#co_debug_log
DEF_bool(co_debug_log, false, "#1 enable debug log for coroutine library");
- 打印协程相关的调试日志,默认为 false。
#co_sched_num
DEF_uint32(co_sched_num, os::cpunum(), "#1 number of coroutine schedulers, default: os::cpunum()");
- 协程调度线程的数量,默认为系统 CPU 核数。目前的实现中,这个值最大也是系统 CPU 核数。
#co_stack_size
DEF_uint32(co_stack_size, 1024 * 1024, "#1 size of the stack shared by coroutines, default: 1M");
- 协程栈大小,默认为 1M。
#disable_hook_sleep
DEF_bool(disable_hook_sleep, false, "#1 disable hook sleep if true");
- 禁止 hook sleep 相关的 API,默认为 false。
#hook_log
DEF_bool(hook_log, false, "#1 enable log for hook if true");
- 打印 hook 相关的日志,默认为 false。