Skip to content

Instantly share code, notes, and snippets.

@imminent
Last active August 11, 2016 17:21

Revisions

  1. imminent revised this gist Jan 22, 2014. 1 changed file with 7 additions and 15 deletions.
    22 changes: 7 additions & 15 deletions CursorSubject.java
    Original file line number Diff line number Diff line change
    @@ -12,7 +12,6 @@
    import javax.annotation.Nullable;
    import javax.annotation.ParametersAreNonnullByDefault;

    import rx.Observable;
    import rx.Observer;
    import rx.Scheduler;
    import rx.Subscription;
    @@ -25,11 +24,10 @@
    @ParametersAreNonnullByDefault
    public abstract class CursorSubject implements Closeable {

    public CursorSubject(Scheduler subscribe_on, final Scheduler observe_on) {
    public CursorSubject(Scheduler execute_on, final Scheduler observe_on) {
    _last_cursor = new AtomicReference<>();
    _cursor_channel = PublishSubject.create();
    _observable = _cursor_channel.subscribeOn(subscribe_on).observeOn(observe_on);
    _execution_scheduler = subscribe_on;
    _execution_scheduler = execute_on;
    _load_cursor_action = new Action0() {

    @Override public void call() {
    @@ -56,10 +54,8 @@ public CursorSubject(Scheduler subscribe_on, final Scheduler observe_on) {
    _deliver_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {

    @Override public Subscription call(Scheduler scheduler, Cursor cursor) {
    synchronized (_LOCK) {
    _cursor_channel.onNext(cursor);
    updateLastCursor(cursor);
    }
    _cursor_channel.onNext(cursor);
    updateLastCursor(cursor);
    return Subscriptions.empty();
    }
    };
    @@ -69,11 +65,11 @@ public CursorSubject(Scheduler subscribe_on, final Scheduler observe_on) {

    /* Public API */
    public Subscription subscribe(Observer<? super Cursor> observer) {
    return _observable.subscribe(observer);
    return _cursor_channel.subscribe(observer);
    }

    public Subscription subscribe(Action1<? super Cursor> on_next) {
    return _observable.subscribe(on_next);
    return _cursor_channel.subscribe(on_next);
    }

    /**
    @@ -117,14 +113,10 @@ public ForceLoadContentObserver() {
    }
    }

    private final Observable<Cursor> _observable;
    private final Scheduler _execution_scheduler;
    /* package */final Action0 _load_cursor_action;
    /* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _deliver_cursor_action;
    /* package */final AtomicReference<Cursor> _last_cursor;
    /* package */final CursorRetriever _cursor_retriever;
    /* package */final BehaviorSubject<Cursor> _cursor_channel;
    /* package */final ObjectPool<QueryBuilder> _query_pool;
    /* package */final PublishSubject<Cursor> _cursor_channel;
    /* package */final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
    /* package */final Object _LOCK = new Object();
    }
  2. imminent revised this gist Jan 21, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions ExampleActivity.java
    Original file line number Diff line number Diff line change
    @@ -18,12 +18,12 @@ public class ExampleActivity extends ListActivity implements Observer<Cursor> {

    /* Activity callbacks */
    @Override protected onCreate(Bundle _) {
    CursorSubject.create(io(), mainThread(), new CursorRetriever() {
    new CursorSubject(io(), mainThread()) {

    @Override
    @Nullable public Cursor retrieveCursor() {
    return getContentResolver().query(Uri.EMPTY, null, null, null, null);;
    }).subscribe(this);
    }.subscribe(this);

    /* Cursor Observer */
    @Override public void onNext(Cursor cursor) {
  3. imminent revised this gist Jan 21, 2014. 1 changed file with 28 additions and 35 deletions.
    63 changes: 28 additions & 35 deletions CursorSubject.java
    Original file line number Diff line number Diff line change
    @@ -23,40 +23,10 @@
    import rx.util.functions.Func2;

    @ParametersAreNonnullByDefault
    public final class CursorSubject implements Closeable {
    public abstract class CursorSubject implements Closeable {

    public static CursorSubject create(Scheduler subscribe_on, Scheduler observe_on
    , CursorRetriever cursor_retriever) {
    return new CursorSubject(default_cursor, subscribe_on, observe_on, cursor_retriever);
    }

    /* Public API */
    public Subscription subscribe(Observer<? super Cursor> observer) {
    return _observable.subscribe(observer);
    }

    public Subscription subscribe(Action1<? super Cursor> on_next) {
    return _observable.subscribe(on_next);
    }

    /**
    * Retrieves the {@link android.database.Cursor}.
    */
    public interface CursorRetriever {

    @Nullable Cursor retrieveCursor();
    }

    /* Closeable */
    @Override public void close() {
    _cursor_channel.onCompleted();
    updateLastCursor(null);
    }

    private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
    , final Scheduler observe_on, CursorRetriever cursor_retriever) {
    public CursorSubject(Scheduler subscribe_on, final Scheduler observe_on) {
    _last_cursor = new AtomicReference<>();
    _cursor_retriever = cursor_retriever;
    _cursor_channel = PublishSubject.create();
    _observable = _cursor_channel.subscribeOn(subscribe_on).observeOn(observe_on);
    _execution_scheduler = subscribe_on;
    @@ -86,15 +56,37 @@ private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
    _deliver_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {

    @Override public Subscription call(Scheduler scheduler, Cursor cursor) {
    _cursor_channel.onNext(cursor);
    updateLastCursor(cursor);
    synchronized (_LOCK) {
    _cursor_channel.onNext(cursor);
    updateLastCursor(cursor);
    }
    return Subscriptions.empty();
    }
    };
    // Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
    loadCursor();
    }

    /* Public API */
    public Subscription subscribe(Observer<? super Cursor> observer) {
    return _observable.subscribe(observer);
    }

    public Subscription subscribe(Action1<? super Cursor> on_next) {
    return _observable.subscribe(on_next);
    }

    /**
    * Retrieves the {@link android.database.Cursor}.
    */
    @Nullable protected abstract Cursor retrieveCursor();

    /* Closeable */
    @Override public void close() {
    _cursor_channel.onCompleted();
    updateLastCursor(null);
    }

    /* package */void updateLastCursor(@Nullable Cursor cursor) {
    // Makes sure to close cursor
    final Cursor last_cursor = _last_cursor.getAndSet(cursor);
    @@ -133,5 +125,6 @@ public ForceLoadContentObserver() {
    /* package */final CursorRetriever _cursor_retriever;
    /* package */final BehaviorSubject<Cursor> _cursor_channel;
    /* package */final ObjectPool<QueryBuilder> _query_pool;
    /* package */ final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
    /* package */final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
    /* package */final Object _LOCK = new Object();
    }
  4. imminent revised this gist Jan 20, 2014. 1 changed file with 10 additions and 11 deletions.
    21 changes: 10 additions & 11 deletions CursorSubject.java
    Original file line number Diff line number Diff line change
    @@ -75,34 +75,33 @@ private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
    throw exception;
    }
    // Delivers resulting Cursor over Cursor channel
    _cursor_channel.onNext(cursor);
    // Should make sure that the cursor is scheduled to be closed after the new one has been received
    observe_on.schedule(cursor, _update_last_cursor_action);
    observe_on.schedule(cursor, _deliver_cursor_action);
    }
    } catch (Exception exception) {
    _cursor_channel.onError(exception);
    }
    }
    };
    _update_last_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {
    // Should make sure that the cursor is scheduled to be closed after the new one has been received
    _deliver_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {

    @Override public Subscription call(Scheduler _, Cursor cursor) {
    updateLastCursor(cursor);
    return Subscriptions.empty();
    @Override public Subscription call(Scheduler scheduler, Cursor cursor) {
    _cursor_channel.onNext(cursor);
    updateLastCursor(cursor);
    return Subscriptions.empty();
    }
    };
    // Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
    _execution_scheduler.schedule(_load_cursor_action);
    loadCursor();
    }

    /* package */void updateLastCursor(@Nullable Cursor cursor) {
    // Makes sure to close cursor
    final Cursor last_cursor = _last_cursor.getAndSet(cursor);
    Timber.d("Updating to Cursor %s and closing Cursor %s", cursor, last_cursor);
    if (last_cursor != null) last_cursor.close();
    }

    private void loadCursor() {
    /* package */void loadCursor() {
    // Schedules Cursor loading on execution scheduler
    _execution_scheduler.schedule(_load_cursor_action);
    }
    @@ -129,7 +128,7 @@ public ForceLoadContentObserver() {
    private final Observable<Cursor> _observable;
    private final Scheduler _execution_scheduler;
    /* package */final Action0 _load_cursor_action;
    /* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _update_last_cursor_action;
    /* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _deliver_cursor_action;
    /* package */final AtomicReference<Cursor> _last_cursor;
    /* package */final CursorRetriever _cursor_retriever;
    /* package */final BehaviorSubject<Cursor> _cursor_channel;
  5. imminent revised this gist Jan 19, 2014. 1 changed file with 2 additions and 7 deletions.
    9 changes: 2 additions & 7 deletions CursorSubject.java
    Original file line number Diff line number Diff line change
    @@ -25,14 +25,9 @@
    @ParametersAreNonnullByDefault
    public final class CursorSubject implements Closeable {

    public static CursorSubject create(Cursor default_cursor, Scheduler subscribe_on, Scheduler observe_on
    , CursorRetriever cursor_retriever) {
    return new CursorSubject(default_cursor, subscribe_on, observe_on, cursor_retriever);
    }

    public static CursorSubject create(Scheduler subscribe_on, Scheduler observe_on
    , CursorRetriever cursor_retriever) {
    return create(null, subscribe_on, observe_on, cursor_retriever);
    return new CursorSubject(default_cursor, subscribe_on, observe_on, cursor_retriever);
    }

    /* Public API */
    @@ -60,7 +55,7 @@ public interface CursorRetriever {

    private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
    , final Scheduler observe_on, CursorRetriever cursor_retriever) {
    _last_cursor = new AtomicReference<>(default_cursor);
    _last_cursor = new AtomicReference<>();
    _cursor_retriever = cursor_retriever;
    _cursor_channel = PublishSubject.create();
    _observable = _cursor_channel.subscribeOn(subscribe_on).observeOn(observe_on);
  6. imminent created this gist Jan 19, 2014.
    143 changes: 143 additions & 0 deletions CursorSubject.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,143 @@
    package com.example.models;

    import android.database.ContentObserver;
    import android.database.Cursor;
    import android.os.Handler;
    import android.os.Looper;

    import java.io.Closeable;
    import java.util.concurrent.atomic.AtomicReference;

    import javax.annotation.Nonnull;
    import javax.annotation.Nullable;
    import javax.annotation.ParametersAreNonnullByDefault;

    import rx.Observable;
    import rx.Observer;
    import rx.Scheduler;
    import rx.Subscription;
    import rx.subjects.PublishSubject;
    import rx.subscriptions.Subscriptions;
    import rx.util.functions.Action0;
    import rx.util.functions.Action1;
    import rx.util.functions.Func2;

    @ParametersAreNonnullByDefault
    public final class CursorSubject implements Closeable {

    public static CursorSubject create(Cursor default_cursor, Scheduler subscribe_on, Scheduler observe_on
    , CursorRetriever cursor_retriever) {
    return new CursorSubject(default_cursor, subscribe_on, observe_on, cursor_retriever);
    }

    public static CursorSubject create(Scheduler subscribe_on, Scheduler observe_on
    , CursorRetriever cursor_retriever) {
    return create(null, subscribe_on, observe_on, cursor_retriever);
    }

    /* Public API */
    public Subscription subscribe(Observer<? super Cursor> observer) {
    return _observable.subscribe(observer);
    }

    public Subscription subscribe(Action1<? super Cursor> on_next) {
    return _observable.subscribe(on_next);
    }

    /**
    * Retrieves the {@link android.database.Cursor}.
    */
    public interface CursorRetriever {

    @Nullable Cursor retrieveCursor();
    }

    /* Closeable */
    @Override public void close() {
    _cursor_channel.onCompleted();
    updateLastCursor(null);
    }

    private CursorSubject(@Nullable Cursor default_cursor, Scheduler subscribe_on
    , final Scheduler observe_on, CursorRetriever cursor_retriever) {
    _last_cursor = new AtomicReference<>(default_cursor);
    _cursor_retriever = cursor_retriever;
    _cursor_channel = PublishSubject.create();
    _observable = _cursor_channel.subscribeOn(subscribe_on).observeOn(observe_on);
    _execution_scheduler = subscribe_on;
    _load_cursor_action = new Action0() {

    @Override public void call() {
    try {
    final Cursor cursor = _cursor_retriever.retrieveCursor();
    if (cursor != null) {
    try {
    // Ensure the cursor window is filled.
    cursor.getCount();
    cursor.registerContentObserver(_content_observer);
    } catch (RuntimeException exception) {
    cursor.close();
    throw exception;
    }
    // Delivers resulting Cursor over Cursor channel
    _cursor_channel.onNext(cursor);
    // Should make sure that the cursor is scheduled to be closed after the new one has been received
    observe_on.schedule(cursor, _update_last_cursor_action);
    }
    } catch (Exception exception) {
    _cursor_channel.onError(exception);
    }
    }
    };
    _update_last_cursor_action = new Func2<Scheduler, Cursor, Subscription>() {

    @Override public Subscription call(Scheduler _, Cursor cursor) {
    updateLastCursor(cursor);
    return Subscriptions.empty();
    }
    };
    // Schedules the first cursor retrieval (subsequent retrievals occur when content changes)
    _execution_scheduler.schedule(_load_cursor_action);
    }

    /* package */void updateLastCursor(@Nullable Cursor cursor) {
    // Makes sure to close cursor
    final Cursor last_cursor = _last_cursor.getAndSet(cursor);
    Timber.d("Updating to Cursor %s and closing Cursor %s", cursor, last_cursor);
    if (last_cursor != null) last_cursor.close();
    }

    private void loadCursor() {
    // Schedules Cursor loading on execution scheduler
    _execution_scheduler.schedule(_load_cursor_action);
    }

    /**
    * An implementation of a ContentObserver that takes care of connecting
    * it to re-load the data when the observer is told it has changed.
    */
    private final class ForceLoadContentObserver extends ContentObserver {

    public ForceLoadContentObserver() {
    super(new Handler(Looper.myLooper()));
    }

    @Override public boolean deliverSelfNotifications() {
    return true;
    }

    @Override public void onChange(boolean _) {
    loadCursor();
    }
    }

    private final Observable<Cursor> _observable;
    private final Scheduler _execution_scheduler;
    /* package */final Action0 _load_cursor_action;
    /* package */final Func2<? super Scheduler, ? super Cursor, ? extends Subscription> _update_last_cursor_action;
    /* package */final AtomicReference<Cursor> _last_cursor;
    /* package */final CursorRetriever _cursor_retriever;
    /* package */final BehaviorSubject<Cursor> _cursor_channel;
    /* package */final ObjectPool<QueryBuilder> _query_pool;
    /* package */ final ForceLoadContentObserver _content_observer = new ForceLoadContentObserver();
    }
    40 changes: 40 additions & 0 deletions ExampleActivity.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    package com.example;

    import android.app.ListActivity;
    import android.database.Cursor;
    import android.net.Uri;
    import android.os.Bundle;
    import android.widget.CursorAdapter;
    import android.widget.SimpleCursorAdapter;

    import com.example.models.CursorSubject;

    import rx.Observer;

    import static rx.schedulers.Schedulers.io;
    import static rx.android.schedulers.AndroidSchedulers.mainThread;

    public class ExampleActivity extends ListActivity implements Observer<Cursor> {

    /* Activity callbacks */
    @Override protected onCreate(Bundle _) {
    CursorSubject.create(io(), mainThread(), new CursorRetriever() {

    @Override
    @Nullable public Cursor retrieveCursor() {
    return getContentResolver().query(Uri.EMPTY, null, null, null, null);;
    }).subscribe(this);

    /* Cursor Observer */
    @Override public void onNext(Cursor cursor) {
    if (cursor.isClosed()) return;

    final CursorAdapter adapter = (CursorAdapter) getListAdapter();
    if (adapter != null) adapter.swapCursor(cursor);
    else setListAdapter(new SimpleCursorAdapter(this, android.R.layout.simple_list_item_1, cursor));
    }

    @Override public void onCompleted() { }

    @Override public void onError(Throwable _) { }
    }