Skip to content

Instantly share code, notes, and snippets.

@benravago
Last active May 19, 2021 00:50
Show Gist options
  • Select an option

  • Save benravago/2392a26da8777484bbac93ca69f7a3c9 to your computer and use it in GitHub Desktop.

Select an option

Save benravago/2392a26da8777484bbac93ca69f7a3c9 to your computer and use it in GitHub Desktop.
Yet Another NIO Echo Client/Server Example

This NIO example uses the new(ish) Selector.select(Consumer<?>) api which seems to solve a lot of the spurious wakeup problems in previous examples.

The example also demonstrates performing channel read/write functions asynchronously via SelectionKey interestOps() and attach() manipulation.

package nio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.*;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import static java.net.StandardSocketOptions.*;
class EchoClient {
SocketChannel channel;
Selector selector;
CountDownLatch isConnected = new CountDownLatch(1);
SocketAddress address() throws IOException {
return channel.getLocalAddress();
}
EchoClient open(SocketAddress server) throws IOException {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.setOption(SO_RCVBUF,0); // 0 = 1152
channel.setOption(SO_SNDBUF,0); // 0 = 2304
channel.connect(server);
selector = Selector.open();
channel.register(selector,OP_CONNECT);
return this;
}
long timeout = 1_000;
void run() {
try {
System.out.println("c "+Thread.currentThread());
while (selector.isOpen()) {
var n = selector.select(this::accept,timeout);
if (n < 1) System.out.println("c timeout");
}
System.out.println("c done");
} catch (IOException e) { throw new UncheckedIOException(e); }
}
void accept(SelectionKey key) {
try {
if (key.isConnectable()) {
connected(key);
}
if (key.isWritable()) {
sendRequest(key);
}
if (key.isReadable()) {
receiveResponse(key);
}
} catch (IOException e) { throw new UncheckedIOException(e); }
}
void connected(SelectionKey key) throws IOException {
assert channel == key.channel();
channel.finishConnect();
isConnected.countDown();
note(channel.socket());
key.interestOps(0); // initial state; passive
}
void sendRequest(SelectionKey key) throws IOException {
var buffer = (ByteBuffer) key.attachment();
channel.write(buffer); // assume one write is sufficient
buffer.clear(); // reuse buffer
key.interestOps(OP_READ); // get ready for the response
}
void receiveResponse(SelectionKey key) throws IOException {
var buffer = (ByteBuffer) key.attachment();
channel.read(buffer); // assume one read is sufficient
synchronized (buffer) {
buffer.notify(); // signal request/response cycle is complete
}
key.attach(null); // release the buffer
key.interestOps(0); // go back to passive state
}
String echoMessage(String text) throws Exception {
// ensure connected state
isConnected.await();
// allocate a transaction object
var buffer = ByteBuffer.allocate(256);
// fill in request data
buffer.put(text.getBytes());
buffer.flip();
// schedule the transaction
var key = channel.keyFor(selector);
key.attach(buffer);
key.interestOps(OP_WRITE); // indicate a request to send
selector.wakeup(); // poke the selector rather than wait for a timeout
// wait for completion
synchronized(buffer) {
buffer.wait();
}
// retrieve response data
buffer.flip();
var bytes = new byte[buffer.remaining()];
buffer.get(bytes);
// return what should be text.toUpperCase()
return new String(bytes);
}
void note(Socket socket) throws IOException {
// defaults: rcvbuf sndbuf
// unconnected 65536 8192
// connected 65536 1313280
var rcvbuf = socket.getReceiveBufferSize();
var sndbuf = socket.getSendBufferSize();
System.out.println("c "+Thread.currentThread().getName()+' '+socket.toString()+' '+rcvbuf+' '+sndbuf);
}
EchoClient start(SocketAddress server) throws IOException {
open(server);
new Thread(this::run).start();
return this;
}
void stop() throws IOException {
channel.close();
selector.close();
}
static void main(String... args) throws Exception {
var server = args.length < 1 ? null : new InetSocketAddress(args[0],Integer.parseInt(args[1]));
var client = new EchoClient().start(server);
var in = new BufferedReader(new InputStreamReader(System.in));
String req;
while ((req = in.readLine()) != null) {
req = req.trim();
var resp = client.echoMessage(req.trim());
System.out.println("~ " + req + " -> " + resp);
}
client.stop();
System.out.println("done");
}
}
package nio;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.*;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.InetSocketAddress;
import static java.net.StandardSocketOptions.*;
class EchoServer {
ServerSocketChannel channel;
Selector selector;
SocketAddress address() throws IOException {
return channel.getLocalAddress();
}
EchoServer open(SocketAddress address) throws IOException {
channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.setOption(SO_RCVBUF,0); // 0 = 1152
channel.bind(address);
note(channel.socket());
selector = Selector.open();
channel.register(selector,OP_ACCEPT);
return this;
}
long timeout = 1_000;
void run() {
try {
System.out.println("s "+Thread.currentThread());
while (selector.isOpen()) {
var n = selector.select(this::accept,timeout);
if (n < 1) System.out.println("s timeout");
}
System.out.println("s done");
} catch (IOException e) { throw new UncheckedIOException(e); }
}
void accept(SelectionKey key) {
try {
if (key.isAcceptable()) {
addClient(key);
}
if (key.isReadable()) {
receiveRequest(key);
}
if (key.isWritable()) {
sendResponse(key);
}
} catch (IOException e) { throw new UncheckedIOException(e); }
}
void addClient(SelectionKey key) throws IOException {
var server = (ServerSocketChannel) key.channel();
var client = server.accept();
client.configureBlocking(false);
client.setOption(SO_RCVBUF,0); // 0 = 1152
client.setOption(SO_SNDBUF,0); // 0 = 2304
note(client.socket());
client.register(selector,OP_READ); // initial state; wait for request
}
void receiveRequest(SelectionKey key) throws IOException {
var client = (SocketChannel) key.channel();
var buffer = ByteBuffer.allocate(256);
// retrieve request data
client.read(buffer); // assume one read is sufficient
buffer.flip();
// process the request
echoMessage(buffer);
buffer.flip();
// schedule the response
key.attach(buffer);
key.interestOps(OP_WRITE); // change state
}
void sendResponse(SelectionKey key) throws IOException {
var client = (SocketChannel) key.channel();
var buffer = (ByteBuffer) key.attachment();
client.write(buffer); // assume one write is sufficient
key.attach(null); // release the buffer
key.interestOps(OP_READ); // wait for next request
}
void echoMessage(ByteBuffer buffer) {
// load input bytes
var bytes = new byte[buffer.remaining()];
buffer.get(bytes);
// transform
var string = new String(bytes);
System.out.println("s -> "+string);
bytes = new String(bytes).toUpperCase().getBytes();
// store output bytes
buffer.clear();
buffer.put(bytes);
}
void note(ServerSocket socket) throws IOException {
// defaults: rcvbuf = 65536
var rcvbuf = socket.getReceiveBufferSize();
System.out.println("s "+socket.toString()+' '+rcvbuf);
}
void note(Socket socket) throws IOException {
var rcvbuf = socket.getReceiveBufferSize(); // default = 1152
var sndbuf = socket.getSendBufferSize(); // default = 1313280
System.out.println("s "+socket.toString()+' '+rcvbuf+' '+sndbuf);
}
EchoServer start() throws IOException {
open(null);
new Thread(this::run).start();
return this;
}
void stop() throws IOException {
channel.close();
selector.close();
}
static void main(String... args) throws Exception {
var address = args.length < 1 ? null : new InetSocketAddress(args[0],Integer.parseInt(args[1]));
new EchoServer().open(address).run();
}
}
package nio;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import static org.junit.jupiter.api.Assertions.*;
class EchoTest {
EchoServer server;
EchoClient client;
@BeforeEach
void setUp() throws Exception {
server = new EchoServer().start();
client = new EchoClient().start(server.address());
}
@Test
void test() throws Exception {
var hello = client.echoMessage("hello");
var world = client.echoMessage("world");
System.out.println("c <- "+hello+' '+world);
assertEquals("HELLO",hello);
assertEquals("WORLD",world);
}
@AfterEach
void tearDown() throws Exception {
server.stop();
client.stop();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment