buttonRPC解析

buttonRPC简单解析

RPC

rpc主要模块:

  • 函数注册与查找
  • 序列化、反序列化
  • 网络传输

buttonRPC使用

Client端

1
2
3
4
5
6
7
8
buttonrpc client;
client.as_client("127.0.0.1", 5555);
client.set_timeout(2000);

int foo4r = client.call<int>("foo_4", 10, "buttonrpc", 100, (float)10.8).val();

PersonInfo dd = { 10, "buttonrpc", 170 };
dd = client.call<PersonInfo>("foo_5", dd, 120).val();

声明客户端,调用call接口,传入函数名称与相关参数,最后调用val函数获得返回值。

注意:

  1. 需要显式实例化call接口,传入具体的返回类型
  2. 调用call接口的实际返回值是value_t<R>,需要检查RPC调用是否成功

Server端

1
2
3
4
5
6
7
8
9
10
11
buttonrpc server;
server.as_server(5555);

server.bind("foo_1", foo_1);
server.bind("foo_2", foo_2);
server.bind("foo_3", std::function<int(int)>(foo_3));

ClassMem s;
server.bind("foo_6", &ClassMem::bar, &s);

server.run();

声明服务端,调用bind接口注册函数。注册成员函数时需要绑定对象地址。最后运行run函数即可开始服务。

总结

buttonRPC的client端需要制定返回值类型,并调用val方法获得真实返回值;server端的使用比较符合直觉,核心是std::function和std::bind的使用。

buttonRPC解析

Client端

value_t结构体

调用一个PRC,失败原因可能有很多,比如:

  • 调用超时(网络波动,服务端无响应)
  • 返回类型不正确
  • 返回消息解析不正确

因此,有必要封装一个返回类型的结构体:类成员是返回码、消息描述、真实返回值。

利用结构体存储类型,方便模版使用。值得注意是,value_t为Serializer声明友元,这是为了方便value_t的序列化处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template<typename T>
struct type_xx{ typedef T type; };

template<typename T>
class value_t {
public:
typedef typename type_xx<T>::type type;
typedef std::string msg_type;
typedef uint16_t code_type;

value_t() { code_ = 0; msg_.clear(); }
bool valid() { return (code_ == 0 ? true : false); }
int error_code() { return code_; }
std::string error_msg() { return msg_; }
type val() { return val_; }

void set_val(const type& val) { val_ = val; }
void set_code(code_type code) { code_ = code; }
void set_msg(msg_type msg) { msg_ = msg; }

friend Serializer& operator >> (Serializer& in, value_t<T>& d) {
in >> d.code_ >> d.msg_;
if (d.code_ == 0) {
in >> d.val_;
}
return in;
}
friend Serializer& operator << (Serializer& out, value_t<T> d) {
out << d.code_ << d.msg_ << d.val_;
return out;
}
private:
code_type code_;
msg_type msg_;
type val_;
};

Serializer

Serializer是序列化器,其中m_byteorder用于标识大小端序,StreamBuffer继承了vector,实际使用时m_curpos标识了已经取过的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class StreamBuffer : public vector<char>
{
public:
StreamBuffer(){ m_curpos = 0; }
StreamBuffer(const char* in, size_t len){
m_curpos = 0;
insert(begin(), in, in+len);
}
~StreamBuffer(){ };

void reset(){ m_curpos = 0; }
const char* data(){ return &(*this)[0]; }
const char* current(){ return&(*this)[m_curpos]; }
void offset(int k){ m_curpos += k; }
bool is_eof(){ return (m_curpos >= size()); }
void input( char* in, size_t len){ insert(end(), in, in+len); }
int findc(char c){
iterator itr = find(begin()+m_curpos, end(), c);
if (itr != end())
{
return itr - (begin()+m_curpos);
}
return -1;
}

private:
unsigned int m_curpos;
};

class Serializer
{
public:
template<typename T>
Serializer &operator >> (T& i){
output_type(i);
return *this;
}

template<typename T>
Serializer &operator << (T i){
input_type(i);
return *this;
}

private:
int m_byteorder;
StreamBuffer m_iodevice;
};

input_type 和 output_type 逻辑比较粗暴,将类型T强制转化为char的数组,然后存进vector中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template<typename T>
inline void Serializer::input_type(T t)
{
int len = sizeof(T);
char* d = new char[len];
const char* p = reinterpret_cast<const char*>(&t);
memcpy(d, p, len);
byte_orser(d, len);
m_iodevice.input(d, len);
delete [] d;
}

template<typename T>
inline void Serializer::output_type(T& t)
{
int len = sizeof(T);
char* d = new char[len];
if (!m_iodevice.is_eof()){
memcpy(d, m_iodevice.current(), len);
m_iodevice.offset(len);
byte_orser(d, len);
t = *reinterpret_cast<T*>(&d[0]);
}
delete [] d;
}

call

call方法针对多种不同候选参数的做法也是简单粗暴,直接声明多参数的模版方法,然后将参数全部存入Serializer中,最后调用net_call进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename R>
value_t<R> call(std::string name);

template<typename R, typename P1>
value_t<R> call(std::string name, P1);

template<typename R, typename P1, typename P2>
value_t<R> call(std::string name, P1, P2);

template<typename R>
inline buttonrpc::value_t<R> buttonrpc::call(std::string name)
{
Serializer ds;
ds << name;
return net_call<R>(ds);
}

template<typename R, typename P1>
inline buttonrpc::value_t<R> buttonrpc::call(std::string name, P1 p1)
{
Serializer ds;
ds << name << p1;
return net_call<R>(ds);
}

net_call方法记住zmq进行网络消息发送,值得注意的是:进行网络传输时,数据从Serializer进入到zmq,最后也是从zmq再到Serializer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template<typename R>
inline buttonrpc::value_t<R> buttonrpc::net_call(Serializer& ds)
{
zmq::message_t request(ds.size() + 1);
memcpy(request.data(), ds.data(), ds.size());
if (m_error_code != RPC_ERR_RECV_TIMEOUT) {
send(request);
}
zmq::message_t reply;
recv(reply);
value_t<R> val;
if (reply.size() == 0) {
// timeout
m_error_code = RPC_ERR_RECV_TIMEOUT;
val.set_code(RPC_ERR_RECV_TIMEOUT);
val.set_msg("recv timeout");
return val;
}
m_error_code = RPC_ERR_SUCCESS;
ds.clear();
ds.write_raw_data((char*)reply.data(), reply.size());
ds.reset();

ds >> val;
return val;
}

Server端

Server端的任务有两个:

  • 注册函数
  • 接收消息并处理函数

其中注册函数使用map处理,比较简单;接收消息时,怎么讲函数参数一一还原呢?我们需要知道有多少个参数,每个参数的类型。

bind

我们先看函数注册:

  • 怎么应对多个参数的函数?
  • 怎么应对函数重载?
  • 怎么应对没有返回值的函数?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template<typename F>
void buttonrpc::bind( std::string name, F func )
{
m_handlers[name] = std::bind(&buttonrpc::callproxy<F>, this, func, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

template<typename F>
void buttonrpc::callproxy(F fun, Serializer* pr, const char* data, int len )
{
callproxy_(fun, pr, data, len);
}

template<typename R>
void buttonrpc::callproxy_(std::function<R()> func, Serializer* pr, const char* data, int len)
{
typename type_xx<R>::type r = call_helper<R>(std::bind(func));

value_t<R> val;
val.set_code(RPC_ERR_SUCCESS);
val.set_val(r);
(*pr) << val;
}

template<typename R, typename P1>
void buttonrpc::callproxy_(std::function<R(P1)> func, Serializer* pr, const char* data, int len)
{
Serializer ds(StreamBuffer(data, len));
P1 p1;
ds >> p1; //通过模版参数完成类型自定义,将data中的参数还原具体类型
typename type_xx<R>::type r = call_helper<R>(std::bind(func, p1)); //绑定参数至函数

value_t<R> val;
val.set_code(RPC_ERR_SUCCESS);
val.set_val(r);
(*pr) << val; // 将函数返回值存到序列器中
}

注册的过程:bind -> callproxy -> callproxy_ ->call_helper

从上述函数声明中我们可以回答:

  • 多参数函数由多模版参数应对
  • 函数重载由std::function应对

call_helper

这个玩意儿的出现还是为了应对函数的返回值,因为有些函数没有返回值,但是我们该怎么判断一个函数签名有没有返回值呢?

这点上call_helper的解法是判断函数的类型是否是 std::function<void(若干参数)>

其中利用了std::enable_if

1
2
3
4
5
6
7
8
9
10
11
// help call return value type is void function
template<typename R, typename F>
typename std::enable_if<std::is_same<R, void>::value, typename type_xx<R>::type >::type call_helper(F f) {
f();
return 0;
}

template<typename R, typename F>
typename std::enable_if<!std::is_same<R, void>::value, typename type_xx<R>::type >::type call_helper(F f) {
return f(); // 有参函数已经通过bind绑定
}

std::enable_if<std::is_same<R, void>::value, typename type_xx<R>::type >::type 这一大段比较懵逼,拆分来看:

  • std::is_same<R, void>::value 如果R为void的话,这个value就是true

  • typename type_xx<R>::type 单纯返回R的类型

  • std::enable_if<bool, type>::type 如果bool填的是true,那么返回模版参数中的type

那么就能判断R是否为void类型。

那这个有个疑问:call_hleper里调用的都是func无参函数,其中的参数去哪里了?

回答:在调用callproxy_时,函数参数已经通过std::bind绑定到std::function中了。

run

回头再看call_就是非常简单,通过name获取func,然后再调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void buttonrpc::run()
{
// only server can call
if (m_role != RPC_SERVER) {
return;
}
while (1){
zmq::message_t data;
recv(data);
StreamBuffer iodev((char*)data.data(), data.size());
Serializer ds(iodev);

std::string funname;
ds >> funname;
Serializer* r = call_(funname, ds.current(), ds.size()- funname.size());

zmq::message_t retmsg (r->size());
memcpy (retmsg.data (), r->data(), r->size());
send(retmsg);
delete r;
}
}

Serializer* buttonrpc::call_(std::string name, const char* data, int len)
{
Serializer* ds = new Serializer();
if (m_handlers.find(name) == m_handlers.end()) {
(*ds) << value_t<int>::code_type(RPC_ERR_FUNCTIION_NOT_BIND);
(*ds) << value_t<int>::msg_type("function not bind: " + name);
return ds;
}
auto fun = m_handlers[name];
fun(ds, data, len); // data len 是 args
// 调用callproxy_
ds->reset();
return ds;
}
作者

Desirer

发布于

2025-08-10

更新于

2025-11-23

许可协议