基于Springboot+WebSocket+Netty实现在线聊天、群聊系统
一、文章前言
此文主要实现在好友添加、建群、聊天对话、群聊功能,使用Java作为后端语言进行支持,界面友好,开发简单。
二、开发流程及工具准备
2.1、下载安装IntelliJ IDEA(后端语言开发工具),Mysql数据库,微信Web开发者工具。
三、开发步骤
1.创建maven project
先创建一个名为SpringBootDemo的项目,选择【New Project】
然后在弹出的下图窗口中,选择左侧菜单的【New Project】(注:和2022之前的idea版本不同,这里左侧没有【Maven】选项,不要选【Maven Archetype】!!!),输入Name(项目名):SpringBootDemo,language选择【java】,build system选择【maven】,然后选择jdk,我这里选的是jdk18.
然后点击【Create】
2.在project下创建module
点击右键选择【new】—【Module…】
左侧选择【Spring initializr】,通过idea中集成的Spring initializr工具进行spring boot项目的快速创建。窗口右侧:name可根据自己喜好设置,group和artifact和上面一样的规则,其他选项保持默认值即可,【next】
Developer Tools模块勾选【Spring Boot DevTools】,web模块勾选【Spring Web】
此时,一个Springboot项目已经搭建完成,可开发后续功能
3.编写一个消息实体类、Mapper、service(三层架构)
@Data
public class Chat {
@TableId(type = IdType.AUTO)
private Long id;
private Long userId;
private Long targetUserId;
private LocalDateTime createTime;
private String userName;
private String targetUserName;
private String content;
}
由于我们使用mybatis-plus,所以简单的增删改查不用自己写,框架自带了,只需要实现或者继承他的Mapper、Service
4.编写WebSocket服务类
@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketService {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketService> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
public static ChatMapper chatMapper = null;
/**
* 连接建立成功调用的方法
* <p>
* 1.用map存 每个客户端对应的MyWebSocket对象
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
webSocketMap.put(userId, this);
//加入set中
} else {
webSocketMap.put(userId, this);
//加入set中
}
}
/**
* 报错
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
/**
* 实现服务器推送到对应的客户端
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 自定义关闭
*
* @param userId
*/
public static void close(String userId) {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
}
}
/**
* 获取在线用户信息
*
* @return
*/
public static Map getOnlineUser() {
return webSocketMap;
}
5.创建控制器Controller
先创建Controller Package
创建一个Controller
输入类名,选在【Class】
因为要编写Rest风格的Api,要在Controller上标注@RestController注解
6.创建具体的Api接口
@RestController
public class DemoController {
@Autowired
private ChatService chatService;
@PostMapping("/push")
public ResponseEntity<String> pushToWeb(@RequestBody Chat chat) throws IOException {
chat.setCreateTime(LocalDateTime.now());
chatService.save(chat);
WebSocketService.sendInfo(chat);
return ResponseEntity.ok("MSG SEND SUCCESS");
}
@GetMapping("/close")
public String close(String userId) {
WebSocketService.close(userId);
return "ok";
}
@GetMapping("/getOnlineUser")
public Map getOnlineUser() {
return WebSocketService.getOnlineUser();
}
@GetMapping("/getMessage")
public ResponseEntity<List<Chat>> getMessage(String userId) {
QueryWrapper<Chat> queryWrapper = new QueryWrapper();
List<Chat> list = chatService.
list(queryWrapper.lambda().eq(Chat::getTargetUserId, userId).or().eq(Chat::getUserId, userId));
return ResponseEntity.ok(list);
}
}
7.编写netty配置
package com.example.demo.config;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.stereotype.Component;
public class NettyServer {
public void start(){
//创建两个线程组boosGroup和workerGroup,含有的子线程NioEventLoop的个数默认为cpu核数的两倍
//boosGroup只是处理链接请求,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
//设置两个线程组
bootstrap.group(boosGroup,workerGroup)
//使用NioSctpServerChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//初始化服务器链接队列大小,服务端处理客户端链接请求是顺序处理的,所以同一时间只能处理一个客户端链接
//多个客户端同时来的时候,服务端将不能处理的客户端链接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG,1024)
//创建通道初始化对象,设置初始化参数
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("收到到新的链接");
//websocket协议本身是基于http协议的,所以这边也要使用http解编码器
ch.pipeline().addLast(new HttpServerCodec());
//以块的方式来写的处理器
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(8192));
ch.pipeline().addLast(new MessageHandler());//添加测试的聊天消息处理类
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
}
});
System.out.println("netty server start..");
//绑定一个端口并且同步,生成一个ChannelFuture异步对象,通过isDone()等方法判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(1245).sync();
//给cf注册监听器,监听我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (cf.isSuccess()){
System.out.println("监听端口成功");
}else {
System.out.println("监听端口失败");
}
}
});
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
//通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
8.前端代码Websocket聊天功能
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
me.websocket = new WebSocket(me.ws + me.info.id);
me.websocket.onmessage = function(event) {
var json = JSON.parse(event.data);
me.msgListMethod();
console.log(json);
};
console.log(me.websocket)
me.websocket.onopen = function(event) {
console.log("Netty-WebSocket服务器。。。。。。连接");
};
me.websocket.onerror = function(evt) {
console.log('发生错误..., evt');
};
me.websocket.CONNECTING = function(evt) {
console.log('正在链接中');
};
} else {
alert("您的浏览器不支持WebSocket协议!");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
if (me.websocket != null) {
me.websocket.close();
}
};
这里用到了很多消息发送功能,比如文件、图片。群聊还可查看群成员功能
更多推荐
所有评论(0)