Skip to content

Instantly share code, notes, and snippets.

@joaocsousa
Last active January 29, 2018 09:37
Show Gist options
  • Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Save joaocsousa/f3453026954930c203fa7a22a495a9b7 to your computer and use it in GitHub Desktop.
Shared observable that runs a task. Every subscriber that subscribes while the task is running shares the same observable and receives the same result. When the task finishes, the next subscriber triggers a new task.
[1] subscribed at 234ms
-> Started Task at 238ms
[2] subscribed at 1238ms
[3] subscribed at 2239ms
-> Finished Task at 3242ms
[1] Received result. Value: 55
[4] subscribed at 3243ms
[2] Received result. Value: 55
-> Started Task at 3243ms
[3] Received result. Value: 55
[5] subscribed at 4247ms
-> Finished Task at 6245ms
[4] Received result. Value: 27
[5] Received result. Value: 27
/**
 * Demo driver for {@link SingletonTask}: attaches five subscribers, one second apart, to the
 * shared observable and prints when each one subscribes and what it receives.
 */
public class Main {
    /** Reference instant; every timestamp printed by the demo is relative to this. */
    public static final long START_TIME = System.currentTimeMillis();

    /** Number of subscribers the demo attaches. */
    private static final int SUBSCRIBER_COUNT = 5;
    /** Pause between two consecutive subscriptions, in milliseconds. */
    private static final long SUBSCRIBE_INTERVAL_MS = 1000;
    /** How long the main thread lingers after the last subscription, in milliseconds. */
    private static final long FINAL_WAIT_MS = 20000;

    /**
     * Subscribes {@link #SUBSCRIBER_COUNT} observers with {@link #SUBSCRIBE_INTERVAL_MS} between
     * them, then waits {@link #FINAL_WAIT_MS} so outstanding results can still be printed.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Qualified call: does not rely on a static import being present in the file.
        SingletonTask singletonTask = SingletonTask.singletonTask();
        for (int id = 1; id <= SUBSCRIBER_COUNT; id++) {
            subscribe(singletonTask, id);
            if (id < SUBSCRIBER_COUNT) {
                Thread.sleep(SUBSCRIBE_INTERVAL_MS);
            }
        }
        // Keep main alive long enough for in-flight tasks to finish and report.
        Thread.sleep(FINAL_WAIT_MS);
    }

    /**
     * Attaches one subscriber to the task's observable and logs the subscription time.
     *
     * @param task the task whose observable is subscribed to
     * @param id   label used in the console output to tell subscribers apart
     */
    private static void subscribe(SingletonTask task, int id) {
        task.getObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.single())
                .subscribe(
                        result -> System.out.println("[" + id + "] Received result. Value: " + result),
                        // Without an error consumer a failing task would be reported through the
                        // global RxJava error handler instead of being handled here.
                        error -> System.err.println("[" + id + "] Task failed: " + error));
        System.out.println("[" + id + "] subscribed at " + (System.currentTimeMillis() - START_TIME) + "ms");
    }
}
/**
 * Enum singleton that exposes one shared observable wrapping a slow task.
 *
 * <p>The task picks a random integer in {@code [0, 100)}, sleeps for three seconds, emits the
 * integer and completes. Because the observable is wrapped with {@code share()}, subscribers that
 * arrive while a run is in progress receive that run's result; a subscriber arriving after the
 * run has completed starts a new run.
 */
public enum SingletonTask {
    INSTANCE;

    /** Simulated duration of the task, in milliseconds. */
    private static final long TASK_DURATION_MS = 3000;

    private final Random randomGenerator = new Random(System.currentTimeMillis());

    private final Observable<Integer> longRunningObservable = Observable.<Integer>create(observableEmitter -> {
        int result = randomGenerator.nextInt(100);
        System.out.println("-> Started Task at " + (System.currentTimeMillis() - Main.START_TIME) + "ms");
        try {
            Thread.sleep(TASK_DURATION_MS);
        } catch (InterruptedException interrupted) {
            // Restore the flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            // If the downstream is already gone (e.g. the last subscriber disposed mid-task),
            // there is nobody to notify; letting the exception escape would only hand it to
            // the global RxJava error handler as an undeliverable error.
            if (!observableEmitter.isDisposed()) {
                observableEmitter.onError(interrupted);
            }
            return;
        }
        System.out.println("-> Finished Task at " + (System.currentTimeMillis() - Main.START_TIME) + "ms");
        observableEmitter.onNext(result);
        observableEmitter.onComplete();
    }).share();

    /**
     * Returns the single instance of this task.
     *
     * @return {@link #INSTANCE}
     */
    public static SingletonTask singletonTask() {
        return INSTANCE;
    }

    /**
     * Returns the shared observable; every call returns the same object.
     *
     * @return the shared observable emitting the task's single integer result
     */
    public Observable<Integer> getObservable() {
        return longRunningObservable;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment