WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。通常web上面都是用的http协议,客户端和只能发送了请求之后服务器才会响应消息回来,用websocket可以实现服务器主动推送消息到浏览器。最近用websocket做了个聊天室的demo,这算是websocket最常见的应用demo了吧。后来想到弄一个集群版的websocket聊天室,问题就是要解决多个websocket之间通信的问题。

    首先我想到了用redis作为中间存储,所有的websocket都序列化存储在redis中,各个服务器通过redis共享websocketsession,达到消息共享的目的。but,但是,websocketsession是无法被序列化的,websocketsession存储有连接的状态等东西,即使被序列化了取出来未必也用得了。so,这个方法显然不可行。

    网上找了一下很多答案说使用消息队列来解决,什么ActiveMQ,kafka,这些我都不熟![](""{#ZC_BLOG_HOST#}zb_users/emotion/default/roll.png""/)。redis里也有类似的消息队列功能,所以用redis来实现。

201807281532783857582371.jpg

图示说明:

        在图中,redis充当一个转发服务器的作用,每一台websocket服务器都订阅redis上相同的一个主题,每台websocket服务器都把接收到的消息发布到redis上,这样每台websocket服务器都能收到消息了。服务器发布的订阅消息中定义好发布消息者、接收消息者、聊天室id等信息。websocket session保存在各自服务器的变量中,在服务器里接收到订阅消息后,判断消息的接收者是否在本机里,若存在则取出相应的session将消息发送出去。

服务端代码:

后台我用的是ssm框架

public class ChatHandler implements WebSocketHandler {

private final Logger logger = LoggerFactory.getLogger(ChatHandler.class);

// 存放在线用户列表

public static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketSession>> onlineUsers = new ConcurrentHashMap<>();

// 自动回复机器人id

private static String ROBOT = "robot";

// 存放用户的会话信息

@Autowired

private ChatroomDAO chatroomDAO;

@Autowired

private MessageDAO messageDAO;

//消息计数,防止用户发送消息频率过高

private static volatile ConcurrentHashMap<String ,Long> counter=new ConcurrentHashMap<>();

/\*\*\*

 \* 建立连接调用

 \*/

@Override

public void afterConnectionEstablished(WebSocketSession session) throws Exception {

// TODO Auto-generated method stub

Client client = (Client) session.getHandshakeAttributes().get("client");

logger.info("websocket 客户端连接成功,会话信息:" + JSONObject.toJSONString(client));

if (null != onlineUsers.get(client.getChatroomId())) {

if (null != onlineUsers.get(client.getChatroomId()).get(client.getUser().getUserId())) {

logger.debug("连接失败!客户端重复连接!");

return;

}

}

if (null != client) {

synchronized (this) {

if (null == onlineUsers.get(client.getChatroomId())) {

onlineUsers.put(client.getChatroomId(), new ConcurrentHashMap<String, WebSocketSession>());

}

onlineUsers.get(client.getChatroomId()).put(client.getUser().getUserId(), session);

chatroomDAO.addMermber(client);

}

}

// 加入聊天机器人

if (null == onlineUsers.get(client.getChatroomId()).get(ROBOT)) {

onlineUsers.get(client.getChatroomId()).put(ROBOT, new WebSocketSession() {

@Override

public void sendMessage(WebSocketMessage<?> message) throws IOException {

// TODO Auto-generated method stub

}

@Override

public boolean isOpen() {

// TODO Auto-generated method stub

return false;

}

@Override

public URI getUri() {

// TODO Auto-generated method stub

return null;

}

@Override

public InetSocketAddress getRemoteAddress() {

// TODO Auto-generated method stub

return null;

}

@Override

public Principal getPrincipal() {

// TODO Auto-generated method stub

return null;

}

@Override

public InetSocketAddress getLocalAddress() {

// TODO Auto-generated method stub

return null;

}

@Override

public String getId() {

// TODO Auto-generated method stub

return null;

}

@Override

public HttpHeaders getHandshakeHeaders() {

// TODO Auto-generated method stub

return null;

}

@Override

public Map<String, Object> getHandshakeAttributes() {

// TODO Auto-generated method stub

return null;

}

@Override

public List<WebSocketExtension> getExtensions() {

// TODO Auto-generated method stub

return null;

}

@Override

public String getAcceptedProtocol() {

// TODO Auto-generated method stub

return null;

}

@Override

public void close(CloseStatus status) throws IOException {

// TODO Auto-generated method stub

}

@Override

public void close() throws IOException {

// TODO Auto-generated method stub

}

});

User robotUser = new User();

robotUser.setNickname("聊天机器人");

robotUser.setUserId(ROBOT);

Client robotClient = new Client();

robotClient.setChatroomId(client.getChatroomId());

robotClient.setSessionId(ROBOT);

robotClient.setUser(robotUser);

chatroomDAO.addMermber(robotClient);

}

String content = client.getUser().getNickname() + "加入聊天室";

List<JSONObject> clients = getOnlineUsers(client.getChatroomId());

// 发送系统消息

sendSystemMessage(client.getChatroomId(), content, clients);

}

/\*\*

 \* 接收到消息时调用

 \*/

@Override

public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {

Client client = (Client) session.getHandshakeAttributes().get("client");

logger.debug("----接收到客户端消息:" + message.getPayload());

CommonMessage commonMessage = JSON.parseObject((String) message.getPayload(), CommonMessage.class);

if(null!=counter.get(commonMessage.getFromUserId())){

if((new Date().getTime()-counter.get(commonMessage.getFromUserId()))<1500){

logger.debug("消息发送频率过高,IP="+client.getIp());

SystemMessage systemMessage=new SystemMessage();

systemMessage.setContent("消息发送频率过高");

systemMessage.setCreateTime(new Date().getTime());

systemMessage.setMsgId(MessageType.SYS\_MSG);

systemMessage.setMsgId(WebsocketUtil.getUUID());

sendMessageToUser(client.getChatroomId(), commonMessage.getFromUserId(), new TextMessage(JSONObject.toJSONString(systemMessage)));

return;

}

// 发布订阅消息

JSONObject msg = new JSONObject();

msg.put("chatroomId", client.getChatroomId());

msg.put("msgType", commonMessage.getMsgType());

msg.put("message", commonMessage);

chatroomDAO.publish(MessageConstant.CHANNEL, msg.toJSONString());

String msgType = commonMessage.getMsgType();

// 保存公共聊天信息

if (MessageType.PUB\_MSG.equals(msgType)) {

messageDAO.savePubMessage(client.getChatroomId(), commonMessage);

} else if (MessageType.PRI\_MSG.equals(msgType)) {

// 保存用户私聊消息

messageDAO.savePriMessage(commonMessage);

}

counter.put(commonMessage.getFromUserId(), new Date().getTime());

}

/\*\*

 \* 订阅消息监听

 \* 

 \* @param message

 \*/

public void handleMessage(String message) {

logger.debug("监听到订阅消息:" + message);

JSONObject json = JSONObject.parseObject(message);

String msgType = json.getString("msgType");

String chatroomId = json.getString("chatroomId");

String msg = json.getString("message");

logger.debug("消息类型:" + msgType);

TextMessage textMessage = new TextMessage(msg);

if (MessageType.SYS\_MSG.equals(msgType)) {

broadcast(chatroomId, textMessage);

} else if (MessageType.PUB\_MSG.equals(msgType)) {

broadcast(chatroomId, textMessage);

} else if (MessageType.PRI\_MSG.equals(msgType)) {

JSONObject msgJson = JSONObject.parseObject(msg);

String toUserId = msgJson.getString("toUserId");

String fromUserId = msgJson.getString("fromUserId");

// 若消息接收者是机器人,那么调用自动回复

if (toUserId.equals(ROBOT)) {

String robotMsg = Tuling.TulingRobot(msg);

JSONObject robotJson = JSONObject.parseObject(robotMsg);

String respMsg = robotJson.getString("text");

logger.debug("autoMSG:"+respMsg);

CommonMessage commonMessage=new CommonMessage();

commonMessage.setContent(respMsg);

commonMessage.setCreateTime(new Date().getTime());

commonMessage.setFromUserId(ROBOT);

commonMessage.setToUserId(fromUserId);

commonMessage.setMsgType(msgType);

commonMessage.setMsgId(WebsocketUtil.getUUID());

commonMessage.setNickname("聊天机器人");

textMessage = new TextMessage(JSONObject.toJSONString(commonMessage));

if (null != onlineUsers.get(chatroomId)) {

if (null != onlineUsers.get(chatroomId).get(fromUserId)) {

sendMessageToUser(chatroomId, fromUserId, textMessage);

}

}

}else{

if (null != onlineUsers.get(chatroomId)) {

if (null != onlineUsers.get(chatroomId).get(toUserId)) {

sendMessageToUser(chatroomId, toUserId, textMessage);

}

}

}

}

}

/\*\*

 \* 连接出错触发

 \*/

@Override

public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {

Client client = (Client) session.getHandshakeAttributes().get("client");

logger.debug(client.getUser().getNickname() + "连接出错");

synchronized (this) {

// 将用户从在线列表移除

onlineUsers.get(client.getChatroomId()).remove(client.getUser().getUserId());

boolean res = chatroomDAO.delMermber(client.getChatroomId(), client.getUser().getUserId());

if (res) {

logger.info("删除成功");

} else {

logger.info("删除失败");

}

List<JSONObject> clients = getOnlineUsers(client.getChatroomId());

// 发送系统消息

sendSystemMessage(client.getChatroomId(), client.getUser().getNickname() + "离开了聊天室", clients);

// 关闭连接

if (session.isOpen()) {

session.close();

}

}

}

/\*\*

 \* 连接断开触发

 \*/

@Override

public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {

Client client = (Client) session.getHandshakeAttributes().get("client");

logger.debug(client.getUser().getNickname() + "连接断开");

synchronized (this) {

// 将用户从在线列表移除

onlineUsers.get(client.getChatroomId()).remove(client.getUser().getUserId());

boolean res = chatroomDAO.delMermber(client.getChatroomId(), client.getUser().getUserId());

if (res) {

logger.info("删除成功");

} else {

logger.info("删除失败");

}

List<JSONObject> clients = getOnlineUsers(client.getChatroomId());

// 发送系统消息

sendSystemMessage(client.getChatroomId(), client.getUser().getNickname() + "离开了聊天室", clients);

// 关闭连接

if (session.isOpen()) {

session.close();

}

}

}

@Override

public boolean supportsPartialMessages() {

// TODO Auto-generated method stub

return false;

}

/\*\*

 \* 广播消息

 \*/

public boolean broadcast(String chatroomId, TextMessage textMessage) {

ConcurrentHashMap<String, WebSocketSession> olUsers = onlineUsers.get(chatroomId);

for (WebSocketSession webSocketSession : olUsers.values()) {

try {

if (webSocketSession.isOpen()) {

webSocketSession.sendMessage(textMessage);

}

} catch (IOException e) {

e.printStackTrace();

return false;

}

}

return true;

}

/\*\*

 \* 发送消息给指定用户

 \*/

public boolean sendMessageToUser(String chatroomId, String userId, TextMessage message) {

logger.debug("开始发送私聊消息to:" + userId);

WebSocketSession session = onlineUsers.get(chatroomId).get(userId);

if (session == null)

return false;

if (!session.isOpen())

return false;

try {

session.sendMessage(message);

} catch (IOException e) {

e.printStackTrace();

return false;

}

return true;

}

/\*\*

 \* 获取在线用户

 \* 

 \* @return

 \*/

private List<JSONObject> getOnlineUsers(String chatroomId) {

List<JSONObject> list = new ArrayList<>();

Map<String, Client> clients = chatroomDAO.getOnlineMermbers(chatroomId);

for (Client client : clients.values()) {

JSONObject jsonObject = new JSONObject();

jsonObject.put("userId", client.getUser().getUserId());

jsonObject.put("nickname", client.getUser().getNickname());

list.add(jsonObject);

}

return list;

}

/\*\*

 \* 发送系统提示

 \* 

 \* @throws Exception

 \*/

public void sendSystemMessage(String chatroomId, String content, Object args) throws Exception {

SystemMessage systemMessage = new SystemMessage();

systemMessage.setMsgType(MessageType.SYS\_MSG);

systemMessage.setMsgId(WebsocketUtil.getUUID());

systemMessage.setCreateTime(new Date().getTime());

systemMessage.setContent(content);

systemMessage.setArgs(args);

JSONObject message = new JSONObject();

message.put("chatroomId", chatroomId);

message.put("msgType", MessageType.SYS\_MSG);

message.put("message", systemMessage);

// 发布订阅消息

chatroomDAO.publish(MessageConstant.CHANNEL, message.toJSONString());

}

/\*\*

 \* 封装发来的消息

 \*/

public CommonMessage buildMessage(JSONObject json) {

CommonMessage message = new CommonMessage();

message.setMsgId(WebsocketUtil.getUUID());

message.setNickname(json.getString("nickname"));

message.setFromUserId(json.getString("fromUserId"));

message.setToUserId(json.getString("toUserId"));

message.setCreateTime(Long.parseLong(json.getString("createTime")));

message.setContent(json.getString("content"));

message.setMsgType(json.getString("msgType"));

return message;

}

}

客户端代码:

$(function() {

var chatroomId="chatroom1";

var toUserId="";

var msgType="0";

var pageNo=1;

var ws;

if (!window.WebSocket) {

alert("WebSocket not supported by this browser!");

return;

}else{

startWebsoket();

}

function startWebsoket() {

ws = new WebSocket("ws://"+window.location.host+"/chat/chat?chatroomId="+chatroomId);

//监听消息  

ws.onmessage = function(event) {

var data=JSON.parse(event.data);

console.log("接收到消息:",data);

var msgType=data.msgType;

var userList=data.args;

if(msgType=="0"){//群消息

showMessage(data);

}else if(msgType=="1"){//私聊

showMessage(data);

}else{//系统消息

showTips(data);

if(userList){

update(userList);

}

}

$(".main-wrap").scrollTop($(".main").height());

};

// 打开WebSocket   

ws.onclose = function(event) {

//alert("连接已关闭");

console.log("连接已关闭");

ws=null;

};

// 打开WebSocket  

ws.onopen = function(event) {

//加载聊天记录

pageNo=1;

loadPubHistory(pageNo);

console.log("连接成功");

};

ws.onerror = function(event) {

//WebSocket Status:: Error was reported  

alert("连接发生错误");

};

}

/\*\*

 \* 单击事件

 \*/

$(".send").click(function(){

var user=JSON.parse(sessionStorage.getItem("user"));

console.log("user:",sessionStorage.getItem("user"));

if(!$(".message-box textarea").val()){

alert("消息不能为空");

return;

}

textMessage.fromUserId=user.userId;

textMessage.toUserId=toUserId;

textMessage.msgType=msgType;

textMessage.nickname=user.nickname;

textMessage.createTime=new Date().getTime();

textMessage.content=$(".message-box textarea").val();

$(".message-box textarea").val("");

if(msgType=='1'){

html='<div class="message-wrap clearfix"><div class="message pull-right"><div><span class="time">'+formatDate(new Date(textMessage.createTime))+'</span><span class="username">'+user.nickname+'</span></div><div class="message-content pull-right">'+textMessage.content+'</div></div></div>';

$(".main").append(html);

}

console.log("发送的消息:",textMessage);

sendMsg(textMessage);

});

var textMessage={

toUserId:"",

fromUserId:"",

nickname:"",

createTime:new Date().getTime(),

content:"",

msgType:""

}

function sendMsg(textMessage) {

if(ws){

ws.send(JSON.stringify(textMessage));

}else{

alert("连接已关闭,请刷新页面重新连接");

}

}

/\*\*

 \* 显示系统提示

 \*/

function showTips(data){

var html='<div class="tips"><div class="tips-time">'+formatDate(new Date(data.createTime))+'</div><div class="tips-content">'+data.content+'</div></div>';

$(".main").append(html);

}

/\*\*

 \* 显示对话消息

 \*/

function showMessage(data){

var user=JSON.parse(sessionStorage.getItem("user"));

var html="";

if(data.fromUserId==user.userId){

html='<div class="message-wrap clearfix"><div class="message pull-right"><div><span class="time">'+formatDate(new Date(data.createTime))+'</span><span class="username">'+user.nickname+'</span></div><div class="message-content pull-right">'+data.content+'</div></div></div>';

}else{

html='<div class="message-wrap clearfix"><div class="message pull-left"><div><span class="username">'+data.nickname+'</span><span class="time">'+formatDate(new Date(data.createTime))+'</span></div><div class="message-content pull-left">'+data.content+'</div></div></div>';

}

if(html!=""){

$(".main").append(html);

}

}

/\*\*

 \* 更新用户列表

 \*/

function update(list){

console.log("开始更新在线列表");

var html="";

for(var i=0;i<list.length;i++){

html+='<li class="online-user"><a href="javascript:"><span class="fa fa-user" data-username="'+list\[i\].nickname+'" data-userid="'+list\[i\].userId+'">'+list\[i\].nickname+'</span></a></li>';

}

//删除所有dd节点

$(".list-online").find("dd").remove();

$(".list-online").html(html);

console.log("在线列表更新完成");

//点击私聊

$(".online-user").click(function() {

var toUserName=$(this).find(".fa-user").attr("data-username");

toUserId=$(this).find(".fa-user").attr("data-userid");

$(".main-content h2").html("与【"+toUserName+"】聊天");

console.log(toUserId);

msgType="1";

$(this).addClass("active");

$(this).siblings().removeClass("active");

$(".main").html("");

});

}

//选择聊天室

$(".chatroom-id").click(function() {

$(this).addClass("active");

$(this).siblings().removeClass("active");

$(".online-user").removeClass("active");

msgType="0";

var sTitle=$(this).find("span").html();

chatroomId=$(this).find("span").attr("data-chatroom");

$(".main-content h2").html(sTitle);

console.log("chatroomId:",chatroomId);

$(".main").html("");

//重新连接客户端

ws.close();

startWebsoket();

});

/\*\*

 \* 格式化时间

 \*/

function formatDate(now) {

var year = now.getFullYear(),

month = now.getMonth() + 1,

date = now.getDate(),

hour = now.getHours(),

minute = now.getMinutes();

if(minute<10){

minute="0"+minute;

}

second = now.getSeconds();

if(second<10){

second="0"+second;

}

 

return year + "-" + month + "-" + date + " " + hour + ":" + minute + ":" + second;

}

function loadPubHistory(pageNo){

$.ajax({

url:"message/getPubHistory",

dataType:'json',

data:{

page:pageNo,

size:5,

chatroom:chatroomId

},

success:function(data){

console.log("聊天记录:",data);

if(data.type=="success"){

var html="";

for(var i=data.args.length-1;i>=0;i--){

html+='<div class="message-wrap clearfix"><div class="message pull-left"><div><span class="username">'+data.args\[i\].nickname+'</span><span class="time">'+formatDate(new Date(data.args\[i\].createTime))+'</span></div><div class="message-content pull-left">'+data.args\[i\].content+'</div></div></div>';

}

if(pageNo==1){

$(".main").html(html);

}else{

$(".main").prepend(html);

}

}else{

console.log(data.content);

}

}});

}

$(".tab-user").click(function() {

$(".list-online").show();

$(".list-chatroom").hide();

$(this).addClass("active");

$(this).siblings().removeClass("active");

});

$(".tab-chatroom").click(function() {

$(".list-online").hide();

$(".list-chatroom").show();

$(this).addClass("active");

$(this).siblings().removeClass("active");

});

//滚动条加载下一页聊天记录

$(".main-wrap").scroll(function() {

var nScrollTop = $(this).scrollTop();

if(nScrollTop<=5){

pageNo++;

console.log("page",pageNo);

if(msgType=='0'){

loadPubHistory(pageNo);

}else {

//TODO

console.log("加载私聊聊天记录");

}

}

});

});

spring 配置:

spring-redis.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xmlns:util="http://www.springframework.org/schema/util" xmlns:redis="http://www.springframework.org/schema/redis"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.0.xsd

http://www.springframework.org/schema/jee

http://www.springframework.org/schema/jee/spring-jee-4.0.xsd

http://www.springframework.org/schema/aop

http://www.springframework.org/schema/aop/spring-aop-4.0.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context-4.0.xsd

http://www.springframework.org/schema/util

                        http://www.springframework.org/schema/util/spring-util-4.0.xsd

                        http://www.springframework.org/schema/redis

        http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"

default-lazy-init="false">

<context:component-scan base-package="com.mjy.chat.redis.dao" />

<context:property-placeholder location="classpath:redis.properties" />

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">

<property name="maxIdle" value="${redis.maxIdle}" />

<property name="maxActive" value="${redis.maxActive}" />

<property name="maxWait" value="${redis.maxWait}" />

<property name="testOnBorrow" value="${redis.testOnBorrow}" />

</bean>

<bean id="connectionFactory"

class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"

p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"

p:pool-config-ref="poolConfig" />

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">

<property name="connectionFactory" ref="connectionFactory" />

<!-- 使用string主要是key 在redis端用命令好读 不然默认的序列化没办法读 -->

<property name="keySerializer">

<bean

class="org.springframework.data.redis.serializer.StringRedisSerializer" />

</property>

<property name="hashKeySerializer">

<bean

class="org.springframework.data.redis.serializer.StringRedisSerializer" />

</property>

<property name="valueSerializer">

<bean

class="org.springframework.data.redis.serializer.StringRedisSerializer" />

</property>

<property name="hashValueSerializer">

<bean

class="org.springframework.data.redis.serializer.StringRedisSerializer" />

</property>

</bean>

  <!-- redis发布订阅配置 -->

   <bean id="serialization" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />  

<!-- 消息监听适配器  delegate属性指定真正的目标处理器-->  

    <bean id="smsMessageListener"

        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">

        <property name="delegate" ref="chatHandler" />

       <!--  <property name="serializer" ref="serialization" />   -->

    </bean>

    <bean id="chatHandler" class="com.mjy.chat.websocket.ChatHandler"/>