[TOC]
使用版本
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core-spring-boot-starter</artifactId>
<version>3.5.9.v20200214-RELEASE</version>
</dependency>
协议格式定义
package demo;
import java.nio.ByteBuffer;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
/**
 * Protocol definition for the demo, including the encode and decode routines.
 *
 * <p>Wire layout produced by {@link #encode} and consumed by {@link #decode}:
 * a 4-byte header holding the body length, followed by the body bytes.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:07:29 AM
 */
public class HelloPacket extends Packet {
    private static final long serialVersionUID = -172060606924066412L;

    /** Length of the message header, in bytes. */
    public static final int HEADER_LENGTH = 4;

    /** Charset name used by the handlers to convert the body to and from text. */
    public static final String CHARSET = "utf-8";

    /** Raw message body; {@code null} when the packet carries no payload. */
    private byte[] body;

    /**
     * @return the body (the internal array itself, not a copy); may be {@code null}
     */
    public byte[] getBody() {
        return this.body;
    }

    /**
     * @param body
     *            the body to set (stored as-is, not copied); may be {@code null}
     */
    public void setBody(byte[] body) {
        this.body = body;
    }

    /**
     * Serializes a packet as header (body length) + body.
     *
     * @param packet
     *            the packet to encode; cast to {@link HelloPacket}
     * @param tioConfig
     *            supplies the byte order applied to the buffer before writing
     * @param channelContext
     *            not used by this implementation
     * @return a newly allocated buffer sized exactly for header + body; it is
     *         returned as written, i.e. without being flipped
     */
    public static ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        final byte[] payload = ((HelloPacket) packet).getBody();
        // A missing body is encoded as a zero-length body.
        final int payloadLength = (payload == null) ? 0 : payload.length;

        // Total size = header length + body length.
        final ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + payloadLength);
        out.order(tioConfig.getByteOrder());

        // The header content is simply the length of the body.
        out.putInt(payloadLength);
        if (payload != null) {
            out.put(payload);
        }
        return out;
    }

    /**
     * Attempts to read one packet from the buffer.
     *
     * @param buffer
     *            the buffer to read from, starting at its current position
     * @param limit
     *            not used by this implementation
     * @param position
     *            not used by this implementation
     * @param readableLength
     *            number of bytes available for this decode attempt
     * @param channelContext
     *            used only to report the remote node in the error message
     * @return the decoded packet (its body stays {@code null} for a zero-length
     *         body), or {@code null} when the header or the body is not yet
     *         completely available
     * @throws AioDecodeException
     *             if the header declares a negative body length
     */
    public static HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength,
            ChannelContext channelContext) throws AioDecodeException {
        // Not even a complete header yet.
        if (readableLength < HEADER_LENGTH) {
            return null;
        }

        // Read the declared body length from the header.
        final int declaredLength = buffer.getInt();
        if (declaredLength < 0) {
            // The data is malformed.
            throw new AioDecodeException(
                    "bodyLength [" + declaredLength + "] is not right, remote:" + channelContext.getClientNode());
        }

        // The bytes left after the header cannot make up the whole body.
        // NOTE(review): the header bytes have already been consumed at this point;
        // presumably the caller restores the buffer position - confirm.
        if (readableLength - HEADER_LENGTH < declaredLength) {
            return null;
        }

        final HelloPacket decoded = new HelloPacket();
        if (declaredLength > 0) {
            final byte[] payload = new byte[declaredLength];
            buffer.get(payload);
            decoded.setBody(payload);
        }
        return decoded;
    }
}
Server端消息处理
package demo;
import java.nio.ByteBuffer;
import org.springframework.stereotype.Component;
import org.tio.common.starter.annotation.TioServerMsgHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
/**
 * Server-side message handler.
 *
 * <pre>
 * Enabled by adding the {@link TioServerMsgHandler} annotation; without it the handler is not enabled.
 * Note: a handler must be enabled, otherwise startup throws {@code TioMsgHandlerNotFoundException}.
 * </pre>
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:06:42 AM
 *
 */
@Component
@TioServerMsgHandler
public class HelloServerMsgHandler implements ServerAioHandler {

    /**
     * Decoding: turns the received ByteBuffer into a business packet the
     * application understands. Delegates to {@link HelloPacket#decode}.
     * Overall message layout: header + body. Header: 4 bytes holding the body
     * length. Body: the byte[] of the object's JSON string.
     */
    @Override
    public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength,
            ChannelContext channelContext) throws AioDecodeException {
        return HelloPacket.decode(buffer, limit, position, readableLength, channelContext);
    }

    /**
     * Encoding: turns a business packet into a ByteBuffer that can be sent.
     * Delegates to {@link HelloPacket#encode}.
     * Overall message layout: header + body. Header: 4 bytes holding the body
     * length. Body: the byte[] of the object's JSON string.
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        return HelloPacket.encode(packet, tioConfig, channelContext);
    }

    /**
     * Handles a message: prints the received text and sends an acknowledgement
     * echoing it back on the same channel. Packets without a body are ignored.
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        final byte[] payload = ((HelloPacket) packet).getBody();
        if (payload == null) {
            // Nothing to print or answer for a body-less packet.
            return;
        }

        final String text = new String(payload, HelloPacket.CHARSET);
        System.out.println("收到消息:" + text);

        final HelloPacket reply = new HelloPacket();
        reply.setBody(("收到了你的消息,你的消息是:" + text).getBytes(HelloPacket.CHARSET));
        Tio.send(channelContext, reply);
    }
}
Client消息处理
package demo;
import java.nio.ByteBuffer;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
/**
 * Client-side handler: encoding, decoding and handling of received messages.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:57:17 AM
 *
 */
public class HelloClientAioHandler implements ClientAioHandler {

    /**
     * Single body-less packet handed out by {@link #heartbeatPacket(ChannelContext)}.
     * Declared final because it is never reassigned.
     */
    private static final HelloPacket HEARTBEAT_PACKET = new HelloPacket();

    /**
     * Decodes one packet by delegating to {@link HelloPacket#decode}.
     *
     * @see org.tio.core.intf.AioHandler#decode(java.nio.ByteBuffer, int, int,
     *      int, org.tio.core.ChannelContext)
     */
    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
            throws AioDecodeException {
        return HelloPacket.decode(buffer, limit, position, readableLength, channelContext);
    }

    /**
     * Encodes one packet by delegating to {@link HelloPacket#encode}.
     *
     * @see org.tio.core.intf.AioHandler#encode(org.tio.core.intf.Packet,
     *      org.tio.core.TioConfig, org.tio.core.ChannelContext)
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        return HelloPacket.encode(packet, tioConfig, channelContext);
    }

    /**
     * Prints the text of a received packet; packets without a body are ignored.
     *
     * @see org.tio.core.intf.AioHandler#handler(org.tio.core.intf.Packet,
     *      org.tio.core.ChannelContext)
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        HelloPacket helloPacket = (HelloPacket) packet;
        byte[] body = helloPacket.getBody();
        if (body != null) {
            String str = new String(body, HelloPacket.CHARSET);
            System.out.println("收到消息:" + str);
        }
    }

    /**
     * Returns the packet to use as heartbeat.
     *
     * <p>Per the original author's note: if this method returns null the
     * framework does not send heartbeats; if it returns non-null the framework
     * periodically sends the returned packet.
     *
     * @see org.tio.client.intf.ClientAioHandler#heartbeatPacket(org.tio.core.
     *      ChannelContext)
     */
    @Override
    public Packet heartbeatPacket(ChannelContext channelContext) {
        return HEARTBEAT_PACKET;
    }
}
Server配置
tio.core.server.port=6789
tio.core.server.heartbeat-timeout=60000
tio.core.cluster.enabled=false
Server启动
@SpringBootApplication
// 增加注解
@EnableTioServerServer
Client 启动
package demo;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.Tio;
/**
 * Demo client entry point: connects to the server and sends one greeting.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-12:02:27 PM
 *
 */
public class HelloClientStarter {

    /** Server address. */
    public static final String SERVER = "127.0.0.1";

    /** Port the server listens on. */
    public static final int PORT = 6789;

    /** Heartbeat timeout applied to the client config in {@link #main}. */
    public static final int TIMEOUT = 5000;

    /** Server node built from {@link #SERVER} and {@link #PORT}. */
    public static Node serverNode = new Node(SERVER, PORT);

    /** Handler covering encoding, decoding and message handling. */
    public static ClientAioHandler tioClientHandler = new HelloClientAioHandler();

    /**
     * Event listener. May be null, although implementing the interface is
     * recommended (see the showcase project for examples).
     */
    public static ClientAioListener aioListener = null;

    /** Automatic reconnection after a dropped link; set to null to disable it. */
    private static ReconnConf reconnConf = new ReconnConf(5000L);

    /**
     * Context object shared by a group of connections. It is built from the
     * three fields above, so it must stay declared after them.
     */
    public static ClientTioConfig clientTioConfig = new ClientTioConfig(tioClientHandler, aioListener,
            reconnConf);

    /** Client instance; created in {@link #main}. */
    public static TioClient tioClient = null;

    /** Channel to the server; assigned in {@link #main} from the connect call. */
    public static ClientChannelContext clientChannelContext = null;

    /**
     * Program entry point: applies the heartbeat timeout, creates the client,
     * connects to the server node and then sends a greeting.
     */
    public static void main(String[] args) throws Exception {
        clientTioConfig.setHeartbeatTimeout(TIMEOUT);
        tioClient = new TioClient(clientTioConfig);
        clientChannelContext = tioClient.connect(serverNode);
        // Once connected, send a message for fun.
        send();
    }

    /** Sends a "hello world" packet over the channel obtained in {@link #main}. */
    private static void send() throws Exception {
        final byte[] greeting = "hello world".getBytes(HelloPacket.CHARSET);
        final HelloPacket hello = new HelloPacket();
        hello.setBody(greeting);
        Tio.send(clientChannelContext, hello);
    }
}
Comments