Created
October 12, 2018 06:52
-
-
Save kiwiandroiddev/6ab274f41ffc5d2a850e096f227896a8 to your computer and use it in GitHub Desktop.
Converts OkHttp's callback interface for WebSockets into an RxJava Observable of events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observable | |
import okhttp3.* | |
import okio.ByteString | |
fun OkHttpClient.newWebSocketObservable(serverUrl: String): Observable<WebSocketEvent> { | |
val request = Request.Builder().url(serverUrl).build() | |
return newWebSocketObservable(request) | |
} | |
fun OkHttpClient.newWebSocketObservable(request: Request): Observable<WebSocketEvent> { | |
return Observable.create<WebSocketEvent> { emitter -> | |
newWebSocket(request, object : WebSocketListener() { | |
override fun onOpen(webSocket: WebSocket, response: Response) { | |
emitter.onNext(WebSocketEvent.OnOpen(webSocket, response)) | |
} | |
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { | |
emitter.onNext(WebSocketEvent.OnFailure(webSocket, t, response)) | |
emitter.onComplete() | |
} | |
override fun onMessage(webSocket: WebSocket, text: String) { | |
emitter.onNext(WebSocketEvent.OnTextMessage(webSocket, text)) | |
} | |
override fun onMessage(webSocket: WebSocket, bytes: ByteString) { | |
emitter.onNext(WebSocketEvent.OnBytesMessage(webSocket, bytes)) | |
} | |
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { | |
emitter.onNext(WebSocketEvent.OnClosing(webSocket, code, reason)) | |
} | |
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { | |
emitter.onNext(WebSocketEvent.OnClosed(webSocket, code, reason)) | |
emitter.onComplete() | |
} | |
}) | |
} | |
} | |
sealed class WebSocketEvent(open val webSocket: WebSocket) { | |
data class OnOpen(override val webSocket: WebSocket, | |
val response: Response) : WebSocketEvent(webSocket) | |
data class OnFailure(override val webSocket: WebSocket, | |
val t: Throwable, | |
val response: Response?) : WebSocketEvent(webSocket) | |
data class OnTextMessage(override val webSocket: WebSocket, | |
val text: String) : WebSocketEvent(webSocket) | |
data class OnBytesMessage(override val webSocket: WebSocket, | |
val bytes: ByteString) : WebSocketEvent(webSocket) | |
data class OnClosing(override val webSocket: WebSocket, | |
val code: Int, | |
val reason: String) : WebSocketEvent(webSocket) | |
data class OnClosed(override val webSocket: WebSocket, | |
val code: Int, | |
val reason: String) : WebSocketEvent(webSocket) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment