This commit is contained in:
limaoio
2023-05-09 17:18:32 +08:00
parent 4283690cad
commit 3a2bd8d0ee
85 changed files with 1667 additions and 496 deletions

164
README.md Normal file
View File

@@ -0,0 +1,164 @@
## WuKongIM (Everything so easy)
This project is a simple and easy to use, powerful performance, simple design concept instant messaging service, fully custom protocol chat (Chat) system
[中文文档](./README_CN.md)
<p align="center">
<img align="left" width="160" src="./docs/logo.png">
<ul>
<!-- <li><strong>QQ group</strong>: <a href="#">496193831</a></li> -->
<li><strong>Website</strong>: http://www.githubim.com</li>
<li><strong>Source</strong>: https://github.com/WuKongIM/WuKongIM</li>
<li><strong>Protocol</strong>: <a href="./docs/protocol.md">WuKongIM Protocol</a></li>
<li><strong>Issues</strong>: https://github.com/WuKongIM/WuKongIM/issues</li>
<li><strong>Docs</strong>: http://www.githubim.com/docs</li>
</ul>
</p>
[![Docs](https://img.shields.io/badge/docs-latest-green.svg)](http://githubim.com/docs)
[![](https://img.shields.io/apm/l/vim-mode)](./LICENSE)
## Feature
* Go language development, high performance and easy maintenance have both
* The bottom layer is 100% open source
* Binary protocol (supports customization), the packet size is extremely small, the minimum heartbeat packet is only 1 byte, the power consumption is small, the flow is small, and the transmission speed is fast
* The message channel and message content are fully encrypted to prevent man-in-the-middle attacks and modify of message content.
* Simple and easy to use, strong performance, MAC notebook stand-alone test more than 6w/sec message throughput, in order to achieve this performance and easy to use, completely independent message storage, no need to rely on third-party components, a single command can start the service
* The tcp + ack mechanism is used to ensure that the message is stable and reliable without loss
* Strong scalability Adopting the channel design concept, currently supports group channels, point-to-point channels, and you can customize channels according to your own business in the future. It can realize functions such as robot channels, customer service channels, etc
* Multi-terminal synchronization, real-time synchronization of web, pc, app messages
* TCP and Websocket are supported indiscriminately
* Ten thousand people support
* The message partition is permanently stored, and the message will not be lost when the device is uninstalled
* Support offline pull in read mode
## Client SDK
[Android SDK](https://github.com/WuKongIM/WuKongIMAndroidSDK.git)
[iOS SDK](https://github.com/WuKongIM/WuKongIMiOSSDK.git)
[JS SDK](https://github.com/WuKongIM/WuKongIMJSSDK.git)
[Docs](http://www.githubim.com/docs)
## App Demo
[Android Demo](https://github.com/WuKongIM/WuKongIMAndroidDemo.git)
[iOS Demo](https://github.com/WuKongIM/WuKongIMiOSDemo.git)
[Web Demo](https://github.com/WuKongIM/WuKongIMJSDemo.git)
<!-- ## Quick start -->
<!-- <img src="./docs/quick.gif" alt="Quick start"/> -->
<!-- [Get WuKongIM executable file](./INSTALL.md) -->
<!--
#### Run the server (Note: Because it rewrites the Go network library, this library is temporarily not supported by Windows. Windows recommends using Docker to run.)
```
$ go run cmd/app/main.go -e mode=test
```
After the server is running, visit http://127.0.0.1:1516/api to view the api document
#### Client SDK
Android SDK: [Android SDK (built-in simple demo)](https://github.com/WuKongIM/WuKongIMAndroidSDK.git)
iOS SDK: Open source from Star to 500 (please help us some Star, thanks 😄)
JS SDK: Star to 1000 open source (please help us some Star, thanks 😄)
note Please check [document](http://www.githubim.com/docs) for the use of SDK
## Quick play
***Log in to test1, test2 and test2 to send a message "hello" to test1***
```
// Log in to test1
$ go run cmd/play/main.go -user=test1
```
```
// Log in to test2
$ go run cmd/play/main.go -user=test2
```
test2 sends the message hello to test1
```
$ >send hello to test1
```
### Performance Testing
One-click pressure test
```
./bench.sh
```
My test results are as follows:
Achieve a throughput of 63420 messages per second, which is close to the pressure test data of redis!
```
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
SEND: 2021/06/29 15:05:49 duration: 10.605478656s - 12.096mb/s - 63420.051ops/s - 15.768us/op
``` -->
<!--
***分布式***
节点初始化
```
// 开启proxy服务 指定初始化的节点nodes
# WuKongIM proxy -c ./configs/proxy.toml -e replica=1
```
```
// 初始化的节点启动
# WuKongIM -c ./configs/config.toml -proxy=xx.xx.xx.xx:16666 -e nodeID=1001 -e nodeAddr=127.0.0.1:6666
(或者 WuKongIM -c ./configs/config.toml -peers=1@http://127.0.0.1:6000,2@http://127.0.0.1:6001,3@http://127.0.0.1:6002 -e nodeID=1)
```
```
// 初始化的节点启动
# WuKongIM -e proxy=xx.xx.xx.xx:16666 -e nodeID=1002 -e nodeAddr=127.0.0.1:6667
```
增加节点
```
# WuKongIM -proxy=xx.xx.xx.xx:16666 -e nodeID=1003 -join
```
移除节点
```
# WuKongIM -e nodeID=1003 -remove
``` -->
#### Run via Docker Compose
```
$ docker-compose up
```

150
README_CN.md Normal file
View File

@@ -0,0 +1,150 @@
## WuKongIM 一切很简单)
本项目是一款简单易用,性能强劲,设计理念简洁的即时通讯服务,完全自定义协议的聊天(Chat)系统
<p align="center">
<img align="left" width="160" src="./docs/logo.png">
<ul>
<li><strong>QQ群</strong>: <a href="#">750224611</a></li>
<li><strong>官网</strong>: http://www.githubim.com</li>
<li><strong>源码</strong>: https://github.com/WuKongIM/WuKongIM</li>
<li><strong>通讯协议</strong>: <a href="./docs/protocol.md">WuKongIM协议</a></li>
<li><strong>提问</strong>: https://github.com/WuKongIM/WuKongIM/issues</li>
<li><strong>文档</strong>: http://www.githubim.com/docs</li>
</ul>
</p>
[![Docs](https://img.shields.io/badge/docs-latest-green.svg)](http://githubim.com/docs)
[![](https://img.shields.io/apm/l/vim-mode)](./LICENSE)
## 特点
* go语言开发高性能与易维护兼得。
* 底层100%开源。
* 二进制协议(支持自定义)包大小极小最小心跳包只有1byte耗电小流量小传输速度快。
* 消息通道和消息内容全程加密,防中间人攻击和串改消息内容。
* 简单易用性能强劲MAC笔记本单机测试6w多/秒的消息吞吐量,为了达到这性能和简单易用,完全自主实现消息存储,无如何第三方组件依赖,一条命令即可启动服务
* 采用tcp协议+ack机制保证消息稳定可靠不丢。
* 扩展性强 采用频道设计理念,目前支持群组频道,点对点频道,后续可以根据自己业务自定义频道可实现机器人频道,客服频道等等功能。
* 多端同步webpcapp消息实时同步。
* 同时无差别支持tcpwebsocket。
* 万人群支持。
* 消息分区永久存储,卸载设备消息不丢。
* 支持读模式的离线拉取
<!-- ## 快速入门 -->
<!-- <img src="./docs/quick.gif" alt="快速入门"/> -->
<!-- [获取limaoim执行文件](./INSTALL.md) -->
<!-- #### 运行服务端 注意因为重写了Go的网络库此库Windows暂时不支持Windows建议使用Docker运行。 -->
<!--
```
$ go run cmd/app/main.go -e mode=test (mode=test是测试模式 方便快速试玩 生产不要加此参数)
``` -->
<!-- 服务器运行后,访问 http://127.0.0.1:1516/api 查看api文档 -->
## 客户端SDK
[Android SDK](https://github.com/WuKongIM/WuKongIMAndroidSDK.git)
[iOS SDK](https://github.com/WuKongIM/WuKongIMiOSSDK.git)
[JS SDK](https://github.com/WuKongIM/WuKongIMJSSDK.git)
SDK的使用请查看[文档](http://www.githubim.com/docs)
## 客户端Demo
[Android Demo](https://github.com/WuKongIM/WuKongIMAndroidDemo.git)
[iOS Demo](https://github.com/WuKongIM/WuKongIMiOSDemo.git)
[Web Demo](https://github.com/WuKongIM/WuKongIMJSDemo.git)
<!--
## 快速试玩
***登录test1,test2 test2向test1发送一条消息“hello”***
```
// 登录test1
$ go run cmd/play/main.go -user=test1
```
```
// 登录test2
$ go run cmd/play/main.go -user=test2
```
test2发送消息hello给test1
```
$ > send hello to test1
```
### 性能测试
一键压测
```
./bench.sh
```
本人测试结果如下:
达到每秒63420条消息的吞吐量接近redis的压测数据
```
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
SEND: 2021/06/29 15:05:49 duration: 10.605478656s - 12.096mb/s - 63420.051ops/s - 15.768us/op
``` -->
<!--
***分布式***
节点初始化
```
// 开启proxy服务 指定初始化的节点nodes
# limaoim proxy -c ./configs/proxy.toml -e replica=1
```
```
// 初始化的节点启动
# limaoim -c ./configs/config.toml -proxy=xx.xx.xx.xx:16666 -e nodeID=1001 -e nodeAddr=127.0.0.1:6666
(或者 limaoim -c ./configs/config.toml -peers=1@http://127.0.0.1:6000,2@http://127.0.0.1:6001,3@http://127.0.0.1:6002 -e nodeID=1)
```
```
// 初始化的节点启动
# limaoim -e proxy=xx.xx.xx.xx:16666 -e nodeID=1002 -e nodeAddr=127.0.0.1:6667
```
增加节点
```
# limaoim -proxy=xx.xx.xx.xx:16666 -e nodeID=1003 -join
```
移除节点
```
# limaoim -e nodeID=1003 -remove
``` -->
## 通过Docker Compose运行
```
$ docker-compose up
```

View File

@@ -6,7 +6,7 @@ import (
"os"
"github.com/WuKongIM/WuKongIM/internal/server"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/judwhite/go-svc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@@ -17,9 +17,9 @@ var (
serverOpts = server.NewOptions()
mode string
rootCmd = &cobra.Command{
Use: "lim",
Short: "LiMaoIM 简洁,性能强劲的即时通讯平台",
Long: `LiMaoIM 简洁,性能强劲的即时通讯平台 详情查看文档https://docs.limaoim.cn`,
Use: "wk",
Short: "WuKongIM 简洁,性能强劲的即时通讯平台",
Long: `WuKongIM 简洁,性能强劲的即时通讯平台 详情查看文档https://docs.wukongim.cn`,
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
@@ -32,7 +32,7 @@ var (
func init() {
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.limao.yaml)")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.wk.yaml)")
rootCmd.PersistentFlags().StringVar(&mode, "mode", "release", "模式")
}
@@ -47,7 +47,7 @@ func initConfig() {
vp.AddConfigPath(home)
vp.SetConfigType("yaml")
vp.SetConfigName(".limao")
vp.SetConfigName(".wukongim")
}
if err := vp.ReadInConfig(); err == nil {
@@ -61,11 +61,11 @@ func initConfig() {
}
func initServer() {
logOpts := limlog.NewOptions()
logOpts := wklog.NewOptions()
logOpts.Level = serverOpts.Logger.Level
logOpts.LogDir = serverOpts.Logger.Dir
logOpts.LineNum = serverOpts.Logger.LineNum
limlog.Configure(logOpts)
wklog.Configure(logOpts)
s := server.New(serverOpts)

BIN
docs/demo_qrcode.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.2 KiB

BIN
docs/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

862
docs/protocol.md Normal file
View File

@@ -0,0 +1,862 @@
## WuKongIM协议
## 控制报文结构
| 参数名 | 类型 | 说明 |
| :----- | :--- | :--- |
| Fixed header | 2 byte | 固定报头 |
| Variable header | bytes | 可变报头 |
| Payload | bytes | 消息体 |
## 固定报头
每个 WuKongIM 控制报文都包含一个固定报头
<table>
<tr>
<th>Bit</th>
<th>7</th>
<th>6</th>
<th>5</th>
<th>4</th>
<th>3</th>
<th>2</th>
<th>1</th>
<th>0</th>
</tr>
<tr>
<td>byte 1</td>
<td colspan="4">WuKongIM控制报文的类型</td>
<td colspan="4">用于指定控制报文类型的标志位</td>
</tr>
<tr>
<td>byte 2...</td>
<td colspan="8">剩余长度</td>
</tr>
</table>
#### WuKongIM控制报文的类型
<table>
<tr>
<th>名字</th>
<th>值</th>
<th>描述</th>
</tr>
<tr>
<td>Reserved</td>
<td >0</td>
<td>保留位</td>
</tr>
<tr>
<td>CONNECT</td>
<td >1</td>
<td>客户端请求连接到服务器(c2s)</td>
</tr>
<tr>
<td>CONNACK</td>
<td >2</td>
<td>服务端收到连接请求后确认的报文(s2c)</td>
</tr>
<tr>
<td>SEND</td>
<td >3</td>
<td>发送消息(c2s)</td>
</tr>
<tr>
<td>SENDACK</td>
<td >4</td>
<td>收到消息确认的报文(s2c)</td>
</tr>
<tr>
<td>RECVEIVED</td>
<td >5</td>
<td>收取消息(s2c)</td>
</tr>
<tr>
<td>REVACK</td>
<td >6</td>
<td>收取消息确认(c2s)</td>
</tr>
<tr>
<td>PING</td>
<td >7</td>
<td>ping请求</td>
</tr>
<tr>
<td>PONG</td>
<td >8</td>
<td>对ping请求的相应</td>
</tr>
<tr>
<td>DISCONNECT</td>
<td >9</td>
<td>请求断开连接</td>
</tr>
</table>
#### 用于指定控制报文类型的标志位
<table>
<tr>
<th>bit</th>
<th>3</th>
<th>2</th>
<th>1</th>
<th>0</th>
</tr>
<tr>
<td>byte</td>
<td>DUP</td>
<td>SyncOnce</td>
<td>RedDot</td>
<td>NoPersist</td>
</tr>
</table>
备注:
```
DUP: 是否是重复的消息(客户端重发消息的时候需要将DUP标记为1)
SyncOnce: 只同步一次 在多端设备的情况下 如果有一个设备拉取过此消息,其他设备将不会再拉取到此消息(比如加好友消息)
RedDot: 客户端收到消息是否显示红点
NoPersist: 是否不存储此消息
```
#### 剩余长度
在当前消息中剩余的byte(字节)数,包含可变头部和负荷(内容)。
单个字节最大值0111111116进制0x7F10进制为127。
WuKongIM协议规定第八位最高位若为1则表示还有后续字节存在。
WuKongIM协议最多允许4个字节表示剩余长度。最大长度为0xFF,0xFF,0xFF,0x7F二进制表示为:11111111,11111111,11111111,01111111十进制268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:
Digits | From | To
---|---|---
1 | 0 (0x00) | 127 (0x7F)
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F)
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F)
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)
其实换个方式理解第1字节的基数是1而第2字节的基数128以此类推第三字节的基数是128*128=2的14次方第四字节是128*128*128=2的21次方
例如需要表达321=2*128+65.2字节10100001 0000 0011.
(和我们理解的低位运算放置顺序不一样,第一个字节是低位,后续字节是高位,但字节内部本身是低位右边,高位左边)。
#### 字符串UTF-8编码
有关字符串WuKongIM采用的是修改版的UTF-8编码一般形式为如下
<table>
<tr>
<th>bit</th>
<th>7</th>
<th>6</th>
<th>5</th>
<th>4</th>
<th>3</th>
<th>2</th>
<th>1</th>
<th>0</th>
</tr>
<tr>
<td>byte 1</td>
<td colspan="8">String Length MSB</td>
</tr>
<tr>
<td>byte 2</td>
<td colspan="8">String Length MSB</td>
</tr>
<tr>
<td>bytes 3...</td>
<td colspan="8">Encoded Character Data</td>
</tr>
</table>
## 可变报头
某些控制报文包含一个可变报头部分。它在固定报头和有效载荷之间。可变报头的内容根据报文类型的不同而不同。可变报头的报文标识符(Packet Identifier)字段存在于在多个类型的报文里。
---
## CONNECT 连接报文
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(1)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Protocol Version</td>
<td>int8</td>
<td>协议版本号</td>
</tr>
<tr>
<td>UID</td>
<td>string</td>
<td>用户ID</td>
</tr>
<tr>
<td>Token</td>
<td>string</td>
<td>用户的token</td>
</tr>
<tr>
<td>Client Key</td>
<td>string</td>
<td>客户端KEY (客户端KEY (base64编码的DH公钥))</td>
</tr>
<tr>
<td>Device Flag</td>
<td>int8</td>
<td>设备标示(同标示同账号互踢)</td>
</tr>
<tr>
<td>Device ID</td>
<td>string</td>
<td>设备唯一ID</td>
</tr>
<tr>
<td>Client Timestamp</td>
<td>int64</td>
<td>客户端当前时间戳(13位时间戳,到毫秒)</td>
</tr>
</table>
## CONNACK 连接确认
CONNACK 报文由服务端所发送,作为对来自客户端的 CONNECT 报文的响应,如果客户端在合理的时间内没有收到服务端的 CONNACK 报文,客户端应该关闭网络连接。合理的时间取决于应用的类型和通信基础设施。
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(2)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Server Key</td>
<td>string</td>
<td>服务端base64的DH公钥</td>
</tr>
<tr>
<td>Salt</td>
<td>string</td>
<td>安全码
</td>
</tr>
<tr>
<td>Time Diff</td>
<td>int64</td>
<td>客户端时间与服务器的差值,单位毫秒。
</td>
</tr>
<tr>
<td>Reason Code</td>
<td>uint8</td>
<td>连接原因码(见附件)</td>
</tr>
</table>
## SEND 发送消息
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(3)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Setting</td>
<td>1 byte</td>
<td>消息设置(见下 版本4有效</td>
</tr>
<tr>
<td>Msg Key</td>
<td>string</td>
<td>用于验证此消息是否合法(仿中间人篡改)</td>
</tr>
<tr>
<td>Client Seq</td>
<td>uint32</td>
<td>客户端消息序列号(由客户端生成,每个客户端唯一)</td>
</tr>
<tr>
<td>Client Msg No</td>
<td>string</td>
<td>客户端唯一标示用于客户端消息去重version==2</td>
</tr>
<tr>
<td>Channel Id</td>
<td>string</td>
<td>频道ID如果是个人频道ChannelId为个人的UID</td>
</tr>
<tr>
<td>Channel Type</td>
<td>int8</td>
<td>频道类型1.个人 2.群组)</td>
</tr>
<tr>
<td>Payload</td>
<td>... byte</td>
<td>消息内容</td>
</tr>
</table>
## SENDACK 发送消息确认
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(4)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Client Seq</td>
<td>uint32</td>
<td>客户端消息序列号</td>
</tr>
<tr>
<td>Message ID</td>
<td>uint64</td>
<td>服务端的消息ID(全局唯一)</td>
</tr>
<tr>
<td>Message Seq</td>
<td>uint32</td>
<td>消息序号(有序递增,用户唯一)</td>
</tr>
<tr>
<td>Reason Code</td>
<td>uint8</td>
<td>发送原因代码 1表示成功</td>
</tr>
</table>
## RECV 收消息
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(5)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Setting</td>
<td>1 byte</td>
<td>消息设置(见下 版本4有效</td>
</tr>
<tr>
<td>Msg Key</td>
<td>string</td>
<td>用于验证此消息是否合法(仿中间人篡改)</td>
</tr>
<tr>
<td>Message ID</td>
<td>uint64</td>
<td>服务端的消息ID(全局唯一)</td>
</tr>
<tr>
<td>Message Seq</td>
<td>uint32</td>
<td>服务端的消息序列号(有序递增,用户唯一)</td>
</tr>
<tr>
<td>Client Msg No</td>
<td>string</td>
<td>客户端唯一标示用于客户端消息去重version==2</td>
</tr>
<tr>
<td>Message Timestamp</td>
<td>int32</td>
<td>服务器消息时间戳(10位到秒)</td>
</tr>
<tr>
<td>Channel ID</td>
<td>string</td>
<td>频道ID</td>
</tr>
<tr>
<td>Channel Type</td>
<td>int8</td>
<td>频道类型</td>
</tr>
<tr>
<td>From UID</td>
<td>string</td>
<td>发送者UID</td>
</tr>
<tr>
<td>Payload</td>
<td>... byte</td>
<td>消息内容</td>
</tr>
</table>
## RECVACK 收消息确认
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(6)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>Message ID</td>
<td>uint64</td>
<td>服务端的消息ID(全局唯一)</td>
</tr>
<tr>
<td>Message Seq</td>
<td>uint32</td>
<td>序列号</td>
</tr>
</table>
## PING
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(7)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
</table>
## PONG
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(8)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
</table>
## DISCONNECT
<table>
<tr>
<th>参数名</th>
<th>类型</th>
<th>说明</th>
</tr>
<tr>
<td>Frame Type</td>
<td >0.5 byte</td>
<td>报文类型(9)</td>
</tr>
<tr>
<td>Flag</td>
<td >0.5 byte</td>
<td> 标示位</td>
</tr>
<tr>
<td>Remaining Length</td>
<td >... byte</td>
<td>报文剩余长度</td>
</tr>
<tr>
<td>ReasonCode</td>
<td >uint8</td>
<td>原因代码</td>
</tr>
<tr>
<td>Reason</td>
<td >string</td>
<td>原因</td>
</tr>
</table>
## 消息设置
<table>
<tr>
<th>bit</th>
<th>7</th>
<th>6</th>
<th>5</th>
<th>4</th>
<th>3</th>
<th>2</th>
<th>1</th>
<th>0</th>
</tr>
<tr>
<td>byte</td>
<td>Receipt</td>
<td>Reserved</td>
<td>Reserved</td>
<td>Reserved</td>
<td>Reserved</td>
<td>Reserved</td>
<td>Reserved</td>
<td>Reserved</td>
</tr>
</table>
消息设置目前大小为1byte 8个bit
Receipt 消息已读回执,此标记表示,此消息需要已读回执
Reserved保留位暂未用到
## Payload 推荐结构
* 文本
```
{
"type": 1,
"content": "这是一条文本消息"
}
```
* 文本(带@)
```
{
"type": 1,
"content": "这是一条文本消息",
"mention":{
"all": 0, // 是否@所有人 0. @用户 1. @所有
"uids":["1223","2323"] // 如果all=1 此字段为空
}
}
```
* 文本(带回复)
```
{
"type": 1,
"content": "回复了某某" ,
"reply": {
"root_mid": "xxx", // 根消息的message_id
"message_id": "xxxx", // 被回复的消息ID
"message_seq": xxx, // 被回复的消息seq
"from_uid": "xxxx", // 被回复消息的发送者
"from_name": "xxx", // 被回复消息的发送者名称
"payload": {} // 被回复消息的payload
}
}
```
* 图片
```
{
"type": 2,
"url": "http://xxxxx.com/xxx", // 图片下载地址
"width": 200, // 图片宽度
"height": 320 // 图片高度
}
```
* GIF
```
{
"type": 3,
"url": "http://xxxxx.com/xxx", // gif下载地址
"width": 72, // gif宽度
"height": 72 // gif高度
}
```
* 语音
```
{
"type": 4,
"url": "http://xxxxx.com/xxx", // 语音下载地址
"timeTrad": 10 // 语音秒长
}
```
* 文件
```
{
"type": 8,
"url": "http://xxxxx.com/xxx", // 文件下载地址
"name":"xxxx.docx", // 文件名称
"size": 238734 // 大小 单位byte
}
```
* 命令消息
```
{
"type": 99,
"cmd": "groupUpdate", // 命令指令标示
"param": {} // 命令对应的数据
}
```
## 系统消息
系统消息的type必须大于1000
* 创建群聊 (NoPersist:0,RedDot:0,SyncOnce:1)
张三邀请李四、王五加入群聊
```
{
"type": 1001,
"creator": "xxx", // 创建者uid
"creator_name": "张三", // 创建者名称
"content": "{0}邀请{1}、{2}加入群聊",
"extra": [{"uid":"xxx","name":"张三"},{"uid":"xx01","name":"李四"},{"uid":"xx02","name":"王五"}]
}
```
* 添加群成员 (NoPersist:0,RedDot:0,SyncOnce:1)
张三邀请李四、王五加入群聊
```
{
"type": 1002,
"content": "{0}邀请{1}、{2}加入群聊",
"extra": [{"uid":"xxx","name":"张三"},{"uid":"xx01","name":"李四"},{"uid":"xx02","name":"王五"}]
}
```
* 移除群成员 (NoPersist:0,RedDot:0,SyncOnce:1)
张三将李四移除群聊
```
{
"type": 1003,
"content": "{0}将{1}移除群聊",
"extra": [{"uid":"xxx","name":"张三"},{"uid":"xx01","name":"李四"}]
}
```
* 群成员被踢 (NoPersist:0,RedDot:1,SyncOnce:0)
```
{
"type": 1010,
"content": "你被{0}移除群聊",
"extra": [{"uid":"xxx","name":"张三"}]
}
```
张三将李四移除群聊
```
{
"type": 1003,
"content": "{0}将{1}移除群聊",
"extra": [{"uid":"xxx","name":"张三"},{"uid":"xx01","name":"李四"}]
}
```
* 更新群名称 (NoPersist:0,RedDot:0,SyncOnce:1)
张三修改群名称为"测试群"
```
{
"type": 1005,
"content": "{0}修改群名为\"测试群\"",
"extra": [{"uid":"xxx","name":"张三"}]
}
```
* 更新群公告 (NoPersist:0,RedDot:0,SyncOnce:1)
张三修改群公告为"这是一个群公告"
```
{
"type": 1005,
"content": "{0}修改群公告为\"这是一个群公告\"",
"extra": [{"uid":"xxx","name":"张三"}]
}
```
* 撤回消息 (NoPersist:0,RedDot:0,SyncOnce:1)
张三撤回了一条消息
```
{
"type": 1006,
"message_id": "234343435", // 需要撤回的消息ID
"content": "{0}撤回了一条消息",
"extra": [{"uid":"xxx","name":"张三"}]
}
```
## 命令类消息
* 命令类消息 (SyncOnce:1)
```
{
"type": 99,
"cmd": "cmd", // 命令标示
"param": {} // 命令参数
}
```
* 群成员信息有更新(收到此消息客户端应该增量同步群成员信息)
```
{
"type": 99,
"cmd": "memberUpdate",
"param": {
"group_no": "xxxx"
}
}
```
* 红点消除(收到此命令客户端应将对应的会话信息的红点消除)
```
{
"type": 99,
"cmd": "unreadClear",
"param": {
"channel_id": "xxxx",
"channel_type": 2
}
}
```

BIN
docs/quick.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 MiB

View File

@@ -8,28 +8,28 @@ import (
"strconv"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/lmhttp"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkhttp"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
)
type ConnzAPI struct {
limlog.Log
wklog.Log
s *Server
}
func NewConnzAPI(s *Server) *ConnzAPI {
return &ConnzAPI{
Log: limlog.NewLIMLog("ConnzAPI"),
Log: wklog.NewWKLog("ConnzAPI"),
s: s,
}
}
func (co *ConnzAPI) Route(r *lmhttp.LMHttp) {
func (co *ConnzAPI) Route(r *wkhttp.WKHttp) {
r.GET("/connz", co.HandleConnz)
}
func (co *ConnzAPI) HandleConnz(c *lmhttp.Context) {
func (co *ConnzAPI) HandleConnz(c *wkhttp.Context) {
clients := co.s.clientManager.GetAllClient()
sortStr := c.Query("sort")
offset64, _ := strconv.ParseInt(c.Query("offset"), 10, 64)
@@ -141,15 +141,15 @@ func device(cli *client) string {
d := "未知"
level := "主"
switch cli.deviceFlag {
case lmproto.APP:
case wkproto.APP:
d = "App"
case lmproto.PC:
case wkproto.PC:
d = "PC"
case lmproto.WEB:
case wkproto.WEB:
d = "Web"
}
if cli.deviceLevel == lmproto.DeviceLevelSlave {
if cli.deviceLevel == wkproto.DeviceLevelSlave {
level = "从"
}

View File

@@ -5,13 +5,13 @@ import (
"net/http"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/lmhttp"
"github.com/WuKongIM/WuKongIM/pkg/pse"
"github.com/WuKongIM/WuKongIM/pkg/wkhttp"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
)
type VarzAPI struct {
limlog.Log
wklog.Log
s *Server
}
@@ -19,15 +19,15 @@ func NewVarzAPI(s *Server) *VarzAPI {
return &VarzAPI{
s: s,
Log: limlog.NewLIMLog("VarzAPI"),
Log: wklog.NewWKLog("VarzAPI"),
}
}
func (v *VarzAPI) Route(r *lmhttp.LMHttp) {
func (v *VarzAPI) Route(r *wkhttp.WKHttp) {
r.GET("/varz", v.HandleVarz)
}
func (v *VarzAPI) HandleVarz(c *lmhttp.Context) {
func (v *VarzAPI) HandleVarz(c *wkhttp.Context) {
var rss, vss int64 // rss内存 vss虚拟内存
var pcpu float64 // cpu
@@ -46,7 +46,7 @@ func (v *VarzAPI) createVarz(pcpu float64, rss int64) *Varz {
connCount := v.s.clientManager.GetAllClientCount()
return &Varz{
ServerID: fmt.Sprintf("%d", opts.NodeID),
ServerName: "LiMaoIM",
ServerName: "WuKongIM",
Version: opts.Version,
Connections: connCount,
Uptime: myUptime(time.Since(v.s.start)),

View File

@@ -7,8 +7,8 @@ import (
"time"
"github.com/RussellLuo/timingwheel"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/valyala/bytebufferpool"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -31,13 +31,13 @@ type client struct {
clientStats
conn conn
uid string // 用户UID
deviceFlag lmproto.DeviceFlag // 设备标示
deviceLevel lmproto.DeviceLevel // 设备等级
deviceFlag wkproto.DeviceFlag // 设备标示
deviceLevel wkproto.DeviceLevel // 设备等级
deviceID string // 设备ID
s *Server
aesKey string // 消息密钥(用于加解密消息的)
aesIV string
limlog.Log
wklog.Log
lastActivity time.Time // 最后一次活动时间
uptime time.Time // 客户端启动时间
@@ -59,7 +59,7 @@ func (c *client) Start() {
c.stopped.Store(false)
c.Log = limlog.NewLIMLog(fmt.Sprintf("Client[%d]", c.ID()))
c.Log = wklog.NewWKLog(fmt.Sprintf("Client[%d]", c.ID()))
c.Activity()
c.uptime = time.Now()
outboundOpts := NewOutboundOptions()
@@ -195,7 +195,7 @@ func (c *client) handleInbound() error {
// 处理包
offset := 0
frames := make([]lmproto.Frame, 0)
frames := make([]wkproto.Frame, 0)
for len(msgBytes) > offset {
frame, size, err := c.s.opts.Proto.DecodeFrame(msgBytes[offset:], c.Version())
if err != nil { //

View File

@@ -3,7 +3,7 @@ package server
import (
"sync"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
)
// ClientManager 客户端管理
@@ -124,7 +124,7 @@ func (m *ClientManager) GetOnlineClients(uids []string) []*client {
}
// GetClientsWith 查询设备
func (m *ClientManager) GetClientsWith(uid string, deviceFlag lmproto.DeviceFlag) []*client {
func (m *ClientManager) GetClientsWith(uid string, deviceFlag wkproto.DeviceFlag) []*client {
clients := m.GetClientsWithUID(uid)
if len(clients) == 0 {
return nil
@@ -139,7 +139,7 @@ func (m *ClientManager) GetClientsWith(uid string, deviceFlag lmproto.DeviceFlag
}
// GetClientCountWith 获取设备的在线数量和用户所有设备的在线数量
func (m *ClientManager) GetClientCountWith(uid string, deviceFlag lmproto.DeviceFlag) (int, int) {
func (m *ClientManager) GetClientCountWith(uid string, deviceFlag wkproto.DeviceFlag) (int, int) {
clients := m.GetClientsWithUID(uid)
if len(clients) == 0 {
return 0, 0

View File

@@ -4,15 +4,15 @@ import (
"sync"
"sync/atomic"
"github.com/WuKongIM/WuKongIM/pkg/limnet"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wknet"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
)
type connContext struct {
isDisableRead bool
conn limnet.Conn
conn wknet.Conn
frameCacheLock sync.RWMutex
frameCaches []lmproto.Frame
frameCaches []wkproto.Frame
s *Server
inflightCount atomic.Int32 // frame inflight count
}
@@ -25,7 +25,7 @@ func newConnContext(s *Server) *connContext {
}
}
func (c *connContext) putFrame(frame lmproto.Frame) {
func (c *connContext) putFrame(frame wkproto.Frame) {
c.frameCacheLock.Lock()
defer c.frameCacheLock.Unlock()
@@ -38,11 +38,11 @@ func (c *connContext) putFrame(frame lmproto.Frame) {
}
func (c *connContext) popFrames() []lmproto.Frame {
func (c *connContext) popFrames() []wkproto.Frame {
c.frameCacheLock.RLock()
defer c.frameCacheLock.RUnlock()
newFrames := c.frameCaches
c.frameCaches = make([]lmproto.Frame, 0, 250)
c.frameCaches = make([]wkproto.Frame, 0, 250)
return newFrames
}
@@ -75,7 +75,7 @@ func (c *connContext) enableRead() {
}
func (c *connContext) init() {
c.frameCaches = make([]lmproto.Frame, 0, 250)
c.frameCaches = make([]wkproto.Frame, 0, 250)
}
func (c *connContext) release() {

View File

@@ -3,11 +3,11 @@ package server
import (
"sync"
"github.com/WuKongIM/WuKongIM/pkg/limnet"
"github.com/WuKongIM/WuKongIM/pkg/wknet"
)
type ConnManager struct {
connMap map[int64]limnet.Conn
connMap map[int64]wknet.Conn
userConnMap map[string][]int64
sync.RWMutex
}
@@ -16,7 +16,7 @@ func NewConnManager() *ConnManager {
return &ConnManager{}
}
func (c *ConnManager) Add(conn limnet.Conn) {
func (c *ConnManager) Add(conn wknet.Conn) {
c.Lock()
defer c.Unlock()
c.connMap[conn.ID()] = conn

View File

@@ -3,36 +3,36 @@ package server
import (
"sync"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limnet"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wknet"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"go.uber.org/zap"
)
type Dispatch struct {
engine *limnet.Engine
engine *wknet.Engine
s *Server
processor *Processor
limlog.Log
wklog.Log
framePool sync.Pool
}
func NewDispatch(s *Server) *Dispatch {
return &Dispatch{
engine: limnet.NewEngine(),
engine: wknet.NewEngine(),
s: s,
processor: NewProcessor(s),
Log: limlog.NewLIMLog("Dispatch"),
Log: wklog.NewWKLog("Dispatch"),
framePool: sync.Pool{
New: func() any {
return make([]lmproto.Frame, 20)
return make([]wkproto.Frame, 20)
},
},
}
}
// frame data in
func (d *Dispatch) dataIn(conn limnet.Conn) error {
func (d *Dispatch) dataIn(conn wknet.Conn) error {
buff, err := conn.Peek(-1)
if err != nil {
return err
@@ -45,20 +45,20 @@ func (d *Dispatch) dataIn(conn limnet.Conn) error {
return nil
}
if !conn.IsAuthed() { // conn is not authed must be connect packet
packet, _, err := d.s.opts.Proto.DecodeFrame(data, lmproto.LatestVersion)
packet, _, err := d.s.opts.Proto.DecodeFrame(data, wkproto.LatestVersion)
if err != nil {
d.Warn("Failed to decode the message", zap.Error(err))
conn.Close()
return nil
}
if packet.GetFrameType() != lmproto.CONNECT {
if packet.GetFrameType() != wkproto.CONNECT {
d.Warn("请先进行连接!")
conn.Close()
return nil
}
// process conn auth
conn.Discard(len(data))
d.processor.processAuth(conn, packet.(*lmproto.ConnectPacket))
d.processor.processAuth(conn, packet.(*wkproto.ConnectPacket))
} else { // authed
offset := 0
for len(data) > offset {
@@ -71,6 +71,9 @@ func (d *Dispatch) dataIn(conn limnet.Conn) error {
if frame == nil {
break
}
// if frame.GetFrameType() == wkproto.PING {
// d.processor.processPing(conn, frame.(*wkproto.PingPacket))
// }
conn.Context().(*connContext).putFrame(frame)
offset += size
}
@@ -82,7 +85,7 @@ func (d *Dispatch) dataIn(conn limnet.Conn) error {
}
// frame data out
func (d *Dispatch) dataOut(conn limnet.Conn, frames ...lmproto.Frame) {
func (d *Dispatch) dataOut(conn wknet.Conn, frames ...wkproto.Frame) {
if len(frames) == 0 {
return
}
@@ -101,7 +104,7 @@ func (d *Dispatch) dataOut(conn limnet.Conn, frames ...lmproto.Frame) {
}
func (d *Dispatch) connClose(conn limnet.Conn, err error) {
func (d *Dispatch) connClose(conn wknet.Conn, err error) {
d.processor.processClose(conn, err)
}

View File

@@ -6,16 +6,16 @@ import (
"strings"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"github.com/bwmarrin/snowflake"
"go.uber.org/zap"
)
type PacketHandler struct {
s *Server
limlog.Log
wklog.Log
messageIDGen *snowflake.Node // 消息ID生成器
}
@@ -23,7 +23,7 @@ type PacketHandler struct {
func NewPacketHandler(s *Server) *PacketHandler {
h := &PacketHandler{
s: s,
Log: limlog.NewLIMLog("Handler"),
Log: wklog.NewWKLog("Handler"),
}
var err error
h.messageIDGen, err = snowflake.NewNode(int64(s.opts.NodeID))
@@ -33,7 +33,7 @@ func NewPacketHandler(s *Server) *PacketHandler {
return h
}
func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPacket) {
func (s *PacketHandler) handleConnect2(c conn, connectPacket *wkproto.ConnectPacket) {
if strings.TrimSpace(connectPacket.ClientKey) == "" {
s.writeConnackAuthFail(c)
return
@@ -41,7 +41,7 @@ func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPac
c.SetVersion(connectPacket.Version)
dhServerPrivKey, dhServerPublicKey := limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
dhServerPrivKey, dhServerPublicKey := wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
aesKey, aesIV, err := s.getClientAesKeyAndIV(connectPacket.ClientKey, dhServerPrivKey)
if err != nil {
s.Error("获取客户端的aesKey和aesIV失败", zap.Error(err))
@@ -65,10 +65,10 @@ func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPac
s.s.clientManager.Add(cli)
data, _ := s.s.opts.Proto.EncodeFrame(&lmproto.ConnackPacket{
data, _ := s.s.opts.Proto.EncodeFrame(&wkproto.ConnackPacket{
Salt: aesIV,
ServerKey: dhServerPublicKeyEnc,
ReasonCode: lmproto.ReasonSuccess,
ReasonCode: wkproto.ReasonSuccess,
TimeDiff: timeDiff,
}, c.Version())
@@ -79,16 +79,16 @@ func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPac
// func (s *PacketHandler) handleConnect(c *limContext) {
// c.c.SetAuthed(true)
// connectPacket := c.frame.(*lmproto.ConnectPacket)
// connectPacket := c.frame.(*wkproto.ConnectPacket)
// if strings.TrimSpace(connectPacket.ClientKey) == "" {
// s.writeConnackError(c.c, lmproto.ReasonClientKeyIsEmpty)
// s.writeConnackError(c.c, wkproto.ReasonClientKeyIsEmpty)
// return
// }
// c.c.SetVersion(connectPacket.Version)
// dhServerPrivKey, dhServerPublicKey := limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
// dhServerPrivKey, dhServerPublicKey := wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
// aesKey, aesIV, err := s.getClientAesKeyAndIV(connectPacket.ClientKey, dhServerPrivKey)
// if err != nil {
// s.Error("获取客户端的aesKey和aesIV失败", zap.Error(err))
@@ -112,10 +112,10 @@ func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPac
// s.s.clientManager.Add(cli)
// c.writePacket(&lmproto.ConnackPacket{
// c.writePacket(&wkproto.ConnackPacket{
// Salt: aesIV,
// ServerKey: dhServerPublicKeyEnc,
// ReasonCode: lmproto.ReasonSuccess,
// ReasonCode: wkproto.ReasonSuccess,
// TimeDiff: timeDiff,
// })
// }
@@ -123,8 +123,6 @@ func (s *PacketHandler) handleConnect2(c conn, connectPacket *lmproto.ConnectPac
// 处理发送包
func (s *PacketHandler) handleSend(c *limContext) {
fmt.Println("handleSend....")
cli := c.Client()
if cli == nil {
s.Warn("客户端不存在!")
@@ -136,14 +134,14 @@ func (s *PacketHandler) handleSend(c *limContext) {
return
}
sendackPackets := make([]lmproto.Frame, 0, len(sendPackets))
sendackPackets := make([]wkproto.Frame, 0, len(sendPackets))
if !cli.Allow() {
s.Warn("消息发送过快,限流处理!", zap.String("uid", cli.uid))
for _, sendPacketFrame := range sendPackets {
sendPacket := sendPacketFrame.(*lmproto.SendPacket)
sendackPackets = append(sendackPackets, &lmproto.SendackPacket{
ReasonCode: lmproto.ReasonRateLimit,
sendPacket := sendPacketFrame.(*wkproto.SendPacket)
sendackPackets = append(sendackPackets, &wkproto.SendackPacket{
ReasonCode: wkproto.ReasonRateLimit,
ClientSeq: sendPacket.ClientSeq,
ClientMsgNo: sendPacket.ClientMsgNo,
})
@@ -152,9 +150,9 @@ func (s *PacketHandler) handleSend(c *limContext) {
return
}
for _, sendPacketFrame := range sendPackets {
sendPacket := sendPacketFrame.(*lmproto.SendPacket)
sendackPackets = append(sendackPackets, &lmproto.SendackPacket{
ReasonCode: lmproto.ReasonSuccess,
sendPacket := sendPacketFrame.(*wkproto.SendPacket)
sendackPackets = append(sendackPackets, &wkproto.SendackPacket{
ReasonCode: wkproto.ReasonSuccess,
ClientSeq: sendPacket.ClientSeq,
ClientMsgNo: sendPacket.ClientMsgNo,
})
@@ -173,7 +171,7 @@ func (s *PacketHandler) handlePing(c *limContext) {
fmt.Println("handlePing....")
cli := c.Client()
if cli != nil {
data, _ := s.s.opts.Proto.EncodeFrame(&lmproto.PongPacket{}, cli.Version())
data, _ := s.s.opts.Proto.EncodeFrame(&wkproto.PongPacket{}, cli.Version())
dataLen := int64(len(data))
cli.inMsgs.Inc()
@@ -194,16 +192,16 @@ func (s *PacketHandler) handlePing(c *limContext) {
}
func (s *PacketHandler) writeConnackError(c conn, resaon lmproto.ReasonCode) error {
func (s *PacketHandler) writeConnackError(c conn, resaon wkproto.ReasonCode) error {
return s.writeConnack(c, 0, resaon)
}
func (s *PacketHandler) writeConnackAuthFail(c conn) error {
return s.writeConnack(c, 0, lmproto.ReasonAuthFail)
return s.writeConnack(c, 0, wkproto.ReasonAuthFail)
}
func (s *PacketHandler) writeConnack(c conn, timeDiff int64, code lmproto.ReasonCode) error {
data, err := s.s.opts.Proto.EncodeFrame(&lmproto.ConnackPacket{
func (s *PacketHandler) writeConnack(c conn, timeDiff int64, code wkproto.ReasonCode) error {
data, err := s.s.opts.Proto.EncodeFrame(&wkproto.ConnackPacket{
ReasonCode: code,
TimeDiff: timeDiff,
}, c.Version())
@@ -228,9 +226,9 @@ func (s *PacketHandler) getClientAesKeyAndIV(clientKey string, dhServerPrivKey [
copy(dhClientPubKeyArray[:], clientKeyBytes[:32])
// 获得DH的共享key
shareKey := limutil.GetCurve25519Key(dhServerPrivKey, dhClientPubKeyArray) // 共享key
shareKey := wkutil.GetCurve25519Key(dhServerPrivKey, dhClientPubKeyArray) // 共享key
aesIV := limutil.GetRandomString(16)
aesKey := limutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
aesIV := wkutil.GetRandomString(16)
aesKey := wkutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
return aesKey, aesIV, nil
}

View File

@@ -7,14 +7,14 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"go.uber.org/zap"
)
type InboundManager struct {
bitmap *limutil.SlotBitMap
bitmap *wkutil.SlotBitMap
inboundMap map[uint32][]uint32
inboundMapLock sync.RWMutex
@@ -25,7 +25,7 @@ type InboundManager struct {
stopChan chan struct{}
s *Server
limlog.Log
wklog.Log
}
func NewInboundManager(s *Server) *InboundManager {
@@ -33,9 +33,9 @@ func NewInboundManager(s *Server) *InboundManager {
slotNum: 1,
stopChan: make(chan struct{}),
s: s,
Log: limlog.NewLIMLog("InboundManager"),
Log: wklog.NewWKLog("InboundManager"),
}
i.bitmap = limutil.NewSlotBitMap(int(i.slotNum))
i.bitmap = wkutil.NewSlotBitMap(int(i.slotNum))
i.inboundMap = make(map[uint32][]uint32, i.slotNum)
i.workConds = make([]*sync.Cond, i.slotNum)
for j := 0; j < int(i.slotNum); j++ {
@@ -176,7 +176,7 @@ func (i *InboundManager) inboundRead(cli *client) error {
// 处理包
offset := 0
frames := make([]lmproto.Frame, 0)
frames := make([]wkproto.Frame, 0)
for len(msgBytes) > offset {
frame, size, err := i.s.opts.Proto.DecodeFrame(msgBytes[offset:], cli.Version())
if err != nil { //

View File

@@ -4,7 +4,7 @@ import (
"net"
"time"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/valyala/bytebufferpool"
)
@@ -32,9 +32,9 @@ type conn interface {
}
type limContext struct {
frames []lmproto.Frame
frameType lmproto.FrameType
proto lmproto.Protocol
frames []wkproto.Frame
frameType wkproto.FrameType
proto wkproto.Protocol
s *Server
cli *client
}
@@ -49,13 +49,13 @@ func (c *limContext) Client() *client {
return c.cli
}
func (c *limContext) Frames() []lmproto.Frame {
func (c *limContext) Frames() []wkproto.Frame {
return c.frames
}
func (c *limContext) GetFrameType() lmproto.FrameType {
func (c *limContext) GetFrameType() wkproto.FrameType {
return c.frameType
}
@@ -79,7 +79,7 @@ func (c *limContext) reset() {
c.frameType = 0
}
func (c *limContext) writePacket(frames ...lmproto.Frame) error {
func (c *limContext) writePacket(frames ...wkproto.Frame) error {
if len(frames) == 0 {
return nil
}

View File

@@ -6,8 +6,8 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limnet"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wknet"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/panjf2000/gnet/v2"
)
@@ -23,44 +23,44 @@ type NetEventHandler interface {
type Net struct {
s *Server
eventHandler NetEventHandler
engine *limnet.Engine
engine *wknet.Engine
}
func NewNet(s *Server, eventHandler NetEventHandler) *Net {
return &Net{
s: s,
eventHandler: eventHandler,
engine: limnet.NewEngine(),
engine: wknet.NewEngine(),
}
}
func (n *Net) Start() {
// n.s.waitGroupWrapper.Wrap(func() {
// gnet.Run(newGNetEventHandler(n), n.s.opts.Addr, gnet.WithLogLevel(n.s.opts.Logger.Level), gnet.WithMulticore(false), gnet.WithNumEventLoop(1))
// })
n.engine.OnData(func(conn limnet.Conn) error {
buff, _ := conn.Peek(-1)
data, _ := gnetUnpacket(buff)
wrapConn := conn.Context()
if wrapConn == nil {
wrapConn = NewConnWrapLimnet(n.s.GenClientID(), conn)
conn.SetContext(wrapConn)
}
rdata := n.eventHandler.OnPacket(wrapConn.(*ConnWrapLimnet), data)
if len(rdata) == 0 {
conn.Discard(len(data))
} else {
conn.Discard(len(data) - len(rdata))
}
return nil
n.s.waitGroupWrapper.Wrap(func() {
gnet.Run(newGNetEventHandler(n), n.s.opts.Addr, gnet.WithLogLevel(n.s.opts.Logger.Level), gnet.WithMulticore(false), gnet.WithNumEventLoop(1))
})
n.engine.Start()
// n.engine.OnData(func(conn wknet.Conn) error {
// buff, _ := conn.Peek(-1)
// data, _ := gnetUnpacket(buff)
// wrapConn := conn.Context()
// if wrapConn == nil {
// wrapConn = NewConnWrapLimnet(n.s.GenClientID(), conn)
// conn.SetContext(wrapConn)
// }
// rdata := n.eventHandler.OnPacket(wrapConn.(*ConnWrapLimnet), data)
// if len(rdata) == 0 {
// conn.Discard(len(data))
// } else {
// conn.Discard(len(data) - len(rdata))
// }
// return nil
// })
// n.engine.Start()
// addrs := strings.Split(n.s.opts.Addr, "://")
// ln, err := limnet.Listen(addrs[0], addrs[1][2:])
// ln, err := wknet.Listen(addrs[0], addrs[1][2:])
// if err != nil {
// panic(err)
// }
@@ -80,19 +80,19 @@ func (n *Net) Start() {
// for _, conn := range conns {
// action := conn.Action()
// fmt.Println("action---->", action, conn.FD(), "conns---->", len(conns))
// if action.IsSet(limnet.ActionOpen) {
// if action.IsSet(wknet.ActionOpen) {
// cn := NewConnWrapLimnet(n.s.GenClientID(), conn)
// conn.SetContext(cn)
// n.eventHandler.OnConnect(cn)
// action.Clear(limnet.ActionOpen)
// action.Clear(wknet.ActionOpen)
// } else if action.IsSet(limnet.ActionClose) {
// } else if action.IsSet(wknet.ActionClose) {
// conn.Close()
// cn := conn.Context().(*ConnWrapLimnet)
// n.eventHandler.OnClose(cn)
// action.Clear(limnet.ActionClose)
// action.Clear(wknet.ActionClose)
// } else if action.IsSet(limnet.ActionRead) {
// } else if action.IsSet(wknet.ActionRead) {
// cn := conn.Context().(*ConnWrapLimnet)
// var packet [0xFFFF]byte
// num, err := conn.Read(packet[:])
@@ -112,7 +112,7 @@ func (n *Net) Start() {
// if num > 0 {
// n.eventHandler.OnPacket(cn, packet[:num])
// }
// action.Clear(limnet.ActionRead)
// action.Clear(wknet.ActionRead)
// }
// conn.SetAction(action)
@@ -132,13 +132,13 @@ func (n *Net) Stop() {
}
type ConnWrapLimnet struct {
conn limnet.Conn
conn wknet.Conn
id uint32
authed bool
version uint8
}
func NewConnWrapLimnet(id uint32, conn limnet.Conn) *ConnWrapLimnet {
func NewConnWrapLimnet(id uint32, conn wknet.Conn) *ConnWrapLimnet {
return &ConnWrapLimnet{
conn: conn,
id: id,
@@ -150,11 +150,7 @@ func (c *ConnWrapLimnet) GetID() uint32 {
}
func (c *ConnWrapLimnet) Write(buf []byte) (int, error) {
start := time.Now().UnixMilli()
defer func() {
fmt.Println("write time->", time.Now().UnixMilli()-start, "dataSize", len(buf))
}()
return c.conn.Write(buf)
}
func (c *ConnWrapLimnet) Authed() bool {
@@ -217,11 +213,7 @@ func (c *ConnWrapGNet) Write(buf []byte) (int, error) {
if len(buf) == 0 {
return 0, nil
}
start := time.Now().UnixMilli()
defer func() {
fmt.Println("write time->", time.Now().UnixMilli()-start, "dataSize", len(buf))
}()
if c.conn != nil {
// TODO作者说 c.conn.Write 是非线程安全的 c.conn.AsyncWrite 是线程安全的 但是c.conn.AsyncWrite压测不理性
// err := c.conn.AsyncWrite(buf, func(c gnet.Conn, err error) error {
@@ -376,8 +368,8 @@ func gnetUnpacket(buff []byte) ([]byte, error) {
for len(buff) > offset {
typeAndFlags := buff[offset]
packetType := lmproto.FrameType(typeAndFlags >> 4)
if packetType == lmproto.PING || packetType == lmproto.PONG {
packetType := wkproto.FrameType(typeAndFlags >> 4)
if packetType == wkproto.PING || packetType == wkproto.PONG {
offset++
continue
}

View File

@@ -7,7 +7,7 @@ import (
"strings"
"time"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/spf13/viper"
"go.uber.org/zap/zapcore"
)
@@ -25,7 +25,7 @@ const (
type Options struct {
NodeID int64 // 节点ID
Proto lmproto.Protocol // 狸猫IM protocol
Proto wkproto.Protocol // 狸猫IM protocol
DataDir string // 数据目录
Version string
HTTPAddr string // http api的监听地址 默认为 0.0.0.0:1516
@@ -53,7 +53,7 @@ type Options struct {
func NewOptions() *Options {
return &Options{
Proto: lmproto.New(),
Proto: wkproto.New(),
HandlePoolSize: 2048,
Version: "5.0.0",
TimingWheelTick: time.Millisecond * 10,
@@ -92,9 +92,9 @@ func (o *Options) configureDataDir() {
}
// 数据目录
if o.NodeID == 0 {
o.DataDir = o.getString("dataDir", filepath.Join(homeDir, "limaodata"))
o.DataDir = o.getString("dataDir", filepath.Join(homeDir, "wukongimdata"))
} else {
o.DataDir = o.getString("dataDir", filepath.Join(homeDir, fmt.Sprintf("limaodata-%d", o.NodeID)))
o.DataDir = o.getString("dataDir", filepath.Join(homeDir, fmt.Sprintf("wukongimdata-%d", o.NodeID)))
}
if strings.TrimSpace(o.DataDir) != "" {
err = os.MkdirAll(o.DataDir, 0755)

View File

@@ -9,7 +9,7 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -106,7 +106,7 @@ type Outbound struct {
mu sync.Mutex
limlog.Log
wklog.Log
s *Server
}
@@ -117,7 +117,7 @@ func NewOutbound(writer OutBoundWriter, opts *OutboundOptions, s *Server) *Outbo
writer: writer,
opts: opts,
startBufCap: outboundStartBufSize,
Log: limlog.NewLIMLog("Outbound"),
Log: wklog.NewWKLog("Outbound"),
s: s,
}
out.flushCond = sync.NewCond(&(out.mu))

View File

@@ -7,10 +7,10 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limnet"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wknet"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"go.uber.org/zap"
)
@@ -18,13 +18,13 @@ type Processor struct {
s *Server
connContextPool sync.Pool
frameWorkPool *FrameWorkPool
limlog.Log
wklog.Log
}
func NewProcessor(s *Server) *Processor {
return &Processor{
s: s,
Log: limlog.NewLIMLog("Processor"),
Log: wklog.NewWKLog("Processor"),
frameWorkPool: NewFrameWorkPool(),
connContextPool: sync.Pool{
New: func() any {
@@ -36,7 +36,7 @@ func NewProcessor(s *Server) *Processor {
}
}
func (p *Processor) process(conn limnet.Conn) {
func (p *Processor) process(conn wknet.Conn) {
connCtx := conn.Context().(*connContext)
frames := connCtx.popFrames()
@@ -44,15 +44,17 @@ func (p *Processor) process(conn limnet.Conn) {
}
func (p *Processor) processFrames(conn limnet.Conn, frames []lmproto.Frame) {
func (p *Processor) processFrames(conn wknet.Conn, frames []wkproto.Frame) {
p.sameFrames(frames, func(s, e int, frs []lmproto.Frame) {
// newFs := make([]lmproto.Frame, len(frs))
p.sameFrames(frames, func(s, e int, frs []wkproto.Frame) {
// newFs := make([]wkproto.Frame, len(frs))
// copy(newFs, frs)
p.frameWorkPool.Submit(func() {
p.processSameFrame(conn, frs[0].GetFrameType(), frs, s, e)
})
// go func(s1, e1 int, c limnet.Conn, fs []lmproto.Frame) {
// p.processSameFrame(conn, frs[0].GetFrameType(), frs, s, e)
// go func(s1, e1 int, c wknet.Conn, fs []wkproto.Frame) {
// p.processSameFrame(c, fs[0].GetFrameType(), fs, s1, e1)
// }(s, e, conn, frs)
@@ -60,7 +62,7 @@ func (p *Processor) processFrames(conn limnet.Conn, frames []lmproto.Frame) {
}
func (p *Processor) sameFrames(frames []lmproto.Frame, callback func(s, e int, fs []lmproto.Frame)) {
func (p *Processor) sameFrames(frames []wkproto.Frame, callback func(s, e int, fs []wkproto.Frame)) {
for i := 0; i < len(frames); {
frame := frames[i]
start := i
@@ -78,27 +80,27 @@ func (p *Processor) sameFrames(frames []lmproto.Frame, callback func(s, e int, f
}
}
func (p *Processor) processSameFrame(conn limnet.Conn, frameType lmproto.FrameType, frames []lmproto.Frame, s, e int) {
func (p *Processor) processSameFrame(conn wknet.Conn, frameType wkproto.FrameType, frames []wkproto.Frame, s, e int) {
switch frameType {
case lmproto.PING: // ping
p.processPing(conn, frames[0].(*lmproto.PingPacket))
case lmproto.SEND: // process send
case wkproto.PING: // ping
p.processPing(conn, frames[0].(*wkproto.PingPacket))
case wkproto.SEND: // process send
// TODO: tmpFrames need optimize
tmpFrames := make([]*lmproto.SendPacket, 0, len(frames))
tmpFrames := make([]*wkproto.SendPacket, 0, len(frames))
for _, frame := range frames {
tmpFrames = append(tmpFrames, frame.(*lmproto.SendPacket))
tmpFrames = append(tmpFrames, frame.(*wkproto.SendPacket))
}
p.processMsgs(conn, tmpFrames)
case lmproto.SENDACK: // process sendack
tmpFrames := make([]*lmproto.SendackPacket, 0, len(frames))
case wkproto.SENDACK: // process sendack
tmpFrames := make([]*wkproto.SendackPacket, 0, len(frames))
for _, frame := range frames {
tmpFrames = append(tmpFrames, frame.(*lmproto.SendackPacket))
tmpFrames = append(tmpFrames, frame.(*wkproto.SendackPacket))
}
p.processMsgAcks(conn, tmpFrames)
case lmproto.RECVACK: // process recvack
tmpFrames := make([]*lmproto.RecvackPacket, 0, len(frames))
case wkproto.RECVACK: // process recvack
tmpFrames := make([]*wkproto.RecvackPacket, 0, len(frames))
for _, frame := range frames {
tmpFrames = append(tmpFrames, frame.(*lmproto.RecvackPacket))
tmpFrames = append(tmpFrames, frame.(*wkproto.RecvackPacket))
}
p.processRecvacks(conn, tmpFrames)
}
@@ -106,7 +108,7 @@ func (p *Processor) processSameFrame(conn limnet.Conn, frameType lmproto.FrameTy
}
// #################### conn auth ####################
func (p *Processor) processAuth(conn limnet.Conn, connectPacket *lmproto.ConnectPacket) {
func (p *Processor) processAuth(conn wknet.Conn, connectPacket *wkproto.ConnectPacket) {
fmt.Println("#########processAuth##########")
connCtx := p.connContextPool.Get().(*connContext)
connCtx.init()
@@ -119,7 +121,7 @@ func (p *Processor) processAuth(conn limnet.Conn, connectPacket *lmproto.Connect
}
conn.SetProtoVersion(int(connectPacket.Version))
dhServerPrivKey, dhServerPublicKey := limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
dhServerPrivKey, dhServerPublicKey := wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
aesKey, aesIV, err := p.getClientAesKeyAndIV(connectPacket.ClientKey, dhServerPrivKey)
if err != nil {
p.Error("获取客户端的aesKey和aesIV失败", zap.Error(err))
@@ -139,28 +141,28 @@ func (p *Processor) processAuth(conn limnet.Conn, connectPacket *lmproto.Connect
// p.s.connManager.Add(conn)
p.response(conn, &lmproto.ConnackPacket{
p.response(conn, &wkproto.ConnackPacket{
Salt: aesIV,
ServerKey: dhServerPublicKeyEnc,
ReasonCode: lmproto.ReasonSuccess,
ReasonCode: wkproto.ReasonSuccess,
TimeDiff: timeDiff,
})
}
// #################### ping ####################
func (p *Processor) processPing(conn limnet.Conn, pingPacket *lmproto.PingPacket) {
func (p *Processor) processPing(conn wknet.Conn, pingPacket *wkproto.PingPacket) {
fmt.Println("ping--->", conn.Fd(), conn.UID())
p.response(conn, &lmproto.PongPacket{})
p.response(conn, &wkproto.PongPacket{})
}
// #################### messages ####################
func (p *Processor) processMsgs(conn limnet.Conn, msgs []*lmproto.SendPacket) {
func (p *Processor) processMsgs(conn wknet.Conn, msgs []*wkproto.SendPacket) {
sendackPackets := make([]lmproto.Frame, 0, len(msgs))
sendackPackets := make([]wkproto.Frame, 0, len(msgs))
for _, msg := range msgs {
sendackPackets = append(sendackPackets, &lmproto.SendackPacket{
ReasonCode: lmproto.ReasonSuccess,
sendackPackets = append(sendackPackets, &wkproto.SendackPacket{
ReasonCode: wkproto.ReasonSuccess,
ClientSeq: msg.ClientSeq,
ClientMsgNo: msg.ClientMsgNo,
})
@@ -169,17 +171,17 @@ func (p *Processor) processMsgs(conn limnet.Conn, msgs []*lmproto.SendPacket) {
}
// #################### message ack ####################
func (p *Processor) processMsgAcks(conn limnet.Conn, acks []*lmproto.SendackPacket) {
func (p *Processor) processMsgAcks(conn wknet.Conn, acks []*wkproto.SendackPacket) {
}
// #################### recv ack ####################
func (p *Processor) processRecvacks(conn limnet.Conn, acks []*lmproto.RecvackPacket) {
func (p *Processor) processRecvacks(conn wknet.Conn, acks []*wkproto.RecvackPacket) {
}
// #################### process conn close ####################
func (p *Processor) processClose(conn limnet.Conn, err error) {
func (p *Processor) processClose(conn wknet.Conn, err error) {
p.Debug("conn is close", zap.Error(err), zap.Any("conn", conn))
if conn.Context() != nil {
connCtx := conn.Context().(*connContext)
@@ -190,17 +192,17 @@ func (p *Processor) processClose(conn limnet.Conn, err error) {
// #################### others ####################
func (p *Processor) response(conn limnet.Conn, frames ...lmproto.Frame) {
func (p *Processor) response(conn wknet.Conn, frames ...wkproto.Frame) {
p.s.dispatch.dataOut(conn, frames...)
}
func (p *Processor) responseConnackAuthFail(c limnet.Conn) {
p.responseConnack(c, 0, lmproto.ReasonAuthFail)
func (p *Processor) responseConnackAuthFail(c wknet.Conn) {
p.responseConnack(c, 0, wkproto.ReasonAuthFail)
}
func (p *Processor) responseConnack(c limnet.Conn, timeDiff int64, code lmproto.ReasonCode) {
func (p *Processor) responseConnack(c wknet.Conn, timeDiff int64, code wkproto.ReasonCode) {
p.response(c, &lmproto.ConnackPacket{
p.response(c, &wkproto.ConnackPacket{
ReasonCode: code,
TimeDiff: timeDiff,
})
@@ -219,9 +221,9 @@ func (p *Processor) getClientAesKeyAndIV(clientKey string, dhServerPrivKey [32]b
copy(dhClientPubKeyArray[:], clientKeyBytes[:32])
// 获得DH的共享key
shareKey := limutil.GetCurve25519Key(dhServerPrivKey, dhClientPubKeyArray) // 共享key
shareKey := wkutil.GetCurve25519Key(dhServerPrivKey, dhClientPubKeyArray) // 共享key
aesIV := limutil.GetRandomString(16)
aesKey := limutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
aesIV := wkutil.GetRandomString(16)
aesKey := wkutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
return aesKey, aesIV, nil
}

View File

@@ -27,16 +27,16 @@ func TestSameFrames(t *testing.T) {
kk(dd)
// p := &Processor{}
// p.sameFrames([]lmproto.Frame{
// &lmproto.SendPacket{},
// &lmproto.SendPacket{},
// &lmproto.SendPacket{},
// &lmproto.SendPacket{},
// &lmproto.SendPacket{},
// &lmproto.PingPacket{},
// &lmproto.SendPacket{},
// &lmproto.SendPacket{},
// }, func(fs []lmproto.Frame) {
// p.sameFrames([]wkproto.Frame{
// &wkproto.SendPacket{},
// &wkproto.SendPacket{},
// &wkproto.SendPacket{},
// &wkproto.SendPacket{},
// &wkproto.SendPacket{},
// &wkproto.PingPacket{},
// &wkproto.SendPacket{},
// &wkproto.SendPacket{},
// }, func(fs []wkproto.Frame) {
// fmt.Println("fs--->", fs)
// })
}

View File

@@ -13,9 +13,9 @@ import (
"go.uber.org/atomic"
"github.com/RussellLuo/timingwheel"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"github.com/judwhite/go-svc"
"github.com/panjf2000/ants/v2"
)
@@ -33,30 +33,30 @@ type Server struct {
contextPool sync.Pool
opts *Options
limlog.Log
packetHandler *PacketHandler // 包处理器
handleGoroutinePool *ants.Pool // 处理逻辑的池
waitGroupWrapper *limutil.WaitGroupWrapper // 协程组
clientIDGen atomic.Uint32 // 客户端ID生成
net *Net // 长连接服务网络
clientManager *ClientManager // 客户端管理者
apiServer *APIServer // api服务
start time.Time // 服务开始时间
timingWheel *timingwheel.TimingWheel // Time wheel delay task
wklog.Log
packetHandler *PacketHandler // 包处理器
handleGoroutinePool *ants.Pool // 处理逻辑的池
waitGroupWrapper *wkutil.WaitGroupWrapper // 协程组
clientIDGen atomic.Uint32 // 客户端ID生成
net *Net // 长连接服务网络
clientManager *ClientManager // 客户端管理者
apiServer *APIServer // api服务
start time.Time // 服务开始时间
timingWheel *timingwheel.TimingWheel // Time wheel delay task
deliveryManager *DeliveryManager
dispatch *Dispatch
connManager *ConnManager
// inboundManager *InboundManager
inboundManager *InboundManager
}
func New(opts *Options) *Server {
now := time.Now().UTC()
s := &Server{
opts: opts,
Log: limlog.NewLIMLog("Server"),
waitGroupWrapper: limutil.NewWaitGroupWrapper("Server"),
Log: wklog.NewWKLog("Server"),
waitGroupWrapper: wkutil.NewWaitGroupWrapper("Server"),
timingWheel: timingwheel.NewTimingWheel(opts.TimingWheelTick, opts.TimingWheelSize),
start: now,
}
@@ -69,6 +69,7 @@ func New(opts *Options) *Server {
s.clientManager = NewClientManager(s)
s.packetHandler = NewPacketHandler(s)
// s.inboundManager = NewInboundManager(s)
// s.net = NewNet(s, s)
s.deliveryManager = NewDeliveryManager(s)
s.dispatch = NewDispatch(s)
s.connManager = NewConnManager()
@@ -152,6 +153,7 @@ func (s *Server) Stop() error {
defer s.Info("Server is exited")
s.net.Stop()
s.dispatch.Stop()
// s.inboundManager.Stop()
s.apiServer.Stop()
return nil
@@ -164,13 +166,13 @@ func (s *Server) Schedule(interval time.Duration, f func()) *timingwheel.Timer {
}, f)
}
func (s *Server) handleFrames(frames []lmproto.Frame, cli *client) {
frameMap := make(map[lmproto.FrameType][]lmproto.Frame, len(frames)) // TODO: 这里map会增加gc负担可优化
func (s *Server) handleFrames(frames []wkproto.Frame, cli *client) {
frameMap := make(map[wkproto.FrameType][]wkproto.Frame, len(frames)) // TODO: 这里map会增加gc负担可优化
for i := 0; i < len(frames); i++ {
f := frames[i]
frameList := frameMap[f.GetFrameType()]
if frameList == nil {
frameList = make([]lmproto.Frame, 0)
frameList = make([]wkproto.Frame, 0)
}
frameList = append(frameList, f)
frameMap[f.GetFrameType()] = frameList
@@ -202,11 +204,11 @@ func (s *Server) handleContext(ctx *limContext) {
}
switch packetType {
case lmproto.PING:
case wkproto.PING:
s.packetHandler.handlePing(ctx)
case lmproto.SEND: // 客户端发送消息包
case wkproto.SEND: // 客户端发送消息包
s.packetHandler.handleSend(ctx)
case lmproto.SENDACK: // 客户端收到消息回执
case wkproto.SENDACK: // 客户端收到消息回执
s.packetHandler.handleRecvack(ctx)
}
// ########## 放回对象池 ##########
@@ -230,7 +232,7 @@ func (s *Server) generateManagerTokenIfNeed() (string, bool, error) {
}
var token string
if strings.TrimSpace(string(tokenBytes)) == "" {
token = limutil.GenUUID()
token = wkutil.GenUUID()
_, err := file.WriteString(token)
if err != nil {
return "", false, err

View File

@@ -2,9 +2,8 @@ package server
import (
"fmt"
"time"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"go.uber.org/zap"
)
@@ -19,15 +18,15 @@ func (s *Server) OnPacket(c conn, data []byte) []byte {
cli := s.clientManager.Get(c.GetID())
if cli != nil {
start := time.Now().UnixMilli()
// start := time.Now().UnixMilli()
cli.InboundAppend(data)
fmt.Println("InboundAppend---->", time.Now().UnixMilli()-start, "dataSize", len(data))
// fmt.Println("InboundAppend---->", time.Now().UnixMilli()-start, "dataSize", len(data))
// slot := s.inboundManager.AddClientID(cli.ID())
// s.inboundManager.Notify(slot)
return nil
}
packet, size, err := s.opts.Proto.DecodeFrame(data, lmproto.LatestVersion)
packet, size, err := s.opts.Proto.DecodeFrame(data, wkproto.LatestVersion)
if err != nil {
fmt.Println("decode err---->", err)
s.Warn("Failed to decode the message", zap.Error(err))
@@ -35,20 +34,20 @@ func (s *Server) OnPacket(c conn, data []byte) []byte {
return nil
}
fmt.Println("packet----->", packet.GetFrameType())
if packet.GetFrameType() != lmproto.CONNECT {
if packet.GetFrameType() != wkproto.CONNECT {
s.Warn("请先进行连接!")
c.Close()
return nil
}
s.packetHandler.handleConnect2(c, packet.(*lmproto.ConnectPacket))
s.packetHandler.handleConnect2(c, packet.(*wkproto.ConnectPacket))
return data[size:]
// // 处理包
// offset := 0
// for len(data) > offset {
// packet, size, err := s.opts.Proto.DecodePacket(data[offset:], lmproto.LatestVersion)
// packet, size, err := s.opts.Proto.DecodePacket(data[offset:], wkproto.LatestVersion)
// if err != nil { //
// s.Warn("Failed to decode the message", zap.Error(err))
// c.Close()
@@ -57,7 +56,7 @@ func (s *Server) OnPacket(c conn, data []byte) []byte {
// s.handlePacket(c, packet, size) // 处理包
// offset += size
// if !c.Authed() && packet.GetPacketType() != lmproto.CONNECT {
// if !c.Authed() && packet.GetPacketType() != wkproto.CONNECT {
// s.Warn("请先进行连接!")
// c.Close()
// return

View File

@@ -1,29 +1,29 @@
package server
import (
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/lmhttp"
"github.com/WuKongIM/WuKongIM/pkg/wkhttp"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"go.uber.org/zap"
)
// APIServer ApiServer
type APIServer struct {
r *lmhttp.LMHttp
r *wkhttp.WKHttp
addr string
s *Server
limlog.Log
wklog.Log
}
// NewAPIServer new一个api server
func NewAPIServer(s *Server) *APIServer {
r := lmhttp.New()
r.Use(lmhttp.CORSMiddleware())
r := wkhttp.New()
r.Use(wkhttp.CORSMiddleware())
hs := &APIServer{
r: r,
addr: s.opts.HTTPAddr,
s: s,
Log: limlog.NewLIMLog("APIServer"),
Log: wklog.NewWKLog("APIServer"),
}
return hs
}

View File

@@ -5,8 +5,8 @@ import (
"testing"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore"
)
@@ -16,11 +16,11 @@ const RunTimes = 1e6
func connectTest(t *testing.T) (*TestConn, *Server) {
var clientPubKey [32]byte
_, clientPubKey = limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
_, clientPubKey = wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
s := New(NewTestOptions())
data, err := s.opts.Proto.EncodeFrame(&lmproto.ConnectPacket{
Version: lmproto.LatestVersion,
data, err := s.opts.Proto.EncodeFrame(&wkproto.ConnectPacket{
Version: wkproto.LatestVersion,
ClientKey: base64.StdEncoding.EncodeToString(clientPubKey[:]),
ClientTimestamp: time.Now().Unix(),
UID: "test",
@@ -34,10 +34,10 @@ func connectTest(t *testing.T) (*TestConn, *Server) {
result := <-c.WriteChan()
frame, _, err := s.opts.Proto.DecodeFrame(result, lmproto.LatestVersion)
frame, _, err := s.opts.Proto.DecodeFrame(result, wkproto.LatestVersion)
assert.NoError(t, err)
assert.Equal(t, frame.(*lmproto.ConnackPacket).ReasonCode, lmproto.ReasonSuccess)
assert.Equal(t, frame.(*wkproto.ConnackPacket).ReasonCode, wkproto.ReasonSuccess)
return c, s
}
@@ -51,7 +51,7 @@ func TestHandleSend(t *testing.T) {
c.SetAuthed(true)
s := New(NewTestOptions(zapcore.DebugLevel))
data, err := s.opts.Proto.EncodeFrame(&lmproto.SendPacket{
data, err := s.opts.Proto.EncodeFrame(&wkproto.SendPacket{
ClientMsgNo: "123",
ClientSeq: 1,
ChannelID: "test",
@@ -62,10 +62,10 @@ func TestHandleSend(t *testing.T) {
s.OnPacket(c, data)
result := <-c.WriteChan()
frame, _, err := s.opts.Proto.DecodeFrame(result, lmproto.LatestVersion)
frame, _, err := s.opts.Proto.DecodeFrame(result, wkproto.LatestVersion)
assert.NoError(t, err)
assert.Equal(t, frame.(*lmproto.SendackPacket).ReasonCode, lmproto.ReasonSuccess)
assert.Equal(t, frame.(*wkproto.SendackPacket).ReasonCode, wkproto.ReasonSuccess)
}
@@ -78,20 +78,20 @@ func BenchmarkHandleSend(b *testing.B) {
for j := 0; j < RunTimes; j++ {
c := NewTestConn()
c.SetAuthed(true)
data, err := s.opts.Proto.EncodeFrame(&lmproto.SendPacket{
data, err := s.opts.Proto.EncodeFrame(&wkproto.SendPacket{
ClientMsgNo: "123",
ClientSeq: 1,
ChannelID: "test",
ChannelType: 1,
Payload: []byte("hello"),
}, lmproto.LatestVersion)
}, wkproto.LatestVersion)
assert.NoError(b, err)
s.OnPacket(c, data)
result := <-c.WriteChan()
frame, _, err := s.opts.Proto.DecodeFrame(result, lmproto.LatestVersion)
frame, _, err := s.opts.Proto.DecodeFrame(result, wkproto.LatestVersion)
assert.NoError(b, err)
assert.Equal(b, frame.(*lmproto.SendackPacket).ReasonCode, lmproto.ReasonSuccess)
assert.Equal(b, frame.(*wkproto.SendackPacket).ReasonCode, wkproto.ReasonSuccess)
}
}
}

View File

@@ -5,7 +5,7 @@ import (
"net"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@@ -14,7 +14,7 @@ func NewTestOptions(logLevel ...zapcore.Level) *Options {
opt := NewOptions()
opt.UnitTest = true
opts := limlog.NewOptions()
opts := wklog.NewOptions()
if len(logLevel) > 0 {
opts.Level = logLevel[0]
} else {
@@ -22,7 +22,7 @@ func NewTestOptions(logLevel ...zapcore.Level) *Options {
}
opts.LogDir, _ = ioutil.TempDir("", "limlog")
limlog.Configure(opts)
wklog.Configure(opts)
return opt
}

View File

@@ -12,20 +12,20 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// OnRecv 收到消息事件
type OnRecv func(recv *lmproto.RecvPacket) error
type OnSendack func(sendackPacket *lmproto.SendackPacket)
type OnRecv func(recv *wkproto.RecvPacket) error
type OnSendack func(sendackPacket *wkproto.SendackPacket)
type Client struct {
Statistics
limlog.Log
wklog.Log
aesKey string // aes密钥
salt string // 安全码
@@ -53,7 +53,7 @@ type Client struct {
wg sync.WaitGroup
mu sync.RWMutex
proto *lmproto.LiMaoProto
proto *wkproto.WKProto
pongs []chan struct{}
onRecv OnRecv
@@ -74,8 +74,8 @@ func New(addr string, opt ...Option) *Client {
c := &Client{
addr: addr,
opts: opts,
proto: lmproto.New(),
Log: limlog.NewLIMLog(fmt.Sprintf("IMClient[%s]", opts.UID)),
proto: wkproto.New(),
Log: wklog.NewWKLog(fmt.Sprintf("IMClient[%s]", opts.UID)),
writer: &limWriter{
limit: opts.DefaultBufSize,
plimit: opts.ReconnectBufSize,
@@ -433,7 +433,7 @@ func (c *Client) stopPingTimer() {
func (c *Client) sendPing(ch chan struct{}) error {
c.pongs = append(c.pongs, ch)
data, err := c.proto.EncodeFrame(&lmproto.PingPacket{}, c.opts.ProtoVersion)
data, err := c.proto.EncodeFrame(&wkproto.PingPacket{}, c.opts.ProtoVersion)
if err != nil {
return err
}
@@ -488,7 +488,7 @@ func (c *Client) readLoop() {
}
func (c *Client) handlePacketDatas(packetData []byte) error {
packets := make([]lmproto.Frame, 0)
packets := make([]wkproto.Frame, 0)
offset := 0
var err error
for len(packetData) > offset {
@@ -511,35 +511,35 @@ func (c *Client) handlePacketDatas(packetData []byte) error {
return nil
}
func (c *Client) handlePacket(frame lmproto.Frame) error {
func (c *Client) handlePacket(frame wkproto.Frame) error {
c.InBytes.Add(uint64(frame.GetFrameSize()))
c.InMsgs.Inc()
switch frame.GetFrameType() {
case lmproto.SENDACK: // 发送回执
c.handleSendackPacket(frame.(*lmproto.SendackPacket))
case lmproto.RECV: // 收到消息
c.handleRecvPacket(frame.(*lmproto.RecvPacket))
case lmproto.PONG: // pong
case wkproto.SENDACK: // 发送回执
c.handleSendackPacket(frame.(*wkproto.SendackPacket))
case wkproto.RECV: // 收到消息
c.handleRecvPacket(frame.(*wkproto.RecvPacket))
case wkproto.PONG: // pong
c.handlePong()
}
return nil
}
func (c *Client) handleSendackPacket(packet *lmproto.SendackPacket) {
func (c *Client) handleSendackPacket(packet *wkproto.SendackPacket) {
if c.onSendack != nil {
c.onSendack(packet)
}
}
// 处理接受包
func (c *Client) handleRecvPacket(packet *lmproto.RecvPacket) {
func (c *Client) handleRecvPacket(packet *wkproto.RecvPacket) {
var err error
var payload []byte
if c.onRecv != nil {
if !packet.Setting.IsSet(lmproto.SettingNoEncrypt) {
payload, err = limutil.AesDecryptPkcs7Base64(packet.Payload, []byte(c.aesKey), []byte(c.salt))
if !packet.Setting.IsSet(wkproto.SettingNoEncrypt) {
payload, err = wkutil.AesDecryptPkcs7Base64(packet.Payload, []byte(c.aesKey), []byte(c.salt))
if err != nil {
panic(err)
}
@@ -548,7 +548,7 @@ func (c *Client) handleRecvPacket(packet *lmproto.RecvPacket) {
err = c.onRecv(packet)
}
if err == nil {
c.sendPacket(&lmproto.RecvackPacket{
c.sendPacket(&wkproto.RecvackPacket{
Framer: packet.Framer,
MessageID: packet.MessageID,
MessageSeq: packet.MessageSeq,
@@ -578,26 +578,26 @@ func (c *Client) SendMessage(channel *Channel, payload []byte, opt ...SendOption
}
}
var err error
var setting lmproto.Setting
var setting wkproto.Setting
newPayload := payload
if !opts.NoEncrypt {
// 加密消息内容
newPayload, err = limutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
newPayload, err = wkutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密消息payload失败", zap.Error(err))
return err
}
} else {
setting.Set(lmproto.SettingNoEncrypt)
setting.Set(wkproto.SettingNoEncrypt)
}
clientMsgNo := opts.ClientMsgNo
if clientMsgNo == "" {
// clientMsgNo = limutil.GenUUID() // TODO: uuid生成非常耗性能
// clientMsgNo = wkutil.GenUUID() // TODO: uuid生成非常耗性能
}
clientSeq := c.clientIDGen.Add(1)
packet := &lmproto.SendPacket{
Framer: lmproto.Framer{
packet := &wkproto.SendPacket{
Framer: wkproto.Framer{
NoPersist: opts.NoPersist,
SyncOnce: opts.SyncOnce,
RedDot: opts.RedDot,
@@ -614,12 +614,12 @@ func (c *Client) SendMessage(channel *Channel, payload []byte, opt ...SendOption
// 加密消息通道
if !opts.NoEncrypt {
signStr := packet.VerityString()
actMsgKey, err := limutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
actMsgKey, err := wkutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密数据失败!", zap.Error(err))
return err
}
packet.MsgKey = limutil.MD5(string(actMsgKey))
packet.MsgKey = wkutil.MD5(string(actMsgKey))
}
return c.appendPacket(packet)
}
@@ -662,11 +662,11 @@ func (c *Client) flusher() {
func (c *Client) sendConnect() error {
var clientPubKey [32]byte
c.clientPrivKey, clientPubKey = limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
packet := &lmproto.ConnectPacket{
c.clientPrivKey, clientPubKey = wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
packet := &wkproto.ConnectPacket{
Version: c.opts.ProtoVersion,
DeviceID: limutil.GenUUID(),
DeviceFlag: lmproto.APP,
DeviceID: wkutil.GenUUID(),
DeviceFlag: wkproto.APP,
ClientKey: base64.StdEncoding.EncodeToString(clientPubKey[:]),
ClientTimestamp: time.Now().Unix(),
UID: c.opts.UID,
@@ -680,11 +680,11 @@ func (c *Client) sendConnect() error {
if err != nil {
return err
}
connack, ok := f.(*lmproto.ConnackPacket)
connack, ok := f.(*wkproto.ConnackPacket)
if !ok {
return errors.New("返回包类型有误!不是连接回执包!")
}
if connack.ReasonCode != lmproto.ReasonSuccess {
if connack.ReasonCode != wkproto.ReasonSuccess {
return errors.New("连接失败!")
}
c.salt = connack.Salt
@@ -696,14 +696,14 @@ func (c *Client) sendConnect() error {
var serverPubKey [32]byte
copy(serverPubKey[:], serverKey[:32])
shareKey := limutil.GetCurve25519Key(c.clientPrivKey, serverPubKey) // 共享key
c.aesKey = limutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
shareKey := wkutil.GetCurve25519Key(c.clientPrivKey, serverPubKey) // 共享key
c.aesKey = wkutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
return nil
}
// 发送包
func (c *Client) sendPacket(packet lmproto.Frame) error {
func (c *Client) sendPacket(packet wkproto.Frame) error {
data, err := c.proto.EncodeFrame(packet, c.opts.ProtoVersion)
if err != nil {
return err
@@ -717,7 +717,7 @@ func (c *Client) sendPacket(packet lmproto.Frame) error {
return nil
}
func (c *Client) appendPacket(packet lmproto.Frame) error {
func (c *Client) appendPacket(packet wkproto.Frame) error {
data, err := c.proto.EncodeFrame(packet, c.opts.ProtoVersion)
if err != nil {

View File

@@ -11,9 +11,9 @@ import (
"sync"
"time"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/limutil"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
"github.com/WuKongIM/WuKongIM/pkg/wait"
"github.com/nats-io/nuid"
"go.uber.org/atomic"
@@ -21,10 +21,10 @@ import (
)
// OnRecv 收到消息事件
type OnRecv func(recv *lmproto.RecvPacket) error
type OnRecv func(recv *wkproto.RecvPacket) error
// OnSendack 发送消息回执
type OnSendack func(sendack *lmproto.SendackPacket)
type OnSendack func(sendack *wkproto.SendackPacket)
// OnClose 连接关闭
type OnClose func()
@@ -32,9 +32,9 @@ type OnClose func()
// Client 狸猫客户端
type Client struct {
opts *Options // 狸猫IM配置
sending []*lmproto.SendPacket // 发送中的包
sending []*wkproto.SendPacket // 发送中的包
sendingLock sync.RWMutex
proto *lmproto.LiMaoProto
proto *wkproto.WKProto
addr string // 连接地址
connected atomic.Bool // 是否已连接
conn net.Conn
@@ -46,7 +46,7 @@ type Client struct {
onClose OnClose
onSendack OnSendack
clientPrivKey [32]byte
limlog.Log
wklog.Log
aesKey string // aes密钥
salt string // 安全码
wait wait.Wait
@@ -72,11 +72,11 @@ func New(addr string, opts ...Option) *Client {
c := &Client{
opts: defaultOpts,
addr: addr,
sending: make([]*lmproto.SendPacket, 0),
proto: lmproto.New(),
sending: make([]*wkproto.SendPacket, 0),
proto: wkproto.New(),
heartbeatTicker: time.NewTicker(time.Second * 20),
stopHeartbeatChan: make(chan bool),
Log: limlog.NewLIMLog(fmt.Sprintf("IMClient[%s]", defaultOpts.UID)),
Log: wklog.NewWKLog(fmt.Sprintf("IMClient[%s]", defaultOpts.UID)),
wait: wait.New(),
disconnectChan: make(chan struct{}),
writerBytesChan: make(chan []byte, 100),
@@ -120,11 +120,11 @@ func (c *Client) onlyConnect() error {
if err != nil {
return err
}
connack, ok := f.(*lmproto.ConnackPacket)
connack, ok := f.(*wkproto.ConnackPacket)
if !ok {
return errors.New("返回包类型有误!不是连接回执包!")
}
if connack.ReasonCode != lmproto.ReasonSuccess {
if connack.ReasonCode != wkproto.ReasonSuccess {
return errors.New("连接失败!")
}
@@ -137,8 +137,8 @@ func (c *Client) onlyConnect() error {
var serverPubKey [32]byte
copy(serverPubKey[:], serverKey[:32])
shareKey := limutil.GetCurve25519Key(c.clientPrivKey, serverPubKey) // 共享key
c.aesKey = limutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
shareKey := wkutil.GetCurve25519Key(c.clientPrivKey, serverPubKey) // 共享key
c.aesKey = wkutil.MD5(base64.StdEncoding.EncodeToString(shareKey[:]))[:16]
c.connected.Store(true)
if len(c.sending) > 0 {
@@ -174,17 +174,17 @@ func (c *Client) SendMessage(channel *Channel, payload []byte, opt ...SendOption
}
}
var err error
var setting lmproto.Setting
var setting wkproto.Setting
newPayload := payload
if !opts.NoEncrypt {
// 加密消息内容
newPayload, err = limutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
newPayload, err = wkutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密消息payload失败", zap.Error(err))
return err
}
} else {
setting = setting.Set(lmproto.SettingNoEncrypt)
setting = setting.Set(wkproto.SettingNoEncrypt)
}
clientMsgNo := opts.ClientMsgNo
@@ -192,8 +192,8 @@ func (c *Client) SendMessage(channel *Channel, payload []byte, opt ...SendOption
clientMsgNo = nuid.Next()
}
packet := &lmproto.SendPacket{
Framer: lmproto.Framer{
packet := &wkproto.SendPacket{
Framer: wkproto.Framer{
NoPersist: opts.NoPersist,
SyncOnce: opts.SyncOnce,
RedDot: opts.RedDot,
@@ -210,12 +210,12 @@ func (c *Client) SendMessage(channel *Channel, payload []byte, opt ...SendOption
// 加密消息通道
if !opts.NoEncrypt {
signStr := packet.VerityString()
actMsgKey, err := limutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
actMsgKey, err := wkutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密数据失败!", zap.Error(err))
return err
}
packet.MsgKey = limutil.MD5(string(actMsgKey))
packet.MsgKey = wkutil.MD5(string(actMsgKey))
}
data, err := c.proto.EncodePacket(packet, c.opts.ProtoVersion)
@@ -240,17 +240,17 @@ func (c *Client) Flush() error {
}
// SendMessageSync 同步发送
func (c *Client) SendMessageSync(ctx context.Context, channel *Channel, payload []byte) (*lmproto.SendackPacket, error) {
func (c *Client) SendMessageSync(ctx context.Context, channel *Channel, payload []byte) (*wkproto.SendackPacket, error) {
// 加密消息内容
encPayload, err := limutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
encPayload, err := wkutil.AesEncryptPkcs7Base64(payload, []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密消息payload失败", zap.Error(err))
return nil, err
}
packet := &lmproto.SendPacket{
packet := &wkproto.SendPacket{
ClientSeq: c.clientIDGen.Add(1),
ClientMsgNo: limutil.GenerUUID(),
ClientMsgNo: wkutil.GenerUUID(),
ChannelID: channel.ChannelID,
ChannelType: channel.ChannelType,
Payload: encPayload,
@@ -259,12 +259,12 @@ func (c *Client) SendMessageSync(ctx context.Context, channel *Channel, payload
// 加密消息通道
signStr := packet.VerityString()
actMsgKey, err := limutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
actMsgKey, err := wkutil.AesEncryptPkcs7Base64([]byte(signStr), []byte(c.aesKey), []byte(c.salt))
if err != nil {
c.Error("加密数据失败!", zap.Error(err))
return nil, err
}
packet.MsgKey = limutil.MD5(string(actMsgKey))
packet.MsgKey = wkutil.MD5(string(actMsgKey))
resultChan := c.wait.Register(packet.ClientSeq)
err = c.sendPacket(packet)
@@ -274,7 +274,7 @@ func (c *Client) SendMessageSync(ctx context.Context, channel *Channel, payload
}
select {
case result := <-resultChan:
return result.(*lmproto.SendackPacket), nil
return result.(*wkproto.SendackPacket), nil
case <-c.disconnectChan:
return nil, nil
case <-ctx.Done():
@@ -318,14 +318,14 @@ exit:
}
func (c *Client) ping() {
err := c.sendPacket(&lmproto.PingPacket{})
err := c.sendPacket(&wkproto.PingPacket{})
if err != nil {
c.Warn("Ping发送失败", zap.Error(err))
}
}
// 发送包
func (c *Client) sendPacket(packet lmproto.Frame) error {
func (c *Client) sendPacket(packet wkproto.Frame) error {
data, err := c.proto.EncodePacket(packet, c.opts.ProtoVersion)
if err != nil {
return err
@@ -343,10 +343,10 @@ func (c *Client) sendPacket(packet lmproto.Frame) error {
func (c *Client) sendConnect() error {
var clientPubKey [32]byte
c.clientPrivKey, clientPubKey = limutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
packet := &lmproto.ConnectPacket{
c.clientPrivKey, clientPubKey = wkutil.GetCurve25519KeypPair() // 生成服务器的DH密钥对
packet := &wkproto.ConnectPacket{
Version: c.opts.ProtoVersion,
DeviceFlag: lmproto.APP,
DeviceFlag: wkproto.APP,
ClientKey: base64.StdEncoding.EncodeToString(clientPubKey[:]),
ClientTimestamp: time.Now().Unix(),
UID: c.opts.UID,
@@ -355,7 +355,7 @@ func (c *Client) sendConnect() error {
return c.sendPacket(packet)
}
// func (c *Client) sendPacketNoFlush(packet lmproto.Frame) error {
// func (c *Client) sendPacketNoFlush(packet wkproto.Frame) error {
// data, err := c.proto.EncodePacket(packet, c.opts.ProtoVersion)
// if err != nil {
// return err
@@ -421,19 +421,19 @@ func (c *Client) loopWrite() {
}
func (c *Client) handlePacket(frame lmproto.Frame) {
func (c *Client) handlePacket(frame wkproto.Frame) {
c.InBytes.Add(uint64(frame.GetFrameSize()))
c.InMsgs.Inc()
c.retryPingCount = 0 // 只要收到消息则重置ping次数
switch frame.GetPacketType() {
case lmproto.SENDACK: // 发送回执
c.handleSendackPacket(frame.(*lmproto.SendackPacket))
case lmproto.RECV: // 收到消息
c.handleRecvPacket(frame.(*lmproto.RecvPacket))
case wkproto.SENDACK: // 发送回执
c.handleSendackPacket(frame.(*wkproto.SendackPacket))
case wkproto.RECV: // 收到消息
c.handleRecvPacket(frame.(*wkproto.RecvPacket))
}
}
func (c *Client) handleSendackPacket(packet *lmproto.SendackPacket) {
func (c *Client) handleSendackPacket(packet *wkproto.SendackPacket) {
c.sendingLock.Lock()
defer c.sendingLock.Unlock()
@@ -453,12 +453,12 @@ func (c *Client) handleSendackPacket(packet *lmproto.SendackPacket) {
}
// 处理接受包
func (c *Client) handleRecvPacket(packet *lmproto.RecvPacket) {
func (c *Client) handleRecvPacket(packet *wkproto.RecvPacket) {
var err error
var payload []byte
if c.onRecv != nil {
if !packet.Setting.IsSet(lmproto.SettingNoEncrypt) {
payload, err = limutil.AesDecryptPkcs7Base64(packet.Payload, []byte(c.aesKey), []byte(c.salt))
if !packet.Setting.IsSet(wkproto.SettingNoEncrypt) {
payload, err = wkutil.AesDecryptPkcs7Base64(packet.Payload, []byte(c.aesKey), []byte(c.salt))
if err != nil {
panic(err)
}
@@ -467,7 +467,7 @@ func (c *Client) handleRecvPacket(packet *lmproto.RecvPacket) {
err = c.onRecv(packet)
}
if err == nil {
c.sendPacket(&lmproto.RecvackPacket{
c.sendPacket(&wkproto.RecvackPacket{
Framer: packet.Framer,
MessageID: packet.MessageID,
MessageSeq: packet.MessageSeq,

View File

@@ -1,6 +1,6 @@
package client
import "github.com/WuKongIM/WuKongIM/pkg/lmproto"
import "github.com/WuKongIM/WuKongIM/pkg/wkproto"
func parse(buff []byte) ([]byte, []byte, error) {
if len(buff) == 0 {
@@ -10,8 +10,8 @@ func parse(buff []byte) ([]byte, []byte, error) {
packetData := make([]byte, 0, len(buff))
for len(buff) > offset {
typeAndFlags := buff[offset]
packetType := lmproto.FrameType(typeAndFlags >> 4)
if packetType == lmproto.PING || packetType == lmproto.PONG {
packetType := wkproto.FrameType(typeAndFlags >> 4)
if packetType == wkproto.PING || packetType == wkproto.PONG {
packetData = append(packetData, buff[offset])
offset++
continue

View File

@@ -38,11 +38,11 @@ const (
)
var (
ErrStaleConnection = errors.New("limao: " + STALE_CONNECTION)
ErrNoServers = errors.New("limao: no servers available for connection")
ErrBadTimeout = errors.New("limao: timeout invalid")
ErrConnectionClosed = errors.New("limao: connection closed")
ErrTimeout = errors.New("limao: timeout")
ErrStaleConnection = errors.New("wukongim " + STALE_CONNECTION)
ErrNoServers = errors.New("wukongim no servers available for connection")
ErrBadTimeout = errors.New("wukongim timeout invalid")
ErrConnectionClosed = errors.New("wukongim connection closed")
ErrTimeout = errors.New("wukongim timeout")
)
type Statistics struct {

View File

@@ -3,7 +3,7 @@ package client
import (
"time"
"github.com/WuKongIM/WuKongIM/pkg/lmproto"
"github.com/WuKongIM/WuKongIM/pkg/wkproto"
)
// Options Options
@@ -42,7 +42,7 @@ type Options struct {
// NewOptions 创建默认配置
func NewOptions() *Options {
return &Options{
ProtoVersion: lmproto.LatestVersion,
ProtoVersion: wkproto.LatestVersion,
AutoReconn: false,
DefaultBufSize: 32768,
ReconnectBufSize: 8 * 1024 * 1024,

View File

@@ -1,21 +0,0 @@
useage
```go
limnet.Serve("tcp://127.0.0.1:1999",handler,opts...)
```
event
```go
OnConnect(c *limnet.Conn)
OnClose(c *limnet.Conn)
OnData(c *limnet.Conn)
```

View File

@@ -18,7 +18,7 @@
package ring
import (
"github.com/WuKongIM/WuKongIM/pkg/limnet/io"
"github.com/WuKongIM/WuKongIM/pkg/wknet/io"
"golang.org/x/sys/unix"
)

View File

@@ -1,4 +1,4 @@
package lmhttp
package wkhttp
import (
"io/ioutil"
@@ -10,13 +10,13 @@ import (
"github.com/sendgrid/rest"
)
type LMHttp struct {
type WKHttp struct {
r *gin.Engine
pool sync.Pool
}
func New() *LMHttp {
l := &LMHttp{
func New() *WKHttp {
l := &WKHttp{
r: gin.Default(),
pool: sync.Pool{},
}
@@ -28,12 +28,12 @@ func New() *LMHttp {
}
// GetGinRoute GetGinRoute
func (l *LMHttp) GetGinRoute() *gin.Engine {
func (l *WKHttp) GetGinRoute() *gin.Engine {
return l.r
}
// Static Static
func (l *LMHttp) Static(relativePath string, root string) {
func (l *WKHttp) Static(relativePath string, root string) {
l.r.Static(relativePath, root)
}
func allocateContext() *Context {
@@ -41,11 +41,11 @@ func allocateContext() *Context {
}
// Use Use
func (l *LMHttp) Use(handlers ...HandlerFunc) {
func (l *WKHttp) Use(handlers ...HandlerFunc) {
l.r.Use(l.handlersToGinHandleFuncs(handlers)...)
}
func (l *LMHttp) handlersToGinHandleFuncs(handlers []HandlerFunc) []gin.HandlerFunc {
func (l *WKHttp) handlersToGinHandleFuncs(handlers []HandlerFunc) []gin.HandlerFunc {
newHandlers := make([]gin.HandlerFunc, 0, len(handlers))
if handlers != nil {
for _, handler := range handlers {
@@ -149,7 +149,7 @@ func (c *Context) CopyRequestHeader(request *http.Request) map[string]string {
type HandlerFunc func(c *Context)
// LMHttpHandler LMHttpHandler
func (l *LMHttp) LMHttpHandler(handlerFunc HandlerFunc) gin.HandlerFunc {
func (l *WKHttp) LMHttpHandler(handlerFunc HandlerFunc) gin.HandlerFunc {
return func(c *gin.Context) {
hc := l.pool.Get().(*Context)
hc.reset()
@@ -160,35 +160,35 @@ func (l *LMHttp) LMHttpHandler(handlerFunc HandlerFunc) gin.HandlerFunc {
}
// Run Run
func (l *LMHttp) Run(addr ...string) error {
func (l *WKHttp) Run(addr ...string) error {
return l.r.Run(addr...)
}
// POST POST
func (l *LMHttp) POST(relativePath string, handlers ...HandlerFunc) {
func (l *WKHttp) POST(relativePath string, handlers ...HandlerFunc) {
l.r.POST(relativePath, l.handlersToGinHandleFunc(handlers)...)
}
// GET GET
func (l *LMHttp) GET(relativePath string, handlers ...HandlerFunc) {
func (l *WKHttp) GET(relativePath string, handlers ...HandlerFunc) {
l.r.GET(relativePath, l.handlersToGinHandleFunc(handlers)...)
}
// DELETE DELETE
func (l *LMHttp) DELETE(relativePath string, handlers ...HandlerFunc) {
func (l *WKHttp) DELETE(relativePath string, handlers ...HandlerFunc) {
l.r.DELETE(relativePath, l.handlersToGinHandleFunc(handlers)...)
}
func (l *LMHttp) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (l *WKHttp) ServeHTTP(w http.ResponseWriter, req *http.Request) {
l.r.ServeHTTP(w, req)
}
// Group Group
func (l *LMHttp) Group(relativePath string, handlers ...HandlerFunc) {
func (l *WKHttp) Group(relativePath string, handlers ...HandlerFunc) {
l.r.Group(relativePath, l.handlersToGinHandleFunc(handlers)...)
}
func (l *LMHttp) handlersToGinHandleFunc(handlers []HandlerFunc) []gin.HandlerFunc {
func (l *WKHttp) handlersToGinHandleFunc(handlers []HandlerFunc) []gin.HandlerFunc {
newHandlers := make([]gin.HandlerFunc, 0, len(handlers))
if handlers != nil {
for _, handler := range handlers {

View File

@@ -1,4 +1,4 @@
package limlog
package wklog
import (
"fmt"
@@ -151,33 +151,33 @@ type Log interface {
Warn(msg string, fields ...zap.Field)
}
// LIMLog TLog
type LIMLog struct {
// WKLog TLog
type WKLog struct {
prefix string // 日志前缀
}
// NewLIMLog NewLIMLog
func NewLIMLog(prefix string) *LIMLog {
// NewWKLog NewWKLog
func NewWKLog(prefix string) *WKLog {
return &LIMLog{prefix: prefix}
return &WKLog{prefix: prefix}
}
// Info Info
func (t *LIMLog) Info(msg string, fields ...zap.Field) {
func (t *WKLog) Info(msg string, fields ...zap.Field) {
Info(fmt.Sprintf("【%s】%s", t.prefix, msg), fields...)
}
// Debug Debug
func (t *LIMLog) Debug(msg string, fields ...zap.Field) {
func (t *WKLog) Debug(msg string, fields ...zap.Field) {
Debug(fmt.Sprintf("【%s】%s", t.prefix, msg), fields...)
}
// Error Error
func (t *LIMLog) Error(msg string, fields ...zap.Field) {
func (t *WKLog) Error(msg string, fields ...zap.Field) {
Error(fmt.Sprintf("【%s】%s", t.prefix, msg), fields...)
}
// Warn Warn
func (t *LIMLog) Warn(msg string, fields ...zap.Field) {
func (t *WKLog) Warn(msg string, fields ...zap.Field) {
Warn(fmt.Sprintf("【%s】%s", t.prefix, msg), fields...)
}

View File

@@ -1,4 +1,4 @@
package limlog
package wklog
import (
"testing"

View File

@@ -1,4 +1,4 @@
package limlog
package wklog
import "go.uber.org/zap/zapcore"

21
pkg/wknet/README.md Normal file
View File

@@ -0,0 +1,21 @@
useage
```go
wknet.Serve("tcp://127.0.0.1:1999",handler,opts...)
```
event
```go
OnConnect(c *wknet.Conn)
OnClose(c *wknet.Conn)
OnData(c *wknet.Conn)
```

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"errors"
@@ -6,9 +6,9 @@ import (
"os"
perrors "github.com/WuKongIM/WuKongIM/pkg/errors"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limnet/netpoll"
"github.com/WuKongIM/WuKongIM/pkg/socket"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wknet/netpoll"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)
@@ -20,7 +20,7 @@ type Acceptor struct {
listen *listener
limlog.Log
wklog.Log
}
func NewAcceptor(eg *Engine) *Acceptor {
@@ -33,7 +33,7 @@ func NewAcceptor(eg *Engine) *Acceptor {
eg: eg,
reactorSubs: reactorSubs,
listenPoller: netpoll.NewPoller(),
Log: limlog.NewLIMLog("Acceptor"),
Log: wklog.NewWKLog("Acceptor"),
}
return a

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
type Buffer interface {
// IsEmpty returns true if the buffer is empty.

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"fmt"
@@ -8,7 +8,7 @@ import (
"sync"
"syscall"
lio "github.com/WuKongIM/WuKongIM/pkg/limnet/io"
lio "github.com/WuKongIM/WuKongIM/pkg/wknet/io"
"golang.org/x/sys/unix"
)

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import "sync"

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import "net"

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"errors"

View File

@@ -9,7 +9,7 @@ import (
"runtime"
"syscall"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)
@@ -22,7 +22,7 @@ var note = []unix.Kevent_t{{
type Poller struct {
fd int
limlog.Log
wklog.Log
shutdown bool
}
@@ -42,7 +42,7 @@ func NewPoller() *Poller {
poller = nil
panic(err)
}
poller.Log = limlog.NewLIMLog("KqueuePoller")
poller.Log = wklog.NewWKLog("KqueuePoller")
return poller
}

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"runtime"

View File

@@ -1,13 +1,11 @@
package limnet
package wknet
import (
"github.com/WuKongIM/WuKongIM/pkg/limlog"
)
import "github.com/WuKongIM/WuKongIM/pkg/wklog"
type ReactorMain struct {
acceptor *Acceptor
eg *Engine
limlog.Log
wklog.Log
}
func NewReactorMain(eg *Engine) *ReactorMain {
@@ -15,7 +13,7 @@ func NewReactorMain(eg *Engine) *ReactorMain {
return &ReactorMain{
acceptor: NewAcceptor(eg),
eg: eg,
Log: limlog.NewLIMLog("ReactorMain"),
Log: wklog.NewWKLog("ReactorMain"),
}
}

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"bytes"
@@ -6,8 +6,8 @@ import (
"fmt"
"os"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/limnet/netpoll"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wknet/netpoll"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sys/unix"
@@ -19,7 +19,7 @@ type ReactorSub struct {
eg *Engine
idx int // index of the current sub reactor
connCount atomic.Int32
limlog.Log
wklog.Log
ReadBuffer []byte
cache bytes.Buffer // temporary buffer for scattered bytes
}
@@ -32,7 +32,7 @@ func NewReactorSub(eg *Engine, index int) *ReactorSub {
eg: eg,
poller: poller,
idx: index,
Log: limlog.NewLIMLog(fmt.Sprintf("ReactorSub-%d", index)),
Log: wklog.NewWKLog(fmt.Sprintf("ReactorSub-%d", index)),
ReadBuffer: make([]byte, eg.options.ReadBufferSize),
}
}

View File

@@ -1,4 +1,4 @@
package limnet
package wknet
import (
"io"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import "fmt"
@@ -78,11 +78,12 @@ func (f Framer) String() string {
// return uint8(encodeBool(s.Receipt) << 7)
// }
// func SettingFromUint8(v uint8) Setting {
// s := Setting{}
// s.Receipt = (v >> 7 & 0x01) > 0
// return s
// }
// func SettingFromUint8(v uint8) Setting {
// s := Setting{}
// s.Receipt = (v >> 7 & 0x01) > 0
// return s
// }
//
// FrameType 包类型
type FrameType uint8

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"io"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
// PingPacket ping包
type PingPacket struct {

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
// PongPacket pong包对ping的回应
type PongPacket struct {

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,10 +1,10 @@
package lmproto
package wkproto
import (
"fmt"
"io"
"github.com/WuKongIM/WuKongIM/pkg/limlog"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/pkg/errors"
)
@@ -21,9 +21,9 @@ type Protocol interface {
EncodeFrame(packet Frame, version uint8) ([]byte, error)
}
// LiMaoProto 狸猫协议对象
type LiMaoProto struct {
limlog.Log
// WKroto 狸猫协议对象
type WKProto struct {
wklog.Log
}
// LatestVersion 最新版本
@@ -32,10 +32,10 @@ const LatestVersion = 5
// MaxRemaingLength 最大剩余长度 // 1<<28 - 1
const MaxRemaingLength uint32 = 1024 * 1024
// New 创建limao协议对象
func New() *LiMaoProto {
return &LiMaoProto{
limlog.NewLIMLog("LiMaoProto"),
// New 创建wukong协议对象
func New() *WKProto {
return &WKProto{
wklog.NewWKLog("WKProto"),
}
}
@@ -66,7 +66,7 @@ var packetDecodeMap = map[FrameType]PacketDecodeFunc{
// }
// DecodePacketWithConn 解码包
func (l *LiMaoProto) DecodePacketWithConn(conn io.Reader, version uint8) (Frame, error) {
func (l *WKProto) DecodePacketWithConn(conn io.Reader, version uint8) (Frame, error) {
framer, err := l.decodeFramerWithConn(conn)
if err != nil {
return nil, err
@@ -102,7 +102,7 @@ func (l *LiMaoProto) DecodePacketWithConn(conn io.Reader, version uint8) (Frame,
}
// DecodePacket 解码包
func (l *LiMaoProto) DecodeFrame(data []byte, version uint8) (Frame, int, error) {
func (l *WKProto) DecodeFrame(data []byte, version uint8) (Frame, int, error) {
framer, remainingLengthLength, err := l.decodeFramer(data)
if err != nil {
return nil, 0, nil
@@ -146,7 +146,7 @@ func (l *LiMaoProto) DecodeFrame(data []byte, version uint8) (Frame, int, error)
}
// EncodePacket 编码包
func (l *LiMaoProto) EncodeFrame(frame Frame, version uint8) ([]byte, error) {
func (l *WKProto) EncodeFrame(frame Frame, version uint8) ([]byte, error) {
frameType := frame.GetFrameType()
if frameType == PING || frameType == PONG {
@@ -236,14 +236,14 @@ func (l *LiMaoProto) EncodeFrame(frame Frame, version uint8) ([]byte, error) {
return enc.Bytes(), nil
}
func (l *LiMaoProto) encodeFrame(f Frame, enc *Encoder, remainingLength uint32) {
func (l *WKProto) encodeFrame(f Frame, enc *Encoder, remainingLength uint32) {
enc.WriteByte(ToFixHeaderUint8(f))
encodeVariable2(remainingLength, enc)
}
func (l *LiMaoProto) encodeFramer(f Frame, remainingLength uint32) ([]byte, error) {
func (l *WKProto) encodeFramer(f Frame, remainingLength uint32) ([]byte, error) {
if f.GetFrameType() == PING || f.GetFrameType() == PONG {
return []byte{byte(int(f.GetFrameType()<<4) | 0)}, nil
@@ -259,7 +259,7 @@ func (l *LiMaoProto) encodeFramer(f Frame, remainingLength uint32) ([]byte, erro
return append(header, varHeader...), nil
}
func (l *LiMaoProto) decodeFramer(data []byte) (Framer, int, error) {
func (l *WKProto) decodeFramer(data []byte) (Framer, int, error) {
typeAndFlags := data[0]
p := FramerFromUint8(typeAndFlags)
var remainingLengthLength uint32 = 0 // 剩余长度的长度
@@ -277,7 +277,7 @@ func (l *LiMaoProto) decodeFramer(data []byte) (Framer, int, error) {
return p, int(remainingLengthLength), nil
}
func (l *LiMaoProto) decodeFramerWithConn(conn io.Reader) (Framer, error) {
func (l *WKProto) decodeFramerWithConn(conn io.Reader) (Framer, error) {
b := make([]byte, 1)
_, err := io.ReadFull(conn, b)
if err != nil {

View File

@@ -1,11 +1,11 @@
package lmproto
package wkproto
import (
"fmt"
"testing"
)
func TestEncodeAndDecodeLength(t *testing.T) {
func TestEncodeAndDecodeLength(t *testing.T) {
bys := encodeVariable(1241)
fmt.Println(bys)
}

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package lmproto
package wkproto
import (
"testing"

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"bytes"
@@ -47,7 +47,7 @@ func AesEncrypt(origData []byte, key []byte, iv []byte, paddingFunc func([]byte,
return crypted, nil
}
//AesDecryptSimple 解密
// AesDecryptSimple 解密
func AesDecryptSimple(crypted []byte, key string, iv string) ([]byte, error) {
return AesDecryptPkcs5(crypted, []byte(key), []byte(iv))
}

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"fmt"

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"bytes"
@@ -166,7 +166,7 @@ func AnyToDecimal(num string, n int) int64 {
return int64(newNum)
}
//GetRandomString 生成随机字符串
// GetRandomString 生成随机字符串
func GetRandomString(num int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
bytes := []byte(str)

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"crypto/rand"

View File

@@ -1,11 +1,11 @@
package limutil
package wkutil
import (
"crypto/md5"
"encoding/hex"
)
//MD5 加密
// MD5 加密
func MD5(str string) string {
h := md5.New()
h.Write([]byte(str)) // 需要加密的字符串

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"strings"

View File

@@ -1,4 +1,4 @@
package limutil
package wkutil
import (
"sync"