RxJava2

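RxJava is a Java VM implementation of Reactive Extensions. It extends the observer pattern and lets you write asynchronous, event-based programs by applying operators to streams of data and events, so you rarely have to handle low-level concerns such as synchronization, thread safety, and concurrency yourself.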

  • app/build.gradle
    implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

    //retrofit
    implementation 'com.squareup.retrofit2:retrofit:2.3.0'
    //Gson converter
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
    //RxJava2 Adapter
    implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    //okhttp
    implementation 'com.squareup.okhttp3:okhttp:3.8.1'
    implementation 'com.squareup.okhttp3:logging-interceptor:3.6.0'
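  • Event flow
    In RxJava, "upstream" and "downstream" correspond to the Observable (which emits events) and the Observer (which receives and handles them). Events are only emitted once the two are connected with observable.subscribe(observer);
    Basic usage with an explicit Observer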

    Observable<Integer> observable = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    });
    Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer value) {
    Log.d(TAG, "" + value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
    Log.d(TAG, "complete");
    }
    };
    observable.subscribe(observer);

    2357-2357 D/MainActivity: 1
    2357-2357 D/MainActivity: 2
    2357-2357 D/MainActivity: 3
    2357-2357 D/MainActivity: complete
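  • After the upstream calls onComplete() or onError() it can keep emitting events, but the downstream no longer receives them. onComplete() and onError() are terminal and mutually exclusive: at most one of them should be sent, and only once. A repeated onComplete() is ignored, but a second onError() can no longer be delivered and is handed to the global error handler (RxJavaPlugins.onError), which by default crashes an Android app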
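The contract above can be modelled in a few lines of plain Java. This is only a sketch of the behaviour, not RxJava's actual emitter: the class `TerminalContractModel`, its fields, and the `undeliverable` list (which stands in for the global error handler) are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the terminal-event contract; not RxJava's real emitter. */
final class TerminalContractModel {
    /** Everything the (imaginary) downstream actually received. */
    final List<String> received = new ArrayList<>();
    /** Errors that arrived after termination; stands in for the global error handler. */
    final List<Throwable> undeliverable = new ArrayList<>();
    private boolean terminated;

    void onNext(Object value) {
        if (terminated) {
            return; // the upstream may keep emitting, but nothing is delivered
        }
        received.add("onNext " + value);
    }

    void onComplete() {
        if (terminated) {
            return; // a repeated onComplete() is silently ignored
        }
        terminated = true;
        received.add("onComplete");
    }

    void onError(Throwable error) {
        if (terminated) {
            undeliverable.add(error); // can no longer reach the downstream
            return;
        }
        terminated = true;
        received.add("onError " + error.getMessage());
    }

    public static void main(String[] args) {
        TerminalContractModel emitter = new TerminalContractModel();
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);    // dropped
        emitter.onComplete(); // ignored
        emitter.onError(new IllegalStateException("late error"));
        System.out.println(emitter.received);
        System.out.println(emitter.undeliverable.size());
    }
}
```

In this model the downstream sees only `onNext 1` and `onComplete`; the late error ends up in `undeliverable`, which is the case that crashes a real app unless a global error handler is installed.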

  • Chained calls with lambdas; all of the following examples use this style

    Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext("2");
    emitter.onNext(3);
    emitter.onComplete();
    // emitter.onError(new NullPointerException("ERROR"));
    // emitter.onError(new NullPointerException("ERROR"));
    }).subscribe(object -> Log.d(TAG, String.valueOf(object)),
    throwable -> Log.e(TAG, throwable.getMessage()),
    () -> Log.d(TAG, "onComplete"));

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    }).subscribe(integer -> Log.e(TAG, String.valueOf(integer)),
    throwable -> Log.e(TAG, throwable.getMessage()),
    () -> Log.d(TAG, "onComplete"));

    Both versions print the same result
    2682-2682 D/MainActivity: 1
    2682-2682 D/MainActivity: 2
    2682-2682 D/MainActivity: 3
    2584-2584 E/MainActivity: onComplete
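  • Calling dispose() on the Disposable received in onSubscribe cuts the connection: the downstream receives no further events, although the upstream can keep emitting them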

    public interface Disposable {
    /**
    * Dispose the resource, the operation should be idempotent.
    */
    void dispose();

    /**
    * Returns true if this resource has been disposed.
    * @return true if this resource has been disposed
    */
    boolean isDisposed();
    }

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
    emitter.onNext(3);
    }).subscribe(new Observer<Integer>() {
    Disposable mD;

    @Override
    public void onSubscribe(Disposable d) {
    mD = d;
    Log.e(TAG, "onSubscribe " + d);
    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext=" + integer);
    if (integer == 2) {
    mD.dispose();
    Log.e(TAG, "mD=" + mD.isDisposed());
    }
    }

    @Override
    public void onError(Throwable e) {
    Log.e(TAG, "onError=" + e.getMessage());
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });

    2800-2800 E/MainActivity: onSubscribe null
    2800-2800 E/MainActivity: onNext=1
    2800-2800 E/MainActivity: onNext=2
    2800-2800 E/MainActivity: mD=true
  • Thread control: upstream on a background thread, downstream on the main thread

    Observable.create(emitter -> {
    Log.e(TAG, Thread.currentThread().getName());
    emitter.onNext(1);
    emitter.onNext("2");
    emitter.onNext(3);
    emitter.onComplete();
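    }).subscribeOn(Schedulers.io()) // sets the upstream thread; if called several times, only the first call takes effect
    .observeOn(AndroidSchedulers.mainThread()) // sets the downstream thread; every additional call switches threads again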
    .subscribe(object -> {
    Log.e(TAG, Thread.currentThread().getName());
    Log.e(TAG, String.valueOf(object));
    }, throwable -> {
    Log.e(TAG, throwable.getMessage());
    }, () -> {
    Log.e(TAG, "onComplete");
    });

    3102-3117 E/MainActivity: RxCachedThreadScheduler-1
    3102-3102 E/MainActivity: main
    3102-3102 E/MainActivity: 1
    3102-3102 E/MainActivity: main
    3102-3102 E/MainActivity: 2
    3102-3102 E/MainActivity: main
    3102-3102 E/MainActivity: 3
    3102-3102 E/MainActivity: onComplete


    A similar example with several subscribeOn and observeOn calls
    Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    Log.e(TAG, "emitter =" + Thread.currentThread().getName());
    }).subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(object -> {
    Log.e(TAG, "observeOn mainThread=" + Thread.currentThread().getName());
    })
    .observeOn(Schedulers.io())
    .doOnNext(object -> {
    Log.e(TAG, "observeOn io=" + Thread.currentThread().getName());
    }).subscribe(object -> {
    Log.e(TAG, "subscribe=" + Thread.currentThread().getName());
    Log.e(TAG, "subscribe=" + object);
    });

    3379-3395 E/MainActivity: emitter =RxCachedThreadScheduler-1
    3379-3379 E/MainActivity: observeOn mainThread=main
    3379-3379 E/MainActivity: observeOn mainThread=main
    3379-3398 E/MainActivity: observeOn io=RxCachedThreadScheduler-2
    3379-3398 E/MainActivity: subscribe=RxCachedThreadScheduler-2
    3379-3398 E/MainActivity: subscribe=1
    3379-3398 E/MainActivity: observeOn io=RxCachedThreadScheduler-2
    3379-3398 E/MainActivity: subscribe=RxCachedThreadScheduler-2
    3379-3398 E/MainActivity: subscribe=2
  • Network requests

    • Define the API

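      public interface Api {
      // "login" is a placeholder path (the original omits it); @Body needs a method such as POST, not GET
      @POST("login")
      Observable<LoginResponse> login(@Body LoginRequest request);

      // "register" is a placeholder path as well
      @POST("register")
      Observable<RegisterResponse> register(@Body RegisterRequest request);
      }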
    • Set up OkHttpClient and Retrofit

      private void initOkHttp() {
      OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
      builder.readTimeout(10, TimeUnit.SECONDS);
      builder.connectTimeout(10, TimeUnit.SECONDS);
      builder.writeTimeout(10, TimeUnit.SECONDS);
      builder.retryOnConnectionFailure(true);
      if (BuildConfig.DEBUG) {
      HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
      interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
      builder.addInterceptor(interceptor);
      }
      okHttpClient = builder.build();
      }

      private <T> T getApiService(String baseUrl, Class<T> clz) {
      Retrofit retrofit = new Retrofit.Builder()
      .baseUrl(baseUrl)
      .client(okHttpClient)
      .addConverterFactory(GsonConverterFactory.create())
      .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
      .build();
      return retrofit.create(clz);
      }
    • Make the request

      loginApi.login(loginRequest)
      // run the network request on an io thread
      .subscribeOn(Schedulers.io())
      // switch back to the main thread to handle the result
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<LoginResponse>() {
      @Override
      public void onSubscribe(Disposable d) {
      // dispose of pending work when the Activity exits, via CompositeDisposable.clear()
      compositeDisposable.add(d);
      }

      @Override
      public void onNext(LoginResponse value) {
      Log.e(TAG, "value=" + value);
      }

      @Override
      public void onError(Throwable e) {
      Log.e(TAG, "onError" + e.getMessage());
      }

      @Override
      public void onComplete() {
      Log.e(TAG, "onComplete");
      Toast.makeText(context, "onComplete", Toast.LENGTH_LONG).show();
      }
      });
  • Reading from and writing to a database

    Observable.create((ObservableOnSubscribe<List<String>>) e -> {
    // Cursor cursor = null;
    // try {
    // cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
    // List<Record> result = new ArrayList<>();
    // while (cursor.moveToNext()) {
    // result.add(Db.Record.read(cursor));
    // }
    // emitter.onNext(result);
    // emitter.onComplete();
    // } finally {
    // if (cursor != null) {
    // cursor.close();
    // }
    // }
    List<String> result = new ArrayList<>();
    result.add("1");
    result.add("2");
    result.add("3");
    e.onNext(result);
    e.onComplete();
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(strings -> Log.e(TAG, strings.toString()));
  • The map operator converts each upstream event into a value of any other type, for example a single object or a collection

    Observable.create((ObservableOnSubscribe<Integer>) e -> {
    e.onNext(1);
    e.onNext(2);
    e.onNext(3);
    e.onComplete();
    }).map(integer -> "result " + integer)
    .subscribe(string -> Log.e(TAG, "map " + string),
    throwable -> Log.e(TAG, throwable.getMessage()),
    () -> Log.e(TAG, "onComplete"));

    E/MainActivity: map result 1
    E/MainActivity: map result 2
    E/MainActivity: map result 3
    E/MainActivity: onComplete
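  • flatMap turns each event of the upstream Observable into an Observable of its own and then merges the events of all of these into one Observable. flatMap does not guarantee the order of events; when order matters, use concatMap instead. In the example below, flatMap can be replaced with concatMap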

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    }).flatMap(integer -> {
    List<String> list = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
    list.add("I am value " + integer);
    }
    return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
    }).subscribe(string -> Log.e(TAG, string),
    throwable -> Log.e(TAG, throwable.getMessage()),
    () -> Log.e(TAG, "onComplete"));

    E/MainActivity: I am value 1
    E/MainActivity: I am value 1
    E/MainActivity: I am value 1
    E/MainActivity: I am value 2
    E/MainActivity: I am value 3
    E/MainActivity: I am value 2
    E/MainActivity: I am value 2
    E/MainActivity: I am value 3
    E/MainActivity: I am value 3
    E/MainActivity: onComplete
  • Nested requests: register, then log in

    loginApi.register(registerRequest) // send the registration request
    .subscribeOn(Schedulers.io()) // register on an io thread
    .observeOn(AndroidSchedulers.mainThread()) // handle the registration result on the main thread
    .doOnNext(registerResponse -> Log.e(TAG, "registerResponse")) // registration result
    .observeOn(Schedulers.io()) // back to an io thread for the login request (a second subscribeOn would have no effect)
    .flatMap(mapper -> loginApi.login(new LoginRequest())) // log in
    .observeOn(AndroidSchedulers.mainThread()) // back to the main thread once logged in
    .subscribe(loginResponse -> Log.e(TAG, "login success"),
    throwable -> Log.e(TAG, "login failure " + throwable.getMessage()),
    () -> Log.e(TAG, "onComplete"));
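  • Difference between map and flatMap: when map returns a collection, the subscriber has to loop over it to handle each element. flatMap can instead turn the collection into an Observable with Observable.fromIterable, so the elements arrive downstream one by one and are handled individually. flatMap is the recommended choice when several tasks have to run one after another

    • Use map when the function returns a plain value (an object or a collection)
    • Use flatMap when the function returns an Observable<T>
      Output with map (each event is a whole list), followed by the output with flatMap: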
      E/MainActivity: map [I am value 1, I am value 1, I am value 1]
      E/MainActivity: map [I am value 2, I am value 2, I am value 2]
      E/MainActivity: map [I am value 3, I am value 3, I am value 3]
      E/MainActivity: onComplete
      E/MainActivity: I am value 1
      E/MainActivity: I am value 1
      E/MainActivity: I am value 1
      E/MainActivity: I am value 2
      E/MainActivity: I am value 2
      E/MainActivity: I am value 2
      E/MainActivity: I am value 3
      E/MainActivity: I am value 3
      E/MainActivity: I am value 3
      E/MainActivity: onComplete
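The same distinction exists in the JDK's own `java.util.stream` API, which makes for a quick experiment without any RxJava dependency. The sketch below uses streams as a stand-in, not Observables; the class and method names (`MapVsFlatMapSketch`, `expand`, `withMap`, `withFlatMap`) are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** map vs. flatMap illustrated with JDK streams instead of Observables. */
final class MapVsFlatMapSketch {
    /** Builds the three-element list used in the examples: "I am value n" three times. */
    static List<String> expand(int n) {
        return Collections.nCopies(3, "I am value " + n);
    }

    /** map: one input element becomes one output element (here, a whole list). */
    static List<List<String>> withMap() {
        return Stream.of(1, 2, 3)
                .map(MapVsFlatMapSketch::expand)
                .collect(Collectors.toList());
    }

    /** flatMap: each input becomes a stream of its own, and the streams are flattened. */
    static List<String> withFlatMap() {
        return Stream.of(1, 2, 3)
                .flatMap(n -> expand(n).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        withMap().forEach(list -> System.out.println("map " + list));
        withFlatMap().forEach(System.out::println);
    }
}
```

One difference to keep in mind: `Stream.flatMap` preserves encounter order, so it behaves more like RxJava's concatMap. RxJava's flatMap may interleave the inner Observables when they emit asynchronously.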
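  • The zip operator combines the events of several Observables pairwise, in order, and emits the combined events. Below, Observable o1 emits 4 items and Observable o2 emits 3; the result has as many items as the shorter source, here 3

    • Sequential execution on a single thread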

      Observable<Integer> o1 = Observable.create(emitter -> {
      emitter.onNext(1);
      Log.e(TAG,"onNext 1");
      emitter.onNext(2);
      Log.e(TAG,"onNext 2");
      emitter.onNext(3);
      Log.e(TAG,"onNext 3");
      emitter.onNext(4);
      Log.e(TAG,"onNext 4");
      emitter.onComplete();
      });
      Observable<String> o2 = Observable.create(emitter -> {
      emitter.onNext("A");
      Log.e(TAG,"onNext A");
      emitter.onNext("B");
      Log.e(TAG,"onNext B");
      emitter.onNext("C");
      Log.e(TAG,"onNext C");
      emitter.onComplete();
      });
      Observable.zip(o1, o2,
      (integer, string) -> integer + string)
      .subscribe(string -> Log.e(TAG, string),
      throwable -> Log.e(TAG, throwable.getMessage()),
      () -> Log.e(TAG, "onComplete"));

      E/MainActivity: onNext 1
      E/MainActivity: onNext 2
      E/MainActivity: onNext 3
      E/MainActivity: onNext 4
      E/MainActivity: 1A
      E/MainActivity: onNext A
      E/MainActivity: 2B
      E/MainActivity: onNext B
      E/MainActivity: 3C
      E/MainActivity: onNext C
      E/MainActivity: onComplete
    • On different threads

       Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
      Log.d(TAG, "emit 1");
      emitter.onNext(1);
      Thread.sleep(1000);

      Log.d(TAG, "emit 2");
      emitter.onNext(2);
      Thread.sleep(1000);

      Log.d(TAG, "emit 3");
      emitter.onNext(3);

      Log.d(TAG, "emit 4");
      emitter.onNext(4);

      Log.d(TAG, "emit complete1");
      emitter.onComplete();
      }).subscribeOn(Schedulers.io());
      Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
      Log.d(TAG, "emit A");
      emitter.onNext("A");
      Thread.sleep(1000);

      Log.d(TAG, "emit B");
      emitter.onNext("B");
      Thread.sleep(1000);

      Log.d(TAG, "emit C");
      emitter.onNext("C");
      Thread.sleep(1000);

      Log.d(TAG, "emit complete2");
      emitter.onComplete();
      }).subscribeOn(Schedulers.io());

      Observable.zip(observable1, observable2,
      (integer, string) -> integer + string)
      .subscribe(string -> Log.e(TAG, string),
      throwable -> Log.e(TAG, throwable.getMessage()),
      () -> Log.e(TAG, "onComplete"));

      D/MainActivity: emit 1
      D/MainActivity: emit A
      E/MainActivity: 1A
      D/MainActivity: emit 2
      D/MainActivity: emit B
      E/MainActivity: 2B
      D/MainActivity: emit 3
      D/MainActivity: emit 4
      D/MainActivity: emit complete1
      D/MainActivity: emit C
      E/MainActivity: 3C
      D/MainActivity: emit complete2
      E/MainActivity: onComplete
    • When the screen can only be shown after both the user's basic info and the extra info have been fetched, use zip

      Observable<UserBaseInfoResponse> observable1 = userApi.getUserBaseInfo(baseInfoRequest).subscribeOn(Schedulers.io());
      Observable<UserExtraInfoResponse> observable2 = userApi.getUserExtraInfo(extraInfoRequest).subscribeOn(Schedulers.io());
      Observable.zip(observable1, observable2, UserInfo::new)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(userInfo -> {

      }, throwable -> {

      });
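The "shortest source wins" rule of zip is easy to check with ordinary lists. The helper below is a hypothetical plain-Java stand-in (`ZipSketch.zip` is not an RxJava API) that pairs elements by position and stops when the shorter list runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Positional zip over two lists; a stand-in for Observable.zip, not RxJava code. */
final class ZipSketch {
    /** Pairs elements by position and stops at the end of the shorter list. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                 BiFunction<? super A, ? super B, ? extends R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            result.add(combiner.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> o1 = Arrays.asList(1, 2, 3, 4);
        List<String> o2 = Arrays.asList("A", "B", "C");
        List<String> zipped = zip(o1, o2, (i, s) -> i + s);
        System.out.println(zipped); // [1A, 2B, 3C]
    }
}
```

The fourth item of `o1` has no partner and is never combined, which matches the logs above where `4` is emitted but no `4?` pair appears.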
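  • Backpressure is about controlling the rate at which events are emitted. On a single thread, the upstream has to wait until the downstream has handled an event before it emits the next one. On different threads the upstream does not wait, so a fast upstream keeps emitting, events pile up, and the app eventually runs out of memory (OOM). Some ways to relieve the pressure:

    • Filtering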

      Observable.create((ObservableOnSubscribe<Integer>) e -> {
      for (int i = 0; ; i++) {
      e.onNext(i);
      }
      }).subscribeOn(Schedulers.io())
      .filter(integer -> integer % 100 == 0)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(integer -> Log.d(TAG, String.valueOf(integer)));
    • Sampling every 2 seconds

      Observable.create((ObservableOnSubscribe<Integer>) e -> {
      for (int i = 0; ; i++) {
      e.onNext(i);
      }
      }).subscribeOn(Schedulers.io())
      .sample(2, TimeUnit.SECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(integer -> Log.d(TAG, "" + integer));
    • Slowing down the upstream

      Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
      for (int i = 0; ; i++) {
      emitter.onNext(i);
      Thread.sleep(2000); // pause for 2 seconds after each event
      }
      }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(integer -> Log.d(TAG, "" + integer));
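  • Flowable: the upstream is a Flowable and the downstream is a Subscriber, connected through subscribe(). The second argument of Flowable.create is the backpressure strategy; with BackpressureStrategy.ERROR, a MissingBackpressureException is raised when upstream and downstream get out of balance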

    Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe ");
    s.request(3);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "" + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.d(TAG, t.getMessage());
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });

    D/MainActivity: onSubscribe
    D/MainActivity: 1
    D/MainActivity: 2
    D/MainActivity: 3
    E/MainActivity: onComplete
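  • Subscription is similar to Disposable: calling Subscription.cancel() stops the flow of events. It also adds void request(long n). If the downstream never calls request, the upstream assumes that the downstream cannot handle any events. Because the subscription in the example above is synchronous, waiting for the downstream would block the thread and freeze the UI, so the upstream fails with an exception right away instead. The downstream should therefore call request(Long.MAX_VALUE), or request exactly as many events as the upstream emits, for example request(3)

  • Things work differently when upstream and downstream are on different threads. A Flowable has a default buffer (a "water tank") that holds 128 events. The upstream first puts its events into this buffer, so they are kept there even if the downstream has not called request; they are taken out and delivered only when the downstream calls request. The example below emits 129 events, one more than the buffer holds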

    Flowable.create((FlowableOnSubscribe<Integer>) e -> {
    for (int i = 0; i < 129; i++) {
    Log.d(TAG, "emit " + i);
    e.onNext(i);
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
    Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.e(TAG, "onError: " + t);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete ");
    }
    });
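  • Switching the strategy to BackpressureStrategy.BUFFER makes the Flowable behave much like an Observable/Observer pair, because events are queued without limit. If the upstream never stops emitting, memory eventually runs out (OOM), so another strategy is needed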

    Flowable.create((FlowableOnSubscribe<Integer>) e -> {
    for (int i = 0; i < 1000; i++) {
    Log.d(TAG, "emit " + i);
    e.onNext(i);
    }
    }, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
    Log.e(TAG, "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.e(TAG, "onError: " + t);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete ");
    }
    });
  • BackpressureStrategy.DROP discards the events that do not fit

    Flowable.create((FlowableOnSubscribe<Integer>) e -> {
    for (int i = 0; ; i++) {
    e.onNext(i);
    }
    }, BackpressureStrategy.DROP)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    Subscription mSubscription;
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    mSubscription.request(128);
    text.postDelayed(runnable, 2000);
    }
    };

    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    mSubscription = s;
    text.postDelayed(runnable, 2000);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
  • BackpressureStrategy.LATEST also discards overflow, but always keeps the most recent event

    Flowable.create((FlowableOnSubscribe<Integer>) e -> {
    for (int i = 0; ; i++) {
    e.onNext(i);
    }
    }, BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    Subscription mSubscription;
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    mSubscription.request(128);
    text.postDelayed(runnable, 2000);
    }
    };

    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    mSubscription = s;
    text.postDelayed(runnable, 2000);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
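To compare the four strategies side by side, here is a single-threaded toy model in plain Java. It is a sketch under simplifying assumptions, not RxJava's implementation: there is no intermediate 128-slot buffer, `IllegalStateException` stands in for MissingBackpressureException, and all names (`BackpressureModel`, `run`, `delivered`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy, single-threaded model of the four strategies; not RxJava's implementation. */
final class BackpressureModel {
    enum Strategy { ERROR, BUFFER, DROP, LATEST }

    private final Strategy strategy;
    private final Deque<Integer> pending = new ArrayDeque<>(); // values waiting for demand
    private long requested;                                     // outstanding downstream demand
    final List<Integer> delivered = new ArrayList<>();          // what the downstream received

    BackpressureModel(Strategy strategy) {
        this.strategy = strategy;
    }

    /** Downstream side: add demand and hand over anything that was waiting. */
    void request(long n) {
        requested += n;
        while (requested > 0 && !pending.isEmpty()) {
            requested--;
            delivered.add(pending.removeFirst());
        }
    }

    /** Upstream side: emit one value, applying the strategy when there is no demand. */
    void onNext(int value) {
        if (requested > 0) {
            requested--;
            delivered.add(value);
            return;
        }
        switch (strategy) {
            case ERROR:
                // stand-in for MissingBackpressureException
                throw new IllegalStateException("no demand for value " + value);
            case BUFFER:
                pending.addLast(value); // unbounded: this is where memory can run out
                break;
            case DROP:
                break;                  // the value is simply lost
            case LATEST:
                pending.clear();        // keep only the most recent value
                pending.addLast(value);
                break;
        }
    }

    /** Emits 0..4 with no demand, then requests 2 and emits 5 and 6. */
    static List<Integer> run(Strategy strategy) {
        BackpressureModel model = new BackpressureModel(strategy);
        for (int i = 0; i < 5; i++) {
            model.onNext(i);
        }
        model.request(2);
        model.onNext(5);
        model.onNext(6);
        return model.delivered;
    }

    public static void main(String[] args) {
        System.out.println("BUFFER " + run(Strategy.BUFFER));
        System.out.println("DROP   " + run(Strategy.DROP));
        System.out.println("LATEST " + run(Strategy.LATEST));
    }
}
```

In this model BUFFER delivers the two oldest values, DROP delivers only values emitted after demand appeared, and LATEST delivers the last value seen before the request plus the next one. ERROR fails on the very first value because nothing has been requested yet.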
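  • The interval operator emits Long values. In the example below it emits one event every microsecond while the downstream handles one event per second; left as it is, this throws MissingBackpressureException. The fix is to add one of the backpressure operators: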

    • onBackpressureBuffer()
    • onBackpressureDrop()
    • onBackpressureLatest()
       Flowable.interval(1, TimeUnit.MICROSECONDS)
      // .onBackpressureDrop()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Subscriber<Long>() {
      @Override
      public void onSubscribe(Subscription s) {
      Log.d(TAG, "onSubscribe");
      s.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(Long aLong) {
      Log.e(TAG, "onNext: " + aLong);
      try {
      Thread.sleep(1000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }

      @Override
      public void onError(Throwable t) {
      Log.e(TAG, "onError: ", t);
      }

      @Override
      public void onComplete() {
      Log.d(TAG, "onComplete");
      }
      });
  • FlowableEmitter source code

    public interface FlowableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable s);
    void setCancellable(@Nullable Cancellable c);

    /**
    * The current outstanding request amount.
    * <p>This method is thread-safe.
    * @return the current outstanding request amount
    */
    long requested();

    boolean isCancelled();

    @NonNull
    FlowableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
    }
  • requested() reports the downstream's current capacity, that is, how many events it has requested but not yet received

    Flowable.create((FlowableOnSubscribe<Integer>) e ->
    Log.e(TAG, "current request=" + e.requested()),
    BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
    Log.e(TAG, "onSubscribe");
    s.request(10);
    s.request(100);// current request 110;
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.e(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });
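  • Each event delivered to the downstream decrements requested by one; complete and error events do not consume it. Once requested reaches 0, any further onNext from the upstream raises an exception (MissingBackpressureException with BackpressureStrategy.ERROR, as here)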

    Flowable.create((FlowableOnSubscribe<Integer>) e -> {
    Log.e(TAG, "current request=" + e.requested());
    Log.e(TAG, "emitter 1");
    e.onNext(1);
    Log.e(TAG, "current request=" + e.requested());

    Log.e(TAG, "emitter 2");
    e.onNext(2);
    Log.e(TAG, "current request=" + e.requested());

    Log.e(TAG, "emitter 3");
    e.onNext(3);
    Log.e(TAG, "current request=" + e.requested());

    e.onComplete();
    }, BackpressureStrategy.ERROR)
    .subscribe(new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
    Log.e(TAG, "onSubscribe");
    s.request(10);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.e(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });
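  • When upstream and downstream run on different threads, each side keeps its own requested count. Calling request(1000) only changes the count on the downstream (main thread) side; the upstream's count is set by RxJava's own internal request(n) calls, which are triggered automatically at the appropriate time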

    Flowable
    .create((FlowableOnSubscribe<Integer>) emitter -> {
    Log.d(TAG, "current requested: " + emitter.requested()); // still 128, not 1000
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {

    @Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    s.request(1000);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
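  • After the upstream has emitted its first 128 events it has to wait. Once the downstream has consumed 96 of them, the upstream's requested count is topped up by another 96, so the upstream resumes emitting while the downstream keeps processing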

    Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {
    Log.d(TAG, "First requested = " + emitter.requested());
    boolean flag;
    for (int i = 0; ; i++) {
    flag = false;
    while (emitter.requested() == 0) {
    if (!flag) {
    Log.d(TAG, "Oh no! I can't emit value!");
    flag = true;
    }
    }
    emitter.onNext(i);
    Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {

    @Override
    public void onSubscribe(final Subscription s) {
    Log.d(TAG, "onSubscribe");
    text.postDelayed(() -> {
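    // Had this been request(96), the upstream could emit another 96 events (emit 128-223) once they were consumed
    // With request(95) the upstream does not emit any further events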
    s.request(95);
    }, 1000);
    }

    @Override
    public void onNext(Integer integer) {
    Log.d(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    Log.w(TAG, "onError: ", t);
    }

    @Override
    public void onComplete() {
    Log.d(TAG, "onComplete");
    }
    });
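The 128/96 arithmetic can be reproduced with a small counter model. It assumes, as the experiment above suggests, that the buffer between the threads starts with 128 credits and asks the upstream for 96 more (128 minus a quarter of 128) each time the downstream has taken 96 items. The class `ReplenishModel` and its members are hypothetical, not RxJava code.

```java
/** Toy model of how the upstream's requested count is topped up; not RxJava code. */
final class ReplenishModel {
    static final int PREFETCH = 128;                     // default buffer size
    static final int LIMIT = PREFETCH - (PREFETCH >> 2); // 96: refill threshold

    long upstreamRequested = PREFETCH; // what emitter.requested() would report
    private int consumedSinceRefill;   // items taken by the downstream since the last refill

    /** Upstream emits one item into the buffer; returns false when it has no credit left. */
    boolean emit() {
        if (upstreamRequested == 0) {
            return false;
        }
        upstreamRequested--;
        return true;
    }

    /** Downstream takes one item out of the buffer. */
    void consume() {
        consumedSinceRefill++;
        if (consumedSinceRefill == LIMIT) {
            consumedSinceRefill = 0;
            upstreamRequested += LIMIT; // models the internal request(96) to the upstream
        }
    }

    /** Uses up all 128 credits, lets the downstream take {@code taken} items, reports the credit. */
    static long requestedAfterConsuming(int taken) {
        ReplenishModel model = new ReplenishModel();
        while (model.emit()) {
            // keep emitting until the 128 credits are used up
        }
        for (int i = 0; i < taken; i++) {
            model.consume();
        }
        return model.upstreamRequested;
    }

    public static void main(String[] args) {
        System.out.println(requestedAfterConsuming(95)); // 0
        System.out.println(requestedAfterConsuming(96)); // 96
    }
}
```

This is why request(95) in the example leaves the upstream stuck at requested = 0, while request(96) would let it emit events 128 to 223.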
  • Reading a file line by line

    Flowable.create((FlowableOnSubscribe<String>) e -> {
    try {
    // URL url=getClass().getResource("test.txt");
    // Log.e(TAG,"url="+url);
    // FileReader reader =new FileReader(url.getFile());
    // FileReader reader =new FileReader("file:///android_asset/test.txt");
    InputStream inputStream = getResources().getAssets().open("test.txt");
    InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
    String str;
    while ((str = bufferedReader.readLine()) != null && !e.isCancelled()) {
    while (e.requested() == 0) {
    if (e.isCancelled()) break;
    }
    e.onNext(str);
    }
    bufferedReader.close();
    inputStreamReader.close();
    inputStream.close();
    e.onComplete();
    } catch (IOException e1) {
    e1.printStackTrace();
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(new Subscriber<String>() {
    Subscription mSubscription;

    @Override
    public void onSubscribe(Subscription s) {
    mSubscription = s;
    s.request(1);
    }

    @Override
    public void onNext(String s) {
    Log.e(TAG, "onNext=" + s);
    try {
    Thread.sleep(2000);
    mSubscription.request(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    @Override
    public void onError(Throwable t) {
    System.out.println(t);
    }

    @Override
    public void onComplete() {

    }
    });

    E/MainActivity: onNext=AAAAAAAAAAAA
    E/MainActivity: onNext=BBBBBBBB
    E/MainActivity: onNext=CCCCCCC
    E/MainActivity: onNext=DDDDD
    E/MainActivity: onNext=EE
    E/MainActivity: onNext=F