反应式编程
反应式编程的科普文章,翻译自https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
什么是反应式编程
微软的“反应式编程”指的是“Rx = Observables + LINQ + Schedulers”,和其他的反应式编程的概念不太相同。
反应式编程用一句话总结就是:
反应式编程就是使用异步数据流进行编程
在某种程度上,这不是什么新鲜事。事件总线或典型的单击事件实际上是一个异步事件流,您可以观察(Observe)并执行一些操作。反应式是相同的想法。您可以创建任何数据流,而不仅仅是单击和悬停事件。流是廉价且无处不在的,任何东西都可以是流:变量、用户输入、属性、缓存、数据结构等。例如,假设您的推特提要(Feed)是一个数据流,其方式与点击事件相同。你可以监听这个流并做出相应的反应。
除此之外,您还可以使用一个函数工具箱来组合、创建和过滤这些流。这就是“功能性”魔法发挥作用的地方。流可以用作另一个流的输入。甚至可以将多个流用作另一个流的输入。您可以合并两个流。您可以过滤一个流,以获得另一个只包含您感兴趣的事件的流。您可以将数据值从一个流映射到另一个新流。
既然流对Reactive非常重要,那么让我们从我们熟悉的“点击按钮”事件流开始,仔细看看它们。
流是按时间顺序排列的正在进行的事件的序列。它可以发出三种不同的信号:值(某种类型)、错误和“完成”信号。例如,当包含该按钮的当前窗口或视图关闭时,会发出“已完成”事件。
我们只异步捕获这些发出的事件,方法是定义一些函数,其中有的函数在发出值时执行,有的在发出错误时执行,有的在发出“完成”时执行。有时最后两个可以省略,您可以只专注于为值定义函数。“监听”流称为订阅(Subscribing)。我们定义的函数是观察者。流是被观察的对象(或“可观察的”)。这正是观察者设计模式。
绘制该图的另一种方法是使用ASCII,我们将在本教程的某些部分中使用它:
1 | --a---b-c---d---X---|-> |
由于这种感觉已经很熟悉了,我不想让你感到无聊,让我们做一些新的事情:我们将创建新的点击事件流,它是从原始点击事件流转换而来的。
首先,让我们创建一个计数器流,指示按钮被单击的次数。在常见的Reactive库中,每个流都有许多附加函数,例如映射(Map
)、筛选(Filter
)、扫描(Scan
)等。当您调用其中一个函数时,例如clickStream.map(f)
,会根据点击流返回一个新的流,它不会以任何方式修改原始单击流。这是一种被称为不变性(immutability)的特性,它与反应流(Reactive stream)相结合,就像煎饼与糖浆相结合一样。这允许我们链式编程clickStream.map(f).scan(g)
等函数(这些看起来就和LINQ一样):
1 | clickStream: ---c----c--c----c------c--> |
map(f)
函数根据您提供的函数f替换(到新流中)每个发出的值。在我们的例子中,我们在每次点击时都映射到数字1。scan(f)
函数聚合流上所有以前的值,生成值x=g(accumulated,current)
,其中g只是本例中的add函数。然后,每当单击发生时,counterStream
都会展示总的单击次数。
为了展示Reactive的真正威力,让我们假设您想要一系列“双击”事件。为了让它更有趣,假设我们希望新的流将三次单击(或者通常是多次单击(两次或更多))视为双击。深吸一口气,想象一下你将如何以传统的命令式和庄严的方式做到这一点。我敢打赌,这听起来相当恶心,涉及一些变量来保存状态,还涉及一些时间间隔。
然而,在Reactive中,这很简单。事实上,逻辑仅仅是4行代码。但现在让我们忽略代码。无论你是初学者还是专家,用图表思考是理解和构建流的最佳方式。
灰色框是将一个流转换为另一个流的函数。首先,只要发生250毫秒的“事件静默”(简而言之,这就是buffer(stream.throttle(250ms))
的作用),我们就会在列表中累积点击次数。此时不要担心理解细节,我们现在只是演示Reactive。结果是一个列表流,我们从中应用map()
将每个列表映射到与该列表长度匹配的整数。最后,我们使用filter(x>=2)
函数忽略1个整数。就是这样:3次操作产生我们想要的流。然后,我们可以订阅(“监听”)它,从而根据我们的意愿做出相应的反应。
我希望你喜欢这种方法的美妙之处。这个例子只是冰山一角:您可以对不同类型的流应用相同的操作,例如,对API响应流;另一方面,还有许多其他可用功能。
“为什么我应该考虑采用反应式编程”
反应式编程提高了代码的抽象级别,因此您可以专注于定义业务逻辑的事件的相互依赖性,而不必不断地处理大量的实现细节。RP中的代码可能更简洁。
这种好处在现代Web应用程序和移动应用程序中更为明显,这些应用程序与大量与数据事件相关的UI事件高度交互。10年前,与web页面的交互基本上是向后端提交一个长表单,并在前端执行简单的呈现。应用程序已经发展为更实时的:修改单个表单字段可以自动触发后台保存,对某些内容的“赞”可以实时反映给其他连接的用户,等等。
如今的应用程序拥有丰富的各种实时事件,能够为用户提供高度互动的体验。我们需要工具来正确处理这一问题,而反应式编程就是一个答案。
用反应式编程思考,并举例
让我们深入了解真实情况。这是一个真实的例子,有一个关于如何用RP思考的分步指南。没有合成的例子,没有半点解释的概念。在本教程结束时,我们将生成真正的功能代码,同时了解我们为什么要做每件事。
我选择JavaScript和RxJS作为工具,原因是:JavaScript是目前最熟悉的语言,Rx*库家族广泛用于多种语言和平台(.Net、Java、Scala、Clojure、JavaScript、Ruby、Python、C++、Swift、Groovy等)。因此,无论您的工具是什么,您都可以通过学习本教程来具体受益。
实现一个“推荐关注”的窗口
在推特中有一个建议你关注某些账号的窗口
我们将重点模仿其核心特征,即:
- 启动时,从API加载帐户数据并显示3个建议
- 单击“刷新”后,将3个其他帐户建议加载到3行中
- 单击帐户行上的“x”按钮,仅清除该当前帐户并显示另一个帐户
- 每行显示帐户的头像和指向其页面的链接
我们可以忽略其他功能和按钮,因为它们是次要的。此外,Twitter最近向未经授权的公众关闭了其API,让我们为Github上的关注者构建UI,而不是Twitter。有一个用于获取用户的Github API。
此操作的完整代码位于http://jsfiddle.net/staltz/8jFJH/48/。
请求和响应
你如何使用Rx处理这个问题?首先,(几乎)一切都可以是一条流。这就是Rx咒语。让我们从最简单的特性开始:“启动时,从API加载3个帐户数据”。这里没有什么特别的,这只是关于(1)执行请求,(2)获得响应,(3)呈现响应。因此,让我们继续将我们的请求表示为流。起初,这会让人觉得有些过火,但我们需要从基础开始,对吗?
启动时,我们只需要执行一个请求,所以如果我们将其建模为数据流,那么它将是一个只有一个发出值的流。稍后,我们知道会有许多请求发生,但目前,这只是一个请求。
1 | --a------|-> |
这是我们想要请求的URL流。无论何时发生请求事件,它都会告诉我们两件事:“When”和“What”。请求被执行时的“When”就是事件发出的时间,请求的内容“What”就是发出的值:一个包含URL的字符串。
在Rx*中,创建单个值的流非常简单。流的官方术语是“Observable”,因为它可以被观察到,但我发现它是一个愚蠢的名称,所以我称它为流。
1 | var requestStream = Rx.Observable.just('https://api.github.com/users'); |
但现在,这只是一个字符串流,不执行其他操作,所以我们需要在发出该值时以某种方式发生一些事情。这是通过订阅流来完成的。
1 | requestStream.subscribe(function(requestUrl) { |
请注意,我们正在使用jQuery Ajax回调(我们假设您已经知道了)来处理请求操作的异步性。但请稍等,Rx用于处理异步数据流。该请求的响应不能是包含将来某个时间到达的数据的流吗?好吧,在概念层面上,它看起来确实很像,所以让我们试试看。
1 | requestStream.subscribe(function(requestUrl) { |
可以看到jQuery.getJSON使用Ajax回调能够同时链接.done、.fail、.always,因为它返回的是XMLHTTPRequest的超集,这可能是JavaScript的特性之一,有待研究
Rx.Observable.create()所做的是通过显式地通知每个观察者Observer(或者换句话说,“订阅者Subscriber”)数据事件(onNext()
)或错误(onError()
)来创建自己的自定义流。我们所做的只是包装了jQuery Ajax Promise。
请等一下,这是否意味着一个Promise就是一个Observable?(Promise可以简单理解为JavaScript的异步代码)
是的!
Observable是“Promise++”,在Rx里面你可以使用var stream = Rx.Observable.fromPromise(promise)
来简单地将一个Promise转换为Observable,唯一的区别是Observables不符合Promises/A+,但在概念上没有冲突。一个Promise就是一个只有单个值的Observable。Rx流允许许多返回值,比Promise更加强大。
这真的是非常好,目前我们展示了Observables至少和Promises一样强大。所以如果你是Promises的忠实粉丝,请接着看Observable能做到的更多的事。
现在让我们回到刚才举的例子,你可能很快就注意到,我们将一个subscribe()
内嵌进了另一个subscribe()
方法之中,看起来好像要进入一个“回调地狱”,并且,responseStream
的创建依赖于requestStream
。就像你之前听到的,在Rx中有更简单的机制来从一个流中转换的创建新的流,所以我们应该更改一下方式。
你目前应该知道的一个基础方法是map(f)
,它获取流A的每个值,对其应用f()
,并在流B上生成一个值。如果我们对请求和响应流这样做,我们可以将请求URL映射到响应Promises(伪装为流)。
1 | var responseMetastream = requestStream |
然后我们将创造出一种叫做“Metastream”的怪兽:包含一个(或多个)流的流。先不要惊慌,MetaStream是一个流,其中每个发出的值都是另一个流。你可以把它当成指针:MetaStream每一个发出的值都是指向另一个流的指针。在我们举的例子中,每个requestURL都映射到一个指向包含相应响应的Promise流的指针。
用来响应的MetaStream看起来令人困惑,似乎对我们毫无帮助。我们只需要一个简单的响应流,其中每个发出的值都是一个JSON对象,而不是JSON对象的“Promise”。我们可以使用Flatmap
:它可以将MetaStream“扁平化”,通过在“主干”流上发射将在“分支”流上发出的所有内容。Flatmap
不是一个“修复程序”,MetaStream也不是一个bug,它们确实是处理Rx中异步响应的工具。
很好,因为响应流是根据请求流定义的,如果我们稍后在请求流上发生更多事件,我们将在响应流上发生相应的响应事件,正如预期的那样:
1 | requestStream: --a-----b--c------------|-> |
现在我们终于有了响应流,我们可以呈现我们接收到的数据:
1 | responseStream.subscribe(function(response) { |
将目前为止所有的代码加入进来,我们有:
1 | var requestStream = Rx.Observable.just('https://api.github.com/users'); |
刷新按钮
我还没有提到响应中的JSON是一个包含100个用户的列表。API只允许我们指定页面偏移量,而不允许指定页面大小,因此我们只使用了3个数据对象,浪费了97个其他数据对象。我们现在可以忽略这个问题,因为稍后我们将看到如何缓存响应。
每次单击刷新按钮时,请求流都应该发出一个新的URL,以便我们可以获得新的响应。我们需要两件事:刷新按钮上的点击事件流(记住:任何东西都可以是流),我们需要更改依赖新点击流的请求流。令人欣慰的是,RxJS附带了一些工具,可以从事件侦听器生成Observable。
1 | var refreshButton = document.querySelector('.refresh'); |
由于刷新点击事件本身并不携带任何API URL,因此我们需要将每次点击映射到一个实际的URL。现在,我们将请求流更改为每次使用随机偏移参数映射到API端点的刷新点击流。
1 | var requestStream = refreshClickStream |
因为我很笨,而且我没有自动化测试,所以我刚刚破坏了我们以前构建的一个特性。请求在启动时不再发生,只有在单击刷新时才会发生。啊。我需要两种行为:当点击刷新时发送请求或网页刚刚打开时也发送请求。
我们知道如何为每种情况创建单独的流:
1 | var requestOnRefreshStream = refreshClickStream |
但我们如何才能将这两者“合并”为一呢?嗯,我们有merge()
方法。用图表进行解释:
1 | stream A: ---a--------e-----o-----> |
所以我们现在就可以这样做:
1 | var requestOnRefreshStream = refreshClickStream |
有一种替代的、更干净的写作方式,没有中间流。
1 | var requestStream = refreshClickStream |
甚至能更短,更可读:
1 | var requestStream = refreshClickStream |
startWith()
方法所做的事就是你所想的那件事。无论输入流看起来如何,加上startWith(x)
的输出流的开头都会有x。但我还不够DRY,我在重复API端点字符串。解决此问题的一种方法是将startWith()
移到refreshClickStream
附近,实质上“模拟”启动时的刷新点击。
1 | var requestStream = refreshClickStream.startWith('startup click') |
如果回到我“破坏了自动化测试”的地方,您应该会看到,与最后一种方法的唯一区别是我添加了startWith()
。
将三条推荐关注转为流
到目前为止,我们仅在渲染阶段,也就是响应流的subscribe()
方法中涉及到了UI元素。现在,使用刷新按钮,我们遇到了一个问题:只要您单击“刷新”,当前的3个建议就不会被清除。只有在收到响应后才会有新的建议,但为了使UI看起来更好,我们需要在刷新时单击时清除当前的建议。
1 | refreshClickStream.subscribe(function() { |
不,先别这样,伙计!这样做是不好的,因为我们现在有两个影响DOM元素的subscriber(另一个是responseStream.subscribe()
),这听起来并不Separation of concerns。还记得反应式编程的咒语吗?
因此,让我们将建议建模为流,其中每个发出的值都是包含建议数据的JSON对象。
我们将分别为3个建议中的每个建议执行此操作。suggestion#1的流程如下:
1 | var suggestion1Stream = responseStream |
其他的suggition2Stream
和suggition3Stream
可以简单地从suggition1Stream
复制粘贴。这虽然并不DRY,但它将使我们的示例在本教程中保持简单,另外,我认为在这种情况下,思考如何避免重复是一个很好的练习。
我们不是在responseStream的subscribe()
中进行渲染,而是在这里进行:
1 | suggestion1Stream.subscribe(function(suggestion) { |
回到“刷新时,清除建议”,我们可以简单地将刷新点击映射到空建议数据,并将其包含在suggetion1Stream
中,如下所示:
1 | var suggestion1Stream = responseStream |
在呈现时,我们将null解释为“无数据”,从而隐藏其UI元素。
1 | suggestion1Stream.subscribe(function(suggestion) { |
我们的“大蓝图”就是这样:
1 | refreshClickStream: ----------o--------o----> |
其中的N
指的是null
另外,我们还可以在启动时提供“空”建议。这是通过将startWith(null)
添加到建议流来实现的:
1 | var suggestion1Stream = responseStream |
最后都放到我们的蓝图里:
1 | refreshClickStream: ----------o---------o----> |
关闭建议并使用缓存的响应
还有一个功能需要实现。每个建议都应该有自己的“x”按钮,用于关闭它,并在其位置加载另一个。乍一看,您可以说,当单击任何关闭按钮时,就可以发出新的请求:
1 | var close1Button = document.querySelector('.close1'); |
这不起作用。它将关闭并重新加载所有建议,而不仅仅是我们单击的建议。有两种不同的方法可以解决这个问题,为了保持它的趣味性,我们将通过重用以前的响应来解决它。API的响应页面大小为100个用户,而我们只使用其中的3个,因此有大量新数据可用。没有必要要求更多。
同样,让我们把所有对象都放在“流”里。当发生“close1”点击事件时,我们希望使用responseStream
上最近发出的响应,从响应列表中获取一个随机用户。像这样:
1 | requestStream: --r---------------> |
在Rx*中有一个名为combinelatest的组合函数,它似乎可以满足我们的需要。它将两个流A和B作为输入,每当其中一个流发出一个值时,combineLatest
将从两个流中联接最近发出的两个值A和B,并输出一个值c=f(x,y)
,其中f是您定义的函数。用图表来表示就是:
1 | stream A: --a-----------e--------i--------> |
我们可以在close1ClickStream
和responseStream
上应用combineLatest()
,这样每当单击close 1按钮时,我们都会得到发出的最新响应,并在suggestion1Stream
上生成一个新值。另一方面,combineLatest()
是对称的:每当在responseStream上发出新响应时,它将与最新的“close 1”单击相结合以生成新建议。这很有趣,因为它允许我们简化之前的suggestion1Stream
代码,如下所示:
1 | var suggestion1Stream = close1ClickStream |
拼图中仍缺少一块。combineLatest()
使用两个源中最新的一个,但如果其中一个源尚未发出任何内容,则combineLatest()
无法在输出流上生成数据事件。如果您查看上面的ASCII图,您将看到当第一个流发出值a时,输出没有任何内容。只有当第二个流发出了值b时,它才会产生输出值。
有不同的解决方法,我们将继续使用最简单的方法,即启动时模拟单击“关闭1”按钮的操作:
1 | var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this |
总结
终于完成了,全部代码如下所示:
1 | var refreshButton = document.querySelector('.refresh'); |
You can see this working example at http://jsfiddle.net/staltz/8jFJH/48/
这段代码很小但很密集:它的特点是通过适当分离关注点来管理多个事件,甚至缓存响应。函数样式使代码看起来更具声明性,而不是命令性:我们并不是给出一系列要执行的指令,我们只是通过定义流之间的关系来告诉它是什么。例如,对于Rx,我们告诉计算机,suggestion1Stream
是与来自最新响应的一个用户组合的“close 1”流,除了在刷新或程序启动时为空之外。
还请注意,令人印象深刻的是,这里没有if
、for
、while
等控制流元素,以及您期望从JavaScript应用程序获得的典型的基于回调的控制流。如果需要,甚至可以通过使用filter()
来除去上面subscribe()
中的if
和else
(我将把实现细节留给您作为练习)。在Rx中,我们有流函数,如map
、filter
、scan
、merge
、combineLatest
、startWith
等,用于控制事件驱动程序的流。这个函数工具集以更少的代码为您提供了更强大的功能。
接下来
如果您认为Rx*将是您首选的反应式编程库,请花点时间熟悉用于转换、组合和创建Observables的大量函数。如果您想了解流图中的那些函数,请查看Creating Observables · ReactiveX/RxJava。每当您在尝试做某事时遇到困难,请绘制这些图表,仔细思考,查看长长的函数列表,并进行更多思考。根据我的经验,此工作流是有效的。
一旦你开始掌握Rx*编程的窍门,就绝对需要理解cold-vs-hot-observables的概念。如果你忽视这一点,它会回来残忍地咬你。你已经被警告过了。通过学习真正的函数式编程,并熟悉影响Rx*的副作用等问题,进一步提高您的技能。
但反应式编程不仅仅是Rx*。Bacon.js 使用起来很直观,没有您在Rx*中有时遇到的怪癖。Elm语言属于自己的类别:它是一种函数式反应式编程语言,可编译为JavaScript+HTML+CSS,并具有穿越时间的调试器。非常棒。
Rx非常适合event巨多的前端和应用程序。但这不仅仅是客户端的事情,它在后端和靠近数据库的地方也能很好地工作。事实上,RxJava是Netflix API中实现服务器端并发的关键组件。Rx不是一个局限于特定类型应用程序或语言的框架。在编写任何事件驱动软件时,它确实是一个可以使用的范例。