[转载]Java与Flex通信 – 摇摆的蒜头酥 – 博客园.
提到通信就得面临两个问题,一是通信协议的选择,二是数据协议的定义。通信协议耳熟能详的就有好几种,TCP,UDP,HTTP,FTP等等。数据协议是一种数据交换的格式,像jason,xml,amf3,google protocol都可以用作数据协议,你也可以自己根据通信的效率,安全等因素来定义自己的数据协议。
通信系统的开发是一项很复杂的工作,不要以为往发服务端发一个Hello World!就认为完全掌握了通信系统的开发。概括来说要开发一个健壮的通信系统,必须从这几个方面来着手。
一,通信粘包的处理
这里包的概念是逻辑上的数据包,也就是我们发送的一个完整业务消息包,粘包情况有两种,一种是粘在一起的包都是完整的数据包,另一种情况是粘在一起的包有不完整的包。不是所有的粘包现象都需要处理,若传输的数据为不带结构的连续流数据(如文件传输),则不必把粘连的包分开(简称分包)。但在实际工程应用中,传输的数据一般为带结构的数据,这时就需要做分包处理。
为了避免粘包现象,可采取以下几种措施。一是对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软 件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;二是对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接 收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;三是由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手 段来避免粘包。
以上提到的三种措施,都有其不足之处。总的来说降低了通信系统的吞吐量。我们可以自己设计一个分包算法来处理粘包的问题,该算法的实现是这样的:
- 当有数据到达时,将数据压入程序缓冲区。
- 循环处理缓冲区,如果缓冲区长度大于包头长度,则取出长度信息n,否则跳出循环,如果缓冲区的长度大于n,则从缓冲区取出一个完整包进行处理,否则跳出循环。
如果你是Java的爱好都可以参考一下Mina和netty2的实现,像Mina和Netty2都提供了粘包处理类可供使用,像Mina的CumulativeProtocolDecoder类,Netty2的LengthFieldBasedFrameDecoder。
二,数据协议选择
现在已经有很多数据协议可供我们选择,像jason,xml,amf3,google protocol等等,这些协议相应的语言都有API来对自身数据做协议处理,我们选择协标准无非就是效率和大小,这里每个人可以根据实际的应用环境选择适合的数据协议。
三,网络系统的安全性
网络安全是一个永远的话题,对通信数据加密一般常RSA对byte流加密,FLOOD验证,IP黑名单验证都是必须考虑到的。
以上是做网 络开发必须了解的一些基础知识,在这里我们使用一个具体的实例来加深一下理解,Java与Flex使用AMF3数据协议通信。做过网络开发的一般都会知道 套接字(SOCKET),很多语言都会通SOCKET来提供对网络操作的API,Java的提供的NIO SOCKET是一个高效的异步通信API,当然可以在这个基础上来开发我们的网络应用,但这种Native API需要我们花很多精力来处理网络通信的细节,消弱了我们对业务的关心。为我们开发带来很多不便性,幸好Java有很多现成的NIO SOCKET框架可供使用,像Mina,Netty2,xSocket等等,这些框架处理了很多底层的通信问题,提供了一些易用的API以供使用。在这个实例中我们使用Netty2来做通信框架。
定义消息包,消息包有定长包和不定长包,不定长包无非就是要在消息包中加入长度信息,以对收到的网络字节流进行分界。消息包的定义如下
定义AMF3数据协议的编码和解码器
/* * @(#)AMF3Encoder.java 0.1 05/11/17 * * Copyright 2010 QISI, Inc. All rights reserved. * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.qidea.pushserver.codec; import java.io.ByteArrayOutputStream; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import com.qidea.pushserver.Constants; import com.qidea.pushserver.message.CommandMessage; import com.qidea.pushserver.message.PushMessage; import flex.messaging.io.SerializationContext; import flex.messaging.io.amf.Amf3Output; /** * * @author sunwei * @version 2010-7-21 * @since JDK1.5 */ public class AMF3Encoder extends OneToOneEncoder { /** * */ @Override protected Object encode(ChannelHandlerContext arg0, Channel arg1, Object arg2) throws Exception { ByteArrayOutputStream stream = new ByteArrayOutputStream(); SerializationContext serializationContext = new SerializationContext(); Amf3Output amf3Output = new Amf3Output(serializationContext); amf3Output.setOutputStream(stream); amf3Output.writeObject(arg2); byte[] objSe = stream.toByteArray(); if (objSe != null && objSe.length > 0) { ChannelBuffer buffer = ChannelBuffers.buffer(objSe.length + 8); if (arg2 instanceof PushMessage) buffer.writeInt(Constants.MAGIC_NUM_PUSH_MSG); else if (arg2 instanceof CommandMessage) buffer.writeInt(Constants.MAGIC_NUM_COMMAND_MSG); buffer.writeInt(objSe.length); buffer.writeBytes(objSe); return buffer; } return null; } } /* * @(#)AMF3Decoder.java 0.1 05/11/17 * * Copyright 2010 QISI, Inc. All rights reserved. * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.qidea.pushserver.codec; import java.io.ByteArrayInputStream; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import flex.messaging.io.SerializationContext; import flex.messaging.io.amf.Amf3Input; /** * amf3协议解码类 * * @author sunwei * @version 2010-7-21 * @since JDK1.5 */ public class AMF3Decoder extends LengthFieldBasedFrameDecoder { public static final Logger logger = LoggerFactory .getLogger(AMF3Decoder.class); /** * * @param maxFrameLength * 包的最大大小 * @param lengthFieldOffset * 包头信息,长度的偏移位 * @param lengthFieldLength * 包头信息,长度位数 */ public AMF3Decoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength); } /** * * @param maxFrameLength */ public AMF3Decoder(int maxFrameLength) { super(maxFrameLength, 4, 4, 0, 0); } /** * */ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { ChannelBuffer frame = (ChannelBuffer) super .decode(ctx, channel, buffer); if (frame == null) { return null; } // int magicNum = frame.readInt(); int dataLength = frame.readInt(); logger.info("magic num={},data length={}", magicNum, dataLength); // 读AMF3字节流的内容 byte[] content = new byte[frame.readableBytes()]; frame.readBytes(content); SerializationContext serializationContext = new SerializationContext(); Amf3Input amf3Input = new Amf3Input(serializationContext); amf3Input.setInputStream(new ByteArrayInputStream(content)); Object message = amf3Input.readObject(); return message; } }
构建服务端
public class PushProtocolHandler extends SimpleChannelHandler { public static Logger log = LoggerFactory .getLogger(PushProtocolHandler.class); /** * */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { if (e.getMessage() != null) { ChannelManager channelManager = PushServerContext .getBean(ChannelManager.class); if (e.getMessage() instanceof CommandMessage) { channelManager.handleMsg((CommandMessage) e.getMessage(), e .getChannel()); } else if (e.getMessage() instanceof PushMessage) { channelManager.handleMsg((PushMessage) e.getMessage(), e .getChannel()); } else { log.warn("unkown message {}", e); } } } } import static org.jboss.netty.channel.Channels.*; /** * * @author sunwei * @version 2010-7-22 * @since JDK1.5 */ public class PushServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); // 处理日志 pipeline.addLast("logger", new LoggingHandler()); // 处理coder pipeline.addLast("decoder", new AMF3Decoder(Constants.MAX_OBJECT_SIZE)); pipeline.addLast("encoder", new AMF3Encoder()); // pipeline.addLast("handler", new PushProtocolHandler()); // return pipeline; } } public static main(String[] args) { // 开始NIO线程 ChannelFactory factory = new NioServerSocketChannelFactory(Executors .newCachedThreadPool(), Executors.newCachedThreadPool()); // 服务启始点 ServerBootstrap bootstrap = new ServerBootstrap(factory); // 处理过滤器 bootstrap.setPipelineFactory(new PushServerPipelineFactory()); // 设置相关参数 bootstrap.setOption("child.tcpNoDelay", true); // 设置相关参数 bootstrap.setOption("child.keepAlive", true); // 绑定相关端口 bootstrap.bind(new InetSocketAddress(getPushPort())); }
Flex客户端
public class FlexSocket { //发送包 public function send(type:int, obj:PushMessage):Boolean { if (_socket == null) { return false; } //手动限制不给发送的时候用 if (socketState == socket_state_closed || socketState == socket_state_connecting) { return false; } if (!_socket.connected) { return false; } var byteArr:ByteArray=objToByteaArray(obj); var msgHead:MsgHead=new MsgHead(type, byteArr.length); sendMsg(msgHead.getType(), msgHead.getSize(), byteArr); return true; } //接受包 private function getDataHandler(e:ProgressEvent):void { _timeServerDead.stop(); _timeServerDead.reset(); if (_socket.bytesAvailable >= 8 && !_isReadHead) { _recvPackageType=_socket.readInt(); //同意关闭 // if(_recvPackageType == 5) // { // close(); // } _recvPackageSize=_socket.readInt(); _isReadHead=true; } if (_isReadHead && _socket.bytesAvailable >= _recvPackageSize) { var byte:ByteArray=new ByteArray(); _socket.readBytes(byte, 0, _recvPackageSize); _msgObj=byteArraytoObject(byte); //暂时用上面一种 if (_recvPackageType == packageType.LOGIN_TYPE) { if (_msgObj.ret == bodyType.RECEIVE_OK) { _timerDetectSocket.start(); socketState=socket_state_connected; myEventDispatch.Instence().dispatcher(bodyType.INLINE_CURRENTSOCKETSTATE); } else if (_msgObj.ret == bodyType.RECEIVE_ERROR) { close(); } } else if (_recvPackageType == packageType.CHAT_TYPE) { myEventDispatch.Instence().dispatcher(selectEventName(_recvPackageType), _msgObj); } _recvPackageSize=0; _recvPackageType=0; _msgObj=null; _isReadHead=false; } } }
有关Mina的实现,你可以通过本博客向我索取相关代码。