Skip to content

Instantly share code, notes, and snippets.

@joaocsousa
Last active January 29, 2018 09:37
Show Gist options
  • Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Shared observable that will run a task. Every subscriber that subscribes while the task is running shares the same observable and receives the same result. When the task finishes, the next subscriber triggers a new task.
[1] subscribed at 229ms
-> Started Task at 233ms
[2] subscribed at 1233ms
[3] subscribed at 2239ms
-> Finished Task at 3237ms
[1] Got result: 233ms
[2] Got result: 233ms
[3] Got result: 233ms
[4] subscribed at 3244ms
-> Started Task at 3244ms
[5] subscribed at 4248ms
-> Finished Task at 6246ms
[4] Got result: 3244ms
[5] Got result: 3244ms
/**
 * Demo driver for {@link SingletonTask}.
 *
 * <p>Subscribes five numbered observers to the task's observable, one second apart, and prints
 * when each one subscribed and which task run's start time it eventually received.
 */
public class Main {

    /** Wall-clock time captured at class initialisation; every printed time is relative to it. */
    public static final long START_TIME = System.currentTimeMillis();

    /** Number of observers the demo attaches. */
    private static final int SUBSCRIBER_COUNT = 5;

    /** Pause between two consecutive subscriptions, in milliseconds. */
    private static final long SUBSCRIBE_INTERVAL_MS = 1000;

    /**
     * Pause after the last subscription, in milliseconds. Keeps {@code main} from returning while
     * results are still pending on other threads.
     */
    private static final long FINAL_WAIT_MS = 20000;

    /**
     * Attaches the observers in order, sleeping between subscriptions.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Qualified call: does not depend on a static import being present in this file.
        SingletonTask task = SingletonTask.singletonTask();
        for (int id = 1; id <= SUBSCRIBER_COUNT; id++) {
            subscribeAndReport(task, id);
            Thread.sleep(id < SUBSCRIBER_COUNT ? SUBSCRIBE_INTERVAL_MS : FINAL_WAIT_MS);
        }
    }

    /**
     * Subscribes one observer to the task's observable and prints the subscription time.
     *
     * <p>The subscription is made on a new thread and the result is delivered on the single
     * scheduler. A failure is printed to standard error instead of being left unhandled.
     *
     * @param task the task whose observable is subscribed to
     * @param id   number used to label this observer's output lines
     */
    private static void subscribeAndReport(SingletonTask task, int id) {
        task.getObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.single())
                .subscribe(
                        timestamp -> System.out.println(
                                "[" + id + "] Got result: " + formatTimestamp(timestamp) + "ms"),
                        error -> System.err.println("[" + id + "] Failed: " + error));
        System.out.println(
                "[" + id + "] subscribed at " + (System.currentTimeMillis() - START_TIME) + "ms");
    }

    /**
     * Converts an absolute timestamp into milliseconds elapsed since {@link #START_TIME}.
     *
     * @param timestamp absolute time in milliseconds, as returned by
     *                  {@link System#currentTimeMillis()}
     * @return the offset from {@link #START_TIME}, modulo 1,000,000, as a decimal string
     */
    private static String formatTimestamp(long timestamp) {
        return String.valueOf((timestamp - START_TIME) % 1000000);
    }
}
/**
 * Enum singleton exposing one shared observable that runs a simulated long task.
 *
 * <p>The observable is created once and wrapped with {@code share()}, so every caller of
 * {@link #getObservable()} receives the same instance.
 */
public enum SingletonTask {
    INSTANCE;

    /**
     * Emits the time the task run started, then completes.
     *
     * <p>The run prints a start line, sleeps three seconds, and prints a finish line. Nothing is
     * signalled once the emitter has been disposed.
     */
    private final Observable<Long> longRunningObservable = Observable.<Long>create(emitter -> {
        long timestamp = System.currentTimeMillis();
        // Reuse the captured value so the printed start time always equals the emitted one.
        System.out.println("-> Started Task at " + (timestamp - Main.START_TIME) + "ms");
        try {
            Thread.sleep(3000); // stand-in for the real long-running work
        } catch (InterruptedException e) {
            // Restore the flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            // Signalling an error on a disposed emitter would be reported as undeliverable,
            // so only forward the interruption while someone is still listening.
            if (!emitter.isDisposed()) {
                emitter.onError(e);
            }
            return;
        }
        System.out.println("-> Finished Task at " + (System.currentTimeMillis() - Main.START_TIME) + "ms");
        if (!emitter.isDisposed()) {
            emitter.onNext(timestamp);
            emitter.onComplete();
        }
    }).share();

    /**
     * Static accessor for the single instance.
     *
     * @return {@link #INSTANCE}
     */
    public static SingletonTask singletonTask() {
        return INSTANCE;
    }

    /**
     * Returns the shared observable.
     *
     * @return the same observable instance on every call; it emits the start time (epoch
     *         milliseconds) of the task run and then completes
     */
    public Observable<Long> getObservable() {
        return longRunningObservable;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment