RxJava中操作符到底做了什么?

RxJava今年彻底火了一把,其中最牛逼之处就是操作符了,以前只知道怎么用,这几天看了看源码,大致的弄清楚了操作符的工作过程,今天分享给大家。如果有什么不对地方,请大家多多指教。

目前累计服务客户上千余家,积累了丰富的产品开发及服务经验。以网站设计水平和技术实力,树立企业形象,为客户提供成都网站制作、做网站、外贸营销网站建设、网站策划、网页设计、网络营销、VI设计、网站改版、漏洞修补等服务。成都创新互联始终以务实、诚信为根本,不断创新和提高建站品质,通过对领先技术的掌握、对创意设计的研究、对客户形象的视觉传递、对应用系统的结合,为客户提供更好的一站式互联网解决方案,携手广大客户,共同发展进步。

今天我们已filter为例,看代码:

 
 
  1. Integer[] datas={1,2,3,4,5,6,7,8,9,10};
  2. Observable.from(datas)
  3.         .filter(new Func1() {
  4.             @Override
  5.             public Boolean call(Integer integer) {
  6.                 return integer>=5;
  7.             }
  8.         })
  9.         .subscribe(new Action1() {
  10.             @Override
  11.             public void call(Integer integer) {
  12.                 mText.append(integer.toString()+",");
  13.             }
  14.         });

一个很简单的小例子,用过滤操作符 filter 找出大于等于5的数字。我们点进去看看源码中filter做了什么

 
 
  1. public final Observable filter(Func1 predicate) { 
  2. return create(new OnSubscribeFilter(this, predicate)); 
  3. }

调用了create()方法,等等我们什么时候是不是也用过create() 方法,我们在创建Observable时候也用过create()方法,原来创建了一个新的Observable返回出去了,那岂不是说我们的订阅者其实订阅的是这个新的Observable,我们继续往下看create方法,create方法需要的参数是一个OnSubscribe对象,那我们可以确定OnSubscribeFilter是OnSubscribe的一个实现类,我们点进去看看。

 
 
  1. public final class OnSubscribeFilter implements OnSubscribe {
  2.    
  3.        final Observable source;
  4.    
  5.        final Func1 predicate;
  6.    
  7.        public OnSubscribeFilter(Observable source, Func1 predicate) {
  8.            this.source = source;
  9.            this.predicate = predicate;
  10.        }

果然不出我们所料,OnSubscribeFilter是OnSubscribe的实现类,我们看他的构造方法,传递了两个参数,第一个参数Observable对象,一个Func1,其中第一个参数就是我们我们自己创建的那个Observable,第二个参数使我们在外面写的Func1,然后保存了起来。我们都知道在subscribe()订阅的时候,OnSubscribe的call()方法。我们看看OnSubscribeFilter的call()方法都干了些什么

 
 
  1. @Override
  2.         public void call(final Subscriber child) {
  3.             FilterSubscriber parent = new FilterSubscriber(child, predicate);
  4.             child.add(parent);
  5.             source.unsafeSubscribe(parent);
  6.         }

出现了一个FilterSubscriber,什么鬼玩意儿,我们看看他是什么鬼

 
 
  1. }
  2.       @Override
  3.       public void onError(Throwable e) {
  4.           if (done) {
  5.               RxJavaHooks.onError(e);
  6.               return;
  7.           }
  8.           done = true;
  9.           actual.onError(e);
  10.       }
  11.       @Override
  12.       public void onCompleted() {
  13.           if (done) {
  14.               return;
  15.           }
  16.           actual.onCompleted();
  17.       }
  18.       @Override
  19.       public void setProducer(Producer p) {
  20.           super.setProducer(p);
  21.           actual.setProducer(p);
  22.       }
  23.   }

一个Subscriber的子类,我们看他的构造方法,两个参数,一个Subscriber一个Func1,我们在创建对象时候Subscriber对象是我们真正的从外界传过来的观察者,Func1呢使我们创建OnSubscribeFilter时候传递进来的对象,也就是我们在外界定义的Func1。

回过头来我们继续看OnSubscribeFilter的call方法。我们看到source.unsafeSubscribe(parent),source是我们原来外界的Observable,他订阅了FilterSubscriber对象。我们在他的onNext方法中看到他根据func1.call(t)的返回值来判断是否让我们外界的真正的观察者调用onNext方法。

看到这里有没有恍然大悟,啥?我都不知道你在说啥,额,那我们整体的屡屡。

我们外界的代码,在subscribe()时候,Subscriber并不是订阅了我们自己写的Observable,Subscriber订阅的是filter方法返回的那个新的Observable对象,所以订阅时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter才是我们订阅的被观察者的onSubscribe对象,在OnSubscribeFilter的call()方法中,我们让我们包装的FilterSubscriber订阅我们原来的被观察者,也就是我们在外界生成的那个Observable。我们在外界的Observable的onSubscribe对象的call方法中得到的观察者是FilterSubscriber对象,我们调用的onNext会回调到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中我们根据我们传递的Func1来判断是否要回调真正的Subscriber的onNext方法,在为true的时候我们才回调我们外界的观察者的onNext方法,也就起到了过滤的作用。这就是Filter的整个的流程。

我们来测试下我们的小结论:

 
 
  1. Observable.create(new Observable.OnSubscribe() {
  2.                @Override
  3.                public void call(Subscriber subscriber) {
  4.                    Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
  5.                    subscriber.onNext(5);
  6.                }
  7.            }).filter(new Func1() {
  8.                @Override
  9.                public Boolean call(Integer integer) {
  10.                    return integer > 0;
  11.                }
  12.            }).subscribe(new Action1() {
  13.                @Override
  14.                public void call(Integer integer) {
  15.                    
  16.                }
  17.            });

分享文章:RxJava中操作符到底做了什么?
文章URL:http://www.hantingmc.com/qtweb/news14/84114.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联