Created
October 31, 2019 16:17
-
-
Save ppLorins/d72272b6f79c580c25a88a5bb3e489d0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <memory> | |
#include <iostream> | |
#include <string> | |
#include <thread> | |
#include <vector> | |
#include <list> | |
#include <random> | |
#include <atomic> | |
#include <grpc++/grpc++.h> | |
#include <grpc/support/log.h> | |
#include "helloworld.grpc.pb.h" | |
using grpc::Channel; | |
using grpc::ClientContext; | |
using grpc::ClientAsyncReaderWriter; | |
using grpc::ClientAsyncResponseReader; | |
using grpc::Server; | |
using grpc::ServerAsyncResponseWriter; | |
using grpc::ServerBuilder; | |
using grpc::ServerContext; | |
using grpc::ServerAsyncReaderWriter; | |
using grpc::ServerCompletionQueue; | |
using grpc::CompletionQueue; | |
using grpc::Status; | |
using grpc::StatusCode; | |
using helloworld::HelloRequest; | |
using helloworld::HelloReply; | |
using helloworld::Greeter; | |
int g_thread_num = 1; | |
int g_cq_num = 1; | |
int g_ins_pool = 1; | |
int g_channel_pool = 1; | |
CompletionQueue *g_client_cq; | |
std::atomic<void*>** g_instance_pool = nullptr; | |
typedef std::shared_ptr<::grpc::Channel> ShpChannel; | |
typedef std::vector<ShpChannel> VecChannel; | |
typedef std::unique_ptr<VecChannel> UptrVecChannel; | |
std::vector<std::string> g_vec_backends{"192.168.0.102:50052","192.168.0.102:50053"}; | |
struct ChannelPool { | |
ChannelPool() { | |
for (const auto &_svr : g_vec_backends) { | |
this->m_backend_pool.insert(std::pair<std::string, UptrVecChannel>(_svr,UptrVecChannel(new VecChannel()))); | |
for (int i = 0; i < g_channel_pool; ++i) | |
this->m_backend_pool[_svr]->emplace_back(grpc::CreateChannel(_svr, grpc::InsecureChannelCredentials())); | |
} | |
} | |
ShpChannel PickupOneChannel(const std::string &svr) { | |
if (m_backend_pool.find(svr) == m_backend_pool.cend()) | |
return ShpChannel(); | |
auto &_uptr_vec = m_backend_pool[svr]; | |
return _uptr_vec->operator[](this->GenerateRandom(0, g_channel_pool-1)); | |
} | |
uint32_t GenerateRandom(uint32_t from, uint32_t to) { | |
std::random_device rd; | |
std::mt19937 gen(rd()); | |
std::uniform_int_distribution<unsigned long> dis(from,to); | |
return dis(gen); | |
} | |
//Read only after initialized. | |
std::map<std::string, UptrVecChannel> m_backend_pool; | |
} *g_channel; | |
class CallDataBase { | |
public: | |
CallDataBase() {} | |
virtual void Proceed(bool ok) = 0; | |
}; | |
class CallDataServerBase :public CallDataBase { | |
public: | |
CallDataServerBase(Greeter::AsyncService* service, ServerCompletionQueue* cq) : service_(service), cq_(cq){ | |
} | |
protected: | |
Greeter::AsyncService* service_; | |
ServerCompletionQueue* cq_; | |
ServerContext ctx_; | |
HelloRequest request_; | |
HelloReply reply_; | |
}; | |
class CallDataUnary; | |
class AsyncUnaryGreeterClient : public CallDataBase{ | |
enum class ClientStatus { | |
PROCESS = 1, | |
FINISH = 2 | |
}; | |
public: | |
AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel, CompletionQueue *cq, CallDataUnary* pcd); | |
~AsyncUnaryGreeterClient(); | |
void AsyncSayHello(const std::string& user); | |
void Proceed(bool ok); | |
private: | |
ClientContext context_; | |
ClientStatus status_; | |
std::unique_ptr<Greeter::Stub> stub_; | |
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> stream_; | |
CompletionQueue *cq_; | |
HelloRequest request_; | |
HelloReply response_; | |
grpc::Status finish_status_ = grpc::Status::OK; | |
CallDataUnary* m_parent_call_data = nullptr; | |
}; | |
class CallDataUnary : CallDataServerBase { | |
public: | |
CallDataUnary(Greeter::AsyncService* service, ServerCompletionQueue* cq) : CallDataServerBase(service,cq),responder_(&ctx_), status_(CREATE) { | |
for (const auto &_svr : g_vec_backends) { | |
auto _shp_channel = g_channel->PickupOneChannel(_svr); | |
m_backends.emplace_back(new AsyncUnaryGreeterClient(_shp_channel, g_client_cq, this)); | |
} | |
status_ = PROCESS; | |
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); | |
this->m_done_counter.store(0); | |
//std::cout << "pos_1_1" << std::endl; | |
} | |
~CallDataUnary() { | |
//std::cout << "pos_1_2" << std::endl; | |
} | |
void NotifyOneDone() { | |
int _pre = this->m_done_counter.fetch_add(1); | |
if (_pre+1 == g_vec_backends.size()) { | |
reply_.set_message(this->request_.name()); | |
status_ = FINISH; | |
responder_.Finish(reply_, Status::OK, this); | |
} | |
} | |
void Proceed(bool ok) { | |
if (status_ == PROCESS) { | |
new CallDataUnary(service_, cq_); | |
for (auto* _p_svr : m_backends) | |
_p_svr->AsyncSayHello(request_.name()); | |
//reply_.set_message(this->request_.name()); | |
//status_ = FINISH; | |
//responder_.Finish(reply_, Status::OK, this); | |
} else { | |
GPR_ASSERT(status_ == FINISH); | |
delete this; | |
} | |
} | |
private: | |
std::list<AsyncUnaryGreeterClient*> m_backends; | |
std::atomic<int> m_done_counter; | |
ServerAsyncResponseWriter<HelloReply> responder_; | |
enum CallStatus { CREATE = 1, PROCESS, FINISH }; | |
CallStatus status_; | |
}; | |
AsyncUnaryGreeterClient::AsyncUnaryGreeterClient(std::shared_ptr<Channel> channel, CompletionQueue *cq, CallDataUnary* pcd) { | |
stub_ = Greeter::NewStub(channel); | |
m_parent_call_data = pcd; | |
cq_ = cq; | |
//std::cout << "pos_2_1" << std::endl; | |
} | |
AsyncUnaryGreeterClient::~AsyncUnaryGreeterClient() { | |
//std::cout << "pos_2_2" << std::endl; | |
} | |
void AsyncUnaryGreeterClient::AsyncSayHello(const std::string& user) { | |
request_.set_name(user); | |
status_ = ClientStatus::PROCESS; | |
stream_ = stub_->PrepareAsyncSayHello(&context_, request_, cq_); | |
stream_->StartCall(); | |
stream_->Finish(&response_, &finish_status_, this); | |
} | |
void AsyncUnaryGreeterClient::Proceed(bool ok) { | |
if (!ok) { | |
std::cout << "Unary client get non-ok result from peer:" << context_.peer() << std::endl; | |
return; | |
} | |
switch (status_) { | |
case ClientStatus::PROCESS: | |
assert(this->finish_status_.ok()); | |
m_parent_call_data->NotifyOneDone(); | |
delete this; | |
break; | |
default: | |
std::cerr << "ClientUnexpected status:" << int(status_) << std::endl; | |
assert(false); | |
} | |
} | |
class ServerImpl final { | |
public: | |
~ServerImpl() { | |
server_->Shutdown(); | |
for (const auto& _cq : m_cq) | |
_cq->Shutdown(); | |
} | |
void Run() { | |
std::string server_address("0.0.0.0:50051"); | |
ServerBuilder builder; | |
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); | |
builder.RegisterService(&service_); | |
for (int i = 0; i < g_cq_num; ++i) | |
m_cq.emplace_back(builder.AddCompletionQueue()); | |
server_ = builder.BuildAndStart(); | |
std::cout << "Server listening on " << server_address << std::endl; | |
//Fidxed #threads polling on the g_client_cq | |
auto _lambda = [&](CompletionQueue* cq) { | |
void* tag; | |
bool ok; | |
while (true) { | |
GPR_ASSERT(cq->Next(&tag, &ok)); | |
CallDataBase* _p_ins = (CallDataBase*)tag; | |
_p_ins->Proceed(ok); | |
} | |
}; | |
std::vector<std::thread*> _vec_client_threads; | |
for (int i = 0; i < 2; ++i) | |
_vec_client_threads.emplace_back(new std::thread(_lambda,g_client_cq)); | |
std::vector<std::thread*> _vec_threads; | |
for (int i = 0; i < g_thread_num; ++i) { | |
int _cq_idx = i % g_cq_num; | |
for (int j = 0; j < g_ins_pool; ++j) | |
new CallDataUnary(&service_, m_cq[_cq_idx].get()); | |
_vec_threads.emplace_back(new std::thread(&ServerImpl::HandleRpcs, this, _cq_idx)); | |
} | |
std::cout << g_thread_num << " working aysnc threads spawned" << std::endl; | |
for (const auto& _t : _vec_threads) | |
_t->join(); | |
} | |
private: | |
void HandleRpcs(int cq_idx) { | |
void* tag; | |
bool ok; | |
while (true) { | |
GPR_ASSERT(m_cq[cq_idx]->Next(&tag, &ok)); | |
CallDataBase* _p_ins = (CallDataBase*)tag; | |
_p_ins->Proceed(ok); | |
} | |
} | |
std::vector<std::unique_ptr<ServerCompletionQueue>> m_cq; | |
Greeter::AsyncService service_; | |
std::unique_ptr<Server> server_; | |
}; | |
const char* ParseCmdPara( char* argv,const char* para) { | |
auto p_target = std::strstr(argv,para); | |
if (p_target == nullptr) { | |
printf("para error argv[%s] should be %s \n",argv,para); | |
return nullptr; | |
} | |
p_target += std::strlen(para); | |
return p_target; | |
} | |
int main(int argc, char** argv) { | |
if (argc != 5) { | |
std::cout << "Usage:./program --thread=xx --cq=xx --pool=xx --channel_pool=xx"; | |
return 0; | |
} | |
g_client_cq = new CompletionQueue(); | |
g_thread_num = std::atoi(ParseCmdPara(argv[1],"--thread=")); | |
g_cq_num = std::atoi(ParseCmdPara(argv[2],"--cq=")); | |
g_ins_pool = std::atoi(ParseCmdPara(argv[3],"--pool=")); | |
g_channel_pool = std::atoi(ParseCmdPara(argv[4],"--channel_pool=")); | |
g_channel = new ChannelPool(); | |
ServerImpl server; | |
server.Run(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment