forked from Froussios/Intro-To-RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSample2_4.java
More file actions
354 lines (280 loc) · 13.5 KB
/
Sample2_4.java
File metadata and controls
354 lines (280 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
import model.Person;
import subscriber.PrintSubscriber;
public class Sample2_4 {
// How we can use the data in the sequence to derive new meaningful values
// The methods we will see here resemble what is called catamorphism.
// In our case, it would mean that the methods consume the values in the sequence and compose them into one
// However, they do not strictly meet the definition, as they don't return a single value. Rather, they return an observable that promises to emit a single value
public void count() {
Observable<Integer> values = Observable.range(0,3);
values
.subscribe(new PrintSubscriber("Values"));
values
.count()
.subscribe(new PrintSubscriber("Count"));
}
public void first() {
// `first` will return an observable that emits only the first value in sequence
// It is similar to `take(1)`, except that it will emit `java.util.NoSuchElementException` if noe is found.
// If you use the overload that takes a predicate, the first value that matches the predicate is returned
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
values
.first(1000L)
.subscribe(new PrintSubscriber("First"));
}
public void last() {
// `last` and `lastOrDefault` work in the same way as `first`,
// except that the item returned is the last item before sequence completed
Observable<Integer> values = Observable.range(0, 10);
values
.last(1234)
.subscribe(new PrintSubscriber("Last"));
}
public void single() {
// `single` emits the only value in the sequence, or the only value that met predicate when is given.
// It differs from `first` and `last` in that it does not ignore multiple matches.
// If multiple matches are found, it will emit an error. It can be used to assert that a sequence must only contain one such value.
// Remember that `single` must check the entire sequence to ensure your assertion
//Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Integer> values = Observable.range(0,0);
//Observable<Integer> values = Observable.range(0,1);
values.take(10)
.single(5) // Emits a result
.subscribe(new PrintSubscriber("Single1"));
values.take(3)
.single(6)
.subscribe(new PrintSubscriber("Single2"));
}
public void reduce() {
// The general idea is that you produce a single value out of many by combining them two at a time
// In its most basic overload, all you need is a function that combines two values into one.
// In example, Here we will calculate the sum of a sequence of integers: 0+1+2+3+4+.... We will also calculate the minimum value for a different example
// 0 1 2 3 4
Observable<Integer> values = Observable.range(0,5);
values
.reduce((i1, i2) -> i1+i2)
.subscribe(new PrintSubscriber("Sum"));
values
.reduce((i1,i2) -> (i1>i2) ? i2 : i1)
.subscribe(new PrintSubscriber("Min"));
//Each time, the accumulator function combines the result of the previous step with the next value.
// This is more obvious in another overload
// public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T,R> accumulator)
}
public void count_via_reduce() {
Observable<String> values = Observable.just("Rx", "is", "easy");
values
.reduce(0, (acc, next) -> acc + 1)
.subscribe(new PrintSubscriber("Count"));
// We start with an accumulator of 0, as we have counted 0 items.
// Every time a new item arrives, we return a new accumulator that is increased by one.
// The Last value corresponds to the number of elements in the source sequence
// `reduce` can be used to implement the functionality of most of the operators that emit a single value.
// It can not implement behaviour where a value is emitted before the source completes.
// So, you can implement `last` using `reduce`, but an implementation of `all` would not behave exactly like the original
values
.reduce((i1,i2) -> i2)
.subscribe(new PrintSubscriber("Last"));
}
public void scan() {
// `scan` is very similar to `reduce`, with the key difference being that `scan` will emit all the intermediate result
// In the case of our example for a sum, using `scan` will produce a running sum
Observable<Integer> values = Observable.range(0, 5);
values
.scan((i1, i2) -> i1+i2)
.subscribe(new PrintSubscriber("Sum"));
// `scan` is more general than `reduce`, since `reduce` can be implemented with `scan: reduce(acc) = scan(acc).takeLast()`
}
public void scan_minimum() {
// `scan` emits when the source emits and does not need the source to complete.
// We demonstrate that by implementing an observable that returns a running minimu,:
Subject<Integer> values = ReplaySubject.create();
values
.subscribe(new PrintSubscriber("Values"));
values
.scan((i1,i2)->(i1<i2) ? i1: i2)
.distinctUntilChanged()
.subscribe(new PrintSubscriber("Min"));
values.onNext(2);
values.onNext(3);
values.onNext(1);
values.onNext(4);
values.onComplete();
}
//Aggregation to collections
public void aggregate() {
// In `reduce` nothing is stopping your accumulator from being a collection
// you can use `reduce` to collect every value in `Observable<T>` into a `List<T>`
Observable<Integer> values = Observable.range(10, 5);
values
.reduce(
new ArrayList<Integer>(),
(acc, value) -> {
acc.add(value);
return acc;
})
.subscribe(v -> System.out.println(v));
}
public void aggregate_right() {
// `aggregate()` has a problem formality: `reduce` is meant to be a functional fold and such folds are not supposed to work on mutable accumulators.
// If we were to do this the "right" way, we would have to create a new instance of `ArrayList<Integer>` for every new item, like this
Observable<Integer> values = Observable.range(10, 5);
values
.reduce(
new ArrayList<Integer>(),
(acc, value) -> {
ArrayList<Integer> newAcc = (ArrayList<Integer>) acc.clone();
newAcc.add(value);
return newAcc;
}
).subscribe(v -> System.out.println(v));
}
public void collect() {
// The performance of creating a new collection for every new item is unacceptable. For that reason, Rx offers the `collect` operator,
// which does the same thing as `reduce`, only using a mutable accumulator this time
// By using `collect` you document that you are not following the convention of immutability and you also simplify tour code a little:
Observable<Integer> values = Observable.range(10, 5);
values
.collect(
() -> new ArrayList<Integer>(),
(acc, value) -> acc.add(value))
.subscribe(v -> System.out.println(v));
}
public void toList() {
Observable<Integer> values = Observable.range(10, 5);
values
.toList()
.subscribe(v -> System.out.println(v));
}
public void toSortedList() {
Observable<Integer> values = Observable.range(10, 5);
values
.toSortedList((i1,i2) -> i2 - i1)
.subscribe(v -> System.out.println(v));
}
// `keySelector` is a function that produces a key from a value
// `valueSelector` produces from the emitted value the actual value that will be stored in the map
// `mapFactory` creates the collection that will hold the items
public void toMap_simple() {
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(person -> person.name)
.subscribe(new PrintSubscriber("toMap"));
}
public void toMap_key_value() {
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(
person -> person.name,
person -> person.age)
.subscribe(new PrintSubscriber("toMap"));
}
public void toMap_key_value_container() {
Observable<Person> values = Observable.just(
new Person("Will", 25),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMap(
person -> person.name,
person -> person.age,
() -> new HashMap<>())
.subscribe(new PrintSubscriber("toMap"));
// The container is provided as a factory function because a new container needs to be created for every new subscription
}
// When mapping, it is very common that many values share the same key.
// The datastrcuture that maps one key to multiple values is called a multimap and it is a map from keys to collections.
// This process can also be called "grouping"
public void multiMap_grouping() {
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
person -> person.age,
person -> person.name
)
.subscribe(new PrintSubscriber("toMap"));
}
public void multiMap_container() {
// The fourth allows us to provide not only the `Map` but also the `Collection` that the values will be stored in
// The key is provided as a parameter, in case we want to customise the corresponding collection based on key
// This example we'll just ignore it
Observable<Person> values = Observable.just(
new Person("Will", 35),
new Person("Nick", 40),
new Person("Saul", 35)
);
values
.toMultimap(
person -> person.age,
person -> person.name,
() -> new HashMap(),
(key) -> new ArrayList()
).subscribe(new PrintSubscriber("toMap"));
// The operators just presented have actually limited use.
// It is tempting for a beginner to collect the data in a collection and process them in the traditional way.
// That should be avoided not just for didactic purpose, but because this practice defeats the advantages of using Rx in the first place
}
public void groupBy() {
// The last general function that we will see for now is `groupBy`
// It is the Rx way of doing `toMultimap`
// For each value, it calculates a key and groups the values into seperate observables based on that key
//The return value is an observable of `GroupObservable`
// The nested observables may complicate the signature, but they offer the advantage of allowing the groups to start emitting their items before the source observable has completed
// In example, we will take a set of words and, for each starting letter, we will print the last word that occured
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
values.groupBy(word -> word.charAt(0))
.subscribe(
group -> group.last("last")
.subscribe(v -> System.out.println(group.getKey() + ": " + v))
);
}
public void flatMap() {
Observable<String> values = Observable.just(
"first",
"second",
"third",
"forth",
"fifth",
"sixth"
);
// values.groupBy(word -> word.charAt(0))
// .flatMap(group -> group.last("last")
// .map(v -> group.getKey() + ": " + v))
// .subscribe(v -> System.out.println(v));
//FIXME
values.groupBy(word -> word.charAt(0))
//.flatMap((groupedObservable, mapper) -> mapper.)
.subscribe(v -> System.out.println(v));
}
public static void main(String[] args) {
Sample2_4 sample = new Sample2_4();
sample.groupBy();
try {System.in.read();} catch (Exception ignore) {}
}
}