接口请求、键盘输入、文件上传、界面拖拽......业务中有各种各样“异步”的情形。在过去我们只能精心编织代码,祈求并发不要出错,接口响应不要相互覆盖......但有了RxJS,处理这些事件变成了一种尽在掌握的享受,以至于我觉得这是JS中最值得掌握的package之一。
Think of RxJS as Lodash for events.
但我就不介绍RxJS的概念了,因为真的不太好理解。
下面的内容,全是个人的想象!
在时间的概念还不存在的时候,JS宇宙中散落着一些零散的元素,没人知道它们什么时候存在,什么时候消失,它们互相也无法发生联系......
但某一天,我们得到了一块时间宝石!
从此我们可以创造时间线,并将元素控制在这条时间线内:
如果元素几乎是同步发生的,就像这样:
如果是等待一段时间后才被出现的元素,就可能像这样:(好比API请求的响应,或者用户的输入,你永远不知道它们什么时候会到来!)
那么,如何用时间宝石创造时间线?我稍微挑几个方法介绍:
首先是of
法术,它会将一系列元素紧密的创造到一条时间线上:
of(1, 2, 3)
结果就是这样,元素1
,2
,3
会排在一起。
然后是from
法术,它只接受一个元素,但会根据元素类型不同有不同效果,其中最常见的两种反应:
of
类似:from([1, 2, 3])
from("abc")
from(fetch('api.com'))
另一个非常实用的法术就是fromEvent
,它能够将用户输入展开到一条时间线上:
fromEvent(window, 'keydown')
Subject
比较复杂,总的来说它有两个特殊能力
听起来有点复杂,让我们仔细看看:
const subject = new Subject();
subject.subscribe((value: number) => console.log(`log all number: ${value}`));
subject
.pipe(filter((value: number) => value % 2 === 0))
.subscribe((value) => console.log(`log even number: ${value}`));
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.next(5);
// output
// log all number: 1
// log all number: 2
// log even number: 2
// log all number: 3
// log all number: 4
// log even number: 4
// log all number: 5
对于一个subject
,它可以通过subscribe
创建多条时间线并与其关联。subject
每一次调用next
,都会往与其关联的所有时间线发送元素。
在我们创造一条时间线时,时间还没有开始流动。
fromEvent(window, 'keydown')
只有当我们决定观测,时间才会开始流动!!
对于一条时间线,使用subscribe
观测每个元素的出现:
fromEvent(window, 'keydown').subscribe((e) => {
console.log(e.code);
});
我们不仅可以通过subscribe
观测正常的元素,甚至可以观测到错误与时间的结束:
import { from } from 'rxjs';
// fake network request, not important.
const fakeFetch = (url) =>
new Promise((resolve, reject) => {
const takedTime = 2000 + Math.random() * 3000;
setTimeout(() => {
if (takedTime < 4000) {
resolve(`${url} request succeed!`);
} else {
reject(`${url} error!`);
}
}, takedTime);
});
const request = from(fakeFetch('www.baidu.com'));
// important!
request.subscribe({
complete: () => console.log('done'),
next: (v) => console.log(v),
error: (error) => console.log(error),
});
拥有时间宝石的我们显然不会仅仅满足于创造,我们还拥有许多影响时间线的能力!
要改变时间线,只需要使用pipe
。
在pipe
中使用"map"法术,可以改变时间线上的每个元素:
of(1, 2).pipe(map(value => 10 * value))
又或者如"filter"能力,可以筛选时间线上的元素:
from([1, 2, 3, 4]).pipe(filter(value => value % 2 === 1))
我们不得不提一个问题:被观测的时间线何时才会结束?
这主要取决于你的时间线是如何创造的。
使用of
,from
这样的方法创造时间线,由于在创造时就需要提供所有的元素,因此在这些元素全部触发后,时间线会自动结束。
如果是使用fromEvent
这样的方法,首先要看我们有没有使用一些特别的方法去影响时间线,比如first
,take
,由于这些方法只关心时间线中固定数量的元素,这条时间线也会在元素触发后自动结束。
最后,我们还可以手动停止时间线观测unsubscribe
:
const observable = fromEvent(document, "click");
const subscription = observable.subscribe((x) => console.log(x));
// Later
subscription.unsubscribe();
如果存在两条不相干的时间线,时间宝石可以对其进行合并:
import { merge, fromEvent, interval } from 'rxjs';
const clicks = fromEvent(document, 'click');
const timer = interval(1000);
const clicksOrTimer = merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));
直到刚刚我们说的都还是常规操作......
但随着我们逐渐熟悉时间宝石的能力,一个大胆的想法孕育而生:
从时间上的元素创造出另一条时间线!
仔细看看下面这段代码:
from
让这个请求落在了新的时间线上from(['api.acfun.com', 'api.bilibili.com', 'api.x.com']).pipe(
map((value) => from(fakeFetch(value)))
);
但问题来了,时间线被嵌套住了,还怎么观测我们最关心的Response?
还记得时间宝石有合并时间线的能力么,对于嵌套时间线,它还可以使用mergeAll
法术对其进行展平!
from(['api.acfun.com', 'api.bilibili.com', 'api.x.com'])
.pipe(map((value) => from(fakeFetch(value))))
.pipe(mergeAll())
.subscribe((value) => console.log(value));
这就是时间宝石的招牌能力之一——自由的嵌套或展平时间线!
事实上,由于这个特性过于好用,我们甚至发明了快速施法的口令:mergeMap
,此时的效果与先map
再mergeAll
是一样的。
from(['api.acfun.com', 'api.bilibili.com', 'api.x.com'])
.pipe(mergeMap((value) => from(fakeFetch(value))))
.subscribe((value) => console.log(value));
说了这么多,我来举两个例子,看看在实际的情景中,这样一块时间宝石可以帮我们做到哪些事情。
假设我们需要请求一批资源,共50个API地址,因此我们希望限制一下最大并发数。
我们已经提到了mergeMap
,它实际的作用是“遍历时间线上的每个值并生成新的时间线,再将这些时间线上的元素铺平到一条新的时间线”,它总共支持三个参数,而第三个参数concurrent
是这里要重点介绍的(默认为Infinity
),用于限制同时处理的嵌套时间线数量!比如将其设置为2,那么只有时间线上的前两个元素的嵌套时间线结束后,才会开始处理第三个元素!
因此我们只要稍微修改上一章节的实例代码,就可以达到想要的效果:
const urls = ['www.bilibili.com', 'www.acfun.com', 'learnrxjs.io'];
const result = from(urls).pipe(
mergeMap((url) => {
return from(fakeFetch(url));
}, 2) // 限制同时激活的嵌套时间线为2,就起到了限制并发数为2的效果
);
result.subscribe((x) => console.log(x));
细心的小朋友可能发现,mergeAll
也可以接收concurrent
参数,那如果这样写,效果是一样的么?
from(['api.acfun.com', 'api.bilibili.com', 'api.x.com'])
.pipe(map((value) => from(fakeFetch(value))))
.pipe(mergeAll(2))
.subscribe((value) => console.log(value));
建议有冒险精神的小朋友们自己试试( ͡° ͜ʖ ͡°)
上、上、下、下、左、右、左、右、B、A、B、A
如果我们要检测用户是否在键盘输入了这个经典秘籍,可以怎么做?
这里介绍一个处理时间线的方法:bufferCount
,它可以聚合过去的x个元素,将其变成一个元素。
那么解决方案就很直观了不是么:
const secret = [
'ArrowUp',
'ArrowUp',
'ArrowDown',
'ArrowDown',
'ArrowLeft',
'ArrowRight',
'ArrowLeft',
'ArrowRight',
'KeyB',
'KeyA',
'KeyB',
'KeyA',
].join('');
const click$ = fromEvent(window, 'keydown');
click$
.pipe(
map((e: KeyboardEvent) => e.code),
bufferCount(12, 1)
)
.subscribe((value) => {
if (value.join('') === secret) {
console.log('add 30 lifes!');
}
});
最后,我们重新捋一下概念:
subscribe
方法的参数:Observer好了,相信你已经完全掌握RxJS了!
开玩笑的,现在你可以去试着看RxJS的官方文档了,相信此时再去理解各种各样的名词将不会那么困难。说到底,只要元素最终被控制在一条时间线上,你就可以为所欲为!