使用webSocket长连接实现弹幕

小豆丁 1年前 ⋅ 973 阅读

使用webSocket长连接实现弹幕

maven坐标

        <!-- Spring Boot starter for WebSocket support.
             NOTE(review): if the project inherits its dependency versions from a Spring Boot
             parent/BOM, this explicit version may be redundant or conflict with it - confirm. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.5.1</version>
        </dependency>

创建一个配置类 WebSocketConfig(注册 ServerEndpointExporter)

/**
 * Spring configuration for annotated WebSocket endpoints.
 *
 * <p>Exposes a {@link ServerEndpointExporter} bean; per the Spring documentation this exporter
 * detects {@code @ServerEndpoint}-annotated beans and registers them with the WebSocket
 * container.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

创建一个连接 WebSocketService

package com.bibibi.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket endpoint that broadcasts every received text message (a bullet comment, "danmaku")
 * to all currently connected clients.
 *
 * <p>Clients connect to {@code /imserService/{token}}. Live endpoints are tracked in a static
 * registry keyed by WebSocket session id, together with an online counter.
 *
 * <p>Thread-safety: the registry and the counter are concurrent structures, and writes to a
 * single session are serialized through {@link #sendLock} because several threads may broadcast
 * to the same session at once.
 *
 * @author victor_Yao
 * @createDate 2022/12/31 16:26
 */
@Component
// The front end connects to this endpoint address.
@ServerEndpoint("/imserService/{token}")
public class WebSocketService {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketService.class);

    /**
     * Number of currently connected clients (thread-safe counter).
     */
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    /**
     * Thread-safe map from WebSocket session id to the endpoint instance serving that client.
     */
    private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    /**
     * The endpoint is multi-instance, so beans cannot be injected directly; they are looked up
     * through this statically stored context instead. Volatile because it is written by the
     * startup thread and read by WebSocket threads.
     */
    private static volatile ApplicationContext APPLICATION_CONTEXT;

    /**
     * Guards writes to {@link #session}: a blocking send that overlaps another send on the same
     * session may fail with {@link IllegalStateException}.
     */
    private final Object sendLock = new Object();

    private Session session;

    private String sessionId;

    private String userId;

    /**
     * Stores the application context; called by the startup class once the application has started.
     *
     * @param applicationContext the running Spring application context
     */
    public static void setApplicationContext(ApplicationContext applicationContext) {
        WebSocketService.APPLICATION_CONTEXT = applicationContext;
    }

    /**
     * Registers a newly opened connection and sends the initial {@code "0"} acknowledgement.
     *
     * @param session the WebSocket session that was just opened
     * @param token   the token taken from the endpoint path (not verified yet)
     */
    @OnOpen
    public void openConnection(Session session, @PathParam("token") String token) {
        // TODO: verify the token and derive userId from it; Spring beans (for example a
        // RedisTemplate) can be obtained through APPLICATION_CONTEXT.getBean(...).
        this.sessionId = session.getId();
        this.session = session;
        // put() atomically returns the previous mapping, so the counter is only incremented
        // when this session id was not registered before.
        if (WEBSOCKET_MAP.put(sessionId, this) == null) {
            ONLINE_COUNT.incrementAndGet();
        }
        logger.info("用户连接成功:{}当前在线人数:{}", sessionId, ONLINE_COUNT.get());
        try {
            sendMessage("0");
        } catch (IOException | RuntimeException e) {
            // Best effort: a failed acknowledgement must not abort the handshake callback.
            logger.error("连接异常", e);
        }
    }

    /**
     * Sends a text message to this endpoint's client, one message at a time per session.
     *
     * @param message the text to send
     * @throws IOException if the underlying connection fails while sending
     */
    private void sendMessage(String message) throws IOException {
        synchronized (sendLock) {
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * Unregisters this connection when it is closed.
     */
    @OnClose
    public void closeConnection() {
        // remove(key, value) is atomic and only succeeds if this instance is still registered,
        // so the counter cannot be decremented twice for the same connection.
        if (sessionId != null && WEBSOCKET_MAP.remove(sessionId, this)) {
            ONLINE_COUNT.decrementAndGet();
        }
        logger.info("用户退出:{}当前在线人数:{}", sessionId, ONLINE_COUNT.get());
    }

    /**
     * Broadcasts an incoming message to every connected client whose session is still open.
     *
     * @param message the text received from the client
     */
    @OnMessage
    public void onMessage(String message) {
        logger.info("用户信息:{}:::报文:{}", sessionId, message);
        if (message == null) {
            return;
        }
        for (WebSocketService recipient : WEBSOCKET_MAP.values()) {
            // TODO: push to an MQ queue instead of sending synchronously.
            try {
                if (recipient.session.isOpen()) {
                    recipient.sendMessage(message);
                }
            } catch (IOException | RuntimeException e) {
                // Handle failures per recipient so one broken connection does not stop
                // delivery to the remaining clients.
                logger.error("弹幕接收异常", e);
            }
        }
        // TODO: persist the bullet comment to the database or Redis.
    }

    /**
     * Logs errors reported for this connection instead of silently dropping them.
     *
     * @param error the error raised for this connection
     */
    @OnError
    public void onError(Throwable error) {
        logger.error("连接发生错误:{}", sessionId, error);
    }

    /**
     * @return the WebSocket session of this connection, or {@code null} before it is opened
     */
    public Session getSession() {
        return session;
    }

    /**
     * @return the id of this connection's session, or {@code null} before it is opened
     */
    public String getSessionId() {
        return sessionId;
    }
}

在启动类中把 ApplicationContext 传给 WebSocketService(保存为全局静态变量)

/**
 * Application entry point: starts Spring Boot and hands the resulting application context to
 * {@link WebSocketService} through its static setter.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.bibibi.*")
@ServletComponentScan
// Enable scheduled tasks.
@EnableScheduling
// Enable asynchronous execution.
@EnableAsync
public class BibibiApplication {

    /**
     * Boots the application and publishes its context as a global for the WebSocket endpoint.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        WebSocketService.setApplicationContext(SpringApplication.run(BibibiApplication.class, args));
    }

}

前端测试页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket</title>
</head>
<body>
<!-- Manual test page: open a WebSocket to the endpoint and send a JSON message. -->
<h3>hello socket</h3>
<p>【userId】:</p><div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:</p><div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【contentText】:</p><div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>操作:</p><div><a onclick="openSocket()">开启socket</a></div>
<p>【操作】:</p><div><a onclick="sendMessage()">发送消息</a></div>

<script>
    // The currently active connection, if any.
    var socket;

    // Opens a fresh WebSocket connection, closing any previous one first.
    function openSocket() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else{
            console.log("您的浏览器支持WebSocket");
            // Server address and port of the WebSocket endpoint; the trailing path
            // segment "dfsfdsf" stands in for a real token.
            var socketUrl="ws://127.0.0.1:8081/imserService/dfsfdsf";
            console.log(socketUrl);
            if(socket!=null){
                socket.close();
                socket=null;
            }
            socket = new WebSocket(socketUrl);
            // Connection opened.
            socket.onopen = function() {
                console.log("websocket已打开");
            };
            // Message received from the server; front-end handling would start here.
            socket.onmessage = function(msg) {
                var serverMsg = "收到服务端信息:" + msg.data;
                console.log(serverMsg);
            };
            // Connection closed.
            socket.onclose = function() {
                console.log("websocket已关闭");
            };
            // An error occurred on the connection.
            socket.onerror = function() {
                console.log("websocket发生了错误");
            }
        }
    }

    // Sends the toUserId/contentText inputs to the server as a JSON string.
    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else {
            // Sending before the connection is open would throw, so bail out early.
            if(!socket || socket.readyState !== WebSocket.OPEN) {
                console.log("websocket未连接,请先开启socket");
                return;
            }
            var toUserId = document.getElementById('toUserId').value;
            var contentText = document.getElementById('contentText').value;
            // JSON.stringify escapes quotes and backslashes that would corrupt a
            // hand-concatenated JSON string.
            var msg = JSON.stringify({toUserId: toUserId, contentText: contentText});
            console.log(msg);
            socket.send(msg);
        }
    }
</script>
</body>
</html>