1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| @Component public class SocketIOStartup implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(SocketIOStartup.class); private static final ConcurrentHashMap<String, SocketThread> socketThreadMap = new ConcurrentHashMap<>();
@Autowired private SocketIOServer socketIOServer; private void startUpServer() { bindListener(); socketIOServer.startAsync(); }
private void bindListener() { connect(); disconnect(); bindEventListener(); } private void connect() { socketIOServer.addConnectListener(new ConnectListener() {
@Override public void onConnect(SocketIOClient socketIOClient) { logger.info("{}已连接", socketIOClient.getRemoteAddress().toString() + socketIOClient.getSessionId()); } }); } private void disconnect() { socketIOServer.addDisconnectListener(new DisconnectListener() { @Override public void onDisconnect(SocketIOClient socketIOClient) { SocketThread st = socketThreadMap.get(socketIOClient.getSessionId().toString()); if (st != null) { st.setLoop(false); socketThreadMap.remove(socketIOClient.getSessionId().toString()); } logger.info("{}已关闭", socketIOClient.getSessionId()); } }); } private void bindEventListener() { socketIOServer.addEventListener("talk", Object.class, new DataListener<Object>() { @Override public void onData(SocketIOClient socketIOClient, Object value, AckRequest ackRequest) throws Exception { sendMessage(socketIOClient, value); } }); } private void sendMessage(SocketIOClient socketIOClient, Object object) throws JsonParseException { String cmd; String value; try { String jsonString = JSONObject.toJSONString(object); JSONObject cmdJson = JSONObject.parseObject(jsonString); cmd = cmdJson.getString("cmd"); value = cmdJson.getString("message"); } catch (Exception e) { throw new RestApiException(ExceptionInfoEnum.DATA_PARSE_EXCEPTION); }
SocketRoute route = SocketRoute.getSocketRoute(cmd); if (route == null) { throw new RuntimeException("找不到该cmd:" + cmd + " 对应的处理方法"); } MonitorProcessor processor = SpringBeanUtils.getBean(route.getMonitorClass()); startThread(socketIOClient, processor, value, route.getIntervalTime()); }
private void startThread(SocketIOClient socketIOClient, MonitorProcessor processor, String value, long interval) { SocketThread st = socketThreadMap.get(socketIOClient.getSessionId().toString()); if (st != null) { st.setLoop(false); socketThreadMap.remove(socketIOClient.getSessionId().toString()); } SocketThread st_new = new SocketThread(socketIOClient, processor, value, interval); socketThreadMap.put(socketIOClient.getSessionId().toString(), st_new); Thread t = new Thread(st_new); t.start(); }
class SocketThread implements Runnable { private SocketIOClient socketIOClient; private MonitorProcessor processor; private String value; private long interval; private boolean loop = true;
public SocketThread(SocketIOClient socketIOClient, MonitorProcessor processor, String value, long interval) { this.socketIOClient = socketIOClient; this.processor = processor; this.value = value; this.interval = interval; }
public void setLoop(boolean loop) { this.loop = loop; }
@Override public void run() { while (loop && socketIOClient.isChannelOpen()) { Object returnVal = processor.process(value); if (loop && socketIOClient.isChannelOpen()) { socketIOClient.sendEvent("message", returnVal); } try { Thread.sleep(interval); } catch (InterruptedException e) { logger.error("Socket clientId:{} {}", socketIOClient.getSessionId(), e.getMessage()); } } } }
@Override public void run(String... args) { startUpServer(); }
}
|