forked from Froussios/Intro-To-RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSample2_3.java
More file actions
236 lines (189 loc) · 7.5 KB
/
Sample2_3.java
File metadata and controls
236 lines (189 loc) · 7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.operators.observable.ObservableInternalHelper;
/**
 * Runnable samples for the RxJava 2 "inspection" operators: {@code all}, {@code any},
 * {@code isEmpty}, {@code contains}, {@code defaultIfEmpty}, {@code elementAt},
 * {@code elementAtOrError} and {@code sequenceEqual}.
 *
 * <p>Every sample prints its result to standard output. Samples whose source emits on a
 * background scheduler ({@code interval}, {@code timer}) block on standard input afterwards so
 * that the JVM stays alive long enough for the output to appear.
 */
public class Sample2_3 {

    /**
     * Blocks the calling thread until a byte can be read from standard input (or the stream
     * ends). Used purely to keep the JVM alive while asynchronous sequences emit.
     */
    private static void waitForInput() {
        try {
            System.in.read();
        } catch (java.io.IOException ignored) {
            // Best effort only: if stdin cannot be read there is nothing to wait for.
        }
    }

    /**
     * {@code all} establishes that every value emitted by an observable meets a criterion.
     * All values here are even, so {@code true} is printed once the source completes.
     */
    public void all_success() {
        Observable<Integer> values = Observable.create(o -> {
            o.onNext(0);
            o.onNext(10);
            o.onNext(10);
            o.onNext(2);
            o.onComplete();
        });

        // As soon as an item fails the predicate, `false` is emitted.
        // `true`, on the other hand, cannot be emitted until the source has completed and
        // all of the items have been checked.
        // Returning the decision inside a reactive type keeps the operation non-blocking.
        Disposable evenNumbers = values
                .all(i -> i % 2 == 0)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );

        waitForInput();
    }

    /**
     * {@code all} on a timed sequence whose later values violate the predicate: {@code false}
     * is reported as soon as the first offending value arrives. A second, independent
     * subscription prints the raw values for comparison.
     */
    public void all_fail() {
        Observable<Long> values = Observable.interval(150, TimeUnit.MILLISECONDS).take(5);

        Disposable disposable = values
                .all(i -> i < 3) // Will fail eventually
                .subscribe(
                        v -> System.out.println("All: " + v),
                        e -> System.out.println("All Error: " + e)
                );

        Disposable disposable2 = values
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e),
                        () -> System.out.println("Completed")
                );

        waitForInput();
    }

    /**
     * If the source observable emits an error before the predicate has failed, {@code all}
     * becomes irrelevant and the error passes through, terminating the sequence.
     */
    public void all_error() {
        Observable<Integer> values = Observable.create(o -> {
            o.onNext(0);
            o.onNext(2);
            o.onError(new Exception());
        });

        Disposable disposable = values
                .all(i -> i % 2 == 0)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );

        waitForInput();
    }

    /**
     * If, however, the predicate fails first, {@code false} is emitted and the sequence
     * terminates. Nothing the source does afterwards reaches the subscriber, as required by
     * the Rx contract (no events after a termination event).
     */
    public void all_fail_before_error() {
        Observable<Integer> values = Observable.create(o -> {
            o.onNext(1);
            o.onNext(2);
            // `all` disposes the source once the predicate has failed. In RxJava 2 an onError
            // on a disposed emitter is not dropped silently: it is handed to
            // RxJavaPlugins.onError as an undeliverable error. Skip it when nobody listens.
            if (!o.isDisposed()) {
                o.onError(new Exception());
            }
        });

        Disposable disposable = values
                .all(i -> i % 2 == 0)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );
    }

    /**
     * RxJava 1's {@code exists} is called {@code any} in RxJava 2. It emits {@code true} as
     * soon as any value emitted by the observable satisfies the predicate, or {@code false}
     * once the source completes without a match (which is the case here).
     */
    public void exist() {
        Observable<Integer> values = Observable.range(0, 2);

        Disposable disposable = values
                .any(i -> i > 2)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );
    }

    /**
     * {@code isEmpty} yields a boolean indicating whether an observable completed without
     * emitting any value.
     */
    public void isEmpty() {
        Observable<Long> values = Observable.timer(1000, TimeUnit.MILLISECONDS);

        Disposable disposable = values
                .isEmpty()
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );

        waitForInput();
    }

    /**
     * {@code contains} establishes whether a particular element is emitted by an observable.
     * It uses {@code Object.equals} to establish equality.
     */
    public void contains() {
        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);

        Disposable disposable = values
                // Had we used 4 instead of 4L, nothing would be printed: an Integer 4 never
                // equals a Long 4 in Java, and this source never completes.
                .contains(4L)
                .subscribe(
                        v -> System.out.println("Contains: " + v),
                        e -> System.out.println("Contains Error: " + e)
                );

        // Optional: uncomment to also print the raw values.
        // Disposable disposable2 = values
        //         .subscribe(
        //                 v -> System.out.println(v),
        //                 e -> System.out.println("Error: " + e)
        //         );

        waitForInput();
    }

    /**
     * Rather than checking with {@code isEmpty}, {@code defaultIfEmpty} forces an observable
     * to emit a value on completion if it did not emit anything before completing.
     */
    public void defaultIfEmpty() {
        Observable<Integer> values = Observable.empty();

        Disposable disposable = values
                .defaultIfEmpty(2)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e),
                        () -> System.out.println("Completed")
                );
    }

    /**
     * The default value is not emitted when the source terminates with an error; only the
     * error is delivered.
     */
    public void defaultIfEmpty_error() {
        Observable<Integer> values = Observable.error(new Exception());

        Disposable disposable = values
                .defaultIfEmpty(2)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e),
                        () -> System.out.println("Completed")
                );
    }

    /**
     * {@code elementAt} selects exactly one element of an observable by its zero-based index.
     */
    public void elementAt() {
        Observable<Integer> values = Observable.range(100, 10);

        Disposable disposable = values
                .elementAt(2)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e),
                        () -> System.out.println("Completed")
                );

        waitForInput();
    }

    /**
     * Requests an index beyond the end of the sequence. {@code elementAtOrError} reports this
     * through the error callback instead of completing empty. RxJava 1's
     * {@code elementAtOrDefault} is gone; {@code elementAt(index, defaultItem)} takes its
     * place when a fallback value is wanted.
     */
    public void elementAtOrError() {
        Observable<Integer> values = Observable.range(100, 10);

        Disposable disposable = values
                .elementAtOrError(22)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );

        waitForInput();
    }

    /**
     * {@code sequenceEqual} compares two sequences value by value at the same index. Both the
     * length of the sequences and the values must match. Values are compared with
     * {@code Object.equals} unless a comparison function is supplied, as it is here.
     */
    public void sequenceEqual() {
        Observable<String> strings = Observable.just("1", "2", "3");
        Observable<Integer> ints = Observable.just(1, 2, 3);

        Observable.sequenceEqual(strings, ints, (s, i) -> s.equals(i.toString()))
                //.sequenceEqual(strings, ints) -- result would be false
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );
    }

    /**
     * Failing is not part of the comparison: as soon as either sequence fails, the resulting
     * sequence forwards the error.
     */
    public void sequenceEqual_error() {
        Observable<Integer> values = Observable.create(o -> {
            o.onNext(1);
            o.onNext(2);
            // The same source is subscribed twice. Once the first subscription has failed the
            // comparison is over, so the second emitter may already be disposed; skip the
            // error then rather than raising an undeliverable one.
            if (!o.isDisposed()) {
                o.onError(new Exception());
            }
        });

        Observable.sequenceEqual(values, values)
                .subscribe(
                        v -> System.out.println(v),
                        e -> System.out.println("Error: " + e)
                );
    }

    /**
     * Runs a single sample; change the invoked method to try a different one.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Sample2_3 sample = new Sample2_3();
        sample.sequenceEqual_error();
    }
}