Rxjava和Retrofit结合使⽤⼤量请求时候出现OOM的问题
在使⽤RxJava+Retrofit的过程中,出现了OOM的问题,报错⽇志如下:
java.lang.OutOfMemoryError: pthread_create (1040KB stack) failed: Try
again
at java.lang.Thread.nativeCreate(Native Method)
at java.lang.Thread.start(Thread.java:733)
at urrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:970)
at surePrestart(ThreadPoolExecutor.java:1611)
at urrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:342) at urrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:579)
at urrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:680)
activex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:145)
activex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:239)
activex.Scheduler.scheduleDirect(Scheduler.java:203)
activex.Scheduler.scheduleDirect(Scheduler.java:179)
activex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36) activex.Observable.subscribe(Observable.java:12267)
activex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
activex.Observable.subscribe(Observable.java:12267)
activex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45) activex.Observable.subscribe(Observable.java:12267)
原因是在创建⼤量请求时候会创建⼤量的线程, 在⼿机上由于⼿机对进程有线程数量的限制导致了闪退.
第⼀种解决⽅案
最佳答案是:
JakeWharton commented on 9 Aug 2017
The problem is that Schedulers.io() uses a cached thread pool without
a limit and thus is trying to create 1500 threads. You should consider
using a Scheduler that has a fixed limit of threads, or using RxJava
2.x’s parallel() operator to parallelize the operation to a fixed number of workers.
If you’re using raw Retrofit by default it uses OkHttp’s dispatcher
which limits the threads to something like 64 (with a max of 5 per
host). That’s why you aren’t seeing it fail.
If you use createAsync() when creating the RxJava2CallAdapterFactory
it will create fully-async Observable instances that don’t require a
subscribeOn and which use OkHttp’s Dispatcher just like Retrofit would
otherwise. Then you only need observeOn to move back to the main
thread, and you avoid all additional thread creation.
主要意思就是: 如果只使⽤retrofit是不会出现这个问题的, 由于rxjava⾃⾝没有对线程数量做限制所以可能会出现这个问题.
这个解决⽅法主要有两点:
⾸先是在创建retrofit的时候使⽤ createAsync()
if(retrofit == null){
retrofit =new Retrofit.Builder()
.baseUrl(baseurl)//设置远程地址
.ate())
.ateAsync())
.Instance().getOkHttpClient())
.build();
}
此时将创建完全异步的可见实例,这些实例不需要订阅,并且使⽤OkHttp的Dispatcher,就像Retrofit⼀样。
不需要订阅就表⽰不需要调⽤ .subscribeOn(getScheduler())
Observable<T> result = responseBody
//      .subscribeOn(Schedulers.io())    //如果使⽤ateAsync(),则不需要这⼀句
.map(new Function<ResponseBody, T>(){
@Override
public T apply(ResponseBody responseBody)throws Exception {
String string = responseBody.string();
return new Gson().fromJson(string, clazz);
}
})
.observeOn(AndroidSchedulers.mainThread());
最后的结果:
anilmaddala commented on 9 Aug 2017
Thanks @JakeWharton tried the createAsync() and removed subscribeOn to
Scheduler.io and I am not seeing the crash. Thanks.
I also posted the question in StackOverflow. If you can, please post
your answer there and I will mark it as a chosen answer.
这种解决⽅法有两点, ⾸先使⽤ ateAsync() 来⾃动创建异步, 然后不使⽤
.subscribeOn(Schedulers.io()) ,这样rxjava的线程数量就会做出和retrofit⼀样的限制, 因为retrofit不出问题, 则rxjava不出问题.
⼀个重要的后续
by 2020.8
最近,在升级项⽬架构的时候,也升级了rxjava3, 于是将retrofit的adapter也进⾏了升级,进⾏了如下引⽤
api "fit2:adapter-rxjava3:2.9.0"
同时,在创建retrofit的时候
//使⽤RxJava2版本
if(retrofit == null){
retrofit =new Retrofit.Builder()
.baseUrl(baseurl)
.ate())
.ateAsync())
.Instance().getOkHttpClient())
.build();
}
升级到
//使⽤RxJava3版本
if(retrofit == null){
retrofit =new Retrofit.Builder()
.baseUrl(baseurl)
.ate())
.ateSynchronous())
.Instance().getOkHttpClient())
.build();
}
发现⽹络请求不到数据了, 查看⽇志发现原因是: 在主线程请求了⽹络数据
⽐较蒙,经过排查, 原因是 RxJava3CallAdapterFactory 和 RxJava2CallAdapterFactory的调⽤⽅法是相反的,通过查看源码可以知道:
//RxJava2CallAdapterFactory
/**
* 返回⼀个创建不操作任何调度器的同步可观察对象的实例
* Returns an instance which creates synchronous observables that do not operate on any scheduler
* by default.
*/
public static RxJava2CallAdapterFactory create(){
return new RxJava2CallAdapterFactory(null,false);
}
//返回⼀个创建异步可观察对象的实例
/** Returns an instance which creates asynchronous observables. */
public static RxJava2CallAdapterFactory createAsync(){
return new RxJava2CallAdapterFactory(null,true);
}
⽽在RxJava3CallAdapterFactory 中
//RxJava3CallAdapterFactory
/**
* 返回⼀个运⾏在后台线程上的异步可观察对象的实例
* Returns an instance which creates asynchronous observables that run on a background thread by
* default. Applying {@code subscribeOn(..)} has no effect on instances created by the returned
* factory.
*/
public static RxJava3CallAdapterFactory create(){
return new RxJava3CallAdapterFactory(null,true);
}
/**
* 返回⼀个创建不操作任何调度器的同步可观察对象的实例
* Returns an instance which creates synchronous observables that do not operate on any scheduler
* by default. Applying {@code subscribeOn(..)} will change the scheduler on which the HTTP calls
* are made.
*/
public static RxJava3CallAdapterFactory createSynchronous(){
return new RxJava3CallAdapterFactory(null,false);
}
也就是说, 可能是因为RxJava2CallAdapterFactory 中默认没有对rxjava在使⽤
.subscribeOn(Schedulers.io())
⽅法的时候做出限制,导致很多⼈提交了 issues, 所以在RxJava3CallAdapterFactory 中默认对此作了限制,当然你⾃⼰也可以扩展,使⽤.ateSynchronous())
来⾃定义, 就像最初使⽤RxJava2 ⼀样.
总之,RxJava3CallAdapterFactory对此默认作了限制,现在,可以放⼼的使⽤ create() ⽅法即可, 同时在RxJava中⽆需再使⽤
.subscribeOn(Schedulers.io())
end.
第⼆种解决⽅案
对rxjava的 Scheduler 进⾏⾃定义限制线程数量
Observable<T> result = responseBody
.subscribeOn(getScheduler())
android retrofit
.map(new Function<ResponseBody, T>(){
@Override
public T apply(ResponseBody responseBody)throws Exception {
String string = responseBody.string();
return new Gson().fromJson(string, clazz);
}
})
.
observeOn(AndroidSchedulers.mainThread());
private static Scheduler mScheduler;
private static Scheduler getScheduler(){
if(mScheduler == null){
synchronized(HttpLoaderUtils.class){
if(mScheduler == null){
mScheduler = Schedulers.wFixedThreadPool(15));
}
}
}
return mScheduler;
}
⾃⾏对Scheduler的数量进⾏限制.
可以查看:
总结
两种⽅法的解决核⼼都是对rxjava⾃⾝的线程数量的限制.
第⼀种⽅案通过Retrofit配置adapter对Rxjava产⽣影响,使其线程的调度⽤Retrofit本⾝提供的线程,其中有⼀句:
/**
* 返回⼀个运⾏在后台线程上的异步可观察对象的实例
* Returns an instance which creates asynchronous observables that run on a background thread by
* default. Applying {@code subscribeOn(..)} has no effect on instances created by the returned
* factory.
*/
public static RxJava3CallAdapterFactory create(){
return new RxJava3CallAdapterFactory(null,true);
}
也就是说⽤了这个以后即使在RxJava中调⽤ subscribeOn(…) ⽅法也不会影响返回的实例.
第⼆种⽅案是通过RxJava⾃⾝调度来限制其所使⽤的线程数量,因为 .subscribeOn(Schedulers.io())⽅法不对线程数量做任何限制, 所以当并发请求的接⼝数量过多就可能会出现OOM的情况,通过线程池来限制也是⼀种解决⽅案.

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。