RPC

include: co/rpc.h.

coost 实现了一个基于协程的 RPC 框架,它内部使用 JSON 格式传输数据,与使用 protobuf 等二进制协议的 RPC 框架相比,它更加灵活,用起来更方便。

从 v3.0 开始,RPC 框架同时也支持 HTTP 协议,可以用 HTTP 的 POST 方法调用 RPC 服务。

#rpc::Service

class Service {
  public:
    Service() = default;
    virtual ~Service() = default;

    typedef std::function<void(Json&, Json&)> Fun;

    virtual const char* name() const = 0;
    virtual const co::map<const char*, Fun>& methods() const = 0;
};
  • 接口类,它表示一个 RPC service,一个 RPC server 中可以包含多个 service。
  • 方法 name() 返回 service 名,methods() 返回所有的 RPC 接口及其业务处理函数。
  • 用户不需要关心此类。

#rpc::Server

#Server::Server

Server();
  • 默认构造函数,用户不需要关心。

#Server::add_service

1. Server& add_service(rpc::Service* s);
2. Server& add_service(const std::shared_ptr<rpc::Service>& s);
  • 添加 service,1 中参数 s 必须是用 operator new 动态创建的。
  • 用户可以多次调用此方法,添加多个 service,不同 service 必须有不同的名字。

#Server::start

void start(
    const char* ip, int port,
    const char* url="/",
    const char* key=0, const char* ca=0
);
  • 启动 RPC server,此方法不会阻塞当前线程。
  • 参数 ip 是服务 ip,可以是 IPv4 或 IPv6 地址,参数 port 是服务端口。
  • 参数 url 是 HTTP 服务的 url,必须以 / 开头。
  • 参数 key 是存放 SSL private key 的 PEM 文件路径,参数 ca 是存放 SSL 证书的 PEM 文件路径,默认 key 和 ca 是 NULL,不启用 SSL。
  • 从 v3.0 开始,server 启动后就不再依赖于 rpc::Server 对象。

#Server::exit

void exit();
  • v2.0.3 新增。
  • 退出 RPC server,关闭 listening socket,不再接收新的连接。
  • 从 v3.0 开始,RPC server 退出后,之前已经建立的连接将在未来被重置。

#RPC server 示例

#定义 proto 文件

下面是一个简单的 proto 文件 hello_world.proto:

package xx

service HelloWorld {
    hello
    world
}
  • package 定义包名,在 C++ 中对应为 namespace。
  • service 定义一个 RPC service,该 service 提供 hello, world 两个方法。
  • 由于 RPC 请求及响应都是 JSON,不需要在协议文件中定义结构体。
  • 一个 proto 文件中最多只能定义一个 service。
v3.0.1 基于 flex 与 byacc 重写了 gen 工具,proto 语法上除了支持 service 定义,还支持结构体的定义,具体用法可以参考 j2s

#生成 service 代码

gen 是 coost 提供的代码生成工具,它可以生成 service 相关代码。

xmake -b gen             # 构建 gen
cp gen /usr/local/bin    # 将 gen 放到 /usr/local/bin 目录
gen hello_world.proto    # 生成代码

生成的文件 hello_world.h 如下:

// Autogenerated.
// DO NOT EDIT. All changes will be undone.
#pragma once

#include "co/rpc.h"

namespace xx {

class HelloWorld : public rpc::Service {
  public:
    typedef std::function<void(Json&, Json&)> Fun;

    HelloWorld() {
        using std::placeholders::_1;
        using std::placeholders::_2;
        _methods["HelloWorld.hello"] = std::bind(&HelloWorld::hello, this, _1, _2);
        _methods["HelloWorld.world"] = std::bind(&HelloWorld::world, this, _1, _2);
    }

    virtual ~HelloWorld() {}

    virtual const char* name() const {
        return "HelloWorld";
    }

    virtual const co::map<const char*, Fun>& methods() const {
        return _methods;
    }

    virtual void hello(Json& req, Json& res) = 0;

    virtual void world(Json& req, Json& res) = 0;

  private:
    co::map<const char*, Fun> _methods;
};

} // xx
  • 可以看到,HelloWorld 类继承于 rpc::Service,它已经实现了 rpc::Service 类中的 name()methods() 方法。
  • 用户只需要继承 HelloWorld 类,实现 hello 与 world 两个方法即可。

#业务实现

#include "hello_world.h"

namespace xx {

class HelloWorldImpl : public HelloWorld {
  public:
    HelloWorldImpl() = default;
    virtual ~HelloWorldImpl() = default;

    virtual void hello(Json& req, Json& res) {
        res = {
            { "result", {
                { "hello", 23 }
            }}
        };
    }

    virtual void world(Json& req, Json& res) {
        res = {
            { "error", "not supported"}
        };
    }
};

} // xx
  • 上面只是一个很简单的例子,实际应用中,一般需要根据 req 中的参数,进行相应的业务处理,然后填充 res。

#启动 RPC server

int main(int argc, char** argv) {
    flag::parse(argc, argv);

    rpc::Server()
        .add_service(new xx::HelloWorldImpl)
        .start("127.0.0.1", 7788, "/xx");

    for (;;) sleep::sec(80000);
    return 0;
}
start() 方法不会阻塞当前线程,因此需要写一个 for 循环,防止 main 函数退出。

#用 curl 调用 RPC 服务

在 v3.0 版本中,RPC 框架支持 HTTP 协议,因此可以用 curl 命令调用 RPC 服务:

curl http://127.0.0.1:7788/xx --request POST --data '{"api":"ping"}'
curl http://127.0.0.1:7788/xx --request POST --data '{"api":"HelloWorld.hello"}'
  • 上面用 curl 给 RPC 服务发送 POST 请求,参数为 JSON 字符串,需要提供一个 api 字段,指明调用的 RPC 方法。
  • "ping" 是 RPC 框架内置的方法,一般用于测试或发送心跳。
  • url 中 /xx 要与 RPC server 启动时指定的 url 保持一致。

#rpc::Client

#Client::Client

1. Client(const char* ip, int port, bool use_ssl=false);
2. Client(const Client& c);
  • 1, 参数 ip 可以是域名、IPv4 或 IPv6 地址;参数 port 是服务端口;参数 use_ssl 表示是否启用 SSL 传输,默认为 false。
  • 2, 拷贝构造函数。
rpc::Client 构建时,并不会立即建立连接。

#Client::~Client

Client::~Client();
  • 析构函数,关闭连接。

#Client::call

void call(const Json& req, Json& res);
  • 执行 RPC 请求,必须在协程中调用。
  • 参数 req 中必须带有 "api" 字段,该字段的值一般为 "service.method" 形式。
  • 参数 res 是 RPC 请求的响应结果。
  • 若 RPC 请求没有发送出去,或者没有收到服务端的响应,res 将不会被填充。
  • 此方法在发送 RPC 请求前,会检查连接状态,未连接时,先建立连接。

#Client::close

void close();
  • 关闭连接,多次调用此函数是安全的。

#Client::ping

void ping();
  • 给 RPC server 发送 ping 请求,一般用于测试或发送心跳。

#RPC client 示例

#直接使用 rpc::Client

DEF_bool(use_ssl, false, "use ssl if true");
DEF_int32(n, 3, "request num");

void client_fun() {
    rpc::Client c("127.0.0.1", 7788, FLG_use_ssl);

    for (int i = 0; i < FLG_n; ++i) {
        co::Json req = {
            {"api", "HelloWorld.hello"}
        };
        co::Json res;
        c.call(req, res);
        co::sleep(1000);
    }

    c.close();
}

go(client_fun);
  • 上面的例子中,client 每隔 1 秒向服务端发送一个 RPC 请求。

#使用连接池 co::pool

当客户端需要建立大量连接时,可以用 co::pool 管理这些连接。

std::unique_ptr<rpc::Client> proto;

co::pool pool(
    []() { return (void*) new rpc::Client(*proto); },
    [](void* p) { delete (rpc::Client*) p; }
);

void client_fun() {
    co::pool_guard<rpc::Client> c(pool);

    while (true) {
        c->ping();
        co::sleep(3000);
    }
}

proto.reset(new rpc::Client("127.0.0.1", 7788));

for (int i = 0; i < 8; ++i) {
    go(client_fun);
}
  • 上面的例子,使用 co::pool 保存客户端,多个协程可以共享这些客户端。
  • co::pool 的 ccb 使用拷贝构造的方式从 proto 复制一个客户端连接。

#配置项

coost 使用 co.flag 定义了 RPC 相关的配置项。

#rpc_conn_idle_sec

DEF_int32(rpc_conn_idle_sec, 180, "#2 connection may be closed if no data...");
  • rpc::Server 空闲连接超时时间,单位为秒。一个连接在此时间内没有收到任何数据,server 可能会关闭此连接。

#rpc_conn_timeout

DEF_int32(rpc_conn_timeout, 3000, "#2 connect timeout in ms");
  • rpc::Client 连接超时时间,单位为毫秒。

#rpc_log

DEF_bool(rpc_log, true, "#2 enable rpc log if true");
  • 是否打印 RPC 日志,默认为 true,rpc::Server 与 rpc::Client 会打印 RPC 请求与响应。

#rpc_max_idle_conn

DEF_int32(rpc_max_idle_conn, 128, "#2 max idle connections");
  • rpc::Server 最大空闲连接数,默认为 128,超过这个数量时,server 会关闭部分空闲连接。

#rpc_max_msg_size

DEF_int32(rpc_max_msg_size, 8 << 20, "#2 max size of rpc message, default: 8M");
  • RPC 消息的最大长度,默认为 8M。

#rpc_recv_timeout

DEF_int32(rpc_recv_timeout, 3000, "#2 recv timeout in ms");
  • RPC 接收超时时间,单位为毫秒。

#rpc_send_timeout

DEF_int32(rpc_send_timeout, 3000, "#2 send timeout in ms");
  • RPC 发送超时时间,单位为毫秒。