功能模块设计(主题、客户端封装、服务端封装)
约 6518 个字 853 行代码 预计阅读时间 32 分钟
客户端
主题操作模块
对于客户端来说,就是提供主题操作的函数,即针对具体的主题类型向服务端发送对应的请求,但是需要注意的是,对于不同的主题,其可能对应的处理方式不同,所以需要建立主题名称和处理回调函数的映射关系,这个回调函数主要就是针对主题和对应的消息进行处理,所以参数一共有两个:主题名称和主题消息,类型设计如下:
C++ |
---|
| // 收到发布的消息时执行的回调函数类型
using publishCallback = std::function<void(const std::string &topic_name, const std::string &msg)>;
|
接着,提供一张哈希表用于建立主题名称和回调函数的映射关系:
C++ |
---|
| std::unordered_map<std::string, publishCallback> topic_callback_; // 不同的主题对应的处理回调函数映射
|
为了保证修改topic_callback_
的线程安全,需要使用一个互斥锁:
C++ |
---|
| std::mutex manager_map_mtx_;
|
对应地,提供添加、删除和获取回调函数接口:
因为客户端的请求消息大体都是类似的,除了消息类型为消息发布时需要携带消息内容以外,其他的请求消息只是请求类型不同,其他字段值都是一致的,所以可以考虑将消息的构建和发送封装到一个函数中,其他功能函数调用该请求发送函数即可
对于该请求发送函数,首先需要构建一个主题消息对象,分别填充其中的字段,包括主题名称和主题操作类型,接着调用请求管理模块的请求发送函数发送对应的请求,最后检查响应消息的返回码,如果返回码为RCode_fine
则表示请求成功,否则表示请求失败。需要注意的是,此处请求发送函数也必须使用同步的,因为必须要确保主题正常创建、删除、订阅、取消订阅以及消息发布成功,才能继续后续的操作。该函数实现如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | // 通用的请求发送接口
bool baseRequest(const base_connection::BaseConnection::ptr &con, const std::string &topic_name, const public_data::TopicOptype topic_optype, const std::string &content = "")
{
// 1. 构造出主题请求对象,并填充相关字段
auto topic_req = message_factory::MessageFactory::messageCreateFactory<request_message::TopicRequest>();
topic_req->setId(uuid_generator::UuidGenerator::generate_uuid());
topic_req->setMType(public_data::MType::Req_topic);
topic_req->setTopicName(topic_name);
topic_req->setTopicOptype(topic_optype);
// 如果操作类型是主题消息发布,则还需要设置发布的消息
if (topic_optype == public_data::TopicOptype::Topic_publish)
topic_req->setMessage(content);
// 2. 发送请求
base_message::BaseMessage::ptr msg_resp;
bool ret = requestor_->sendRequest(con, topic_req, msg_resp);
if (!ret)
{
LOG(Level::Warning, "主题操作请求发送失败");
return false;
}
// 3. 判断响应结果是否正确
auto topic_resp = std::dynamic_pointer_cast<response_message::TopicResponse>(msg_resp);
if (!topic_resp)
{
LOG(Level::Warning, "向下转型失败");
return false;
}
if (topic_resp->getRCode() != public_data::RCode::RCode_fine)
{
LOG(Level::Warning, "主题{}操作错误:{}", topic_name, public_data::errReason(topic_resp->getRCode()));
return false;
}
return true;
}
|
接着,分别实现功能函数:
接下来考虑主题订阅和取消订阅函数,对于主题订阅来说,当客户端想要订阅一个主题就需要指定一个处理该主题消息的回调函数,所以该函数除了需要设置主题名称以外,还需要外部传递一个回调函数,在函数内部首先需要将对应的消息处理函数插入到topic_callback_
中,确保后续刚订阅主题就有消息时可以立即处理,接着调用请求发送函数发送主题订阅请求,检查该函数返回值决定当前函数的返回值,需要注意,如果请求发送失败,要移除对应的回调函数确保下一次可以正常使用其他回调函数。该函数实现如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | // 订阅主题
bool subscribeTopic(const base_connection::BaseConnection::ptr &con, const std::string &topic_name, const publishCallback &cb)
{
// 先插入回调函数,防止后续刚订阅主题就有消息时可以立即处理
insertCallback(topic_name, cb);
bool ret = baseRequest(con, topic_name, public_data::TopicOptype::Topic_subscribe);
if(!ret)
{
// 删除回调函数并返回
removeCallback(topic_name);
return false;
}
return true;
}
|
对应的,取消订阅函数只需要移除对应的回调函数再发送取消订阅请求即可:
C++ |
---|
| bool cancelSubscribeTopic(const base_connection::BaseConnection::ptr &con, const std::string &topic_name)
{
// 先删除回调函数,确保取消之后都不处理消息
removeCallback(topic_name);
return baseRequest(con, topic_name, public_data::TopicOptype::Topic_cancel);
}
|
接着是消息发布函数,该函数只需要调用请求发送函数发送消息发布请求即可,但是需要注意,参数需要包含待发布的消息内容:
C++ |
---|
| // 主题发布
bool publishTopicMessage(const base_connection::BaseConnection::ptr &con, const std::string &topic_name, const std::string &content)
{
return baseRequest(con, topic_name, public_data::TopicOptype::Topic_publish, content);
}
|
除了请求发送,客户端还需要做到接收服务端发送的消息(主题消息发布请求转发的内容),具体逻辑就是根据消息基类中的主题找到对应的回调函数处理该消息即可:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | // 处理收到发布的消息
void handleTopicMessagePublishRequest(const base_connection::BaseConnection::ptr &con, const request_message::TopicRequest::ptr &msg)
{
// 判断操作类型是否是主题消息发布
public_data::TopicOptype topic_optype = msg->getTopicOptype();
if(topic_optype != public_data::TopicOptype::Topic_publish)
{
LOG(Level::Warning, "非主题消息发布操作类型,处理结束");
return;
}
// 获取到主题名称
std::string topic_name = msg->getTopicName();
// 获取到主题消息
std::string topic_msg = msg->getMessage();
// 根据主题名称获取到对应的回调函数
const publishCallback publish_cb = findPublishCallback(topic_name);
// 调用回调函数进行处理
if(!publish_cb)
{
LOG(Level::Warning, "主题{}对应的回调函数不存在", topic_name);
return;
}
publish_cb(topic_name, topic_msg);
}
|
至此,客户端主题操作模块设计完成
客户端封装
服务注册客户端
服务注册客户端将来需要集成到服务提供者服务端中,其主要功能就是让服务提供者可以将自己的服务注册到注册中心服务端,一旦发起注册请求,对应地就需要有处理响应函数,但是对于客户端来说,并没有具体针对某一个功能设计的响应处理函数,而是将响应交给Requestor
模块进行统一处理,所以需要在服务注册客户端类中添加Requestor
模块,并在构造函数中对其进行初始化,对应地,为了可以让Dispatcher
模块可以将注册响应正确地交给对应的处理函数,还需要根据消息类型注册对应的处理函数
完成上面的内容后,接下来就是创建客户端对象并绑定对应的回调函数,使用客户端基类指针指向子类MuduoClient
,客户端需要绑定对应的消息处理函数,对应的就是Dispatcher
的执行函数,内部执行交给Dispatcher
模块的响应处理函数,最后启动客户端与服务端进行连接
根据这个思路设计服务注册客户端类的基本结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | // 用于服务注册的客户端
class RegisterClient
{
public:
using ptr = std::shared_ptr<RegisterClient>;
RegisterClient(const std::string &ip, const uint16_t port)
: requestor_(std::make_shared<requestor_rpc_framework::Requestor>()), provider_(std::make_shared<rpc_client::rpc_registry::Provider>(requestor_)), dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>())
{
dispatcher_->registerService<base_message::BaseMessage>(public_data::MType::Resp_service, std::bind(&rpc_client::requestor_rpc_framework::Requestor::handleResponse, requestor_.get(), std::placeholders::_1, std::placeholders::_2));
client_ = client_factory::ClientFactory::clientCreateFactory(ip, port);
client_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 连接服务端
client_->connect();
}
private:
requestor_rpc_framework::Requestor::ptr requestor_;
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_client::BaseClient::ptr client_;
};
|
接着,因为当服务提供者需要注册时,需要告诉注册中心需要注册的服务和主机地址,所以需要提供一个注册函数调用服务提供者类的服务注册请求发送函数进行服务注册。基于这个思路,首先需要在服务注册客户端类中添加一个服务提供者对象,并在构造函数中对其进行初始化。需要注意的是,Provider
对象的声明一定要在Requestor
模块之后,因为Provider
模块依赖Requestor
模块:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | class RegisterClient
{
public:
using ptr = std::shared_ptr<RegisterClient>;
RegisterClient(const std::string &ip, const uint16_t port)
: // ...
, provider_(std::make_shared<rpc_client::rpc_registry::Provider>(requestor_))
, // ...
{
// ...
}
private:
// requestor需要在provider之前声明,因为provider依赖requestor
requestor_rpc_framework::Requestor::ptr requestor_;
// ...
};
|
接着,提供服务注册函数:
C++ |
---|
| bool toRegisterService(const std::string &method, const public_data::host_addr_t &host)
{
return provider_->registerService(client_->connection(), method, host);
}
|
至此,服务注册客户端设计完成
服务发现客户端
对于服务发现客户端来说,大致逻辑与服务注册客户端类似,只是服务发现客户端除了需要处理服务发现请求对应的响应外,还需要处理服务上线和下线的请求,所以给Dispatcher
模块除了注册Requestor
模块的响应处理函数以外,还需要注册Discoverer
类的服务上线和下线的处理函数,接下来就是创建客户端绑定Dispatcher
模块的执行函数,最后启动客户端与服务端进行连接,基本结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | // 用于服务发现的客户端
class DiscovererClient
{
public:
using ptr = std::shared_ptr<DiscovererClient>;
DiscovererClient(const std::string &ip, const uint16_t port, rpc_registry::Discoverer::offlineCallback_t cb)
: requestor_(std::make_shared<requestor_rpc_framework::Requestor>()), discoverer_(std::make_shared<rpc_client::rpc_registry::Discoverer>(requestor_)), dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>())
{
// 处理服务发现请求对应的响应
dispatcher_->registerService<base_message::BaseMessage>(public_data::MType::Resp_service, std::bind(&rpc_client::requestor_rpc_framework::Requestor::handleResponse, requestor_.get(), std::placeholders::_1, std::placeholders::_2));
// 处理服务上线/下线请求对应的响应
dispatcher_->registerService<request_message::ServiceRequest>(public_data::MType::Req_service, std::bind(&rpc_client::rpc_registry::Discoverer::handleOnlineOfflineServiceRequest, discoverer_.get(), std::placeholders::_1, std::placeholders::_2));
client_ = client_factory::ClientFactory::clientCreateFactory(ip, port);
client_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 连接服务端
client_->connect();
}
private:
// requestor在discoverer之前
requestor_rpc_framework::Requestor::ptr requestor_;
rpc_client::rpc_registry::Discoverer::ptr discoverer_;
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_client::BaseClient::ptr client_;
};
|
接着,服务发现客户端需要有对应的服务发现函数,该函数内部就是通过Discoverer
模块对象调用对应的服务发现函数获取到具体的一个服务提供者信息存储到输出型参数中:
C++ |
---|
| bool toDiscoverHost(const std::string &method, public_data::host_addr_t &host)
{
return discoverer_->discoverHost(client_->connection(), method, host);
}
|
至此,服务发现客户端设计完成
RPC功能客户端
对于RPC功能客户端来说,其主要功能就是根据指定服务和对应的参数发起RPC请求获取到服务端的处理结果。但是一个客户端可能不止请求一种服务,针对这个问题,可以考虑使用长连接的思路,此时设计一张哈希表保存服务提供者的主机信息以及当前成功建立连接的客户端对象:
C++ |
---|
| std::unordered_map<public_data::host_addr_t, base_client::BaseClient::ptr> clients_; // 可以正常发起RPC请求的客户端
|
但是,上面的哈希表对象声明是存在语法问题的,因为host_addr_t
是一个自定义类型,哈希表对于一个键需要有对应的哈希函数计算哈希值,但是对于自定义类型来说不存在对应的哈希函数,所以需要针对该类型提供一个用于计算哈希值的函数,如下:
C++ |
---|
| // 自定义类型host_addr_t需要实现哈希函数,否则编译报错,因为unordered_map无法确定如何计算哈希值
struct hostAddrHash
{
// 用于unordered_map的仿函数hash对象需要为实现const版本的operator()重载函数
size_t operator()(const public_data::host_addr_t &h) const
{
// 将主机地址信息转换为字符串类型便于使用std::hash计算哈希值
std::string host = h.first + std::to_string(h.second);
return std::hash<std::string>{}(host);
}
};
|
接着,在哈希表对象创建时传递该哈希函数:
C++ |
---|
| std::unordered_map<public_data::host_addr_t, base_client::BaseClient::ptr, hostAddrHash> clients_; // 已经发现的所有可以提供指定RPC服务的服务端
|
在前面已经设计了一个RpcCaller
类,该类是基于Requestor
模块进行的RPC请求发送的封装,所以在RPC功能客户端需要包含这两个成员,除了需要发送RPC请求外,当前客户端也需要针对RPC响应进行处理,这就需要将对应的响应处理函数注册到Dispatcher
模块中。这个过程中还涉及到一个问题:RPC功能客户端怎么知道哪些主机可以为自己提供服务?这就需要用到服务发现客户端,所以在RPC功能客户端中需要包含一个服务发现客户端对象指针,并提供一个是否启用服务发现的标记,一旦该标记为真,那么就为该指针创建对象,否则就不创建。需要注意,进行服务发现和服务请求是两个相对的逻辑,也就是说开启了服务发现就不会进行服务请求,因为如果开了服务发现,说明此时没有可以请求的服务提供者,直接进行服务请求就会出错
根据上面的思路,RPC功能客户端的基本结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | // 用于RPC调用的客户端
class RpcClient
{
public:
using ptr = std::shared_ptr<RpcClient>;
private:
// 自定义类型host_addr_t需要实现哈希函数,否则编译报错,因为unordered_map无法确定如何计算哈希值
struct hostAddrHash
{
// 用于unordered_map的仿函数hash对象需要为实现const版本的operator()重载函数
size_t operator()(const public_data::host_addr_t &h) const
{
// 将主机地址信息转换为字符串类型便于使用std::hash计算哈希值
std::string host = h.first + std::to_string(h.second);
return std::hash<std::string>{}(host);
}
};
bool isToDiscover_; // 是否需要进行服务发现
DiscovererClient::ptr discoverer_client_; // 进行服务发现时启用服务发现客户端
requestor_rpc_framework::Requestor::ptr requestor_;
rpc_client::rpc_caller::RpcCaller::ptr rpc_caller_;
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_client::BaseClient::ptr client_;
std::unordered_map<public_data::host_addr_t, base_client::BaseClient::ptr, hostAddrHash> clients_; // 已经发现的所有可以提供指定RPC服务的服务端
};
|
接下来,需要针对clients_
提供对应的添加、删除、查找函数,这三个函数思路较为简单,参考代码如下:
但是,除了上面提到的操作clients_
的函数外,还需要提供两个函数,一个是通过具体的服务名称获取一个已经成功建立连接的请求客户端的函数,另外一个是创建请求客户端的函数
首先是创建客户端的函数,之所以需要这个函数是因为在上面的思路中提到「进行服务发现和服务请求是两个相对的逻辑」,如果开启了服务发现,此时client_
成员就是空指针,那么接下来在进行服务请求时就会出现错误,所以在这个函数中主要逻辑就是创建客户端并绑定Dispatcher
类的消息处理函数并向RPC服务端发起连接,再将该客户端与地址信息建立映射关系,最后返回该客户端。设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13 | // 创建新客户端
base_client::BaseClient::ptr createClient(const public_data::host_addr_t &host)
{
base_client::BaseClient::ptr client = client_factory::ClientFactory::clientCreateFactory(host.first, host.second);
client->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 连接服务端
client->connect();
insertClient(host, client);
return client;
}
|
接着是获取客户端的函数,这个函数的作用是根据具体的服务找到已经建立连接的客户端返回给上层,该函数的基本逻辑就是如果判断启用了服务发现,则说明此时肯定没有已经建立连接的客户端,否则说明client_
对象被正常创建,直接使用该客户端对象即可。下面考虑开启了服务发现的逻辑:既然开启了服务发现,首先就是要通过服务发现客户端对象进行服务发现,获取到具体的一个服务提供者地址,接着根据这个地址通过findClient
查找到是否存在对应的客户端,如果存在就直接使用,否则调用createClient
创建一个新的客户端,代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | // 找到合适的客户端调用接口
base_client::BaseClient::ptr getClient(const std::string &method)
{
base_client::BaseClient::ptr client;
// 判断是否需要发现客户端
// 如果是发现客户端,则调用服务发现客户端的服务发现功能获取到客户端信息
// 否则使用固定的客户端返回信息
if (isToDiscover_)
{
public_data::host_addr_t host;
bool ret = discoverer_client_->toDiscoverHost(method, host);
if (!ret)
{
LOG(Level::Warning, "Rpc客户端服务发现失败");
return base_client::BaseClient::ptr();
}
// 判断是否已经存在对应的服务提供者
client = findClient(host);
if (!client)
client = createClient(host); // 不存在就创建
}
else
client = client_;
return client;
}
|
接着,设计发起RPC请求的三个类型的函数,分别是同步、异步和回调方式,这三个函数的实现思路都非常一致:通过getClient
获取到一个已经建立连接的客户端对象,然后调用RpcCaller
类对象的call
函数发起RPC请求,代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13 | bool call(const std::string &method_name, const Json::Value ¶ms, Json::Value &result)
{
// 获取到指定的客户端调用
base_client::BaseClient::ptr client = getClient(method_name);
if (!client)
{
LOG(Level::Warning, "获取客户端错误");
return false;
}
// 调用Rpc调用接口执行任务
return rpc_caller_->call(client->connection(), method_name, params, result);
}
|
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | // 异步函数
bool call(const std::string &method_name, const Json::Value ¶ms, rpc_client::rpc_caller::RpcCaller::aysnc_response &result)
{
// 获取到指定的客户端调用
base_client::BaseClient::ptr client = getClient(method_name);
if (!client)
{
LOG(Level::Warning, "获取客户端错误");
return false;
}
// 调用Rpc调用接口执行任务
return rpc_caller_->call(client->connection(), method_name, params, result);
}
|
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | // 回调函数
bool call(const std::string &method_name, const Json::Value ¶ms, const rpc_client::rpc_caller::RpcCaller::callback_t &cb)
{
// 获取到指定的客户端调用
base_client::BaseClient::ptr client = getClient(method_name);
if (!client)
{
LOG(Level::Warning, "获取客户端错误");
return false;
}
// 调用Rpc调用接口执行任务
return rpc_caller_->call(client->connection(), method_name, params, cb);
}
|
从上面的代码中可以看到,除了removeClient
以外,其他接口都在当前类中进行了调用,那removeClient
应该在什么时候调用呢?移除客户端的本质就是因为这个客户端所建立的连接已经失效了,既然是失效连接,对应的就是在处理连接断开的回调函数中进行调用,而在当前项目中,客户端失效的原因只有两种:
- 客户端连接无效被服务端断开连接
- 服务端自主断开连接
对于这两种情况,客户端都需要进行处理,第一种属于异常情况,资源会被底层的Muduo库释放,而第二种,在RPC功能客户端中,就是服务提供者下线,能知道服务端断开连接的就是Discoverer
模块,所以需要在Discoverer
模块中添加一个回调函数对象,用于执行当前的removeClient
函数,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | class Discoverer
{
public:
using ptr = std::shared_ptr<Discoverer>;
using offlineCallback_t = std::function<void(const public_data::host_addr_t&)>;
Discoverer(const requestor_rpc_framework::Requestor::ptr &requestor, const offlineCallback_t &cb)
: requestor_(requestor), offline_cb_(cb)
{
}
// 针对服务端发送的服务上线/下线请求处理
void handleOnlineOfflineServiceRequest(const base_connection::BaseConnection::ptr &con, const request_message::ServiceRequest::ptr &msg)
{
std::unique_lock<std::mutex> lock(manage_mtx_);
// ...
else if(type == public_data::ServiceOptype::Service_offline)
{
// ...
// 在服务提供者下线时需要将该服务提供者从RpcClient的连接池中移除
if(offline_cb_)
offline_cb_(msg->getHost());
}
}
private:
// ...
// 客户端离线时的处理回调
offlineCallback_t offline_cb_;
};
|
接着,需要在DiscovererClient
类中添加一个回调函数参数用于初始化Discoverer
,设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | class DiscovererClient
{
public:
using ptr = std::shared_ptr<DiscovererClient>;
DiscovererClient(const std::string &ip, const uint16_t port, rpc_registry::Discoverer::offlineCallback_t cb)
: // ...
, discoverer_(std::make_shared<rpc_client::rpc_registry::Discoverer>(requestor_, cb)) // 添加回调
, // ...
{
// ...
}
// ...
};
|
最后,在RpcClient
类中创建DiscovererClient
对象时传递removeClient
函数,结合前面的思路设计RpcClient
构造函数如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | RpcClient(bool isToDiscover, const std::string &ip, const uint16_t port)
: isToDiscover_(isToDiscover), requestor_(std::make_shared<requestor_rpc_framework::Requestor>()), dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>()),
rpc_caller_(std::make_shared<rpc_client::rpc_caller::RpcCaller>(requestor_))
{
// 处理RPC调用的回调
dispatcher_->registerService<base_message::BaseMessage>(public_data::MType::Resp_rpc, std::bind(&rpc_client::requestor_rpc_framework::Requestor::handleResponse, requestor_.get(), std::placeholders::_1, std::placeholders::_2));
if (isToDiscover_)
{
// 如果为真,说明此时需要进行服务提供者发现
// 此时的IP地址和端口表示的就是注册中心的IP地址和端口
// 同时绑定移除接口
discoverer_client_ = std::make_shared<DiscovererClient>(ip, port, std::bind(&RpcClient::removeClient, this, std::placeholders::_1));
}
else
{
// 此时就是进行RPC调用
client_ = client_factory::ClientFactory::clientCreateFactory(ip, port);
client_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 连接服务端
client_->connect();
}
}
|
至此,RPC功能客户端设计完成
主题功能客户端
主题功能客户端相对比较容易,因为功能已经由TopicManager
模块实现了,在主题功能客户端中只需要调用即可,但是需要注意的是,主题功能客户端除了处理每一个请求对应的响应以外,还需要对主题消息发布的请求进行处理,所以在给Dispatcher
模块注册的消息处理函数时需要注册两个处理函数,整个类的基本结构如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | class TopicClient
{
public:
using ptr = std::shared_ptr<TopicClient>;
TopicClient(const std::string &ip, const uint16_t port)
: requestor_(std::make_shared<requestor_rpc_framework::Requestor>()), dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>()), topic_manager_(std::make_shared<rpc_topic::TopicManager>(requestor_))
{
// 处理主题响应的回调
dispatcher_->registerService<base_message::BaseMessage>(public_data::MType::Resp_topic, std::bind(&rpc_client::requestor_rpc_framework::Requestor::handleResponse, requestor_.get(), std::placeholders::_1, std::placeholders::_2));
dispatcher_->registerService<request_message::TopicRequest>(public_data::MType::Req_topic, std::bind(&rpc_topic::TopicManager::handleTopicMessagePublishRequest, topic_manager_.get(), std::placeholders::_1, std::placeholders::_2));
// 此时就是进行RPC调用
client_ = client_factory::ClientFactory::clientCreateFactory(ip, port);
client_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 连接服务端
client_->connect();
}
private:
requestor_rpc_framework::Requestor::ptr requestor_;
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_client::BaseClient::ptr client_;
rpc_topic::TopicManager::ptr topic_manager_;
};
|
接着,提供基于TopicManager
模块提供的主题操作函数实现用于上层调用的函数,设计如下:
至此,主题功能客户端设计完成
服务端
主题操作模块
主题操作模块功能回顾及描述类基本结构搭建
对于服务端来说,主题操作实际上就是针对客户端的操作类型进行处理,包括主题创建、主题删除、主题订阅、主题取消订阅和主题消息发布,下面对每一个行为进行具体解释:
- 主题创建:对于主题创建来说,本质就是一个连接到主题服务端的主题客户端需要向服务器发送一个主题创建的请求,对应的服务端需要将该主题保存并管理起来
- 主题删除:对于主题删除来说,本质就是一个连接到主题服务端的主题客户端需要向服务器发送一个主题删除的请求,对应的服务端需要将该主题从管理列表中删除
- 主题订阅:对于主题订阅来说,本质就是一个连接到主题服务端的主题客户端需要向服务器发送一个主题订阅的请求,对应的服务端需要将该主题的订阅者保存并管理起来
- 主题取消订阅:对于主题取消订阅来说,本质就是一个连接到主题服务端的主题客户端需要向服务器发送一个主题取消订阅的请求,对应的服务端需要将该主题的订阅者从管理列表中删除
- 主题消息发布:对于主题消息发布来说,本质就是一个连接到主题服务端的主题客户端需要向服务器发送一个主题消息发布的请求,对应的服务端需要将该主题消息发送给所有订阅该主题的客户端
根据上面的功能描述,首先对于主题创建来说,服务端需要保存当前主题,将来还需要通过该该主题找到所有订阅的客户端进行消息发布,所以需要一个类对主题进行描述,在该类中需要有主题名称、主题订阅者的连接这两个字段,所以基本结构如下:
C++ |
---|
| struct Topic
{
std::string topic_name_; // 主题名称
std::unordered_set<base_connection::BaseConnection::ptr> subscibers_; // 所有订阅者
};
|
但是,对于订阅者来说,服务端还需要知道这个订阅者订阅了哪些主题,便于在指定主题被删除时从所有订阅该主题的客户端中移除该主题,所以还需要一个类对订阅者进行描述,在该类中需要有订阅者的连接、订阅的主题这两个字段,所以基本结构如下:
C++ |
---|
| struct Subscriber
{
using ptr = std::shared_ptr<Subscriber>;
base_connection::BaseConnection::ptr con_; // 客户端连接
std::unordered_set<std::string> topic_names_; // 当前客户端订阅的所有主题
};
|
有了对订阅者的描述,接下来修改主题描述类中管理订阅者的成员如下:
C++ |
---|
| struct Topic
{
std::string topic_name_; // 主题名称
std::unordered_set<Subscriber::ptr> subscibers_; // 所有订阅者
};
|
订阅者描述类
创建一个订阅者描述类对象时需要调用者传递当前订阅者的连接,在构造函数中对其进行初始化:
C++ |
---|
| Subscriber(const base_connection::BaseConnection::ptr &con)
: con_(con)
{
}
|
接着,该类中需要提供针对订阅的主题集合进行管理的操作函数,包括添加和删除,为了保证线程安全,需要提供一个互斥锁:
C++ |
---|
| std::mutex manage_set_mtx_; // 保证管理的线程安全
|
接着设计添加和删除接口,思路比较简单,参考下面代码:
主题描述类
创建一个主题描述类对象需要有一个参数,即主题名称,在构造函数中对其进行初始化:
C++ |
---|
| Topic(const std::string &topic_name)
: topic_name_(topic_name)
{
}
|
接着,在该类中提供添加和删除订阅者的函数,先提供一个互斥锁成员保证线程安全:
C++ |
---|
| std::mutex manage_set_mtx_; // 保证管理的线程安全
|
接着设计添加和删除接口,参考下面代码:
除了上面的两个函数以外,还需要有一个函数,该函数的作用是进行消息发布,该函数的参数是消息的内容,基本思路就是遍历所有订阅者,然后将消息发送给所有订阅者,参考下面代码:
C++ |
---|
| // 主题信息的发布
void publicMessage(const base_message::BaseMessage::ptr &msg)
{
std::unique_lock<std::mutex> lock(manage_set_mtx_);
// 遍历连接集合发送消息
for (auto &subscriber : subscibers_)
if (subscriber)
subscriber->con_->send(msg);
}
|
主题操作模块功能实现
根据前面的功能描述,在主题操作模块中主要提供五种功能,而在这五种功能中,都需要涉及对主题的管理以及对订阅者的管理,所以首先需要两张哈希表,分别用于管理「订阅者的连接与订阅者描述类对象指针的映射关系」以及「每一个主题名称与主题描述类对象指针的映射关系」,如下:
C++ |
---|
| std::unordered_map<std::string, Topic::ptr> topics_; // 主题和订阅者的管理
std::unordered_map<base_connection::BaseConnection::ptr, Subscriber::ptr> con_subscriber_; // 订阅者和连接映射
|
对应的需要提供「添加指定主题」、「删除指定主题」、「进行主题订阅」、「取消主题订阅」以及「主题消息发布」这五个函数,下面分别对这五个函数的设计思路进行介绍:
- 添加指定主题:查找消息基类中主题名称对应的主题描述类对象是否存在,如果存在就直接返回,否则就构建该主题对应的主题描述类对象,并将其添加到主题管理表中即可
- 删除指定主题:获取到消息基类中指定的服务名称,如果服务名称存在就执行删除,否则直接返回。删除逻辑为:根据指定主题找到对应的主题描述类对象,接着获取到其中的订阅者管理集合,接着删除该主题以及遍历订阅者管理集合从每一个订阅者管理的主题集合中删除该主题
- 进行主题订阅:主题订阅本质就是通知服务端有一个订阅客户端需要被管理,为了保证订阅可以成功,需要先确保具体的主题是否存在,如果主题不存在,则直接返回订阅失败,否则继续后面的逻辑,即创建一个订阅者描述类对象,接着将该订阅者添加到
con_subscriber_
,再将该订阅者添加到Topic
的subscibers_
中,并将订阅者订阅的主题添加Subscriber
的topic_names_
中完成整体的逻辑 - 取消主题订阅:取消订阅的逻辑与进行主题订阅相反,此处不再赘述
- 主题消息发布:根据指定的主题找到对应的主题描述类对象,再将消息基类的消息通过主题描述类对象的主题发布函数进行消息发布
根据上面的思路,分别设计这五个函数,参考下面代码:
有了上面的操作接口之后,接下来对于服务端来说就需要有对应处理客户端请求的函数,在该函数中实际上只需要根据客户端发送的请求类型来调用对应的处理函数即可,而对于上面五个操作来说,服务端需要返回给客户端的响应格式都是一致的,所以可以考虑将响应构建和发送单独放在一个函数中,对应地,还要有发送错误响应的函数,这两个函数的设计思路相对简单,只需要构建响应消息对象再填充主题响应需要的字段即可:
最后完善对客户端请求的处理函数:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 | // 处理主题请求,注册到Dispatcher模块
void handleTopicRequest(const base_connection::BaseConnection::ptr &con, const request_message::TopicRequest::ptr &msg)
{
// 获取到主题操作类型
public_data::TopicOptype topic_optype = msg->getTopicOptype();
bool ret = true;
switch (topic_optype)
{
case public_data::TopicOptype::Topic_create:
createTopic(msg); // 1. 主题创建
break;
case public_data::TopicOptype::Topic_remove:
removeTopic(msg); // 2. 主题删除
break;
case public_data::TopicOptype::Topic_subscribe:
ret = subscribeTopic(con, msg); // 3. 主题订阅
break;
case public_data::TopicOptype::Topic_cancel:
cancelSubscribeTopic(con, msg); // 4. 主题取消订阅
break;
case public_data::TopicOptype::Topic_publish:
ret = publishTopicMessage(msg); // 5. 主题发布
break;
default:
sendErrorResponse(con, msg, public_data::RCode::RCode_invalid_opType);
break;
}
if (!ret)
{
sendErrorResponse(con, msg, public_data::RCode::RCode_not_found_topic);
return;
}
sendTopicResponse(con, msg);
}
|
除了处理请求的函数以外,服务端还需要对断开连接的订阅客户端进行资源处理,这个函数中的主要逻辑就是根据下线的连接判断是否是订阅者(即判断订阅者是否存在于con_subscriber_
中),如果是订阅者就根据订阅者关联的所有主题找到对应的Topic
,从con_subscriber_
中移除订阅者,再从这些Topic
中的subscibers_
中移除下线的订阅者。代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | // 订阅者下线的处理
void handleConnectionShutdown(const base_connection::BaseConnection::ptr &con)
{
// 根据下线的连接判断是否是订阅者
// 如果是订阅者就根据订阅者关联的主题找到对应的Topic
// 从Topic中的订阅者集合中移除下线的订阅者
// 从连接和订阅者映射集合中移除订阅者
Subscriber::ptr subscriber;
std::unordered_set<Topic::ptr> topics;
{
std::unique_lock<std::mutex> lock(manage_map_mtx_);
auto it_sub = con_subscriber_.find(con);
subscriber = it_sub->second;
for (auto &topic_name : it_sub->second->topic_names_)
{
auto it_topic = topics_.find(topic_name);
topics.insert(it_topic->second);
}
con_subscriber_.erase(con);
}
for (auto &topic : topics)
topic->removeSubscriber(subscriber);
}
|
至此,服务端主题操作模块设计完成
服务端封装
注册中心服务端
在服务端部分,注册中心服务端,主要就是对前面实现的注册功能进行封装,即包含ProviderDiscovererManager
的类,在该类中需要包含ProviderDiscovererManager
类对象,如下:
C++ |
---|
| class RegistryServer
{
private:
rpc_registry::ProviderDiscovererManager::ptr pd_manager_;
};
|
其次,因为本次项目中,消息分发都是靠Dispatcher
模块实现的,所以还需要添加Dispatcher
模块的对象,如下:
C++ |
---|
| class RegistryServer
{
private:
// ...
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
};
|
接着,因为是服务端,所以还需要添加服务器类对象,在本次项目中是基于Muduo库实现的一个MuduoServer
,但是为了模块的可维护性,成员使用的是服务器基类指针:
C++ |
---|
| class RegistryServer
{
private:
// ...
base_server::BaseServer::ptr server_;
};
|
接着,提供一个构造函数初始化上面提到的成员,对于server_
来说,需要设置对应的回调函数,可以考虑在构造函数体内进行初始化,而不是在初始化列表进行。在函数体内部,首先向Dispatcher
模块注册针对注册请求的处理函数,再创建MuduoServer
对象,向服务端绑定收到消息时的处理函数,因为消息是由Dispatcher
模块进行回调处理的,所以服务端只需要绑定Dispatcher
模块对应的处理函数即可,最后,注册中心除了需要处理注册/发现请求以外,还需要对离线的服务发现者或者服务提供者进行处理,所以还需要绑定处理连接断开的回调函数,即ProviderDiscovererManager
中的handleProviderConnectionShutdown
,但是直接绑定这个函数会减少一定的可维护性,所以考虑对这个函数的调用进行一层封装。整个设计如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | RegistryServer(uint16_t port)
: pd_manager_(std::make_shared<rpc_registry::ProviderDiscovererManager>()),
dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>())
{
// 向dispatcher模块中绑定回调函数
dispatcher_->registerService<request_message::ServiceRequest>(public_data::MType::Req_service, std::bind(&rpc_registry::ProviderDiscovererManager::handleRegisterDiscoverRequest, pd_manager_.get(), std::placeholders::_1, std::placeholders::_2));
// 创建服务器对象并添加消息回调
server_ = server_factory::ServerFactory::serverCreateFactory(port);
server_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
// 绑定连接断开回调
server_->setCloseCallback(std::bind(&RegistryServer::handleConnectionCallback, this, std::placeholders::_1));
}
// 提供连接断开回调的封装函数
void handleConnectionCallback(const base_connection::BaseConnection::ptr &con)
{
pd_manager_->handleProviderConnectionShutdown(con);
}
|
最后,在该类中提供用于启动服务器的函数,函数内部调用server_
的启动函数即可:
C++ |
---|
| void start()
{
server_->start();
}
|
至此,注册中心服务端设计完成
RPC功能服务端
对于RPC功能服务端来说,其除了需要处理请求客户端发送的RPC请求以外,还需要有一个服务注册客户端向注册中心服务端发起服务注册,所以在该类中可以考虑设置一个成员表示是否启用服务注册功能,如下:
C++ |
---|
| bool isToRegistry_; // 是否启用服务注册
|
对应地,当需要进行服务注册时,需要告诉注册中心自己的主机信息(包括IP地址和端口号),所以还需要添加一个成员用于保存主机信息:
C++ |
---|
| public_data::host_addr_t host_addr_; // 提供rpc服务的服务端信息
|
接着,为了可以进行服务注册,需要一个服务注册客户端成员,为了可以提供服务端功能,需要一个Dispatcher
模块的对象以及服务器类对象,与注册中心类似:
C++ |
---|
| rpc_client::main_client::RegisterClient::ptr reg_client_; // 用于服务注册的客户端
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_server::BaseServer::ptr server_; // 用于处理rpc服务的服务端
|
最后,还要一个成员用于处理客户端发送的RPC请求,在服务端部分使用的是RpcRouter
模块进行RPC请求处理:
C++ |
---|
| rpc_router::RpcRouter::ptr rpc_router_; // 用于处理RPC服务
|
提供一个构造函数用于初始化上面的成员,需要注意的是,只有当isToRegistry_
为true
时还需要创建用于服务注册的客户端,除了reg_client_
需要在函数体内初始化外,对于server_
也可以考虑在函数体内进行初始化,便于绑定对应的回调函数,其余成员在初始化列表初始化即可。函数体内部的逻辑与注册中心类似,只是需要额外处理reg_client_
的初始化时机,对于Dispatcher
模块来说,向其中绑定RpcRouter
模块中针对RPC请求的处理函数,接着创建服务端对象,并绑定收到消息时的处理函数即可:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | RpcServer(const public_data::host_addr_t &host_addr, bool isToRegistry = false, const public_data::host_addr_t ®istry_addr = public_data::host_addr_t())
: rpc_router_(std::make_shared<rpc_router::RpcRouter>()),
dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>()),
isToRegistry_(isToRegistry),
host_addr_(host_addr)
{
// 向dispatcher模块注册rpc处理函数
dispatcher_->registerService<request_message::RpcRequest>(public_data::MType::Req_rpc, std::bind(&rpc_router::RpcRouter::handleRpcRequest, rpc_router_.get(), std::placeholders::_1, std::placeholders::_2));
// 判断是否启用服务注册决定是否初始化服务注册客户端
if (isToRegistry_)
reg_client_ = std::make_shared<rpc_client::main_client::RegisterClient>(registry_addr.first, registry_addr.second);
// 创建服务端
server_ = server_factory::ServerFactory::serverCreateFactory(host_addr.second);
// 注册服务端的回调函数,由dispatcher提供
server_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
}
|
服务提供者既然要向注册中心发起服务注册请求就必须确保自己可以提供一些服务,而这些服务就需要上层去定义并通过接口设置到RpcRouter
中,所以需要提供一个设置服务提供者提供的服务描述的函数,在该函数中,如果开启了服务注册,则调用reg_client_
中发起注册请求的函数即可:
C++ |
---|
| // 用于注册可以提供的服务
void registryService(const rpc_router::ServiceDesc::ptr &s)
{
// 如果启用了服务注册,此时需要调用服务注册客户端的注册方法
if (isToRegistry_)
reg_client_->toRegisterService(s->getMethodName(), host_addr_);
// 向rpc_router模块中注册服务
rpc_router_->registerService(s);
}
|
最后,提供一个用于启动服务器的函数,函数内部调用server_
的启动函数即可:
C++ |
---|
| void start()
{
server_->start();
}
|
至此,RPC功能服务端设计完成
主题功能服务端
主题功能服务端与注册中心服务端设计基本一致,只是注册给Dispatcher
的请求处理函数以及连接断开的处理函数不同,其他的设置思路完全一致,参考代码如下:
C++ |
---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 | class TopicServer
{
public:
using ptr = std::shared_ptr<TopicServer>;
TopicServer(const uint16_t port)
: dispatcher_(std::make_shared<dispatcher_rpc_framework::Dispatcher>())
, topic_manager_(std::make_shared<rpc_topic::TopicManager>())
{
// 设置回调函数
dispatcher_->registerService<request_message::TopicRequest>(public_data::MType::Req_topic, std::bind(&rpc_topic::TopicManager::handleTopicRequest, topic_manager_.get(), std::placeholders::_1, std::placeholders::_2));
server_ = server_factory::ServerFactory::serverCreateFactory(port);
server_->setMessageCallback(std::bind(&dispatcher_rpc_framework::Dispatcher::executeService, dispatcher_.get(), std::placeholders::_1, std::placeholders::_2));
server_->setCloseCallback(std::bind(&TopicServer::handleConnectionCallback, this, std::placeholders::_1));
}
void start()
{
server_->start();
}
private:
// 提供连接断开回调的封装函数
void handleConnectionCallback(const base_connection::BaseConnection::ptr &con)
{
topic_manager_->handleConnectionShutdown(con);
}
private:
dispatcher_rpc_framework::Dispatcher::ptr dispatcher_;
base_server::BaseServer::ptr server_; // 用于处理主题服务的服务端
rpc_topic::TopicManager::ptr topic_manager_;
};
|
至此,主题功能服务端设计完成