Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Last active October 10, 2018 16:06

Revisions

  1. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 2 additions and 4 deletions.
    6 changes: 2 additions & 4 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -28,7 +28,7 @@ public class HttpClient {

    private final OkHttpClient httpClient;
    private final Scheduler httpScheduler;

    public OkHttpClient getOkHttpClient() {
    return httpClient;
    }
    @@ -154,9 +154,7 @@ public void call() {
    }
    }
    });

    return subscription;


    } catch (Exception e) {
    observer.onError(e);
    }
  2. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -28,6 +28,10 @@ public class HttpClient {

    private final OkHttpClient httpClient;
    private final Scheduler httpScheduler;

    public OkHttpClient getOkHttpClient() {
    return httpClient;
    }

    public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) {
    this.httpClient = httpClient;
  3. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 13 additions and 2 deletions.
    15 changes: 13 additions & 2 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -11,11 +11,11 @@
    import rx.Subscription;
    import rx.concurrency.Schedulers;
    import rx.subscriptions.BooleanSubscription;
    import rx.subscriptions.Subscriptions;
    import rx.util.functions.Action0;
    import rx.util.functions.Func1;

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.concurrent.CancellationException;
    @@ -24,6 +24,8 @@

    public class HttpClient {

    private static final int DEFAULT_BUFFER_SIZE = 1024;

    private final OkHttpClient httpClient;
    private final Scheduler httpScheduler;

    @@ -93,7 +95,16 @@ public void call() {
    } else {
    connection.setFixedLengthStreamingMode((int) entity.getContentLength());
    }
    entity.writeTo(connection.getOutputStream());

    OutputStream output = connection.getOutputStream();
    byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
    int n;
    while (-1 != (n = entity.getContent().read(buffer))) {
    if(subscription.isUnsubscribed()) {
    throw new CancellationException("Request Cancelled");
    }
    output.write(buffer, 0, n);
    }
    }
    }

  4. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 16 additions and 8 deletions.
    24 changes: 16 additions & 8 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -10,13 +10,15 @@
    import rx.Scheduler;
    import rx.Subscription;
    import rx.concurrency.Schedulers;
    import rx.subscriptions.BooleanSubscription;
    import rx.subscriptions.Subscriptions;
    import rx.util.functions.Action0;
    import rx.util.functions.Func1;

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.concurrent.CancellationException;

    import static org.apache.http.HttpVersion.HTTP_1_1;

    @@ -47,6 +49,8 @@ public Observable<HttpResponse> execute(final HttpRequest request, final Schedul
    @Override
    public Subscription call(final Observer<HttpResponse> observer) {

    final BooleanSubscription subscription = new BooleanSubscription();

    try {
    RequestLine requestLine = request.getRequestLine();
    URL url = new URL(requestLine.getUri());
    @@ -62,6 +66,11 @@ public Subscription call(final Observer<HttpResponse> observer) {
    @Override
    public void call() {
    try {

    if (subscription.isUnsubscribed()) {
    throw new CancellationException("Request Cancelled");
    }

    // Stream the request body.
    if (request instanceof HttpEntityEnclosingRequest) {
    HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
    @@ -88,6 +97,10 @@ public void call() {
    }
    }

    if (subscription.isUnsubscribed()) {
    throw new CancellationException("Request Cancelled");
    }

    // Read the response headers.
    int responseCode = connection.getResponseCode();
    String message = connection.getResponseMessage();
    @@ -127,20 +140,15 @@ public void call() {
    }
    });

    return Subscriptions.create(new Action0() {
    @Override
    public void call() {
    connection.disconnect();
    }
    });
    return subscription;

    } catch (Exception e) {
    observer.onError(e);
    }

    return Subscriptions.empty();
    return subscription;
    }
    });
    }

    }
    }
  5. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 1 addition and 3 deletions.
    4 changes: 1 addition & 3 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -26,7 +26,6 @@ public class HttpClient {
    private final Scheduler httpScheduler;

    public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) {

    this.httpClient = httpClient;
    this.httpScheduler = httpScheduler;
    }
    @@ -49,7 +48,6 @@ public Observable<HttpResponse> execute(final HttpRequest request, final Schedul
    public Subscription call(final Observer<HttpResponse> observer) {

    try {

    RequestLine requestLine = request.getRequestLine();
    URL url = new URL(requestLine.getUri());
    final HttpURLConnection connection = httpClient.open(url);
    @@ -59,7 +57,7 @@ public Subscription call(final Observer<HttpResponse> observer) {
    for (Header header : request.getAllHeaders()) {
    connection.addRequestProperty(header.getName(), header.getValue());
    }

    httpScheduler.schedule(new Action0() {
    @Override
    public void call() {
  6. prabirshrestha revised this gist Aug 23, 2013. 1 changed file with 30 additions and 6 deletions.
    36 changes: 30 additions & 6 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -1,10 +1,7 @@
    package com.example.rx_okhttp;

    import com.squareup.okhttp.OkHttpClient;
    import org.apache.http.Header;
    import org.apache.http.HttpRequest;
    import org.apache.http.HttpResponse;
    import org.apache.http.RequestLine;
    import org.apache.http.*;
    import org.apache.http.entity.InputStreamEntity;
    import org.apache.http.message.BasicHeader;
    import org.apache.http.message.BasicHttpResponse;
    @@ -58,14 +55,41 @@ public Subscription call(final Observer<HttpResponse> observer) {
    final HttpURLConnection connection = httpClient.open(url);

    connection.setRequestMethod(requestLine.getMethod());

    for (Header header : request.getAllHeaders()) {
    connection.addRequestProperty(header.getName(), header.getValue());
    }

    httpScheduler.schedule(new Action0() {
    @Override
    public void call() {
    try {
    try {
    // Stream the request body.
    if (request instanceof HttpEntityEnclosingRequest) {
    HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
    if (entity != null) {
    connection.setDoOutput(true);
    Header type = entity.getContentType();
    if (type != null) {
    connection.addRequestProperty(type.getName(), type.getValue());
    }
    Header encoding = entity.getContentEncoding();
    if (encoding != null) {
    connection.addRequestProperty(encoding.getName(), encoding.getValue());
    }
    if (entity.isChunked() || entity.getContentLength() < 0) {
    connection.setChunkedStreamingMode(0);
    } else if (entity.getContentLength() <= 8192) {
    // Buffer short, fixed-length request bodies. This costs memory, but permits the request
    // to be transparently retried if there is a connection failure.
    connection.addRequestProperty("Content-Length", Long.toString(entity.getContentLength()));
    } else {
    connection.setFixedLengthStreamingMode((int) entity.getContentLength());
    }
    entity.writeTo(connection.getOutputStream());
    }
    }

    // Read the response headers.
    int responseCode = connection.getResponseCode();
    String message = connection.getResponseMessage();
  7. prabirshrestha created this gist Aug 23, 2013.
    124 changes: 124 additions & 0 deletions HttpClient.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,124 @@
    package com.example.rx_okhttp;

    import com.squareup.okhttp.OkHttpClient;
    import org.apache.http.Header;
    import org.apache.http.HttpRequest;
    import org.apache.http.HttpResponse;
    import org.apache.http.RequestLine;
    import org.apache.http.entity.InputStreamEntity;
    import org.apache.http.message.BasicHeader;
    import org.apache.http.message.BasicHttpResponse;
    import rx.Observable;
    import rx.Observer;
    import rx.Scheduler;
    import rx.Subscription;
    import rx.concurrency.Schedulers;
    import rx.subscriptions.Subscriptions;
    import rx.util.functions.Action0;
    import rx.util.functions.Func1;

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    import static org.apache.http.HttpVersion.HTTP_1_1;

    public class HttpClient {

    private final OkHttpClient httpClient;
    private final Scheduler httpScheduler;

    public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) {

    this.httpClient = httpClient;
    this.httpScheduler = httpScheduler;
    }

    public HttpClient(OkHttpClient httpClient) {
    this(httpClient, Schedulers.threadPoolForIO());
    }

    public HttpClient() {
    this(new OkHttpClient(), Schedulers.threadPoolForIO());
    }

    public Observable<HttpResponse> execute(final HttpRequest request) {
    return execute(request, this.httpScheduler);
    }

    public Observable<HttpResponse> execute(final HttpRequest request, final Scheduler httpScheduler) {
    return Observable.create(new Func1<Observer<HttpResponse>, Subscription>() {
    @Override
    public Subscription call(final Observer<HttpResponse> observer) {

    try {

    RequestLine requestLine = request.getRequestLine();
    URL url = new URL(requestLine.getUri());
    final HttpURLConnection connection = httpClient.open(url);

    connection.setRequestMethod(requestLine.getMethod());
    for (Header header : request.getAllHeaders()) {
    connection.addRequestProperty(header.getName(), header.getValue());
    }

    httpScheduler.schedule(new Action0() {
    @Override
    public void call() {
    try {
    // Read the response headers.
    int responseCode = connection.getResponseCode();
    String message = connection.getResponseMessage();

    BasicHttpResponse response = new BasicHttpResponse(HTTP_1_1, responseCode, message);

    // Get the response body ready to stream.
    InputStream responseBody =
    responseCode < HttpURLConnection.HTTP_BAD_REQUEST
    ? connection.getInputStream()
    : connection.getErrorStream();

    InputStreamEntity entity = new InputStreamEntity(responseBody, connection.getContentLength());

    for (int i = 0; true; i++) {
    String name = connection.getHeaderFieldKey(i);
    if (name == null) {
    break;
    }

    BasicHeader header = new BasicHeader(name, connection.getHeaderField(i));
    response.addHeader(header);
    if (name.equalsIgnoreCase("Content-Type")) {
    entity.setContentType(header);
    } else if (name.equalsIgnoreCase("Content-Encoding")) {
    entity.setContentEncoding(header);
    }
    }

    response.setEntity(entity);

    observer.onNext(response);
    }
    catch (Exception e) {
    observer.onError(e);
    }
    }
    });

    return Subscriptions.create(new Action0() {
    @Override
    public void call() {
    connection.disconnect();
    }
    });

    } catch (Exception e) {
    observer.onError(e);
    }

    return Subscriptions.empty();
    }
    });
    }

    }