RxJava Creat操作符

RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序。

教程

抛物线大神的RxJava教程

依赖

1
2
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'

创建操作符

create

Create操作文档

摘要: 你可以使用Create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法。

最常用最基础的创建Observable对象的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("rxjava");
subscriber.onCompleted();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
});

from

from文档
摘要: 在RxJava中,from操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
String[] strs = {"hello","world","!"};
//将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
//from默认不在任何特定的调度器上执行。然而你可以将Scheduler
// 作为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future
Observable.from(strs)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: " + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "call: " + throwable.toString());
}
}, new Action0() {
@Override
public void call() {
Log.i(TAG, "call: complete");
}
});

just

just文档
摘要: Just将单个数据转换为发射那个数据的Observable。
Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据

1
2
3
4
5
6
Observable.just(1,2,3,4).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

range

range文档
摘要: Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。

range默认不在任何特定的调度器上执行。有一个变体可以通过可选参数指定Scheduler。

1
2
3
4
5
6
Observable.range(3,4).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer); // 3,4,5,6
}
});

从3开始发送4个Integer

timer

timer文档
创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

1
2
3
4
5
6
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "call: "+aLong); //延时1秒执行
}
});

创建延时任务,每1秒后发射一次。

interval

interval文档
创建一个按固定时间间隔发射整数序列的Observable

1
2
3
4
5
6
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.i(TAG, "call: "+aLong);//0,1,2,3,4,5......
}
});

每隔一秒发送一个Integer数据。

repeat

repeat文档
创建一个发射特定数据重复多次的Observable
RxJava将这个操作符实现为repeat方法。它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数。

repeat操作符默认在trampoline调度器上执行。有一个变体可以通过可选参数指定Scheduler。

1
2
3
4
5
6
Observable.just(2,3).repeat(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});

重复发射两次Observable对象