03月25, 2020

在springboot中使用tio

[TOC]

使用版本

<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core-spring-boot-starter</artifactId>
<version>3.5.9.v20200214-RELEASE</version>
</dependency>

协议格式定义

package demo;

import java.nio.ByteBuffer;

import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;

/**
 * Packet type for the demo protocol, bundled with its encode/decode routines.
 *
 * <p>Frame layout: a {@value #HEADER_LENGTH}-byte header holding the body length as an
 * {@code int}, followed by that many body bytes.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:07:29 AM
 */
public class HelloPacket extends Packet {
    private static final long serialVersionUID = -172060606924066412L;

    /** Size of the frame header in bytes. */
    public static final int HEADER_LENGTH = 4;

    /** Charset name callers use when converting the body to and from text. */
    public static final String CHARSET = "utf-8";

    /** Raw body bytes; {@code null} until {@link #setBody(byte[])} is called or decode fills it. */
    private byte[] body;

    /**
     * @return the body bytes (the stored array itself, not a copy), possibly {@code null}
     */
    public byte[] getBody() {
        return this.body;
    }

    /**
     * @param body the body bytes to store (kept by reference, not copied)
     */
    public void setBody(byte[] body) {
        this.body = body;
    }

    /**
     * Serializes a packet into a freshly allocated buffer: length header first, then the body.
     *
     * @param packet         the packet to encode; cast to {@link HelloPacket}
     * @param tioConfig      supplies the byte order applied to the buffer
     * @param channelContext not used by this method
     * @return the filled buffer; it is returned as written, without being flipped
     */
    public static ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        byte[] payload = ((HelloPacket) packet).getBody();
        // A missing body is sent as a zero-length frame.
        int payloadLength = (payload == null) ? 0 : payload.length;

        // Total size = header + body.
        ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + payloadLength);
        out.order(tioConfig.getByteOrder());
        // The header carries nothing but the body length.
        out.putInt(payloadLength);
        if (payload != null) {
            out.put(payload);
        }
        return out;
    }

    /**
     * Attempts to read one complete frame from {@code buffer}.
     *
     * <p>Reads are relative, so the buffer position advances past the header even when
     * {@code null} is returned because the body is incomplete.
     * NOTE(review): presumably the caller restores the position in that case - confirm.
     *
     * @param buffer         the buffer to read from
     * @param limit          not used by this method
     * @param position       not used by this method
     * @param readableLength number of bytes the caller reports as available
     * @param channelContext used only to name the remote node in the error message
     * @return the decoded packet, or {@code null} when fewer bytes than one full frame are available
     * @throws AioDecodeException if the header declares a negative body length
     */
    public static HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength,
            ChannelContext channelContext) throws AioDecodeException {
        // Not even a full header yet.
        if (readableLength < HEADER_LENGTH) {
            return null;
        }

        int declaredLength = buffer.getInt();
        if (declaredLength < 0) {
            throw new AioDecodeException(
                    "bodyLength [" + declaredLength + "] is not right, remote:" + channelContext.getClientNode());
        }

        // readableLength >= HEADER_LENGTH here, so this subtraction cannot overflow.
        int bytesAfterHeader = readableLength - HEADER_LENGTH;
        if (bytesAfterHeader < declaredLength) {
            // Body has not fully arrived.
            return null;
        }

        HelloPacket decoded = new HelloPacket();
        // A zero-length frame leaves the body null rather than an empty array.
        if (declaredLength > 0) {
            byte[] payload = new byte[declaredLength];
            buffer.get(payload);
            decoded.setBody(payload);
        }
        return decoded;
    }
}

Server端消息处理

package demo;

import java.nio.ByteBuffer;

import org.springframework.stereotype.Component;
import org.tio.common.starter.annotation.TioServerMsgHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;

/**
 * Server-side handler for {@link HelloPacket} frames: decodes, encodes, and echoes messages.
 *
 * <p>The handler is switched on by the {@link TioServerMsgHandler} annotation. According to
 * the original author's note, a handler must be enabled, otherwise startup fails with a
 * {@code TioMsgHandlerNotFoundException}.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:06:42 AM
 */
@Component
@TioServerMsgHandler
public class HelloServerMsgHandler implements ServerAioHandler {

    /**
     * Decodes received bytes into a packet by delegating to
     * {@link HelloPacket#decode(ByteBuffer, int, int, int, ChannelContext)}.
     */
    @Override
    public HelloPacket decode(ByteBuffer buffer, int limit, int position, int readableLength,
            ChannelContext channelContext) throws AioDecodeException {
        return HelloPacket.decode(buffer, limit, position, readableLength, channelContext);
    }

    /**
     * Encodes a packet into a buffer by delegating to
     * {@link HelloPacket#encode(Packet, TioConfig, ChannelContext)}.
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        return HelloPacket.encode(packet, tioConfig, channelContext);
    }

    /**
     * Prints the received text and sends an acknowledgement that repeats it.
     * Packets without a body are ignored.
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        byte[] payload = ((HelloPacket) packet).getBody();
        if (payload == null) {
            return;
        }

        String text = new String(payload, HelloPacket.CHARSET);
        System.out.println("收到消息:" + text);

        // Reply on the same channel with the received text embedded.
        String replyText = "收到了你的消息,你的消息是:" + text;
        HelloPacket reply = new HelloPacket();
        reply.setBody(replyText.getBytes(HelloPacket.CHARSET));
        Tio.send(channelContext, reply);
    }
}

Client消息处理


package demo;

import java.nio.ByteBuffer;

import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;

/**
 * Client-side handler for {@link HelloPacket} frames: decodes, encodes, prints incoming
 * messages, and supplies the heartbeat packet.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-11:57:17 AM
 */
public class HelloClientAioHandler implements ClientAioHandler {

    /** Shared heartbeat packet; its body is never set. */
    private static final HelloPacket HEARTBEAT_PACKET = new HelloPacket();

    /**
     * Decodes received bytes by delegating to
     * {@link HelloPacket#decode(ByteBuffer, int, int, int, ChannelContext)}.
     */
    @Override
    public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext)
            throws AioDecodeException {
        return HelloPacket.decode(buffer, limit, position, readableLength, channelContext);
    }

    /**
     * Encodes a packet by delegating to
     * {@link HelloPacket#encode(Packet, TioConfig, ChannelContext)}.
     */
    @Override
    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
        return HelloPacket.encode(packet, tioConfig, channelContext);
    }

    /**
     * Prints the text of a received packet; packets without a body are ignored.
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        byte[] payload = ((HelloPacket) packet).getBody();
        if (payload == null) {
            return;
        }
        String text = new String(payload, HelloPacket.CHARSET);
        System.out.println("收到消息:" + text);
    }

    /**
     * Returns the packet to use as a heartbeat.
     *
     * <p>Per the original author's note: if this method returns {@code null} the framework
     * sends no heartbeats; otherwise it periodically sends the returned packet.
     *
     * @return the shared, body-less heartbeat packet
     */
    @Override
    public Packet heartbeatPacket(ChannelContext channelContext) {
        return HEARTBEAT_PACKET;
    }

}

Server配置(application.properties)

tio.core.server.port=6789
tio.core.server.heartbeat-timeout=60000
tio.core.cluster.enabled=false

Server启动

@SpringBootApplication
// 在启动类上增加以下注解
@EnableTioServerServer

Client 启动

package demo;

import org.tio.client.ClientChannelContext;
import org.tio.client.ClientTioConfig;
import org.tio.client.ReconnConf;
import org.tio.client.TioClient;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;
import org.tio.core.Tio;


/**
 * Stand-alone client launcher: connects to the server node and sends one greeting packet.
 *
 * @author [天明]
 * @version: 0.0.1 Mar 25, 2020-12:02:27 PM
 */
public class HelloClientStarter {

    /** Address of the server to connect to. */
    public static final String SERVER = "127.0.0.1";

    /** Port of the server to connect to. */
    public static final int PORT = 6789;

    /** Value handed to {@code setHeartbeatTimeout} (presumably milliseconds - confirm). */
    public static final int TIMEOUT = 5000;

    /** Target server node built from {@link #SERVER} and {@link #PORT}. */
    public static Node serverNode = new Node(SERVER, PORT);

    /** Handler covering encoding, decoding, and message processing. */
    public static ClientAioHandler tioClientHandler = new HelloClientAioHandler();

    /**
     * Event listener. Per the original author's note it may be {@code null}, though
     * implementing the interface is recommended.
     */
    public static ClientAioListener aioListener = null;

    /**
     * Auto-reconnect settings used after a disconnect. Per the original author's note,
     * use {@code null} to disable auto-reconnect.
     */
    private static ReconnConf reconnConf = new ReconnConf(5000L);

    /**
     * Configuration built from the handler, listener, and reconnect settings above.
     * It must be declared after those fields because static initializers run in order.
     */
    public static ClientTioConfig clientTioConfig = new ClientTioConfig(tioClientHandler, aioListener, reconnConf);

    /** Client instance; created in {@link #main(String[])}. */
    public static TioClient tioClient = null;

    /** Channel returned by the connect call; assigned in {@link #main(String[])}. */
    public static ClientChannelContext clientChannelContext = null;

    /**
     * Program entry point: applies the heartbeat timeout, connects, then sends a greeting.
     */
    public static void main(String[] args) throws Exception {
        clientTioConfig.setHeartbeatTimeout(TIMEOUT);
        tioClient = new TioClient(clientTioConfig);
        clientChannelContext = tioClient.connect(serverNode);
        // Once connect() has returned, send a first message.
        sendGreeting();
    }

    /** Sends a single "hello world" packet over the connected channel. */
    private static void sendGreeting() throws Exception {
        byte[] greeting = "hello world".getBytes(HelloPacket.CHARSET);
        HelloPacket packet = new HelloPacket();
        packet.setBody(greeting);
        Tio.send(clientChannelContext, packet);
    }
}

本文链接:https://blog.jnliok.com/post/tio-start.html

-- EOF --

Comments