Netty-Json-Android-kotlin

使用纯kotlin创建的Android项目,并使用在真实项目中的。连接服务器使用的是neetty,这只是一个框架,有很多注释 在使用neetty和Json对象传输 连接服务器 NettyConnect 编码EncodeHandler 解码DecodeHandler MsgHandler 接收到的数据处理

/** * *@author: Jeff 15899859876@qq.com

  • @time: 2018-05-16 15:10
  • @description: 发送器,编码
  • EncodeHandler 继承自 Netty 中的 MessageToMessageEncoder 类,
  • 并重写抽象方法 encode(ctx: ChannelHandlerContext?, msg: Any?, out: MutableList?)
  • 它负责将Object类型的POJO对象编码为json对象

*/

internal class EncodeHandler : MessageToMessageEncoder() { override fun encode(ctx: ChannelHandlerContext?, msg: Any?, out: MutableList?) { LogUtils.d("EncodeHandler", "msg="+msg.toString()); //序列化为json字符串 val objectToJson:String=gson.toJson(msg); out!!.add(objectToJson); LogUtils.d("EncodeHandler", "objectToJson="+objectToJson);

}

}

/**

  • @author: Jeff 15899859876@qq.com

  • @date: 2018-05-16 15:58

  • @description: 接收器,解码

  • DecodeHandler 继承自 Netty 中的 MessageToMessageDecoder 类,

  • 并重写抽象方法 decode(ctx: ChannelHandlerContext?, msg: ByteBuf?, out: MutableList?)

  • 首先从数据报msg(数据类型取决于继承MessageToMessageDecoder时填写的泛型类型) *它负责将Object类型的json 对象编码为 POJO 对象 */ class DecodeHandler : MessageToMessageDecoder<Any?>() { override fun decode(ctx: ChannelHandlerContext?, msg: Any?, out: MutableList?) { LogUtils.d("DecodeHandler", "msg=" + msg.toString()) var str: String = msg.toString() str = str.substring(0, str.lastIndexOf("}") + 1); LogUtils.d("DecodeHandler", "str=" + str) val jsonToAny = gson.fromJson(str, Any::class.java) // // 解码并添加到解码列表out中 out!!.add(jsonToAny); LogUtils.d("DecodeHandler", "jsonToAny=" + jsonToAny.toString())

    }

}

/**

  • @author: Jeff 15899859876@qq.com

  • @date: 2018-05-16 16:56

  • @description: 接收到的数据处理

  • ChannelDuplexHandler 则同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。

  • 如果一个所需的ChannelHandler既要处理入站事件又要处理出站事件,推荐继承此类。

  • 当然这里使用的是netty4

  • 如果使用netty5使用 ChannelHandlerAdapter 也同样实现 ChannelInboundHandler 和 ChannelOutboundHandler 接口 */ class MsgHandler : ChannelHandlerAdapter() {

    override fun channelActive(ctx: ChannelHandlerContext?) { super.channelActive(ctx) // if (ctx!!.channel()!!.isActive) { // ctx!!.close() // // } else { //注册设备,服务器连接成功,注册通道,认证 LogUtils.d("MsgHandler", "channelActive") //在这里只做发送注册的信息

     //在这里发送心跳
    

// //0029,000004,,C3,170413 181858,10#心跳包设置 // // val time = Integer.parseInt(arr[5].replace("#", "")) ctx?.executor()?.scheduleAtFixedRate( { ctx!!.writeAndFlush(RegisterBus()); LogUtils.d("MsgHandler", "channelActive里发送心跳${MSG_HEAD}---${TimeUnit.SECONDS}") }, 0, 30, TimeUnit.SECONDS)

    //   }
}

override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
    super.exceptionCaught(ctx, cause);
    LogUtils.d("MsgHandler", cause?.message.toString());
    if (ctx!!.channel().isOpen() && ctx!!.channel()!!.isActive) {
        nettyDestroy();//关闭服务器连接
        //ctx!!.close();
          nettyConnect();//连接服务器
    }

}

override fun channelRead(ctx: ChannelHandlerContext?, msg: Any?) {
    LogUtils.d("MsgHandler", "channelRead=${msg!!.toString()}");


    // val value:String = String(msg as ByteArray);
    //MessagePack.unpack(MessagePack.pack(msg),UserInfo.class);
    return;
    val listMsg: List<Any> = (msg!! as List<Any>)

    LogUtils.d("MsgHandler", "channelRead=${listMsg!!.toString()}");
        when (listMsg.get(0) ) {
            "C0" -> {
                // C0:[0048,000004,,C0,170413 181858,V1,170413 181618,0,1]设备注册成功返回值

            }
            "C120" -> {
                //C120:[0054,000003,,C120,170414 110212,V120,170414 110212,1,0,ok] 登录返回值

            }

// "C3"->{ // //[0029,000004,,C3,170413 181858,10]心跳包设置 // // val time = Integer.parseInt(arr[5].replace("#", "")) // ctx?.executor()?.scheduleAtFixedRate( // { // ctx.writeAndFlush(MSG_HEAD) // LogUtils.d("MsgHandler", "channelActive里发送心跳${MSG_HEAD}---${TimeUnit.SECONDS}") // }, // 0, // 10, // TimeUnit.SECONDS) // }

    }
    super.channelRead(ctx, msg)
}

}

/**

  • @author: Jeff 15899859876@qq.com

  • @date: 2018-05-16 16:54

  • @description: netty连接类

  • 1.Netty Client启动的时候需要重连

  • 2.在程序运行中连接断掉需要重连。

  • 使用object关键字替代class关键字就可以声明一个单例对象 / object NettyConnect { //companion静态声类声名对象,相当于static关键 //companion object { // 自定义委托实现单例,只能修改这个值一次. //如果要发送数据直接调用NettyConnect.channel?.writeAndFlush(""); 或着 导包直接使用channel.writeAndFlush("") var channel: Channel? by DelegatesExt.notNullSingleValue<Channel?>(); / NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程, 每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。/ private var eventLoopGroup: EventLoopGroup? = null;//=NioEventLoopGroup() / Bootstrap是开发netty客户端的基础,通过Bootstrap的connect方法来连接服务器端。该方法返回的也是ChannelFuture, 通过这个我们可以判断客户端是否连接成功,*/ private var bootstrap: Bootstrap? = null; //使用Thread线程进行异步连接 private var mThread: Thread? = null; //ChannelFuture的作用是用来保存Channel异步操作的结果。 private var future: ChannelFuture? = null; //保存连接成功或着失败 private var onDestrYN: Boolean = false; // }

    //创建连接方法,如果要调用连接,直接调用此方法即可( NettyConnect.reConnect() ) fun nettyConnect() {

     mThread = object : Thread("NettyConnect.reConnect") {
         override fun run() {
             try {
                 // super.run();
                 eventLoopGroup = NioEventLoopGroup(2);
                 bootstrap = Bootstrap()
                 bootstrap!!
                         .group(eventLoopGroup)
                         .channel(NioSocketChannel::class.java)
                         //使用TCP进行连接
                         .option(ChannelOption.TCP_NODELAY, true)
                         //// 设置TCP连接超时时间 10秒
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000 * 10)//设置连接超时时间
                         .option(ChannelOption.SO_KEEPALIVE, true)
                         //.remoteAddress(host, port)
                         .handler(object : ChannelInitializer<SocketChannel>() {
                             @kotlin.jvm.Throws(java.lang.Exception::class)
                             override fun initChannel(ch: SocketChannel) {
                                 var pipeline: ChannelPipeline = ch.pipeline();
                                 /* 使用pipeline.addLast()添加,Decoder、Encode和Handler对象
                                   Decoder接收数据,Encode发送数据,Handler是编码和解码工具*/
                                 //解析编码为UTF-8
                                 pipeline.addLast(StringDecoder(CharsetUtil.UTF_8));
                                 //打包编码为UTF-8
                                 pipeline.addLast(StringEncoder(CharsetUtil.UTF_8));
    
                                 // 添加长度字段解码器
                                 // 在MessagePack解码器之前增加LengthFieldBasedFrameDecoder,用于处理半包消息
                                 // 它会解析消息头部的长度字段信息,这样后面的MsgpackDecoder接收到的永远是整包消息
                                 ch.pipeline().addLast("frameDecoder",
                                         LengthFieldBasedFrameDecoder(16384, 0,
                                                 2, 0, 2))
                                 // 添加MesspagePack解码器
                                 ch.pipeline().addLast("msgpack decoder", DecodeHandler());//接收服务器返回的数据包解析
                                 // 添加长度字段编码器
                                 // 在MessagePack编码器之前增加LengthFieldPrepender,它将在ByteBuf之前增加2个字节的消息长度字段
                                 ch.pipeline().addLast("frameEncoder", LengthFieldPrepender(2))
                                 // 添加MessagePack编码器
                                 ch.pipeline().addLast("msgpack encoder", EncodeHandler());//发送前打包数据并编码
                                 // 添加业务处理handler
                                 ch.pipeline().addLast(MsgHandler());//接收返回数据
    
                             }
                         })
                 LogUtils.d("NettyConnect",
                         "正在连接服务器 ipStr=${COMMAND_IP},portInt=${COMMAND_PORT}");
                 //ChannelFuture的作用是用来保存Channel异步操作的结果。
                 //不使用监听
                 // val future: ChannelFuture = bootstrap!!.connect(FinalValue.COMMAND_IP,FinalValue.COMMAND_PORT).sync();
                 //使用监听,监听是否连接或是断开
                 // val future: ChannelFuture = bootstrap!!.connect(FinalValue.COMMAND_IP,FinalValue.COMMAND_PORT);
                 //{addListener(GenericFutureListener)}的方式来获得通知,而非await()。使用sync异步执行
                 future = bootstrap!!.connect(COMMAND_IP, COMMAND_PORT)!!.sync();//// 发起异步连接操作
                 //如果连接成功则保存ChannelFuture到Channel
    

// channel = future!!.awaitUninterruptibly().channel(); if (future!!.isSuccess) { //如果连接成功则保存ChannelFuture到Channel channel = future!!.awaitUninterruptibly().channel(); channel!!.closeFuture().sync();// 等待客户端链路关闭 // 优雅退出,释放NIO线程组 eventLoopGroup!!.shutdownGracefully(); // //如果连接成功则保存ChannelFuture到Channel // channel = future!!.channel() as Channel LogUtils.d("NettyConnect", "服务器连接成功ipStr=${COMMAND_IP},portInt=${COMMAND_PORT}") onDestrYN = true;//连接成功 } else { onDestrYN = false;//连接失败 while (onDestrYN) {//连接失败一直进行连接,不管是什么原因都进行连接 LogUtils.d("NettyConnect", "连接失败再次连接") //断开连接,重新进行连接 channel!!.disconnect(); channel!!.close(); future!!.channel().eventLoop().schedule( { //重新开新的线程进行连接 if (future!!.isSuccess) { //如果连接成功则保存ChannelFuture到Channel channel = future!!.channel() as Channel; //channel = future!!.awaitUninterruptibly().channel() onDestrYN = true;//连接成功 }; }, 30,//2秒重新连接 TimeUnit.SECONDS); }; }; } catch (ex: Exception) { LogUtils.d("NettyConnect", "连接出现异常重新连接${ex.toString()}"); executor.execute { try { TimeUnit.SECONDS.sleep(20); nettyDestroy(); nettyConnect();// 发起重连操作 } catch (e: InterruptedException) { e.printStackTrace(); }; }; }; }; };

    mThread!!.start();
}

private val executor = Executors.newScheduledThreadPool(1)
//? 表示当前对象是否可以为空
//!! 表示当前对象不为空的情况下执行
fun nettyDestroy() {
    // channel = null;
    //  Bootstrap
    bootstrap = null;
    //结束线程池
    if (eventLoopGroup != null) {
        // 优雅退出,释放NIO线程组
        eventLoopGroup!!.shutdownGracefully();
    }
    eventLoopGroup = null;
    //结束线程
    mThread!!.interrupt();
    mThread!!.join();
    mThread = null;
    //结束连接
    if (future != null && future!!.isSuccess) {
        /* 当对应的channel关闭的时候,就会返回对应的channel。
                   Returns the ChannelFuture which will be notified when this channel is closed.
                   This method always returns the same future instance.*/
        try {
            channel!!.closeFuture().sync();
            channel!!.flush();
        } catch (e: Exception) {
            e.printStackTrace()
        }
        channel!!.close();
    }

    //ChannelFuture,结束监听
    //future?.removeListener { }
    future = null;
}

}