/RetrofitRxJavaDemo

Learn Retrofit Rxjava

Primary LanguageJava

#RetrofitRxJavaDemo

OkHttp

Retrofit

Retrofit 是一个 RESTful 的 HTTP 网络请求框架的封装。网络请求的工作本质上是 OkHttp 完成,而 Retrofit 仅负责 网络请求接口的封装

这是一份很详细的 Retrofit 2.0 使用教程

Android:手把手带你深入剖析 Retrofit 2.0 源码

RxJava

给 Android 开发者的 RxJava 详解

retryWhen使用方法

RxJava 一句话概括就是一个实现异步操作的库

Retrofit+RxJava:RxJava把Retrofit的请求结果封装成Observable,形成链式调用,代码清晰简洁。

  • 上传文件,接口定义
    //上传单个文件
    @Multipart
    @POST("upload")
    Observable<String> uploadFile(@Part("images") RequestBody file);

    //上传参数和单个文件
    @Multipart
    @POST("upload")
    Observable<String> uploadSingleFile(@Part("description") RequestBody description, @Part MultipartBody.Part file);

    //上传多个文件
    @Multipart
    @POST("upload")
    Observable<String> uploadMultiFile(@Part MultipartBody.Part... file);

    //上传多个文件
    @Multipart
    @POST("upload")
    Observable<String> uploadMultiFile(@Part List<MultipartBody.Part> partList);

    //上传多个文件
    @Multipart
    @POST("upload")
    Observable<String> uploadManyFile(@PartMap Map<String, RequestBody> map);
  • RequestBody和MultipartBody.Part的区别
RequestBody requestBody = RequestBody.create(MediaType.parse("multipart/form-data"), file);
// MultipartBody.Part is used to send also the actual file name
MultipartBody.Part body = MultipartBody.Part.createFormData("image", file.getName(), requestBody);
  • disposable
/**
     * {@link Disposable}
     * 调用 Disposable的dispose方法以后,会导致下游收不到事件,但是上游会继续发送剩余的事件.
     * @param view
     */
    public void testDisposable(View view) {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
                i++;
                if (i == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
    
  • from

    private void from() {
            String[] words = {"Hello", "Hi", "Aloha"};
            Observable.fromArray(words)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.e(TAG, "accept: s=" + s);
                        }
                    });
            ArrayList<String> list = new ArrayList<>();
            list.add("hello");
            list.add("Hi");
            list.add("Aloha");
            Observable.fromIterable(list)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.e(TAG, "accept: s=" + s);
                        }
                    });
            FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Log.e(TAG, "call: current Thread is" + Thread.currentThread().getName());
                    Thread.sleep(1000);
                    return 1;
                }
            });
            new Thread(future).start();
    
            Observable.fromFuture(future, 3000, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.e(TAG, "accept: integer =" + integer);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: error:" + throwable);
                        }
                    });
        }
  • buffer(int count, int skip)

        //buffer(int count, int skip),从原始Observable中每缓存skip个item,从中选择最多count个数据发
        // 射。如果从原始Observable缓存的数据不到skip个,就遇到onCompleted(),则发射当前缓存的数据。
        // 如果从原始Observable缓存的数据不到skip个,就遇到onError(),如果当前缓存的数据够count个,
        // 则发射这组数据,并传递错误通知。如果当前缓存的数据不够count个,则不发射当前缓存的数据,
        // 直接传递错误通知
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int j = 1; j <= 11; j++) {
                    emitter.onNext(j);
                    if (j == 10) {
                        emitter.onError(new Throwable("emit a error"));
                    }
                }
                emitter.onComplete();
            }
        }).buffer(2, 4)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.e(TAG, "accept: ");
                        for (Integer integer : integers) {
                            Log.e(TAG, "accept: integer=" + integer);
                        }
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: " + throwable);
                    }
                });
  • buffer(ObservableSource boundary, final int initialCapacity)
  Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(100)
                //第二个参数 initialCapacity 表示返回的List的初始容量
                .buffer(Observable.interval(250, TimeUnit.MILLISECONDS),6)
                .subscribe(new Consumer<List<Long>>() {
                    @Override
                    public void accept(List<Long> longs) throws Exception {
                        Log.e(TAG, "accept: ");
                        for (Long integer : longs) {
                            Log.e(TAG, "accept: integer=" + integer);
                        }
                    }
                });
                
  • compose
 /**
     * 使用compose复用操作符的例子
     * {@link NetWork#applySchedulers()}
     */
    public void getNowWeather(View view) {
        map = new HashMap();
        //"http://api.k780.com:88/?app=weather.history&weaid=1&date=2015-07-20&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json";
        map.put("app", "weather.today");
        map.put("weaid", 1);
        map.put("appkey", "10003");
        map.put("sign", "b59bc3ef6191eb9f747dd4e83c99f2a4");
        map.put("format", "json");
        NetWork.getApi().testNowWeather(map)
                .compose(NetWork.applySchedulers())
                .subscribe(new Consumer<NowWeatherBean>() {
                    @Override
                    public void accept(NowWeatherBean bean) throws Exception {
                        Log.e(TAG, bean.getCitynm());
                        binding.textWeatherResult.setText(bean.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error" + throwable.getMessage());
                    }
                });
    }
  • API 接口,统一使用 下面的方法来获取数据
@GET
Observable<HttpResult<Object>> getData(@Url String url, @QueryMap Map<String, Object> map);

  • 如果要下载Apk 使用 DownloadUtil(RetrofitRxJavaActivity有使用例子),或者使用系统自带的DownloadManager(DownloadManagerActivity有使用例子)

  • retry

 private void testRetry() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                /*emitter.onNext(number);
                emitter.onComplete();*/
                retryTimes--;
                if (retryTimes > 0) {
                    emitter.onError(new RuntimeException("always fails"));
                } else {
                    //emitter.onError(new Throwable("last fails"));
                    emitter.onNext(100);
                    emitter.onComplete();
                }
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.zipWith(Observable.range(1, rangeCount), new BiFunction<Throwable, Integer, Boolean>() {
                    @Override
                    public Boolean apply(Throwable throwable, Integer integer) throws Exception {
                        boolean b = throwable instanceof RuntimeException;
                        Log.d(TAG, "apply: throwable:" + throwable.getMessage() + ",:" + b);
                        return b;
                    }
                }).flatMap(new Function<Boolean, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Boolean aBoolean) throws Exception {
                        //如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的
                        // 是onError通知,它就将这个通知传递给观察者然后终止。
                        if (aBoolean) {
                            //重新订阅
                            return Observable.timer(1, TimeUnit.SECONDS);
                        } else {
                            //给观察者传递一个throwable,然后终止
                            return Observable.error(new Throwable("test retryWhen"));
                        }
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "accept: integer:" + integer);

            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "accept: throwable:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }
  • RxJava统一异常处理

在Android平台下,Retrofit内置的CallAdapter的作用只是为了把请不请求的回调切换到主线程。线程切换内部还是使用的Handler

RxJava Schedulers

特别需要的是 ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet() 产生的递增值来决定线程的名字;而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为CPU的数目,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

RxJava 线程模型分析