Skip to content

Instantly share code, notes, and snippets.

@imminent
Last active August 11, 2016 17:21
Show Gist options
  • Save imminent/8511925 to your computer and use it in GitHub Desktop.
Save imminent/8511925 to your computer and use it in GitHub Desktop.
CursorSubject is a Reactive Extensions (Rx) version of Android's CursorLoader. It retrieves the Cursor on a background thread, delivers the resulting Cursor on the main thread, and delivers a fresh Cursor whenever a content-changed notification is triggered.
package com.example.models;
import android.database.ContentObserver;
import android.database.Cursor;
import android.os.Handler;
import android.os.Looper;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
@ParametersAreNonnullByDefault
public final class CursorSubject implements Closeable {
public static CursorSubject create(Cursor default_cursor, Scheduler subscribe_on, Scheduler observe_on
, CursorRetriever cursor_retriever) {
return new CursorSubject(default_cursor, subscribe_on, observe_on, cursor_retriever);
}
public static CursorSubject create(Scheduler subscribe_on, Scheduler observe_on
, CursorRetriever cursor_retriever) {
return create(null, subscribe_on, observe_on, cursor_retriever);
}
/* Public API */
public Subscription subscribe(Observer<? super Cursor> observer) {
return _observable.subscribe(observer);
}
public Subscription subscribe(Action1<? super Cursor> on_next) {
return _observable.subscribe(on_next);
}
/**
* Retrieves the {@link android.database.Cursor}.
*/
public interface CursorRetriever {
@Nullable Cursor retrieveCursor();
}
/* Closeable */
@Override public void close() {
_cursor_channel.onCompleted();
updateLastCursor(null);
}
private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
, final Scheduler observe_on, CursorRetriever cursor_retriever) {
_last_cursor = new AtomicReference<>(default_cursor);
_cursor_retriever = cursor_retriever;
_cursor_channel = PublishSubject.create();
_observable = _cursor_channel.subscribeOn(subscribe_on).observeOn(observe_on);
_execution_scheduler = subscribe_on;
_load_cursor_action = new Action0() {
@Override public void call() {
try {
final Cursor cursor = _cursor_retriever.retrieveCursor();
if (cursor != null) {
try {
// Ensure the cursor window is filled.
cursor.getCount();
cursor.registerContentObserver(_content_observer);
} catch (RuntimeException exception) {
cursor.close();
throw exception;
}
// Delivers resulting Cursor over Cursor channel
_cursor_channel.onNext(cursor);
// Should make sure that the cursor is scheduled to be closed after the new one has been received
observe_on.schedule(cursor, _update_last_cursor_action);
}
} catch (Exception exception) {
_cursor_channel.onError(exception);
}
}
};
_update_last_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {
@Override public Subscription call(Scheduler _, Cursor cursor) {
updateLastCursor(cursor);
return Subscriptions.empty();
}
};
// Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
_execution_scheduler.schedule(_load_cursor_action);
}
/* package */void updateLastCursor(@Nullable Cursor cursor) {
// Makes sure to close cursor
final Cursor last_cursor = _last_cursor.getAndSet(cursor);
Timber.d("Updating to Cursor %s and closing Cursor %s", cursor, last_cursor);
if (last_cursor != null) last_cursor.close();
}
private void loadCursor() {
// Schedules Cursor loading on execution scheduler
_execution_scheduler.schedule(_load_cursor_action);
}
/**
* An implementation of a ContentObserver that takes care of connecting
* it to re-load the data when the observer is told it has changed.
*/
private final class ForceLoadContentObserver extends ContentObserver {
public ForceLoadContentObserver() {
super(new Handler(Looper.myLooper()));
}
@Override public boolean deliverSelfNotifications() {
return true;
}
@Override public void onChange(boolean _) {
loadCursor();
}
}
private final Observable<Cursor> _observable;
private final Scheduler _execution_scheduler;
/* package */final Action0 _load_cursor_action;
/* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _update_last_cursor_action;
/* package */final AtomicReference<Cursor> _last_cursor;
/* package */final CursorRetriever _cursor_retriever;
/* package */final BehaviorSubject<Cursor> _cursor_channel;
/* package */final ObjectPool<QueryBuilder> _query_pool;
/* package */ final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
}
package com.example;
import android.app.ListActivity;
import android.database.Cursor;
import android.net.Uri;
import android.os.Bundle;
import android.widget.CursorAdapter;
import android.widget.SimpleCursorAdapter;
import com.example.models.CursorSubject;
import rx.Observer;
import static rx.schedulers.Schedulers.io;
import static rx.android.schedulers.AndroidSchedulers.mainThread;
public class ExampleActivity extends ListActivity implements Observer<Cursor> {
/* Activity callbacks */
@Override protected onCreate(Bundle _) {
CursorSubject.create(io(), mainThread(), new CursorRetriever() {
@Override
@Nullable public Cursor retrieveCursor() {
return getContentResolver().query(Uri.EMPTY, null, null, null, null);;
}).subscribe(this);
/* Cursor Observer */
@Override public void onNext(Cursor cursor) {
if (cursor.isClosed()) return;
final CursorAdapter adapter = (CursorAdapter) getListAdapter();
if (adapter != null) adapter.swapCursor(cursor);
else setListAdapter(new SimpleCursorAdapter(this, android.R.layout.simple_list_item_1, cursor));
}
@Override public void onCompleted() { }
@Override public void onError(Throwable _) { }
}
@nvb
Copy link

nvb commented Jul 13, 2015

It doesn't look like this implementation is still supported after recent changes to the RxJava API (i.e., Scheduler). Are you still using this code? If not, have you found a better solution to the problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment