Created
July 19, 2019 09:37
-
-
Save ppLorins/f30e6a2e14c3738288200b43a803b122 to your computer and use it in GitHub Desktop.
an example for grpc async c++ client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* | |
* Copyright 2015 gRPC authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
* | |
*/ | |
#include <iostream> | |
#include <memory> | |
#include <string> | |
#include <grpc++/grpc++.h> | |
#include <grpc/support/log.h> | |
#include <thread> | |
#include "helloworld.grpc.pb.h" | |
using grpc::Channel; | |
using grpc::ClientAsyncResponseReader; | |
using grpc::ClientContext; | |
using grpc::CompletionQueue; | |
using grpc::Status; | |
using helloworld::HelloRequest; | |
using helloworld::HelloReply; | |
using helloworld::Greeter; | |
class GreeterClient { | |
public: | |
explicit GreeterClient(const std::string &addr,CompletionQueue* in_cq,uint32_t count) : m_total(count) { | |
//auto shp_channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials()); | |
auto _channel_args = ::grpc::ChannelArguments(); | |
auto _tp = std::chrono::system_clock::now(); | |
std::chrono::seconds _sec = std::chrono::duration_cast<std::chrono::seconds>(_tp.time_since_epoch()); | |
std::string _key = "key_" + std::to_string(_sec.count()); | |
std::string _val = "val_" + std::to_string(_sec.count()); | |
_channel_args.SetString(_key,_val); | |
auto shp_channel = ::grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), | |
_channel_args); | |
stub_ = Greeter::NewStub(shp_channel); | |
this->cq_ = in_cq; | |
//this->cq_ = new CompletionQueue(); | |
} | |
// Assembles the client's payload and sends it to the server. | |
void SayHello(const std::string& user,int idx) { | |
// Data we are sending to the server. | |
HelloRequest request; | |
request.set_name(std::to_string(idx)); | |
// Call object to store rpc data | |
AsyncClientCall* call = new AsyncClientCall; | |
std::chrono::time_point<std::chrono::system_clock> _deadline = std::chrono::system_clock::now() | |
+ std::chrono::milliseconds(3100); | |
call->context.set_deadline(_deadline); | |
// stub_->PrepareAsyncSayHello() creates an RPC object, returning | |
// an instance to store in "call" but does not actually start the RPC | |
// Because we are using the asynchronous API, we need to hold on to | |
// the "call" instance in order to get updates on the ongoing RPC. | |
call->response_reader = stub_->PrepareAsyncSayHello(&call->context, request, cq_); | |
// StartCall initiates the RPC call | |
call->response_reader->StartCall(); | |
// Request that, upon completion of the RPC, "reply" be updated with the | |
// server's response; "status" with the indication of whether the operation | |
// was successful. Tag the request with the memory address of the call object. | |
call->response_reader->Finish(&call->reply, &call->status, (void*)call); | |
} | |
// Loop while listening for completed responses. | |
// Prints out the response from the server. | |
void AsyncCompleteRpc() { | |
//std::cout << "thread " << std::this_thread::get_id() << " in.." << std::endl;; | |
void* got_tag; | |
bool ok = false; | |
uint32_t _counter = 0; | |
auto _start = std::chrono::steady_clock::now(); | |
std::cout << "thread " << std::this_thread::get_id() << " start timer" << std::endl;; | |
// Block until the next result is available in the completion queue "cq". | |
while (cq_->Next(&got_tag, &ok)) { | |
//std::cout << "before counter:" << _counter << std::endl; | |
//std::this_thread::sleep_for(std::chrono::seconds(2)); | |
// The tag in this example is the memory location of the call object | |
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); | |
// Verify that the request was completed successfully. Note that "ok" | |
// corresponds solely to the request for updates introduced by Finish(). | |
GPR_ASSERT(ok); | |
if (!call->status.ok()) { | |
std::cout << call->status.error_code() << ",msg:" << call->status.error_message(); | |
GPR_ASSERT(false); | |
} | |
/* | |
if (call->status.ok()) | |
std::cout << "Greeter received: " << call->reply.message() << std::endl; | |
else | |
std::cout << "RPC failed" << std::endl; | |
*/ | |
// Once we're complete, deallocate the call object. | |
delete call; | |
if (++_counter >= m_total) | |
break; | |
} | |
auto _end = std::chrono::steady_clock::now(); | |
auto _ms = std::chrono::duration_cast<std::chrono::milliseconds>(_end - _start); | |
std::cout << "thread " << std::this_thread::get_id() << " inner time cost:" << _ms.count() << std::endl; | |
uint32_t _throughput = m_total / float(_ms.count()) * 1000; | |
std::cout << "thread " << std::this_thread::get_id() << " inner throughput : " << _throughput << std::endl; | |
} | |
private: | |
// struct for keeping state and data information | |
struct AsyncClientCall { | |
// Container for the data we expect from the server. | |
HelloReply reply; | |
// Context for the client. It could be used to convey extra information to | |
// the server and/or tweak certain RPC behaviors. | |
ClientContext context; | |
// Storage for the status of the RPC upon completion. | |
Status status; | |
std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader; | |
}; | |
// Out of the passed in Channel comes the stub, stored here, our view of the | |
// server's exposed services. | |
std::unique_ptr<Greeter::Stub> stub_; | |
// The producer-consumer queue we use to communicate asynchronously with the | |
// gRPC runtime. | |
CompletionQueue* cq_; | |
uint32_t m_total; | |
}; | |
int main(int argc, char** argv) { | |
uint32_t count = 50000; | |
if (argc != 4) { | |
std::cout << "Usage:./program --count=xx --thread=xx --addr=xx"; | |
return 0; | |
} | |
const char * target_str = "--count="; | |
auto p_target = std::strstr(argv[1],target_str); | |
if (p_target == nullptr) { | |
printf("para error argv[1] should be --count=xx \n"); | |
return 0; | |
} | |
p_target += std::strlen(target_str); | |
count = std::atoi(p_target); | |
uint32_t thread_num = 1; | |
target_str = "--thread="; | |
p_target = std::strstr(argv[2],target_str); | |
if (p_target == nullptr) { | |
printf("para error argv[2] should be --thread=xx \n"); | |
return 0; | |
} | |
p_target += std::strlen(target_str); | |
thread_num = std::atoi(p_target); | |
std::string _addr = "localhost:50051"; | |
target_str = "--addr="; | |
p_target = std::strstr(argv[3],target_str); | |
if (p_target == nullptr) { | |
printf("para error argv[1] should be --addr=xx \n"); | |
return 0; | |
} | |
p_target += std::strlen(target_str); | |
_addr = p_target; | |
std::cout << "req for each thread:" << count << std::endl; | |
// Instantiate the client. It requires a channel, out of which the actual RPCs | |
// are created. This channel models a connection to an endpoint (in this case, | |
// localhost at port 50051). We indicate that the channel isn't authenticated | |
// (use of InsecureChannelCredentials()). | |
CompletionQueue one_cq; | |
std::vector<GreeterClient*> _vec; | |
std::vector<std::thread*> _vec_t; | |
for (int i = 0; i < thread_num; ++i) { | |
auto * _p_client = new GreeterClient(_addr,&one_cq,count); | |
_vec.push_back(_p_client); | |
std::string user("world " + std::to_string(i)); | |
for (int j = 0; j < count; j++) | |
_p_client->SayHello(user, j); // The actual RPC call! | |
} | |
auto _start = std::chrono::steady_clock::now(); | |
for (uint32_t i = 0; i < thread_num; i++) | |
_vec_t.push_back(new std::thread(&GreeterClient::AsyncCompleteRpc, _vec[i])); | |
//std::this_thread::sleep_for(std::chrono::milliseconds(30)); | |
for (uint32_t i = 0; i < thread_num; i++) | |
_vec_t[i]->join(); | |
int _total = thread_num * count; | |
std::cout << "m_total:" << _total << std::endl; | |
auto _end = std::chrono::steady_clock::now(); | |
auto _ms = std::chrono::duration_cast<std::chrono::milliseconds>(_end - _start); | |
std::cout << "time cost:" << _ms.count() << std::endl; | |
uint32_t _throughput = _total / float(_ms.count()) * 1000; | |
std::cout << "throughput : " << _throughput << std::endl; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment