十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶(hù) + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂(yōu)售后,網(wǎng)站問(wèn)題一站解決
深入淺析RxJava和多線程并發(fā)的原理?很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
我們擁有10余年網(wǎng)頁(yè)設(shè)計(jì)和網(wǎng)站建設(shè)經(jīng)驗(yàn),從網(wǎng)站策劃到網(wǎng)站制作,我們的網(wǎng)頁(yè)設(shè)計(jì)師為您提供的解決方案。為企業(yè)提供成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、重慶小程序開(kāi)發(fā)、成都手機(jī)網(wǎng)站制作、H5開(kāi)發(fā)、等業(yè)務(wù)。無(wú)論您有什么樣的網(wǎng)站設(shè)計(jì)或者設(shè)計(jì)方案要求,我們都將富于創(chuàng)造性的提供專(zhuān)業(yè)設(shè)計(jì)服務(wù)并滿(mǎn)足您的需求。
RxJava與并發(fā)
首先讓我們來(lái)看一段RxJava協(xié)議的原文:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
如上所述,RxJava對(duì)多線程并發(fā)其實(shí)并沒(méi)有做非常的多保護(hù),這段話中說(shuō),如果多個(gè)Observables從多個(gè)線程中發(fā)射數(shù)據(jù),必須要滿(mǎn)足happens-before原則。
下面來(lái)看一個(gè)簡(jiǎn)單的例子:
final PublishSubjectsubject = PublishSubject.create(); subject.subscribe(new Subscriber () { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { unSafeCount = unSafeCount + integer; Log.d("TAG", "onNext: " + unSafeCount); } }); findViewById(R.id.send).setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { final int unit = 1; for(int i = 0;i < 10;i++) { new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 1000; j++) { subject.onNext(unit); } } }).start(); } } });
這是一個(gè)最典型的多線程問(wèn)題,從10個(gè)線程中發(fā)射數(shù)據(jù)并相加,這樣最終得到的答案是小于10000的。雖然使用了RxJava,但是這樣的使用對(duì)于并發(fā)是沒(méi)有意義的,因?yàn)镽xJava并沒(méi)有去處理并發(fā)帶來(lái)的問(wèn)題。我們可以看下subject的onNext方法的源碼,里面很簡(jiǎn)單,就是調(diào)用了對(duì)應(yīng)observer的onNext方法而已。不止是這樣,絕大多數(shù)的Subject都是線程不安全的,所以當(dāng)你在使用這樣的類(lèi)的時(shí)候(典型場(chǎng)景就是自制的RxBus),如果從多個(gè)線程中發(fā)射數(shù)據(jù),那你就要小心了。
對(duì)于這樣的問(wèn)題,有兩種解決方案:
第一種就是簡(jiǎn)單的使用傳統(tǒng)的解決方法,比如用AtomicInteger代替int。
第二種則是使用RxJava的解決方案,在這里就是用SerializedSubject去代替Subject:
final PublishSubjectsubject = PublishSubject.create(); subject.subscribe(new Subscriber () { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { unSafeCount = unSafeCount + integer; count.addAndGet(integer); Log.d("TAG", "onNext: " + count); } }); final SerializedSubject ser = new SerializedSubject (subject); findViewById(R.id.send).setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { final int unit = 1; for(int i = 0;i < 10;i++){ new Thread(new Runnable() { @Override public void run() { for(int j = 0;j < 1000;j++){ ser.onNext(unit); } } }).start(); } } });
可以看一下SerializedSubject的onNext方法做了什么:
@Override public void onNext(T t) { if (terminated) { return; } synchronized (this) { if (terminated) { return; } if (emitting) { FastList list = queue; if (list == null) { list = new FastList(); queue = list; } list.add(nl.next(t)); return; } emitting = true; } try { actual.onNext(t); } catch (Throwable e) { terminated = true; Exceptions.throwOrReport(e, actual, t); return; } for (;;) { for (int i = 0; i < MAX_DRAIN_ITERATION; i++) { FastList list; synchronized (this) { list = queue; if (list == null) { emitting = false; return; } queue = null; } for (Object o : list.array) { if (o == null) { break; } try { if (nl.accept(actual, o)) { terminated = true; return; } } catch (Throwable e) { terminated = true; Exceptions.throwIfFatal(e); actual.onError(OnErrorThrowable.addValueAsLastCause(e, t)); return; } } } } }
處理方式很簡(jiǎn)單,如果有其他線程在發(fā)射數(shù)據(jù),那就將數(shù)據(jù)放置到隊(duì)列中,等待下次發(fā)射。這保證了同一時(shí)間只會(huì)有一個(gè)線程調(diào)用onNext,onComplete和onError這些方法。
但是這樣操作顯然是會(huì)造成性能的影響的,所以RxJava并不會(huì)把所有的操作都打上線程安全的標(biāo)簽。
在這里就要引申出一個(gè)問(wèn)題,那就是使用者對(duì)create方法的濫用,其實(shí)這個(gè)方法不應(yīng)該被使用者頻繁的調(diào)用的,因?yàn)槟惚仨氁⌒牡奶幚硭械臄?shù)據(jù)發(fā)射,接收的邏輯。相反的,使用已有的操作符能很好的解決這個(gè)問(wèn)題,所以下次大家在遇到問(wèn)題的時(shí)候不要簡(jiǎn)單的使用create去自己寫(xiě),而是應(yīng)該想想有沒(méi)有現(xiàn)成的操作符可以完成相應(yīng)的需求。
RxJava中的一些操作符
RxJava中有一些操作符也和多線程并發(fā)有關(guān),下面讓我來(lái)講一講merge和concat,以及他們的一些變種操作符。
對(duì)于多線程發(fā)射數(shù)據(jù),有時(shí)候我們需要得到的結(jié)果也保持和發(fā)射時(shí)候一樣的順序,這個(gè)時(shí)候如果我們使用merge這個(gè)操作符去結(jié)合多個(gè)發(fā)射源,那么就會(huì)產(chǎn)生一定的問(wèn)題了(例子中做了非常不好的示范——使用了create操作符,請(qǐng)大家不要學(xué)習(xí)這樣的寫(xiě)法,這里單純是為了求證結(jié)果)。
Observable o1 = Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber<? super Integer> subscriber) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); subscriber.onNext(1); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }); Observable o2 = Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(2); subscriber.onCompleted(); } }); Observable.merge(o1,o2) .subscribe(new Subscriber () { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer i) { Log.d("TAG", "onNext: " + i); } });
對(duì)于這樣的場(chǎng)景,我們得到的答案將是2,1而不是先得到o1發(fā)射的數(shù)據(jù),再獲取o2的數(shù)據(jù)。
究其原因,就是因?yàn)閙erge其實(shí)就是給什么傳什么,也不會(huì)去管數(shù)據(jù)發(fā)射的順序:
@Override public void onNext(Observable<? extends T> t) { if (t == null) { return; } if (t == Observable.empty()) { emitEmpty(); } else if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable<? extends T>)t).get()); } else { InnerSubscriberinner = new InnerSubscriber (this, uniqueId++); addInner(inner); t.unsafeSubscribe(inner); emit(); } }
可以看到在經(jīng)過(guò)lift操作之后,對(duì)應(yīng)的中間人MergeSubscriber的onNext,沒(méi)有什么多余的代碼,所以在多個(gè)Observable從多線程中發(fā)射數(shù)據(jù)的時(shí)候,順序當(dāng)然不能得到保證。
一個(gè)單詞說(shuō)明這個(gè)問(wèn)題:interleaving——交錯(cuò)。merge后的數(shù)據(jù)源可能是交錯(cuò)的。由于merge有這樣數(shù)據(jù)交錯(cuò)的問(wèn)題,所以它的變種—flatMap也會(huì)有同樣的問(wèn)題。
對(duì)于這樣的場(chǎng)景,我們可以使用concat操作符來(lái)完成:
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
根據(jù)文檔,我們知道concat操作符是一個(gè)接一個(gè)的處理數(shù)據(jù)源的數(shù)據(jù)的。
if (wip.getAndIncrement() != 0) { return; } final int delayErrorMode = this.delayErrorMode; for (;;) { if (actual.isUnsubscribed()) { return; } if (!active) { if (delayErrorMode == BOUNDARY) { if (error.get() != null) { Throwable ex = ExceptionsUtils.terminate(error); if (!ExceptionsUtils.isTerminated(ex)) { actual.onError(ex); } return; } } boolean mainDone = done; Object v = queue.poll(); boolean empty = v == null; if (mainDone && empty) { Throwable ex = ExceptionsUtils.terminate(error); if (ex == null) { actual.onCompleted(); } else if (!ExceptionsUtils.isTerminated(ex)) { actual.onError(ex); } return; } if (!empty) { Observable<? extends R> source; try { source = mapper.call(NotificationLite.instance().getValue(v)); } catch (Throwable mapperError) { Exceptions.throwIfFatal(mapperError); drainError(mapperError); return; } if (source == null) { drainError(new NullPointerException("The source returned by the mapper was null")); return; } if (source != Observable.empty()) { if (source instanceof ScalarSynchronousObservable) { ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source; active = true; arbiter.setProducer(new ConcatMapInnerScalarProducer (scalarSource.get(), this)); } else { ConcatMapInnerSubscriber innerSubscriber = new ConcatMapInnerSubscriber (this); inner.set(innerSubscriber); if (!innerSubscriber.isUnsubscribed()) { active = true; source.unsafeSubscribe(innerSubscriber); } else { return; } } request(1); } else { request(1); continue; } } } if (wip.decrementAndGet() == 0) { break; } }
通過(guò)源碼我們可以知道,active字段就保證了如果上一個(gè)數(shù)據(jù)源還沒(méi)有發(fā)射完數(shù)據(jù),就會(huì)一直在for循環(huán)中等待,直到上一個(gè)數(shù)據(jù)源發(fā)射完了數(shù)據(jù)重置了active字段。
對(duì)于concat,其實(shí)還存在一個(gè)問(wèn)題,那就是多個(gè)Observable變成了串行,會(huì)大大的增加整個(gè)RxJava事件流的處理時(shí)間,對(duì)于這個(gè)場(chǎng)景,我們可以使用concatEager來(lái)解決。concatEager的源碼就不帶大家分析了,有興趣的同學(xué)可以自行查看。
總結(jié)
這篇文章比較短,講的東西也比較淺顯,其實(shí)就是討論了一下RxJava中多線程并發(fā)的幾個(gè)問(wèn)題。最后我想說(shuō),RxJava并不是什么高大上的東西,在你的項(xiàng)目引入之前,要考慮一下是否真的有必要這么做。就算真的有場(chǎng)景需要RxJava,也請(qǐng)不要一口氣把項(xiàng)目中所有的操作都換成RxJava,一些簡(jiǎn)單的操作不一定需要使用RxJava的操作符的實(shí)現(xiàn),用了反而降低了代碼的可讀性,切勿為了使用Rx而使用Rx。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。