# rxjs7 **Repository Path**: iceleee/rxjs7 ## Basic Information - **Project Name**: rxjs7 - **Description**: rxjs7 中文文档 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 1 - **Created**: 2021-03-23 - **Last Updated**: 2025-04-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RXJS 7 ## Observable (可观察对象) **Observables** 是多个值的惰性推送集合,它填补了下面表格中的空白 | | 单个值 | 多个值 | | :--: | :------: | :--------: | | 拉取 | Function | Iterator | | 推送 | Promise | Observable | **示例**-当订阅下面代码中的 **Observable** 的时候会立即(同步地)推送值 **1**、**2**、**3**,然后 **1** 秒后会推送值 **4**,再然后是完成流: ```js import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000); }); ``` 要调用 **Observable** 并看到这些值,我们需要订阅 **Observable**: ```js import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000); }); console.log('just before subscribe'); observable.subscribe({ next(x) { console.log('got value ' + x); }, error(err) { console.error('something wrong occurred: ' + err); }, complete() { console.log('done'); }, }); console.log('just after subscribe'); // Logs: // just before subscribe // got value 1 // got value 2 // got value 3 // just after subscribe // got value 4 // done ``` ## 拉取 (Pull) vs. 推送 (Push) 拉取和推送是两种不同的协议,用来描述数据生产者 (**Producer**) 如何与数据消费者 (**Consumer**) 进行通信的。 什么是拉取? - 在拉取体系中,由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。 每个 **JavaScript** 函数都是拉取体系。函数是数据的生产者,调用该函数的代码通过从函数调用中 **取出** 一个单个返回值来对该函数进行消费。 **ES6** 引入了 **generator** 函数和 **iterators** (**function**\*),这是另外一种类型的拉取体系。调用 **iterator**.**next**() 的代码是消费者,它会从 **iterator**(生产者) 那 **取出** 多个值。 | | 生产者 | 消费者 | | :--: | :--------------------------: | :--------------------------: | | 拉取 | 被动的: 当被请求时产生数据 | 主动的: 决定何时请求数据 | | 推送 | 主动的: 按自己的节奏产生数据 | 被动的: 对收到的数据做出反应 | 什么是推送? - 在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。 在当今的 **JavaScript** 世界中,**Promises** 是最常见的推送体系类型。**Promise**(生产者) 将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 **Promise** 来决定何时把值 **推送** 给回调函数。 **RxJS** 引入了 **Observables**,一个新的 **JavaScript** 推送体系。**Observable** 是多个值的生产者,并将值 **推送** 给观察者(消费者)。 - **Function** 是惰性的评估运算,调用时会同步地返回一个单一值。 - **Generator** 是惰性的评估运算,调用时会同步地返回零到(有可能的)无限多个值。 - **Promise** 是最终可能(或可能不)返回单个值的运算。 - **Observable** 是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到(有可能的)无限多个值。 ## Observables 作为函数的泛化 与流行的说法正好相反,**Observables** 既不像 **EventEmitters**,也不像多个值的 **Promises** 。在某些情况下,即当使用 **RxJS** 的 **Subjects** 进行多播时, **Observables** 的行为可能会比较像 **EventEmitters**,但通常情况下 **Observables** 的行为并不像 **EventEmitters** 。 > Observables 像是没有参数, 但可以泛化为多个值的函数。 思考以下代码: ```js function foo() { console.log('Hello'); return 42; } const x = foo.call(); // same as foo() console.log(x); const y = foo.call(); // same as foo() console.log(y); // Logs: // 'Hello' // 42 // 'Hello' // 42 ``` 你可以使用 **Observables** 重写上面的代码: ```js import { Observable } from 'rxjs'; const foo = new Observable(subscriber => { console.log('Hello'); subscriber.next(42); }); foo.subscribe(x => { console.log(x); }); foo.subscribe(y => { console.log(y); }); // Logs: // "Hello" // 42 // "Hello" // 42 ``` 这是因为函数和 **Observables** 都是惰性运算。如果你不调用函数,`console.log('Hello')` 就不会执行。**Observables** 也是如此,如果你不 **调用** 它(使用 **subscribe**),`console.log('Hello')` 也不会执行。此外,**调用** 或 **订阅** 是独立的操作:两个函数调用会触发两个单独的副作用,两个 **Observable** 订阅同样也是触发两个单独的副作用。**EventEmitters** 共享副作用并且无论是否存在订阅者都会尽早执行,**Observables** 与之相反,不会共享副作用并且是延迟执行。 > 订阅 Observable 类似于调用函数 > 一些人声称 Observables 是异步的。那不是真的。如果你用日志包围一个函数调用,像这样: ```js console.log('before'); console.log(foo.call()); console.log('after'); // Logs: // "before" // "Hello" // 42 // "after" ``` 使用 **Observables** 来做同样的事: ```js console.log('before'); foo.subscribe(x => { console.log(x); }); console.log('after'); // Logs: // "before" // "Hello" // 42 // "after" ``` 这证明了 **foo** 的订阅完全是同步的,就像函数一样。 > Observables 传递值可以是同步的,也可以是异步的。 那么 **Observable** 和 函数的区别是什么呢?**Observable** 可以随着时间的推移 **返回** 多个值,这是函数所做不到的。你无法这样: ```js function foo() { console.log('Hello'); return 42; return 100; // 死代码,永远不会执行 } ``` 函数只能返回一个值。但 **Observables** 可以这样: ```js import { Observable } from 'rxjs'; const foo = new Observable(subscriber => { console.log('Hello'); subscriber.next(42); subscriber.next(100); // "return" another value subscriber.next(200); // "return" yet another }); console.log('before'); foo.subscribe(x => { console.log(x); }); console.log('after'); // Logs: // "before" // "Hello" // 42 // 100 // 200 // "after" ``` 但你也可以异步地 **返回** 值: ```js import { Observable } from 'rxjs'; const foo = new Observable(subscriber => { console.log('Hello'); subscriber.next(42); subscriber.next(100); subscriber.next(200); setTimeout(() => { subscriber.next(300); // happens asynchronously }, 1000); }); console.log('before'); foo.subscribe(x => { console.log(x); }); console.log('after'); // Logs: // "before" // "Hello" // 42 // 100 // 200 // "after" // 300 ``` 结论: - `func.call()` 意思是 **同步地给我一个值** - `observable.subscribe()` 意思是 **给我任意数量的值,无论是同步还是异步** ## Observable 剖析 **Observables** 是使用 `new Observable` 或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 **next** / **error** / **complete** 通知给观察者,而且执行可能会被清理。这四个方面全部编码在 **Observables** 实例中,但某些方面是与其他类型相关的,像 **Observer** (观察者) 和 **Subscription** (订阅)。 Observable 的核心关注点: - 创建 Observables - 订阅 Observables - 执行 Observables - 清理 Observables ### 创建 Observables **Observable** 构造函数接收一个参数:**subscribe** 函数。 下面的示例创建了一个 **Observable**,它每隔一秒会向观察者发送字符串 **'hi'** 。 ```js import { Observable } from 'rxjs'; const observable = new Observable(function subscribe(subscriber) { const id = setInterval(() => { subscriber.next('hi'); }, 1000); }); ``` > Observables 可以使用 new Observable 来创建, 但通常我们使用所谓的创建操作符, 像 of、from、interval、等等来创建 在上面的示例中,**subscribe** 函数是用来描述 **Observable** 最重要的一块。我们来看下订阅是什么意思。 ### 订阅 Observables 示例中的 **Observable** 对象 **observable** 可以像这样订阅: ```js observable.subscribe(x => console.log(x)); ``` `observable.subscribe` 和 `new Observable(function subscribe(observer) {...})` 中的 **subscribe** 有着同样的名字,这并不是一个巧合。在库中,它们是不同的,但从实际出发,你可以认为在概念上它们是等同的。 这表明 **subscribe** 调用在同一 **Observable** 的多个观察者之间是不共享的。当使用一个观察者调用 `observable.subscribe` 时,`new Observable(function subscribe(subscriber) {...})` 中的 **subscribe** 函数只服务于给定的观察者。对 `observable.subscribe` 的每次调用都会触发针对给定观察者的独立设置。 > 订阅 Observable 像是调用函数, 并提供接收数据的回调函数 这与像 **addEventListener** / **removeEventListener** 这样的事件处理方法 **API** 是完全不同的。使用 `observable.subscribe`,在 **Observable** 中不会将给定的观察者注册为监听器。**Observable** 甚至不会去维护一个附加的观察者列表。 **subscribe** 调用是启动 **Observable** 执行的一种简单方式, 并将值或事件传递给本次执行的观察者。 ### 执行 Observables `new Observable(function subscribe(subscriber) {...})` 中...的代码表示 **Observable** 执行,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。 **Observable** 执行可以传递三种类型的值: - **Next**:发送一个值,比如数字、字符串、对象,等等。 - **Error**:发送一个 **JavaScript** 错误 或 异常。 - **Complete**:不再发送任何值。 **Next** 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。**Error** 和 **Complete** 通知可能只会在 **Observable** 执行期间发生一次,并且只会执行其中的一个。 这些约束用所谓的 **Observable** 语法或合约表达最好,写为正则表达式是这样的: ```js next*(error|complete)? ``` > 在 Observable 执行中, 可能会发送零个到无穷多个 Next 通知。如果发送的是 Error 或 Complete 通知的话,那么之后不会再发送任何通知了。 下面是 **Observable** 执行的示例,它发送了三个 **Next** 通知,然后是 **Complete** 通知: ```js import { Observable } from 'rxjs'; const observable = new Observable(function subscribe(subscriber) { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); ``` **Observable** 严格遵守自身的规约,所以下面的代码不会发送 **Next** 通知 4: ```js import { Observable } from 'rxjs'; const observable = new Observable(function subscribe(subscriber) { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); subscriber.next(4); // Is not delivered because it would violate the contract }); ``` 在 **subscribe** 中用 `try/catch` 代码块来包裹任意代码是个不错的主意,如果捕获到异常的话,会发送 **Error** 通知: ```js import { Observable } from 'rxjs'; const observable = new Observable(function subscribe(subscriber) { try { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); } catch (err) { subscriber.error(err); // delivers an error if it caught one } }); ``` ### 清理 Observable 执行 因为 **Observable** 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 **API** 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。 当调用了 **observable**.**subscribe** ,观察者会被附加到新创建的 **Observable** 执行中。这个调用还返回一个对象,即 **Subscription** (订阅): ```js const subscription = observable.subscribe(x => console.log(x)); ``` **Subscription** 表示进行中的执行,它有最小化的 **API** 以允许你取消执行。想了解更多订阅相关的内容,请参见 **Subscription** 类型。使用 `subscription.unsubscribe()` 你可以取消进行中的执行: ```js import { from } from 'rxjs'; const observable = from([10, 20, 30]); const subscription = observable.subscribe(x => console.log(x)); // Later: subscription.unsubscribe(); ``` > 当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行 当我们创建了一个 **Observable** 时,**Observable** 必须定义如何清理执行的资源。你可以通过在 `function subscribe()` 中返回一个自定义的 **unsubscribe** 函数。 举例来说,这是我们如何清理使用了 **setInterval** 的 **interval** 执行集合: ```js const observable = new Observable(function subscribe(subscriber) { // Keep track of the interval resource const intervalId = setInterval(() => { subscriber.next('hi'); }, 1000); // Provide a way of canceling and disposing the interval resource return function unsubscribe() { clearInterval(intervalId); }; }); ``` 正如 **observable.subscribe** 类似于 `new Observable(function subscribe() {...})`,从 **subscribe** 返回的 **unsubscribe** 在概念上也等同于 **subscription.unsubscribe**。事实上,如果我们抛开围绕这些概念的 **ReactiveX** 类型,保留下来的只是相当简单的 **JavaScript** 。 ```js function subscribe(subscriber) { const intervalId = setInterval(() => { subscriber.next('hi'); }, 1000); return function unsubscribe() { clearInterval(intervalId); }; } const unsubscribe = subscribe({ next: x => console.log(x) }); // Later: unsubscribe(); // dispose the resources ``` 为什么我们要使用像 **Observable**、**Observer** 和 **Subscription** 这样的 **Rx** 类型?原因是保证代码的安全性(比如 **Observable** 规约)和操作符的可组合性。 ## Observer (观察者) 什么是观察者? - 观察者是由 **Observable** 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 **Observable** 发送的通知类型:**next**、**error** 和 **complete** 。下面的示例是一个典型的观察者对象: ```js const observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), }; ``` 要使用观察者,需要把它提供给 **Observable** 的 **subscribe** 方法: ```js observable.subscribe(observer); ``` > 观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。 **RxJS** 中的观察者也可能是部分的。如果你没有提供某个回调函数,**Observable** 的执行也会正常运行,只是某些通知类型会被忽略,因为观察者中没有相对应的回调函数 下面的示例是没有 **complete** 回调函数的观察者: ```js const observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), }; ``` 当订阅 **Observable** 时,你可能只提供了一个回调函数作为参数,而并没有将其附加到观察者对象上,例如这样: ```js observable.subscribe(x => console.log('Observer got a next value: ' + x)); ``` 在 `observable.subscribe` 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 **next** 的处理方法。三种类型的回调函数都可以直接作为参数来提供: ```js observable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification'), ); ``` ## Subscription 什么是 **Subscription** ? - **Subscription** 是表示可清理资源的对象,通常是 **Observable** 的执行。**Subscription** 有一个重要的方法,即 **unsubscribe**,它不需要任何参数,只是用来清理由 **Subscription** 占用的资源。在上一个版本的 **RxJS** 中,**Subscription** 叫做 **Disposable** (可清理对象)。 ```js import { interval } from 'rxjs'; const observable = interval(1000); const subscription = observable.subscribe(x => console.log(x)); // Later: // This cancels the ongoing Observable execution which // was started by calling subscribe with an Observer. subscription.unsubscribe(); ``` > Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行。 **Subscription** 还可以合在一起,这样一个 **Subscription** 调用 `unsubscribe()` 方法,可能会有多个 **Subscription** 取消订阅 。你可以通过把一个 **Subscription** 添加到另一个上面来做这件事: ```js import { interval } from 'rxjs'; const observable1 = interval(400); const observable2 = interval(300); const subscription = observable1.subscribe(x => console.log('first: ' + x)); const childSubscription = observable2.subscribe(x => console.log('second: ' + x), ); subscription.add(childSubscription); setTimeout(() => { // Unsubscribes BOTH subscription and childSubscription subscription.unsubscribe(); }, 1000); // Logs: // second: 0 // first: 0 // second: 1 // first: 1 // second: 2 ``` **Subscriptions** 还有一个 `remove(otherSubscription)` 方法,用来撤销一个已添加的子 **Subscription** ## Subject (主体) 什么是 **Subject**? - **Subject** 是一种特殊类型的 **Observable**,它允许将值多播给多个观察者,所以 **Subject** 是多播的,而普通的 **Observables** 是单播的(每个已订阅的观察者都拥有 **Observable** 的独立执行) > Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。 每个 **Subject** 都是 **Observable** 。 - 对于 **Subject**,你可以提供一个观察者并使用 **subscribe** 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 **Observable** 执行是来自普通的 **Observable** 还是 **Subject** 。 在 **Subject** 的内部,**subscribe** 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 **addListener** 的工作方式。 每个 **Subject** 都是观察者。 - **Subject** 是一个有如下方法的对象: `next(v)`、`error(e)` 和 `complete()` 。要给 **Subject** 提供新值,只要调用 `next(theValue)`,它会将值多播给已注册监听该 **Subject** 的观察者们。 在下面的示例中,我们为 **Subject** 添加了两个观察者,然后给 **Subject** 提供一些值: ```js import { Subject } from 'rxjs'; const subject = new Subject(); subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); subject.next(1); subject.next(2); // Logs: // observerA: 1 // observerB: 1 // observerA: 2 // observerB: 2 ``` 因为 **Subject** 是观察者,这也就在意味着你可以把 **Subject** 作为参数传给任何 **Observable** 的 **subscribe** 方法,如下面的示例所展示的: ```js import { Subject, from } from 'rxjs'; const subject = new Subject(); subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); const observable = from([1, 2, 3]); observable.subscribe(subject); // You can subscribe providing a Subject // Logs: // observerA: 1 // observerB: 1 // observerA: 2 // observerB: 2 // observerA: 3 // observerB: 3 ``` 使用上面的方法,我们基本上只是通过 **Subject** 将单播的 **Observable** 执行转换为多播的。这也说明了 **Subjects** 是将任意 **Observable** 执行共享给多个观察者的唯一方式。 还有一些特殊类型的 **Subject**:**BehaviorSubject**、**ReplaySubject** 和 **AsyncSubject**。 ### 多播的 Observables 多播 **Observable** 通过 **Subject** 来发送通知,这个 **Subject** 可能有多个订阅者,然而普通的 单播 **Observable** 只发送通知给单个观察者。 > 多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。 在底层,这就是 **multicast** 操作符的工作原理:观察者订阅一个基础的 **Subject**,然后 **Subject** 订阅源 **Observable** 。下面的示例与前面使用 `observable.subscribe(subject)` 的示例类似: ```js import { from, Subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = from([1, 2, 3]); const subject = new Subject(); const multicasted = source.pipe(multicast(subject)); // These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: v => console.log(`observerA: ${v}`), }); multicasted.subscribe({ next: v => console.log(`observerB: ${v}`), }); // This is, under the hood, `source.subscribe(subject)`: multicasted.connect(); ``` **multicast** 操作符返回一个 **Observable**,它看起来和普通的 **Observable** 没什么区别,但当订阅时就像是 **Subject** 。**multicast** 返回的是 **ConnectableObservable**,它只是一个有 `connect()` 方法的 **Observable** 。 `connect()` 方法十分重要,它决定了何时启动共享的 **Observable** 执行。因为 `connect()` 方法在底层执行了 `source.subscribe(subject)`,所以它返回的是 **Subscription**,你可以取消订阅以取消共享的 **Observable** 执行。 ### 引用计数 手动调用 `connect()` 并处理 **Subscription** 通常太笨重。通常,当第一个观察者到达时我们想要自动地连接,而当最后一个观察者取消订阅时我们想要自动地取消共享执行。 请考虑以下示例,下面的列表概述了 **Subscriptions** 发生的经过: 1. 第一个观察者订阅了多播 Observable 2. 多播 Observable 已连接 3. next 值 0 发送给第一个观察者 4. 第二个观察者订阅了多播 Observable 5. next 值 1 发送给第一个观察者 6. next 值 1 发送给第二个观察者 7. 第一个观察者取消了多播 Observable 的订阅 8. next 值 2 发送给第二个观察者 9. 第二个观察者取消了多播 Observable 的订阅 10. 多播 Observable 的连接已中断(底层进行的操作是取消订阅) 要实现这点,需要显式地调用 `connect()`,代码如下: ```js import { interval, Subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = interval(500); const subject = new Subject(); const multicasted = source.pipe(multicast(subject)); let subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: v => console.log(`observerA: ${v}`), }); // We should call `connect()` here, because the first // subscriber to `multicasted` is interested in consuming values subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: v => console.log(`observerB: ${v}`), }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); // We should unsubscribe the shared Observable execution here, // because `multicasted` would have no more subscribers after this setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // for the shared Observable execution }, 2000); ``` 如果不想显式调用 `connect()`,我们可以使用 **ConnectableObservable** 的 `refCount()` 方法(引用计数),这个方法返回 **Observable**,这个 **Observable** 会追踪有多少个订阅者。当订阅者的数量从 **0** 变成 **1**,它会调用 `connect()` 以开启共享的执行。当订阅者数量从 **1** 变成 **0** 时,它会完全取消订阅,停止进一步的执行。 > refCount 的作用是,当有第一个订阅者时,多播 Observable 会自动地启动执行,而当最后一个订阅者离开时,多播 Observable 会自动地停止执行 示例如下: ```js import { interval, Subject } from 'rxjs'; import { multicast, refCount } from 'rxjs/operators'; const source = interval(500); const subject = new Subject(); const refCounted = source.pipe(multicast(subject), refCount()); let subscription1, subscription2; // This calls `connect()`, because // it is the first subscriber to `refCounted` console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: v => console.log(`observerA: ${v}`), }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: v => console.log(`observerB: ${v}`), }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); // This is when the shared Observable execution will stop, because // `refCounted` would have no more subscribers after this setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 2000); // Logs // observerA subscribed // observerA: 0 // observerB subscribed // observerA: 1 // observerB: 1 // observerA unsubscribed // observerB: 2 // observerB unsubscribed ``` `refCount()` 只存在于 **ConnectableObservable**,它返回的是 **Observable**,而不是另一个 **ConnectableObservable** 。 ### BehaviorSubject **Subject** 的其中一个变体就是 **BehaviorSubject**,它有一个 **当前值** 的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 **BehaviorSubject** 那接收到 **当前值**。 > BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 在下面的示例中,**BehaviorSubject** 使用值 **0** 进行初始化,当第一个观察者订阅时会得到 **0**。第二个观察者订阅时会得到值 **2**,尽管它是在值 **2** 发送之后订阅的。 ```js import { BehaviorSubject } from 'rxjs'; const subject = new BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: v => console.log(`observerA: ${v}`), }); subject.next(1); subject.next(2); subject.subscribe({ next: v => console.log(`observerB: ${v}`), }); subject.next(3); // Logs // observerA: 0 // observerA: 1 // observerA: 2 // observerB: 2 // observerA: 3 // observerB: 3 ``` ### ReplaySubject **ReplaySubject** 类似于 **BehaviorSubject**,它可以发送旧值给新的订阅者,但它还可以记录 **Observable** 执行的一部分 > ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者 当创建 **ReplaySubject** 时,你可以指定回放多少个值: ```js import { ReplaySubject } from 'rxjs'; const subject = new ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: v => console.log(`observerA: ${v}`), }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: v => console.log(`observerB: ${v}`), }); subject.next(5); // Logs: // observerA: 1 // observerA: 2 // observerA: 3 // observerA: 4 // observerB: 2 // observerB: 3 // observerB: 4 // observerA: 5 // observerB: 5 ``` 除了缓冲数量,你还可以指定 **window time** (以毫秒为单位)来确定多久之前的值可以记录。在下面的示例中,我们使用了较大的缓存数量 **100**,但 **window time** 参数只设置了 **500** 毫秒 ```js import { ReplaySubject } from 'rxjs'; const subject = new ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: v => console.log(`observerA: ${v}`), }); let i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: v => console.log(`observerB: ${v}`), }); }, 1000); // Logs // observerA: 1 // observerA: 2 // observerA: 3 // observerA: 4 // observerA: 5 // observerB: 3 // observerB: 4 // observerB: 5 // observerA: 6 // observerB: 6 // ... ``` ### AsyncSubject **AsyncSubject** 是另一个 **Subject** 变体,只有当 **Observable** 执行完成时(执行 **complete**()),它才会将执行的最后一个值发送给观察者。 ```js import { AsyncSubject } from 'rxjs'; const subject = new AsyncSubject(); subject.subscribe({ next: v => console.log(`observerA: ${v}`), }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: v => console.log(`observerB: ${v}`), }); subject.next(5); subject.complete(); // Logs: // observerA: 5 // observerB: 5 ``` **AsyncSubject** 和 `last()` 操作符类似,因为它也是等待 **complete** 通知,以发送一个单个值。 ## Operators (操作符) 尽管 RxJS 的根基是 Observable,但最有用的还是它的操作符。操作符是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元 ### 什么是操作符? 操作符是 **Observable** 类型上的方法,比如 `.map(...)`、`.filter(...)`、`.merge(...)`,等等。当操作符被调用时,它们不会改变已经存在的 **Observable** 实例。相反,它们返回一个新的 **Observable** ,它的 **subscription** 逻辑基于第一个 **Observable** 。 > 操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。 操作符本质上是一个纯函数,它接收一个 **Observable** 作为输入,并生成一个新的 **Observable** 作为输出。订阅输出 **Observable** 同样会订阅输入 **Observable** ## Scheduler (调度器) 什么是调度器? - 调度器控制着何时启动 **subscription** 和何时发送通知。它由三部分组成: 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 `setTimeout` 或 `process.nextTick`),或动画帧)。 调度器有一个(虚拟的)时钟。 调度器功能通过它的 `getter` 方法 `now()` 提供了 **时间** 的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。 调度器可以让你规定 **Observable** 在什么样的执行上下文中发送通知给它的观察者。 在下面的示例中,我们采用普通的 **Observable** ,它同步地发出值 **1**、**2**、**3**,并使用操作符 **observeOn** 来指定 **async** 调度器发送这些值。 ```js import { Observable, asyncScheduler } from 'rxjs'; import { observeOn } from 'rxjs/operators'; const observable = new Observable(observer => { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }).pipe(observeOn(asyncScheduler)); console.log('just before subscribe'); observable.subscribe({ next(x) { console.log('got value ' + x); }, error(err) { console.error('something wrong occurred: ' + err); }, complete() { console.log('done'); }, }); console.log('just after subscribe'); // Logs // just before subscribe // just after subscribe // got value 1 // got value 2 // got value 3 // done ``` 注意通知 `got value...` 在 `just after subscribe` 之后才发送,这与我们到目前为止所见的默认行为是不一样的。这是因为 `observeOn(asyncScheduler)` 在 `new Observable` 和最终的观察者之间引入了一个代理观察者。在下面的示例代码中,我们重命名了一些标识符,使得其中的区别变得更明显: ```js import { Observable, asyncScheduler } from 'rxjs'; import { observeOn } from 'rxjs/operators'; var observable = new Observable(proxyObserver => { proxyObserver.next(1); proxyObserver.next(2); proxyObserver.next(3); proxyObserver.complete(); }).pipe(observeOn(asyncScheduler)); var finalObserver = { next(x) { console.log('got value ' + x); }, error(err) { console.error('something wrong occurred: ' + err); }, complete() { console.log('done'); }, }; console.log('just before subscribe'); observable.subscribe(finalObserver); console.log('just after subscribe'); ``` **proxyObserver** 是在 `observeOn(asyncScheduler)` 中创建的,它的 `next(val)` 函数大概是下面这样子的: ```js const proxyObserver = { next(val) { asyncScheduler.schedule( x => finalObserver.next(x), 0 /* delay */, val /* will be the x for the function above */, ); }, // ... }; ``` **async** 调度器操作符使用了 `setTimeout` 或 `setInterval`,即使给定的延迟时间为 **0**。照例,在 **JavaScript** 中,我们已知的是 `setTimeout(fn, 0)` 会在下一次事件循环迭代的最开始运行 `fn` 。这也解释了为什么发送给 **finalObserver** 的 `got value 1` 发生在 `just after subscribe` 之后。 调度器的 `schedule()` 方法接收一个 **delay** 参数,它指的是相对于调度器内部时钟的一段时间。调度器的时钟不需要与实际的挂钟时间有任何关系。这也就是为什么像 **delay** 这样的时间操作符不是在实际时间上操作的,而是取决于调度器的时钟时间。这在测试中极其有用,可以使用虚拟时间调度器来伪造挂钟时间,同时实际上是在同步执行计划任务。 ### 调度器类型 **async** 调度器是 **RxJS** 提供的内置调度器中的一个。可以通过使用 **Scheduler** 对象的静态属性创建并返回其中的每种类型的调度器 | 调度器 | 目的 | | :---------------------: | :------------------------------------------------------------------------: | | null | 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作 | | queueScheduler | 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。 | | asapScheduler | 微任务的队列调度,用于异步转换 | | asyncScheduler | 使用 setInterval 的调度。用于基于时间的操作符。 | | animationFrameScheduler | 用于在浏览器下次重绘之前 | ### 使用调度器