理解服务端推送与实践

在Web开发的时候,通常会需要服务端向客户端推送消息。技术选型可以选择Comet(基于Ajax或HTTP流)、SSE(Server-sent Events, 只能是服务端推送)、Websocket(应用于浏览器Socket,基于TCP的双向通信协议),但随着Websocket在浏览器中的广泛支持,一般首推Websocket,如果只有服务端推送的需求可以选用SSE,在古老的浏览器环境中才选用Comet或通过ajax轮询。

一、Comet

Comet是一种用于web的推送技术,能使服务器实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式,长轮询和iframe流。

  • 长轮询
    长轮询不是长链接,是在打开一条连接以后保持,等待服务器推送来数据再关闭的方式。长轮询的实现需要借助异步请求来实现,比如AsyncContextDeferredResult
    Servlet3.0就引入了异步上下文,只是大家使用的较少。关于长轮询,并不是什么过时的技术,现在的配置中心(Nacos和Apollo)使用的就是长轮询来实现推送
    DeferredResult使用示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @RequestMapping("/polling")
    @RestController
    public class PollingController {

    private final static Multimap<Integer, DeferredResult<String>> watchRequestMap = Multimaps.synchronizedMultimap(HashMultimap.create());
    private static final Long TIME_OUT = 60000L;

    @RequestMapping("watch/{id}")
    public DeferredResult<String> watch(@PathVariable Integer id) {
    DeferredResult<String> result = new DeferredResult<>(TIME_OUT);
    result.onTimeout(() -> {
    System.err.println("Task time out.");
    });
    result.onCompletion(() -> {
    watchRequestMap.remove(id, result);
    });
    watchRequestMap.put(id, result);
    return result;
    }


    @RequestMapping("publish/{id}")
    public void publish(@PathVariable Integer id) {
    Collection<DeferredResult<String>> deferredResults = watchRequestMap.get(id);
    deferredResults.forEach(item -> {
    item.setResult(String.format("id=%d于%s更新完成", id, DateUtil.now()));
    });
    }
    }
  • iframe流
    iframe流方式是在页面中插入一个隐藏的iframe,利用其src属性在服务器和客户端之间创建一条长链接,服务器向iframe传输数据(通常是HTML,内有负责插入信息的javascript,例如<script type=\"text/javascript\">...</script>),来实时更新页面。iframe流方式的优点是浏览器兼容好,Google公司在一些产品中使用了iframe流,如Google Talk。

二、SSE

SSE其实就是基于comet搞出来一套规范的API,被纳入到HTML5规范,使用起来相对更简单了。

SSE 客户端

浏览器端主要是EventSource的使用:

附带一份SSE Demo的前端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (window.EventSource == null) {
alert('The browser does not support Server-Sent Events');
}else{
var source = new EventSource('http://localhost:8381/sse/subscribe?id=data001');
source.onmessage = function (event) {
refreshByData(event.data);
};
source.onopen = function (event) {
refreshByData('连接成功');
};
source.addEventListener("close", (event) => {
source.close();
refreshByData('连接关闭');
});
}

function refreshByData(data) {
var text = document.getElementById('result').innerText;
text += '\n' + data;
document.getElementById('result').innerText = text;
}

SSE 服务端

可以通过Spring自带的SseEmitter完成,主要是帮我们更容易实现SSE的协议:请求头和body格式。即必须有以下headers:

1
2
3
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

然后内容可以是以下几个,以:开头,以\n分隔:

1
2
3
4
data
event
id
retry

注意:
1. event即自定义监听事件类型,不传则为message
2. data必传且不可为空
附带一份SSE Demo的服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@ResponseBody
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) throws IOException {
SseEmitter sseEmitter = new SseEmitter(60000L);
sseEmitter.onCompletion(() -> log.warn("[推送完成]"));
new Thread(() -> {
IntStream.range(1, 10).forEach(n -> {
try {
sseEmitter.send(SseEmitter.event().data("推送消息 : " + n ));
TimeUnit.MILLISECONDS.sleep(500L);
}catch (Exception e){
e.printStackTrace();
}
});

try {
// 注意消息的data是必须填的,也不能为空
sseEmitter.send(SseEmitter.event().name("close").data("--anything but no null--"));
// 这里完全不是关闭,只是清空异步响应DeferredResult
sseEmitter.complete();
} catch (IOException e) {
e.printStackTrace();
}

}).start();
return sseEmitter;
}

附带谷歌浏览器中的EventStream信息:

SSE避坑指南

  1. 看了几篇blog,都没提及SSE中http流的关闭。不处理的话,看请求就是会不断的重连。流关闭是由客户端发起,调用close方法,但是内容由服务端发送,客户端肯定不知道啊,只能是服务端发送一个结束事件来触发。也可以是特定消息,由onmessage中判断。

  2. 浏览器有个不可突破的限制,就是SSE请求同事只能有6个(基于HTTP1.1)。详见:https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource

三、WebSocket

websocket协议

websocket其实是一种双向通信协议,它是基于TCP协议的。有人说他是独立于HTTP的,并不准确,它握手使用的是HTTP的协议升级机制,而后数据传输使用的才是websocket协议。
相较于HTTP协议,它的优势在于它支持交换数据帧(客户端或服务端都可以在任何时间点发送数据),实现了双向通信。因此,SSE能实现的功能,websocket也全部能做到,而且,相较于SSE,它还有以下优势:

  • 支持客户端和服务端的双向通信
  • 浏览器没有连接数限制
  • 相对SSE只支持UTF8文本,它还支持二进制数据

客户端代码

依赖sockjs()完成websocket客户端的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 引用socketjs
<script src="https://cdn.bootcss.com/sockjs-client/0.3.4/sockjs.min.js"></script>
// 创建
ws = new SockJS(targetUri);
// 连接
ws.onopen = function () {
log('和服务端连接成功!');
};
ws.onmessage = function (event) {
log('服务端说:' + event.data);
};
ws.onclose = function () {
log('和服务端断开连接!')
}
// 发送数据
function sent() {
if (ws != null) {
ws.send(text.value);
log('客户端说:' + text.value);
} else {
log('请先建立连接!')
}
}
// 关闭
function disconnect() {
if (ws != null) {
ws.close();
ws = null;
}
}

服务端代码

服务端选择一种最简单的方式,依赖spring-boot-starter-websocket
完成步骤可以大致分为以下几步:

  1. pom新增websocket依赖:

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
  2. 开启@EnableWebSocket,通过WebSocketConfigurer注入WebSocketHandler。这里直接继承TextWebSocketHandler, 注意同时定义了客户端连接路径/connect

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Configuration
    @EnableWebSocket
    public class WebSocketServerConfigure implements WebSocketConfigurer {
    @Autowired
    private MyTextSocketHandler textSocketHandler;
    /**
    * 效果类似 @ServerEndpoint
    * @param registry
    */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(textSocketHandler, "/connect")
    .setAllowedOriginPatterns("*") // setAllowedOrigins已过时
    .withSockJS();
    }
    }
  3. 定义WebSocketHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @Component
    @Slf4j
    public class MyTextSocketHandler extends TextWebSocketHandler {

    public static Map<String, WebSocketSession> connectManager = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
    log.info("和客户端建立连接");
    connectManager.put(session.getId(), session);
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    session.close(CloseStatus.SERVER_ERROR);
    connectManager.remove(session.getId());
    log.error("连接异常", exception);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    super.afterConnectionClosed(session, status);
    connectManager.remove(session.getId());
    log.info("和客户端断开连接");
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    // 获取到客户端发送过来的消息
    String receiveMessage = message.getPayload();
    log.info(receiveMessage);
    // 发送消息给客户端
    session.sendMessage(new TextMessage(fakeAi(receiveMessage)));
    // 关闭连接
    // session.close(CloseStatus.NORMAL);
    }

    private static String fakeAi(String input) {
    if (input == null || "".equals(input)) {
    return "你说啥?";
    }
    return "收到消息 : " +input;
    }
    }
  4. 注意到以上代码只有服务端响应,并返回客户端数据,没有服务端主动推送的。而服务端响应都是通过WebSocketSession来完成,因此,我们只要把WebSocketSession保存,在另一个地方就可以实现服务端push了。

    1
    2
    3
    4
    Collection<WebSocketSession> values = MyTextSocketHandler.connectManager.values();
    for (WebSocketSession session : values) {
    socketHandler.handleTextMessage(session, new TextMessage(msg));
    }

    如果不想依赖Springboot,那么需要引入javax.websocket。可以参考A Guide to the Java API for WebSocket

四、总结

正如文章开头说的那样,web即时通讯中,commet是过时但稳定的技术;如果只考虑服务端推送,那么SSE也是可以的,基于HTTP流的它拥有着传统HTTP的优势,实现简便且能稳定处理网络异常;websocket随着日渐被广泛使用,已经被Html5说是未来的通信技术,而且已经被绝大多数的浏览器支持,比支持SSE的更多。如果你是应用于新的项目的话,那么放心地使用它吧。


如果你想了解更多的comet技术细节,可以参阅:
http://www.52im.net/thread-338-1-1.html
然后这篇文章对Comet、SSE、websocket进行了清晰的对比和说明:
https://blog.csdn.net/weixin_44739881/article/details/104072392