Last active
October 21, 2018 08:21
-
-
Save bingli224/94035a133c178114b8f77423a742ddf3 to your computer and use it in GitHub Desktop.
Socket+Selector+Lambda in Kotlin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Socket+Selector+Lambda in Kotlin | |
* | |
* Further issues: | |
* When the receiver takes action fast enough before data is fully delivered through SocketChannel, | |
* especially the original one is likely to be bigger than buffer. | |
* | |
* Also, what if the register() is repeated again? | |
* | |
* @author BingLi224 | |
* @version 2018.10.21 | |
*/ | |
import java.nio.ByteBuffer | |
import java.nio.channels.SocketChannel | |
import java.nio.channels.ServerSocketChannel | |
import java.nio.channels.Selector | |
import java.nio.channels.SelectionKey | |
import java.net.InetSocketAddress | |
import java.util.stream.StreamSupport | |
/** | |
* Listener connects to Speakers. | |
* | |
* @constructor Creates the Listener with a thread that is ready for reading new data from connected Speakers. | |
*/ | |
class Listener { | |
// initialize selector | |
val selector = Selector.open ( ) | |
init { | |
// create the thread for listening | |
Thread { | |
while ( true ) { | |
selector.select ( ) // check for the new incoming data from channels | |
val selectedKeys = selector.selectedKeys ( ) | |
selectedKeys.parallelStream ( ) | |
.filter { it.isReadable ( ) && // expect to read only | |
it.channel ( ) is SocketChannel } // expect the SocketChannel, casted from SelectableChannel | |
.map { it.channel ( ) as SocketChannel } // cast as SocketChannel | |
.forEach { | |
val input = ByteBuffer.allocate ( 1024 ) | |
read@ while ( true ) { | |
// read the data, got the new size of data | |
val len = it.read ( input ) | |
when { | |
len > 0 -> { | |
// buffer for extract data | |
val data = ByteArray ( len ) | |
input.flip ( ) // back to the beginning of buffer | |
input.get ( data ) // convert the data to byte array | |
println ( "[$this] got: ${String ( data )}" ) | |
input.flip ( ) // reset the buffer to put() next time | |
} | |
len == 0 -> break@read // len = 0 | |
// end of data, so break out from reading loop | |
else -> { | |
// disconnected, | |
// so unregister from selector | |
it.keyFor ( selector ) | |
.cancel ( ) | |
break@read // break out from reading loop | |
} | |
} | |
} | |
} | |
// clean up the list of selected keys to check the next time | |
// if not reset the list, the channel without new data is still in the next list | |
selectedKeys.clear ( ) | |
} | |
}.start ( ) | |
} | |
/** | |
* Connects to Speaker with given socket port. | |
* | |
* @param id Socket port of the Speaker. | |
*/ | |
fun addSpeaker ( id : Int ) { | |
// create the socket channel to Speaker channel | |
var socketChannel = SocketChannel.open ( | |
InetSocketAddress ( "localhost", id ) | |
) | |
socketChannel.configureBlocking ( false ) | |
selector.wakeup ( ) // interrupt the selector.select() | |
// register the socket channel to the selector | |
socketChannel.register ( selector, SelectionKey.OP_READ ) | |
println ( "Listener [$this] connected to [$id]" ) | |
} | |
} | |
/** | |
* Speaker can send "words" to the connected Listeners. | |
* | |
* @property id The socket port for listening. | |
* @property words The data for sending through SocketChannel to the Listener. | |
* @constructor Creates a Speaker with threads that run behine to accept the new connections, and keep sending data to connected Listeners | |
*/ | |
class Speaker ( id : Int, words : String ) { | |
val serverSocketChannel : ServerSocketChannel | |
val selector : Selector = Selector.open ( ) | |
init { | |
serverSocketChannel = ServerSocketChannel.open ( ) | |
serverSocketChannel.bind ( | |
InetSocketAddress ( "localhost", id ) | |
) | |
println ( "Created Speaker: ${id}" ) | |
// create the thread for listening | |
Thread { | |
while ( true ) { | |
val listener = serverSocketChannel.accept ( ) | |
selector.wakeup ( ) | |
listener.configureBlocking ( false ) | |
listener.register ( selector, SelectionKey.OP_WRITE ) // remember the listener | |
} | |
}.start ( ) | |
Thread { | |
while ( true ) { | |
Thread.sleep ( 200L + ( 0..200 ).shuffled ( ).first ( ) ) | |
// send the words to the listeners | |
selector.selectNow ( ) | |
val selectedKeys = selector.selectedKeys ( ) | |
selectedKeys.parallelStream ( ) | |
.filter { it.isWritable ( ) && // expect to get new listener | |
it.channel ( ) is SocketChannel } // expect the SocketChannel, casted from SelectableChannel | |
.map { it.channel ( ) as SocketChannel } // cast as SocketChannel | |
.forEach { | |
val output = ByteBuffer.allocate ( words.length ) | |
output.put ( words.toByteArray ( ) ) | |
output.flip ( ) // reset the buffer position to forward data | |
println ( "[$id] say: $words" ) | |
it.write ( output ) | |
} | |
selectedKeys.clear ( ) | |
} | |
}.start ( ) | |
} | |
} | |
fun main ( argv : Array <String> ){ | |
Speaker ( 44444, "Hi" ) | |
Speaker ( 44445, ":)" ) | |
Speaker ( 44446, "hmm" ) | |
val listener1 = Listener ( ) | |
listener1.addSpeaker ( 44444 ) | |
listener1.addSpeaker ( 44445 ) | |
listener1.addSpeaker ( 44446 ) | |
val listener2 = Listener ( ) | |
listener2.addSpeaker ( 44444 ) | |
listener2.addSpeaker ( 44445 ) | |
listener2.addSpeaker ( 44446 ) | |
val listener3 = Listener ( ) | |
listener3.addSpeaker ( 44444 ) | |
listener3.addSpeaker ( 44445 ) | |
listener3.addSpeaker ( 44446 ) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment