RxJS 快速入门

基本概念

通常我们的数据获取方式可以归类为两大类即“拉”和“推”,“拉”和“推”描述了两种生产者和消费者的协作模式。

什么是“拉”?

在“拉”模式中,消费者决定何时从生产者处获取数据,生产者并不知何时将数据交付给消费者,在 JavaScript 中,函数迭代器都是“拉”模式的典型代表。

“拉”就意味着代码是面向过程的,我需要什么数据我就去“拉”一下,需要几次我就去拉几次,需要把拉的时机时序等控制逻辑需要下沉到消费者,在简单的同步场景下这没有问题,但在复杂或异步场景下会变得捉襟见肘,此时你需要选择“推”模式。

什么是“推”?

在“推”模式中,生产者决定何时向消费者发送数据,消费者不知道何时会收到该数据,在 JavaScript 中,Promise是常见的“推”模式。

生产者消费者
被动:在请求时产生数据。主动:决定何时请求数据。
主动:按照自己的节奏生成数据。被动:对接收到的数据做出反应。

遗憾的是 Promise 也不能解决复杂场景下的需求,比如多次推送,此时你需要引入其它 event emitter 库。

现在 RxJS 提供了一个新的选择即 Observable,它是有序组织多个值的生产者,并将值有序地“推”给消费者,我们暂且把这种编程范式称为“响应式编程”。在 RxJS 的世界中,Observable 是非常重要的概念,我们称它为可观察对象,它可以较好弥补我们常规“推”数据方式的不足:

  1. Promise 无法惰性求值
  2. Promise 无法多次推送
  3. Promise 无法取消
  4. Promise 无法重试
  5. ...

如果你是初学者的话,相信到现在你应该有些晕了,不过没有关系,这是正常的,学习 RxJS 确实是对以往编程习惯的一种挑战。不过在随后的一步步学习中你将会逐渐清晰并爱上 RxJS,现在我们先去创建一条简单的流体验下。


参考资料:

创建流

使用 Observable 创建一个 RxJS 流非常简单,乍一眼看过去就像创建一个 Promise 一样

import { Observable } from "rxjs";

console.log("before");

const stream$ = new Observable((subscriber) => {
  // 注意这条日志只会在产生订阅的时候才会打印
  console.log("created");
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 2000);
});

console.log("after");

stream$.subscribe((num) => {
  console.log("订阅到数据", num);
});

// before
// after
// created
// 订阅到数据 1
// 订阅到数据 2
// 订阅到数据 3

在体验过上面这个示例之后我们目前可以得出几个结论

  1. Observable 是可以多次推送值的,这在 Promise A+ 规范中无法做到,这也是两者非常重要的一个区别

    const promise = new Promise((resolve) => {
      resolve(1);
      // 第二条永远不会推送成功
      resolve(2);
    });
    
  2. Observable 是懒惰计算的,如果没有消费者,它不会有任何副作用,而 Promise 一旦创建就开始计算

    const promise = new Promise(() => {
      // 这里会立刻执行,即使没有任何消费者
      console.log("promise created");
    });
    

OK,你已经知道如何创建一个简单的流了,下一步我们学习如何编排流。

编排流

到目前为止我们学习到 RxJS 的知识还非常有限,很难在实际业务场景中发挥价值,实际上 RxJS 最强大的地方它提供了数百个功能强大的操作符。

不过在跳进数百个 API 列表中之前,现在开始我们先体验几个最为简单的操作符,看下面这个例子:

  1. 创建一个每秒钟推送一次的流
  2. 将每次推送的数据乘于 10
  3. 去除掉能够被 4 整除的推送
import { interval } from "rxjs";
import { map, filter } from "rxjs/operators";

const stream$ = interval(1000) // 创建一个每秒钟推送一次的流
  .pipe(
    map((v) => v * 10), // 将每次推送的数据乘于 10
    filter((v) => v % 4 !== 0) // 去除掉能够被 4 整除的推送
  );

stream$.subscribe((num) => {
  console.log(num);
});

// 10
// 30
// 50
// 70
// 90
// ...
什么是 pipe?

用大白话说就是"顺序执行,并把上一个函数的出参作为下一个函数的入参"

pipe(a, b, c);
// 等价于
c(b(a(args)));

那么在上面这个例子中的执行过程可以理解为:先执行 map,将结果传入 filter

没了,就这么简单,ok,现在我们已经编排出一个可用的流的,下一步我们学习如何订阅流。

订阅流

方法一

import { interval } from "rxjs";

const stream$ = interval(2000);

stream$.subscribe(
  (data) => {
    console.log("方法一:数据", data);
  },
  () => {
    console.error("方法一:错误");
  },
  () => {
    console.log("方法一:结束");
  }
);

// 错误和结束并不是必需的,换句话说如果你只想处理正常推送的数据你可以进一步简化代码
stream$.subscribe((data) => {
  console.log("方法一:数据", data);
});

方法二

import { interval } from "rxjs";

const stream$ = interval(2000);

stream$.subscribe({
  next(data) {
    console.log("方法二:数据", data);
  },
  error() {
    console.error("方法二:错误");
  },
  complete() {
    console.log("方法二:结束");
  },
});

现在你已经学会如何订阅流了,但如果只订阅而不在合适的时机取消订阅的话非常容易出现内存泄漏(常见在组件销毁的时候忘记取消订阅),下一步我们学习如何结束流。

结束流

关于如何干净地结束一个流的生命周期,我们分成两个角度来看:

从消费者的角度看,是取消订阅

编程式取消

import { interval } from "rxjs";

const stream$ = interval(1000);

// 产生订阅
const subscription = stream$.subscribe((num) => {
  console.log(num);
});

//取消订阅
subscription.unsubscribe();

可以组合多个 Subscription 统一取消

import { interval, Subscription } from "rxjs";

const subscription = new Subscription();
const stream$ = interval(1000);

// 产生订阅
subscription.add(
  stream$.subscribe((num1) => {
    console.log(num1);
  })
);

// 产生订阅
subscription.add(
  stream$.subscribe((num2) => {
    console.log(num2);
  })
);

// 统一取消订阅
subscription.unsubscribe();

声明式取消

import React from "react";
import { interval, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

const stream$ = interval(1000);
const destroy$ = new Subject();

stream$.pipe(takeUntil(destroy$)).subscribe((num) => {
  console.log(num2);
});

// 取消订阅
destroy$.next();
destroy$.complete();

从生产者的角度看,是结束推送

  1. Observable 结束推送

    new Observable((subscriber) => {
      subscriber.complete();
    });
    
  2. Subject 结束推送

    const stream$ = new Subject();
    
    stream$.complete();
    

ok,到现在为止,已经简单了解 RxJS 基本概念。