RxJava2
RxJava是Java VM响应式编程扩展的实现,扩展了观察者模式,通过操作符对数据事件流操作,来编写异步和基于事件的程序,从而不用关心同步,线程安全并发等问题
- app/build.gradle
1
2
3
4
5
6
7
8
9
10
11
12implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
//retrofit
implementation 'com.squareup.retrofit2:retrofit:2.3.0'
//Gson converter
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
//RxJava2 Adapter
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
//okhttp
implementation 'com.squareup.okhttp3:okhttp:3.8.1'
implementation 'com.squareup.okhttp3:logging-interceptor:3.6.0'
事件流向
RxJava中的上下游对应被观察者Observable(发布事件)和观察者Observer(接收事件并处理),建立连接observable.subscribe(observer);
后才开始发送事件
普通方法使用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
observable.subscribe(observer);
}
2357-2357 D/MainActivity: 1
2357-2357 D/MainActivity: 2
2357-2357 D/MainActivity: 3
2357-2357 D/MainActivity: complete上游发送onComplete()/onError之后可以继续发送事件,但是下游不处理,onComplete()/onError()唯一且互斥,多个onComplete()不一定会Crash,但是多个onError()会Crash
Lambda 链式调用,后面的例子都用这种调用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext("2");
emitter.onNext(3);
emitter.onComplete();
// emitter.onError(new NullPointerException("ERROR"));
// emitter.onError(new NullPointerException("ERROR"));
}).subscribe(object -> Log.d(TAG, String.valueOf(object)),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.d(TAG, "onComplete"));
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).subscribe(integer -> Log.e(TAG, String.valueOf(integer)),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.d(TAG, "onComplete"));
上述两种打印结果相同
2682-2682 D/MainActivity: 1
2682-2682 D/MainActivity: 2
2682-2682 D/MainActivity: 3
2584-2584 E/MainActivity: onCompletedispose()
调用处理完事件,处理完即可丢弃,但是上游可以继续发送事件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed
*/
boolean isDisposed();
}
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
emitter.onNext(3);
}).subscribe(new Observer<Integer>() {
Disposable mD;
@Override
public void onSubscribe(Disposable d) {
mD = d;
Log.e(TAG, "onSubscribe " + d);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext=" + integer);
if (integer == 2) {
mD.dispose();
Log.e(TAG, "mD=" + mD.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError=" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
2800-2800 E/MainActivity: onSubscribe null
2800-2800 E/MainActivity: onNext=1
2800-2800 E/MainActivity: onNext=2
2800-2800 E/MainActivity: mD=true线程控制,上游在子线程,下游在主线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55Observable.create(emitter -> {
Log.e(TAG, Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext("2");
emitter.onNext(3);
emitter.onComplete();
}).subscribeOn(Schedulers.io())//指定上游线程,调用多次,只有第一次设置有效
.observeOn(AndroidSchedulers.mainThread())//多次调用下游线程,每次都会切换
.subscribe(object -> {
Log.e(TAG, Thread.currentThread().getName());
Log.e(TAG, String.valueOf(object));
}, throwable -> {
Log.e(TAG, throwable.getMessage());
}, () -> {
Log.e(TAG, "onComplete");
});
3102-3117 E/MainActivity: RxCachedThreadScheduler-1
3102-3102 E/MainActivity: main
3102-3102 E/MainActivity: 1
3102-3102 E/MainActivity: main
3102-3102 E/MainActivity: 2
3102-3102 E/MainActivity: main
3102-3102 E/MainActivity: 3
3102-3102 E/MainActivity: onComplete
相同例子
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
Log.e(TAG, "emitter =" + Thread.currentThread().getName());
}).subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(object -> {
Log.e(TAG, "observeOn mainThread=" + Thread.currentThread().getName());
})
.observeOn(Schedulers.io())
.doOnNext(object -> {
Log.e(TAG, "observeOn io=" + Thread.currentThread().getName());
}).subscribe(object -> {
Log.e(TAG, "subscribe=" + Thread.currentThread().getName());
Log.e(TAG, "subscribe=" + object);
});
3379-3395 E/MainActivity: emitter =RxCachedThreadScheduler-1
3379-3379 E/MainActivity: observeOn mainThread=main
3379-3379 E/MainActivity: observeOn mainThread=main
3379-3398 E/MainActivity: observeOn io=RxCachedThreadScheduler-2
3379-3398 E/MainActivity: subscribe=RxCachedThreadScheduler-2
3379-3398 E/MainActivity: subscribe=1
3379-3398 E/MainActivity: observeOn io=RxCachedThreadScheduler-2
3379-3398 E/MainActivity: subscribe=RxCachedThreadScheduler-2
3379-3398 E/MainActivity: subscribe=2网络请求
定义API
1
2
3
4
5
6
7public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}OkHttpClient,Retrofit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private void initOkHttp() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(10, TimeUnit.SECONDS);
builder.writeTimeout(10, TimeUnit.SECONDS);
builder.retryOnConnectionFailure(true);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
okHttpClient = builder.build();
}
private <T> T getApiService(String baseUrl, Class<T> clz) {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(baseUrl)
.client(okHttpClient)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
return retrofit.create(clz);
}请求网络
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28loginApi.login(loginRequest)
//在IO线程进行网络请求
.subscribeOn(Schedulers.io())
//回到主线程去处理请求结果
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {
//Activity退出时结束事件 CompositeDisposable.clear()
compositeDisposable.add(d);
}
@Override
public void onNext(LoginResponse value) {
Log.e(TAG, "value=" + value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
Toast.makeText(context, "onComplete", Toast.LENGTH_LONG).show();
}
});
读写数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24Observable.create((ObservableOnSubscribe<List<String>>) e -> {
// Cursor cursor = null;
// try {
// cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
// List<Record> result = new ArrayList<>();
// while (cursor.moveToNext()) {
// result.add(Db.Record.read(cursor));
// }
// emitter.onNext(result);
// emitter.onComplete();
// } finally {
// if (cursor != null) {
// cursor.close();
// }
// }
List<String> result = new ArrayList<>();
result.add("1");
result.add("2");
result.add("3");
e.onNext(result);
e.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(strings -> Log.e(TAG, strings.toString()));map操作符,可以将上游发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}).map(integer -> "result " + integer)
.subscribe(string -> Log.e(TAG, "map " + string),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.e(TAG, "onComplete"));
E/MainActivity: map result 1
E/MainActivity: map result 2
E/MainActivity: map result 3
E/MainActivity: onCompleteFlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发送的事件合并成一个单独的Observable.flatMap并不保证事件的顺序,如果需要保证顺序则需要使用concatMap,下面例子flatmap可以改为concatMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).flatMap(integer -> {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
}).subscribe(string -> Log.e(TAG, string),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.e(TAG, "onComplete"));
E/MainActivity: I am value 1
E/MainActivity: I am value 1
E/MainActivity: I am value 1
E/MainActivity: I am value 2
E/MainActivity: I am value 3
E/MainActivity: I am value 2
E/MainActivity: I am value 2
E/MainActivity: I am value 3
E/MainActivity: I am value 3
E/MainActivity: onComplete嵌套请求,注册登陆
1
2
3
4
5
6
7
8
9
10loginApi.register(registerRequest)//发起注册请求
.subscribeOn(Schedulers.io())//io线程注册
.observeOn(AndroidSchedulers.mainThread())//注册结果在主线程
.doOnNext(registerResponse -> Log.e(TAG, "registerResponse"))//注册结果
.subscribeOn(Schedulers.io())//在io线程登陆
.flatMap(mapper -> loginApi.login(new LoginRequest()))//登陆
.observeOn(AndroidSchedulers.mainThread())//登陆后回到主线程
.subscribe(loginResponse -> Log.e(TAG, "login success"),
throwable -> Log.e(TAG, "login failure " + throwable.getMessage()),
() -> Log.e(TAG, "onComplete"));map与flatmap 区别,map在返回对象集合时,subscribe中需要循环迭代取出元素处理,而flatmap将集合通过Observable.fromIterable迭代获取对象变换为多个Observable发送到下游分别处理,效率更高,在多个同步任务中,推荐使用flatmap
- 当发送非Observable对象时,使用map,返回的是普通对象或集合
- 当发送的是Observable对象时,使用flatmap,返回的是
Observable<T>
1
2
3
4
5
6
7
8
9
10
11
12
13
14E/MainActivity: map [I am value 1, I am value 1, I am value 1]
E/MainActivity: map [I am value 2, I am value 2, I am value 2]
E/MainActivity: map [I am value 3, I am value 3, I am value 3]
E/MainActivity: onComplete
E/MainActivity: I am value 1
E/MainActivity: I am value 1
E/MainActivity: I am value 1
E/MainActivity: I am value 2
E/MainActivity: I am value 2
E/MainActivity: I am value 2
E/MainActivity: I am value 3
E/MainActivity: I am value 3
E/MainActivity: I am value 3
E/MainActivity: onComplete
zip操作符,将多个Observable事件按照顺序整合在一起,发送组合在一起的事件,Observable o1 发送4个数据,Observable o2 发送3个数据,最后整合按照个数最少的进行整合
顺序执行,在同一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37Observable<Integer> o1 = Observable.create(emitter -> {
emitter.onNext(1);
Log.e(TAG,"onNext 1");
emitter.onNext(2);
Log.e(TAG,"onNext 2");
emitter.onNext(3);
Log.e(TAG,"onNext 3");
emitter.onNext(4);
Log.e(TAG,"onNext 4");
emitter.onComplete();
});
Observable<String> o2 = Observable.create(emitter -> {
emitter.onNext("A");
Log.e(TAG,"onNext A");
emitter.onNext("B");
Log.e(TAG,"onNext B");
emitter.onNext("C");
Log.e(TAG,"onNext C");
emitter.onComplete();
});
Observable.zip(o1, o2,
(integer, string) -> integer + string)
.subscribe(string -> Log.e(TAG, string),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.e(TAG, "onComplete"));
E/MainActivity: onNext 1
E/MainActivity: onNext 2
E/MainActivity: onNext 3
E/MainActivity: onNext 4
E/MainActivity: 1A
E/MainActivity: onNext A
E/MainActivity: 2B
E/MainActivity: onNext B
E/MainActivity: 3C
E/MainActivity: onNext C
E/MainActivity: onComplete不在同一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
(integer, string) -> integer + string)
.subscribe(string -> Log.e(TAG, string),
throwable -> Log.e(TAG, throwable.getMessage()),
() -> Log.e(TAG, "onComplete"));
D/MainActivity: emit 1
D/MainActivity: emit A
E/MainActivity: 1A
D/MainActivity: emit 2
D/MainActivity: emit B
E/MainActivity: 2B
D/MainActivity: emit 3
D/MainActivity: emit 4
D/MainActivity: emit complete1
D/MainActivity: emit C
E/MainActivity: 3C
D/MainActivity: emit complete2
E/MainActivity: onComplete需要在获取到用户基本信息和其他信息之后显示界面,使用zip
1
2
3
4
5
6
7
8
9Observable<UserBaseInfoResponse> observable1 = userApi.getUserBaseInfo(baseInfoRequest).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 = userApi.getUserExtraInfo(extraInfoRequest).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, UserInfo::new)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(userInfo -> {
}, throwable -> {
});
Backpressure 控制发送事件的速率,同一线程下,上游发送的事件,需要等待下游处理完再发送下个事件,在不同的线程,就不用等下游是否处理完,会造成上游不断发送事件,造成OOM
过滤
1
2
3
4
5
6
7
8Observable.create((ObservableOnSubscribe<Integer>) e -> {
for (int i = 0; ; i++) {
e.onNext(i);
}
}).subscribeOn(Schedulers.io())
.filter(integer -> integer % 100 == 0)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> Log.d(TAG, String.valueOf(integer)));2s采样
1
2
3
4
5
6
7
8Observable.create((ObservableOnSubscribe<Integer>) e -> {
for (int i = 0; ; i++) {
e.onNext(i);
}
}).subscribeOn(Schedulers.io())
.sample(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> Log.d(TAG, "" + integer));延时发送
1
2
3
4
5
6
7
8Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); //每次发送完事件延时2秒
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> Log.d(TAG, "" + integer));
Flowable,上游是Flowable,下游是Subscriber,两者之间通过subscribe()连接,当第二个参数为
BackpressureStrategy.ERROR
,上下游不均衡时抛出MissingBackpressureException1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe ");
s.request(3);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "" + integer);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
D/MainActivity: onSubscribe
D/MainActivity: 1
D/MainActivity: 2
D/MainActivity: 3
E/MainActivity: onCompleteSubscription 同Disposable类似,可以调用
Subscription.cancel()
取消事件的执行,增加一个void request(long n)
, 如果下游没有调用request, 上游就认为下游没有处理事件的能力,而这又是一个同步的订阅,会造成上游等待处理界面卡死,直接抛异常.所以下游调用request(Long.MAX_VALUE)或者根据上游发送事件的数量request(3)当上下游不在同一个线程,因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时,上游就会先把事件发送到这个水缸中,因此, 下游虽然没有调用request,但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29Flowable.create((FlowableOnSubscribe<Integer>) e -> {
for (int i = 0; i < 129; i++) {
Log.d(TAG, "emit " + i);
e.onNext(i);
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete ");
}
});更改背压策略
BackpressureStrategy.BUFFER
,事件发送处理类似Observable,Observer,但是当无限发送事件,内存会OOM,需要采用其他策略1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29Flowable.create((FlowableOnSubscribe<Integer>) e -> {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "emit " + i);
e.onNext(i);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete ");
}
});更改为
BackpressureStrategy.DROP
丢弃存不下的数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39Flowable.create((FlowableOnSubscribe<Integer>) e -> {
for (int i = 0; ; i++) {
e.onNext(i);
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
Subscription mSubscription;
Runnable runnable = new Runnable() {
@Override
public void run() {
mSubscription.request(128);
text.postDelayed(runnable, 2000);
}
};
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
text.postDelayed(runnable, 2000);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});更改为
BackpressureStrategy.LATEST
保存最新的数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39Flowable.create((FlowableOnSubscribe<Integer>) e -> {
for (int i = 0; ; i++) {
e.onNext(i);
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
Subscription mSubscription;
Runnable runnable = new Runnable() {
@Override
public void run() {
mSubscription.request(128);
text.postDelayed(runnable, 2000);
}
};
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
text.postDelayed(runnable, 2000);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});interval操作符发送Long型的数据,下面的例子,每隔一毫秒发送事件,下游一秒处理一个事件,直接发送会抛异常,解决办法加背压策略
- onBackpressureBuffer()
- onBackpressureDrop()
- onBackpressureLatest()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30Flowable.interval(1, TimeUnit.MICROSECONDS)
// .onBackpressureDrop()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
FlowableEmiiter 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable s);
void setCancellable(@Nullable Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
boolean isCancelled();
@NonNull
FlowableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}requested为当前下游处理能力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27Flowable.create((FlowableOnSubscribe<Integer>) e ->
Log.e(TAG, "current request=" + e.requested()),
BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
s.request(10);
s.request(100);// current request 110;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});下游处理完一个事件,requested-1,complete和error事件不会消耗requested,当requested=0,上游继续发送事件,会抛异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39Flowable.create((FlowableOnSubscribe<Integer>) e -> {
Log.e(TAG, "current request=" + e.requested());
Log.e(TAG, "emitter 1");
e.onNext(1);
Log.e(TAG, "current request=" + e.requested());
Log.e(TAG, "emitter 2");
e.onNext(2);
Log.e(TAG, "current request=" + e.requested());
Log.e(TAG, "emitter 3");
e.onNext(3);
Log.e(TAG, "current request=" + e.requested());
e.onComplete();
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29Flowable
.create((FlowableOnSubscribe<Integer>) emitter -> {
Log.d(TAG, "current requested: " + emitter.requested());//还是128
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(1000);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});上游发送完128个事件,下游处理完96个事件后,上游继续发送128个事件,下游继续处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
Log.d(TAG, "First requested = " + emitter.requested());
boolean flag;
for (int i = 0; ; i++) {
flag = false;
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "Oh no! I can't emit value!");
flag = true;
}
}
emitter.onNext(i);
Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(final Subscription s) {
Log.d(TAG, "onSubscribe");
text.postDelayed(() -> {
//在处理完96个事件后,上游可以继续发送emit 128-223 共96个
//95s时,上游不发送事件
s.request(95);
}, 1000);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});一行一行读取文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63Flowable.create((FlowableOnSubscribe<String>) e -> {
try {
// URL url=getClass().getResource("test.txt");
// Log.e(TAG,"url="+url);
// FileReader reader =new FileReader(url.getFile());
// FileReader reader =new FileReader("file:///android_asset/test.txt");
InputStream inputStream = getResources().getAssets().open("test.txt");
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String str;
while ((str = bufferedReader.readLine()) != null && !e.isCancelled()) {
while (e.requested() == 0) {
if (e.isCancelled()) break;
}
e.onNext(str);
}
bufferedReader.close();
inputStreamReader.close();
inputStream.close();
e.onComplete();
} catch (IOException e1) {
e1.printStackTrace();
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(1);
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext=" + s);
try {
Thread.sleep(2000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onComplete() {
}
});
E/MainActivity: onNext=AAAAAAAAAAAA
E/MainActivity: onNext=BBBBBBBB
E/MainActivity: onNext=CCCCCCC
E/MainActivity: onNext=DDDDD
E/MainActivity: onNext=EE
E/MainActivity: onNext=F