淺談 Server-Sent Events
前幾天看到 Twitter 點(diǎn)贊區(qū)域蹦噠的很可愛(ài), 于是乎想研究研究怎么實(shí)現(xiàn)的. 一開(kāi)始下意識(shí)以為是 WebSocket, 但是在控制臺(tái)死活沒(méi)找到相關(guān)的信息, 去 stackoverflow 問(wèn)了下原來(lái)是 Server-Sent Events, 本文對(duì)此做個(gè)介紹.

什么是 Server-Sent Events
Server-Sent Events 是一種服務(wù)器推送技術(shù), 使客戶(hù)端可以通過(guò) HTTP 連接從服務(wù)器自動(dòng)接收更新. 每個(gè)通知以文本流(文本應(yīng)該為 utf-8)的形式發(fā)送, 并以一對(duì)換行符結(jié)尾. 與 WebSocket 相比:
它不是全雙工的, 只能服務(wù)器向?yàn)g覽器發(fā)送, 因?yàn)榱餍畔⒈举|(zhì)上就是下載, 一旦連接后不能再次發(fā)送請(qǐng)求(否則就變成了一次新的連接)
WebSocket 使用的 ws 協(xié)議, 而 SSE 使用的仍然是 HTTP 協(xié)議.
SSE 的特點(diǎn)
這里直接抄襲阮一峰聚聚的:
SSE 使用 HTTP 協(xié)議, 現(xiàn)有的服務(wù)器軟件都支持. WebSocket 是一個(gè)獨(dú)立協(xié)議.
SSE 屬于輕量級(jí), 使用簡(jiǎn)單;WebSocket 協(xié)議相對(duì)復(fù)雜.
SSE 默認(rèn)支持?jǐn)嗑€(xiàn)重連, WebSocket 需要自己實(shí)現(xiàn).
SSE 一般只用來(lái)傳送文本, 二進(jìn)制數(shù)據(jù)需要編碼后傳送, WebSocket 默認(rèn)支持傳送二進(jìn)制數(shù)據(jù).
SSE 支持自定義發(fā)送的消息類(lèi)型.
來(lái)個(gè)例子
無(wú)碼言屌, 下面我們通過(guò)一個(gè)例子來(lái)演示怎么使用 SSE. 首先看客戶(hù)端的代碼, 通過(guò) new 一個(gè) EventSource 來(lái)創(chuàng)建 SSE 實(shí)例, 第一個(gè)參數(shù)為后端接口, 第二個(gè) option 參數(shù)只有一個(gè) withCredentials, 如果為 true, 在跨域的情況下, 可被允許發(fā)送 cookie.
const evtSource = new EventSource("http://localhost:3002/sse", {
withCredentials: true,
});
EventSource 可以監(jiān)聽(tīng)三種事件, 分別是 onopen, onmessage, onerror, 第一個(gè)和第三個(gè)不用多說(shuō), 分別是建立連接成功和建立連接失敗(CORS 或者請(qǐng)求超時(shí)等等). 而 onmessage 最重要, 它用來(lái)監(jiān)聽(tīng)每次推送的信息流.
因?yàn)?SSE 接收的只能是 utf-8 的純文本, 因此最通用的做法是后端傳遞一個(gè) JSON 字符串. 下面的代碼中, 每次接收到新的推送, 就會(huì)打印出 like_count, 直到 like_count > 10, 客戶(hù)端會(huì)主動(dòng)要求服務(wù)端停止推送.
interface Data {
payload: { like_count: number };
}
evtSource.addEventListener("message", (e: MessageEvent) => {
const {
payload: { like_count },
}: Data = JSON.parse(e.data);
console.log(like_count);
if (like_count > 10) {
evtSource.close();
}
});
上面基本就是客戶(hù)端要做的事情了, 很簡(jiǎn)單, 下面看下服務(wù)端的. 因?yàn)?nestjs 封裝了對(duì) SSE 的支持, 這里就用這個(gè)框架搞下.
事件流僅僅是一個(gè)簡(jiǎn)單的文本數(shù)據(jù)流, 文本應(yīng)該使用 UTF-8 格式的編碼. 每條消息后面都由一個(gè)空行作為分隔符. 每條消息是由多個(gè)字段組成的, 每個(gè)字段由字段名, 一個(gè)冒號(hào), 以及字段值組成.
規(guī)范中支持四個(gè)字段, 分別是:
event: 該字段為 onmessage 的子集, 也就是說(shuō)你可以通過(guò)
evtSource.addEventListener("customEvt", () => {}來(lái)細(xì)粒度監(jiān)聽(tīng)指定 event 的推送.data: 也就是傳遞的實(shí)體, 如果該條消息包含多個(gè) data 字段, 則客戶(hù)端會(huì)用換行符把它們連接成一個(gè)字符串來(lái)作為字段值.
id 可以給每次推送增加一個(gè) id 標(biāo)識(shí), 比如是 tweetId, 這樣就可以把實(shí)體中的 likeCount 跟 tweetId 一一映射.
retry: 指定瀏覽器重新發(fā)起連接的時(shí)間間隔, 它是一個(gè)整數(shù)值, 指定了重新連接的時(shí)間(單位為毫秒), 如果不是正整數(shù)將被忽略
此外, 以冒號(hào)開(kāi)頭的行為注釋行, 會(huì)被忽略, 注釋行可以用來(lái)防止連接超時(shí), 服務(wù)器可以定期發(fā)送一條消息注釋行, 以保持連接不斷.
: just a comment\n\n
id: 12345\n
event: addLikeCount\n
retry: 10000\n
data: {\n
data: "likeCount": 1,\n
data: }\n\n下面直接看后端代碼實(shí)現(xiàn), 由于 nestjs 實(shí)現(xiàn) SSE 必須用 rxjs, 所以得有點(diǎn)兒相關(guān)基礎(chǔ). 代碼中每?jī)擅胪鲁鲆淮螖?shù)據(jù), 由于 nestjs 會(huì)將 data 轉(zhuǎn)化為 SSE 想要的結(jié)構(gòu), data 直接寫(xiě)成對(duì)象即可.
import { Injectable } from "@nestjs/common";
import { interval } from "rxjs";
import { map } from "rxjs/operators";
import { randomSeries } from "yancey-js-util";
@Injectable()
export class SSEService {
public sse() {
let count = 1;
return interval(2000).pipe(
map((_) => ({
id: randomSeries(6),
type: "addLikeCount",
data: { payload: { tweetId: randomSeries(6), likeCount: count++ } },
retry: 10000,
}))
);
}
}
如果沒(méi)什么意外, 前端就可以看到 stream 了.

分析下響應(yīng)頭, 很重要的兩個(gè)標(biāo)志是禁止了緩存, 并且 Content-Type: text/event-stream.

GraphQL 是否支持?

Emmmm, 似乎不支持, 不過(guò) GraphQL 本身已經(jīng)有了強(qiáng)大的 Subscriptions 系統(tǒng), 也沒(méi)必要支持這些玩意兒了(畢竟去年 GraphQL 支持個(gè)上傳還不利索, 逃).
Can I use SSE?
Emmmm, 除了某瀏覽器, 都可以用.

其他
突然想到, 我在后端配置了 rateLimit. 那么前端如此頻繁的獲取數(shù)據(jù), 會(huì)觸發(fā) rateLimit 嗎? 答案是不會(huì)的, 因?yàn)?SSE 始終只是一條 HTTP 請(qǐng)求, 而 rateLimit 限制的只是重復(fù)請(qǐng)求.
app.use(
rateLimit({
windowMs: 15 * 60 * 1000,
max: 100,
})
);奉上全部代碼
// 前端部分
// SSE.tsx
import { FC, useState, useEffect } from "react";
interface CustomEvent extends Event {
data: string;
}
interface Data {
payload: {
likeCount: number;
};
}
const SSE: FC = () => {
const [like, setLike] = useState(0);
const initialSSE = () => {
const evtSource = new EventSource("http://localhost:3002/sse", {
withCredentials: true,
});
evtSource.addEventListener("open", () => {
console.log("已開(kāi)啟");
});
// 這里使用的便是自定義事件
evtSource.addEventListener("addLikeCount", ((e: CustomEvent) => {
const {
payload: { likeCount },
}: Data = JSON.parse(e.data);
setLike(likeCount);
if (likeCount > 10) {
evtSource.close();
}
}) as EventListener);
evtSource.addEventListener("message", (e: MessageEvent) => {});
evtSource.addEventListener("error", (err: Event) => {
console.log(err);
});
};
useEffect(() => {
initialSSE();
}, []);
return <div>{like}</div>;
};
export default SSE;
// 后端部分
// sse.module.ts
import { Module } from "@nestjs/common";
import { SSEController } from "./sse.controller";
import { SSEService } from "./sse.service";
@Module({
controllers: [SSEController],
providers: [SSEService],
})
export class SSEModule {}
// sse.controller.ts
import { Controller, MessageEvent, Sse } from "@nestjs/common";
import { Observable } from "rxjs";
import { SSEService } from "./sse.service";
@Controller()
export class SSEController {
constructor(private readonly sseService: SSEService) {
this.sseService = sseService;
}
@Sse("sse")
public sse(): Observable<MessageEvent> {
return this.sseService.sse();
}
}
// sse.service.ts
import { Injectable } from "@nestjs/common";
import { interval } from "rxjs";
import { map } from "rxjs/operators";
import { randomSeries } from "yancey-js-util";
@Injectable()
export class SSEService {
public sse() {
let count = 1;
return interval(2000).pipe(
map((_) => ({
id: randomSeries(6),
type: "addLikeCount",
data: { payload: { tweetId: randomSeries(6), likeCount: count++ } },
retry: 10000,
}))
);
}
}參考
Using server-sent events
Server-Sent Events 教程
