全网整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:400-708-3566

简单谈谈RxJava和多线程并发

前言

相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的。

RxJava与并发

首先让我们来看一段RxJava协议的原文:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,RxJava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个Observables从多个线程中发射数据,必须要满足happens-before原则。

下面来看一个简单的例子:

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  Log.d("TAG", "onNext: " + unSafeCount);
 }
});

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new Thread(new Runnable() {
    @Override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onNext(unit);
     }
    }
   }).start();
  }
 }
});

这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了RxJava,但是这样的使用对于并发是没有意义的,因为RxJava并没有去处理并发带来的问题。我们可以看下subject的onNext方法的源码,里面很简单,就是调用了对应observer的onNext方法而已。不止是这样,绝大多数的Subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的RxBus),如果从多个线程中发射数据,那你就要小心了。

对于这样的问题,有两种解决方案:

第一种就是简单的使用传统的解决方法,比如用AtomicInteger代替int。

第二种则是使用RxJava的解决方案,在这里就是用SerializedSubject去代替Subject:

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  count.addAndGet(integer);

  Log.d("TAG", "onNext: " + count);
 }
});

final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;

  for(int i = 0;i < 10;i++){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onNext(unit);
     }
    }
   }).start();
  }
 }
});

可以看一下SerializedSubject的onNext方法做了什么:

@Override
public void onNext(T t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   FastList list = queue;
   if (list == null) {
    list = new FastList();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onNext(t);
 } catch (Throwable e) {
  terminated = true;
  Exceptions.throwOrReport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
   FastList list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (Object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (Throwable e) {
     terminated = true;
     Exceptions.throwIfFatal(e);
     actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
     return;
    }
   }
  }
 }
}

处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onNext,onComplete和onError这些方法。

但是这样操作显然是会造成性能的影响的,所以RxJava并不会把所有的操作都打上线程安全的标签。

在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。

RxJava中的一些操作符

RxJava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。

对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。

Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(final Subscriber<? super Integer> subscriber) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.sleep(1000);
     subscriber.onNext(1);
     subscriber.onCompleted();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(Subscriber<? super Integer> subscriber) {
  subscriber.onNext(2);
  subscriber.onCompleted();
 }
});

Observable.merge(o1,o2)
  .subscribe(new Subscriber<Integer>() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(Integer i) {
    Log.d("TAG", "onNext: " + i);
   }
  });

对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。

究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:

@Override
public void onNext(Observable<? extends T> t) {
  if (t == null) {
    return;
  }
  if (t == Observable.empty()) {
    emitEmpty();
  } else
  if (t instanceof ScalarSynchronousObservable) {
    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
  } else {
    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
    addInner(inner);
    t.unsafeSubscribe(inner);
    emit();
  }
}

可以看到在经过lift操作之后,对应的中间人MergeSubscriber的onNext,没有什么多余的代码,所以在多个Observable从多线程中发射数据的时候,顺序当然不能得到保证。

一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatMap也会有同样的问题。

对于这样的场景,我们可以使用concat操作符来完成:

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。

if (wip.getAndIncrement() != 0) {
  return;
}

final int delayErrorMode = this.delayErrorMode;

for (;;) {
  if (actual.isUnsubscribed()) {
    return;
  }

  if (!active) {
    if (delayErrorMode == BOUNDARY) {
      if (error.get() != null) {
        Throwable ex = ExceptionsUtils.terminate(error);
        if (!ExceptionsUtils.isTerminated(ex)) {
          actual.onError(ex);
        }
        return;
      }
    }

    boolean mainDone = done;
    Object v = queue.poll();
    boolean empty = v == null;

    if (mainDone && empty) {
      Throwable ex = ExceptionsUtils.terminate(error);
      if (ex == null) {
        actual.onCompleted();
      } else
      if (!ExceptionsUtils.isTerminated(ex)) {
        actual.onError(ex);
      }
      return;
    }

    if (!empty) {

      Observable<? extends R> source;

      try {
        source = mapper.call(NotificationLite.<T>instance().getValue(v));
      } catch (Throwable mapperError) {
        Exceptions.throwIfFatal(mapperError);
        drainError(mapperError);
        return;
      }

      if (source == null) {
        drainError(new NullPointerException("The source returned by the mapper was null"));
        return;
      }

      if (source != Observable.empty()) {

        if (source instanceof ScalarSynchronousObservable) {
          ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

          active = true;

          arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
        } else {
          ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
          inner.set(innerSubscriber);

          if (!innerSubscriber.isUnsubscribed()) {
            active = true;

            source.unsafeSubscribe(innerSubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementAndGet() == 0) {
    break;
  }
}

通过源码我们可以知道,active字段就保证了如果上一个数据源还没有发射完数据,就会一直在for循环中等待,直到上一个数据源发射完了数据重置了active字段。

对于concat,其实还存在一个问题,那就是多个Observable变成了串行,会大大的增加整个RxJava事件流的处理时间,对于这个场景,我们可以使用concatEager来解决。concatEager的源码就不带大家分析了,有兴趣的同学可以自行查看。

总结

这篇文章比较短,讲的东西也比较浅显,其实就是讨论了一下RxJava中多线程并发的几个问题。最后我想说,RxJava并不是什么高大上的东西,在你的项目引入之前,要考虑一下是否真的有必要这么做。就算真的有场景需要RxJava,也请不要一口气把项目中所有的操作都换成RxJava,一些简单的操作不一定需要使用RxJava的操作符的实现,用了反而降低了代码的可读性,切勿为了使用Rx而使用Rx。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。


# rxjava  # 多线程并发  # 多线程  # 并发  # Java多线程并发执行demo代码实例  # Java常见面试题之多线程和高并发详解  # Java多线程并发编程 并发三大要素  # JAVA多线程并发下的单例模式应用  # 使用java的HttpClient实现多线程并发  # Java多线程和并发基础面试题(问答形式)  # 深入探究Java多线程并发编程的要点  # java多线程并发executorservice(任务调度)类  # java多线程和并发包入门示例  # 详解Java多线程与并发  # 多个  # 就会  # 在这里  # 我们可以  # 上一  # 很简单  # 可以使用  # 一个问题  # 这篇文章  # 要小心  # 都是  # 是一个  # 那就是  # 下次  # 他们的  # 使用了  # 也不  # 让我  # 还没有 


相关文章: 桂林网站制作公司有哪些,桂林马拉松怎么报名?  营销式网站制作方案,销售哪个网站招聘效果最好?  建站之星2.7模板:企业网站建设与h5定制设计专题  如何通过老薛主机一键快速建站?  C++ static_cast和dynamic_cast区别_C++静态转换与动态类型安全转换  济南企业网站制作公司,济南社保单位网上缴费步骤?  css网站制作参考文献有哪些,易聊怎么注册?  南平网站制作公司,2025年南平市事业单位报名时间?  微信推文制作网站有哪些,怎么做微信推文,急?  英语简历制作免费网站推荐,如何将简历翻译成英文?  建站之星如何快速更换网站模板?  高性价比服务器租赁——企业级配置与24小时运维服务  网站建设制作、微信公众号,公明人民医院怎么在网上预约?  郑州企业网站制作公司,郑州招聘网站有哪些?  C++时间戳转换成日期时间的步骤和示例代码  教学网站制作软件,学习*后期制作的网站有哪些?  怎么将XML数据可视化 D3.js加载XML  php能控制zigbee模块吗_php通过串口与cc2530 zigbee通信【介绍】  制作网站哪家好,cc、.co、.cm哪个域名更适合做网站?  成都网站制作价格表,现在成都广电的单独网络宽带有多少的,资费是什么情况呢?  网站制作说明怎么写,简述网页设计的流程并说明原因?  如何通过.red域名打造高辨识度品牌网站?  制作宣传网站的软件,小红书可以宣传网站吗?  如何实现建站之星域名转发设置?  建站中国官网:模板定制+SEO优化+建站流程一站式指南  零服务器AI建站解决方案:快速部署与云端平台低成本实践  如何制作公司的网站链接,公司想做一个网站,一般需要花多少钱?  c# Task.Yield 的作用是什么 它和Task.Delay(1)有区别吗  高防服务器租用如何选择配置与防御等级?  如何在Windows环境下新建FTP站点并设置权限?  Avalonia如何实现跨窗口通信 Avalonia窗口间数据传递  c# 在ASP.NET Core中管理和取消后台任务  建站之星展会模版如何一键下载生成?  在线ppt制作网站有哪些,请推荐几个好的课件下载的网站?  如何通过多用户协作模板快速搭建高效企业网站?  如何选择网络建站服务器?高效建站必看指南  网站制作与设计教程,如何制作一个企业网站,建设网站的基本步骤有哪些?  深圳网站制作案例,网页的相关名词有哪些?  非常酷的网站设计制作软件,酷培ai教育官方网站?  制作网站公司那家好,网络公司是做什么的?  公司网站制作价格怎么算,公司办个官网需要多少钱?  广德云建站网站建设方案与建站流程优化指南  建站之星后台管理系统如何操作?  天河区网站制作公司,广州天河区如何办理身份证?需要什么资料有预约的网站吗?  网站企业制作流程,用什么语言做企业网站比较好?  建站之星IIS配置教程:代码生成技巧与站点搭建指南  如何设计高效校园网站?  广州商城建站系统开发成本与周期如何控制?  广东专业制作网站有哪些,广东省能源集团有限公司官网?  电商网站制作公司有哪些,1688网是什么意思? 

您的项目需求

*请认真填写需求信息,我们会在24小时内与您取得联系。