diff --git a/Dockerfile b/Dockerfile index 9ee3a37..b6b6617 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.22 as build +FROM golang:1.23 as build # ENV GOPROXY=https://goproxy.cn,direct ENV GO111MODULE=on diff --git a/internal/plugin/api.go b/internal/plugin/api.go index bec062b..61187d4 100644 --- a/internal/plugin/api.go +++ b/internal/plugin/api.go @@ -1,34 +1,128 @@ package plugin import ( - "github.com/WuKongIM/WuKongIM/pkg/wklog" + "context" + "fmt" + "io" + "net/http" + "time" + + "github.com/WuKongIM/WuKongIM/internal/types" + "github.com/WuKongIM/WuKongIM/internal/types/pluginproto" + "github.com/WuKongIM/WuKongIM/pkg/wkhttp" + "github.com/gin-gonic/gin" + "go.uber.org/zap" ) -type api struct { - s *Server - wklog.Log +func (s *Server) SetRoute(r *wkhttp.WKHttp) { + + r.Any("/plugins/:plugin/*path", s.handlePluginRoute) // 处理插件的路由,将http请求转发给插件 + + r.GET("/plugins", s.handleGetPlugins) // 获取插件列表 } -func newApi(s *Server) *api { - return &api{ - s: s, - Log: wklog.NewWKLog("plugin.api"), +// 获取插件列表 + +func (s *Server) handleGetPlugins(c *wkhttp.Context) { + plugins := s.pluginManager.all() + + resps := make([]*pluginResp, 0, len(plugins)) + for _, p := range plugins { + resps = append(resps, &pluginResp{ + PluginInfo: p.info, + }) + } + c.JSON(http.StatusOK, resps) +} + +// 处理插件的路由 +func (s *Server) handlePluginRoute(c *wkhttp.Context) { + pluginNo := c.Param("plugin") + plugin := s.pluginManager.get(pluginNo) + if plugin == nil { + c.JSON(http.StatusBadRequest, gin.H{ + "msg": "plugin not found", + "status": http.StatusNotFound, + }) + return + } + + if plugin.Status() != types.PluginStatusNormal { + msg := fmt.Sprintf("plugin status not normal: %s", plugin.Status()) + switch plugin.Status() { + case types.PluginStatusOffline: + msg = "plugin offline" + } + c.JSON(http.StatusBadRequest, gin.H{ + "msg": msg, + "status": http.StatusServiceUnavailable, + }) + return + } + + pluginPath := c.Param("path") + + headerMap := make(map[string]string) + for k, v := range c.Request.Header { + if len(v) == 0 { + continue + } + headerMap[k] = v[0] + } + + queryMap := make(map[string]string) + values := c.Request.URL.Query() + for k, v := range values { + if len(v) == 0 { + continue + } + queryMap[k] = v[0] + } + + bodyRead := c.Request.Body + + var ( + body []byte + err error + ) + if bodyRead != nil { + body, err = io.ReadAll(bodyRead) + if err != nil { + c.Status(http.StatusInternalServerError) + c.String(http.StatusInternalServerError, err.Error()) + return + } + } + + request := &pluginproto.HttpRequest{ + Method: c.Request.Method, + Path: pluginPath, + Headers: headerMap, + Query: queryMap, + Body: body, + } + + // 请求插件的路由 + timeoutCtx, cancel := context.WithTimeout(c.Request.Context(), time.Second*5) + resp, err := plugin.Route(timeoutCtx, request) + cancel() + if err != nil { + c.Status(http.StatusInternalServerError) + c.String(http.StatusInternalServerError, err.Error()) + return + } + + // 处理插件的响应 + c.Status(int(resp.Status)) + for k, v := range resp.Headers { + c.Writer.Header().Set(k, v) + } + _, err = c.Writer.Write(resp.Body) + if err != nil { + s.Error("write response error", zap.Error(err), zap.String("plugin", pluginNo), zap.String("path", pluginPath)) } } -func (a *api) routes() { - // ------------------- 插件 ------------------- - a.s.rpcServer.Route("/plugin/start", a.pluginStart) // 插件开始 - a.s.rpcServer.Route("/plugin/stop", a.pluginStop) // 插件停止 - a.s.rpcServer.Route("/plugin/httpForward", a.pluginHttpForward) // 插件HTTP转发 - - // ------------------- 消息 ------------------- - a.s.rpcServer.Route("/channel/messages", a.channelMessages) // 获取频道消息 - - // ------------------- 分布式 ------------------- - a.s.rpcServer.Route("/cluster/config", a.clusterConfig) // 获取分布式配置 - a.s.rpcServer.Route("/cluster/channels/belongNode", a.clusterChannelBelongNode) // 获取频道所属节点 - - // ------------------- 最近会话 ------------------- - a.s.rpcServer.Route("/conversation/channels", a.conversationChannels) // 获取最近会话的频道集合 +type pluginResp struct { + *pluginproto.PluginInfo } diff --git a/internal/plugin/rpc.go b/internal/plugin/rpc.go new file mode 100644 index 0000000..59b30de --- /dev/null +++ b/internal/plugin/rpc.go @@ -0,0 +1,34 @@ +package plugin + +import ( + "github.com/WuKongIM/WuKongIM/pkg/wklog" +) + +type rpc struct { + s *Server + wklog.Log +} + +func newRpc(s *Server) *rpc { + return &rpc{ + s: s, + Log: wklog.NewWKLog("plugin.rpc"), + } +} + +func (a *rpc) routes() { + // ------------------- 插件 ------------------- + a.s.rpcServer.Route("/plugin/start", a.pluginStart) // 插件开始 + a.s.rpcServer.Route("/plugin/stop", a.pluginStop) // 插件停止 + a.s.rpcServer.Route("/plugin/httpForward", a.pluginHttpForward) // 插件HTTP转发 + + // ------------------- 消息 ------------------- + a.s.rpcServer.Route("/channel/messages", a.channelMessages) // 获取频道消息 + + // ------------------- 分布式 ------------------- + a.s.rpcServer.Route("/cluster/config", a.clusterConfig) // 获取分布式配置 + a.s.rpcServer.Route("/cluster/channels/belongNode", a.clusterChannelBelongNode) // 获取频道所属节点 + + // ------------------- 最近会话 ------------------- + a.s.rpcServer.Route("/conversation/channels", a.conversationChannels) // 获取最近会话的频道集合 +} diff --git a/internal/plugin/api_cluster.go b/internal/plugin/rpc_cluster.go similarity index 95% rename from internal/plugin/api_cluster.go rename to internal/plugin/rpc_cluster.go index d1f0cbd..1627c26 100644 --- a/internal/plugin/api_cluster.go +++ b/internal/plugin/rpc_cluster.go @@ -9,7 +9,7 @@ import ( "go.uber.org/zap" ) -func (a *api) clusterConfig(c *wkrpc.Context) { +func (a *rpc) clusterConfig(c *wkrpc.Context) { nodes := service.Cluster.Nodes() slots := service.Cluster.Slots() @@ -49,7 +49,7 @@ func (a *api) clusterConfig(c *wkrpc.Context) { c.Write(data) } -func (a *api) clusterChannelBelongNode(c *wkrpc.Context) { +func (a *rpc) clusterChannelBelongNode(c *wkrpc.Context) { req := &pluginproto.ClusterChannelBelongNodeReq{} err := req.Unmarshal(c.Body()) diff --git a/internal/plugin/api_conversation.go b/internal/plugin/rpc_conversation.go similarity index 95% rename from internal/plugin/api_conversation.go rename to internal/plugin/rpc_conversation.go index 273fc78..960fa2d 100644 --- a/internal/plugin/api_conversation.go +++ b/internal/plugin/rpc_conversation.go @@ -13,7 +13,7 @@ import ( "go.uber.org/zap" ) -func (a *api) conversationChannels(c *wkrpc.Context) { +func (a *rpc) conversationChannels(c *wkrpc.Context) { req := &pluginproto.ConversationChannelReq{} err := req.Unmarshal(c.Body()) if err != nil { @@ -85,7 +85,7 @@ func (a *api) conversationChannels(c *wkrpc.Context) { } // ForwardWithBody 转发请求 -func (a *api) post(url string, body []byte) (*rest.Response, error) { +func (a *rpc) post(url string, body []byte) (*rest.Response, error) { req := rest.Request{ Method: rest.Post, diff --git a/internal/plugin/api_message.go b/internal/plugin/rpc_message.go similarity index 97% rename from internal/plugin/api_message.go rename to internal/plugin/rpc_message.go index 848d287..d1efb03 100644 --- a/internal/plugin/api_message.go +++ b/internal/plugin/rpc_message.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" ) -func (a *api) channelMessages(c *wkrpc.Context) { +func (a *rpc) channelMessages(c *wkrpc.Context) { channelMessageBatchReq := &pluginproto.ChannelMessageBatchReq{} err := channelMessageBatchReq.Unmarshal(c.Body()) if err != nil { diff --git a/internal/plugin/api_plugin.go b/internal/plugin/rpc_plugin.go similarity index 95% rename from internal/plugin/api_plugin.go rename to internal/plugin/rpc_plugin.go index 7a69cde..18c35b8 100644 --- a/internal/plugin/api_plugin.go +++ b/internal/plugin/rpc_plugin.go @@ -23,7 +23,7 @@ const ( ) // 插件启动 -func (a *api) pluginStart(c *wkrpc.Context) { +func (a *rpc) pluginStart(c *wkrpc.Context) { pluginInfo := &pluginproto.PluginInfo{} err := pluginInfo.Unmarshal(c.Body()) if err != nil { @@ -79,7 +79,7 @@ func (a *api) pluginStart(c *wkrpc.Context) { } // 插件停止 -func (a *api) pluginStop(c *wkrpc.Context) { +func (a *rpc) pluginStop(c *wkrpc.Context) { pluginInfo := &pluginproto.PluginInfo{} err := pluginInfo.Unmarshal(c.Body()) if err != nil { @@ -91,7 +91,7 @@ func (a *api) pluginStop(c *wkrpc.Context) { c.WriteOk() } -func (a *api) pluginHttpForward(c *wkrpc.Context) { +func (a *rpc) pluginHttpForward(c *wkrpc.Context) { forwardReq := &pluginproto.ForwardHttpReq{} err := forwardReq.Unmarshal(c.Body()) if err != nil { @@ -157,7 +157,7 @@ func (a *api) pluginHttpForward(c *wkrpc.Context) { c.Write(data) } -func (a *api) ForwardWithBody(url string, req *pluginproto.HttpRequest) (*pluginproto.HttpResponse, error) { +func (a *rpc) ForwardWithBody(url string, req *pluginproto.HttpRequest) (*pluginproto.HttpResponse, error) { r := rest.Request{ Method: rest.Method(strings.ToUpper(req.Method)), BaseURL: url, diff --git a/internal/plugin/server.go b/internal/plugin/server.go index d029e55..89654cd 100644 --- a/internal/plugin/server.go +++ b/internal/plugin/server.go @@ -1,30 +1,24 @@ package plugin import ( - "context" "errors" "fmt" - "io" "log" - "net/http" "os" "os/exec" "path" - "time" "github.com/WuKongIM/WuKongIM/internal/types" - "github.com/WuKongIM/WuKongIM/internal/types/pluginproto" - "github.com/WuKongIM/WuKongIM/pkg/wkhttp" + "github.com/WuKongIM/WuKongIM/pkg/wklog" "github.com/WuKongIM/wkrpc" - "github.com/gin-gonic/gin" "go.uber.org/zap" ) type Server struct { rpcServer *wkrpc.Server pluginManager *pluginManager - api *api + rpc *rpc wklog.Log opts *Options sandboxDir string // 沙箱目录 @@ -67,7 +61,7 @@ func NewServer(opts *Options) *Server { Log: wklog.NewWKLog("plugin.server"), sandboxDir: sandboxDir, } - s.api = newApi(s) + s.rpc = newRpc(s) return s } @@ -75,7 +69,7 @@ func (s *Server) Start() error { if err := s.rpcServer.Start(); err != nil { return err } - s.api.routes() + s.rpc.routes() if err := s.startPlugins(); err != nil { s.Error("start plugins error", zap.Error(err)) @@ -91,11 +85,6 @@ func (s *Server) Stop() { s.stopPlugins() } -func (s *Server) SetRoute(r *wkhttp.WKHttp) { - - r.Any("/plugins/:plugin/*path", s.handlePluginRoute) -} - // Plugins 获取插件列表 func (s *Server) Plugins(methods ...types.PluginMethod) []types.Plugin { if len(methods) == 0 { @@ -153,93 +142,6 @@ func getUnixSocket() (string, error) { return fmt.Sprintf("unix://%s", socketPath), nil } -// 处理插件的路由 -func (s *Server) handlePluginRoute(c *wkhttp.Context) { - pluginNo := c.Param("plugin") - plugin := s.pluginManager.get(pluginNo) - if plugin == nil { - c.JSON(http.StatusBadRequest, gin.H{ - "msg": "plugin not found", - "status": http.StatusNotFound, - }) - return - } - - if plugin.Status() != types.PluginStatusNormal { - msg := fmt.Sprintf("plugin status not normal: %s", plugin.Status()) - switch plugin.Status() { - case types.PluginStatusOffline: - msg = "plugin offline" - } - c.JSON(http.StatusBadRequest, gin.H{ - "msg": msg, - "status": http.StatusServiceUnavailable, - }) - } - - pluginPath := c.Param("path") - - headerMap := make(map[string]string) - for k, v := range c.Request.Header { - if len(v) == 0 { - continue - } - headerMap[k] = v[0] - } - - queryMap := make(map[string]string) - values := c.Request.URL.Query() - for k, v := range values { - if len(v) == 0 { - continue - } - queryMap[k] = v[0] - } - - bodyRead := c.Request.Body - - var ( - body []byte - err error - ) - if bodyRead != nil { - body, err = io.ReadAll(bodyRead) - if err != nil { - c.Status(http.StatusInternalServerError) - c.String(http.StatusInternalServerError, err.Error()) - return - } - } - - request := &pluginproto.HttpRequest{ - Method: c.Request.Method, - Path: pluginPath, - Headers: headerMap, - Query: queryMap, - Body: body, - } - - // 请求插件的路由 - timeoutCtx, cancel := context.WithTimeout(c.Request.Context(), time.Second*5) - resp, err := plugin.Route(timeoutCtx, request) - cancel() - if err != nil { - c.Status(http.StatusInternalServerError) - c.String(http.StatusInternalServerError, err.Error()) - return - } - - // 处理插件的响应 - c.Status(int(resp.Status)) - for k, v := range resp.Headers { - c.Writer.Header().Set(k, v) - } - _, err = c.Writer.Write(resp.Body) - if err != nil { - s.Error("write response error", zap.Error(err), zap.String("plugin", pluginNo), zap.String("path", pluginPath)) - } -} - // 启动插件执行文件 func (s *Server) startPlugins() error { pluginDir := s.opts.Dir diff --git a/internal/types/pluginproto/plugin.pb.go b/internal/types/pluginproto/plugin.pb.go index 96ac755..bd0689f 100644 --- a/internal/types/pluginproto/plugin.pb.go +++ b/internal/types/pluginproto/plugin.pb.go @@ -24,7 +24,7 @@ const ( type PluginInfo struct { state protoimpl.MessageState `protogen:"open.v1"` // 插件唯一编号 - No string `protobuf:"bytes,1,opt,name=No,proto3" json:"No,omitempty"` + No string `protobuf:"bytes,1,opt,name=no,proto3" json:"no,omitempty"` // 插件名称 Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // 插件拥有的方法 @@ -1396,8 +1396,8 @@ var file_internal_types_pluginproto_plugin_proto_rawDesc = []byte{ 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x84, 0x02, 0x0a, 0x0a, 0x50, 0x6c, 0x75, 0x67, 0x69, - 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, 0x4e, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x4e, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, 0x6e, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x6e, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, diff --git a/internal/types/pluginproto/plugin.proto b/internal/types/pluginproto/plugin.proto index f5d7d5c..3962ebe 100644 --- a/internal/types/pluginproto/plugin.proto +++ b/internal/types/pluginproto/plugin.proto @@ -8,7 +8,7 @@ option go_package = "./;pluginproto"; // 插件信息 message PluginInfo { // 插件唯一编号 - string No = 1; + string no = 1; // 插件名称 string name = 2; // 插件拥有的方法