WebSocket?集群解決方案
-? ? ?問題起因? ? -最近做項(xiàng)目時(shí)遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocket Session共享的問題。
期間我經(jīng)過了幾天的研究,總結(jié)出了幾個(gè)實(shí)現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結(jié)出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。以下是我的場景描述
- 資源:4臺服務(wù)器。其中只有一臺服務(wù)器具備ssl認(rèn)證域名,一臺redis mysql服務(wù)器,兩臺應(yīng)用服務(wù)器(集群)
- 應(yīng)用發(fā)布限制條件:由于場景需要,應(yīng)用場所需要ssl認(rèn)證的域名才能發(fā)布。因此ssl認(rèn)證的域名服務(wù)器用來當(dāng)api網(wǎng)關(guān),負(fù)責(zé)https請求與wss(安全認(rèn)證的ws)連接。俗稱https卸載,用戶請求https域名服務(wù)器(eg:https://oiscircle.com/xxx),但真實(shí)訪問到的是http ip地址的形式。只要網(wǎng)關(guān)配置高,能handle多個(gè)應(yīng)用
- 需求:用戶登錄應(yīng)用,需要與服務(wù)器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
- 集群中的應(yīng)用服務(wù)類型:每個(gè)集群實(shí)例都負(fù)責(zé)http無狀態(tài)請求服務(wù)與ws長連接服務(wù)
-? ? ?系統(tǒng)架構(gòu)圖? ? -
在我的實(shí)現(xiàn)里,每個(gè)應(yīng)用服務(wù)器都負(fù)責(zé)http and ws請求,其實(shí)也可以將ws請求建立的聊天模型單獨(dú)成立為一個(gè)模塊。從分布式的角度來看,這兩種實(shí)現(xiàn)類型差不多,但從實(shí)現(xiàn)方便性來說,一個(gè)應(yīng)用服務(wù)http ws請求的方式更為方便。下文會有解釋。本文涉及的技術(shù)棧
- Eureka 服務(wù)發(fā)現(xiàn)與注冊
- Redis Session共享
- Redis 消息訂閱
- Spring Boot
- Zuul 網(wǎng)關(guān)
- Spring Cloud Gateway 網(wǎng)關(guān)
- Spring WebSocket 處理長連接
- Ribbon 負(fù)載均衡
- Netty 多協(xié)議NIO網(wǎng)絡(luò)通信框架
- Consistent Hash 一致性哈希算法
-? ? ?技術(shù)可行性分析? ? -下面我將描述session特性,以及根據(jù)這些特性列舉出n個(gè)解決分布式架構(gòu)中處理ws請求的集群方案
WebSocketSession與HttpSession在Spring所集成的WebSocket里面,每個(gè)ws連接都有一個(gè)對應(yīng)的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過類似這樣的方式進(jìn)行與客戶端的通信:
protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???System.out.println("服務(wù)器接收到的消息:?" ?message?);
???//send?message?to?client
???session.sendMessage(new?TextMessage("message"));
}
那么問題來了:ws的session無法序列化到redis,因此在集群中,我們無法將所有WebSocketSession都緩存到redis進(jìn)行session共享。每臺服務(wù)器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。有的人可能會想:我可不可以將sessin關(guān)鍵信息緩存到redis,集群中的服務(wù)器從redis拿取session關(guān)鍵信息然后重新構(gòu)建websocket session...我只想說這種方法如果有人能試出來,請告訴我一聲...以上便是websocket session與http session共享的區(qū)別,總的來說就是http session共享已經(jīng)有解決方案了,而且很簡單,只要引入相關(guān)依賴:spring-session-data-redis
和spring-boot-starter-redis
,大家可以從網(wǎng)上找個(gè)demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實(shí)現(xiàn)的方式,我們無法做到真正的websocket session共享。-? ? ?解決方案的演變??? -Netty與Spring WebSocket
剛開始的時(shí)候,我嘗試著用netty實(shí)現(xiàn)了websocket服務(wù)端的搭建。在netty里面,并沒有websocket session這樣的概念,與其類似的是channel,每一個(gè)客戶端連接都代表一個(gè)channel。前端的ws請求通過netty監(jiān)聽的端口,走websocket協(xié)議進(jìn)行ws握手連接之后,通過一些列的handler(責(zé)鏈模式)進(jìn)行消息處理。與websocket session類似地,服務(wù)端在連接建立后有一個(gè)channel,我們可以通過channel進(jìn)行與客戶端的通信。
???/**
????*?TODO?根據(jù)服務(wù)器傳進(jìn)來的id,分配到不同的group
????*/
???private?static?final?ChannelGroup?GROUP?=?new?DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
?
???@Override
???protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
???????//retain增加引用計(jì)數(shù),防止接下來的調(diào)用引用失效
???????System.out.println("服務(wù)器接收到來自?"? ?ctx.channel().id()? ?"?的消息:?"? ?msg.text());
???????//將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
???????GROUP.writeAndFlush(msg.retain());
???}
那么,服務(wù)端用netty還是用spring websocket?以下我將從幾個(gè)方面列舉這兩種實(shí)現(xiàn)方式的優(yōu)缺點(diǎn)。-? ? ?使用 netty 實(shí)現(xiàn) websocket??? -
玩過netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網(wǎng)絡(luò)線程模型是servlet實(shí)現(xiàn)的,而servlet不是nio模型,所以在spring5之后,spring的底層網(wǎng)絡(luò)實(shí)現(xiàn)采用了netty。如果我們單獨(dú)使用netty來開發(fā)websocket服務(wù)端,速度快是絕對的,但是可能會遇到下列問題:
- 與系統(tǒng)的其他應(yīng)用集成不方便,在rpc調(diào)用的時(shí)候,無法享受springcloud里feign服務(wù)調(diào)用的便利性
- 業(yè)務(wù)邏輯可能要重復(fù)實(shí)現(xiàn)
- 使用netty可能需要重復(fù)造輪子
- 怎么連接上服務(wù)注冊中心,也是一件麻煩的事情
- restful服務(wù)與ws服務(wù)需要分開實(shí)現(xiàn),如果在netty上實(shí)現(xiàn)restful服務(wù),有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習(xí)慣了。
-? ? ?使用 spring websocket 實(shí)現(xiàn) ws 服務(wù)? ? -spring websocket已經(jīng)被springboot很好地集成了,所以在springboot上開發(fā)ws服務(wù)非常方便,做法非常簡單第一步:添加依賴
<dependency>
???<groupId>org.springframework.bootgroupId>
???<artifactId>spring-boot-starter-websocketartifactId>
dependency>
第二步:添加配置類@Configuration
public?class?WebSocketConfig?implements?WebSocketConfigurer?{
@Override
public?void?registerWebSocketHandlers(WebSocketHandlerRegistry?registry)?{
????registry.addHandler(myHandler(),?"/")
????????.setAllowedOrigins("*");
}
?
@Bean
?public?WebSocketHandler?myHandler()?{
?????return?new?MessageHandler();
?}
}
第三步:實(shí)現(xiàn)消息監(jiān)聽類@Component
@SuppressWarnings("unchecked")
public?class?MessageHandler?extends?TextWebSocketHandler?{
???private?List?clients?=?new?ArrayList<>();
?
???@Override
???public?void?afterConnectionEstablished(WebSocketSession?session)?{
???????clients.add(session);
???????System.out.println("uri?:"? ?session.getUri());
???????System.out.println("連接建立:?"? ?session.getId());
???????System.out.println("current?seesion:?"? ?clients.size());
???}
?
???@Override
???public?void?afterConnectionClosed(WebSocketSession?session,?CloseStatus?status)?{
???????clients.remove(session);
???????System.out.println("斷開連接:?"? ?session.getId());
???}
?
???@Override
???protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???????String?payload?=?message.getPayload();
???????Map?map?=?JSONObject.parseObject(payload,?HashMap.class);
???????System.out.println("接受到的數(shù)據(jù)"? ?map);
???????clients.forEach(s?->?{
???????????try?{
???????????????System.out.println("發(fā)送消息給:?"? ?session.getId());
???????????????s.sendMessage(new?TextMessage("服務(wù)器返回收到的信息,"? ?payload));
???????????}?catch?(Exception?e)?{
???????????????e.printStackTrace();
???????????}
???????});
???}
}
從這個(gè)demo中,使用spring websocket實(shí)現(xiàn)ws服務(wù)的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實(shí)現(xiàn)ws服務(wù)。因此我的應(yīng)用服務(wù)架構(gòu)是這樣子的:一個(gè)應(yīng)用既負(fù)責(zé)restful服務(wù),也負(fù)責(zé)ws服務(wù)。沒有將ws服務(wù)模塊拆分是因?yàn)椴鸱殖鋈ヒ褂胒eign來進(jìn)行服務(wù)調(diào)用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務(wù)間的io調(diào)用,所以就沒有這么做了。-? ? ?從zuul開始技術(shù)轉(zhuǎn)型? ? -要實(shí)現(xiàn)websocket集群,我們必不可免地得從zuul轉(zhuǎn)型到spring cloud gateway。原因如下:zuul1.0版本不支持websocket轉(zhuǎn)發(fā),zuul 2.0開始支持websocket,zuul2.0幾個(gè)月前開源了,但是2.0版本沒有被spring boot集成,而且文檔不健全。因此轉(zhuǎn)型是必須的,同時(shí)轉(zhuǎn)型也很容易實(shí)現(xiàn)。在gateway中,為了實(shí)現(xiàn)ssl認(rèn)證和動(dòng)態(tài)路由負(fù)載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑
server:
??port:?443
??ssl:
????enabled:?true
????key-store:?classpath:xxx.jks
????key-store-password:?xxxx
????key-store-type:?JKS
????key-alias:?alias
spring:
??application:
????name:?api-gateway
??cloud:
????gateway:
??????httpclient:
????????ssl:
??????????handshake-timeout-millis:?10000
??????????close-notify-flush-timeout-millis:?3000
??????????close-notify-read-timeout-millis:?0
??????????useInsecureTrustManager:?true
??????discovery:
????????locator:
??????????enabled:?true
??????????lower-case-service-id:?true
??????routes:
??????-?id:?dc
????????uri:?lb://dc
????????predicates:
????????-?Path=/dc/**
??????-?id:?wecheck
????????uri:?lb://wecheck
????????predicates:
????????-?Path=/wecheck/**
如果要愉快地玩https卸載,我們還需要配置一個(gè)filter,否則請求網(wǎng)關(guān)時(shí)會出現(xiàn)錯(cuò)誤not an SSL/TLS record@Component
public?class?HttpsToHttpFilter?implements?GlobalFilter,?Ordered?{
??private?static?final?int?HTTPS_TO_HTTP_FILTER_ORDER?=?10099;
??@Override
??public?Mono?filter(ServerWebExchange?exchange,?GatewayFilterChain?chain)? {
??????URI?originalUri?=?exchange.getRequest().getURI();
??????ServerHttpRequest?request?=?exchange.getRequest();
??????ServerHttpRequest.Builder?mutate?=?request.mutate();
??????String?forwardedUri?=?request.getURI().toString();
??????if?(forwardedUri?!=?null?