您的位置:新葡亰496net > 新葡亰官网 > 新葡亰496net:服务端主动通报Web前端的一些研究

新葡亰496net:服务端主动通报Web前端的一些研究

发布时间:2019-06-17 08:30编辑:新葡亰官网浏览(147)

    复杂单页应用的数据层设计

    2017/01/11 · JavaScript · 单页应用

    原文出处: 徐飞   

    很多人看到这个标题的时候,会产生一些怀疑:

    什么是“数据层”?前端需要数据层吗?

    可以说,绝大部分场景下,前端是不需要数据层的,如果业务场景出现了一些特殊的需求,尤其是为了无刷新,很可能会催生这方面的需要。

    我们来看几个场景,再结合场景所产生的一些诉求,探讨可行的实现方式。

    单页应用的一个特点就是即时响应,对发生变化数据实现 UI 的快速变更。实现的基础技术不外乎 AJAX 和 WebSocket,前者负责数据的获取和更新,后者负责变更数据的客户端同步。其中要解决的最主要的问题还是数据同步。

    RxJS字面意思就是:JavaScript的响应式扩展(Reactive Extensions for JavaScript)。

    随着物联网的发展促进传统行业不断转型,在设备间通信的业务场景越来越多。其中很大一部分在于移动端和设备或服务端与设备的通信,例如已成主流的共享单车。但存在一个这样小问题,当指令下发完毕之后,设备不会同步返回指令执行是否成功,而是异步通知或是服务端去主动查询设备指令是否发送成功,这样一来客户端也无法同步获取指令执行情况,只能通过服务端异步通知来接收该状态了。这也就引出了这篇博客想要探索的一项技术:如何实现服务端主动通知前端? 其实,这样的业务场景还有很多,但这样的解决方案却不是非常成熟,方案概括过来就两个大类。1.前端定时请求轮询 2.前端和服务端保持长连接,以持续进行数据交互,这个可以包括较为成熟的WebSocket。我们可以看看张小龙在知乎问题 如何在大型 Web 应用中保持数据的同步更新? 的回答,更加清楚的认识这个过程。

    为什么要使用 RxJS

    RxJS 是一套处理异步编程的 API,那么我将从异步讲起。

    前端编程中的异步有:事件(event)、AJAX、动画(animation)、定时器(timer)。

    视图间的数据共享

    所谓共享,指的是:

    同一份数据被多处视图使用,并且要保持一定程度的同步。

    如果一个业务场景中,不存在视图之间的数据复用,可以考虑使用端到端组件。

    什么是端到端组件呢?

    我们看一个示例,在很多地方都会碰到选择城市、地区的组件。这个组件对外的接口其实很简单,就是选中的项。但这时候我们会有一个问题:

    这个组件需要的省市区域数据,是由这个组件自己去查询,还是使用这个组件的业务去查好了传给这个组件?

    两者当然是各有利弊的,前一种,它把查询逻辑封装在自己内部,对使用者更加有利,调用方只需这么写:

    XHTML

    <RegionSelector selected=“callback(region)”></RegionSelector>

    1
    <RegionSelector selected=“callback(region)”></RegionSelector>

    外部只需实现一个响应取值事件的东西就可以了,用起来非常简便。这样的一个组件,就被称为端到端组件,因为它独自打通了从视图到后端的整个通道。

    这么看来,端到端组件非常美好,因为它对使用者太便利了,我们简直应当拥抱它,放弃其他所有。

    端到端组件示意图:

    A | B | C --------- Server

    1
    2
    3
    A | B | C
    ---------
    Server

    可惜并非如此,选择哪种组件实现方式,是要看业务场景的。如果在一个高度集成的视图中,刚才这个组件同时出现了多次,就有些尴尬了。

    尴尬的地方在哪里呢?首先是同样的查询请求被触发了多次,造成了冗余请求,因为这些组件互相不知道对方的存在,当然有几个就会查几份数据。这其实是个小事,但如果同时还存在修改这些数据的组件,就麻烦了。

    比如说:在选择某个实体的时候,发现之前漏了配置,于是点击“立刻配置”,新增了一条,然后回来继续原流程。

    例如,买东西填地址的时候,发现想要的地址不在列表中,于是点击弹出新增,在不打断原流程的情况下,插入了新数据,并且可以选择。

    这个地方的麻烦之处在于:

    组件A的多个实例都是纯查询的,查询的是ModelA这样的数据,而组件B对ModelA作修改,它当然可以把自己的那块界面更新到最新数据,但是这么多A的实例怎么办,它们里面都是老数据,谁来更新它们,怎么更新?

    这个问题为什么很值得说呢,因为如果没有一个良好的数据层抽象,你要做这个事情,一个业务上的选择和会有两个技术上的选择:

    • 引导用户自己刷新界面
    • 在新增完成的地方,写死一段逻辑,往查询组件中加数据
    • 发一个自定义业务事件,让查询组件自己响应这个事件,更新数据

    这三者都有缺点:

    • 引导用户刷新界面这个,在技术上是比较偷懒的,可能体验未必好。
    • 写死逻辑这个,倒置了依赖顺序,导致代码产生了反向耦合,以后再来几个要更新的地方,这里代码改得会很痛苦,而且,我一个配置的地方,为什么要管你后续增加的那些查询界面?
    • 自定义业务事件这个,耦合是减少了,却让查询组件自己的逻辑膨胀了不少,如果要监听多种消息,并且合并数据,可能这里更复杂,能否有一种比较简化的方式?

    所以,从这个角度看,我们需要一层东西,垫在整个组件层下方,这一层需要能够把查询和更新做好抽象,并且让视图组件使用起来尽可能简单。

    另外,如果多个视图组件之间的数据存在时序关系,不提取出来整体作控制的话,也很难去维护这样的代码。

    添加了数据层之后的整体关系如图:

    A | B | C ------------ 前端的数据层 ------------ Server

    1
    2
    3
    4
    5
    A | B | C
    ------------
    前端的数据层
    ------------
      Server

    那么,视图访问数据层的接口会是什么样?

    我们考虑耦合的问题。如果要减少耦合,很必然的就是这么一种形式:

    • 变更的数据产生某种消息
    • 使用者订阅这个消息,做一些后续处理

    因此,数据层应当尽可能对外提供类似订阅方式的接口。

    可以把这个问题拆分为两个具体问题:

    RxJS是一个利用可观察(observable)序列和LINQ查询操作符来处理异步以及基于事件程序的一个库。通过RxJS, 开发人员用Observables来表示 异步数据流,用LINQ运算符查询 异步数据流,并使用Schedulers参数化 异步数据流中的并发。简而言之,Rx = Observables LINQ Schedulers。

    这个问题在10年前已经被解决过无数次了,最简单的例子就是网页聊天室。题主的需求稍微复杂些,需要支持的数据格式更多,然而只要定义好了通讯规范,多出来的也只是搬砖的活儿了。整个过程可以分为5个环节:1 包装数据、2 触发通知、3 通讯传输、4 解析数据、5 渲染数据。这5个环节中有三点很关键:1 通讯通道选择、2 数据格式定义、3 渲染数据。

    1 通讯通道选择:这个很多前端高手已经回答了,基本就是两种方式:轮询和长连接,这种情况通常的解决方式是长连接,Web端可以用WebSocket来解决,这也是业界普遍采用的方案,比如环信、用友有信、融云等等。通讯环节是相当耗费服务器资源的一个环节,而且开发成本偏高,建议将这些第三方的平台直接集成到自己的项目中,以降低开发的成本。

    2 数据格式定义:数据格式可以定义得五花八门,不过为了前端的解析,建议外层统一数据格式,定义一个类似type的属性来标记数据属性(是IM消息、微博数据还是发货通知),然后定义一个data属性来记录数据的内容(一般对应数据表中的一行数据)。统一数据格式后,前端解析数据的成本会大大降低。

    3 渲染数据渲染数据是关系到前端架构的,比如是React、Vue还是Angular(BTW:不要用Angular,个人认为Angular在走向灭亡)。这些框架都用到了数据绑定,这已经成为业界的共识了(只需要对数据进行操作,不需要操作DOM),这点不再论述。在此种需求场景下,数据流会是一个比较大的问题,因为可能每一条新数据都需要寻找对应的组件去传递数据,这个过程会特别恶心。所以选择单一树的数据流应该会很合适,这样只需要对一棵树的节点进行操作即可:定义好type和树节点的对应关系,然后直接定位到对应的节点对数据增删改就可以,例如Redux。

    以上三点是最核心的环节,涉及到前后端的数据传输、前端数据渲染,其他的内容就比较简单了,也简单说下。

    后端:包装数据、触发通知这个对后端来说就很Easy了,建一个队列池,不断的往池子里丢任务,让池子去触发通知。

    前端:解析数据解析数据就是多出来的搬砖的活儿,过滤type、取data。技术难度并不大,主要点还是在于如何能低开发成本、低维护成本地达到目的,上面是一种比较综合的低成本的解决方案。

    异步常见的问题

    • 回调地狱(Callback Hell)
    • 竞态条件(Race Condition)
    • 内存泄漏(Memory Leak)
    • 管理复杂状态(Manage Complex States)
    • 错误处理(Exception Handling)

    回调地狱就是指层层嵌套的回调函数,造成代码难以理解,并且难以协调组织复杂的操作。

    竞态条件出现的原因是无法保证异步操作的完成会和他们开始时的顺序一样,因此最终结果不可控。比如常见的 AutoComplete 效果,每次输入后向后端发送请求获取结果展示在搜索框下面,由于网络、后端数据查询等原因有可能出现最后发送的请求比之前的请求更快地完成了,这时最终展现的并不是最后那个请求的结果,而这并不是我们所希望的。

    这里说的内存泄漏指的是单页应用切换页面时由于忘记在合适的时机移除监听事件造成的内存泄漏。

    异步带来了状态的改变,可能会使状态管理变得非常复杂,尤其是某个状态有多个来源时,比如有些应用,一开始有一个默认值,再通过 AJAX 获取初始状态,存储在 localStorage,之后通过 WebSocket 获取更新。这时查询状态可能是同步或者异步的,状态的变更可能是主动获取也可能是被动推送的,如果还有各种排序、筛选,状态管理将会更加复杂。

    JavaScript 中的 try/catch 只能捕获同步的错误,异步的错误不易处理。

    服务端推送

    如果要引入服务端推送,怎么调整?

    考虑一个典型场景,WebIM,如果要在浏览器中实现这么一个东西,通常会引入WebSocket作更新的推送。

    对于一个聊天窗口而言,它的数据有几个来源:

    • 初始查询
    • 本机发起的更新(发送一条聊天数据)
    • 其他人发起的更新,由WebSocket推送过来
    视图展示的数据 := 初始查询的数据   本机发起的更新   推送的更新
    
    <table>
    <colgroup>
    <col style="width: 50%" />
    <col style="width: 50%" />
    </colgroup>
    <tbody>
    <tr class="odd">
    <td><div class="crayon-nums-content" style="font-size: 13px !important; line-height: 15px !important;">
    <div class="crayon-num" data-line="crayon-5b8f4b62cb7b7061328078-1">
    1
    </div>
    </div></td>
    <td><div class="crayon-pre" style="font-size: 13px !important; line-height: 15px !important; -moz-tab-size:4; -o-tab-size:4; -webkit-tab-size:4; tab-size:4;">
    <div id="crayon-5b8f4b62cb7b7061328078-1" class="crayon-line">
    视图展示的数据 := 初始查询的数据   本机发起的更新   推送的更新
    </div>
    </div></td>
    </tr>
    </tbody>
    </table>
    

    这里,至少有两种编程方式。

    查询数据的时候,我们使用类似Promise的方式:

    JavaScript

    getListData().then(data => { // 处理数据 })

    1
    2
    3
    getListData().then(data => {
      // 处理数据
    })

    而响应WebSocket的时候,用类似事件响应的方式:

    JavaScript

    ws.on(‘data’, data => { // 处理数据 })

    1
    2
    3
    ws.on(‘data’, data => {
      // 处理数据
    })

    这意味着,如果没有比较好的统一,视图组件里至少需要通过这两种方式来处理数据,添加到列表中。

    如果这个场景再跟上一节提到的多视图共享结合起来,就更复杂了,可能很多视图里都要同时写这两种处理。

    所以,从这个角度看,我们需要有一层东西,能够把拉取和推送统一封装起来,屏蔽它们的差异。

    数据共享:多个视图引用的数据能在发生变化后,即时响应变化。

    无论你在用 Node.js编写一个web端应用还是服务端应用,你都必须经常处理异步和基于事件的编程。Web应用程序和Node.js应用程序都会碰到I / O操作和计算耗时的任务,这些任务可能需要很长时间才能完成,并可能会阻塞主线程。而且,处理异常,取消和同步也很麻烦,并且容易出错。

    对于对实时性要求较高的业务场景,轮询显然是无法满足需求的,而长连接的缺点在于长期占了服务端的连接资源,当前端用户数量指数增长到一定数量时,服务端的分布式须另辟蹊径来处理WebSocket的连接匹配问题。它的优点也很明显,对于传输内容不大的情况下,有非常快的交互速度,因为他不是基于HTTP请求的,而是浏览器端扩展的Socket通信。

    Promise

    使用 Promise 可以减轻一些异步问题,如将回调函数变为串行的链式调用,统一同步和异步代码等,async/await 中也可以使用 try/catch 来捕获错误。但是对于复杂的场景,仍然难于处理。而且 Promise 还有其他的问题,一是只有一个结果,二是不可以取消。

    缓存的使用

    如果说我们的业务里,有一些数据是通过WebSocket把更新都同步过来,这些数据在前端就始终是可信的,在后续使用的时候,可以作一些复用。

    比如说:

    在一个项目中,项目所有成员都已经查询过,数据全在本地,而且变更有WebSocket推送来保证。这时候如果要新建一条任务,想要从项目成员中指派任务的执行人员,可以不必再发起查询,而是直接用之前的数据,这样选择界面就可以更流畅地出现。

    这时候,从视图角度看,它需要解决一个问题:

    • 如果要获取的数据未有缓存,它需要产生一个请求,这个调用过程就是异步的
    • 如果要获取的数据已有缓存,它可以直接从缓存中返回,这个调用过程就是同步的

    如果我们有一个数据层,我们至少期望它能够把同步和异步的差异屏蔽掉,否则要使用两种代码来调用。通常,我们是使用Promise来做这种差异封装的:

    JavaScript

    function getDataP() : Promise<T> { if (data) { return Promise.resolve(data) } else { return fetch(url) } }

    1
    2
    3
    4
    5
    6
    7
    function getDataP() : Promise<T> {
      if (data) {
        return Promise.resolve(data)
      } else {
        return fetch(url)
      }
    }

    这样,使用者可以用相同的编程方式去获取数据,无需关心内部的差异。

    数据同步:多终端访问的数据能在一个客户端发生变化后,即时响应变化。

    使用RxJS,你可以用Observer 对象来表示多个异步数据流 (那些来自多个数据源的,比如,股票报价,微博,计算机事件, 网络服务请求,等等。),还可以用Observer 对象订阅事件流。无论事件何时触发,Observable 对象都会通知订阅它的 Observer对象。

    <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>
    
    @Configuration@EnableWebSocketMessageBrokerpublic class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { // 添加服务端点,可以理解为某一服务的唯一key值 stompEndpointRegistry.addEndpoint("/chatApp"); //当浏览器支持sockjs时执行该配置 stompEndpointRegistry.addEndpoint("/chatApp").setAllowedOrigins.withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry config) { // 配置接受订阅消息地址前缀为topic的消息 config.enableSimpleBroker; // Broker接收消息地址前缀 config.setApplicationDestinationPrefixes; }}
    
     @Autowired private SimpMessagingTemplate template; //接收客户端"/app/chat"的消息,并发送给所有订阅了"/topic/messages"的用户 @MessageMapping @SendTo("/topic/messages") public OutputMessage receiveAndSend(InputMessage inputMessage) throws Exception { System.out.println("get message ("   inputMessage.getText from client!"); System.out.println("send messages to all subscribers!"); String time = new SimpleDateFormat.format(new Date; return new OutputMessage(inputMessage.getFrom(), inputMessage.getText; } //或者直接从服务端发送消息给指定客户端 @MessageMapping("/chat_user") public void sendToSpecifiedUser(@Payload InputMessage inputMessage, SimpMessageHeaderAccessor headerAccessor) throws Exception { System.out.println("get message from client ("   inputMessage.getFrom; System.out.println("send messages to the specified subscriber!"); String time = new SimpleDateFormat.format(new Date; this.template.convertAndSend("/topic/"   inputMessage.getFrom(), new OutputMessage(inputMessage.getFrom(), inputMessage.getText; }
    
    <!DOCTYPE html><!DOCTYPE html><html> <head> <title>Chat WebSocket</title> <script src="http://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script> <script src="js/stomp.js"></script> <script type="text/javascript"> var apiUrlPre = "http://10.200.0.126:9041/discovery"; var stompClient = null; function setConnected(connected) { document.getElementById('connect').disabled = connected; document.getElementById('disconnect').disabled = !connected; document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden'; document.getElementById('response').innerHTML = ''; } function connect() { var socket = new SockJS('http://localhost:9041/discovery/chatApp'); var from = document.getElementById.value; stompClient = Stomp.over; stompClient.connect({}, function { setConnected; console.log('Connected: '   frame); //stompClient.subscribe('/topic/'   from, function(messageOutput) { stompClient.subscribe('/topic/messages', function(messageOutput) { // alert(messageOutput.body); showMessageOutput(JSON.parse(messageOutput.body)); }); }); } function disconnect() { if(stompClient != null) { stompClient.disconnect(); } setConnected; console.log("Disconnected"); } function sendMessage() { var from = document.getElementById.value; var text = document.getElementById.value; //stompClient.send("/app/chat_user", {}, stompClient.send("/app/chat", {}, JSON.stringify({ 'from': from, 'text': text }) ); } function showMessageOutput(messageOutput) { var response = document.getElementById('response'); var p = document.createElement; p.style.wordWrap = 'break-word'; p.appendChild(document.createTextNode(messageOutput.from   ": "   messageOutput.text   " ("   messageOutput.time   ")")); response.appendChild; } </script> </head> <body onload="disconnect()"> <div> <div> <input type="text" placeholder="Choose a nickname" /> </div> <br /> <div> <button onclick="connect();">Connect</button> <button disabled="disabled" onclick="disconnect();"> Disconnect </button> </div> <br /> <div > <input type="text" placeholder="Write a message..." /> <button onclick="sendMessage();">Send</button> <p ></p> </div> </div> </body></html>
    

    异步 API:

    异步编程时不仅要面对这些问题,还有下面这些使用方式各异的 API:

    • DOM Events
    • XMLHttpRequest
    • fetch
    • WebSocket
    • Service Worker
    • setTimeout
    • setInterval
    • requestAnimationFrame

    而如果使用 RxJS,可以用统一的 API 来进行处理,而且借助 RxJS 各种强大的操作符,我们可以更简单地实现我们的需求。

    数据的聚合

    很多时候,视图上需要的数据与数据库存储的形态并不完全相同,在数据库中,我们总是倾向于储存更原子化的数据,并且建立一些关联,这样,从这种数据想要变成视图需要的格式,免不了需要一些聚合过程。

    通常我们指的聚合有这么几种:

    • 在服务端先聚合数据,然后再把这些数据与视图模板聚合,形成HTML,整体输出,这个过程也称为服务端渲染
    • 在服务端只聚合数据,然后把这些数据返回到前端,再生成界面
    • 服务端只提供原子化的数据接口,前端根据自己的需要,请求若干个接口获得数据,聚合成视图需要的格式,再生成界面

    大部分传统应用在服务端聚合数据,通过数据库的关联,直接查询出聚合数据,或者在Web服务接口的地方,聚合多个底层服务接口。

    我们需要考虑自己应用的特点来决定前端数据层的设计方案。有的情况下,后端返回细粒度的接口会比聚合更合适,因为有的场景下,我们需要细粒度的数据更新,前端需要知道数据之间的变更联动关系。

    所以,很多场景下,我们可以考虑在后端用GraphQL之类的方式来聚合数据,或者在前端用类似Linq的方式聚合数据。但是,注意到如果这种聚合关系要跟WebSocket推送产生关联,就会比较复杂。

    我们拿一个场景来看,假设有一个界面,长得像新浪微博的Feed流。对于一条Feed而言,它可能来自几个实体:

    Feed消息本身

    JavaScript

    class Feed { content: string creator: UserId tags: TagId[] }

    1
    2
    3
    4
    5
    class Feed {
      content: string
      creator: UserId
      tags: TagId[]
    }

    Feed被打的标签

    JavaScript

    class Tag { id: TagId content: string }

    1
    2
    3
    4
    class Tag {
      id: TagId
      content: string
    }

    人员

    JavaScript

    class User { id: UserId name: string avatar: string }

    1
    2
    3
    4
    5
    class User {
      id: UserId
      name: string
      avatar: string
    }

    如果我们的需求跟微博一样,肯定还是会选择第一种聚合方式,也就是服务端渲染。但是,如果我们的业务场景中,存在大量的细粒度更新,就比较有意思了。

    比如说,如果我们修改一个标签的名称,就要把关联的Feed上的标签也刷新,如果之前我们把数据聚合成了这样:

    JavaScript

    class ComposedFeed { content: string creator: User tags: Tag[] }

    1
    2
    3
    4
    5
    class ComposedFeed {
      content: string
      creator: User
      tags: Tag[]
    }

    就会导致无法反向查找聚合后的结果,从中筛选出需要更新的东西。如果我们能够保存这个变更路径,就比较方便了。所以,在存在大量细粒度更新的情况下,服务端API零散化,前端负责聚合数据就比较合适了。

    当然这样会带来一个问题,那就是请求数量增加很多。对此,我们可以变通一下:

    做物理聚合,不做逻辑聚合。

    这段话怎么理解呢?

    我们仍然可以在一个接口中一次获取所需的各种数据,只是这种数据格式可能是:

    JavaScript

    { feed: Feed tags: Tags[] user: User }

    1
    2
    3
    4
    5
    {
      feed: Feed
      tags: Tags[]
      user: User
    }

    不做深度聚合,只是简单地包装一下。

    在这个场景中,我们对数据层的诉求是:建立数据之间的关联关系。

    发布订阅模式

    因为可观察序列是数据流,你可以用Observable的扩展方法实现的标准查询运算符来查询它们。从而,你可以使用这些标准查询运算符轻松筛选,投影(project),聚合,撰写和执行基于时间轴(time-based)的多个事件的操作。此外,还有一些其他反应流特定的操作符允许强大的查询写入。 通过使用Rx提供的扩展方法,还可以正常处理取消,异常和同步。

    新葡亰496net 1send to all subscribers新葡亰496net 2send to the specified subscriber

    认识 RxJS

    综合场景

    以上,我们述及四种典型的对前端数据层有诉求的场景,如果存在更复杂的情况,兼有这些情况,又当如何?

    Teambition的场景正是这么一种情况,它的产品特点如下:

    • 大部分交互都以对话框的形式展现,在视图的不同位置,存在大量的共享数据,以任务信息为例,一条任务数据对应渲染的视图可能会有20个这样的数量级。
    • 全业务都存在WebSocket推送,把相关用户(比如处于同一项目中)的一切变更都发送到前端,并实时展示
    • 很强调无刷新,提供一种类似桌面软件的交互体验

    比如说:

    当一条任务变更的时候,无论你处于视图的什么状态,需要把这20种可能的地方去做同步。

    当任务的标签变更的时候,需要把标签信息也查找出来,进行实时变更。

    甚至:

    • 如果某个用户更改了自己的头像,而他的头像被到处使用了?
    • 如果当前用户被移除了与所操作对象的关联关系,导致权限变更,按钮禁用状态改变了?
    • 如果别人修改了当前用户的身份,在管理员和普通成员之间作了变化,视图怎么自动变化?

    当然这些问题都是可以从产品角度权衡的,但是本文主要考虑的还是如果产品角度不放弃对某些极致体验的追求,从技术角度如何更容易地去做。

    我们来分析一下整个业务场景:

    • 存在全业务的细粒度变更推送 => 需要在前端聚合数据
    • 前端聚合 => 数据的组合链路长
    • 视图大量共享数据 => 数据变更的分发路径多

    这就是我们得到的一个大致认识。

    在旧的项目中是使用了发布订阅模式解决这些问题。不管是 AJAX 请求的返回数据还是 WebSocket 的推送数据,统一向全局发布消息,每个需要这些数据的视图去订阅对应的消息使视图变化。

    RxJS可与诸如数组,集合和映射之类的同步数据流以及诸如Promises之类的单值异步计算进行互补和顺畅的互操作,如下图所示:

    这是spring-boot接入WebSocket最简单的方法了,很直观的表现了socket在浏览器段通信的便利,但根据不同的业务场景,对该技术的使用还需要斟酌,例如如何使WebSocket在分布式服务端保持服务,如何在连接上集群后下发消息找到长连接的服务端机器。我也在为这个问题苦苦思考,思路虽有,实践起来却举步维艰,特别是网上谈到比较多的将连接序列化到缓存中,统一管理读取分配,分享几个好思路,也希望自己能给找到较好的方案再分享一篇博客。来自Push notifications with websockets in a distributed Node.js app

    什么是 RxJS

    我们都知道 JS 是什么,那么什么是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的简称,指的是实践响应式编程的一套工具,Rx 官网首页的介绍是一套通过可监听流来做异步编程的 API(An API for asynchronous programming with observable streams)。

    Rx 最早是由微软开发的 LinQ 扩展出来的开源项目,之后由开源社区维护,有多种语言的实现,如 Java 的 RxJava,Python 的 RxPY 等,而 RxJS 就是 Rx 的 JavaScript 语言实现。

    技术诉求

    以上,我们介绍了业务场景,分析了技术特点。假设我们要为这么一种复杂场景设计数据层,它要提供怎样的接口,才能让视图使用起来简便呢?

    从视图角度出发,我们有这样的诉求:

    • 类似订阅的使用方式(只被上层依赖,无反向链路)。这个来源于多视图对同一业务数据的共享,如果不是类似订阅的方式,职责就反转了,对维护不利
    • 查询和推送的统一。这个来源于WebSocket的使用。
    • 同步与异步的统一。这个来源于缓存的使用。
    • 灵活的可组合性。这个来源于细粒度数据的前端聚合。

    根据这些,我们可用的技术选型是什么呢?

    缺点是:一个视图为了响应变化需要写很多订阅并更新视图数据的硬编码,涉及数据越多,逻辑也越复杂。

    单返回值 多返回值
    Pull/Synchronous/Interactive Object Iterables (Array / Set / Map / Object)
    Push/Asynchronous/Reactive Promise Observable
    1. Configure Nginx to send websocket requests from each browser to all the server in the cluster. I could not figure out how to do it. Load balancing does not support broadcasting.
    2. Store websocket connections in the databse, so that all servers had access to it. I am not sure how to serialize the websocket connection object to store it in MongoDB.
    3. Set up a communication mechanism among the servers in the cluster (some kind message bus) and whenever event happens, have all the servers notify the websocket clients they are tracking. This somewhat complicates the system and requires the nodes to know the addresses of each other. Which package is most suitable for such a solution?再分享几个讨论:springsession如何对spring的WebSocketSession进行分布式配置?websocket多台服务器之间怎么共享websocketSession?

    RxJS 的两种编程思想

    RxJS 引入了两种重要的编程思想:函数式编程和响应式编程。

    函数式编程(Functional Programming,简称 FP)是一种编程范式,强调使用函数来思考问题、编写代码。

    In computer science, functional programming is a programming paradigm—a style of building the structure and elements of computer programs—that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data.

    函数式编程的主要设计点在于避免使用状态和可变的数据,即 stateless and immutable。

    函数式编程对函数的使用有一些特殊要求:

    • 声明式(Declarative)
    • 纯函数(Pure Function)
    • 数据不可变性(Immutability)

    声明式的函数,让开发者只需要表达”想要做什么”,而不需要表达“怎么去做”。

    纯函数指的是执行结果由输入参数决定,参数相同时结果相同,不受其他数据影响,并且不会带来副作用(Side Effect)的函数。副作用指的是函数做了和本身运算返回值没有关系的事情,如修改外部变量或传入的参数对象,甚至是执行 console.log 都算是 Side Effect。前端中常见的副作用有发送 http 请求、操作 DOM、调用 alert 或者 confirm 函数等。满足纯函数的特性也叫做引用透明度(Referential Transparency)。

    数据不可变就是指这个数据一旦产生,它的值就永远不会变。JavaScript 中字符串类型和数字类型就是不可改变的,而对象基本都是可变的,可能会带来各种副作用。现在有各种库可以实现 Immutable 特性,如 immutable.js 和 immer.js

    中文维基上说响应式编程(Reactive Programming)是一种面向数据流(stream)和变化传播的编程范式。个人的理解是对数据流进行编程的一种编程范式,使用各种函数创建、组合、过滤数据流,然后通过监听这个数据流来响应它的变化。响应式编程抽象出了流这个概念,提高了代码的抽象级别,我们不用去关心大量的实现细节,而专注于对数据流的操作。

    响应式流可以认为是随着时间发出的一系列元素。响应式和观察者模式有点相似,订阅者订阅后,发布者吐出数据时,订阅者会响应式进行处理。实际上Rx 组合了观察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函数式编程。

    RxJS 是上面两种编程思想的结合,但是对于它是不是函数响应式编程(FRP)有比较大的争议,因为它虽然既是函数式又是响应式但是不符合早期 FRP 的定义。

    主流框架对数据层的考虑

    一直以来,前端框架的侧重点都是视图部分,因为这块是普适性很强的,但在数据层方面,一般都没有很深入的探索。

    • React, Vue 两者主要侧重数据和视图的同步,生态体系中有一些库会在数据逻辑部分做一些事情
    • Angular,看似有Service这类可以封装数据逻辑的东西,实际上远远不够,有形无实,在Service内部必须自行做一些事情
    • Backbone,做了一些业务模型实体和关联关系的抽象,更早的ExtJS也做了一些事情

    综合以上,我们可以发现,几乎所有现存方案都是不完整的,要么只做实体和关系的抽象,要么只做数据变化的封装,而我们需要的是实体的关系定义和数据变更链路的封装,所以需要自行作一些定制。

    那么,我们有怎样的技术选型呢?

    数据流

    推送模式 vs 拉取模式

    在交互式编程中,应用程序为了获取更多信息会主动遍历一个数据源,通过检索一个代表数据源的序列。这种行为就像是JavaScript数组,对象,集合,映射等的迭代器模式。在交互式编程中,必须通过数组中的索引或通过ES6 iterators来获取下一项。

    在拉取模式中,应用程序在数据检索过程中处于活动状态: 它通过自己主动调用next来决定检索的速度。 此枚举模式是同步的,这意味着在轮询数据源时可能会阻止您的应用程序的主线程。 这种拉取模式好比是你在图书馆翻阅一本书。 你阅读完成这本书后,你才能去读另一本。

    另一方面在响应式编程中,应用程序通过订阅数据流获得更多的信息 (在RxJS中称为可观测序列),数据源的任何更新都传递给可观测序列。这种模式下应用是被动接收数据:除了订阅可观察的来源,并不会主动查询来源,而只是对推送给它的数据作出反应。事件完成后,信息来源将向用户发送通知。这样,您的应用程序将不会被等待源更新阻止。

    这是RxJS采用的推送模式。 这好比是加入一个图书俱乐部,在这个图书俱乐部中你注册了某个特定类型的兴趣组,而符合你兴趣的书籍在发布时会自动发送给你。 而不需要排队去搜索获取你想要的书籍。 在重UI应用中,使用推送数据模式尤其有用,在程序等待某些事件时,UI线程不会被阻塞,这使得在具有异步要求的JavaScript运行环境中非常重要。 总之,利用RxJS,可使应用程序更具响应性。

    Observable / Observer的可观察模式就是Rx实现的推送模型。 Observable对象会自动通知所有观察者状态变化。 请使用Observablesubscribe方法来订阅,subscribe方法需要Observer对象并返回Disposable对象。 这使您能够跟踪您的订阅,并能够处理订阅。 您可以将可观察序列(如一系列的鼠标悬停事件)视为普通的集合。 RxJS对可观察序列的内置实现的查询,允许开发人员在基于推送序列(如事件,回调,Promise,HTML5地理定位API等等)上组合复杂的事件处理。有关这两个接口的更多信息,请参阅探索 RxJS的主要概念。

    WebSocket Support

    RxJS 的特点

    • 数据流抽象了很多现实问题
    • 擅长处理异步问题
    • 把复杂问题分解为简单问题的组合

    前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画都可以看作是数据流。

    RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生还是异步产生的,因此处理异步将会变得非常简单。

    RxJS 中很多操作符,每个操作符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操作符来解决复杂问题。

    RxJS

    遍观流行的辅助库,我们会发现,基于数据流的一些方案会对我们有较大帮助,比如RxJS,xstream等,它们的特点刚好满足了我们的需求。

    以下是这类库的特点,刚好是迎合我们之前的诉求。

    • Observable,基于订阅模式
    • 类似Promise对同步和异步的统一
    • 查询和推送可统一为数据管道
    • 容易组合的数据管道
    • 形拉实推,兼顾编写的便利性和执行的高效性
    • 懒执行,不被订阅的数据流不执行

    这些基于数据流理念的库,提供了较高层次的抽象,比如下面这段代码:

    JavaScript

    function getDataO(): Observable<T> { if (cache) { return Observable.of(cache) } else { return Observable.fromPromise(fetch(url)) } } getDataO().subscribe(data => { // 处理数据 })

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    function getDataO(): Observable<T> {
      if (cache) {
        return Observable.of(cache)
      }
      else {
        return Observable.fromPromise(fetch(url))
      }
    }
     
    getDataO().subscribe(data => {
      // 处理数据
    })

    这段代码实际上抽象程度很高,它至少包含了这么一些含义:

    • 统一了同步与异步,兼容有无缓存的情况
    • 统一了首次查询与后续推送的响应,可以把getDataO方法内部这个Observable也缓存起来,然后把推送信息合并进去

    我们再看另外一段代码:

    JavaScript

    const permission$: Observable<boolean> = Observable .combineLatest(task$, user$) .map(data => { let [task, user] = data return user.isAdmin || task.creatorId === user.id })

    1
    2
    3
    4
    5
    6
    const permission$: Observable<boolean> = Observable
      .combineLatest(task$, user$)
      .map(data => {
        let [task, user] = data
        return user.isAdmin || task.creatorId === user.id
      })

    这段代码的意思是,根据当前的任务和用户,计算是否拥有这条任务的操作权限,这段代码其实也包含了很多含义:

    首先,它把两个数据流task$和user$合并,并且计算得出了另外一个表示当前权限状态的数据流permission$。像RxJS这类数据流库,提供了非常多的操作符,可用于非常简便地按照需求把不同的数据流合并起来。

    我们这里展示的是把两个对等的数据流合并,实际上,还可以进一步细化,比如说,这里的user$,我们如果再追踪它的来源,可以这么看待:

    某用户的数据流user$ := 对该用户的查询 后续对该用户的变更(包括从本机发起的,还有其他地方更改的推送)

    如果说,这其中每个因子都是一个数据流,它们的叠加关系就不是对等的,而是这么一种东西:

    • 每当有主动查询,就会重置整个user$流,恢复一次初始状态
    • user$等于初始状态叠加后续变更,注意这是一个reduce操作,也就是把后续的变更往初始状态上合并,然后得到下一个状态

    这样,这个user$数据流才是“始终反映某用户当前状态”的数据流,我们也就因此可以用它与其它流组合,参与后续运算。

    这么一段代码,其实就足以覆盖如下需求:

    • 任务本身变化了(执行者、参与者改变,导致当前用户权限不同)
    • 当前用户自身的权限改变了

    这两者导致后续操作权限的变化,都能实时根据需要计算出来。

    其次,这是一个形拉实推的关系。这是什么意思呢,通俗地说,如果存在如下关系:

    JavaScript

    c = a b // 不管a还是b发生更新,c都不动,等到c被使用的时候,才去重新根据a和b的当前值计算

    1
    c = a b     // 不管a还是b发生更新,c都不动,等到c被使用的时候,才去重新根据a和b的当前值计算

    如果我们站在对c消费的角度,写出这么一个表达式,这就是一个拉取关系,每次获取c的时候,我们重新根据a和b当前的值来计算结果。

    而如果站在a和b的角度,我们会写出这两个表达式:

    JavaScript

    c = a1 b // a1是当a变更之后的新值 c = a b1 // b1是当b变更之后的新值

    1
    2
    c = a1 b     // a1是当a变更之后的新值
    c = a b1    // b1是当b变更之后的新值

    这是一个推送关系,每当有a或者b的变更时,主动重算并设置c的新值。

    如果我们是c的消费者,显然拉取的表达式写起来更简洁,尤其是当表达式更复杂时,比如:

    JavaScript

    e = (a b ) * c - d

    1
    e = (a b ) * c - d

    如果用推的方式写,要写4个表达式。

    所以,我们写订阅表达式的时候,显然是从使用者的角度去编写,采用拉取的方式更直观,但通常这种方式的执行效率都较低,每次拉取,无论结果是否变更,都要重算整个表达式,而推送的方式是比较高效精确的。

    但是刚才RxJS的这种表达式,让我们写出了形似拉取,实际以推送执行的表达式,达到了编写直观、执行高效的结果。

    看刚才这个表达式,大致可以看出:

    permission$ := task$ user$

    这么一个关系,而其中每个东西的变更,都是通过订阅机制精确发送的。

    有些视图库中,也会在这方面作一些优化,比如说,一个计算属性(computed property),是用拉的思路写代码,但可能会被框架分析依赖关系,在内部反转为推的模式,从而优化执行效率。

    此外,这种数据流还有其它魔力,那就是懒执行。

    什么是懒执行呢?考虑如下代码:

    JavaScript

    const a$: Subject<number> = new Subject<number>() const b$: Subject<number> = new Subject<number>() const c$: Observable<number> = Observable.combineLatest(a$, b$) .map(arr => { let [a, b] = arr return a b }) const d$: Observable<number> = c$.map(num => { console.log('here') return num 1 }) c$.subscribe(data => console.log(`c: ${data}`)) a$.next(2) b$.next(3) setTimeout(() => { a$.next(4) }, 1000)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    const a$: Subject<number> = new Subject<number>()
    const b$: Subject<number> = new Subject<number>()
     
    const c$: Observable<number> = Observable.combineLatest(a$, b$)
      .map(arr => {
        let [a, b] = arr
        return a b
      })
     
    const d$: Observable<number> = c$.map(num => {
      console.log('here')
      return num 1
    })
     
    c$.subscribe(data => console.log(`c: ${data}`))
     
    a$.next(2)
    b$.next(3)
     
    setTimeout(() => {
      a$.next(4)
    }, 1000)

    注意这里的d$,如果a$或者b$中产生变更,它里面那个here会被打印出来吗?大家可以运行一下这段代码,并没有。为什么呢?

    因为在RxJS中,只有被订阅的数据流才会执行。

    主题所限,本文不深究内部细节,只想探讨一下这个特点对我们业务场景的意义。

    想象一下最初我们想要解决的问题,是同一份数据被若干个视图使用,而视图侧的变化是我们不可预期的,可能在某个时刻,只有这些订阅者的一个子集存在,其它推送分支如果也执行,就是一种浪费,RxJS的这个特性刚好能让我们只精确执行向确实存在的视图的数据流推送。

    对于 Vue,首先它是一个 MVVM 框架。

    RxJS 入门

    RxJS与其它方案的对比

    Model <----> ViewModel <----> View

    RxJS 使用

    RxJS 仓库现在移到了 ReactiveX 组织下,最新的大版本为 6,与之前的版本相比有许多破坏性变更,请注意。

    RxJS 的 import 路径有以下 5 种:

    1. 创建 Observable 的方法、types、schedulers 和一些工具方法

      import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';

    2. 操作符 operators

      import { map, filter, scan } from 'rxjs/operators';

    3. webSocket

      import { webSocket } from 'rxjs/webSocket';

    4. ajax

      import { ajax } from 'rxjs/ajax';

    5. 测试

      import { TestScheduler } from 'rxjs/testing';

    本文所有 demo 均在 v6.2.1 中测试过

    1. 与watch机制的对比

    不少视图层方案,比如Angular和Vue中,存在watch这么一种机制。在很多场景下,watch是一种很便捷的操作,比如说,想要在某个对象属性变更的时候,执行某些操作,就可以使用它,大致代码如下:

    JavaScript

    watch(‘a.b’, newVal => { // 处理新数据 })

    1
    2
    3
    watch(‘a.b’, newVal => {
      // 处理新数据
    })

    这类监控机制,其内部实现无非几种,比如自定义了setter,拦截数据的赋值,或者通过对比新旧数据的脏检查方式,或者通过类似Proxy的机制代理了数据的变化过程。

    从这些机制,我们可以得到一些推论,比如说,它在对大数组或者复杂对象作监控的时候,监控效率都会降低。

    有时候,我们也会有监控多个数据,以合成另外一个的需求,比如:

    一条用于展示的任务数据 := 这条任务的原始数据 任务上的标签信息 任务的执行者信息

    如果不以数据流的方式编写,这地方就需要为每个变量单独编写表达式或者批量监控多个变量,前者面临的问题是代码冗余,跟前面我们提到的推数据的方式类似;后者面临的问题就比较有意思了。

    监控的方式会比计算属性强一些,原因在于计算属性处理不了异步的数据变更,而监控可以。但如果监控条件进一步复杂化,比如说,要监控的数据之间存在竞争关系等等,都不是容易表达出来的。

    另外一个问题是,watch不适合做长链路的变更,比如:

    JavaScript

    c := a b d := c 1 e := a * c f := d * e

    1
    2
    3
    4
    c := a b
    d := c 1
    e := a * c
    f := d * e

    这种类型,如果要用监控表达式写,会非常啰嗦。

    一目了然的关系,Model 的变化影响到 ViewModel 的变化再触发 View 更新。那么反过来呢,View 更改 ViewModel 再更改 Model?

    一个简单的例子

    import { fromEvent } from 'rxjs';
    import { take } from 'rxjs/operators';
    
    const eleBtn = document.querySelector('#btn')
    const click$ = fromEvent(eleBtn, 'click')
    
    click$.pipe(take(1))
      .subscribe(e => {
        console.log('只可点击一次')
        eleBtn.setAttribute('disabled', '')
      })
    

    这里演示了 RxJS 的大概用法,通过 fromEvent 将点击事件转换为 RxJS 的 Observable (响应式数据流),take(1) 表示只操作一次,观察者通过订阅(subscribe)来响应变化。具体 API 的使用会在后面讲到。

    演示地址

    代表流的变量用 $ 符号结尾,是 RxJS 中的一种惯例。

    2. 跟Redux的对比

    Rx和Redux其实没有什么关系。在表达数据变更的时候,从逻辑上讲,这两种技术是等价的,一种方式能表达出的东西,另外一种也都能够。

    比如说,同样是表达数据a到b这么一个转换,两者所关注的点可能是不一样的:

    • Redux:定义一个action叫做AtoB,在其实现中,把a转换成b
    • Rx:定义两个数据流A和B,B是从A经过一次map转换得到的,map的表达式是把a转成b

    由于Redux更多地是一种理念,它的库功能并不复杂,而Rx是一种强大的库,所以两者直接对比并不合适,比如说,可以用Rx依照Redux的理念作实现,但反之不行。

    在数据变更的链路较长时,Rx是具有很大优势的,它可以很简便地做多级状态变更的连接,也可以做数据变更链路的复用(比如存在a -> b -> c,又存在a -> b -> d,可以把a -> b这个过程拿出来复用),还天生能处理好包括竞态在内的各种异步的情况,Redux可能要借助saga等理念才能更好地组织代码。

    我们之前有些demo代码也提到了,比如说:

    用户信息数据流 := 用户信息的查询 用户信息的更新

    1
    用户信息数据流 := 用户信息的查询 用户信息的更新

    这段东西就是按照reducer的理念去写的,跟Redux类似,我们把变更操作放到一个数据流中,然后用它去累积在初始状态上,就能得到始终反映某个实体当前状态的数据流。

    在Redux方案中,中间件是一种比较好的东西,能够对业务产生一定的约束,如果我们用RxJS实现,可以把变更过程中间接入一个统一的数据流来完成同样的事情。

    对于更新数据而言,更改 ViewModel 真是多此一举了。因为我们只需要改变 Model 数据自然就会按照Model > ViewModel > View的路径同步过来了。这也就是为什么 Vue 后来抛弃了双向绑定,而仅仅支持表单组件的双向绑定。对于双向绑定而言,表单算得上是最佳实践场景了。

    RxJS 要点

    RxJS 有一个核心和三个重点,一个核心是 Observable 再加上相关的 Operators,三个重点分别是 Observer、Subject、Schedulers。

    具体方案

    以上我们谈了以RxJS为代表的数据流库的这么多好处,彷佛有了它,就像有了民主,人民就自动吃饱穿暖,物质文化生活就自动丰富了,其实不然。任何一个框架和库,它都不是来直接解决我们的业务问题的,而是来增强某方面的能力的,它刚好可以为我们所用,作为整套解决方案的一部分。

    至此,我们的数据层方案还缺失什么东西吗?

    考虑如下场景:

    某个任务的一条子任务产生了变更,我们会让哪条数据流产生变更推送?

    分析子任务的数据流,可以大致得出它的来源:

    subtask$ = subtaskQuery$ subtaskUpdate$

    看这句伪代码,加上我们之前的解释(这是一个reduce操作),我们得到的结论是,这条任务对应的subtask$数据流会产生变更推送,让视图作后续更新。

    仅仅这样就可以了吗?并没有这么简单。

    从视图角度看,我们还存在这样的对子任务的使用:那就是任务的详情界面。但这个界面订阅的是这条子任务的所属任务数据流,在其中任务数据包含的子任务列表中,含有这条子任务。所以,它订阅的并不是subtask$,而是task$。这么一来,我们必须使task$也产生更新,以此推动任务详情界面的刷新。

    那么,怎么做到在subtask的数据流变更的时候,也推动所属task的数据流变更呢?这个事情并非RxJS本身能做的,也不是它应该做的。我们之前用RxJS来封装的部分,都只是数据的变更链条,记得之前我们是怎么描述数据层解决方案的吗?

    实体的关系定义和数据变更链路的封装

    我们前面关注的都是后面一半,前面这一半,还完全没做呢!

    实体的变更关系如何做呢,办法其实很多,可以用类似Backbone的Model和Collection那样做,也可以用更加专业的方案,引入一个ORM机制来做。这里面的实现就不细说了,那是个相对成熟的领域,而且说起来篇幅太大,有疑问的可以自行了解。

    需要注意的是,我们在这个里面需要考虑好与缓存的结合,前端的缓存很简单,基本就是一种精简的k-v数据库,在做它的存储的时候,需要做到两件事:

    • 以集合形式获取的数据,需要拆分放入缓存,比如Task[],应当以每个Task的TaskId为索引,分别单独存储
    • 有时候后端返回的数据可能是不完整的,或者格式有差异,需要在储存之间作正规化(normalize)

    总结以上,我们的思路是:

    • 缓存 => 基于内存的微型k-v数据库
    • 关联变更 => 使用ORM的方式抽象业务实体和变更关系
    • 细粒度推送 => 某个实体的查询与变更先合并为数据流
    • 从实体的变更关系,引出数据流,并且所属实体的流
    • 业务上层使用这些原始数据流以组装后续变更

    在开发实践中,最常见的还是单向数据流。

    什么是 Observable

    个人认为在文档中说的 Observable 更确切的说法是 Observable Stream,也就是 Rx 的响应式数据流。

    在 RxJS 中 Observable 是可被观察者,观察者则是 Observer,它们通过 Observable 的 subscribe 方法进行关联。

    前面提到了 RxJS 结合了观察者模式和迭代器模式。

    对于观察者模式,我们其实比较熟悉了,比如各种 DOM 事件的监听,也是观察者模式的一种实践。核心就是发布者发布事件,观察者选择时机去订阅(subscribe)事件。

    在 ES6 中,Array、String 等可遍历的数据结构原生部署了迭代器(Iterator )接口。

    const numbers = [1, 2, 3]
    const iterator = numbers[Symbol.iterator]()
    iterator.next() // {value: 1, done: false}
    iterator.next() // {value: 2, done: false}
    iterator.next() // {value: 3, done: false}
    iterator.next() // {value: undefined, done: true}
    

    观察者模式和迭代器模式的相同之处是两者都是渐进式使用数据的,只不过从数据使用者的角度来说,观察者模式数据是推送(push)过来的,而迭代器模式是自己去拉取(pull)的。Rx 中的数据是 Observable 推送的,观察者不需要主动去拉取。

    Observable 与 Array 相当类似,都可以看作是 Collection,只不过 Observable 是 a collection of items over time,是随时间发出的一序列元素,所以下面我们会看到 Observable 的一些操作符与 Array 的方法极其相似。

    更深入的探索

    如果说我们针对这样的复杂场景,实现了这么一套复杂的数据层方案,还可以有什么有意思的事情做呢?

    这里我开几个脑洞:

    • 用Worker隔离计算逻辑
    • 用ServiceWorker实现本地共享
    • 与本地持久缓存结合
    • 前后端状态共享
    • 可视化配置

    我们一个一个看,好玩的地方在哪里。

    第一个,之前提到,整个方案的核心是一种类似ORM的机制,外加各种数据流,这里面必然涉及数据的组合、计算之类,那么我们能否把它们隔离到渲染线程之外,让整个视图变得更流畅?

    第二个,很可能我们会碰到同时开多个浏览器选项卡的客户,但是每个选项卡展现的界面状态可能不同。正常情况下,我们的整个数据层会在每个选项卡中各存在一份,并且独立运行,但其实这是没有必要的,因为我们有订阅机制来保证可以扩散到每个视图。那么,是否可以用过ServiceWorker之类的东西,实现跨选项卡的数据层共享?这样就可以减少很多计算的负担。

    对这两条来说,让数据流跨越线程,可能会存在一些障碍待解决。

    第三个,我们之前提到的缓存,全部是在内存中,属于易失性缓存,只要用户关掉浏览器,就全部丢了,可能有的情况下,我们需要做持久缓存,比如把不太变动的东西,比如企业通讯录的人员名单存起来,这时候可以考虑在数据层中加一些异步的与本地存储通信的机制,不但可以存localStorage之类的key-value存储,还可以考虑存本地的关系型数据库。

    第四个,在业务和交互体验复杂到一定程度的时候,服务端未必还是无状态的,想要在两者之间做好状态共享,有一定的挑战。基于这么一套机制,可以考虑在前后端之间打通一个类似meteor的通道,实现状态共享。

    第五个,这个话题其实跟本文的业务场景无关,只是从第四个话题引发。很多时候我们期望能做到可视化配置业务系统,但一般最多也就做到配置视图,所以,要么做到的是一个配置运营页面的东西,要么是能生成一个脚手架,供后续开发使用,但是一旦开始写代码,就没法合并回来。究其原因,是因为配不出组件的数据源和业务逻辑,找不到合理的抽象机制。如果有第四条那么一种铺垫,也许是可以做得比较好的,用数据流作数据源,还是挺合适的,更何况,数据流的组合关系能够可视化描述啊。

    Model --> ViewModel --> View --> Model

    创建 Observable

    要创建一个 Observable,只要给 new Observable 传递一个接收 observer 参数的回调函数,在这个函数中去定义如何发送数据。

    import { Observable } from 'rxjs';
    
    const source$ = new Observable(observer => {
      observer.next(1)
      observer.next(2)
      observer.next(3)
    })
    
    const observer = {
      next : item => console.log(item)
    }
    
    console.log('start')
    source$.subscribe(observer)
    console.log('end')
    

    上面的代码通过 new Observable 创建了一个 Observable,调用它的 subscribe 方法进行订阅,执行结果为依次输出 'start',1,2,3,'end'。

    下面我们再看一个异步的例子:

    import { Observable } from 'rxjs';
    
    const source$ = new Observable(observer => {
      let number = 1
      setInterval(() => {
        observer.next(number  )
      }, 1000)
    })
    
    const observer = {
      next : item => console.log(item)
    }
    
    console.log('start')
    source$.subscribe(observer)
    console.log('end')
    

    先输出 ’start' 、'end',然后每隔 1000 ms 输出一个递增的数字。

    通过这两个小例子,我们知道 RxJS 既能处理同步的行为,也能处理异步的。

    独立数据层的优势

    回顾我们整个数据层方案,它的特点是很独立,从头到尾,做掉了很长的数据变更链路,也因此带来几个优势:

    单向数据流告诉我们这样两样事:

    观察者 Observer

    观察者 Observer 是一个有三个方法的对象:

    • next: 当 Observable 发出新的值时被调用,接收这个值作为参数
    • complete:当 Observable 完结,没有更多数据时被调用。complete 之后,next 方法无效
    • error:当 Observable 内部发生错误时被调用,之后不会调用 complete,next 方法无效

      const source$ = new Observable(observer => {
        observer.next(1)
        observer.next(2)
        observer.complete()
        observer.next(3)
      })
      
      const observer = {
        next: item => console.log(item),
        complete: () => console.log('complete')
      }
      
      source$.subscribe(observer)
      

    上面的代码会输出 1,2,'complete',而不会输出 3。

    const source$ = new Observable(observer => {
      try {
        observer.next(1)
        observer.next(2)
        throw new Error('there is an exception')
        observer.complete()
      } catch (e) {
        observer.error(e)
      }
    })
    
    const observer = {
      next: item => console.log(item),
      error: e => console.log(e),
      complete: () => console.log('complete')
    }
    
    source$.subscribe(observer)
    

    注意 error 之后不会再调用 complete。

    Observer 还有简单形式,即不用构建一个对象,而是直接把函数作为 subscribe 方法的参数。

    source$.subscribe(
      item => console.log(item),
      e => console.log(e),
      () => console.log('complete')
    )
    

    参数依次为 next 、error、complete,后面两个参数可以省略。

    1. 视图的极度轻量化。

    我们可以看到,如果视图所消费的数据都是来源于从核心模型延伸并组合而成的各种数据流,那视图层的职责就非常单一,无非就是根据订阅的数据渲染界面,所以这就使得整个视图层非常薄。而且,视图之间是不太需要打交道的,组件之间的通信很少,大家都会去跟数据层交互,这意味着几件事:

    • 视图的变更难度大幅降低了
    • 视图的框架迁移难度大幅降低了
    • 甚至同一个项目中,在必要的情况下,还可以混用若干种视图层方案(比如刚好需要某个组件)

    我们采用了一种相对中立的底层方案,以抵抗整个应用架构在前端领域日新月异的情况下的变更趋势。

    不直接绑定 Model,而是使用由 1~N 个 Model 聚合的 ViewModel。

    延迟执行(lazy evaluation)

    我们传给 new Observable 的回调函数如果没有订阅是不会执行的,订阅一个 Observable 就像是执行一个函数,和下面的函数类似。这和我们常见的那种内部保存有观察者列表的观察者模式是不同的,Observable 内部没有这个观察者列表。

    function subscribe (observer) {
      let number = 1
      setInterval(() => {
        observer.next(number  )
      }, 1000)
    }
    
    subscribe({
        next: item => console.log(item),
        error: e => console.log(e),
        complete: () => console.log('complete')
    })
    

    2. 增强了整个应用的可测试性。

    因为数据层的占比较高,并且相对集中,所以可以更容易对数据层做测试。此外,由于视图非常薄,甚至可以脱离视图打造这个应用的命令行版本,并且把这个版本与e2e测试合为一体,进行覆盖全业务的自动化测试。

    View 的变化永远去修改变更值对应的 Model。

    退订(unsubscribe)

    观察者想退订,只要调用订阅返回的对象的 unsubscribe 方法,这样观察者就再也不会接受到 Observable 的信息了。

    const source$ = new Observable(observer => {
      let number = 1
      setInterval(() => {
        observer.next(number  )
      }, 1000)
    })
    
    const observer = {
      next : item => console.log(item)
    }
    
    const subscription = source$.subscribe(observer)
    
    setTimeout(() => {
      subscription.unsubscribe()
    }, 5000)
    

    3. 跨端复用代码。

    以前我们经常会考虑做响应式布局,目的是能够减少开发的工作量,尽量让一份代码在PC端和移动端复用。但是现在,越来越少的人这么做,原因是这样并不一定降低开发的难度,而且对交互体验的设计是一个巨大考验。那么,我们能不能退而求其次,复用尽量多的数据和业务逻辑,而开发两套视图层?

    在这里,可能我们需要做一些取舍。

    回忆一下MVVM这个词,很多人对它的理解流于形式,最关键的点在于,M和VM的差异是什么?即使是多数MVVM库比如Vue的用户,也未必能说得出。

    在很多场景下,这两者并无明显分界,服务端返回的数据直接就适于在视图上用,很少需要加工。但是在我们这个方案中,还是比较明显的:

    > ------ Fetch -------------> | | View <-- VM <-- M <-- RESTful ^ | <-- WebSocket

    1
    2
    3
    4
    5
    > ------ Fetch ------------->
    |                           |
    View  <--  VM  <--  M  <--  RESTful
                        ^
                        |  <--  WebSocket

    这个简图大致描述了数据的流转关系。其中,M指代的是对原始数据的封装,而VM则侧重于面向视图的数据组合,把来自M的数据流进行组合。

    我们需要根据业务场景考虑:是要连VM一起跨端复用呢,还是只复用M?考虑清楚了这个问题之后,我们才能确定数据层的边界所在。

    除了在PC和移动版之间复用代码,我们还可以考虑拿这块代码去做服务端渲染,甚至构建到一些Native方案中,毕竟这块主要的代码也是纯逻辑。

    新葡亰496net 3

    操作符

    在 RxJS 中,操作符是用来处理数据流的。我们往往需要对数据流做一系列处理,才交给 Observer,这时一个操作符就像一个管道一样,数据进入管道,完成处理,流出管道。

    import { interval } from 'rxjs';
    import { map } from 'rxjs/operators'
    
    const source$ = interval(1000).pipe(
      map(x => x * x)
    )
    
    source$.subscribe(x => console.log(x))
    

    interval 操作符创造了一个数据流,interval(1000) 会产生一个每隔 1000 ms 就发出一个从 0 开始递增的数据。map 操作符和数组的 map 方法类似,可以对数据流进行处理。具体见演示地址。

    这个 map 和数组的 map 方法会产生新的数组类似,它会产生新的 Observable。每一个操作符都会产生一个新的 Observable,不会对上游的 Observable 做任何修改,这完全符合函数式编程“数据不可变”的要求。

    上面的 pipe 方法就是数据管道,会对数据流进行处理,上面的例子只有一个 map 操作符进行处理,可以添加更多的操作符作为参数。

    4. 可拆解的WebSocket补丁

    这个标题需要结合上面那个图来理解。我们怎么理解WebSocket在整个方案中的意义呢?其实可以整体视为整个通用数据层的补丁包,因此,我们就可以用这个理念来实现它,把所有对WebSocket的处理部分,都独立出去,如果需要,就异步加载到主应用来,如果在某些场景下,想把这块拿掉,只需不引用它就行了,一行配置解决它的有无问题。

    但是在具体实现的时候,需要注意:拆掉WebSocket之后的数据层,对应的缓存是不可信的,需要做相应考虑。

    Data Flow

    弹珠图

    弹珠图(Marble diagrams)就是用图例形象地表示 Observable 和各种操作符的一种方法。

    用 - 表示一小段时间,X 代表有错误发生, | 表示结束,() 表示同步发生。

    上面的例子可以如下表示:

    source: -----0-----1-----2-----3--...
            map(x => x * x)
    newest: -----0-----1-----4-----9--...
    

    具体关于弹珠图的使用可以查看这个网站。

    对技术选型的思考

    到目前为止,各种视图方案是逐渐趋同的,它们最核心的两个能力都是:

    • 组件化
    • MDV(模型驱动视图)

    缺少这两个特性的方案都很容易出局。

    我们会看到,不管哪种方案,都出现了针对视图之外部分的一些补充,整体称为某种“全家桶”。

    全家桶方案的出现是必然的,因为为了解决业务需要,必然会出现一些默认搭配,省去技术选型的烦恼。

    但是我们必须认识到,各种全家桶方案都是面向通用问题的,它能解决的都是很常见的问题,如果你的业务场景很与众不同,还坚持用默认的全家桶,就比较危险了。

    通常,这些全家桶方案的数据层部分都还比较薄弱,而有些特殊场景,其数据层复杂度远非这些方案所能解决,必须作一定程度的自主设计和修正,我工作十余年来,长期从事的都是复杂的toB场景,见过很多厚重的、集成度很高的产品,在这些产品中,前端数据和业务逻辑的占比较高,有的非常复杂,但视图部分也无非是组件化,一层套一层。

    所以,真正会产生大的差异的地方,往往不是在视图层,而是在水的下面。

    愿读者在处理这类复杂场景的时候,慎重考虑。有个简单的判断标准是:视图复用数据是否较多,整个产品是否很重视无刷新的交互体验。如果这两点都回答否,那放心用各种全家桶,基本不会有问题,否则就要三思了。

    必须注意到,本文所提及的技术方案,是针对特定业务场景的,所以未必具有普适性。有时候,很多问题也可以通过产品角度的权衡去避免,不过本文主要探讨的还是技术问题,期望能够在产品需求不让步的情况下,也能找到比较优雅、和谐的解决方案,在业务场景面前能攻能守,不至于进退失据。

    即使我们面对的业务场景没有这么复杂,使用类似RxJS的库,依照数据流的理念对业务模型做适度抽象,也是会有一些意义的,因为它可以用一条规则统一很多东西,比如同步和异步、过去和未来,并且提供了很多方便的时序操作。

    解决数据问题的答案已经呼之欲出了。

    创建 Observable

    创建 Observable 的这些方法就是用来创建 Observable 数据流的,注意和操作符不同,它们是从 rxjs 中导入的,而不是 rxjs/operators

    后记

    不久前,我写过一篇总结,内容跟本文有不少重合之处,但为什么还要写这篇呢?

    上一篇,讲问题的视角是从解决方案本身出发,阐述解决了哪些问题,但是对这些问题的来龙去脉讲得并不清晰。很多读者看完之后,仍然没有得到深刻认识。

    这一篇,我希望从场景出发,逐步展示整个方案的推导过程,每一步是怎样的,要如何去解决,整体又该怎么做,什么方案能解决什么问题,不能解决什么问题。

    上次我那篇讲述在Teambition工作经历的回答中,也有不少人产生了一些误解,并且有反复推荐某些全家桶方案,认为能够包打天下的。平心而论,我对方案和技术选型的认识还是比较慎重的,这类事情,事关技术方案的严谨性,关系到自身综合水准的鉴定,不得不一辩到底。当时关注八卦,看热闹的人太多,对于探讨技术本身倒没有展现足够的热情,个人认为比较可惜,还是希望大家能够多关注这样一种有特色的技术场景。因此,此文非写不可。

    如果有关注我比较久的,可能会发现之前写过不少关于视图层方案技术细节,或者组件化相关的主题,但从15年年中开始,个人的关注点逐步过渡到了数据层,主要是因为上层的东西,现在研究的人已经多起来了,不劳我多说,而各种复杂方案的数据层场景,还需要作更艰难的探索。可预见的几年内,我可能还会在这个领域作更多探索,前路漫漫,其修远兮。

    (整个这篇写起来还是比较顺利的,因为之前思路都是完整的。上周在北京闲逛一周,本来是比较随意交流的,鉴于有些公司的朋友发了比较正式的分享邮件,花了些时间写了幻灯片,在百度、去哪儿网、58到家等公司作了比较正式的分享,回来之后,花了一整天时间整理出了本文,与大家分享一下,欢迎探讨。)

    2 赞 4 收藏 评论

    新葡亰496net 4

    多个视图引用的数据在发生变化后,如何响应变化?

    of 方法

    之前我们写的这种形式:

    const source$ = new Observable(observer => {
      observer.next(1)
      observer.next(2)
      observer.next(3)
      observer.complete()
    })
    

    使用 of 方法将会非常简洁:

    import {of} from 'rxjs'
    const source$ = of(1, 2, 3)
    

    保证多个 View 绑定的 ViewModel 中共同数据来自同一个Model。

    from 方法

    上面的代码用 from 则是这样:

    import {from} from 'rxjs'
    const source$ = from([1, 2, 3])
    

    from 可以将可遍历的对象(iterable)转化为一个 Observable,字符串也部署有 iterator 接口,所以也支持。

    from 还可以根据 promise 创建一个 Observable。我们用 fetch 或者 axios 等类库发送的请求都是一个 promise 对象,我们可以使用 from 将其处理为一个 Observable 对象。

    新葡亰496net 5

    fromEvent 方法

    用 DOM 事件创建 Observable,第一个参数为 DOM 对象,第二个参数为事件名称。具体示例见前面 RxJS 入门章节的一个简单例子。

    多终端访问的数据在一个客户端发生变化后,如何响应变化?

    fromEventPattern 方法

    将添加事件处理器、删除事件处理器的 API 转化为 Observable。

    function addClickHandler (handler) {
      document.addEventListener('click', handler)
    }
    
    function removeClickHandler (handler) {
      document.removeEventListener('click', handler)
    }
    
    fromEventPattern(
      addClickHandler,
      removeClickHandler
    ).subscribe(x => console.log(x))
    

    也可以是我们自己实现的和事件类似,拥有注册监听和移除监听的 API。

    import { fromEventPattern } from 'rxjs'
    
    class EventEmitter {
      constructor () {
        this.handlers = {}
      }
      on (eventName, handler) {
        if (!this.handlers[eventName]) {
          this.handlers[eventName] = []
        }
        if(typeof handler === 'function') {
            this.handlers[eventName].push(handler)
        } else {
            throw new Error('handler 不是函数!!!')
        }
      }
      off (eventName, handler) {
        this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1)
      }
      emit (eventName, ...args) {
        this.handlers[eventName].forEach(handler => {
          handler(...args)
        })
      }
    }
    
    const event = new EventEmitter()
    
    const subscription = fromEventPattern(
      event.on.bind(event, 'say'), 
      event.off.bind(event, 'say')
    ).subscribe(x => console.log(x))
    
    let timer = (() => {
      let number = 1
      return setInterval(() => {
        if (number === 5) {
          clearInterval(timer)
          timer = null
        }
        event.emit('say', number  )
      }, 1000)
    })()
    
    setTimeout(() => {
      subscription.unsubscribe()
    }, 3000)
    

    演示地址

    首先多终端数据同步来源于 WebSocket 数据推送,要保证收到数据推送时去更改直接对应的 Model,而不是 ViewModel。

    interval、timer

    interval 和 JS 中的 setInterval 类似,参数为间隔时间,下面的代码每隔 1000 ms 会发出一个递增的整数。

    interval(1000).subscribe(console.log)
    // 0
    // 1
    // 2
    // ...
    

    timer 则可以接收两个参数,第一个参数为发出第一个值需要等待的时间,第二个参数为之后的间隔时间。第一个参数可以是数字,也可以是一个 Date 对象,第二个参数可省。

    新葡亰496net 6

    range

    操作符 of 产生较少的数据时可以直接写如 of(1, 2, 3),但是如果是 100 个呢?这时我们可以使用 range 操作符。

    range(1, 100) // 产生 1 到 100 的正整数
    

    Vue中的解决方案

    empty、throwError、never

    empty 是创建一个立即完结的 Observable,throwError 是创建一个抛出错误的 Observable,never 则是创建一个什么也不做的 Observable(不完结、不吐出数据、不抛出错误)。这三个操作符单独用时没有什么意义,主要用来与其他操作符进行组合。目前官方不推荐使用 empty 和 never 方法,而是推荐使用常量 EMPTY 和 NEVER(注意不是方法,已经是一个 Observable 对象了)。

    不只是要思想上解决问题,而且要代入到编程语言、框架等开发技术中实现。

    defer

    defer 创建的 Observable 只有在订阅时才会去创建我们真正想要操作的 Observable。defer 延迟了创建 Observable,而又有一个 Observable 方便我们去订阅,这样也就推迟了占用资源。

    defer(() => ajax(ajaxUrl))
    

    只有订阅了才会去发送 ajax 请求。

    Model的存放

    操作符

    操作符其实看作是处理数据流的管道,每个操作符实现了针对某个小的具体应用问题的功能,RxJS 编程最大的难点其实就是如何去组合这些操作符从而解决我们的问题。

    在 RxJS 中,有各种各样的操作符,有转化类、过滤类、合并类、多播类、错误处理类、辅助工具类等等。一般不需要自己去实现操作符,但是我们需要知道操作符是一个函数,实现的时候必须考虑以下功能:

    1. 返回一个全新的 Observable 对象
    2. 对上游和下游的订阅和退订处理
    3. 处理异常情况
    4. 及时释放资源

    Model 作为原始数据,即使用 AJAX GET 得到的数据,应该位于整个 Vue 项目结构的最上层。对于 Model 的存放位置,也有不同的选择。

    pipeable 操作符

    之前版本的 RxJS 各种操作符都挂载到了全局 Observable 对象上,可以这样链式调用:

    source$.filter(x => x % 2 === 0).map(x => x * 2)
    

    现在需要这样使用:

    import {filter, map} from 'rxjs/operators'
    
    source$.pipe(
      filter(x => x % 2 === 0),
      map(x => x * 2)
    )
    

    其实也很好理解,pipe 就是管道的意思,数据流通过操作符处理,流出然后交给下一个操作符。

    非共享Model

    几个类似数组方法的基础操作符

    map、filter 和数组的 map、filter 方法类似,scan 则是和 reduce 方法类似,mapTo 是将所有发出的数据映射到一个给定的值。

    import {mapTo} from 'rxjs/operators'
    
    fromEvent(document, 'click').pipe(
      mapTo('Hi')
    ).subscribe(x => console.log(x))
    

    每次点击页面时都会输出 Hi。

    不需要共享的 Model 可以放到视图组件的data中。但仍然避免 View 直接绑定 Model,即使该 View 的 ViewModel 不再需要额外的 Model 聚合。因为最终影响 View 呈现的不只是来自服务器的 Model 数据,还有视图状态ViewState。

    一些过滤的操作符

    • take 是从数据流中选取最先发出的若干数据
    • takeLast 是从数据流中选取最后发出的若干数据
    • takeUntil 是从数据流中选取直到发生某种情况前发出的若干数据
    • first 是获得满足判断条件的第一个数据
    • last 是获得满足判断条件的最后一个数据
    • skip 是从数据流中忽略最先发出的若干数据
    • skipLast 是从数据流中忽略最后发出的若干数据

      import { interval } from 'rxjs';
      import { take } from 'rxjs/operators';
      
      interval(1000).pipe(
        take(3)
      ).subscribe(
        x => console.log(x),
        null,
        () => console.log('complete')
      )
      // 0
      // 1
      // 2
      // 'complete'
      

    使用了 take(3),表示只取 3 个数据,Observable 就进入完结状态。

    import { interval, fromEvent } from 'rxjs'
    import { takeUntil } from 'rxjs/operators'
    
    interval(1000).pipe(
      takeUntil(fromEvent(document.querySelector('#btn'), 'click'))
    ).subscribe(
      x => { document.querySelector('#time').textContent = x   1 },
      null,
      () => console.log('complete')
    )
    

    这里有一个 interval 创建的数据流一直在发出数据,直到当用户点击按钮时停止计时,见演示。

    来个:chestnut::一个简单的列表组件,负责渲染展示数据和关键字过滤功能。输入的过滤关键字和列表数据都作为 data 存放。

    合并类操作符

    合并类操作符用来将多个数据流合并。

    1)concat、merge

    concat、merge 都是用来把多个 Observable 合并成一个,但是 concat 要等上一个 Observable 对象 complete 之后才会去订阅第二个 Observable 对象获取数据并把数据传给下游,而 merge 时同时处理多个 Observable。使用方式如下:

    import { interval } from 'rxjs'
    import { merge, take } from 'rxjs/operators'
    
    interval(500).pipe(
      take(3),
      merge(interval(300).pipe(take(6)))
    ).subscribe(x => console.log(x))
    

    可以点此去比对效果,concat 的结果应该比较好理解,merge 借助弹珠图也比较好理解,它是在时间上对数据进行了合并。

    source : ----0----1----2|
    source2: --0--1--2--3--4--5|
                merge()
    example: --0-01--21-3--(24)--5|
    

    merge 的逻辑类似 OR,经常用来多个按钮有部分相同行为时的处理。

    注意最新的官方文档和RxJS v5.x 到 6 的更新指南中指出不推荐使用 merge、concat、combineLatest、race、zip 这些操作符方法,而是推荐使用对应的静态方法。

    将上面的 merge 改成从 rxjs 中导入,使用方式变成了合并多个 Observable,而不是一个 Observable 与其他 Observable 合并。

    import { interval,merge } from 'rxjs'
    import { take } from 'rxjs/operators'
    
    merge(
      interval(500).pipe(take(3)),
      interval(300).pipe(take(6))
    ).subscribe(x => console.log(x))
    

    2)concatAll、mergeAll、switchAll

    用来将高阶的 Observable 对象压平成一阶的 Observable,和 loadash 中压平数组的 flatten 方法类似。concatAll 会对内部的 Observable 对象做 concat 操作,和 concat 操作符类似,如果前一个内部 Observable 没有完结,那么 concatAll 不会订阅下一个内部 Observable,mergeAll 则是同时处理。switchAll 比较特殊一些,它总是切换到最新的内部 Observable 对象获取数据。上游高阶 Observable 产生一个新的内部 Observable 时,switchAll 就会立即订阅最新的内部 Observable,退订之前的,这也就是 ‘switch’ 的含义。

    import { interval } from 'rxjs';
    import { map, switchAll, take } from 'rxjs/operators';
    
    interval(1500).pipe(
      take(2),
      map(x => interval(1000).pipe(
        map(y => x   ':'   y), 
        take(2))
      ),
      switchAll()
    ).subscribe(console.log)
    
    // 0:0
    // 1:0
    // 1:1
    

    内部第一个 Observable 对象的第二个数据还没来得及发出,第二个 Observable 对象就产生了。

    3)concatMap、mergeMap、switchMap

    从上面的例子我们也可以看到高阶 Observable 常常是由 map 操作符将每个数据映射为 Observable 产生的,而我们订阅的时候需要将其压平为一阶 Observable,而就是要先使用 map 操作符再使用 concatAll 或 mergeAll 或 switchAll 这些操作符中的一个。RxJS 中提供了对应的更简洁的 API。使用的效果可以用下面的公式表示:

    concatMap = map   concatAll
    mergeMap = map   mergeAll
    switchMap = map   switchAll
    

    4)zip、combineLatest、withLatestFrom

    zip 有拉链的意思,这个操作符和拉链的相似之处在于数据一定是一一对应的。

    import { interval } from 'rxjs';
    import { zip, take } from 'rxjs/operators';
    const source$ = interval(500).pipe(take(3))
    const newest$ = interval(300).pipe(take(6))
    
    source$.pipe(
      zip(newest$, (x, y) => x   y)
    ).subscribe(x => console.log(x))
    // 0
    // 2
    // 4
    

    zip 是内部的 Observable 都发出相同顺序的数据后才交给下游处理,最后一个参数是可选的 resultSelector 参数,这个函数用来处理操作符的结果。上面的示例运行过程如下:

    1. newest 发出第一个值 0,但这时 source 还没有发出第一个值,所以不执行 resultSelector 函数也不会像下游发出数据
    2. source 发出第一个值 0,此时 newest 之前已发出了第一个值 0,执行 resultSelector 函数得到结果 0,发出这个结果
    3. newest 发出第二个值 1,但这时 source 还没有发出第二个值,所以不执行 resultSelector 函数也不会像下游发出数据
    4. newest 发出第三个值 2,但这时 source 还没有发出第三个值,所以不执行 resultSelector 函数也不会像下游发出数据
    5. source 发出第二个值 1,此时 newest 之前已发出了第一个值 1,执行 resultSelector 函数得到结果 2,发出这个结果
    6. newest 发出第四个值 3,但这时 source 还没有发出第四个值,所以不执行 resultSelector 函数也不会像下游发出数据
    7. source 发出第三个值 2,此时 newest 之前已发出了第一个值 2,执行 resultSelector 函数得到结果 4,发出这个结果
    8. source 完结,不可能再有对应的数据了,整个 Observable 完结

    上面如果没有传递最后一个参数 resultSelector 函数,将会依次输出数组 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推荐使用 resultSelector 参数,将会在 v7 中移除。加上之前提到的推荐使用静态方法,这个示例应该改成这样:

    import { interval, zip } from 'rxjs';
    import { take, map } from 'rxjs/operators';
    
    const source$ = interval(500).pipe(take(3))
    const newest$ = interval(300).pipe(take(6))
    
    const add = (x, y) => x   y
    
    zip(source$, newest$).pipe(
      map(x => add(...x))
    ).subscribe(x => console.log(x))
    

    使用 zip 当有数据流吐出数据很快,而有数据流发出值很慢时,要小心数据积压的问题。这时快的数据流已经发出了很多数据,由于对应的数据还没发出,RxJS 只能保存数据,快的数据流不断地发出数据,积压的数据越来越多,消耗的内存也会越来越大。

    combineLatest 与 zip 不同,只要其他的 Observable 已经发出过值就行,顾名思义,就是与其他 Observable 最近发出的值结合。

    import { interval, combineLatest } from 'rxjs';
    import { take } from 'rxjs/operators';
    
    const source$ = interval(500).pipe(take(3))
    const newest$ = interval(300).pipe(take(6))
    
    combineLatest(source$, newest$).subscribe(x => console.log(x))
    // [0, 0]
    // [0, 1]
    // [0, 2]
    // [1, 2]
    // [1, 3]
    // [2, 3]
    // [2, 4]
    // [2, 5]
    

    withLatestFrom 没有静态方法,只有操作符方法,前面的方法所有 Observable 地位是平等的,而这个方法是使用这个操作符的 Observable 起到了主导作用,即只有它发出值才会进行合并产生数据发出给下游。

    import { interval } from 'rxjs';
    import { take, withLatestFrom } from 'rxjs/operators';
    
    const source$ = interval(500).pipe(take(3))
    const newest$ = interval(300).pipe(take(6))
    
    source$.pipe(
      withLatestFrom(newest$)
    ).subscribe(x => console.log(x))
    // [0, 0]
    // [1, 2]
    // [2, 4]
    
    1. source 发出 0 时,newest 最新发出的值为 0,结合为 [0, 0] 发出
    2. source 发出 1,此时 newest 最新发出的值为 2,结合为 [1, 2] 发出
    3. source 发出 2,此时 newest 最新发出的值为 4,结合为 [2, 4] 发出
    4. source 完结,整个 Observable 完结

    5)startWith、forkJoin、race

    startWith 是在 Observable 的一开始加入初始数据,同步立即发送,常用来提供初始状态。

    import { fromEvent, from } from 'rxjs';
    import { startWith, switchMap } from 'rxjs/operators';
    
    const source$ = fromEvent(document.querySelector('#btn'), 'click')
    
    let number = 0
    const fakeRequest = x => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve(number  )
        }, 1000)
      })
    }
    
    source$.pipe(
      startWith('initData'),
      switchMap(x => from(fakeRequest(x)))
    ).subscribe(x => document.querySelector('#number').textContent = x)
    

    这里通过 startWith 操作符获取了页面的初始数据,之后通过点击按钮获取更新数据。

    forkJoin 只有静态方法形式,类似 Promise.all ,它会等内部所有 Observable 都完结之后,将所有 Observable 对象最后发出来的最后一个数据合并成 Observable。

    race 操作符产生的 Observable 会完全镜像最先吐出数据的 Observable。

    const obs1 = interval(1000).pipe(mapTo('fast one'));
    const obs2 = interval(3000).pipe(mapTo('medium one'));
    const obs3 = interval(5000).pipe(mapTo('slow one'));
    
    race(obs3, obs1, obs2)
    .subscribe(
      winner => console.log(winner)
    );
    
    // result:
    // a series of 'fast one'
    

    exportdefault{

    一个小的练习

    本文中的例子基本来自30 天精通 RxJS,使用 RxJS v6 版本进行重写。

    页面上有一个 p 标签存放一个状态,初始为 0,有两个按钮,一个按钮点击后这个状态增加 1,另一个按钮点击后这个状态减少 1。

    <button id="addButton">Add</button>
    <button id="minusButton">Minus</button>
    <p id="state"></p>
    

    这两个按钮的点击事件我们都可以建立响应式数据流,可以使用 mapTo(1) 和 mapTo(-1) 分别表示点击后增加 1 和减少 1。我们可以使用 EMPTY 创建一个空的数据流来表示这个状态,用 startWith 设定初始值。然后 merge 这两个点击的数据流,但是这还有一个问题,点击事件的数据流需要与表示状态的数据流进行逻辑计算,发出最终的状态,我们才能去订阅这个最终的数据流来更改页面的显示。而这种累计计算的方法,可以用 scan 操作符来实现。最终实现如下:

    import { fromEvent, EMPTY, merge } from 'rxjs'
    import { mapTo, startWith, scan } from 'rxjs/operators'
    
    const addButton = document.getElementById('addButton')
    const minusButton = document.getElementById('minusButton')
    const state = document.getElementById('state')
    
    const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1))
    const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1))
    
    merge(
      EMPTY.pipe(startWith(0)),
      addClick$, 
      minusClick$)
    .pipe(
      scan((origin, next) => origin   next)
    ).subscribe(item => {
      state.textContent = item
    })
    

    查看演示

    data() {

    简单拖拽

    页面上有一个 id 为 drag 的 div:

    <div id="drag"></div>
    

    页面 css:

    html, body {
      height: 100%;
      background-color: tomato;
      position: relative;
    }
    
    #drag {
      position: absolute;
      width: 100px;
      height: 100px;
      background-color: #fff;
      cursor: all-scroll;
    }
    

    要实现的功能如下:

    1. 当在这个 div 上按下鼠标左键(mousedown)时,开始监听鼠标移动(mousemove)位置
    2. 当鼠标松开(mouseup)时,结束监听鼠标移动
    3. 当鼠标移动被监听时,更新 div 样式来实现拖拽效果

    实现思路:

    1. 我们可以使用 fromEvent 去转化 DOM 事件

      const mouseDown$ = fromEvent(eleDrag, 'mousedown')
      const mouseMove$ = fromEvent(eleBody, 'mousemove')
      const mouseUp$ = fromEvent(eleBody, 'mouseup')
      
    2. 对于鼠标按下这个数据流,每次鼠标按下事件发生时都转成鼠标移动的数据流

      mouseDown$.pipe(
        map(mouseDownEvent => mouseMove$)
      )
      
    3. 鼠标松开时,结束监听鼠标移动,我们可以用 takeUntil 表示这个逻辑

      mouseDown$.pipe(
        map(mouseDownEvent => mouseMove$.pipe(
          takeUntil(mouseUp$)
        ))
      )
      
    4. 上面的 map 操作符内将每次 mousedown 映射为一个 Observable,形成了高阶 Observable,我们需要用 concatlAll 压平,map 和 concatAll 连用,可以用更简洁的 concatMap

      mouseDown$.pipe(
        concatMap(mouseDownEvent => mouseMove$.pipe(
          takeUntil(mouseUp$)
        ))
      )
      
    5. 订阅这个 mousemove 数据流更新 div 位置。我们可以获取 mousemove event 中的 clientX 和 clientY,减去初始鼠标按下时鼠标相对 div 元素的值来得到最终 div 的绝对位置的 left 和 top。也可以使用 withLatestFrom 操作符,见 demo。

      mouseDown$.pipe(
        concatMap(mouseDownEvent => mouseMove$.pipe(
          map(mouseMoveEvent => ({
            left: mouseMoveEvent.clientX - mouseDownEvent.offsetX,
            top: mouseMoveEvent.clientY - mouseDownEvent.offsetY
          })),
          takeUntil(mouseUp$)
        ))
      ).subscribe(position => {
        eleDrag.style.left = position.left   'px'
        eleDrag.style.top = position.top   'px'
      })
      

    这里是一个更复杂一些的例子,当页面滑动到视频出页面时视频 fixed 定位,这是可以拖拽移动视频位置。通过 getValidValue 对视频拖拽的位置进行了一个限制。

    return{

    缓存

    把上游的多个数据缓存起来,当时机合适时再把汇聚的数据传给下游。

    1)buffer、bufferTime、bufferCount、bufferWhen、bufferToggle

    对于 buffer 这一组操作符,数据汇聚的形式就是数组。

    buffer 接收一个 Observable 作为 notifier,当 notifier 发出数据时,将 缓存的数据传给下游。

    interval(300).pipe(
      take(30),
      buffer(interval(1000))
    ).subscribe(
      x => console.log(x)
    )
    // [0, 1, 2]
    // [3, 4, 5]
    // [6, 7, 8]
    // [9, 10, 11, 12]
    

    bufferTime 是用时间来控制时机,上面可以改成 bufferTime(1000)

    bufferCount 是用数量来控制时机,如 3 个一组,bufferCount(3)

    bufferWhen 接收一个叫做 closeSelector 的参数,它应该返回一个 Observable。通过这个 Observable 来控制缓存。这个函数没有参数。下面的方法等价于前面的 buffer:

    interval(300).pipe(
      take(30),
      bufferWhen(() => {
        return interval(1000)
      })
    ).subscribe(
      x => console.log(x)
    )
    

    bufferToggle 和 buffer 的不同是可以不断地控制缓存窗口的开和关,一个参数是一个 Observable,称为 opening,第二个参数是称为 closeSelector 的一个函数。这个函数的参数是 opening 产生的数据。前一个参数用来控制缓存的开始时间,后一个控制缓存的结束。与 bufferWhen 相比,它的 closeSelector 可以接收参数,控制性更强。

    我们可以使用 buffer 来做事件的过滤,下面的代码只有 500ms 内连续点击两次以上才会输出 ‘success’ 。

    fromEvent(document.querySelector('#btn'), 'click').pipe(
      bufferTime(500),
      filter(arr => arr.length >= 2)
    ).subscribe(
      x => console.log('success')
    )
    

    2)window、windowTime、windowCount、windowWhen、windowToggle

    与前面的 buffer 类似,不过 window 缓存数据汇聚的形式是 Observable,因此形成了高阶 Observable。

    filterVal:'',

    debounceTime、throttleTime

    类似 lodash 的 debounce 和 throttle,用来降低事件的触发频率。

    我们做搜索时,常常要对输入进行 debounce 来减少请求频率。

    fromEvent(document.querySelector('#searchInput'), 'input').pipe(
      debounceTime(300),
      map(e => e.target.value)
    ).subscribe(
      input => document.querySelector('#text').textContent = input
      // 发送请求
    )
    

    list: []

    distinct、distinctUntilChanged

    distinct 操作符可以用来去重,将上游重复的数据过滤掉。

    of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe(
      zip(interval(1000)),
      map(arr => arr[0]),
      distinct()
    ).subscribe(x => console.log(x))
    

    上面的代码只会输出 1, 2, 3, 4

    distinct 操作符还可以接收一个 keySelector 的函数作为参数,这是官网的一个 typescript 的例子:

    interface Person {
      age: number,
      name: string
    }
    
    of<Person>(
      { age: 4, name: 'Foo' },
      { age: 7, name: 'Bar' },
      { age: 5, name: 'Foo' },
    ).pipe(
      distinct((p: Person) => p.name),
    ).subscribe(x => console.log(x))
    
    // { age: 4, name: 'Foo' }
    // { age: 7, name: 'Bar' }
    

    distinctUntilChanged 也是过滤重复数据,但是只会与上一次发出的元素比较。这个操作符比 distinct 更常用。distinct 要与之前发出的不重复的值进行比较,因此要在内部存储这些值,要小心内存泄漏,而 distinctUntilChanged 只用保存上一个的值。

    }

    dalay、delayWhen

    用来延迟上游 Observable 数据的发出。

    delay 可以接受一个数字(单位默认为 ms)或者 date 对象作为延迟控制。

    const clicks = fromEvent(document, 'click')
    const delayedClicks = clicks.pipe(delay(1000)) // 所有点击事件延迟 1 秒
    delayedClicks.subscribe(x => console.log(x))
    

    我们前面介绍过 bufferWhen,dalayWhen 也带有 when,在 RxJS 中,这种操作符它接收的参数都是 Observable Factory,即一个返回 Observable 对象的回调函数,用这个 Observable 来进行控制。

    每个 click 都延迟 0 至 5 秒之间的任意一个时间:

    const clicks = fromEvent(document, 'click')
    const delayedClicks = clicks.pipe(
      delayWhen(event => interval(Math.random() * 5000)),
    )
    delayedClicks.subscribe(x => console.log(x))
    

    },

    异常错误处理

    异常处理的难点:

    1. try/catch 只支持同步
    2. 回调函数容易形成回调地狱,而且每个回调函数的最开始都要判断是否存在错误
    3. Promise 不能重试,而且不强制异常被捕获

    对错误处理的处理可以分为两类,即恢复(recover)和重试(retry)。

    恢复是虽然发生了错误但是让程序继续运行下去。重试,是认为这个错误是临时的,重试尝试发生错误的操作。实际中往往配合使用,因为一般重试是由次数限制的,当尝试超过这个限制时,我们应该使用恢复的方法让程序继续下去。

    1)catchError

    catchError 用来在管道中捕获上游传递过来的错误。

    interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      catchError(err => of(8))
    ).subscribe(x => console.log(x))
    // 0
    // 1
    // 2
    // 3
    // 8
    

    catchError 中的回调函数返回了一个 Observable,当捕获到上游的错误时,调用这个函数,返回的 Observable 中发出的数据会传递给下游。因此上面当 x 为4 时发生了错误,会用 8 来替换。

    catchError 中的回调函数除了接收错误对象为参数外,还有第二个参数 caught$ 表示上游的 Observable 对象。如果回调函数返回这个 Observable 对象,就会进行重试。

    interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      catchError((err, caught$) => caught$),
      take(20)
    ).subscribe(x => console.log(x))
    

    这个代码会依次输出 5 次 0, 1, 2, 3。

    2)retry

    retry 可以接收一个整数作为参数,表示重试次数,如果是负数或者没有传参,会无限次重试。重试实际上就是退订再重新订阅。

    interval(1000).pipe(
          take(6),
          map(x => {
            if (x === 4) {
              throw new Error('unlucky number 4')
            } else {
              return x
            }
          }),
          retry(5) // 重试 5 次
        ).subscribe(x => console.log(x))
    

    在实际开发中,如果是代码原因造成的错误,重试没有意义,如果是因为外部资源导致的异常错误适合重试,如用户网络或者服务器偶尔不稳定的时候。

    3)retryWhen

    和前面带 when 的操作符一样,retryWhen 操作符接收一个返回 Observable 的回调函数,用这个 Observable 来控制重试的节奏。当这个 Observable 发出一个数据时就会进行一次重试,它完结时 retryWhen 返回的 Observable 也立即完结。

    interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      retryWhen(err$ => err$.pipe(
        delay(1000),
        take(5))
      ) // 延迟 1 秒后重试,重试 5 次
    ).subscribe(x => console.log(x))
    

    retryWhen 的可定制性非常高,不仅可以实现延迟定制,还可以实现 retry 的控制重试次数。在实践中,这种重试频率固定的方法还不够好,如果之前的重试失败,之后重试成功的几率也不高。Angular 官网介绍了一个 Exponential backoff 的方法。将每次重试的延迟时间控制为指数级增长。

    import { pipe, range, timer, zip } from 'rxjs';
    import { ajax } from 'rxjs/ajax';
    import { retryWhen, map, mergeMap } from 'rxjs/operators';
    
    function backoff(maxTries, ms) {
     return pipe(
       retryWhen(attempts => range(1, maxTries)
         .pipe(
           zip(attempts, (i) => i),
           map(i => i * i),
           mergeMap(i =>  timer(i * ms))
         )
       )
     );
    }
    
    ajax('/api/endpoint')
      .pipe(backoff(3, 250))
      .subscribe(data => handleData(data));
    
    function handleData(data) {
      // ...
    }
    

    4)finalize

    返回上游数据流的镜像 Observable,当上游的 Observable 完结或出错时调用传给它的函数,不影响数据流。

    interval(1000).pipe(
      take(6),
      map(x => {
        if (x === 4) {
          throw new Error('unlucky number 4')
        } else {
          return x
        }
      }),
      finalize(() => console.log('finally'))
    ).subscribe(x => console.log('a'))
    

    created() {

    tap 操作符

    我们可以使用 tap 操作符来进行调试。

    拦截源 Observable 的每一次发送,执行一个函数,返回源 Observable 的镜像 Observable。

    这个 API 有助于我们对 Observable 的值进行验证(debug)和执行一个会带来副作用的函数,而不会影响源 Observable。如我们用鼠标进行 canvas 绘图,鼠标按下是开始画图,鼠标松开即停止。我们需要在 mousedown 的时候进行 moveTo,否则这次画的会和上次画的连在一起。我们应该把这个会带来副作用过程放在 tap 操作符的函数中,这样才不会影响原来的数据流。

    tap 操作符和订阅并不相同,tap 返回的 Observable 如果没有被订阅,tap 中产生副作用的函数并不会执行。

    Ajax.getData().then(data=> {

    其他一些操作符

    1) repeat

    repeat 用来重复上游 Observable

    2)pluck 类似 lodash 的方法 pluck,提取对象的嵌套属性的值。

    const click$ = fromEvent(document, 'click')
    const tagName$ = click$.pipe(pluck('target', 'tagName'))
    tagName$.subscribe(x => console.log(x))
    

    等价于:

    click$.pipe(map(e => e.target.tagName))
    

    3)toArray

    将发出的数据汇聚为数组

    interval(1000).pipe(
      take(3),
      toArray()
    ).subscribe(x => console.log(x))
    // [0, 1, 2]
    

    4)partition

    将上游的 Observable 分为两个,一个 Observable 的数据是符合判定的数据,另一个时不符合判定的数据。

    const part$ = interval(1000).pipe(
      take(6),
      partition(x => x % 2 === 0)
    )
    
    part$[0].subscribe(x => console.log(x)) // 0, 2, 4
    part$[1].subscribe(x => console.log(x)) // 1, 3, 5
    

    5) 更多操作符

    RxJS 中的操作符非常多,这里只介绍了一部分,更多请查看官网 API。

    this.list =data

    RxJS 最经典的例子——AutoComplete

    有一个用于搜索的 input,当输入时自动发送 ajax,并在下方显示结果列表,然后可以选择结果,这就是我们常见的 AutoComplete 效果。要实现这个效果有很多细节要考虑,如防止 race condition 和优化请求次数。

    <div class="autocomplete">
        <input class="input" type="search" id="search" autocomplete="off">
        <ul id="suggest-list" class="suggest"></ul>
    </div>
    

    先获取两个 DOM 元素:

    const input = document.querySelector('#search');
    const suggestList = document.querySelector('#suggest-list');
    

    我们先将输入框的 input 的事件转化为 Observable。

    const input$ = fromEvent(input, 'input');
    

    然后我们根据输入的值去发送 ajax 请求,由于我们是要获取最新的值而丢弃之前 ajax 返回的值,我们应该使用 switchMap 操作符。通过使用这个操作符,我们解决了 race condition 问题。

    input$.pipe(
      switchMap(e => from(getSuggestList(e.target.value)))
    )
    

    getSuggestList 是一个发送 ajax 请求的方法,返回 promise,我们使用 from 来将其转化为 Observable。

    为了优化请求,首先 e.target.value 是空字符串时不应该发送请求,然后可以使用 debounceTime 减少触发频率,也可以使用 distinctUntilChanged 操作符来表示只有与上次不同时才去发送请求。我们还可以在 API 失败时重试 3 次。

    input$.pipe(
      filter(e => e.target.value.length > 1),
      debounceTime(300),
      distinctUntilChanged(),
        switchMap(
          e => from(getSuggestList(e.target.value)).pipe(retry(3))
        )
      )
    

    然后我们去订阅渲染就可以了。

    对于结果列表上的点击事件,比较简单,具体见demo。

    })

    操作符和数组方法

    Observable 的操作符和数组的方法有相似之处,但是也有很大的不同,体现在以下两点:

    1. 延迟运算
    2. 渐进式取值

    延迟运算,我们之前有讲到过,就是只有订阅后才会开始对元素进行运算。

    因为 Observable 是时间上的集合,操作符不是像数组方法那样运算完所有元素再返回交给下一个方法,而是一个元素一直运算到底,就像管道中的水流一样,先发出的数据先经过操作符的运算。

    },

    多播

    前面的例子都是只有一个订阅者的情况,实际上当然可以有多个订阅者,这就是多播(multicast),即一个数据流的内容被多个 Observable 订阅。

    methods: {

    Hot Observable 和 Cold Observable

    先思考一下下面的例子结果是什么?

    const source$ = interval(1000).pipe(
      take(3)
    )
    
    source$.subscribe(x => console.log('Observer 1: '   x))
    
    setTimeout(() => {
      source$.subscribe(x => console.log('Observer 2: '   x))
    }, 1000)
    

    你可能会以为 Observer 2 一秒后才订阅,错过了数据 0,因此只会输出 1 和 2,但实际上会先输出 0。为什么如此呢?这就涉及到对已错过数据的两种处理策略。

    1. 错过的就让它过去,只要订阅之后生产的数据就好
    2. 不能错过,订阅之前生产的数据也要

    第一种策略类似于直播,第二种和点播相似。使用第一种策略的 Observable 叫做 Cold Observable,因为每次都要重新生产数据,是 “冷”的,需要重新发动。第二种,因为一直在生产数据,只要使用后面的数据就可以了,所以叫 Hot Observable。

    RxJS 中如 interval、range 这些方法产生的 Observable 都是 Cold Observable,产生 Hot Observable 的是由 Promise、Event 这些转化而来的 Observable,它们的数据源都在外部,和 Observer 无关。

    前面我们提到 Observable 都是 lazy evaluation 的,数据管道内的逻辑只有订阅后才会执行,但是 Cold Observable 相对更 lazy 一些。Cold Observable 如果没有订阅者连数据都不会产生,对于 Hot Observable,数据仍会产生,但是不会进入管道处理。

    Hot Observable 是多播,对于 Cold Observable,每次订阅都重新生产了一份数据流,所以不是多播。下面的例子更加明显,两个订阅者有很大的概率会接收到不同的数据。

    const source$ = interval(1000).pipe(
      map(x => Math.floor(Math.random() * 10)),
      take(3)
    )
    
    source$.subscribe(x => console.log('Observer 1: '   x))
    
    setTimeout(() => {
      source$.subscribe(x => console.log('Observer 2: '   x))
    }, 1000)
    

    如果想要实现多播,就要使用 RxJS 中 Subject。

    filter() {

    Subject

    为了防止每次订阅都重新生产一份数据流,我们可以使用中间人,让这个中间人去订阅源数据流,观察者都去订阅这个中间人。这个中间人能去订阅数据流,所以是个 Observer,又能被观察者订阅,所以也是 Observable。我们可以自己实现一个这样的中间人:

    const subject = {
      observers: [],
      subscribe: function (observer) {
        this.observers.push(observer)
      },
      next: function (value) {
        this.observers.forEach(o => o.next(value))
      },
      error: function (error) {
        this.observers.forEach(o => o.error(error))
      },
      complete: function () {
        this.observers.forEach(o => o.complete())
      }
    }
    

    这个 subject 拥有 Observer 的 next、error、complete 方法,每次被观察者订阅时都会在内部保存这个观察者。当接收到源数据流的数据时,会把数据发送给每一个观察者。

    const source$ = interval(1000).pipe(
      map(x => Math.floor(Math.random() * 10)),
      take(3)
    )
    
    const observerA = {
      next: x => console.log('Observer A: '   x),
      error: null,
      complete: () => console.log('Observer A completed')
    }
    const observerB = {
      next: x => console.log('Observer B: '   x),
      error: null,
      complete: () => console.log('Observer B completed')
    }
    
    source$.subscribe(subject)
    subject.subscribe(observerA)
    setTimeout(() => {
      subject.subscribe(observerB)
    }, 1000)
    

    这时我们发现两个观察者接收到的是同一份数据,ObserverB 由于延迟一秒订阅,所以少接收到一个数据。将我们自己实现的 subject 换成 RxJS 中的 Subject,效果相同:

    import { Subject } from 'rxjs'
    const subject = new Subject()
    

    从上面可以看到,Subject 和 Observable 有一个很大的不同:它内部保存有一个观察者列表。

    前面的 subject 是在源数据流发出值时调用 next 方法,向订阅的观察者发送这个值,我们也可以手动调用 subject 的next 方法送出值:

    const observerA = {
      next: x => console.log('Observer A: '   x)
    }
    const observerB = {
      next: x => console.log('Observer B: '   x)
    }
    
    const subject = new Subject()
    
    subject.subscribe(observerA)
    setTimeout(() => {
      subject.subscribe(observerB)
    }, 500)
    
    subject.next(1)
    setTimeout(() => {
      subject.next(2)
    }, 1000)
    

    总结一下,Subject 既是 Observable 又是 Observer,它会对内部的 observers 清单进行组播(multicast)。

    this.list =this.list.filter(item =>item.name===this.filterVal)

    Subject 的错误处理

    在 RxJS 5 中,如果 Subject 的某个下游数据流产生了错误异常,而又没有被 Observer 处理,那这个 Subject 的其他 Observer 都会失败。但是在 RxJS 6 中不会如此。

    在 v6 的这个例子 中,ObserverA 没有对错误进行处理,但是并不影响 ObserverB,而在 v5 这个demo中因为 ObserverA 没有对错误进行处理,使得 ObserverB 终止了。很明显 v6 的这种处理更符合直觉。

    }

    BehaviorSubject、ReplaySubject、AsyncSubject

    1)BehaviorSubject

    BehaviorSubject 需要在实例化时给定一个初始值,如果没有默认是 undefined,每次订阅时都会发出最新的状态,即使已经错过数据的发送时间。

    const observerA = {
      next: x => console.log('Observer A: '   x)
    }
    const observerB = {
      next: x => console.log('Observer B: '   x)
    }
    
    const subject = new BehaviorSubject(0)
    
    subject.subscribe(observerA) // Observer A: 0
    
    subject.next(1) // Observer A: 1
    subject.next(2) // Observer A: 2
    subject.next(3) // Observer A: 3
    
    setTimeout(() => {
      subject.subscribe(observerB) // Observer B: 3
    }, 500)
    

    observerB 已经错过流数据的发送时间,但是订阅时也能获取到最新数据 3。

    BehaviorSubject 有点类似于状态,一开始可以提供初始状态,之后订阅都可以获取最新的状态。

    2)ReplaySubject

    ReplaySubject 表示重放,在新的观察者订阅时重新发送原来的数据,可以通过参数指定重放最后几个数据。

    const observerA = {
      next: x => console.log('Observer A: '   x)
    }
    const observerB = {
      next: x => console.log('Observer B: '   x)
    }
    
    const subject = new ReplaySubject(2) // 重放最后两个
    
    subject.subscribe(observerA)
    
    subject.next(1) // Observer A: 1
    subject.next(2) // Observer A: 2
    subject.next(3) // Observer A: 3
    subject.complete()
    
    setTimeout(() => {
      subject.subscribe(observerB)
      // Observer B: 2
      // Observer B: 3
    }, 500)
    

    这里我们可以看到,即使 subject 完结后再去订阅依然可以重放最后两个数据。

    ReplaySubject(1) 和前面的 BehaviorSubject 是不一样的,首先后者可以提供默认数据,而前者不行,其次前者在 subject 终结后再去订阅依然可以得到最近发出的数据而后者不行。

    3)AsyncSubject

    AsyncSubject 有点类似 operator last,会在 subject 完结后送出最后一个值。

    const subject = new AsyncSubject()
    
    subject.subscribe(observerA)
    
    subject.next(1)
    subject.next(2)
    subject.next(3)
    subject.complete()
    // Observer A: 3
    setTimeout(() => {
      subject.subscribe(observerB)
      // Observer B: 3
    }, 500)
    

    observerA 即使早就订阅了,但是并不会响应前面的 next,完结后才接收到最后一个值 3。

    }

    多播操作符

    前面我们写的 Subject 需要去订阅源数据流和被观察者订阅,写起来比较繁琐,我们可以借助操作符来实现。

    1)multicast

    使用方式如下,接收一个 subject 或者 subject factory。这个操作符返回了一个 connectable 的 Observable。等到执行 connect() 才会用真的 subject 订阅 source,并开始发送数据,如果没有 connect,Observable 是不会执行的。

    const source = interval(1000).pipe(
      map(x => Math.floor(Math.random() * 10)),
      take(3),
      multicast(new Subject)
    )
    
    const observerA = {
      next: x => console.log('Observer A: '   x),
      error: null,
      complete: () => console.log('Observer A completed')
    }
    const observerB = {
      next: x => console.log('Observer B: '   x),
      error: null,
      complete: () => console.log('Observer B completed')
    }
    
    source.subscribe(observerA) // subject.subscribe(observerA)
    
    source.connect() // source.subscribe(subject)
    
    setTimeout(() => {
      source.subscribe(observerB) // subject.subscribe(observerB)
    }, 1000)
    

    2)refCount

    上面使用了 multicast,但是还是有些麻烦,还需要去手动 connect。这时我们可以再搭配 refCount 操作符创建只要有订阅就会自动 connect 的 Observable。只需要去掉 connect 方法调用,在 multicast 后面再加一个 refCount 操作符。

    multicast(new Subject),
    refCount()
    

    refCount 其实就是自动计数的意思,当 Observer 数量大于 1 时,subject 订阅上游数据流,减少为 0 时退订上游数据流。

    3)multicast selector 参数

    multicast 第一个参数除了是一个 subject,还可以是一个 subject factory,即返回 subject 的函数。这时使用了不同的中间人,每个观察者订阅时都重新生产数据,适用于退订了上游之后再次订阅的场景。

    multicast 还可以接收可选的第二个参数,称为 selector 参数。它可以使用上游数据流任意多次,而不会重复订阅上游的数据。当使用了这个参数时,multicast 不会返回 connectable Observable,而是这个参数(回调函数)返回的 Observable。selecetor 回调函数有一个参数,通常叫做 shared,即 multicast 第一个参数所代表的 subject 对象。

    const selector = shared => {
      return shared.pipe(concat(of('done')))
    }
    const source = interval(1000).pipe(
      take(3),
      multicast(new Subject, selector)
    )
    
    const observerA = {
      next: x => console.log('Observer A: '   x),
      error: null,
      complete: () => console.log('Observer A completed')
    }
    const observerB = {
      next: x => console.log('Observer B: '   x),
      error: null,
      complete: () => console.log('Observer B completed')
    }
    
    source.subscribe(observerA)
    setTimeout(() => {
      source.subscribe(observerB)
    }, 5000)
    // Observer A: 0
    // Observer A: 1
    // Observer A: 2
    // Observer A: done
    // Observer A completed
    // Observer B: done
    // Observer B: completed
    

    observerB 订阅时会调用 selector 函数,subject 即shared 已经完结,但是 concat 依然会在这个 Observable 后面加上 'done'。

    可以利用 selector 处理 “三角关系”的数据流,如有一个 tick$ 数据流,对其进行 delay(500) 操作后的下游 delayTick$, 一个由它们合并得到的 mergeTick$,这时就形成了三角关系。delayTick$ 和 mergeTick$ 都订阅了 tick$。

    const tick$ = interval(1000).pipe(
      take(1),
      tap(x => console.log('source: '   x))
    )
    
    const delayTick$ = tick$.pipe(
      delay(500)
    )
    
    const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: '   x))
    // source: 0
    // observer: 0
    // source: 0
    // observer: 0
    

    从上面的结果我们可以验证,tick$ 被订阅了两次。

    我们可以使用 selector 函数来使其只订阅一次,将上面的过程移到 selector 函数内即可。

    const source$ = interval(1000).pipe(
      take(1),
      tap(x => console.log('source: '   x))
    )
    
    const result$ = source$.pipe(
      multicast(new Subject(), shared => {
        const tick$ = shared
        const delayTick$ = tick$.pipe(delay(500))
        const mergeTick$ = merge(tick$, delayTick$)
        return mergeTick$
      })
    )
    
    result$.subscribe(x => console.log('observer: '   x))
    

    这时只会输出一次 'source: 0'。

    4)publish

    publish 是 multicast 的一种简写方式,效果等同于如下:

    function publish (selector) {
      if (selector) {
        return multicast(() => new Subject(), selector)
      } else {
        return multicast(new Subject())
      }
    }
    

    有上一节说到的 selector 函数时,等价于:

    multicast(() => new Subject(), selector)
    

    没有时,等价于:

    multicast(new Subject())
    

    5)share

    share 是 multicast 和 refCount 的简写,share() 等同于在 pipe 中先调用了 multicast(() => new Subject()),再调用了 refCount()。

    const source = interval(1000).pipe(
      take(3),
      share()
    )
    
    const observerA = {
      next: x => console.log('Observer A: '   x),
      error: null,
      complete: () => console.log('Observer A completed')
    }
    const observerB = {
      next: x => console.log('Observer B: '   x),
      error: null,
      complete: () => console.log('Observer B completed')
    }
    
    source.subscribe(observerA)
    setTimeout(() => {
      source.subscribe(observerB)
    }, 5000)
    // Observer A: 0
    // Observer A: 1
    // Observer A: 2
    // Observer A completed
    // Observer B: 0
    // Observer B: 1
    // Observer B: 2
    // Observer B completed
    

    由于 share 是调用了 subject 工厂函数,而不是一个 subject 对象,因此 observerB 订阅时可以重新获取数据。

    6)publishLast、publishBehavior、publishReplay

    同前面的 publish,只不过使用的不是普通 Subject,而是对应的 AsyncSubject、BehaviorSubject、ReplaySubject。

    }

    Scheduler

    Scheduler(调度器)用于控制数据流中数据的推送节奏。

    import { range, asapScheduler } from 'rxjs'
    
    const source$ = range(1, 3, asapScheduler)
    
    console.log('before subscribe')
    source$.subscribe(x => console.log(x))
    console.log('subscribed')
    

    上面的代码,如果去掉 asapScheduler 参数,因为 range 是同步的,会先输出 1, 2, 3,再输出 'subscribed',但是加了以后就变成 先输出 'subscribed',改变了原来数据产生的方式。asap 是 as soon as possible 的缩写,同步任务完成后就会马上执行。

    Scheduler 拥有一个虚拟时钟,如 interval 创建的数据流每隔一段时间要发出数据,由 Scheduler 提供时间来判断是否到了发送数据的时间。

    新葡亰496net:服务端主动通报Web前端的一些研究,学习指南。试想一下,如果 View 直接绑定了以上代码中的list,那么在filter函数执行一次后,虽然 View 更新了,但同时list也被改变,不再是一个原始数据了,下一次执行filter函数将是从上一次的结果集中过滤。

    Scheduler 实例

    • undefined/null:不指定 Scheduler,代表同步执行的 Scheduler
    • asap:尽快执行的 Scheduler
    • async:利用 setInterval 实现的 Scheduler
    • queue:利用队列实现的 Scheduler,用于迭代一个的大的集合的场景。
    • animationFrame:用于动画的 Scheduler

    asap 会尽量使用 micro task,而 async 会使用 macro task。

    很尴尬,总不能重新请求数据吧,那样还搞什么 SPA。

    相关操作符

    一些创建数据流的方法可以提供 Scheduler 参数,合并类操作符如 merge 也可以,在创建数据流后我们也可以使用操作符,使得产生的下游 Observable 推送数据的节奏由指定的 Scheduler 来控制。这个操作符就是 observeOn。

    const tick$ = interval(10) // Intervals are scheduled with async scheduler by default...
    tick$.pipe(
      observeOn(animationFrameScheduler)  // but we will observe on animationFrame scheduler to ensure smooth animation.
    )
    .subscribe(val => {
      someDiv.style.height = val   'px'
    })
    

    本来每 10 ms 就会发送一个数据,修改 Scheduler 为 animationFrame 后只有浏览器重绘才会发送数据更新样式。

    我们还可以通过操作符 subscribeOn 控制订阅的时机。

    const source$ = new Observable(observer => {
      console.log('on subscribe')
      observer.next(1)
      observer.next(2)
      observer.next(3)
      return () => {
        console.log('on unsubscribe')
      }
    })
    
    const tweaked$ = source$.pipe(subscribeOn(asapScheduler))
    
    console.log('before subscribe')
    tweaked$.subscribe(x => console.log(x))
    console.log('subscribed')
    // before subscribe
    // subscribed
    // on subscribe
    // 1
    // 2
    // 3
    

    通过 subscribeOn(asapScheduler),我们把订阅时间推迟到尽快执行。

    现在我们有了新的发现:ViewModel受Model和ViewState的双重影响。

    TestScheduler

    RxJS 中有一个 用于测试的 TestScheduler,RxJS 的测试大家可以查看程墨的《深入浅出 RxJS》或者其他资料。

    import { TestScheduler } from 'rxjs/testing'
    

    ViewModel = 一个或多个 Model 组合 影响 View 展示的 ViewState

    RxJS 的一些实践

    Vue 中有没有好的方法可以很好的描述这个表达式呢?那就是计算属性computed。

    RxJS 与前端框架结合

    Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,状态管理可以使用 ngrx。

    Vue 官方有与 RxJS 集成的 vue-rx。

    React 可以通过 Subject 建立桥梁,Redux 也有与 RxJS 结合的中间件 Redux-Observable。

    exportdefault{

    轮询中的错误处理

    interval(10000).pipe(
      switchMap(() => from(axios.get(url))),
      catchError(err => EMPTY)
    ).subscribe(data => render(data))
    

    上面的代码,每隔 10s 去发送一个请求,当某个请求返回出错时,返回空的 Observable 而不渲染数据。这样处理貌似正确,但是实际上某个请求出错时,整个 Observable 终结了,因此轮询就结束了。为了保持轮询,我们需要进行隔离,把错误处理移到 switchMap 内部进行处理。

    interval(10000).pipe(
      switchMap(() => from(axios.get(url)).pipe(
        catchError(err => EMPTY)
      ))
    ).subscribe(data => render(data))
    

    data() {

    订阅管理

    如果没有及时退订可能会引发内存泄露,我们需要通过退订去释放资源。

    1)命令式管理

    const subscription = source$.subscribe(observer)
    // later...
    subscription.unsubscribe()
    

    上面的管理方式,数量很少时还好,如果数量较多,将会显得十分笨拙。

    2) 声明式管理

    const kill1 = fromEvent(button, 'click')
    const kill2 = getStreamOfRouteChanges()
    const kill3 = new Subject()
    
    const merged$ = mege(
        source1.pipe(takeUntil(kill1)),
        source2.pipe(takeUntil(kill2)),
        source3.pipe(takeUntil(kill3))
    )
    
    const sub = merged$.subscribe(observer)
    // later...
    sub.unsubscribe()
    
    // 或者发出任意结束的事件
    kill3.next(true)
    

    通过 takeUntil、map 或者其他操作符组合进行管理。这样更不容易漏掉某个退订,订阅也减少了。

    3)让框架或者某些类库去处理

    比如 Angular 中的 async pipe,当 unmount 时会自动退订,也不用写订阅。

    return{

    不要 Rx 一切

    不要过度使用 Rx,它比较适合以下场景:

    • 组合事件时
    • 增加延迟和控制频率
    • 组合异步任务
    • 需要取消时

    简单的应用并不需要 RxJS。

    filterVal:'',

    RxJS 的业务实践

    可以看看徐飞的相关思考:流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑

    list: []

    RxJS 与 Async Iterator

    Async Iterator 提案已经进入了 ES2018,可以认为是 iterator 的异步版本。在 Symbol 上部署了 asyncIterator 的接口,不过它的 next 方法返回的是 { value, done } 对象的 Promise 版本。可以使用 for-await-of 进行迭代:

    for await (const line of readLines(filePath)) {
      console.log(line)
    }
    

    使用 Async Iterator 我们可以很容易实现类似 RxJS 操作符的功能:

    const map = async function*(fn) {
      for await(const value of this) yield fn(value)
    }
    

    其他如 fromEvent 等也比较容易实现。Async Iterator 扩展库 axax 的一个例子:

    import { fromEvent } from "axax/es5/fromEvent";
    
    const clicks = fromEvent(document, 'click');
    
    for await (const click of clicks) {
        console.log('a button was clicked');
    }
    

    下面是 Benjamin Gruenbaum 用 Async Iterator 实现 AutoComplete 的一个例子:

    let tooSoon = false, last;
    for await (const {target: {value}} of fromEvent(el, "keyup")) {
      if(!value || tooSoon) continue;
      if(value === last) continue;
      last = value;
      yield await fetch("/autocomplete/"   value); // misses `last` 
      tooSoon = true;
      delay(500).then(() => tooSoon = false);
    }
    

    Async Iterator 相比 RxJS,没有那么多概念,上手快,也比较容易扩展实现那些操作符。

    从数据消费者的角度上看,RxJS 是 push stream,由生产者把数据推送过来,Async Iterator 是 pull stream,是自己去拉取数据。

    }

    参考链接

    博客:30 天精通 RxJS

    书:深入浅出RxJS

    视频:RxJS 5 Thinking Reactively | Ben Lesh

    },

    computed: {

    viewList() {

    returnthis.filterVal

    ?this.list.filter(item =>item.name===this.filterVal)

    :this.list

    }

    },

    created() {

    Ajax.getData().then(data=> {

    this.list =data

    })

    },

    }

    改写代码后,View 绑定计算属性viewList,有过滤关键字就返回过滤结果,否则返回原始数据。这才称得上是数据驱动。

    共享Model

    如果一个 View 中存在多处共享的 Model,那么毫不犹豫的使用 Vuex 吧。

    对于复杂单页应用,可以考虑分模块管理,避免全局状态过于庞大。即使是共享的 Model 也是分属不同的业务模块和共享级别。

    比如文档数据,可能只有/document起始路径下的视图需要共享。那么从节省内存的角度考虑,只有进入该路由时才去装载对应的 Vuex 模块。幸运的是 Vuex 提供的模块动态装载的 API。

    对于共享级别高的数据,比如用户相关的数据,可以直接绑定到 Vuex 模块中。

    store

    | actions.js

    | index.js

    | mutations.js

    ---global

    | user.js

    ---partial

    | foo.js

    | bar.js

    分模块管理后,马上就会遇到跨模块调用数据的问题。一个 View 中需要的数据往往是全局状态和模块状态数据的聚合,可以使用getter解决这个问题。

    exportdefault{

    // ...

    getters: {

    viewData (state, getters, rootState) {

    returnstate.data rootState.data

    }

    }

    }

    如果一个 View 是需要多个模块状态的数据呢?

    exportdefault{

    // ...

    getters: {

    viewData (state, getters) {

    returnstate.data getters.partialData

    }

    }

    }

    虽然不能直接访问到其他模块的 state,但是getter和action、mutation都注册在全局命名空间,访问不受限制。

    计算属性 vs Getter

    Getter 与组件的计算属性拥有相同的作用,其中引用的任何 state 或者 getter 变化都会触发这个 getter 重新计算。

    那么问题来了:什么时候我应当使用计算属性?什么时候使用 Getter?

    这里其实是有一个数据前置原则:能放到上层的就不放到下层。

    需要聚合多个 state 或 getter 时,使用 getter。如果有多个视图需要同样的数据组合就可以实现 getter 的复用。

    需要聚合的数据中包含 ViewState 时,使用 computed。因为在 store 中无法访问 ViewState。

    至此我们已经保证了应用内的任何一个共享数据最终都来源于某个全局状态或某个模块的状态。

    Model的更新

    Model 的更新有两种,一种是本地触发的更新,另一种是其他客户端更新再由服务器推送的更新。

    可以这样表示:

    Model = 本地原始数据 本地更新数据 推送数据

    我们似乎又回到了那个列表组件类似的问题上。要不把 3 种数据都设为 state,由 3 种数据组合的 getter 来表示 Model?

    现在来比较一下。另外有一个前提是 Vuex 只允许提交 mutation 来更改 state。

    单State

    对于一个 state 的更新不外乎是增、删、改、查四种情况,所以至少对应有 4 个 action 和 4 个 mutation,直接对表示源数据的 state 进行更改。

    exportdefault{

    state: {

    data: []

    },

    mutations: {

    init(state, payload) {

    state.data= payload

    },

    add(state, payload) {

    state.data.push(payload)

    },

    delete(state, payload) {

    state.data.splice(state.data.findIndex(item=>item.id===payload), 1)

    },

    update(state, payload) {

    Object.assign(state.data.find(item=>item.id===payload.id), payload)

    }

    },

    actions: {

    fetch({ commit }) {

    Api.getData().then(data=> {

    commit('init',data)

    })

    },

    add({ commit }, item) {

    Api.add(item).then(data=> {

    commit('add',item)

    })

    },

    delete({ commit }, id) {

    Api.delete(id).then(data=> {

    commit('delete',id)

    })

    },

    update({ commit }, item) {

    Api.update(item).then(data=> {

    commit('update',item)

    })

    }

    }

    }

    多State

    如果把一个 Model 拆成多个 state,本地更新数据和推送数据统一为变更数据,对应到增、删、改、查四种情况,那就需要 4 个 state,即:originData、addData、deleteData、updateData。

    mutation 和 action 到不会有什么变化,增、删、改原本就是分开写的,只是各自对应到不同的 state 上,最终的 Model 由一个 getter 来表示。

    export default {

    state: {

    originData:[],

    addData:[],

    deleteData:[],

    updateData:[]

    },

    getters:{

    data(state) {

    returnstate.originData.concat(state.addData) //add

    .map(item => Object.assign(item,

    state.updateData.find(uItem =>uItem.id===item.id))) //update

    .filter(item => 新葡亰496net:服务端主动通报Web前端的一些研究,学习指南。!state.deleteData.find(id => id ===item.id)) //delete

    }

    },

    mutations:{

    init(state, payload) {

    state.originData = payload

    },

    add(state, payload) {

    state.addData.push(payload)

    },

    delete(state, payload) {

    state.deleteData.push(payload)

    },

    update(state, payload) {

    state.updateData.push(payload)

    }

    },

    actions:{

    // 略...

    }

    }

    这么一大串方法链看起来很酷对不对,但是性能呢?任何一个 state 的变更都将引起这个复杂的 getter 重新执行 5 个循环操作。

    知乎上有个相关问题的讨论:JavaScript 函数式编程存在性能问题么?

    其中提到的解决办法是惰性计算。相关的函数库有:lazy.js,或者使用 lodash 中的_.chain函数。

    还有一种办法是统一为K, V数据结构,这样一个混合函数就搞定了Object.assign(originData, addData, updateData, deleteData)。

    对比而言,我认为多 state 的方式更符合数据驱动及响应式编程思维,但需要有好的办法去解决复杂的循环操作这个问题,单 state 的方式就是面向大众了,两者都可以解决问题。甚至于全面使用响应式编程,使用RxJS替代 Vuex。

    数据同步

    前面提到过了,不管是本地更新数据还是服务端推送数据,可以统一为增、删、改三种接口。不管是本地更新还是推送数据,根据数据同步类型走同一个数据更改函数。

    这在 Vuex 中很容易实现。利于 Vuex 的插件功能,可以在接受推送后提交到对应的 mutation。前提是要和后端约好数据格式,更方便的映射到对应的 mutationType,比如:{ 数据名,同步类型,同步数据 }。

    exportdefaultstore => {

    socket.on('data',data=> {

    const{name,type,data} =data

    store.commit(type name,data)

    })

    }

    这样就实现了本地增、删、改与推送数据增、删、改的无差异化。

    本文由新葡亰496net发布于新葡亰官网,转载请注明出处:新葡亰496net:服务端主动通报Web前端的一些研究

    关键词: