Last active
February 1, 2017 23:10
-
-
Save NiteshKant/9bf665902815a1209bdda228ebbbd866 to your computer and use it in GitHub Desktop.
rs-java-issue-229
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.InetSocketAddress; | |
import java.net.SocketAddress; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import io.reactivesocket.ReactiveSocket; | |
import io.reactivesocket.client.KeepAliveProvider; | |
import io.reactivesocket.client.ReactiveSocketClient; | |
import io.reactivesocket.client.SetupProvider; | |
import io.reactivesocket.frame.ByteBufferUtil; | |
import io.reactivesocket.transport.tcp.client.TcpTransportClient; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
import io.reactivex.functions.Consumer; | |
import io.reactivex.schedulers.Schedulers; | |
public class Client { | |
public static class Performance { | |
final String url; | |
final int count; | |
final double avgSize; | |
public Performance(String url, int count, double avgSize) { | |
super(); | |
this.url = url; | |
this.count = count; | |
this.avgSize = avgSize; | |
} | |
public String getUrl() { | |
return url; | |
} | |
public int getCount() { | |
return count; | |
} | |
public double getAvgSize() { | |
return avgSize; | |
} | |
@Override | |
public String toString() { | |
return "Performance [url=" + url + ", count=" + count + ", avgSize=" + avgSize + "]"; | |
} | |
} | |
public static Flowable<Performance> subscribe(ReactiveSocket socket, String request) { | |
final AtomicLong received = new AtomicLong(); | |
final AtomicLong requested = new AtomicLong(); | |
return Flowable.fromPublisher( | |
socket.requestSubscription(new PayloadImpl(request))) | |
.map(payload -> payload.getData()) | |
.map(ByteBufferUtil::toUtf8String) | |
.doOnRequest(n -> requested.addAndGet(n)) | |
.doOnNext(s -> received.incrementAndGet()) | |
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) // move one down to "fix" it | |
.buffer(128) | |
.map(l-> { | |
double avgSize = l | |
.stream() | |
.mapToInt(String::length) | |
.average() | |
.orElse(0.0); | |
return new Performance(request, l.size(), avgSize); | |
}) | |
.doFinally(() -> System.out.println("Requested: " + requested.get() + ", Received: " + received.get())); | |
} | |
public static void main(String[] args) throws InterruptedException { | |
int port = 9000; | |
String host = "localhost"; | |
SocketAddress address = new InetSocketAddress(host, port); | |
ReactiveSocket socket = Flowable.fromPublisher(ReactiveSocketClient.create(TcpTransportClient.create(address), | |
SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease()).connect()).blockingFirst(); | |
for (int i = 0; i < 1; i++) { | |
subscribe(socket, "localhost:4096:Object"+i) | |
.forEach(System.out::println); | |
} | |
Thread.sleep(10000000); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#### Server Output | |
01 Feb 2017 15:02:08,546 INFO [main] - Rx server started at port: 9000 | |
01 Feb 2017 15:02:15,777 INFO [rxnetty-nio-eventloop-1-2] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame | |
Got request for [localhost, 4096, Object0] | |
Requested: 13986. Sent: 13986 | |
#### Client Output | |
01 Feb 2017 15:02:15,529 INFO [main] - Creating thread pooled named io.reactivesocket.frame.UnpooledFrame | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
Performance [url=localhost:4096:Object0, count=128, avgSize=4096.0] | |
io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89) | |
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91) | |
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
at java.lang.Thread.run(Thread.java:745) | |
Exception in thread "pool-2-thread-1" io.reactivex.exceptions.MissingBackpressureException: Queue is full?! | |
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:113) | |
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:89) | |
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:67) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:166) | |
at io.reactivesocket.internal.RemoteReceiver.onNext(RemoteReceiver.java:54) | |
at io.reactivesocket.ClientReactiveSocket.handleFrame(ClientReactiveSocket.java:295) | |
at io.reactivesocket.ClientReactiveSocket.handleIncomingFrames(ClientReactiveSocket.java:232) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:73) | |
at io.reactivesocket.reactivestreams.extensions.internal.publishers.DoOnEventPublisher$1.onNext(DoOnEventPublisher.java:75) | |
at io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription.safeOnNext(ValidatingSubscription.java:94) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:107) | |
at io.reactivesocket.internal.ClientServerInputMultiplexer$SourceInput.onNext(ClientServerInputMultiplexer.java:71) | |
at rx.internal.reactivestreams.PublisherAdapter$1.onNext(PublisherAdapter.java:107) | |
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) | |
at rx.internal.operators.OperatorSerialize$1.onNext(OperatorSerialize.java:57) | |
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91) | |
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373) | |
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189) | |
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec.channelRead(ReactiveSocketFrameCodec.java:43) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) | |
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) | |
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) | |
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) | |
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) | |
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) | |
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) | |
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) | |
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) | |
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) | |
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) | |
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) | |
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) | |
at java.lang.Thread.run(Thread.java:745) | |
__Requested: 13952, Received: 13910__ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.util.Arrays; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.reactivestreams.Publisher; | |
import io.reactivesocket.AbstractReactiveSocket; | |
import io.reactivesocket.Payload; | |
import io.reactivesocket.frame.ByteBufferUtil; | |
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; | |
import io.reactivesocket.server.ReactiveSocketServer; | |
import io.reactivesocket.transport.tcp.server.TcpTransportServer; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
import org.slf4j.event.Level; | |
public class Server { | |
public static void main(String[] args) throws IOException { | |
int port = 9000; | |
ReactiveSocketServer.create(TcpTransportServer.create(port)) | |
.start((setupPayload, reactiveSocket) -> { | |
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { | |
@Override | |
public Publisher<Payload> requestResponse(Payload p) { | |
return Flowable.just(p); | |
} | |
@Override | |
public Publisher<Payload> requestSubscription(Payload p) { | |
String[] request = ByteBufferUtil.toUtf8String(p.getData()).split(":"); | |
int size = Integer.parseInt(request[1]); | |
System.out.println("Got request for " + Arrays.toString(request)); | |
final byte[] buff = new byte[size]; | |
Arrays.fill(buff, (byte)65); | |
final AtomicLong requested = new AtomicLong(); | |
final AtomicLong sent = new AtomicLong(); | |
Flowable.interval(30, TimeUnit.SECONDS) | |
.doOnNext(aLong -> System.out.println("Requested: " + requested.get() | |
+ ". Sent: " + sent.get())) | |
.ignoreElements() | |
.ambWith(Flowable.fromPublisher(reactiveSocket.onClose()).ignoreElements()) | |
.doFinally(() -> System.out.println("Requested: " + requested.get() | |
+ ". Sent: " + sent.get())) | |
.subscribe(); | |
return Flowable.range(1, Integer.MAX_VALUE) | |
.doOnRequest(n -> requested.addAndGet(n)) | |
.map(integer -> { | |
sent.incrementAndGet(); | |
return new PayloadImpl(buff); | |
}); | |
} | |
}); | |
}); | |
new BufferedReader(new InputStreamReader(System.in)).readLine(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment