# custom-ktor **Repository Path**: slientes/custom-ktor ## Basic Information - **Project Name**: custom-ktor - **Description**: 封装ktor的socket通信 - **Primary Language**: Kotlin - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-04-24 - **Last Updated**: 2023-05-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 当前版本 当前版本封装了ktor,支持**自定义消息转发**、**验证时长**、**自定义验证**、**自定义数据类型**(**真实数据、身份、凭证**)、**多个账户同时登陆处理**,**可拓展性强**。 **并整合成Spring-Boot-Starter。** ### 快速体验 在custom-ktor项目下找到: **TestServer.kt**(服务端) ```kotlin fun main(args: Array) { val serverSocketServer = ServerSocketServer(DefaultServerSocketSession()) serverSocketServer.start() } ``` **TestClient.kt**(客户端) ```kotlin fun main(args: Array) { //客户端 val clientSocketServer = ClientSocketServer( clientSocketSession = DefaultClientSocketSession(SimpleAuthPackPackageImpl("admin1", "ktor")) ) //设置消息处理器 clientSocketServer.setReceiveHandler { println(it) } //服务启动 clientSocketServer.start() //开启新线程来模拟发送数据 thread { val scanner = Scanner(System.`in`) while (scanner.hasNext()) { val message = StringBuilder(scanner.nextLine()) for (i in 1..5) { message.append(message) } clientSocketServer.sendMessage( DefaultProtocolImpl(FrameType.BROADCAST, SimpleBroadcastPackageImpl(message.toString())) ) } } } ``` ### springboot支持 该starter支持注入指定类型的通信服务:**字节数组**、**字符串** ```yaml ktor: enable: true #是否开启ktor-starter的自动配置 (默认开启) mode: byte #服务模式 可选值:simple(字符串)、 byte (字节数组) defaultPassWord: ktor #若没有配置认证处理器,默认的认证密码(默认密码:ktor) controller: false #是否自定义转发,false全局广播消息,为true时请重写ServerSocketSession的onMessageCustom方法(默认false) authTime: 1000 #等待验证时间,在指定时间没有发送权限验证包会自动放弃该连接(默认1000毫秒) port: 9000 #服务绑定端口 ``` 配置 ```kotlin @SpringBootApplication class SpringBootKtorApplication { companion object { @JvmStatic fun main(args: Array) { runApplication(*args) } } //mode: simple #字符串类型 @Autowired private lateinit var serverSocketServer: ServerSocketServer //mode: byte #字节数组类型 //@Autowired //private lateinit var serverSocketServer: ServerSocketServer @PostConstruct fun success() { serverSocketServer.start() } } ``` ### 项目架构 项目基于ktor的netty再封装,开发者只需要实现认证和转发就可以实现一个简单的通信模式。 重点说明一下:ktor封装的netty的方法是异步非阻塞式的,其原理是ktor使用了kotlin的协程(本质还是线程的上下文切换),使得单线程也能实现netty的selector模式,有效的利用了系统资源,且降低了并发编程难度。 更多使用文档请加入交流群: ![](https://www.jiayou.art/ktor/qq.jpg) ### 项目规划 目前实现了socket通信,下版本支持**websocket**。 ## 原始版本 ### 一、什么是Ktor ​ Ktor 是一个使用强大的 [Kotlin 语言](https://www.kotlincn.net/)在互联系统中构建异步服务器与客户端的框架。利用Ktor可以实现web服务器以及Socket周边的通信实现。 ​ 参考官网:https://ktor.kotlincn.net/ ​ 前置知识:IDEA Kotlin的函数式 Koltin协程 Socket通信 ### 二、如何使用 ​ 你可以使用gradle或者maven构建一个ktor应用,当然你可以在dockerhub上去找关于ktor的镜像,关于初学者建议使用前两个构建工具来搭建一个简单的ktor项目入门。 - 构建ktor应用方式一:idea插件 https://plugins.jetbrains.com/plugin/10823-ktor-obsolete- - 构建ktor应用方式二:官网项目初始化器 https://start.ktor.io/# 官网的web服务有例子,这便着重测试Socket编程(**官网socket文档很垃圾**) ### 三、基于maven的ktot使用netty实现的socket程序 ```xml io.ktor ktor-server-netty ${ktor_version} ch.qos.logback logback-classic ${logback_version} io.ktor ktor-server-core ${ktor_version} io.ktor ktor-server-sessions ${ktor_version} io.ktor ktor-websockets ${ktor_version} ``` ### 四、编程思路 ​ 这里我实现了一个基于netty的网络服务器,用来转发客户端的请求来实现网络通信,本来打算使用websocket实现(其实过),但由于考虑到websocket是socket的封装,性能毫无疑问的比socket低,对于游戏,fps,及时类的应用程序socket编程才是最佳之选。 ​ ![](https://www.jiayou.art/ktor/ktor.jpg) ​ 通过Socket Server来维护各个Socket Client的连接对象,对数据交互IO进行处理。 ### 五、代码实现 #### 对象序列化器 ​ 传递对象避免出现粘包 ```kotlin //jvm对象序列化为字节数组 fun objectToByteArray(obj: Any): ByteArray { val byteArrayOutputStream = ByteArrayOutputStream() val objectOutputStream = ObjectOutputStream(byteArrayOutputStream) objectOutputStream.writeObject(obj) objectOutputStream.flush() return byteArrayOutputStream.toByteArray() } //将字节数组反序列化为Protocol对象 @SuppressWarnings("unused") fun byteArrayToObject(byteArray: ByteArray): Protocol? { val `in` = ByteArrayInputStream(byteArray) val sIn = ObjectInputStream(`in`) val protocol: Protocol? protocol = try { sIn.readObject() as Protocol? } catch (e: Exception) { e.printStackTrace() null } return protocol } ``` #### 数据协议 ​ **各个socket通信我们使用自定义协议来实现,避免粘包** ```kotlin data class Translation(var data: ByteArray? = null, var messageCurrentTime: Long? = null) : Serializable ``` #### 服务端 ```kotlin fun main(args: Array) { //我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的 //且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞 //协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程 //协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。 // launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程 // public fun CoroutineScope.launch( // context: CoroutineContext = EmptyCoroutineContext, // start: CoroutineStart = CoroutineStart.DEFAULT, // block: suspend CoroutineScope.() -> Unit // ) // 这里我们new了两个单线程的协程上下文 // threadPoolOfInput为读取操作的协程的上下文 // threadPoolOfOutPut为输出操作的协程的上下文 // 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程 // 所以他们不会互相阻塞对方 val threadPoolOfInput = newSingleThreadContext("input") val threadPoolOfOutPut = newSingleThreadContext("output") //启动一个阻塞式协程上下文构建器 runBlocking { //在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式) val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323)) //打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush) val output = socket.openWriteChannel(autoFlush = true) //打开socker套接字的输入流 val input = socket.openReadChannel() //读取操作的协程 launch(threadPoolOfInput) { while (true) { input.awaitContent() if (input.availableForRead == 0) break val availableLength = input.readInt() val byteArray = ByteArray(availableLength) input.readFully(byteArray) val data = SerializableTool.ByteArrayToObject(byteArray) as Translation println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" + "${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}") } } //输出操作的协程 launch(threadPoolOfOutPut) { val scanner = Scanner(System.`in`) while (scanner.hasNext()) { val objectToByteArray = SerializableTool.ObjectToByteArray( Translation( ("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8), System.currentTimeMillis() ) ) output.writeInt(objectToByteArray.size) output.writeFully(objectToByteArray) } } } } ``` #### 客户端 ```kotlin fun main(args: Array) { //我们使用的Scanner在hasNext方法等待输入的时候是阻塞式的 //且输出操作的协程和读取操作的协程是在同一个协程上下文的,这就导致了该线程一直被Scanner的next方法阻塞 //协程调度器无法从阻塞的线程中再调度,也就是说输出操作的协程一直会阻塞读取操作的协程 //协程默认的上下文为当前线程(在这里是main线程),所以launch协程的默认调度上下文不符合我们的期望。 // launch是CoroutineScope的拓展函数,第一个参数指定是一个携程的上下文,不传默认就是当前线程 // public fun CoroutineScope.launch( // context: CoroutineContext = EmptyCoroutineContext, // start: CoroutineStart = CoroutineStart.DEFAULT, // block: suspend CoroutineScope.() -> Unit // ) // 这里我们new了两个单线程的协程上下文 // threadPoolOfInput为读取操作的协程的上下文 // threadPoolOfOutPut为输出操作的协程的上下文 // 他们其实是两个线程,也就是读取操作和写出操作是运行在两个独立的线程 // 所以他们不会互相阻塞对方 val threadPoolOfInput = newSingleThreadContext("input") val threadPoolOfOutPut = newSingleThreadContext("output") //启动一个阻塞式协程上下文构建器 runBlocking { //在指定主机和端口来连接一个ServerSockert服务,等待服务端响应连接(异步非阻塞式) val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress("127.0.0.1", 2323)) //打开socker套接字的输入流(后面的autoFlush参数为刷新缓冲区,不用再手动write完数据后手动调flush) val output = socket.openWriteChannel(autoFlush = true) //打开socker套接字的输入流 val input = socket.openReadChannel() //如果是launch { // ... // } //读取操作协程会被输出操作的协程中scanner的hasNext()一直阻塞,收不到服务端发来的消息 //读取操作的协程 launch(threadPoolOfInput) { while (true) { input.awaitContent() if (input.availableForRead == 0) break val availableLength = input.readInt() val byteArray = ByteArray(availableLength) input.readFully(byteArray) val data = SerializableTool.ByteArrayToObject(byteArray) as Translation println(data.data?.let { kotlin.text.String(it) } + ",当前消息延迟为:" + "${System.currentTimeMillis() - (if (data.messageCurrentTime == null) 0 else data.messageCurrentTime)!!}") } } //如果是launch { // ... // } //输出操作的协程中scanner的hasNext()会一直阻塞读取操作协程,收不到服务端发来的消息 //输出操作的协程 launch(threadPoolOfOutPut) { val scanner = Scanner(System.`in`) while (scanner.hasNext()) { val objectToByteArray = SerializableTool.ObjectToByteArray( Translation( ("来自客户端${socket.remoteAddress}的消息:" + scanner.nextLine()).toByteArray(Charsets.UTF_8), System.currentTimeMillis() ) ) output.writeInt(objectToByteArray.size) output.writeFully(objectToByteArray) } } } } ``` ### 六、测试 ​ **启动一个服务端,和三个客户端** ![](https://www.jiayou.art/ktor/1.png) ​ **客户端一发送"hello ktor!"** ![](https://www.jiayou.art/ktor/2.png) ​ **服务端接收到消息** ![](https://www.jiayou.art/ktor/3.png) ​ **客户端二接收到消息** ![](https://www.jiayou.art/ktor/4.png) ​ **客户端三接收到消息** ![](https://www.jiayou.art/ktor/5.png) **大家忽略上面的延迟字样,第一次发消息延迟会比较高,因为会涉及第一次初始化,缓冲,建立流传输通道等操作,会后IO操作都是5ms左右。** ### 七、总结 ​ ktor非常实用,小巧,对netty和jetty都进行了封装,在web方面也提供了websocket、Auth、JWT、Jackson、SSL实现,包括web周边的路由,CORS,模板渲染引擎(Freemarker,Themyleaf,Velocity)实现,很适合作为项目的脚手架。 ​ **推荐链接:** ​ 中国唯一 Google 官方认证 Android 和 Kotlin 双领域开发专家(GDE): https://space.bilibili.com/27559447?from=search&seid=18022887471961950104 ​ ktor官网:https://ktor.kotlincn.net/ ​