Last active
January 5, 2016 16:11
-
-
Save codeprogression/00f186f5a8bb4e0b7746 to your computer and use it in GitHub Desktop.
Encapsulate polling an endpoint using RxJava, OkHttp, and Gson
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.gson.Gson; | |
import com.squareup.okhttp.OkHttpClient; | |
import javax.inject.Inject; | |
import javax.inject.Singleton; | |
@Singleton | |
public class ExamplePoller extends RxPoller<Example> { | |
@Inject | |
public ExamplePoller(OkHttpClient client, Gson gson) { | |
super(client, gson); | |
} | |
public void setListener(RxPoller.Configuration<Example> configuration) { | |
super.configure(listener, configuration.class); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.util.Pair; | |
import java.util.concurrent.TimeUnit; | |
import javax.inject.Inject; | |
import javax.inject.Named; | |
import javax.inject.Singleton; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.schedulers.Schedulers; | |
import rx.subjects.BehaviorSubject; | |
@Singleton | |
public class ExampleProvider implements RxPoller.Configuration<Example>{ | |
public static final Pair<Integer, TimeUnit> POLLING_INTERVAL = new Pair<>(5, TimeUnit.MINUTES); | |
private final ExamplePoller poller; | |
private final String exampleUrl; | |
private Subscription subscription; | |
private BehaviorSubject<Example> exampleSubject; | |
@Inject | |
public ExampleProvider(ExamplePoller poller, @Named("EXAMPLE_URL") String exampleUrl){ | |
this.exampleUrl = exampleUrl; | |
this.poller = poller; | |
poller.configure(this); | |
exampleSubject = BehaviorSubject.create(new Example()); | |
startPolling(); | |
} | |
private void startPolling() { | |
subscription = Observable.create(this.poller) | |
.subscribe(new Subscriber<Example>() { | |
@Override public void onCompleted() { | |
exampleSubject.onCompleted(); | |
} | |
@Override public void onError(Throwable e) { | |
exampleSubject.onError(e); | |
} | |
@Override public void onNext(Example example) { | |
exampleSubject.onNext(example); | |
} | |
}); | |
} | |
public synchronized Observable<Example> getObservable() { | |
if (subscription == null || subscription.isUnsubscribed()) { | |
startPolling(); | |
} | |
return exampleSubject.asObservable() | |
.subscribeOn(Schedulers.io()) | |
.doOnUnsubscribe(new Action0() { | |
@Override public void call() { | |
poller.cancel(); | |
subscription.unsubscribe(); | |
} | |
}); | |
} | |
@Override | |
public int getMaxRetries() { | |
return 10; | |
} | |
@Override public Pair<Integer, TimeUnit> getPollingInterval(Example result) { | |
return POLLING_INTERVAL; | |
} | |
@Override | |
public String getUrl() { | |
return examplenUrl; | |
} | |
@Override public boolean enableNulls() { | |
return false; | |
} | |
public void update(Example example) { | |
exampleSubject.onNext(example); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import javax.inject.Inject; | |
import rx.Subscriber; | |
public class ExampleClient { | |
ExampleProvider provider; | |
@Inject | |
public ExampleClient(ExampleProvider provider){ | |
this.provider = provider; | |
} | |
public void doSomething(){ | |
provider.getObservable() | |
.subscribe(new Subscriber<Example>(){ | |
@Override public void onCompleted(){} | |
@Override public void onError(Throwable e){} | |
@Override public void onNext(Example example){ | |
// do something with example object | |
} | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.text.TextUtils; | |
import android.util.Pair; | |
import com.google.gson.Gson; | |
import com.squareup.okhttp.Call; | |
import com.squareup.okhttp.Callback; | |
import com.squareup.okhttp.OkHttpClient; | |
import com.squareup.okhttp.Request; | |
import com.squareup.okhttp.Response; | |
import java.io.IOException; | |
import java.net.SocketException; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscriber; | |
import rx.functions.Action0; | |
import rx.schedulers.Schedulers; | |
import rx.subscriptions.Subscriptions; | |
import timber.log.Timber; | |
public class RxPoller<T> implements Observable.OnSubscribe<T> { | |
private final OkHttpClient client; | |
private final Gson gson; | |
private int retry = 0; | |
protected Configuration<T> configuration; | |
protected Class<?> resultClass; | |
public Pair<Integer, TimeUnit> pollingInterval; | |
private String lastModified; | |
private Scheduler.Worker worker; | |
private Action0 action; | |
private String etag; | |
protected String responseBody; | |
private Call call; | |
protected Subscriber<? super T> subscriber; | |
public RxPoller(OkHttpClient client, Gson gson) { | |
this.client = client; | |
this.gson = gson; | |
} | |
public void configure(Configuration<T> configuration, Class<?> resultClass) { | |
this.configuration = configuration; | |
this.resultClass = resultClass; | |
} | |
public Subscriber<? super T> getSubscriber(){ | |
return subscriber; | |
} | |
@Override | |
public void call(final Subscriber<? super T> subscriber) { | |
this.subscriber = subscriber; | |
if (worker == null || worker.isUnsubscribed()) { | |
worker = Schedulers.io().createWorker(); | |
worker.schedule(getAction()); | |
} else if (!TextUtils.isEmpty(responseBody)) { | |
subscriber.onNext(getResult()); | |
} | |
} | |
protected T getResult() { | |
return (T) gson.fromJson(responseBody, resultClass); | |
} | |
protected Action0 getAction() { | |
if (action == null) { | |
action = new Action0() { | |
@Override public void call() { | |
try { | |
String url = configuration.getUrl(); | |
if (url == null) { | |
Timber.d("URL is null for %s, delaying 15 seconds", resultClass.getSimpleName()); | |
if (responseBody == null) { | |
pollingInterval = new Pair<>(15, TimeUnit.SECONDS); | |
} | |
scheduleWorker(); | |
return; | |
} | |
Timber.d("Getting " + resultClass.getSimpleName() + " from NETWORK"); | |
Request.Builder request = new Request.Builder() | |
.url(url) | |
.get(); | |
if (lastModified != null) | |
request.addHeader("If-Modified-Since", lastModified); | |
if (etag != null) | |
request.addHeader("If-None-Match", etag); | |
call = client.newCall(request.build()); | |
call.enqueue(PollingSubscriber.this.getCallback()); | |
} catch (Exception e) { | |
if (subscriber != null){ | |
subscriber.onError(e); | |
} | |
Timber.d(e, "Error while polling"); | |
} | |
} | |
} | |
; | |
} | |
return action; | |
} | |
private Callback getCallback() { | |
return new Callback() { | |
@Override | |
public void onFailure(Request request, IOException e) { | |
if ("Canceled".equals(e.getMessage())) return; | |
handleFailure(e); | |
} | |
@Override | |
public void onResponse(Response response) throws IOException { | |
Throwable error = getFailureExceptionOnBadStatus(response); | |
if (error != null) { | |
handleFailure(error); | |
return; | |
} | |
String lastModified = response.header("Last-Modified"); | |
String etag = response.header("Etag"); | |
retry = 0; | |
if (isModified(etag)) { | |
Timber.d("Received response - LAST-MODIFIED: " + lastModified); | |
Timber.d("Received response - ETAG: " + etag); | |
T result; | |
try { | |
//noinspection unchecked | |
result = (T) gson.fromJson(response.body().charStream(), resultClass); | |
responseBody = gson.toJson(result); | |
} catch (Exception e) { | |
if ((e instanceof SocketException)) { | |
Timber.e(e, ""); | |
// scheduleWorker(); | |
} else { | |
handleFailure(e); | |
} | |
return; | |
} | |
Pair<Integer, TimeUnit> interval = configuration.getPollingInterval(result); | |
pollingInterval = interval; | |
if (result != null || configuration.enableNulls()) { | |
if (subscriber != null){ | |
subscriber.onNext(result); | |
} | |
} | |
RxPoller.this.lastModified = lastModified; | |
RxPoller.this.etag = etag; | |
} | |
Timber.d("Next poll in %s %s", | |
Integer.toString(pollingInterval.first), | |
pollingInterval.second.toString()); | |
scheduleWorker(); | |
} | |
}; | |
} | |
public void kick() { | |
if (worker != null){ | |
worker.unsubscribe(); | |
} | |
worker = Schedulers.io().createWorker(); | |
worker.schedule(getAction()); | |
} | |
protected void scheduleWorker() { | |
if ( worker == null || worker.isUnsubscribed()) { | |
worker = Schedulers.io().createWorker(); | |
} | |
worker.schedule(getAction(), pollingInterval.first, pollingInterval.second); | |
} | |
protected void sendCachedResult() { | |
Timber.d("Getting " + resultClass.getName() + " from CACHE"); | |
T result = getResult(); | |
if (subscriber != null){ | |
subscriber.onNext(result); | |
} | |
scheduleWorker(); | |
} | |
private boolean isModified(String etag) { | |
boolean etagMatches = this.etag != null && this.etag.equals(etag); | |
if (etagMatches) Timber.d("ETAG match, no changes: " + etag); | |
return !etagMatches; | |
} | |
private void handleFailure(Throwable e) { | |
Timber.e("Error received: ", e); | |
if (!suppressFailureRetry()) { | |
pollingInterval = new Pair<>(10, TimeUnit.SECONDS); | |
Timber.d("Setting polling time to: " + pollingInterval); | |
retry++; | |
if (retry > configuration.getMaxRetries()) { | |
if (subscriber != null) { | |
subscriber.onError(e); | |
} | |
return; | |
} | |
} | |
scheduleWorker(); | |
} | |
protected boolean suppressFailureRetry() { | |
return false; | |
} | |
private Throwable getFailureExceptionOnBadStatus(Response resp) { | |
if (resp.code() < 399) return null; | |
String message = String.format("Polling subscriber received bad response for: %s\n\t%d %s", | |
resp.request().urlString(), resp.code(), resp.body()); | |
return new Exception(message); | |
} | |
private void cancelExistingCalls(final Call call, Subscriber<? super T> subscriber) { | |
subscriber.add(Subscriptions.create(new Action0() { | |
@Override public void call() { | |
call.cancel(); | |
} | |
})); | |
} | |
public void cancel() { | |
if (worker != null) { | |
worker.unsubscribe(); | |
} | |
new Thread(new Runnable() { | |
@Override public void run() { | |
if (call == null || subscriber == null) return; | |
cancelExistingCalls(call, subscriber); | |
} | |
}).start(); | |
} | |
public Class<?> getResultClass() { | |
return resultClass; | |
} | |
public interface Configuration<T> { | |
int getMaxRetries(); | |
Pair<Integer, TimeUnit> getPollingInterval(T result); | |
String getUrl(); | |
boolean enableNulls(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment