Last active
October 10, 2018 16:06
-
-
Save prabirshrestha/6324055 to your computer and use it in GitHub Desktop.
OkHttpClient+RxJava
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.rx_okhttp; | |
import com.squareup.okhttp.OkHttpClient; | |
import org.apache.http.*; | |
import org.apache.http.entity.InputStreamEntity; | |
import org.apache.http.message.BasicHeader; | |
import org.apache.http.message.BasicHttpResponse; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Scheduler; | |
import rx.Subscription; | |
import rx.concurrency.Schedulers; | |
import rx.subscriptions.BooleanSubscription; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Func1; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import java.util.concurrent.CancellationException; | |
import static org.apache.http.HttpVersion.HTTP_1_1; | |
public class HttpClient { | |
private static final int DEFAULT_BUFFER_SIZE = 1024; | |
private final OkHttpClient httpClient; | |
private final Scheduler httpScheduler; | |
public OkHttpClient getOkHttpClient() { | |
return httpClient; | |
} | |
public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) { | |
this.httpClient = httpClient; | |
this.httpScheduler = httpScheduler; | |
} | |
public HttpClient(OkHttpClient httpClient) { | |
this(httpClient, Schedulers.threadPoolForIO()); | |
} | |
public HttpClient() { | |
this(new OkHttpClient(), Schedulers.threadPoolForIO()); | |
} | |
public Observable<HttpResponse> execute(final HttpRequest request) { | |
return execute(request, this.httpScheduler); | |
} | |
public Observable<HttpResponse> execute(final HttpRequest request, final Scheduler httpScheduler) { | |
return Observable.create(new Func1<Observer<HttpResponse>, Subscription>() { | |
@Override | |
public Subscription call(final Observer<HttpResponse> observer) { | |
final BooleanSubscription subscription = new BooleanSubscription(); | |
try { | |
RequestLine requestLine = request.getRequestLine(); | |
URL url = new URL(requestLine.getUri()); | |
final HttpURLConnection connection = httpClient.open(url); | |
connection.setRequestMethod(requestLine.getMethod()); | |
for (Header header : request.getAllHeaders()) { | |
connection.addRequestProperty(header.getName(), header.getValue()); | |
} | |
httpScheduler.schedule(new Action0() { | |
@Override | |
public void call() { | |
try { | |
if (subscription.isUnsubscribed()) { | |
throw new CancellationException("Request Cancelled"); | |
} | |
// Stream the request body. | |
if (request instanceof HttpEntityEnclosingRequest) { | |
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); | |
if (entity != null) { | |
connection.setDoOutput(true); | |
Header type = entity.getContentType(); | |
if (type != null) { | |
connection.addRequestProperty(type.getName(), type.getValue()); | |
} | |
Header encoding = entity.getContentEncoding(); | |
if (encoding != null) { | |
connection.addRequestProperty(encoding.getName(), encoding.getValue()); | |
} | |
if (entity.isChunked() || entity.getContentLength() < 0) { | |
connection.setChunkedStreamingMode(0); | |
} else if (entity.getContentLength() <= 8192) { | |
// Buffer short, fixed-length request bodies. This costs memory, but permits the request | |
// to be transparently retried if there is a connection failure. | |
connection.addRequestProperty("Content-Length", Long.toString(entity.getContentLength())); | |
} else { | |
connection.setFixedLengthStreamingMode((int) entity.getContentLength()); | |
} | |
OutputStream output = connection.getOutputStream(); | |
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; | |
int n; | |
while (-1 != (n = entity.getContent().read(buffer))) { | |
if(subscription.isUnsubscribed()) { | |
throw new CancellationException("Request Cancelled"); | |
} | |
output.write(buffer, 0, n); | |
} | |
} | |
} | |
if (subscription.isUnsubscribed()) { | |
throw new CancellationException("Request Cancelled"); | |
} | |
// Read the response headers. | |
int responseCode = connection.getResponseCode(); | |
String message = connection.getResponseMessage(); | |
BasicHttpResponse response = new BasicHttpResponse(HTTP_1_1, responseCode, message); | |
// Get the response body ready to stream. | |
InputStream responseBody = | |
responseCode < HttpURLConnection.HTTP_BAD_REQUEST | |
? connection.getInputStream() | |
: connection.getErrorStream(); | |
InputStreamEntity entity = new InputStreamEntity(responseBody, connection.getContentLength()); | |
for (int i = 0; true; i++) { | |
String name = connection.getHeaderFieldKey(i); | |
if (name == null) { | |
break; | |
} | |
BasicHeader header = new BasicHeader(name, connection.getHeaderField(i)); | |
response.addHeader(header); | |
if (name.equalsIgnoreCase("Content-Type")) { | |
entity.setContentType(header); | |
} else if (name.equalsIgnoreCase("Content-Encoding")) { | |
entity.setContentEncoding(header); | |
} | |
} | |
response.setEntity(entity); | |
observer.onNext(response); | |
} | |
catch (Exception e) { | |
observer.onError(e); | |
} | |
} | |
}); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
return subscription; | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment