RxJava 变换操作符

变换操作

摘抄自Reactive X文档中文翻译

map 对序列的每一项都应用一个函数来变换Observable发射的数据序列

flatmap 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
javajavajavajavajavajava
scan 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值

groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据

[buffer][5 ?
window 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项

map

map 对序列的每一项都应用一个函数来变换Observable发射的数据序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//比如将每个字符串拼接上 \n from magicer
String[] str = {"winter is coming","the king in the north"};
Observable.from(str).map(new Func1<String, String>() {
@Override
public String call(String s) {
return s+"\n from magicer";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
});
}

flatmap

flatmap 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable

1
2
3
4
5
6
7
8
9
10
11
12
//这里的nickName是个List<String>
Observable.from(mStudents).flatMap(new Func1<Student, Observable<String>>() {
@Override
public Observable<String> call(Student student) {
return Observable.from(student.getNickNames());
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
});

scan

scan 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值

scan

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,3,4,5).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer+integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

groupby

groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据

groupby

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.interval(1, TimeUnit.SECONDS).groupBy(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong %2;
}
}).subscribe(new Action1<GroupedObservable<Long, Long>>() {
@Override
public void call(final GroupedObservable<Long, Long> result) {
result.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "groupby call: "+aLong+" getKey:"+result.getKey());
}
});
}
});

buffer

buffer 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
buffer

1
2
3
4
5
6
7
8
Observable.interval(1,TimeUnit.SECONDS)
.buffer(3,TimeUnit.SECONDS)
.subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> longs) {
Log.i(TAG, "call: "+longs);
}
});

window

window 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。

window

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.interval(1,TimeUnit.SECONDS)
.window(3,TimeUnit.SECONDS)
.subscribe(new Action1<Observable<Long>>() {
@Override
public void call(final Observable<Long> longObservable) {
longObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "call: "+aLong);
}
});
}
});