diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0abd7ea --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +main.exe \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 9d3be8d..e5fe6b2 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,7 +5,14 @@ - + + + + + + + + diff --git a/hooks/debug/debug.go b/hooks/debug/debug.go index 5fa852b..fc422db 100644 --- a/hooks/debug/debug.go +++ b/hooks/debug/debug.go @@ -82,7 +82,7 @@ func (h *Hook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, return pk, nil } - h.Log.Debug(fmt.Sprintf("%s << %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID), "m", h.packetMeta(pk)) + h.Log.Debug(fmt.Sprintf("%s << %s", strings.ToUpper(packets.ZHPacketNames[packets.PacketNames[pk.FixedHeader.Type]]), cl.ID), "m", h.packetMeta(pk)) return pk, nil } @@ -92,7 +92,7 @@ func (h *Hook) OnPacketSent(cl *mqtt.Client, pk packets.Packet, b []byte) { return } - h.Log.Debug(fmt.Sprintf("%s >> %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID), "m", h.packetMeta(pk)) + h.Log.Debug(fmt.Sprintf("%s >> %s", strings.ToUpper(packets.ZHPacketNames[packets.PacketNames[pk.FixedHeader.Type]]), cl.ID), "m", h.packetMeta(pk)) } // OnRetainMessage is called when a published message is retained (or retain deleted/modified). @@ -122,12 +122,12 @@ func (h *Hook) OnLWTSent(cl *mqtt.Client, pk packets.Packet) { // OnRetainedExpired is called when the server clears expired retained messages. func (h *Hook) OnRetainedExpired(filter string) { - h.Log.Debug("retained message expired", "method", "OnRetainedExpired", "topic", filter) + h.Log.Debug("保留消息过期", "method", "OnRetainedExpired", "topic", filter) } // OnClientExpired is called when the server clears an expired client. func (h *Hook) OnClientExpired(cl *mqtt.Client) { - h.Log.Debug("client session expired", "method", "OnClientExpired", "client", cl.ID) + h.Log.Debug("客户端 session 过期", "method", "OnClientExpired", "client", cl.ID) } // StoredClients is called when the server restores clients from a store. diff --git a/mqtt/server.go b/mqtt/server.go index af7992a..419d36c 100644 --- a/mqtt/server.go +++ b/mqtt/server.go @@ -13,9 +13,9 @@ import ( "os" "runtime" "sort" - "strconv" "strings" "sync/atomic" + "testmqtt/utils" "time" "testmqtt/hooks/storage" @@ -28,41 +28,76 @@ import ( const ( // Version 服务器版本 - Version = "2.6.5" // the current server version. + // the current server version. + Version = "2.6.5" // 默认系统主题发布时间间隔 - defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes - LocalListener = "local" - InlineClientId = "inline" + // the interval between $SYS topic publishes + defaultSysTopicInterval int64 = 1 + + LocalListener = "local" + InlineClientId = "inline" ) var ( + // DefaultServerCapabilities 弃用:使用 NewDefaultServerCapabilities 可避免数据争用问题。 // Deprecated: Use NewDefaultServerCapabilities to avoid data race issue. DefaultServerCapabilities = NewDefaultServerCapabilities() - ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists - ErrConnectionClosed = errors.New("connection not open") // connection is closed - ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default - ErrOptionsUnreadable = errors.New("unable to read options from bytes") + // ErrListenerIDExists 具有相同 ID 的侦听器已存在 + // ErrListenerIDExists a listener with the same id already exists + ErrListenerIDExists = errors.New("监听器 id 已经存在") + + // ErrConnectionClosed 连接已关闭 + // ErrConnectionClosed connection is closed + ErrConnectionClosed = errors.New("连接已关闭") + + // ErrInlineClientNotEnabled 请设置 Options.InlineClient=true 以使用此功能 + // inline client is not enabled by default + ErrInlineClientNotEnabled = errors.New("请设置 options.inline_client=true") + + // ErrOptionsUnreadable 无法从字节中读取选项 + ErrOptionsUnreadable = errors.New("无法从字节中读取选项") ) +// Capabilities 表示服务器提供的功能和特性。 // Capabilities indicates the capabilities and features provided by the server. type Capabilities struct { - MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"` // maximum number of connected clients - MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over - MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` // maximum number of pending message writes for a client - MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions - MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"` // maximum packet size, no limit if 0 - maximumPacketID uint32 // unexported, used for testing only - ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"` // maximum number of concurrent qos messages per client - MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"` // maximum number of qos > 0 messages can be stored, 0(=8192)-65535 - TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"` // maximum topic alias value - SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"` // support of shared subscriptions - MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` // minimum supported mqtt version - Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"` // version compatibilities the server provides - MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"` // maximum qos value available to clients - RetainAvailable byte `yaml:"retain_available" json:"retain_available"` // support of retain messages - WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"` // support of wildcard subscriptions - SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` // support of subscription identifiers + // 连接的客户端的最大数量 + // MaximumClients maximum number of connected clients + MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"` + + // 消息过期的最大时间间隔,单位为秒。超过这个时间的消息将被丢弃。 + // maximum message expiry if message expiry is 0 or over + MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` + + // maximum number of pending message writes for a client + MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` + // maximum number of seconds to keep disconnected sessions + MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` + // maximum packet size, no limit if 0 + MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"` + // unexported, used for testing only + maximumPacketID uint32 + // maximum number of concurrent qos messages per client + ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"` + // maximum number of qos > 0 messages can be stored, 0(=8192)-65535 + MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"` + // maximum topic alias value + TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"` + // support of shared subscriptions + SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"` + // minimum supported mqtt version + MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` + // version compatibilities the server provides + Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"` + // maximum qos value available to clients + MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"` + // support of retain messages + RetainAvailable byte `yaml:"retain_available" json:"retain_available"` + // support of wildcard subscriptions + WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"` + // support of subscription identifiers + SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` } // NewDefaultServerCapabilities defines the default features and capabilities provided by the server. @@ -86,15 +121,22 @@ func NewDefaultServerCapabilities() *Capabilities { } } +// Compatibilities 兼容模式配置 为了保持与旧版本或特殊客户端的兼容性 // Compatibilities provides flags for using compatibility modes. type Compatibilities struct { - ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized - PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation) - AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"` // always return response info (useful for testing) - RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"` // restore system info from store as if server never stopped - NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` // don't allow inherited user properties on ack (paho - spec violation) + // return unspecified errors instead of not authorized + ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` + // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation) + PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"` + // always return response info (useful for testing) + AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"` + // restore system info from store as if server never stopped + RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"` + // don't allow inherited user properties on ack (paho - spec violation) + NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` } +// Options 服务器可配置项 // Options contains configurable options for the server. type Options struct { // Listeners specifies any listeners which should be dynamically added on serve. Used when setting listeners by config. @@ -166,7 +208,8 @@ type ops struct { log *slog.Logger // a structured logger for the client } -// New returns a new instance of mochi mqtt broker. Optional parameters +// New 创建 MQTT Broker 新实例 。可以指定可选参数来覆盖某些默认设置(请参阅选项)。 +// New returns a new instance of mqtt broker. Optional parameters // can be specified to override some default settings (see Options). func New(opts *Options) *Server { if opts == nil { @@ -207,6 +250,7 @@ func New(opts *Options) *Server { return s } +// 如果未提供默认配置,则用合理的默认值启动 // ensureDefaults ensures that the server starts with sane default values, if none are provided. func (o *Options) ensureDefaults() { if o.Capabilities == nil { @@ -482,7 +526,7 @@ func (s *Server) attachClient(cl *Client, listener string) error { } else { cl.Properties.Will = Will{} // [MQTT-3.14.4-3] [MQTT-3.1.2-10] } - s.Log.Debug("client disconnected", "error", err, "client", cl.ID, "remote", cl.Net.Remote, "listener", listener) + s.Log.Debug("客户端断开连接", "error", err, "client", cl.ID, "remote", cl.Net.Remote, "listener", listener) expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean) s.hooks.OnDisconnect(cl, err, expire) @@ -1454,27 +1498,39 @@ func (s *Server) publishSysTopics() { atomic.StoreInt64(&s.Info.ClientsDisconnected, atomic.LoadInt64(&s.Info.ClientsTotal)-atomic.LoadInt64(&s.Info.ClientsConnected)) info := s.Info.Clone() + topics := map[string]string{ - SysPrefix + "/broker/version": s.Info.Version, - SysPrefix + "/broker/time": Int64toa(info.Time), - SysPrefix + "/broker/uptime": Int64toa(info.Uptime), - SysPrefix + "/broker/started": Int64toa(info.Started), - SysPrefix + "/broker/load/bytes/received": Int64toa(info.BytesReceived), - SysPrefix + "/broker/load/bytes/sent": Int64toa(info.BytesSent), - SysPrefix + "/broker/clients/connected": Int64toa(info.ClientsConnected), - SysPrefix + "/broker/clients/disconnected": Int64toa(info.ClientsDisconnected), - SysPrefix + "/broker/clients/maximum": Int64toa(info.ClientsMaximum), - SysPrefix + "/broker/clients/total": Int64toa(info.ClientsTotal), - SysPrefix + "/broker/packets/received": Int64toa(info.PacketsReceived), - SysPrefix + "/broker/packets/sent": Int64toa(info.PacketsSent), - SysPrefix + "/broker/messages/received": Int64toa(info.MessagesReceived), - SysPrefix + "/broker/messages/sent": Int64toa(info.MessagesSent), - SysPrefix + "/broker/messages/dropped": Int64toa(info.MessagesDropped), - SysPrefix + "/broker/messages/inflight": Int64toa(info.Inflight), - SysPrefix + "/broker/retained": Int64toa(info.Retained), - SysPrefix + "/broker/subscriptions": Int64toa(info.Subscriptions), - SysPrefix + "/broker/system/memory": Int64toa(info.MemoryAlloc), - SysPrefix + "/broker/system/threads": Int64toa(info.Threads), + // 代理的版本-静态 + SysPrefix + "/broker/version": s.Info.Version, + // 服务器上的当前时间 + SysPrefix + "/broker/time": utils.Int64toa(info.Time), + // 代理在线的时间(以秒为单位) + SysPrefix + "/broker/uptime": utils.Int64toa(info.Uptime), + SysPrefix + "/broker/started": utils.Int64toa(info.Started), + // 自代理启动接收的总字节数 + SysPrefix + "/broker/load/bytes/received": utils.Int64toa(info.BytesReceived), + // 自代理启动以来发送的总字节数 + SysPrefix + "/broker/load/bytes/sent": utils.Int64toa(info.BytesSent), + // 当前连接的客户端数量 + SysPrefix + "/broker/clients/connected": utils.Int64toa(info.ClientsConnected), + // 在代理上注册但当前已断开连接的持久客户端总数(已取消清理会话) + SysPrefix + "/broker/clients/disconnected": utils.Int64toa(info.ClientsDisconnected), + // 已连接到代理的最大活动客户端数量。这仅在更新 $SYS 主题树时计算,因此可能不会计算短暂的客户端连接 + SysPrefix + "/broker/clients/maximum": utils.Int64toa(info.ClientsMaximum), + // 当前在代理上连接和注册的具有持久会话的连接和断开客户端的总数。 + SysPrefix + "/broker/clients/total": utils.Int64toa(info.ClientsTotal), + SysPrefix + "/broker/packets/received": utils.Int64toa(info.PacketsReceived), + SysPrefix + "/broker/packets/sent": utils.Int64toa(info.PacketsSent), + // 自代理启动以来收到的任何类型的消息总数。 + SysPrefix + "/broker/messages/received": utils.Int64toa(info.MessagesReceived), + //自代理启动以来发送的任何类型的消息总数。 + SysPrefix + "/broker/messages/sent": utils.Int64toa(info.MessagesSent), + SysPrefix + "/broker/messages/dropped": utils.Int64toa(info.MessagesDropped), + SysPrefix + "/broker/messages/inflight": utils.Int64toa(info.Inflight), + SysPrefix + "/broker/retained": utils.Int64toa(info.Retained), + SysPrefix + "/broker/subscriptions": utils.Int64toa(info.Subscriptions), + SysPrefix + "/broker/system/memory": utils.Int64toa(info.MemoryAlloc), + SysPrefix + "/broker/system/threads": utils.Int64toa(info.Threads), } for topic, payload := range topics { @@ -1487,15 +1543,16 @@ func (s *Server) publishSysTopics() { s.hooks.OnSysInfoTick(info) } +// Close 尝试正常关闭服务器、所有侦听器、客户端和存储。 // Close attempts to gracefully shut down the server, all listeners, clients, and stores. func (s *Server) Close() error { close(s.done) - s.Log.Info("gracefully stopping server") + s.Log.Info("正在停止MQTT服务器") s.Listeners.CloseAll(s.closeListenerClients) s.hooks.OnStopped() s.hooks.Stop() - s.Log.Info("mochi mqtt server stopped") + s.Log.Info("MQTT服务器已停止") return nil } @@ -1596,6 +1653,7 @@ func (s *Server) readStore() error { return nil } +// 从数据存储中恢复服务器信息 // loadServerInfo restores server info from the datastore. func (s *Server) loadServerInfo(v system.Info) { if s.Options.Capabilities.Compatibilities.RestoreSysInfoOnRestart { @@ -1616,6 +1674,7 @@ func (s *Server) loadServerInfo(v system.Info) { atomic.StoreInt64(&s.Info.Subscriptions, v.Subscriptions) } +// 从数据存储中还原订阅 // loadSubscriptions restores subscriptions from the datastore. func (s *Server) loadSubscriptions(v []storage.Subscription) { for _, sub := range v { @@ -1635,6 +1694,7 @@ func (s *Server) loadSubscriptions(v []storage.Subscription) { } } +// 从数据存储中恢复客户端 // loadClients restores clients from the datastore. func (s *Server) loadClients(v []storage.Client) { for _, c := range v { @@ -1752,8 +1812,3 @@ func (s *Server) sendDelayedLWT(dt int64) { } } } - -// Int64toa converts an int64 to a string. -func Int64toa(v int64) string { - return strconv.FormatInt(v, 10) -} diff --git a/packets/packets.go b/packets/packets.go index d52d5af..224a091 100644 --- a/packets/packets.go +++ b/packets/packets.go @@ -39,9 +39,11 @@ const ( ) var ( + // ErrNoValidPacketAvailable 表示MQTT规范中不存在提供的数据包类型字节 // ErrNoValidPacketAvailable indicates the packet type byte provided does not exist in the mqtt specification. - ErrNoValidPacketAvailable = errors.New("no valid packet available") + ErrNoValidPacketAvailable = errors.New("没有可用的有效数据包") + // PacketNames 数据包字节到人类可读名称的映射,以便于调试 // PacketNames is a map of packet bytes to human-readable names, for easier debugging. PacketNames = map[byte]string{ 0: "Reserved", @@ -61,6 +63,25 @@ var ( 14: "Disconnect", 15: "Auth", } + + ZHPacketNames = map[string]string{ + "Reserved": "Reserved保留字段", + "Connect": "请求连接", + "Connack": "确认连接请求", + "Publish": "发布", + "Puback": "发布确认", + "Pubrec": "发布接收", + "Pubrel": "发布释放", + "Pubcomp": "发布完成", + "Subscribe": "订阅", + "Unsubscribe": "取消订阅", + "Suback": "订阅确认", + "Unsuback": "取消订阅确认", + "Pingreq": "心跳请求", + "Pingresp": "心跳响应", + "Disconnect": "断开连接", + "Auth": "认证", + } ) // Packets is a concurrency safe map of packets. diff --git a/system/system.go b/system/system.go index 1ceda94..a7bfc01 100644 --- a/system/system.go +++ b/system/system.go @@ -11,9 +11,9 @@ import "sync/atomic" // commonly found in $SYS topics (and others). // based on https://github.com/mqtt/mqtt.org/wiki/SYS-Topics type Info struct { - Version string `json:"version"` // the current version of the server - Started int64 `json:"started"` // the time the server started in unix seconds - Time int64 `json:"time"` // current time on the server + Version string `json:"version"` // 服务器的当前版本 + Started int64 `json:"started"` // 服务器启动的时间-时间戳 + Time int64 `json:"time"` // 服务器上的当前时间 Uptime int64 `json:"uptime"` // the number of seconds the server has been online BytesReceived int64 `json:"bytes_received"` // total number of bytes received since the broker started BytesSent int64 `json:"bytes_sent"` // total number of bytes sent since the broker started @@ -30,7 +30,7 @@ type Info struct { Subscriptions int64 `json:"subscriptions"` // total number of subscriptions active on the broker PacketsReceived int64 `json:"packets_received"` // the total number of publish messages received PacketsSent int64 `json:"packets_sent"` // total number of messages of any type sent since the broker started - MemoryAlloc int64 `json:"memory_alloc"` // memory currently allocated + MemoryAlloc int64 `json:"memory_alloc"` // 当前分配的内存 Threads int64 `json:"threads"` // number of active goroutines, named as threads for platform ambiguity } diff --git a/system/system_test.go b/system/system_test.go deleted file mode 100644 index b76df21..0000000 --- a/system/system_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package system - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestClone(t *testing.T) { - o := &Info{ - Version: "version", - Started: 1, - Time: 2, - Uptime: 3, - BytesReceived: 4, - BytesSent: 5, - ClientsConnected: 6, - ClientsMaximum: 7, - ClientsTotal: 8, - ClientsDisconnected: 9, - MessagesReceived: 10, - MessagesSent: 11, - MessagesDropped: 20, - Retained: 12, - Inflight: 13, - InflightDropped: 14, - Subscriptions: 15, - PacketsReceived: 16, - PacketsSent: 17, - MemoryAlloc: 18, - Threads: 19, - } - - n := o.Clone() - - require.Equal(t, o, n) -} diff --git a/utils/func.go b/utils/func.go new file mode 100644 index 0000000..7606b9c --- /dev/null +++ b/utils/func.go @@ -0,0 +1,8 @@ +package utils + +import "strconv" + +// Int64toa Int64转换为String。 +func Int64toa(v int64) string { + return strconv.FormatInt(v, 10) +}