代码整理

This commit is contained in:
2024-08-21 17:14:00 +08:00
parent e9bc1f37cc
commit 3a378cd7d2
8 changed files with 162 additions and 107 deletions

View File

@@ -13,9 +13,9 @@ import (
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync/atomic"
"testmqtt/utils"
"time"
"testmqtt/hooks/storage"
@@ -28,41 +28,76 @@ import (
const (
// Version 服务器版本
Version = "2.6.5" // the current server version.
// the current server version.
Version = "2.6.5"
// 默认系统主题发布时间间隔
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"
// the interval between $SYS topic publishes
defaultSysTopicInterval int64 = 1
LocalListener = "local"
InlineClientId = "inline"
)
var (
// DefaultServerCapabilities 弃用:使用 NewDefaultServerCapabilities 可避免数据争用问题。
// Deprecated: Use NewDefaultServerCapabilities to avoid data race issue.
DefaultServerCapabilities = NewDefaultServerCapabilities()
ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists
ErrConnectionClosed = errors.New("connection not open") // connection is closed
ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default
ErrOptionsUnreadable = errors.New("unable to read options from bytes")
// ErrListenerIDExists 具有相同 ID 的侦听器已存在
// ErrListenerIDExists a listener with the same id already exists
ErrListenerIDExists = errors.New("监听器 id 已经存在")
// ErrConnectionClosed 连接已关闭
// ErrConnectionClosed connection is closed
ErrConnectionClosed = errors.New("连接已关闭")
// ErrInlineClientNotEnabled 请设置 Options.InlineClient=true 以使用此功能
// inline client is not enabled by default
ErrInlineClientNotEnabled = errors.New("请设置 options.inline_client=true")
// ErrOptionsUnreadable 无法从字节中读取选项
ErrOptionsUnreadable = errors.New("无法从字节中读取选项")
)
// Capabilities 表示服务器提供的功能和特性。
// Capabilities indicates the capabilities and features provided by the server.
type Capabilities struct {
MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"` // maximum number of connected clients
MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over
MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` // maximum number of pending message writes for a client
MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions
MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"` // maximum packet size, no limit if 0
maximumPacketID uint32 // unexported, used for testing only
ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"` // maximum number of concurrent qos messages per client
MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"` // maximum number of qos > 0 messages can be stored, 0(=8192)-65535
TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"` // maximum topic alias value
SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"` // support of shared subscriptions
MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` // minimum supported mqtt version
Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"` // version compatibilities the server provides
MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"` // maximum qos value available to clients
RetainAvailable byte `yaml:"retain_available" json:"retain_available"` // support of retain messages
WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"` // support of wildcard subscriptions
SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` // support of subscription identifiers
// 连接的客户端的最大数量
// MaximumClients maximum number of connected clients
MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"`
// 消息过期的最大时间间隔,单位为秒。超过这个时间的消息将被丢弃。
// maximum message expiry if message expiry is 0 or over
MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"`
// maximum number of pending message writes for a client
MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"`
// maximum number of seconds to keep disconnected sessions
MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"`
// maximum packet size, no limit if 0
MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"`
// unexported, used for testing only
maximumPacketID uint32
// maximum number of concurrent qos messages per client
ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"`
// maximum number of qos > 0 messages can be stored, 0(=8192)-65535
MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"`
// maximum topic alias value
TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"`
// support of shared subscriptions
SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"`
// minimum supported mqtt version
MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"`
// version compatibilities the server provides
Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"`
// maximum qos value available to clients
MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"`
// support of retain messages
RetainAvailable byte `yaml:"retain_available" json:"retain_available"`
// support of wildcard subscriptions
WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"`
// support of subscription identifiers
SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"`
}
// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
@@ -86,15 +121,22 @@ func NewDefaultServerCapabilities() *Capabilities {
}
}
// Compatibilities 兼容模式配置 为了保持与旧版本或特殊客户端的兼容性
// Compatibilities provides flags for using compatibility modes.
type Compatibilities struct {
ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized
PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"` // always return response info (useful for testing)
RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"` // restore system info from store as if server never stopped
NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` // don't allow inherited user properties on ack (paho - spec violation)
// return unspecified errors instead of not authorized
ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"`
// don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"`
// always return response info (useful for testing)
AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"`
// restore system info from store as if server never stopped
RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"`
// don't allow inherited user properties on ack (paho - spec violation)
NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"`
}
// Options 服务器可配置项
// Options contains configurable options for the server.
type Options struct {
// Listeners specifies any listeners which should be dynamically added on serve. Used when setting listeners by config.
@@ -166,7 +208,8 @@ type ops struct {
log *slog.Logger // a structured logger for the client
}
// New returns a new instance of mochi mqtt broker. Optional parameters
// New 创建 MQTT Broker 新实例 。可以指定可选参数来覆盖某些默认设置(请参阅选项)。
// New returns a new instance of mqtt broker. Optional parameters
// can be specified to override some default settings (see Options).
func New(opts *Options) *Server {
if opts == nil {
@@ -207,6 +250,7 @@ func New(opts *Options) *Server {
return s
}
// 如果未提供默认配置,则用合理的默认值启动
// ensureDefaults ensures that the server starts with sane default values, if none are provided.
func (o *Options) ensureDefaults() {
if o.Capabilities == nil {
@@ -482,7 +526,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
} else {
cl.Properties.Will = Will{} // [MQTT-3.14.4-3] [MQTT-3.1.2-10]
}
s.Log.Debug("client disconnected", "error", err, "client", cl.ID, "remote", cl.Net.Remote, "listener", listener)
s.Log.Debug("客户端断开连接", "error", err, "client", cl.ID, "remote", cl.Net.Remote, "listener", listener)
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, err, expire)
@@ -1454,27 +1498,39 @@ func (s *Server) publishSysTopics() {
atomic.StoreInt64(&s.Info.ClientsDisconnected, atomic.LoadInt64(&s.Info.ClientsTotal)-atomic.LoadInt64(&s.Info.ClientsConnected))
info := s.Info.Clone()
topics := map[string]string{
SysPrefix + "/broker/version": s.Info.Version,
SysPrefix + "/broker/time": Int64toa(info.Time),
SysPrefix + "/broker/uptime": Int64toa(info.Uptime),
SysPrefix + "/broker/started": Int64toa(info.Started),
SysPrefix + "/broker/load/bytes/received": Int64toa(info.BytesReceived),
SysPrefix + "/broker/load/bytes/sent": Int64toa(info.BytesSent),
SysPrefix + "/broker/clients/connected": Int64toa(info.ClientsConnected),
SysPrefix + "/broker/clients/disconnected": Int64toa(info.ClientsDisconnected),
SysPrefix + "/broker/clients/maximum": Int64toa(info.ClientsMaximum),
SysPrefix + "/broker/clients/total": Int64toa(info.ClientsTotal),
SysPrefix + "/broker/packets/received": Int64toa(info.PacketsReceived),
SysPrefix + "/broker/packets/sent": Int64toa(info.PacketsSent),
SysPrefix + "/broker/messages/received": Int64toa(info.MessagesReceived),
SysPrefix + "/broker/messages/sent": Int64toa(info.MessagesSent),
SysPrefix + "/broker/messages/dropped": Int64toa(info.MessagesDropped),
SysPrefix + "/broker/messages/inflight": Int64toa(info.Inflight),
SysPrefix + "/broker/retained": Int64toa(info.Retained),
SysPrefix + "/broker/subscriptions": Int64toa(info.Subscriptions),
SysPrefix + "/broker/system/memory": Int64toa(info.MemoryAlloc),
SysPrefix + "/broker/system/threads": Int64toa(info.Threads),
// 代理的版本-静态
SysPrefix + "/broker/version": s.Info.Version,
// 服务器上的当前时间
SysPrefix + "/broker/time": utils.Int64toa(info.Time),
// 代理在线的时间(以秒为单位)
SysPrefix + "/broker/uptime": utils.Int64toa(info.Uptime),
SysPrefix + "/broker/started": utils.Int64toa(info.Started),
// 自代理启动接收的总字节数
SysPrefix + "/broker/load/bytes/received": utils.Int64toa(info.BytesReceived),
// 自代理启动以来发送的总字节数
SysPrefix + "/broker/load/bytes/sent": utils.Int64toa(info.BytesSent),
// 当前连接的客户端数量
SysPrefix + "/broker/clients/connected": utils.Int64toa(info.ClientsConnected),
// 在代理上注册但当前已断开连接的持久客户端总数(已取消清理会话)
SysPrefix + "/broker/clients/disconnected": utils.Int64toa(info.ClientsDisconnected),
// 已连接到代理的最大活动客户端数量。这仅在更新 $SYS 主题树时计算,因此可能不会计算短暂的客户端连接
SysPrefix + "/broker/clients/maximum": utils.Int64toa(info.ClientsMaximum),
// 当前在代理上连接和注册的具有持久会话的连接和断开客户端的总数。
SysPrefix + "/broker/clients/total": utils.Int64toa(info.ClientsTotal),
SysPrefix + "/broker/packets/received": utils.Int64toa(info.PacketsReceived),
SysPrefix + "/broker/packets/sent": utils.Int64toa(info.PacketsSent),
// 自代理启动以来收到的任何类型的消息总数。
SysPrefix + "/broker/messages/received": utils.Int64toa(info.MessagesReceived),
//自代理启动以来发送的任何类型的消息总数。
SysPrefix + "/broker/messages/sent": utils.Int64toa(info.MessagesSent),
SysPrefix + "/broker/messages/dropped": utils.Int64toa(info.MessagesDropped),
SysPrefix + "/broker/messages/inflight": utils.Int64toa(info.Inflight),
SysPrefix + "/broker/retained": utils.Int64toa(info.Retained),
SysPrefix + "/broker/subscriptions": utils.Int64toa(info.Subscriptions),
SysPrefix + "/broker/system/memory": utils.Int64toa(info.MemoryAlloc),
SysPrefix + "/broker/system/threads": utils.Int64toa(info.Threads),
}
for topic, payload := range topics {
@@ -1487,15 +1543,16 @@ func (s *Server) publishSysTopics() {
s.hooks.OnSysInfoTick(info)
}
// Close 尝试正常关闭服务器、所有侦听器、客户端和存储。
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
func (s *Server) Close() error {
close(s.done)
s.Log.Info("gracefully stopping server")
s.Log.Info("正在停止MQTT服务器")
s.Listeners.CloseAll(s.closeListenerClients)
s.hooks.OnStopped()
s.hooks.Stop()
s.Log.Info("mochi mqtt server stopped")
s.Log.Info("MQTT服务器已停止")
return nil
}
@@ -1596,6 +1653,7 @@ func (s *Server) readStore() error {
return nil
}
// 从数据存储中恢复服务器信息
// loadServerInfo restores server info from the datastore.
func (s *Server) loadServerInfo(v system.Info) {
if s.Options.Capabilities.Compatibilities.RestoreSysInfoOnRestart {
@@ -1616,6 +1674,7 @@ func (s *Server) loadServerInfo(v system.Info) {
atomic.StoreInt64(&s.Info.Subscriptions, v.Subscriptions)
}
// 从数据存储中还原订阅
// loadSubscriptions restores subscriptions from the datastore.
func (s *Server) loadSubscriptions(v []storage.Subscription) {
for _, sub := range v {
@@ -1635,6 +1694,7 @@ func (s *Server) loadSubscriptions(v []storage.Subscription) {
}
}
// 从数据存储中恢复客户端
// loadClients restores clients from the datastore.
func (s *Server) loadClients(v []storage.Client) {
for _, c := range v {
@@ -1752,8 +1812,3 @@ func (s *Server) sendDelayedLWT(dt int64) {
}
}
}
// Int64toa converts an int64 to a string.
func Int64toa(v int64) string {
return strconv.FormatInt(v, 10)
}