0%

rxjs

rxjs

rxjs是一个响应式编程的js实现,angular工程里面经常会用到

核心概念

主要是实现了设计模式中的观察者模式
观察者模式主要实现对象间一对多的行为模式。

  • 可观察对象 Observable
  • 观察者 Observer

完整的Observer观察者需要需要3个处理器

  • 消息处理器Next (必选的)
  • 错误处理器Error
  • 观察结束处理器 complete
    1
    2
    3
    4
    5
    const observer = {
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
    };
    可观察对象Observable能够向观察者发射这三种类型的消息给观察者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 这里我简单的模拟下Observable对象的结构
    class Observable{
    constructor(fn){
    this.fn=fn
    }
    subscribe(observer){
    this.fn(observer)
    }
    }
    let observable=new Observable(subscriber => {
    subscriber.next(1);
    //加了这句后面就不执行了,直接走错误处理器
    // throw '报错了'
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
    }, 1000);
    });
    加上observable.subscribe(observer)这句代码,就可以让观察者执行被观察对象发出的消息了。

实现观察者模式

上面的实现,是一对一的观察(单播)。观察者模式是一对多(多播),跟事件增加处理器一样的道理。rxjs增加了一个中间人Subject做分发。

就像有个山洞,只能进去一个人,但是有一堆人想知道里面的消息。那么派一个人观察,然后告诉大家里面发生了什么就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 模拟一个 Subject 类做消息分发
class Subject{
this.observerList=[]
subscribe(observer){
observerList.push(observer)
}
unsubscribe(observer){
let index=this.observerList.findIndex(observer)
this.observerList=[]
}
complete(){
this.observerList.forEach(observer => {
observer.complete()
});
}
error(err){
this.observerList.forEach(observer => {
observer.error(err)
});
}
next(arg){
this.observerList.forEach(observer => {
observer.next(arg)
});
}
}
let subject=new Subject() //找个中间人
let observer1=()=>{console.log('观察者1')}
let observer2=()=>{console.log('观察者2')}
subject.subscribe(observer1)//观察者1订阅中间人的消息
subject.subscribe(observer2)//观察者2订阅中间人的消息
observable.subscribe(subject) // 中间人订阅可观察对象

取消订阅

订阅会产生订阅信息subscription,订阅信息可以做从属关系,主订阅信息取消,从订阅信息也会被取消

1
2
3
4
let subscription1=subject.subscribe(observer1)
let subscription2=subject.subscribe(observer2)
subscription1.add(subscription2)//建立从属关系
subscription1.unsubscribe()//这时候subscription2也跟着一起取消了

中间人还有几个变体

  • BehaviorSubject 这个中间人会把你订阅时候前一次消息告知观察者(比如我打开电视去看足球赛,旁边一个老铁跟我说了句,现在中国队0:7落后。我拿到这个消息,可能会立马砸掉电视,而不会继续观看。)
  • ReplaySubject 这个中间人有一个缓冲区,和一个缓冲时间。他会把观察者订阅之前出于缓冲区内且在缓冲时间内的消息,告知观察者。
  • AsyncSubject 这个中间人只会等待complete后通知最后的结果(比如观测者不关心过程,只关心结果的情况)

分离订阅时间点

目前观察者跟可观察对象,是在建立联系的那一刻。可观察对象就产生消息。我们需要控制联系的时间。比如一个在线视频,我需要等前面的广告播放完再看。
rxjs是怎么做的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

运算符 Operators

我们在上述例子中使用了不少rxjs的运算符。运算符包含两种:

  • 一种是创建运算符,这种运算符的返回值都是一个可观察对象。它能从集合,定时任务,DOM事件创建可观察对象。
  • 另一种是管道运算符,这种运算符用于对可观测对象发出的消息进行处理,比如做消息内容的整合,消息时间的调整。

调度器 Scheduler

调度器分类

  • null 同步传递消息
  • queueScheduler 队列调度器,事件宏任务
  • asapScheduler 微任务调度器,在两次宏任务之间执行,用于promise
  • asyncScheduler 异步调度器,用于定时器
  • animationFrameScheduler 动画调度器

调度器用处

  • 作为静态操作符的参数
  • subscribeOn(scheduler) 在调用subscribe()函数的时候,作为执行的上下文
  • observeOn(scheduler) 当可观察对象执行next()函数的时候,作为执行上下文

    来个例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    //调度器枚举值,实际上是调度器对象,还能配置,这里简化处理
    const schedulerType={
    asyncScheduler:'asyncScheduler',
    queueScheduler:'queueScheduler',
    asapScheduler:'asapScheduler',
    animationFrameScheduler:'animationFrameScheduler',
    }

    // 模拟个from运算符
    function from(array, scheduler){
    let observe;
    let observe=subscriber => {
    array.forEach(element => {
    subscriber.next(element)
    });
    }
    if(scheduler==schedulerType.asyncScheduler){
    observe=subscriber => {
    setTimeout(() => {
    array.forEach(element => {
    subscriber.next(element)
    });
    }, 0);

    }
    }
    return new Observable(observe);
    }
    console.log('start')
    from([1,2,3]).subscribe(x=>console.log(x))
    console.log('end')
    //logs打印如下
    //start
    //1
    //2
    //3
    //end
    console.log('start')
    from([1,2,3],schedulerType.asyncScheduler).subscribe(x=>console.log(x))
    console.log('end')
    //logs打印如下
    //start
    //end
    //1
    //2
    //3

欢迎关注我的其它发布渠道

来发评论吧~
Powered By Valine
v1.5.2