Last active
October 10, 2018 16:06
-
-
Save prabirshrestha/6324055 to your computer and use it in GitHub Desktop.
OkHttpClient+RxJava
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.rx_okhttp; | |
import com.squareup.okhttp.OkHttpClient; | |
import org.apache.http.*; | |
import org.apache.http.entity.InputStreamEntity; | |
import org.apache.http.message.BasicHeader; | |
import org.apache.http.message.BasicHttpResponse; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.concurrency.Schedulers; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Func1; | |
import java.io.InputStream; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import static org.apache.http.HttpVersion.HTTP_1_1; | |
public class HttpClient { | |
private final OkHttpClient httpClient; | |
private final Scheduler httpScheduler; | |
public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) { | |
this.httpClient = httpClient; | |
this.httpScheduler = httpScheduler; | |
} | |
public HttpClient(OkHttpClient httpClient) { | |
this(httpClient, Schedulers.threadPoolForIO()); | |
} | |
public HttpClient() { | |
this(new OkHttpClient(), Schedulers.threadPoolForIO()); | |
} | |
public Observable<HttpResponse> execute(final HttpRequest request) { | |
return execute(request, this.httpScheduler); | |
} | |
public Observable<HttpResponse> execute(final HttpRequest request, final Scheduler httpScheduler) { | |
return Observable.create(new Func1<Observer<HttpResponse>, Subscription>() { | |
@Override | |
public Subscription call(final Observer<HttpResponse> observer) { | |
try { | |
RequestLine requestLine = request.getRequestLine(); | |
URL url = new URL(requestLine.getUri()); | |
final HttpURLConnection connection = httpClient.open(url); | |
connection.setRequestMethod(requestLine.getMethod()); | |
for (Header header : request.getAllHeaders()) { | |
connection.addRequestProperty(header.getName(), header.getValue()); | |
} | |
httpScheduler.schedule(new Action0() { | |
@Override | |
public void call() { | |
try { | |
// Stream the request body. | |
if (request instanceof HttpEntityEnclosingRequest) { | |
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); | |
if (entity != null) { | |
connection.setDoOutput(true); | |
Header type = entity.getContentType(); | |
if (type != null) { | |
connection.addRequestProperty(type.getName(), type.getValue()); | |
} | |
Header encoding = entity.getContentEncoding(); | |
if (encoding != null) { | |
connection.addRequestProperty(encoding.getName(), encoding.getValue()); | |
} | |
if (entity.isChunked() || entity.getContentLength() < 0) { | |
connection.setChunkedStreamingMode(0); | |
} else if (entity.getContentLength() <= 8192) { | |
// Buffer short, fixed-length request bodies. This costs memory, but permits the request | |
// to be transparently retried if there is a connection failure. | |
connection.addRequestProperty("Content-Length", Long.toString(entity.getContentLength())); | |
} else { | |
connection.setFixedLengthStreamingMode((int) entity.getContentLength()); | |
} | |
entity.writeTo(connection.getOutputStream()); | |
} | |
} | |
// Read the response headers. | |
int responseCode = connection.getResponseCode(); | |
String message = connection.getResponseMessage(); | |
BasicHttpResponse response = new BasicHttpResponse(HTTP_1_1, responseCode, message); | |
// Get the response body ready to stream. | |
InputStream responseBody = | |
responseCode < HttpURLConnection.HTTP_BAD_REQUEST | |
? connection.getInputStream() | |
: connection.getErrorStream(); | |
InputStreamEntity entity = new InputStreamEntity(responseBody, connection.getContentLength()); | |
for (int i = 0; true; i++) { | |
String name = connection.getHeaderFieldKey(i); | |
if (name == null) { | |
break; | |
} | |
BasicHeader header = new BasicHeader(name, connection.getHeaderField(i)); | |
response.addHeader(header); | |
if (name.equalsIgnoreCase("Content-Type")) { | |
entity.setContentType(header); | |
} else if (name.equalsIgnoreCase("Content-Encoding")) { | |
entity.setContentEncoding(header); | |
} | |
} | |
response.setEntity(entity); | |
observer.onNext(response); | |
} | |
catch (Exception e) { | |
observer.onError(e); | |
} | |
} | |
}); | |
return Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
connection.disconnect(); | |
} | |
}); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
return Subscriptions.empty(); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment