| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 
 | @Componentpublic class SocketIOStartup implements CommandLineRunner {
 private static final Logger logger = LoggerFactory.getLogger(SocketIOStartup.class);
 
 private static final ConcurrentHashMap<String, SocketThread> socketThreadMap = new ConcurrentHashMap<>();
 
 @Autowired
 private SocketIOServer socketIOServer;
 
 private void startUpServer() {
 bindListener();
 socketIOServer.startAsync();
 }
 
 private void bindListener() {
 connect();
 disconnect();
 bindEventListener();
 }
 
 private void connect() {
 socketIOServer.addConnectListener(new ConnectListener() {
 
 @Override
 public void onConnect(SocketIOClient socketIOClient) {
 logger.info("{}已连接", socketIOClient.getRemoteAddress().toString() + socketIOClient.getSessionId());
 }
 });
 }
 
 private void disconnect() {
 socketIOServer.addDisconnectListener(new DisconnectListener() {
 @Override
 public void onDisconnect(SocketIOClient socketIOClient) {
 SocketThread st = socketThreadMap.get(socketIOClient.getSessionId().toString());
 if (st != null) {
 st.setLoop(false);
 socketThreadMap.remove(socketIOClient.getSessionId().toString());
 }
 logger.info("{}已关闭", socketIOClient.getSessionId());
 }
 });
 }
 
 private void bindEventListener() {
 socketIOServer.addEventListener("talk", Object.class, new DataListener<Object>() {
 @Override
 public void onData(SocketIOClient socketIOClient, Object value, AckRequest ackRequest) throws Exception {
 sendMessage(socketIOClient, value);
 }
 });
 }
 
 private void sendMessage(SocketIOClient socketIOClient, Object object) throws JsonParseException {
 String cmd;
 String value;
 try {
 String jsonString = JSONObject.toJSONString(object);
 JSONObject cmdJson = JSONObject.parseObject(jsonString);
 cmd = cmdJson.getString("cmd");
 value = cmdJson.getString("message");
 } catch (Exception e) {
 throw new RestApiException(ExceptionInfoEnum.DATA_PARSE_EXCEPTION);
 }
 
 
 
 
 SocketRoute route = SocketRoute.getSocketRoute(cmd);
 if (route == null) {
 throw new RuntimeException("找不到该cmd:" + cmd + " 对应的处理方法");
 }
 MonitorProcessor processor = SpringBeanUtils.getBean(route.getMonitorClass());
 
 startThread(socketIOClient, processor, value, route.getIntervalTime());
 }
 
 
 private void startThread(SocketIOClient socketIOClient, MonitorProcessor processor, String value, long interval) {
 SocketThread st = socketThreadMap.get(socketIOClient.getSessionId().toString());
 if (st != null) {
 st.setLoop(false);
 socketThreadMap.remove(socketIOClient.getSessionId().toString());
 }
 SocketThread st_new = new SocketThread(socketIOClient, processor, value, interval);
 socketThreadMap.put(socketIOClient.getSessionId().toString(), st_new);
 Thread t = new Thread(st_new);
 t.start();
 }
 
 class SocketThread implements Runnable {
 private SocketIOClient socketIOClient;
 private MonitorProcessor processor;
 private String value;
 private long interval;
 private boolean loop = true;
 
 public SocketThread(SocketIOClient socketIOClient, MonitorProcessor processor, String value, long interval) {
 this.socketIOClient = socketIOClient;
 this.processor = processor;
 this.value = value;
 this.interval = interval;
 }
 
 public void setLoop(boolean loop) {
 this.loop = loop;
 }
 
 @Override
 public void run() {
 while (loop && socketIOClient.isChannelOpen()) {
 Object returnVal = processor.process(value);
 if (loop && socketIOClient.isChannelOpen()) {
 socketIOClient.sendEvent("message", returnVal);
 }
 try {
 Thread.sleep(interval);
 } catch (InterruptedException e) {
 logger.error("Socket clientId:{} {}", socketIOClient.getSessionId(), e.getMessage());
 }
 }
 }
 }
 
 @Override
 public void run(String... args) {
 startUpServer();
 }
 
 }
 
 
 |