Created
October 31, 2023 20:54
-
-
Save sergiitk/a4b58a2fb958191638750b408b1ad421 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hello; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.stub.ClientCallStreamObserver; | |
import io.grpc.stub.StreamObserver; | |
import java.util.concurrent.TimeUnit; | |
public class HelloStreamClient { | |
private final ManagedChannel channel; | |
private final ClientCallStreamObserver<test.HelloRequest> clientCallStreamObserver; | |
public HelloStreamClient(String host, int port) { | |
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); | |
clientCallStreamObserver = (ClientCallStreamObserver<test.HelloRequest>) HelloStreamGrpc | |
.newStub(channel) | |
.sayHelloStream( | |
new StreamObserver<>() { | |
@Override | |
public void onNext(test.HelloResponse value) { | |
} | |
@Override | |
public void onError(Throwable t) { | |
} | |
@Override | |
public void onCompleted() { | |
} | |
}); | |
} | |
public void shutdown() throws InterruptedException { | |
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); | |
} | |
public void sayHello() { | |
int runCount = 10000000; | |
for (int i = 0; i < runCount; ++i) { | |
clientCallStreamObserver.onNext(test.HelloRequest.newBuilder().setTraceId(i).build()); | |
} | |
} | |
public static void main(String[] args) throws InterruptedException { | |
HelloStreamClient client = new HelloStreamClient("127.0.0.1", 50051); | |
client.sayHello(); | |
try { | |
Thread.sleep(1000000); | |
} catch (Exception e) { | |
} | |
client.shutdown(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hello; | |
import io.grpc.Server; | |
import io.grpc.ServerBuilder; | |
import io.grpc.stub.StreamObserver; | |
import java.io.IOException; | |
import java.util.concurrent.TimeUnit; | |
public class HelloStreamServer { | |
private Server server; | |
private void start() throws IOException { | |
int port = 50051; | |
server = ServerBuilder.forPort(port) | |
.addService(new HelloStreamImpl()) | |
.build() | |
.start(); | |
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
try { | |
stop(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(System.err); | |
} | |
})); | |
} | |
private void stop() throws InterruptedException { | |
if (server != null) { | |
server.shutdown().awaitTermination(30, TimeUnit.SECONDS); | |
} | |
} | |
private void blockUntilShutdown() throws InterruptedException { | |
if (server != null) { | |
server.awaitTermination(); | |
} | |
} | |
public static void main(String[] args) throws IOException, InterruptedException { | |
final HelloStreamServer server = new HelloStreamServer(); | |
server.start(); | |
server.blockUntilShutdown(); | |
} | |
private class HelloStreamImpl extends HelloStreamGrpc.HelloStreamImplBase { | |
@Override | |
public StreamObserver<test.HelloRequest> sayHelloStream(StreamObserver<test.HelloResponse> responseStreamObserver) { | |
return new StreamObserver<>() { | |
@Override | |
public void onNext(test.HelloRequest value) { | |
//NOTE: here DO NOT response anything to reproduce the client side memory leak problem | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
} | |
@Override | |
public void onCompleted() { | |
} | |
}; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
hello.proto:
syntax = "proto3";

package hello;

// Generated Java code lands in package `hello`, nested in an outer class named `test`.
option java_package = "hello";
option java_outer_classname = "test";

// Request message carrying an optional trace identifier.
message HelloRequest {
  optional int64 traceId = 1;
}

// Response message; it has no fields.
message HelloResponse {
}

// Bidirectional-streaming greeting service.
service HelloStream {
  // Takes a stream of HelloRequest messages and returns a stream of HelloResponse messages.
  rpc SayHelloStream (stream HelloRequest) returns (stream HelloResponse) {}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment