功能模块设计(RPC、注册)
约 11864 个字 1105 行代码 预计阅读时间 53 分钟
服务端
RPC功能模块
服务描述类设计
根据前面对RPC功能的介绍,服务端实际上需要实现的功能就是查找客户端请求的服务是否存在,如果存在就调用该服务执行并将结果返回给客户端。接着考虑功能的设计思路:
客户端要请求服务,就必须知道当前服务的格式,对于服务端来说就需要约定当前可以提供的服务对应的格式,包括服务名称、每一个服务参数的名称和类型、服务的返回值和具体服务函数,其中,服务端需要告诉客户端的就是服务名称和每一个服务参数的名称和类型。从这一点可以看出,服务端需要对自己可以提供的服务进行描述,而服务端可能不止提供一个服务,所以除了描述以外,还需要对可以提供的服务进行管理,整个思想即为先描述再组织
根据这个思想,首先设计用于服务描述和管理的类,在这个类中需要包含服务名称(字符串)、服务参数与类型描述、服务返回值类型描述、服务业务函数。其中一个服务可能不止一个参数,而一个参数需要包含参数名称和类型说明,所以考虑使用std::pair
类型来描述一个参数,key
为参数名称,value
为参数类型,接着使用一个std::vector
来管理一个服务的所有的参数和类型描述,即:
C++ |
---|
| using params_desciption_t = std::pair<std::string, params_type>;
std::vector<params_desciption_t> params_; // 保存所有参数和对应的类型
|
接着,提供其他的成员组成一个类,并提供相应的构造函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | class ServiceDesc
{
public:
using ptr = std::shared_ptr<ServiceDesc>;
ServiceDesc(std::string &&method, handler_t &&handler, std::vector<params_desciption_t> &¶ms, params_type &&return_type)
: method_name_(std::move(method)), handler_(std::move(handler)), params_(params), return_type_(std::move(return_type))
{
}
private:
std::string method_name_; // 方法名
handler_t handler_; // 业务回调函数
std::vector<params_desciption_t> params_; // 保存所有参数和对应的类型
params_type return_type_; // 返回值类型
};
|
考虑到客户端发送给服务端的服务可能存在参数错误或者其他错误形式,还需要提供一个服务参数检查函数,在该函数内部检查客户端发送的每一个参数是否存在于服务端设置的参数描述集合中,如果存在就检查客户端指定的每一个参数类型是否与服务端描述的参数类型一致
在JSONCPP中提供了下面几种的常见类型:
- 布尔类型(Bool)
- 整数类型(Integral)
- 数值类型(Numeric)
- 字符串类型(String)
- 数组类型(Array)
- 对象类型(Object)
基于这些类型,单独设计一个函数来检查每一个客户端发送的参数和服务端指定的参数是否一致,为了方便表示参数类型,先提供一个枚举类描述上面的类型:
C++ |
---|
| enum class params_type
{
Bool = 0,
Integral,
Numeric,
String,
Array,
Object,
};
|
接着完善单个参数的检查函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | bool checkParamsType(const params_type &p, const Json::Value &val)
{
switch (p)
{
case params_type::Bool:
return val.isBool();
case params_type::Integral:
return val.isIntegral();
case params_type::Numeric:
return val.isNumeric();
case params_type::String:
return val.isString();
case params_type::Array:
return val.isArray();
case params_type::Object:
return val.isObject();
default:
break;
}
return false;
}
|
接着完善整个参数检查函数,步骤如下:
- 检查客户端发送的参数是否存在于服务端设置的参数描述集合中
- 检查客户端发送的每一个参数类型是否与服务端描述的参数类型一致
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | bool paramsCheck(const Json::Value ¶ms)
{
for (const auto &desc : params_)
{
// 判断字段是否存在
if (!params.isMember(desc.first))
{
LOG(Level::Warning, "指定的成员不存在:{}", desc.first);
return false;
}
// 判断字段类型和给定类型是否一致
// 注意需要判断的是指定的字段而不是直接params对象
if (!checkParamsType(desc.second, params[desc.first]))
{
LOG(Level::Warning, "指定的类型错误:{}", static_cast<int>(desc.second));
return false;
}
}
return true;
}
|
每一个服务描述类中都存在着服务业务函数,但是并没有提供调用接口,所以需要在服务描述类中添加该接口,但是该接口不仅仅是调用回调函数,还需要检查返回值类型是否正确,所以首先需要有一个检查返回值类型是否正确的函数,检查返回值类型的思路和检查单个参数的思路是一致的,所以直接调用上面的检查单个参数的函数:
Note
注意,这里提到的返回值类型不是服务函数的返回值类型,而是服务端服务的处理结果类型
C++ |
---|
| bool checkReturnType(params_type return_type, const Json::Value &val)
{
return checkParamsType(return_type, val);
}
|
接着,考虑为回调函数提供两个参数,第一个参数为服务参数的输入,第二个参数表示服务执行结果,对应地,调用接口也是两个参数,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13 | using handler_t = std::function<void(const Json::Value &, Json::Value &)>;
// 调用回调函数
bool callHandler(const Json::Value &input, Json::Value &output)
{
handler_(input, output);
if (!checkReturnType(return_type_, output))
{
LOG(Level::Warning, "返回值的类型错误:{}", static_cast<int>(return_type_));
return false;
}
return true;
}
|
服务描述类建造者设计
上面的服务描述类设计并没有提供修改每一个成员的接口,但是直接提供势必会存在执行服务的过程中可能有某一个线程修改成员内容,此时就会出现线程安全问题,在本次项目中考虑在执行服务之前就将每一个成员的值设置好,并且在执行服务过程中不允许修改,针对这个需求,基于考虑使用一个简易的建造者模式来完成,即通过方法设置每一个成员的值构建具体类的对象以隐藏具体的构造细节,这个简易的建造者模式与前面的简易工厂的区别就在于简易工厂只是创建对象,并没有设置对象成员的接口
要实现这个简易的建造者模式来构建服务描述类对象,可以考虑两种方案:
- 在服务描述类对象建造者中添加和服务描述类一模一样的成员,再调用设置成员的接口设置每一个成员的值,最后调用服务描述类的构造函数构建对象并返回该对象
- 在服务描述类中添加该类建造者的友元类,并且提供一个无参构造函数用于建造者创建一个无参对象,接着在建造者中设置每一个成员的值,最后返回该对象
两个方式都可以实现创建服务描述类对象,但是方式2破坏了封装性,本次项目中使用方式1
服务管理类设计
上面已经针对一个服务设计了服务描述类,但是一个服务描述类只能描述一个服务,所以需要一个服务管理类来管理多个服务描述类,这个服务管理类需要提供下面的功能:
- 添加服务
- 查找服务
- 删除服务
为了便于查找服务,本次考虑使用一个哈希表来保存服务名称和服务描述类对象指针的映射关系,即:
C++ |
---|
| std::unordered_map<std::string, ServiceDesc::ptr> service_manager_;
|
为了确保修改哈希表时的线程安全,需要使用一个互斥锁来保护服务管理类的成员
另外,还需要在服务描述类中提供一个获取服务名称的接口:
C++ |
---|
| // 获取服务名称
std::string getMethodName()
{
return method_name_;
}
|
接着,在类中提供功能函数,思路较为简单,此处不具体介绍:
服务功能类设计
上面已经完成了对服务的描述和管理,接下来就是实现服务端执行服务的流程。作为功能处理模块,为了便于服务的添加,本次不在当前模块中直接写死服务,而是提供一个服务添加函数,这个函数的参数就是一个服务描述类对象指针,函数内部负责将服务通过服务管理类将服务添加到服务映射表中。基于这个思路,这个类中需要有一个服务管理类对象指针,并在构造函数中初始化该对象,接着提供一个提加服务函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | class RpcRouter
{
public:
using ptr = std::shared_ptr<RpcRouter>;
RpcRouter()
: services_(std::make_shared<ServiceManager>())
{
}
// 注册服务
// 调用ServiceManager类的插入函数
void registerService(const ServiceDesc::ptr &s)
{
services_->insertService(s);
}
private:
ServiceManager::ptr services_;
};
|
接着提供一个针对RPC服务请求的处理函数,在这个函数中先对客户端请求服务进行查找,如果存在就检查客户端发送的服务参数和类型是否与服务端需要的一致,如果一致,就可以调用对应的服务回调函数处理,处理完毕后检查返回值类型是否正确,如果正确就是构建正常响应给客户端,否则构建错误响应给客户端
在上面的逻辑中,涉及到构建响应,包括正常响应和错误响应,所以将构建逻辑抽离封装为一个函数,在该函数中首先创建一个RPC响应对象,再根据前面的RPC响应结构设置相关字段,再调用连接对象的send
函数将响应发送给客户端,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12 | void buildRpcResponse(const base_connection::BaseConnection::ptr &con, request_message::RpcRequest::ptr &msg, const Json::Value &ret, public_data::RCode rcode)
{
// 构建RpcResponse对象并填充字段
auto rpc_resp = message_factory::MessageFactory::messageCreateFactory<response_message::RpcResponse>();
rpc_resp->setId(msg->getReqRespId());
rpc_resp->setMType(public_data::MType::Resp_rpc);
rpc_resp->setRCode(rcode);
rpc_resp->setResult(ret);
// 发送给客户端
con->send(rpc_resp);
}
|
接着,根据前面的思路实现功能函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | void handleRpcRequest(const base_connection::BaseConnection::ptr &con, request_message::RpcRequest::ptr &msg)
{
// 1. 查找请求服务是否存在
auto pos = services_->findService(msg->getMethod());
if (!pos)
{
LOG(Level::Warning, "请求的:{} 服务不存在", msg->getMethod());
buildRpcResponse(con, msg, Json::Value(), public_data::RCode::RCode_not_found_service);
}
// 2. 判断请求中提供的参数是否正确
if (!pos->paramsCheck(msg->getParams()))
{
LOG(Level::Warning, "请求的:{} 服务参数错误", msg->getMethod());
buildRpcResponse(con, msg, Json::Value(), public_data::RCode::RCode_invalid_params);
}
// 3. 调用ServiceManager类中的函数执行服务
Json::Value result;
bool ret = pos->callHandler(msg->getParams(), result);
if (!ret)
{
LOG(Level::Warning, "请求的:{} 服务返回值错误(内部错误)", msg->getMethod());
buildRpcResponse(con, msg, Json::Value(), public_data::RCode::RCode_internal_error);
}
// 4. 返回处理结果
buildRpcResponse(con, msg, result, public_data::RCode::RCode_fine);
}
|
至此,服务端的RPC功能模块完成
服务注册模块
服务注册模块功能回顾
在服务端部分,服务注册模块实际上扮演的角色就是服务注册中心的底层功能,根据前面的项目介绍,服务注册中心需要提供下面的功能:
- 服务注册:当一个服务启动时,需要向当前注册中心注册,确保该服务提供者可以被记录
- 服务发现:在服务注册完毕后,需要告诉客户端当前有哪些服务可以提供,即通知服务发现者
- 服务上线:当有一个新的服务端上线时,一旦注册到注册中心,就需要通知已经发现过当前服务器可以提供的服务对应的客户端
- 服务下线:当一个服务端下线时,注册中心需要通知已经发现过当前服务器可以提供的服务对应的客户端
服务提供者描述类设计
对于服务端来说,当有一个服务提供者注册时,服务提供者需要告诉注册中心自己的主机信息和所有可以提供的服务,那么对于注册中心来说,就需要对每一个服务提供者的信息进行管理,所以需要一个服务提供者描述类来描述每一个服务提供者。在本次项目中,一个服务提供者可能不止提供一个服务,而是多个服务,但是具体的实现可以不需要告诉注册中心,所以只需要用一个字符串数组来保存当前服务提供者可以提供的所有服务,接着,还需要提供一个连接字段表示当前服务提供者的连接信息,便于注册中心在所有服务提供者中查找指定的一个服务提供者,所以基本字段如下:
Note
为了更方便访问每一个服务提供者的字段,此处使用struct
而不是class
C++ |
---|
| // 服务提供者信息
struct ServiceProvider
{
using ptr = std::shared_ptr<ServiceProvider>;
base_connection::BaseConnection::ptr con_; // 当前提供者的连接信息
std::vector<std::string> methods_; // 当前提供者可以提供的所有服务
public_data::host_addr_t host_; // 当前提供者的主机信息
};
|
接着,提供一个用于构造服务提供者描述类对象的构造函数,该构造函数需要有两个参数,其中一个是提供者的连接信息,另外一个是自己的主机信息:
C++ |
---|
| ServiceProvider(const base_connection::BaseConnection::ptr &con, const public_data::host_addr_t &host)
: con_(con), host_(host)
{
}
|
最后,提供一个添加服务的函数,该函数的作用是在创建一个服务提供者描述类对象时可以将该服务提供者告诉注册中心的自己可以提供的服务添加到当前描述类对象的methods_
字段中,设计如下:
C++ |
---|
| std::mutex method_mtx_; // 用于服务管理的线程安全
void insertService(const std::string &method)
{
std::unique_lock<std::mutex> lock(method_mtx_);
methods_.push_back(method);
}
|
服务提供者管理类设计
上面已经完成了对一个服务提供者进行描述,但是一个注册中心可能不止有一个服务提供者,所以除了描述一个服务提供者以外,还需要对所有服务提供者进行管理,这就需要一个服务提供者管理类。在这个管理类中,主要实现对服务提供者描述类的增删查操作,为了便于找到每一个服务提供者,就需要用到服务提供者的连接信息,根据连接信息和服务提供者描述类对象,此处就可以使用哈希表建立二者之间的映射关系,如下:
C++ |
---|
| std::unordered_map<base_connection::BaseConnection::ptr, ServiceProvider::ptr> con_provider_; // 管理连接和提供者
|
接着,考虑到客户端请求的一个服务可能对应着对个服务提供者,所以还需要一张哈希表来管理服务名称和服务提供者集合的映射关系,如下:
C++ |
---|
| std::unordered_map<std::string, std::set<ServiceProvider::ptr>> providers_; // 指定服务的所有提供者,使用set方便删除
|
接下来设计对应的增删查函数。对于增加来说,就是在con_provider_
通过对应的连接查找当前服务提供者是否已经存在,如果不存在就创建一个服务提供者描述类对象,否则就直接使用该对象进行后续操作,对于这一步可以直接按照这里的逻辑进行设计,也可以利用insert
函数返回值的特点:存在返回已有的,否则返回新创建的。除了更新con_provider_
以外,还需要更新providers_
,这里就是根据服务提供者的可以提供的服务将当前服务提供者插入到对应的集合中,表示指定服务有了新的服务提供者。需要注意,上面的步骤需要保证线程安全,完成上面的步骤之后,还需要将当前服务提供者可以提供的服务插入到对应的描述类对象中,本次的设计思路是:每一次调用当前增加函数就只添加一个服务。整个函数的设计如下:
对于第二种插入写法来说,可以使用C++ 17中的try_emplace
函数,使用try_emplace
可以先查找key
是否存在再创建对象,insert
会先构造对象再去判断是否存在,二者返回值和解释一致。在可能重复插入的场景下可以使用try_emplace
,避免不必要的对象构造开销。优化如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | // 添加服务提供者
void insertProvider(const base_connection::BaseConnection::ptr &con, const std::string &method, const public_data::host_addr_t &host)
{
ServiceProvider::ptr sp;
{
// ...
auto pos = con_provider_.try_emplace(con, std::make_shared<ServiceProvider>(con, host));
sp = pos.first->second;
// ...
}
// 添加服务到当前提供者
sp->insertService(method);
}
|
Note
之所以不将服务添加的逻辑放在添加服务提供者逻辑所在代码块内部是因为服务添加的逻辑本身就已经进行了加锁,二次加锁可能会导致死锁问题
接着考虑删除函数,对于删除来说,首先通过连接对象在con_provider_
中查找对应的服务提供者对象指针,如果不存在就不执行删除,否则就继续后面的逻辑:找到该服务提供者可以提供的服务,根据这些服务找到每一个包含当前待删除的服务提供者的集合,从该集合中删除待删除的服务提供者,最后再将当前服务提供者从con_provider_
中删除。整个函数的设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | // 移除服务提供者
void removeProvider(const base_connection::BaseConnection::ptr &con)
{
std::unique_lock<std::mutex> lock(provider_mtx_);
auto pos = con_provider_.find(con);
if (pos == con_provider_.end())
{
LOG(Level::Warning, "不存在指定的提供者,删除失败");
return;
}
// 获取服务提供者,并根据其可以提供的方法将其从服务提供者和方法映射表中移除
ServiceProvider::ptr sp = pos->second;
std::vector<std::string> &methods = sp->methods_;
for (std::string &m : methods)
{
// 找到存储提供者的结构
auto &provider = providers_[m];
provider.erase(sp);
}
// 从管理连接和提供者的哈希表中删除
con_provider_.erase(con);
}
|
这里需要注意,在构建映射时,并不是直接使用服务描述类对象的普通指针,而是使用智能指针类型,这样可以保证键值对删除时,对象资源会被自动释放防止内存泄漏问题
最后考虑查找函数,本次查找分为两种查找:
- 查找指定连接对应的服务提供者
- 查找指定服务对应的服务提供者集合
对于第一种查找,直接通过连接对象在con_provider_
中查找对应的服务提供者即可,但是对于第二种查找,需要注意的是,在前面基础模块中设计ServiceResponse
类中的setHosts
函数时参数使用的是std::vector<host_addr_t>
,而本次实现时使用的是std::set<ServiceProvider::ptr>
,所以在查找时需要将std::set
中的每一个服务提供者描述对象的host_addr_t
成员插入到std::vector
中再返回。两个函数设计如下:
服务发现者描述类设计
根据前面对服务注册模块功能的介绍,除了需要服务提供者以外,还需要有对请求服务的一方进行描述,这便是服务发现者。对于服务发现者来说,与服务提供者比较类似,首先需要有当前服务发现者的连接对象,接着还需要管理当前服务发现者发现的所有方法,便于在某一个服务提供者断开或者上线时及时更新可以请求的服务提供者,所以服务发现者描述类基本设计如下:
C++ |
---|
| struct ServiceDiscoverer
{
using ptr = std::shared_ptr<ServiceDiscoverer>;
base_connection::BaseConnection::ptr con_; // 发现过服务的客户端
std::vector<std::string> methods_; // 客户端发现的服务
};
|
接着,提供一个构造函数用于使用连接对象构造一个服务发现者对象,如下:
C++ |
---|
| ServiceDiscoverer(const base_connection::BaseConnection::ptr &con)
: con_(con)
{
}
|
最后提供一个服务添加函数,用于表示向当前服务发现者添加已经发现的服务,设计如下:
C++ |
---|
| std::mutex method_mtx_; // 用于服务管理的线程安全
void insertService(const std::string &method)
{
std::unique_lock<std::mutex> lock(method_mtx_);
methods_.push_back(method);
}
|
服务发现者管理类设计
同样,提供一个对服务发现者进行管理的类,在该类中需要提供增删通知函数。首先,为了便于使用连接对象快速找到对应的服务发现者,需要有一个保存连接对象和服务发现者对象指针进行映射的哈希表:
C++ |
---|
| std::unordered_map<base_connection::BaseConnection::ptr, ServiceDiscoverer::ptr> con_discoverer_; // 管理连接和发现者
|
接着,为了便于通过具体的方法可以查找到所有的服务发现者用于在某一个服务上线或者下线时的通知以及移除服务发现者,还需要建立一张保存服务和服务发现者集合映射关系的哈希表:
C++ |
---|
| std::unordered_map<std::string, std::set<ServiceDiscoverer::ptr>> discovers_; // 发现指定服务的所有客户端
|
接下来设计三种函数,首先是增加函数,在该函数中,思路与添加服务提供者比较类似:先查找con_discoverer_
是否存在已有的服务提供者,如果存在就直接使用已有的,否则就创建并使用该对象,完成这一步之后,再通过具体的服务名称找到对应的服务发现者集合,将该服务发现者插入到该集合中。最后,将指定的服务插入到对应的服务发现者中即可。此处同样可以使用两种插入写法,具体实现如下:
接着实现删除函数,在该函数中,首先也是通过查找con_discoverer_
判断指定的服务发现者是否存在决定需要继续删除,如果存在,那么后面的逻辑为:根据待删除的服务发现者管理的方法,从每一个方法对应的发现者集合中移除当前发现者,再将发现者从连接和服务发现者映射表中移除。实现如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | // 移除发现者
void removeDiscoverer(const base_connection::BaseConnection::ptr &con)
{
std::unique_lock<std::mutex> lock(discover_mtx_);
auto pos = con_discoverer_.find(con);
if (pos == con_discoverer_.end())
{
LOG(Level::Info, "当前已经不存在任何发现者");
return;
}
ServiceDiscoverer::ptr sd = pos->second;
auto &methods = sd->methods_;
for (std::string &m : methods)
{
auto &discovers = discovers_[m];
discovers.erase(sd);
}
con_discoverer_.erase(con);
}
|
最后,对于服务发现管理来说,还需要提供服务上线和下线的通知功能,所谓的通知就是告诉发现指定服务的发现者哪一个服务提供者下线了,可以移除关于它的主机信息,下一次不能再请求,因为上线和下线通知只是消息类型不同,大体逻辑都是一致的,所以可以将这段逻辑抽离到单独的一个函数中完成。该函数具体的逻辑是:构建服务请求,根据参数传递的主机地址、消息类型和服务名称填充相关的请求字段,再根据服务名称找到具体的服务发现者集合,从该集合中拿到每一个服务发现者的连接对象进行消息发送。根据这个思路,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | void notify(const std::string &method, const public_data::host_addr_t &addr, public_data::ServiceOptype op)
{
std::unique_lock<std::mutex> lock(discover_mtx_);
auto pos = discovers_.find(method);
if (pos == discovers_.end())
{
LOG(Level::Warning, "当前服务:{}无提供者", method);
return;
}
// 构建服务发现请求并发送给所有客户端
auto service_msg = message_factory::MessageFactory::messageCreateFactory<request_message::ServiceRequest>();
service_msg->setId(uuid_generator::UuidGenerator::generate_uuid());
service_msg->setMethod(method);
service_msg->setHost(addr);
service_msg->setServiceOptype(op);
for (auto &d : pos->second)
d->con_->send(service_msg);
}
|
接着完善具体的上线和下线通知,注意消息类型的不同:
服务注册发现管理模块设计
上面已经分别实现了服务提供者的管理和服务发现者的管理,但是并没有提供一个处理请求和响应的类,在这个类中,需要判断客户端的消息类型(服务注册或者服务发现)来决定具体的服务管理操作,所以在当前类中,少不了的就是服务发现者管理类对象和服务提供者管理类对象,对应地,在创建当前类对象时直接对这两个对象进行实例化,整个类的基本结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | class ProviderDiscovererManager
{
public:
using ptr = std::shared_ptr<ProviderDiscovererManager>;
ProviderDiscovererManager()
: provider_manager_(std::make_shared<ServiceProviderManager>()),
discoverer_manager_(std::make_shared<ServiceDiscovererManager>())
{
}
private:
ServiceProviderManager::ptr provider_manager_;
ServiceDiscovererManager::ptr discoverer_manager_;
};
|
接着,在当前类中需要处理服务发现请求和服务注册请求的函数,在该函数中,需要根据Message基类对象中的消息类型判断具体处理哪一个请求,处理方式如下:
- 收到服务注册请求:调用服务提供者管理类中的添加函数将当前服务提供者提供的服务以及对应的主机信息添加到服务提供者管理对象中,再调用服务发现者的上线通知函数通知已经发现过具体服务的服务发现者,最后返回服务注册响应给服务注册客户端
- 收到服务发现请求:将当前服务发现者添加到服务发现管理者对象中,返回服务发现响应给服务发现客户端
正常情况下只需要处理上面的两种请求,但是为了以防万一,可以考虑针对其他请求发送错误的响应提示请求错误无法被处理
根据上面的思路,首先需要设计返回服务注册响应函数和服务发现响应函数
对于服务注册响应函数来说,根据前面项目介绍对服务注册响应的设计,并不需要给服务注册客户端返回特殊的结果,只需要设置对应的错误码和其他响应信息再将响应信息发送给客户端即可,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | void sendRegistryResponse(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
// 构建响应
auto service_resp = message_factory::MessageFactory::messageCreateFactory<response_message::ServiceResponse>();
// 获取主机信息
service_resp->setId(msg->getReqRespId());
// 设置方法和主机信息
service_resp->setMType(public_data::MType::Resp_service);
service_resp->setServiceOptye(public_data::ServiceOptype::Service_register);
service_resp->setRCode(public_data::RCode::RCode_fine);
con->send(service_resp);
}
|
对于服务发现响应函数来说,除了需要设置基本的响应字段以外,还需要通过服务提供者管理类对象获取到当前可以提供指定服务的所有服务提供者主机信息数组,将这些提供者的信息数组通过构建好的响应对象发送给客户端,代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | void sendDiscoverResponse(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
// 构建响应
auto service_resp = message_factory::MessageFactory::messageCreateFactory<response_message::ServiceResponse>();
// 获取主机信息
auto hosts = provider_manager_->getServiceProviders(msg->getMethod());
service_resp->setId(msg->getReqRespId());
// 设置方法和主机信息
service_resp->setMethod(msg->getMethod());
service_resp->setMType(public_data::MType::Resp_service);
service_resp->setServiceOptye(public_data::ServiceOptype::Service_discover);
if (hosts.empty())
{
LOG(Level::Warning, "不存在主机信息");
// 构建错误响应
service_resp->setRCode(public_data::RCode::RCode_not_found_service);
con->send(service_resp);
return;
}
service_resp->setRCode(public_data::RCode::RCode_fine);
service_resp->setHosts(hosts);
con->send(service_resp);
}
|
接着,提供一个发送错误响应的函数,只需要创建响应对象,构建常规信息和错误码,再将响应发送给客户端即可:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12 | void handleErrorResponse(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
auto service_resp = message_factory::MessageFactory::messageCreateFactory<response_message::ServiceResponse>();
service_resp->setId(msg->getReqRespId());
// 设置方法和主机信息
service_resp->setMType(public_data::MType::Resp_service);
service_resp->setServiceOptye(public_data::ServiceOptype::Service_wrong_type);
service_resp->setRCode(public_data::RCode::RCode_not_found_service);
con->send(service_resp);
}
|
有了上面的响应发送接口,结合前面提到的思路,编写处理请求的函数如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | void handleRegisterDiscoverRequest(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
// 收到服务请求只有两种情况:服务发现和服务注册
// 对于服务上线和下线通知是当前服务端发送给客户端,需要由客户端进行处理的
// 获取操作类型
public_data::ServiceOptype type = msg->getServiceOptye();
if (type == public_data::ServiceOptype::Service_register)
{
// 服务注册
// 添加服务提供者到ProviderManager中
provider_manager_->insertProvider(con, msg->getMethod(), msg->getHost());
// 通知发现者
discoverer_manager_->onlineNotify(msg->getMethod(), msg->getHost());
sendRegistryResponse(con, msg);
}
else if (type == public_data::ServiceOptype::Service_discover)
{
// 服务发现
discoverer_manager_->insertDiscoverer(con, msg->getMethod());
sendDiscoverResponse(con, msg);
}
else
{
LOG(Level::Error, "收到服务请求,但是服务类型错误");
handleErrorResponse(con, msg);
}
}
|
除了上面处理发现和注册请求外,服务提供者和服务发现者也有可能下线,所以对应地还需要有一个处理服务下线/发现者离线行为的接口,这个接口在服务器检测到有连接断开时调用connectionCallback
函数时会调用,此时会将服务提供者/服务发现者的连接作为参数传递给当前函数用于处理连接断开时的资源释放问题
在该函数中,首先根据对应的连接对象查找到当前服务提供者对应的方法集合,遍历该方法集合中的每一个方法通知到发现过该方法的所有服务发现客户端当前指定的服务下线,如果当前连接不是服务提供者,那么也不会出现问题,因为当前项目并没有使用强断言,所以也不会有终止程序的问题,通知完毕后删除服务发现者或者服务提供者,同样,因为不是强断言,不存在就删除失败,不会使程序崩溃。结合这个思路,代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | // 处理连接断开时
void handleProviderConnectionShutdown(const base_connection::BaseConnection::ptr &con)
{
// 通知所有发现者
auto sp = provider_manager_->findProvider(con);
if (!sp)
{
LOG(Level::Warning, "不存在指定的发现者,通知结束");
return;
}
for (auto &m : sp->methods_)
discoverer_manager_->offlineNotify(m, sp->host_);
// 移除服务提供者
provider_manager_->removeProvider(con);
// 如果是服务发现者,移除
discoverer_manager_->removeDiscoverer(con);
}
|
至此,服务端部分注册模块完成
客户端
请求管理模块
请求描述类设计
在网络通信过程中,客户端不会等到服务端响应完成后才继续发送下一个请求,而是有多少个请求就发送多少个请求,但是此时就会出现一个问题,客户端发了很多个请求,服务端对应的就会返回多个响应,既然存在着多个请求和响应,那么客户端就需要对这些请求和响应进行描述和管理,为了使整个客户端结构更加清晰,本次将请求管理作为一个单独的模块,下面考虑当前模块的设计思路:
首先就是对请求和响应进行描述,既然是请求管理,那么少不了的就是请求信息,而每一个请求都对应着一个响应,所以在请求描述中一定包含的字段就是请求和响应的消息,对于请求来说,直接使用Message派生类即可,但是对于响应来说,服务端有可能不是理解给予客户端响应,所以此处需要使用异步的方式来保存响应,当服务端给客户端指定的请求做出响应之后,此时异步结果就会被设置,客户端就可以直接拿到结果,而不是阻塞等待服务端响应,为了存储异步结果,对于响应来说就需要使用到std::promise
,所以请求描述类设计如下:
C++ |
---|
| struct RequestDesc
{
base_message::BaseMessage::ptr request; // 请求描述
std::promise<base_message::BaseMessage::ptr> response; // 存储异步请求响应结果
};
|
除了上面主要的异步方式以外,本次项目中还会考虑其他两种方式:同步和回调函数,同步就是阻塞等待结果,而回调函数就是当收到响应时调用对应的回调函数对结果进行处理而不是将结果保存到std::promise
中。对于同步来说,只需要一直阻塞等待结果带来后直接获取并存储到Message基类中即可,不需要额外保存,对于回调来说亦是如此,但是每一个请求可能需要的处理回调并不相同,所以在请求描述类中还需要一个成员表示回调函数,而回调函数的类型就是Message基类指针,即:
C++ |
---|
| // 回调类型
using callback_t = std::function<void(base_message::BaseMessage::ptr &)>;
struct RequestDesc
{
// ...
callback_t callback; // 回调处理函数
};
|
既然提供了三种方式,那么请求处理模块在收到对应的响应之后又该如何分别是哪一种方式发送的请求,又该调用哪一种方式处理响应结果?此时就需要使用到消息发送模式枚举类,该类在前面的基础模块设计部分已经展示过,此处不再重复。完整的请求描述类设计如下:
C++ |
---|
| struct RequestDesc
{
using ptr = std::shared_ptr<RequestDesc>;
base_message::BaseMessage::ptr request; // 请求描述
public_data::RType send_type; // 消息发送模式
std::promise<base_message::BaseMessage::ptr> response; // 存储异步请求响应结果
callback_t callback; // 回调处理函数
};
|
管理功能设计
对请求和响应进行描述之后,现在考虑如何管理这个描述类。在应用层协议部分提到过,为了区分不同的请求使用到了请求/响应ID,所以使用这个ID与请求描述类构建映射关系,即一个请求/响应ID对应一个请求描述类,使用一个哈希表来保存这个映射关系,设计如下:
C++ |
---|
| std::unordered_map<std::string, RequestDesc::ptr> request_map_; // 请求ID与描述映射
|
对应地给出针对这个哈希表的增删查函数。需要注意的是,对于添加函数来说,因为客户端发送请求后会获取到服务端的响应,该响应可能要保存到response
中,所以该函数在插入成功后需要返回对应的请求描述类对象指针,便于上层使用response
。另外,对于回调函数的设置需要确保操作类型是Req_callback
且回调函数不为空,如果是异步类型或者同步类型就没有必要设置回调函数。其余两个接口设计较为简单,此处不再赘述。实现如下:
功能设计
请求管理模块最主要的功能就是发送请求并管理响应,所以请求接口是必不可少的,根据前面的设计,本次需要提供三种发送请求的函数,分别是同步请求、异步请求和回调请求
首先考虑异步请求发送函数,该函数整体的逻辑创建一个请求描述类对象,填充相关的字段后与当前请求ID建立映射关系,接着调用连接对象的send
函数发送请求,接着调用std::promise
的get_future
函数,一旦收到响应调用set_value
函数就会将结果设置到当前输出型参数std::future
中。根据这个思路,该函数设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | // 异步结果类型
using async_response = std::future<base_message::BaseMessage::ptr>;
// 异步发送接口
bool sendRequest(const base_connection::BaseConnection::ptr &con, const base_message::BaseMessage::ptr &msg, async_response &resp)
{
// 创建出请求描述
RequestDesc::ptr rd = insertRequestDesc(msg, msg->getReqRespId(), public_data::RType::Req_async);
if(!rd.get())
{
LOG(Level::Error, "异步发送创建请求描述失败");
return false;
}
// 发送请求
con->send(msg);
// 获取future对象
resp = rd->response.get_future();
return true;
}
|
接着是同步请求发送函数,这个函数的设计思路是创建一个std::future
对象,这个对象用于存储异步响应结果,因为当前项目中主要使用的是异步操作,所以要实现同步可以直接考虑使用到std::future
对象的get
函数,该函数会阻塞等待直到有结果为止,得到结果后将结果赋值给输出型参数Message基类指针即可。客户端需要得到响应就必须先发送请求,所以在获取结果之前需要发送请求函数将需要发送的请求发送给服务端,此处可以直接调用异步发送函数。需要注意,与异步发送不同的是,同步发送函数不需要对每一个请求进行映射管理,因为同步函数只有当前请求结束了才会发起下一个请求,否则会一直阻塞等待,也就是说,整个过程完全是串行的。根据这个思路,该函数设计如下:
Note
需要注意,本次实现的同步发送函数并不是直接调用接收相关的函数(例如recv
),因为大部分的网络库(包括本项目中使用Muduo库)都是使用的异步模式,所以本次实现的同步发送函数也是通过异步方式间接实现的
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | // 同步发送接口
bool sendRequest(const base_connection::BaseConnection::ptr &con, const base_message::BaseMessage::ptr &msg, base_message::BaseMessage::ptr &resp)
{
// 创建出请求描述
async_response resp_async;
bool ret = sendRequest(con, msg, resp_async);
if(!ret)
{
LOG(Level::Error, "同步发送失败");
return false;
}
// 不存在结果时会阻塞
resp = resp_async.get();
return true;
}
|
接下来是回调请求发送函数,该函数的基本思路与异步发送函数基本一致,只是不需要额外将结果返回,而是收到响应时调用设置的回调函数直接进行处理。该函数设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | // 回调发送接口
bool sendRequest(const base_connection::BaseConnection::ptr &con, const base_message::BaseMessage::ptr &msg, callback_t &cb)
{
// 创建出请求描述
RequestDesc::ptr rd = insertRequestDesc(msg, msg->getReqRespId(), public_data::RType::Req_callback, cb);
if (!rd.get())
{
LOG(Level::Error, "回调发送创建请求描述失败");
return false;
}
// 发送请求
con->send(msg);
return true;
}
|
有了发送接口还需要有处理响应的接口,该接口就是通过请求/响应ID找到对应的请求描述类,根据该类对象中的发送方式决定如何处理响应,如果是异步方式,就调用set_value
设置响应结果,如果是回调方式,就调用设置的回调函数进行处理,最后不要忘记移除当前的请求描述类和请求/响应ID映射关系,因为该请求已经被正常处理。该函数设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | // 收到服务端响应时的回调函数
void handleResponse(const base_connection::BaseConnection::ptr &con, base_message::BaseMessage::ptr &msg)
{
// 1. 查找到指定的描述字段
RequestDesc::ptr rd = findRequestDesc(msg->getReqRespId());
if(!rd.get())
{
LOG(Level::Warning, "不存在请求ID:{}对应的描述字段", msg->getReqRespId());
return;
}
// 2. 根据异步或者回调获取结果
if(rd->send_type == public_data::RType::Req_async)
rd->response.set_value(msg);
else if(rd->send_type == public_data::RType::Req_callback)
(rd->callback)(msg);
// 3. 处理完当前响应后说明对应的请求结束,删除对应的rid
removeRequestDesc(msg->getReqRespId());
}
|
至此,客户端请求管理模块完成
RPC功能模块
模块基本结构
有了前面针对客户端发送请求模块的设计,现在根据该模块具体设计针对RPC功能请求的模块。在这个模块中,主要就是基于请求管理模块进行进一步封装得到用户可以使用的RPC调用接口,因为本次项目中的设计逻辑是请求管理模块并不是让用户直接使用,而是让客户端的请求发送和响应处理更加统一且方便管理。基于这个思路,下面设计客户端部分的RPC功能模块
在RPC功能模块部分,主要提供三种请求发送接口,分别是同步、异步和回调。因为当前类是基于请求模块的,所以需要在当前类中添加请求管理模块的对象指针成员,整个类结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | class RpcCaller
{
public:
using ptr = std::shared_ptr<RpcCaller>;
RpcCaller(const requestor_rpc_framework::Requestor::ptr &requestor)
: requestor_(requestor)
{
}
// 同步调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, Json::Value &result)
{
}
// 异步调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, aysnc_response &result)
{
}
// 回调方式调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, const callback_t &cb)
{
}
private:
requestor_rpc_framework::Requestor::ptr requestor_; // 调用Requestor模块中的发送函数
};
|
同步请求发送函数设计
在同步请求发送函数中,首先需要构建一个RPC请求对象依次填充请求需要的字段,包括请求/响应ID、请求类型、方法名和方法参数,接着调用请求管理模块中的同步发送函数发送请求获取到结果,此处需要注意,在请求管理模块中,三个发送函数是通过函数重载实现的,所以在调用时必须保证类型完全一致,不可以出现父类指针指向子类对象的情况。因为是同步请求,所以在真正获取到结果之前都是阻塞等待,所以接下来就可以直接调用获取结果的函数拿到结果,再通过输出型参数将结果返回给上层,但是这里需要注意,因为上一步使用的是消息基类指针,而在前面的设计中,消息基类中没有获取结果的函数,所以此处还需要将消息基类指针向下转型为RPC响应对象指针。根据这个思路,该函数设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | // 同步调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, Json::Value &result)
{
// 1. 创建请求
auto rpc_req = message_factory::MessageFactory::messageCreateFactory<request_message::RpcRequest>();
rpc_req->setId(uuid_generator::UuidGenerator::generate_uuid());
rpc_req->setMType(public_data::MType::Req_rpc);
rpc_req->setMethod(method_name);
rpc_req->setParams(params);
// 2. 发送请求
base_message::BaseMessage::ptr base_msg;
// 重载函数必须保证类型完全一致,而父类和子类之间的关系也属于类型不一致
bool ret = requestor_->sendRequest(con, std::dynamic_pointer_cast<base_message::BaseMessage>(rpc_req), base_msg);
if (!ret)
{
LOG(Level::Warning, "同步处理请求失败");
return false;
}
// 3. 等待结果
auto rpc_resp = std::dynamic_pointer_cast<response_message::RpcResponse>(base_msg);
if (rpc_resp->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "结果异常,原因:{}", errReason(rpc_resp->getRCode()));
return false;
}
result = rpc_resp->getResult();
return true;
}
|
异步请求发送函数设计
根据上面的思路,直接想到的就是调用请求管理模块中的异步请求发送函数,但是这里会遇到一个问题:
在请求管理模块中,异步请求发送函数接收的是一个消息基类的std::future
对象,而在当前项目中,用户拿到的结果都是JSON格式的消息,而不是底层的消息基类对象,所以在设计当前模块时参数不能与请求管理模块一样使用消息基类的std::future
对象,而是需要使用保存JSON对象的std::future
对象,即:
C++ |
---|
| using aysnc_response = std::future<Json::Value>;
|
这样设计之后,在调用请求管理模块中的异步请求发送接口时就需要额外定义一个消息基类的std::future
对象,然后将这个对象作为实参传递给请求管理模块的异步请求发送函数等待内部返回结果,此时就会发现因为请求管理模块的异步请求发送函数并不会直接返回结果,所以调用到get_future
函数并不会直接返回结果,而是只有收到响应时调用了set_value
函数才会真正有结果,但是对于RPC功能模块的异步发送函数来说,其并不知道有没有结果,也不知道何时会有结果,在后续代码中如果考虑从std::future
对象使用get
获取结果就会阻塞,此时就回到了上面的同步模式。这种情况下,RPC功能模块的异步请求发送函数就没有存在的意义了
那么如何解决这个问题呢?解决这个问题之前,先了解异步请求发送函数需要达到的目的:当上层使用该函数时不会阻塞在该接口中,上层在获取输出型参数的值时可以正常获取到结果。基于这个目的,在当前函数中可以考虑使用请求管理模块中的回调请求发送函数,使用该函数可以确保在收到响应时会调用设置的回调函数针对结果进行处理,例如在本次调用中可以从消息基类中拿到JSON对象格式的结果,再调用set_value
函数将结果设置到输出型参数中
根据这个思路,在设计RPC功能模块中的异步请求发送接口时就需要使用到请求管理模块中的回调请求发送函数,对应地需要设计一个回调函数作为参数传递给该回调请求发送函数。首先考虑如何设计该回调函数,因为该回调函数是收到响应时才会执行,所以少不了的参数就是消息基类指针,在请求管理模块中的响应处理函数被调用时,会传递该消息基类指针表示响应结果,但是除了这一个参数以外,还需要一个参数,这个参数就是保存类型为JSON对象的std::promise
对象,因为上层拿到的是JSON格式的数据,所以需要通过这个对象设置对应的结果,在使用该回调函数时,也需要创建该对象以此来将结果返回给上层,在函数内部,只需要对响应结果进行正确性判断已经结果设置,如果结果正常,那么就可以直接将结果通过set_value
函数设置,否则直接返回。设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | // 异步请求回调函数
void async_callback(std::shared_ptr<std::promise<Json::Value>> result, base_message::BaseMessage::ptr &msg)
{
auto resp_rpc = std::dynamic_pointer_cast<response_message::RpcResponse>(msg);
if (!resp_rpc)
{
LOG(Level::Warning, "异步回调内部对象转换失败");
return;
}
if (resp_rpc->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "结果异常,原因:{}", errReason(resp_rpc->getRCode()));
return;
}
result->set_value(resp_rpc->getResult());
}
|
接着,完善异步请求发送函数,基本逻辑与同步请求发送函数一致,需要注意的是,创建std::promise<Json::Value>
对象时需要使用智能指针,并且在回调函数中使用值传递,确保该对象不会在异步请求发送函数结束之后被销毁,创建完对象后,如果get_future
函数获取到std::future<Json::Value>
对象,一旦回调函数中调用了set_value
就会将结果返回给上层,整体设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | // 异步调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, aysnc_response &result)
{
// 1. 创建请求
auto rpc_req = message_factory::MessageFactory::messageCreateFactory<request_message::RpcRequest>();
rpc_req->setId(uuid_generator::UuidGenerator::generate_uuid());
rpc_req->setMType(public_data::MType::Req_rpc);
rpc_req->setMethod(method_name);
rpc_req->setParams(params);
// 2. 发送请求
// 使用智能指针防止局部promise变量被销毁导致错误
std::shared_ptr<std::promise<Json::Value>> json_promise = std::make_shared<std::promise<Json::Value>>();
result = json_promise->get_future();
requestor_rpc_framework::Requestor::callback_t cb = std::bind(&RpcCaller::async_callback, this, json_promise, std::placeholders::_1);
// 重载函数必须保证类型完全一致,而父类和子类之间的关系也属于类型不一致
bool ret = requestor_->sendRequest(con, std::dynamic_pointer_cast<base_message::BaseMessage>(rpc_req), cb);
if (!ret)
{
LOG(Level::Warning, "同步处理请求失败");
return false;
}
return true;
}
|
回调请求发送函数设计
有了上面异步请求发送函数设计的基础,对于回调请求发送函数的设计也是类似,只是将之前的std::promise<Json::Value>
参数换成回调函数作为参数,这个回调函数就是由上层传递的、用于处理结果的函数,该函数中的逻辑就是调用用户传递的回调函数处理对应的结果,其他逻辑不变,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | // 回调请求函数
void cb_callback(const callback_t &cb, base_message::BaseMessage::ptr &msg)
{
auto resp_rpc = std::dynamic_pointer_cast<response_message::RpcResponse>(msg);
if (!resp_rpc)
{
LOG(Level::Warning, "异步回调内部对象转换失败");
return;
}
if (resp_rpc->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "结果异常,原因:{}", errReason(resp_rpc->getRCode()));
return;
}
// 调用回调函数处理结果
cb(resp_rpc->getResult());
}
|
接着,完善回调请求发送函数,基本思路就是构建请求,通过请求管理模块的回调请求发送函数将上面的回调函数作为参数传递给该函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | // 回调方式调用函数
bool call(const base_connection::BaseConnection::ptr &con, const std::string &method_name, const Json::Value ¶ms, const callback_t &cb)
{
// 1. 创建请求
auto rpc_req = message_factory::MessageFactory::messageCreateFactory<request_message::RpcRequest>();
rpc_req->setId(uuid_generator::UuidGenerator::generate_uuid());
rpc_req->setMType(public_data::MType::Req_rpc);
rpc_req->setMethod(method_name);
rpc_req->setParams(params);
// 设置回调函数
requestor_rpc_framework::Requestor::callback_t req_cb = std::bind(&RpcCaller::cb_callback, this, cb, std::placeholders::_1);
bool ret = requestor_->sendRequest(con, std::dynamic_pointer_cast<base_message::BaseMessage>(rpc_req), req_cb);
if (!ret)
{
LOG(Level::Warning, "回调处理请求失败");
return false;
}
return true;
}
|
至此,客户端中关于RPC的基本功能完成
RPC基本功能测试
为了确保RPC功能正常,接下来进入RPC功能测试,见文档功能测试
服务注册模块
客户端功能回顾
在服务端设计服务注册模块是为了注册中心的基本功能,但是注册中心终究是一个服务器,不论是服务提供者还是服务发现者,都不能是服务端,否则无法向注册中心发起连接,所以本次项目中,服务提供者既是一个服务器,也是一个客户端,而服务发现者除了本身自己是客户端外,还需要包含一个服务发现的客户端,二者分别实现下面的功能:
- 服务注册客户端:服务提供者客户端,用于向注册中心发起连接并发送注册请求
- 服务发现客户端:服务发现者客户端,用于向注册中心发起连接并发送服务发现请求,并处理服务发现响应
服务提供者类
根据前面的功能介绍,要实现一个服务注册客户端,就必须要存在发送注册请求的接口,本次将接口抽离到一个类中简化后面对客户端进行的封装。下面实现服务提供者类:
为了可以发送注册请求,就必须要用到请求管理模块中的请求发送函数,所以在服务提供者类中需要包含请求管理模块类的对象,在构造时通过参数对其进行初始化。另外,还需要提供一个发送注册请求的接口,因为对于注册中心来说,其需要知道哪一个服务提供者注册了哪一个服务,所以注册时需要传递服务名称和服务提供者的主机信息,为了发送注册请求,还要有一个连接对象。根据这个思路,整个类的结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | class Provider
{
public:
using ptr = std::shared_ptr<Provider>;
Provider(const rpc_client::requestor_rpc_framework::Requestor::ptr &requestor)
: requestor_(requestor)
{
}
// 服务注册接口——用于服务提供方
// 主要行为就是通过Requestor模块向服务端发起服务注册请求
bool registerService(const base_connection::BaseConnection::ptr &con, const std::string &method, const public_data::host_addr_t &host)
{
}
private:
rpc_client::requestor_rpc_framework::Requestor::ptr requestor_;
};
|
接着,实现服务注册接口,在这个接口中主要行为就是发送服务注册请求,所以首先需要构建请求对象,并填充响应的字段,根据前面对服务注册模块的介绍,客户端请求需要包含服务类型、服务名称和当前主机信息,将这些内容设置到正文中即可。接着调用请求管理模块中的请求发送函数发送请求,这里需要注意,使用的是同步请求发送函数,因为只有注册成功了才能进行后续的操作,如果是异步的,那么客户端可能在服务端真正注册完成之前就默认服务端已经注册完毕了,此时如果客户端请求就会可能出现注册中心分发请求错误。最后因为此时使用的是同步请求发送函数,所以在当前函数调用完请求发送函数后如果返回值为true
就可以直接检查返回状态码是否正常,如果异常向上层返回false
,否则返回true
。基于这个思路,实现服务注册接口如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | bool registerService(const base_connection::BaseConnection::ptr &con, const std::string &method, const public_data::host_addr_t &host)
{
// 创建服务注册请求并填充字段
auto service_req = message_factory::MessageFactory::messageCreateFactory<request_message::ServiceRequest>();
service_req->setId(uuid_generator::UuidGenerator::generate_uuid());
service_req->setMethod(method);
service_req->setHost(host);
service_req->setServiceOptype(public_data::ServiceOptype::Service_register);
service_req->setMType(public_data::MType::Req_service);
// 发送服务注册请求
base_message::BaseMessage::ptr base_resp;
bool ret = requestor_->sendRequest(con, service_req, base_resp);
if (!ret)
{
LOG(Level::Warning, "服务注册请求发送失败");
return false;
}
// 转换为具体子类结果判断返回值
response_message::ServiceResponse::ptr service_resp = std::dynamic_pointer_cast<response_message::ServiceResponse>(base_resp);
if (service_resp->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "服务注册请求返回值类型异常:{}", public_data::errReason(service_resp->getRCode()));
return false;
}
return true;
}
|
主机信息管理类
对于服务发现者来说,当服务提供者注册时,注册中心就会保存该服务提供者的信息,当服务发现者进行服务发现时,在本次项目中,注册中心会根据客户端需要的服务返回所有服务提供者的主机信息,这就需要服务发现者对这些主机信息进行保存和管理,对于此处的管理,完全可以直接使用一个数组进行保存,但是此处还需要考虑一个问题,服务发现者收到了很多的主机信息,但是实际上请求时只需要拿到一个主机信息,这就涉及到在多个主机信息中选择的策略,在本次项目中考虑使用RR轮转的策略,为了更好得管理主机信息和对主机信息数组的选择和修改,下面考虑将其封装到一个主机信息管理类中,设计如下:
根据上面的思路,在主机信息管理类中需要有一个保存服务提供者信息的数组,如下:
C++ |
---|
| std::vector<public_data::host_addr_t> hosts_;
|
接下来,为了便于上层获取到一个主机信息,结合RR轮转策略,还需要提供一个表示待选择的主机信息元素的索引,如下:
接着,设计一个构造函数,在该函数中使用外部传递的主机信息数组对当前类中的主机信息数组进行初始化,另外默认索引值为0:
C++ |
---|
| HostManager(const std::vector<public_data::host_addr_t> &hosts = std::vector<public_data::host_addr_t>())
: hosts_(hosts), index_(0)
{
}
|
接着,提供下面的接口:
- 添加一个主机信息
- 删除一个主机信息
- 根据RR轮转策略获取一个主机信息
- 判断当前主机信息数组是否为空
为了操作的线程安全,还需要添加互斥锁成员:
分别实现上述接口,实现如下:
服务发现者类
要实现服务发现客户端,首先设计服务发现者类,简化后面封装客户端的过程。上面已经实现了针对一种服务的主机信息管理类,但是服务发现者不一定只会对一种服务进行服务发现,所以在服务发现者类中需要有保存服务名称和服务主机信息管理类对象映射关系的哈希表,如下:
C++ |
---|
| std::unordered_map<std::string, HostManager::ptr> service_providers_;
|
对应地提供一个保证线程安全的互斥锁:
接着,根据前面对注册中心的设计,服务发现者一共需要处理两种行为:
- 进行服务发现
- 处理服务上线/下线请求
对于「进行服务发现」行为来说,本质就是服务发现客户端需要拿到一个主机信息返回给服务请求客户端,在这一过程中存在两种情况:
- 指定的服务已经存在于
service_providers_
中,此时通过主机信息管理类对象调用选取主机信息的函数将主机信息返回给上层 - 指定的服务不存在于
service_providers_
中,此时就需要向注册中心发起服务发现请求,根据注册中心返回的响应构建主机信息管理类对象建立服务名称和服务主机信息管理类对象的映射关系,再调用主机信息管理类对象的选取主机信息函数获取到具体一个主机信息将其返回给上层
根据这两个情况以及对应的处理思路可以看到还需要有一个请求管理模块类的对象用于进行消息发送:
C++ |
---|
| requestor_rpc_framework::Requestor::ptr requestor_;
|
对应的,提供一个构造函数对其进行初始化:
C++ |
---|
| Discoverer(const requestor_rpc_framework::Requestor::ptr &requestor)
: requestor_(requestor)
{
}
|
完善进行服务发现的函数,需要注意的是,此处调用的请求消息发送函数依旧是同步发送,原因类似于上面的服务提供者部分提到的说法:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 | bool discoverHost(const base_connection::BaseConnection::ptr &con, const std::string &method, public_data::host_addr_t &host)
{
{
std::unique_lock<std::mutex> lock(manage_mtx_);
// 判断是否存在指定的方法,如果存在,通过选择策略选择主机通过输出型参数返回给上层
auto pos = service_providers_.find(method);
if (pos != service_providers_.end())
{
// 判断主机信息管理结构是否为空
// 如果不为空,说明可以选择一个主机信息进行返回
auto method_host = pos->second;
if (!method_host->emptyHosts())
{
host = method_host->choostHost();
return true;
}
}
}
// 如果不存在指定的方法,那么肯定不存在对应的MethodHost结构
// 此时就需要向服务端发起服务发现的请求
auto service_req = message_factory::MessageFactory::messageCreateFactory<request_message::ServiceRequest>();
service_req->setId(uuid_generator::UuidGenerator::generate_uuid());
service_req->setMethod(method);
service_req->setMType(public_data::MType::Req_service);
service_req->setServiceOptype(public_data::ServiceOptype::Service_discover);
base_message::BaseMessage::ptr msg_resp;
requestor_->sendRequest(con, service_req, msg_resp);
auto service_resp = std::dynamic_pointer_cast<response_message::ServiceResponse>(msg_resp);
if(!service_resp)
{
LOG(Level::Warning, "向下转型失败");
return false;
}
if(service_resp->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "服务:{}发现错误:{}", method, public_data::errReason(service_resp->getRCode()));
return false;
}
std::unique_lock<std::mutex> lock(manage_mtx_);
// 此时说明一定存在服务了
// 构建MethodHost对象
auto methodHost = std::make_shared<HostManager>(service_resp->getHosts());
// 获取一个host返回
host = methodHost->choostHost();
// 插入到映射表
service_providers_[method] = methodHost;
return true;
}
|
对于「处理服务上线/下线请求」行为来说,需要对当前消息类型进行区分处理:
- 服务上线请求:当一个服务提供者上线时,注册中心会通过服务上线请求将主机信息发送给对应的服务发现者,对应的服务发现者只需要根据服务名称找到对应的主机信息管理类对象,然后调用主机信息管理类对象的添加主机信息的函数将主机信息添加到对应的主机信息管理类对象中即可。但是这一步有可能存在服务名称不存在的情况,如果不存在就需要创建一个主机信息管理类对象,然后再将主机信息添加到对应的主机信息管理类对象中,最后将服务名称和主机信息管理类对象的映射关系添加到
service_providers_
中 - 服务下线请求:当一个服务提供者下线时,注册中心会通过服务下线请求将主机信息发送给对应的服务发现这,对应的服务发现者根据服务名称找对应的主机信息管理类对象,然后调用主机信息管理类对象的删除主机信息的函数将主机信息从对应的主机信息管理类对象中删除即可,如果不存在指定的服务名,则直接返回,不做出任何处理
根据上面的思路,完善处理服务上线/下线请求的函数,实现如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 | // 针对服务端发送的服务上线/下线请求处理
void handleOnlineOfflineServiceRequest(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
std::unique_lock<std::mutex> lock(manage_mtx_);
// 只针对服务上线和下线的请求进行处理,不对其他服务类型的请求处理
// 获取请求类型
auto type = msg->getServiceOptye();
auto method = msg->getMethod();
if(type == public_data::ServiceOptype::Service_online)
{
// 1. 服务上线请求处理
// 处理思路:查找是否存在指定服务的MethodHost对象
// 如果存在直接向其中添加
// 不存在则说明则构造一个该服务对应的MethodHost
// 再添加到哈希表中
LOG(Level::Info, "服务提供者:{}:{}上线了一个{}服务", msg->getHost().first, msg->getHost().second, msg->getMethod());
auto pos = service_providers_.find(method);
if (pos == service_providers_.end())
{
// 不存在指定服务
auto host = std::make_shared<HostManager>();
host->insertHost(msg->getHost());
service_providers_[method] = host;
}
else
{
// 存在直接添加
auto method_hosts = pos->second;
method_hosts->insertHost(msg->getHost());
}
}
else if(type == public_data::ServiceOptype::Service_offline)
{
LOG(Level::Info, "服务提供者:{}:{}下线了一个{}服务", msg->getHost().first, msg->getHost().second, msg->getMethod());
// 2. 服务下线请求处理
// 将对应服务的主机从管理主机信息的结构中移除
auto pos = service_providers_.find(method);
if(pos == service_providers_.end())
{
LOG(Level::Warning, "不存在指定的服务");
return;
}
auto method_hosts = pos->second;
auto host = msg->getHost();
method_hosts->removeHost(host);
}
}
|
至此,客户端服务注册模块设计完成90%,剩下10%见功能模块设计(主题、客户端封装、服务端封装)