Last active
April 12, 2018 05:45
-
-
Save JvmName/cef01863de4e0948ab63875555f8a4a0 to your computer and use it in GitHub Desktop.
A quick-n-dirty hybrid of RxJava's Map + Filter operators
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Observable; | |
import io.reactivex.Observer; | |
import io.reactivex.annotations.NonNull; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.exceptions.Exceptions; | |
import io.reactivex.functions.Function; | |
import io.reactivex.functions.Predicate; | |
import io.reactivex.internal.disposables.DisposableHelper; | |
import io.reactivex.internal.functions.ObjectHelper; | |
import io.reactivex.plugins.RxJavaPlugins; | |
public class ObservableFilterMap<In, Out> extends Observable<Out> { | |
private final Function<In, Out> mapper; | |
private final Predicate<In> filter; | |
public ObservableFilterMap(Predicate<In> filter, Function<In, Out> mapper) { | |
this.filter = filter; | |
this.mapper = mapper; | |
} | |
@Override | |
protected void subscribeActual(Observer<? super Out> observer) { | |
new MapFilterObserver<>(observer, filter, mapper); | |
} | |
static final class MapFilterObserver<In, Out> implements Observer<In>, Disposable{ | |
Disposable s; | |
private final Observer<? super Out> downstream; | |
private final Predicate<In> filter; | |
private final Function<In, Out> mapper; | |
public MapFilterObserver(Observer<? super Out> downstream, Predicate<In> filter, Function<In, Out> mapper) { | |
this.downstream = downstream; | |
this.filter = filter; | |
this.mapper = mapper; | |
} | |
@Override | |
public void onSubscribe(@NonNull Disposable d) { | |
if (DisposableHelper.validate(this.s, d)) { | |
this.s = d; | |
downstream.onSubscribe(this); | |
} | |
} | |
@Override | |
public void onNext(@NonNull In t) { | |
boolean b; | |
try { | |
b = filter.test(t); | |
} catch (Throwable e) { | |
fail(e); | |
return; | |
} | |
if (b) { | |
Out v; | |
try { | |
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); | |
} catch (Throwable ex) { | |
fail(ex); | |
return; | |
} | |
downstream.onNext(v); | |
} | |
} | |
@Override | |
public void onError(@NonNull Throwable e) { | |
downstream.onError(e); | |
} | |
@Override | |
public void onComplete() { | |
downstream.onComplete(); | |
} | |
@Override | |
public void dispose() { | |
s.dispose(); | |
} | |
@Override | |
public boolean isDisposed() { | |
return s.isDisposed(); | |
} | |
protected final void fail(Throwable t) { | |
Exceptions.throwIfFatal(t); | |
s.dispose(); | |
onError(t); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
thanks to @artem-zinnatullin for the suggestions!