1,、前言Apache MINA是Apache組織的一個優(yōu)秀的項目。MINA是Multipurpose Infrastructure for NetworkApplications的縮寫,。它是一個網(wǎng)絡應用程序框架,,用來幫助用戶非常方便地開發(fā)高性能和高可靠性的網(wǎng)絡應用程序。在本文中介紹了 如何通過Apache Mina2.0來實現(xiàn)TCP協(xié)議長連接和短連接應用,。 2,、系統(tǒng)介紹2.1系統(tǒng)框架整個系統(tǒng)由兩個服務端程序和兩個客戶端程序組成。分別實現(xiàn)TCP長連接和短連接通信,。 系統(tǒng)業(yè)務邏輯是一個客戶端與服務端建立長連接,,一個客戶端與服務端建立短連接,。數(shù)據(jù)從短連接客戶端經(jīng)過服務端發(fā)送到長連接客戶端,并從長連接客戶端接收響應數(shù)據(jù),。當收到響應數(shù)據(jù)后斷開連接,。 系統(tǒng)架構圖如下:
2.2處理流程系統(tǒng)處理流程如下: 1) 啟動服務端程序,監(jiān)聽8001和8002端口,。 2) 長連接客戶端向服務端8002端口建立連接,,服務端將連接對象保存到共享內(nèi)存中。由于采用長連接方式,,連接對象是唯一的,。 3) 短連接客戶端向服務端8001端口建立連接。建立連接后創(chuàng)建一個連接對象,。 4) 短連接客戶端連接成功后發(fā)送數(shù)據(jù),。服務端接收到數(shù)據(jù)后從共享內(nèi)存中得到長連接方式的連接對象,使用此對象向長連接客戶端發(fā)送數(shù)據(jù),。發(fā)送前將短連接對象設為長連接對象的屬性值,。 5) 長連接客戶端接收到數(shù)據(jù)后返回響應數(shù)據(jù)。服務端從長連接對象的屬性中取得短連接對象,,通過此對象將響應數(shù)據(jù)發(fā)送給短連接客戶端,。 6) 短連接客戶端收到響應數(shù)據(jù)后,關閉連接,。 3,、服務端程序3.1長連接服務端服務啟動 public class MinaLongConnServer { private static final int PORT = 8002;
public void start()throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new MinaLongConnServerHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.bind(new InetSocketAddress(PORT)); System.out.println("Listeningon port " + PORT); } } 消息處理 public class MinaLongConnServerHandler extends IoHandlerAdapter { private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
@Override public void sessionOpened(IoSession session) { InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress(); String clientIp = remoteAddress.getAddress().getHostAddress(); logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId())); logger.info("接收來自客戶端 :" + clientIp + "的連接."); Initialization init = Initialization.getInstance(); HashMap<String, IoSession> clientMap =init.getClientMap(); clientMap.put(clientIp, session); }
@Override public void messageReceived(IoSession session, Object message) { logger.info("Messagereceived in the long connect server.."); String expression = message.toString(); logger.info("Message is:" + expression); IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession"); logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId())); shortConnSession.write(expression); }
@Override public void sessionIdle(IoSession session, IdleStatus status) { logger.info("Disconnectingthe idle."); // disconnect an idle client session.close(true); }
@Override public void exceptionCaught(IoSession session, Throwable cause) { // close the connection onexceptional situation logger.warn(cause.getMessage(), cause); session.close(true); } } 3.2短連接服務端服務啟動 public class MinaShortConnServer { private static final int PORT = 8001;
public void start()throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new MinaShortConnServerHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3); acceptor.bind(new InetSocketAddress(PORT)); System.out.println("Listeningon port " + PORT); } } 消息處理 public class MinaShortConnServerHandler extends IoHandlerAdapter { private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
@Override public void sessionOpened(IoSession session) { InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress(); logger.info(remoteAddress.getAddress().getHostAddress()); logger.info(String.valueOf(session.getId())); }
@Override public void messageReceived(IoSession session, Object message) { logger.info("Messagereceived in the short connect server..."); String expression = message.toString(); Initialization init = Initialization.getInstance(); HashMap<String, IoSession> clientMap =init.getClientMap(); if (clientMap == null || clientMap.size() == 0) { session.write("error"); } else { IoSession longConnSession = null; Iterator<String> iterator =clientMap.keySet().iterator(); String key = ""; while (iterator.hasNext()) { key = iterator.next(); longConnSession = clientMap.get(key); } logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId())); logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId())); longConnSession.setAttribute("shortConnSession",session); longConnSession.write(expression); } }
@Override public void sessionIdle(IoSession session, IdleStatus status) { logger.info("Disconnectingthe idle."); // disconnect an idle client session.close(true); }
@Override public void exceptionCaught(IoSession session, Throwable cause) { // close the connection onexceptional situation logger.warn(cause.getMessage(), cause); session.close(true); } }
4、客戶端程序4.1長連接客戶端使用java.net.Socket來實現(xiàn)向服務端建立連接,。Socket建立后一直保持連接,,從服務端接收到數(shù)據(jù)包后直接將原文返回。 public class TcpKeepAliveClient { private String ip; private int port; private static Socket socket = null; private static int timeout = 50 * 1000;
public TcpKeepAliveClient(String ip, int port) { this.ip = ip; this.port = port; }
public void receiveAndSend() throws IOException { InputStream input = null; OutputStream output = null;
try { if (socket == null || socket.isClosed() || !socket.isConnected()) { socket = new Socket(); InetSocketAddress addr = new InetSocketAddress(ip, port); socket.connect(addr, timeout); socket.setSoTimeout(timeout); System.out.println("TcpKeepAliveClientnew "); }
input = socket.getInputStream(); output = socket.getOutputStream();
// read body byte[] receiveBytes = {};// 收到的包字節(jié)數(shù)組 while (true) { if (input.available() > 0) { receiveBytes = new byte[input.available()]; input.read(receiveBytes);
// send System.out.println("TcpKeepAliveClientsend date :" + new String(receiveBytes)); output.write(receiveBytes, 0, receiveBytes.length); output.flush(); } }
} catch (Exception e) { e.printStackTrace(); System.out.println("TcpClientnew socket error"); } }
public static void main(String[] args) throws Exception { TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002); client.receiveAndSend(); }
} 4.2短連接客戶端服務啟動 public class MinaShortClient { private static final int PORT = 8001;
public static void main(String[] args) throws IOException,InterruptedException { IoConnector connector = new NioSocketConnector(); connector.getSessionConfig().setReadBufferSize(2048);
connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
connector.setHandler(new MinaShortClientHandler()); for (int i = 1; i <= 10; i++) { ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", PORT)); future.awaitUninterruptibly(); IoSession session =future.getSession(); session.write(i); session.getCloseFuture().awaitUninterruptibly();
System.out.println("result=" + session.getAttribute("result")); } connector.dispose();
} } 消息處理 public class MinaShortClientHandler extends IoHandlerAdapter{ private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
public MinaShortClientHandler() {
}
@Override public void sessionOpened(IoSession session) { }
@Override public void messageReceived(IoSession session, Object message) { logger.info("Messagereceived in the client.."); logger.info("Message is:" + message.toString()); session.setAttribute("result", message.toString()); session.close(true); }
@Override public void exceptionCaught(IoSession session, Throwable cause) { session.close(true); } } 5,、總結通過本文中的例子,,Apache Mina在服務端可實現(xiàn)TCP協(xié)議長連接和短連接。在客戶端只實現(xiàn)了短連接模式,,長連接模式也是可以實現(xiàn)的(在本文中還是采用傳統(tǒng)的java Socket方式),。兩個服務端之間通過共享內(nèi)存的方式來傳遞連接對象也許有更好的實現(xiàn)方式。 |
|