WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。通常web上面都是用的http协议,客户端和只能发送了请求之后服务器才会响应消息回来,用websocket可以实现服务器主动推送消息到浏览器。最近用websocket做了个聊天室的demo,这算是websocket最常见的应用demo了吧。后来想到弄一个集群版的websocket聊天室,问题就是要解决多个websocket之间通信的问题。
首先我想到了用redis作为中间存储,所有的websocket都序列化存储在redis中,各个服务器通过redis共享websocketsession,达到消息共享的目的。but,但是,websocketsession是无法被序列化的,websocketsession存储有连接的状态等东西,即使被序列化了取出来未必也用得了。so,这个方法显然不可行。
网上找了一下很多答案说使用消息队列来解决,什么ActiveMQ,kafka,这些我都不熟。redis里也有类似的消息队列功能,所以用redis来实现。
图示说明:
在图中,redis充当一个转发服务器的作用,每一台websocket服务器都订阅redis上相同的一个主题,每台websocket服务器都把接收到的消息发布到redis上,这样每台websocket服务器都能收到消息了。服务器发布的订阅消息中定义好发布消息者、接收消息者、聊天室id等信息。websocket session保存在各自服务器的变量中,在服务器里接收到订阅消息后,判断消息的接收者是否在本机里,若存在则取出相应的session将消息发送出去。
服务端代码:
后台我用的是ssm框架
public class ChatHandler implements WebSocketHandler {
private final Logger logger = LoggerFactory.getLogger(ChatHandler.class);
// 存放在线用户列表
public static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketSession>> onlineUsers = new ConcurrentHashMap<>();
// 自动回复机器人id
private static String ROBOT = "robot";
// 存放用户的会话信息
@Autowired
private ChatroomDAO chatroomDAO;
@Autowired
private MessageDAO messageDAO;
//消息计数,防止用户发送消息频率过高
private static volatile ConcurrentHashMap<String ,Long> counter=new ConcurrentHashMap<>();
/\*\*\*
\* 建立连接调用
\*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// TODO Auto-generated method stub
Client client = (Client) session.getHandshakeAttributes().get("client");
logger.info("websocket 客户端连接成功,会话信息:" + JSONObject.toJSONString(client));
if (null != onlineUsers.get(client.getChatroomId())) {
if (null != onlineUsers.get(client.getChatroomId()).get(client.getUser().getUserId())) {
logger.debug("连接失败!客户端重复连接!");
return;
}
}
if (null != client) {
synchronized (this) {
if (null == onlineUsers.get(client.getChatroomId())) {
onlineUsers.put(client.getChatroomId(), new ConcurrentHashMap<String, WebSocketSession>());
}
onlineUsers.get(client.getChatroomId()).put(client.getUser().getUserId(), session);
chatroomDAO.addMermber(client);
}
}
// 加入聊天机器人
if (null == onlineUsers.get(client.getChatroomId()).get(ROBOT)) {
onlineUsers.get(client.getChatroomId()).put(ROBOT, new WebSocketSession() {
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
// TODO Auto-generated method stub
}
@Override
public boolean isOpen() {
// TODO Auto-generated method stub
return false;
}
@Override
public URI getUri() {
// TODO Auto-generated method stub
return null;
}
@Override
public InetSocketAddress getRemoteAddress() {
// TODO Auto-generated method stub
return null;
}
@Override
public Principal getPrincipal() {
// TODO Auto-generated method stub
return null;
}
@Override
public InetSocketAddress getLocalAddress() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getId() {
// TODO Auto-generated method stub
return null;
}
@Override
public HttpHeaders getHandshakeHeaders() {
// TODO Auto-generated method stub
return null;
}
@Override
public Map<String, Object> getHandshakeAttributes() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<WebSocketExtension> getExtensions() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getAcceptedProtocol() {
// TODO Auto-generated method stub
return null;
}
@Override
public void close(CloseStatus status) throws IOException {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
});
User robotUser = new User();
robotUser.setNickname("聊天机器人");
robotUser.setUserId(ROBOT);
Client robotClient = new Client();
robotClient.setChatroomId(client.getChatroomId());
robotClient.setSessionId(ROBOT);
robotClient.setUser(robotUser);
chatroomDAO.addMermber(robotClient);
}
String content = client.getUser().getNickname() + "加入聊天室";
List<JSONObject> clients = getOnlineUsers(client.getChatroomId());
// 发送系统消息
sendSystemMessage(client.getChatroomId(), content, clients);
}
/\*\*
\* 接收到消息时调用
\*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
Client client = (Client) session.getHandshakeAttributes().get("client");
logger.debug("----接收到客户端消息:" + message.getPayload());
CommonMessage commonMessage = JSON.parseObject((String) message.getPayload(), CommonMessage.class);
if(null!=counter.get(commonMessage.getFromUserId())){
if((new Date().getTime()-counter.get(commonMessage.getFromUserId()))<1500){
logger.debug("消息发送频率过高,IP="+client.getIp());
SystemMessage systemMessage=new SystemMessage();
systemMessage.setContent("消息发送频率过高");
systemMessage.setCreateTime(new Date().getTime());
systemMessage.setMsgId(MessageType.SYS\_MSG);
systemMessage.setMsgId(WebsocketUtil.getUUID());
sendMessageToUser(client.getChatroomId(), commonMessage.getFromUserId(), new TextMessage(JSONObject.toJSONString(systemMessage)));
return;
}
// 发布订阅消息
JSONObject msg = new JSONObject();
msg.put("chatroomId", client.getChatroomId());
msg.put("msgType", commonMessage.getMsgType());
msg.put("message", commonMessage);
chatroomDAO.publish(MessageConstant.CHANNEL, msg.toJSONString());
String msgType = commonMessage.getMsgType();
// 保存公共聊天信息
if (MessageType.PUB\_MSG.equals(msgType)) {
messageDAO.savePubMessage(client.getChatroomId(), commonMessage);
} else if (MessageType.PRI\_MSG.equals(msgType)) {
// 保存用户私聊消息
messageDAO.savePriMessage(commonMessage);
}
counter.put(commonMessage.getFromUserId(), new Date().getTime());
}
/\*\*
\* 订阅消息监听
\*
\* @param message
\*/
public void handleMessage(String message) {
logger.debug("监听到订阅消息:" + message);
JSONObject json = JSONObject.parseObject(message);
String msgType = json.getString("msgType");
String chatroomId = json.getString("chatroomId");
String msg = json.getString("message");
logger.debug("消息类型:" + msgType);
TextMessage textMessage = new TextMessage(msg);
if (MessageType.SYS\_MSG.equals(msgType)) {
broadcast(chatroomId, textMessage);
} else if (MessageType.PUB\_MSG.equals(msgType)) {
broadcast(chatroomId, textMessage);
} else if (MessageType.PRI\_MSG.equals(msgType)) {
JSONObject msgJson = JSONObject.parseObject(msg);
String toUserId = msgJson.getString("toUserId");
String fromUserId = msgJson.getString("fromUserId");
// 若消息接收者是机器人,那么调用自动回复
if (toUserId.equals(ROBOT)) {
String robotMsg = Tuling.TulingRobot(msg);
JSONObject robotJson = JSONObject.parseObject(robotMsg);
String respMsg = robotJson.getString("text");
logger.debug("autoMSG:"+respMsg);
CommonMessage commonMessage=new CommonMessage();
commonMessage.setContent(respMsg);
commonMessage.setCreateTime(new Date().getTime());
commonMessage.setFromUserId(ROBOT);
commonMessage.setToUserId(fromUserId);
commonMessage.setMsgType(msgType);
commonMessage.setMsgId(WebsocketUtil.getUUID());
commonMessage.setNickname("聊天机器人");
textMessage = new TextMessage(JSONObject.toJSONString(commonMessage));
if (null != onlineUsers.get(chatroomId)) {
if (null != onlineUsers.get(chatroomId).get(fromUserId)) {
sendMessageToUser(chatroomId, fromUserId, textMessage);
}
}
}else{
if (null != onlineUsers.get(chatroomId)) {
if (null != onlineUsers.get(chatroomId).get(toUserId)) {
sendMessageToUser(chatroomId, toUserId, textMessage);
}
}
}
}
}
/\*\*
\* 连接出错触发
\*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
Client client = (Client) session.getHandshakeAttributes().get("client");
logger.debug(client.getUser().getNickname() + "连接出错");
synchronized (this) {
// 将用户从在线列表移除
onlineUsers.get(client.getChatroomId()).remove(client.getUser().getUserId());
boolean res = chatroomDAO.delMermber(client.getChatroomId(), client.getUser().getUserId());
if (res) {
logger.info("删除成功");
} else {
logger.info("删除失败");
}
List<JSONObject> clients = getOnlineUsers(client.getChatroomId());
// 发送系统消息
sendSystemMessage(client.getChatroomId(), client.getUser().getNickname() + "离开了聊天室", clients);
// 关闭连接
if (session.isOpen()) {
session.close();
}
}
}
/\*\*
\* 连接断开触发
\*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
Client client = (Client) session.getHandshakeAttributes().get("client");
logger.debug(client.getUser().getNickname() + "连接断开");
synchronized (this) {
// 将用户从在线列表移除
onlineUsers.get(client.getChatroomId()).remove(client.getUser().getUserId());
boolean res = chatroomDAO.delMermber(client.getChatroomId(), client.getUser().getUserId());
if (res) {
logger.info("删除成功");
} else {
logger.info("删除失败");
}
List<JSONObject> clients = getOnlineUsers(client.getChatroomId());
// 发送系统消息
sendSystemMessage(client.getChatroomId(), client.getUser().getNickname() + "离开了聊天室", clients);
// 关闭连接
if (session.isOpen()) {
session.close();
}
}
}
@Override
public boolean supportsPartialMessages() {
// TODO Auto-generated method stub
return false;
}
/\*\*
\* 广播消息
\*/
public boolean broadcast(String chatroomId, TextMessage textMessage) {
ConcurrentHashMap<String, WebSocketSession> olUsers = onlineUsers.get(chatroomId);
for (WebSocketSession webSocketSession : olUsers.values()) {
try {
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(textMessage);
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
return true;
}
/\*\*
\* 发送消息给指定用户
\*/
public boolean sendMessageToUser(String chatroomId, String userId, TextMessage message) {
logger.debug("开始发送私聊消息to:" + userId);
WebSocketSession session = onlineUsers.get(chatroomId).get(userId);
if (session == null)
return false;
if (!session.isOpen())
return false;
try {
session.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/\*\*
\* 获取在线用户
\*
\* @return
\*/
private List<JSONObject> getOnlineUsers(String chatroomId) {
List<JSONObject> list = new ArrayList<>();
Map<String, Client> clients = chatroomDAO.getOnlineMermbers(chatroomId);
for (Client client : clients.values()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId", client.getUser().getUserId());
jsonObject.put("nickname", client.getUser().getNickname());
list.add(jsonObject);
}
return list;
}
/\*\*
\* 发送系统提示
\*
\* @throws Exception
\*/
public void sendSystemMessage(String chatroomId, String content, Object args) throws Exception {
SystemMessage systemMessage = new SystemMessage();
systemMessage.setMsgType(MessageType.SYS\_MSG);
systemMessage.setMsgId(WebsocketUtil.getUUID());
systemMessage.setCreateTime(new Date().getTime());
systemMessage.setContent(content);
systemMessage.setArgs(args);
JSONObject message = new JSONObject();
message.put("chatroomId", chatroomId);
message.put("msgType", MessageType.SYS\_MSG);
message.put("message", systemMessage);
// 发布订阅消息
chatroomDAO.publish(MessageConstant.CHANNEL, message.toJSONString());
}
/\*\*
\* 封装发来的消息
\*/
public CommonMessage buildMessage(JSONObject json) {
CommonMessage message = new CommonMessage();
message.setMsgId(WebsocketUtil.getUUID());
message.setNickname(json.getString("nickname"));
message.setFromUserId(json.getString("fromUserId"));
message.setToUserId(json.getString("toUserId"));
message.setCreateTime(Long.parseLong(json.getString("createTime")));
message.setContent(json.getString("content"));
message.setMsgType(json.getString("msgType"));
return message;
}
}
客户端代码:
$(function() {
var chatroomId="chatroom1";
var toUserId="";
var msgType="0";
var pageNo=1;
var ws;
if (!window.WebSocket) {
alert("WebSocket not supported by this browser!");
return;
}else{
startWebsoket();
}
function startWebsoket() {
ws = new WebSocket("ws://"+window.location.host+"/chat/chat?chatroomId="+chatroomId);
//监听消息
ws.onmessage = function(event) {
var data=JSON.parse(event.data);
console.log("接收到消息:",data);
var msgType=data.msgType;
var userList=data.args;
if(msgType=="0"){//群消息
showMessage(data);
}else if(msgType=="1"){//私聊
showMessage(data);
}else{//系统消息
showTips(data);
if(userList){
update(userList);
}
}
$(".main-wrap").scrollTop($(".main").height());
};
// 打开WebSocket
ws.onclose = function(event) {
//alert("连接已关闭");
console.log("连接已关闭");
ws=null;
};
// 打开WebSocket
ws.onopen = function(event) {
//加载聊天记录
pageNo=1;
loadPubHistory(pageNo);
console.log("连接成功");
};
ws.onerror = function(event) {
//WebSocket Status:: Error was reported
alert("连接发生错误");
};
}
/\*\*
\* 单击事件
\*/
$(".send").click(function(){
var user=JSON.parse(sessionStorage.getItem("user"));
console.log("user:",sessionStorage.getItem("user"));
if(!$(".message-box textarea").val()){
alert("消息不能为空");
return;
}
textMessage.fromUserId=user.userId;
textMessage.toUserId=toUserId;
textMessage.msgType=msgType;
textMessage.nickname=user.nickname;
textMessage.createTime=new Date().getTime();
textMessage.content=$(".message-box textarea").val();
$(".message-box textarea").val("");
if(msgType=='1'){
html='<div class="message-wrap clearfix"><div class="message pull-right"><div><span class="time">'+formatDate(new Date(textMessage.createTime))+'</span><span class="username">'+user.nickname+'</span></div><div class="message-content pull-right">'+textMessage.content+'</div></div></div>';
$(".main").append(html);
}
console.log("发送的消息:",textMessage);
sendMsg(textMessage);
});
var textMessage={
toUserId:"",
fromUserId:"",
nickname:"",
createTime:new Date().getTime(),
content:"",
msgType:""
}
function sendMsg(textMessage) {
if(ws){
ws.send(JSON.stringify(textMessage));
}else{
alert("连接已关闭,请刷新页面重新连接");
}
}
/\*\*
\* 显示系统提示
\*/
function showTips(data){
var html='<div class="tips"><div class="tips-time">'+formatDate(new Date(data.createTime))+'</div><div class="tips-content">'+data.content+'</div></div>';
$(".main").append(html);
}
/\*\*
\* 显示对话消息
\*/
function showMessage(data){
var user=JSON.parse(sessionStorage.getItem("user"));
var html="";
if(data.fromUserId==user.userId){
html='<div class="message-wrap clearfix"><div class="message pull-right"><div><span class="time">'+formatDate(new Date(data.createTime))+'</span><span class="username">'+user.nickname+'</span></div><div class="message-content pull-right">'+data.content+'</div></div></div>';
}else{
html='<div class="message-wrap clearfix"><div class="message pull-left"><div><span class="username">'+data.nickname+'</span><span class="time">'+formatDate(new Date(data.createTime))+'</span></div><div class="message-content pull-left">'+data.content+'</div></div></div>';
}
if(html!=""){
$(".main").append(html);
}
}
/\*\*
\* 更新用户列表
\*/
function update(list){
console.log("开始更新在线列表");
var html="";
for(var i=0;i<list.length;i++){
html+='<li class="online-user"><a href="javascript:"><span class="fa fa-user" data-username="'+list\[i\].nickname+'" data-userid="'+list\[i\].userId+'">'+list\[i\].nickname+'</span></a></li>';
}
//删除所有dd节点
$(".list-online").find("dd").remove();
$(".list-online").html(html);
console.log("在线列表更新完成");
//点击私聊
$(".online-user").click(function() {
var toUserName=$(this).find(".fa-user").attr("data-username");
toUserId=$(this).find(".fa-user").attr("data-userid");
$(".main-content h2").html("与【"+toUserName+"】聊天");
console.log(toUserId);
msgType="1";
$(this).addClass("active");
$(this).siblings().removeClass("active");
$(".main").html("");
});
}
//选择聊天室
$(".chatroom-id").click(function() {
$(this).addClass("active");
$(this).siblings().removeClass("active");
$(".online-user").removeClass("active");
msgType="0";
var sTitle=$(this).find("span").html();
chatroomId=$(this).find("span").attr("data-chatroom");
$(".main-content h2").html(sTitle);
console.log("chatroomId:",chatroomId);
$(".main").html("");
//重新连接客户端
ws.close();
startWebsoket();
});
/\*\*
\* 格式化时间
\*/
function formatDate(now) {
var year = now.getFullYear(),
month = now.getMonth() + 1,
date = now.getDate(),
hour = now.getHours(),
minute = now.getMinutes();
if(minute<10){
minute="0"+minute;
}
second = now.getSeconds();
if(second<10){
second="0"+second;
}
return year + "-" + month + "-" + date + " " + hour + ":" + minute + ":" + second;
}
function loadPubHistory(pageNo){
$.ajax({
url:"message/getPubHistory",
dataType:'json',
data:{
page:pageNo,
size:5,
chatroom:chatroomId
},
success:function(data){
console.log("聊天记录:",data);
if(data.type=="success"){
var html="";
for(var i=data.args.length-1;i>=0;i--){
html+='<div class="message-wrap clearfix"><div class="message pull-left"><div><span class="username">'+data.args\[i\].nickname+'</span><span class="time">'+formatDate(new Date(data.args\[i\].createTime))+'</span></div><div class="message-content pull-left">'+data.args\[i\].content+'</div></div></div>';
}
if(pageNo==1){
$(".main").html(html);
}else{
$(".main").prepend(html);
}
}else{
console.log(data.content);
}
}});
}
$(".tab-user").click(function() {
$(".list-online").show();
$(".list-chatroom").hide();
$(this).addClass("active");
$(this).siblings().removeClass("active");
});
$(".tab-chatroom").click(function() {
$(".list-online").hide();
$(".list-chatroom").show();
$(this).addClass("active");
$(this).siblings().removeClass("active");
});
//滚动条加载下一页聊天记录
$(".main-wrap").scroll(function() {
var nScrollTop = $(this).scrollTop();
if(nScrollTop<=5){
pageNo++;
console.log("page",pageNo);
if(msgType=='0'){
loadPubHistory(pageNo);
}else {
//TODO
console.log("加载私聊聊天记录");
}
}
});
});
spring 配置:
spring-redis.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:util="http://www.springframework.org/schema/util" xmlns:redis="http://www.springframework.org/schema/redis"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.0.xsd
http://www.springframework.org/schema/redis
http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"
default-lazy-init="false">
<context:component-scan base-package="com.mjy.chat.redis.dao" />
<context:property-placeholder location="classpath:redis.properties" />
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.maxIdle}" />
<property name="maxActive" value="${redis.maxActive}" />
<property name="maxWait" value="${redis.maxWait}" />
<property name="testOnBorrow" value="${redis.testOnBorrow}" />
</bean>
<bean id="connectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"
p:pool-config-ref="poolConfig" />
<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 使用string主要是key 在redis端用命令好读 不然默认的序列化没办法读 -->
<property name="keySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashKeySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="valueSerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashValueSerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
</bean>
<!-- redis发布订阅配置 -->
<bean id="serialization" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
<!-- 消息监听适配器 delegate属性指定真正的目标处理器-->
<bean id="smsMessageListener"
class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="chatHandler" />
<!-- <property name="serializer" ref="serialization" /> -->
</bean>
<bean id="chatHandler" class="com.mjy.chat.websocket.ChatHandler"/>