Created
May 22, 2017 09:58
-
-
Save mkrajc/de8bd66e3528af774e4e6b7deb0c691e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.mech; | |
import java.util.concurrent.TimeUnit; | |
import io.reactivex.Observable; | |
import io.reactivex.Scheduler; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.observables.ConnectableObservable; | |
import io.reactivex.schedulers.Schedulers; | |
public class RxDynamicTimer { | |
private final Speed speed; | |
private Disposable subsription; | |
private final ConnectableObservable<Integer> dynamicTimer; | |
private Scheduler scheduler = Schedulers.single(); | |
public RxDynamicTimer(Integer milis) { | |
this.speed = new Speed(milis); | |
this.dynamicTimer = setupAndPublish(); | |
} | |
public Observable<Integer> getTimer() { | |
return dynamicTimer; | |
} | |
public Speed getSpeed() { | |
return speed; | |
} | |
public void start() { | |
if (subsription == null) { | |
// start the dynamic speed | |
this.subsription = dynamicTimer.connect(); | |
} | |
} | |
public void stop() { | |
if (subsription != null || !subsription.isDisposed()) { | |
subsription.dispose(); | |
//scheduler.shutdown(); | |
} | |
} | |
private ConnectableObservable<Integer> setupAndPublish() { | |
// create dynamic speed | |
return createTimer() | |
// some logging | |
.doOnNext(i -> RxDynamicTimerTest.log("Got: " + i)) | |
// publish it, so only one subscription is done to createTimer internally | |
// but it will not start until connect is invoked | |
.publish(); | |
} | |
private Observable<Integer> createTimer() { | |
// recursively delay emission of item based on speed | |
return Observable.just(1).delay(speed.getStep(), TimeUnit.MILLISECONDS, scheduler) | |
// lazy concat with itself | |
.concatWith(Observable.defer(this::createTimer)); | |
} | |
// Speed provides current speed of item emission | |
public static class Speed { | |
private Integer step = 1000; | |
Speed(Integer milis) { | |
this.step = milis; | |
} | |
public Integer getStep() { | |
return step; | |
} | |
public void speedUp() { | |
step /= 2; | |
System.out.println("Current step " + step + " ms"); | |
} | |
public void slowDown() { | |
step *= 2; | |
System.out.println("Current step " + step + " ms"); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.mech; | |
import io.reactivex.annotations.NonNull; | |
import io.reactivex.observers.DisposableObserver; | |
public class RxDynamicTimerTest { | |
private static Long start; | |
public static void main(String[] args) throws InterruptedException { | |
start = System.currentTimeMillis(); | |
log("Starting"); | |
final RxDynamicTimer rxDynamicTimer = new RxDynamicTimer(1000); | |
final DynamicTimerSubscriber subA = new DynamicTimerSubscriber("SubA"); | |
final DynamicTimerSubscriber subB = new DynamicTimerSubscriber("SubB"); | |
log("Observers created"); | |
rxDynamicTimer.getTimer().subscribe(subA); | |
rxDynamicTimer.getTimer().subscribe(subB); | |
log("Observers subscribed."); | |
waitFor(1); | |
log("Start dynamic timer"); | |
rxDynamicTimer.start(); | |
waitFor(3); | |
log("Increase speed"); | |
rxDynamicTimer.getSpeed().speedUp(); | |
waitFor(3); | |
log("Unsubscribe A"); | |
subA.dispose(); | |
waitFor(3); | |
log("Unsubscribe B"); | |
subB.dispose(); | |
waitFor(3); | |
log("Stop timer."); | |
rxDynamicTimer.stop(); | |
log("End"); | |
} | |
private static void waitFor(Integer seconds) throws InterruptedException { | |
log("Waiting " + seconds + " second" + ((seconds > 1) ? "s" : "") + " ..."); | |
Thread.sleep(seconds * 1000); | |
} | |
public static void log(String s) { | |
log("APP ", s); | |
} | |
private static void log(String src, String s) { | |
System.out | |
.println("[" + (System.currentTimeMillis() - start) + "]\t" + src + "\t[" + Thread.currentThread().getName() + "]\t" + s); | |
} | |
private static class DynamicTimerSubscriber extends DisposableObserver<Integer> { | |
private String name; | |
DynamicTimerSubscriber(String n) { | |
this.name = n; | |
} | |
@Override | |
protected void onStart() { | |
log("onStart"); | |
} | |
@Override | |
public void onNext(@NonNull final Integer i) { | |
log("onNext: " + String.valueOf(i)); | |
} | |
@Override | |
public void onError(@NonNull final Throwable e) { | |
e.printStackTrace(); | |
log("onError: " + e.getClass()); | |
} | |
@Override | |
public void onComplete() { | |
log("onComplete"); | |
} | |
private void log(String s) { | |
RxDynamicTimerTest.log(name, s); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment