commit cbfcc91eec8da887018752d9235bcef6d460f6b4 Author: iuu <2167162990@qq.com> Date: Mon Dec 23 18:34:46 2024 +0800 init diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/DT.iml b/.idea/DT.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/DT.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..148fa65 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..59d213f --- /dev/null +++ b/config.yaml @@ -0,0 +1,39 @@ +is_debug: true + + +service: + http: + host: 0.0.0.0 # 默认localhost + port: 8001 # 默认9999 + tcp: + host: 0.0.0.0 # 默认localhost + port: 8002 # 默认9999 + +db: + db_host: "127.0.0.1" + db_port: "3306" + db_name: "dianti" + db_user: "root" + db_pass: "root" + table_prefix: "dt_" + time_zone: "Local" + log_level: 4 + slow_threshold: 200 + idle_conns: 10 + open_conns: 50 + + + +#jwt: +# signing_key: iuu +# expires_time: 7d +# buffer_time: 1d +# issuer: iuu.me + + + +#influxdb: +# # host: "http://192.168.0.47:18086" +# # token: "BfG_zW41Wddgbuig0GXt6TuDWpHjUgjTJGDFi9ZI6fOXeYwyWKhakTrwbRT8f4uqFQCWXbRGxs8f5GaChW5tqw==" +# host: "http://192.168.0.9:8086" +# token: "4Sz5qRK-VY0aGh7m7sYDkG5tWCwKjbDYAlkgFpvUcZTBn7XahGqczeye7BQCRjcWP8fqsefPeNkwfvUnqa69oA==" \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..6c8875b --- /dev/null +++ b/config/config.go @@ -0,0 +1,12 @@ +package config + +const ( + AppEnv = "dev" + EnvConfig = "config.yaml" +) + +type Config struct { + IsDebug bool `mapstructure:"is_debug" json:"is_debug" yaml:"is_debug"` + Db DbConf `mapstructure:"db" json:"db" yaml:"db"` + Service ServiceConf `mapstructure:"service" json:"service" yaml:"service"` +} diff --git a/config/db.go b/config/db.go new file mode 100644 index 0000000..93c5f0b --- /dev/null +++ b/config/db.go @@ -0,0 +1,15 @@ +package config + +type DbConf struct { + DbHost string `mapstructure:"db_host" json:"db_host" yaml:"db_host"` + DbPort int `mapstructure:"db_port" json:"db_port" yaml:"db_port"` + DbName string `mapstructure:"db_name" json:"db_name" yaml:"db_name"` + DbUser string `mapstructure:"db_user" json:"db_user" yaml:"db_user"` + DbPass string `mapstructure:"db_pass" json:"db_pass" yaml:"db_pass"` + TablePrefix string `mapstructure:"table_prefix" json:"table_prefix" yaml:"table_prefix"` + TimeZone string `mapstructure:"time_zone" json:"time_zone" yaml:"time_zone"` + LogLevel int `mapstructure:"log_level" json:"log_level" yaml:"log_level"` // LogLevel SQL日志级别 (1-静音 2-错误 3-警告 4-信息) + SlowThreshold int `mapstructure:"slow_threshold" json:"slow_threshold" yaml:"slow_threshold"` // SlowThreshold 慢SQL阈值(毫秒)。慢SQL会在log_level大于等于3时输出。 + IdleConns int `mapstructure:"idle_conns" json:"idle_conns" yaml:"idle_conns"` // 空闲连接池中的最大连接数,建议为open_conns的百分之5-20之间 + OpenConns int `mapstructure:"open_conns" json:"open_conns" yaml:"open_conns"` // 最大打开连接数,建议这里设置为50 +} diff --git a/config/service.go b/config/service.go new file mode 100644 index 0000000..c865470 --- /dev/null +++ b/config/service.go @@ -0,0 +1,15 @@ +package config + +type ServiceConf struct { + Http HttpConf `mapstructure:"http" json:"http" yaml:"http"` + Tcp TcpConf `mapstructure:"tcp" json:"tcp" yaml:"tcp"` +} + +type HttpConf struct { + Host string `mapstructure:"host" json:"host" yaml:"host"` + Port string `mapstructure:"port" json:"port" yaml:"port"` +} +type TcpConf struct { + Host string `mapstructure:"host" json:"host" yaml:"host"` + Port string `mapstructure:"port" json:"port" yaml:"port"` +} diff --git a/core/gorm/gorm.go b/core/gorm/gorm.go new file mode 100644 index 0000000..b242575 --- /dev/null +++ b/core/gorm/gorm.go @@ -0,0 +1,61 @@ +package gorm + +import ( + "DT/global" + "DT/model" + "fmt" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" + "gorm.io/gorm/schema" + "log" + "os" + "time" +) + +func InitGorm() { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=%s", global.AppConf.Db.DbUser, global.AppConf.Db.DbPass, global.AppConf.Db.DbHost, global.AppConf.Db.DbPort, global.AppConf.Db.DbName, global.AppConf.Db.TimeZone) + postgresConfig := mysql.Config{ + DSN: dsn, + } + + newLogger := logger.New( + log.New(os.Stdout, "\r\n", log.LstdFlags), + logger.Config{ + SlowThreshold: time.Duration(global.AppConf.Db.SlowThreshold) * time.Millisecond, + LogLevel: logger.LogLevel(global.AppConf.Db.LogLevel), + IgnoreRecordNotFoundError: true, + Colorful: true, + }) + + MYDB, err := gorm.Open(mysql.New(postgresConfig), &gorm.Config{ + Logger: newLogger, + NamingStrategy: schema.NamingStrategy{ + SingularTable: true, + TablePrefix: global.AppConf.Db.TablePrefix, + }, + }) + + DB := MYDB + + if err != nil { + fmt.Println("[-] 数据库连接失败") + os.Exit(0) + } + sqlDB, _ := DB.DB() + sqlDB.SetMaxIdleConns(global.AppConf.Db.IdleConns) + sqlDB.SetMaxOpenConns(global.AppConf.Db.OpenConns) + sqlDB.SetConnMaxLifetime(time.Hour) + fmt.Println("[+]连接成功!") + global.Db = DB +} + +func AutoMigrate(db *gorm.DB) { + err := db.AutoMigrate( + new(model.Device), + ) + if err != nil { + fmt.Println("[-] 迁移数据表失败:", err.Error()) + os.Exit(0) + } +} diff --git a/core/logger/logger.go b/core/logger/logger.go new file mode 100644 index 0000000..9071076 --- /dev/null +++ b/core/logger/logger.go @@ -0,0 +1,91 @@ +package logger + +import ( + "DT/global" + "fmt" + rotatelogs "github.com/lestrrat-go/file-rotatelogs" + "github.com/sirupsen/logrus" + "runtime" + "strings" + "sync" + "time" +) + +func InitLogger() *global.Logger { + L := logrus.New() + path := "./log/log" + file, _ := rotatelogs.New( + path+".%Y%m%d", + rotatelogs.WithLinkName(path), + rotatelogs.WithMaxAge(time.Duration(100*24)*time.Hour), //自动删除 + rotatelogs.WithRotationTime(time.Duration(24*60)*time.Minute), //分割时间 + ) + L.SetOutput(file) + L.SetFormatter(&logrus.JSONFormatter{}) + Log := &global.Logger{Logger: L, Heads: []string{}} + global.Log = Log + return Log +} + +var errs = &errFunc{lock: new(sync.Mutex)} + +func ErrorRegister(f func(string)) { + errs.register(f, false) +} +func ErrorRegisterOnly1(f func(string)) { + errs.register(f, true) +} +func ErrorWx(e ...string) { + errs.run(strings.Join(e, " ")) + global.Log.Error(e) +} + +type errFunc struct { + f []func(string) + lastTime int64 + lock *sync.Mutex +} + +func (e *errFunc) register(f func(string), only bool) { + if only { + e.f = []func(string){f} + } else { + e.f = append(e.f, f) + } +} + +func (e *errFunc) run(logs string) { + if len(e.f) == 0 { + return + } + if time.Now().Unix()-e.lastTime > 1 { + e.lock.Lock() + e.lastTime = time.Now().Unix() + defer e.lock.Unlock() + for _, v := range e.f { + v(logs) + } + } +} + +func StackSend(skip int, e string) { + e += "\n" + Stack(skip) + if global.AppConf.IsDebug { + fmt.Print(e) + } + errs.run(e) +} + +func Stack(skip int) (re string) { + for i := skip; ; i++ { + _, file, line, ok := runtime.Caller(i) + if !ok { + break + } + if !strings.Contains(file, "local/go/src") && !strings.Contains(file, "/go/pkg") { + logs := fmt.Sprintf("%s:%d\n", file, line) + re += logs + } + } + return +} diff --git a/core/viper/viper.go b/core/viper/viper.go new file mode 100644 index 0000000..a94b88c --- /dev/null +++ b/core/viper/viper.go @@ -0,0 +1,59 @@ +package viper + +import ( + "DT/config" + "DT/global" + "flag" + "fmt" + "github.com/fsnotify/fsnotify" + "github.com/spf13/viper" + "os" +) + +// InitViper 解析yaml格式文件 +func InitViper(path ...string) { + var confPath string + if len(path) == 0 { + flag.StringVar(&confPath, "c", "", "choose config file.") + flag.Parse() + if confPath == "" { + if AppEnv := os.Getenv(config.AppEnv); AppEnv == "" { + confPath = config.EnvConfig + } else { + confPath = AppEnv + } + } else { + } + } else { + confPath = path[0] + } + + v := viper.New() + // 指定配置文件路径 + v.SetConfigFile(confPath) + // 如果配置文件的名称中没有扩展名,则需要配置此项 + v.SetConfigType("yaml") + // 查找并读取配置文件 + err := v.ReadInConfig() + if err != nil { + fmt.Printf("[-]读取配置文件错误: %s \n", err) + os.Exit(0) + } + // 监控并重新读取配置文件 + v.WatchConfig() + v.OnConfigChange(func(e fsnotify.Event) { + if err = v.Unmarshal(&global.AppConf); err != nil { + fmt.Printf("[-]重新解析配置文件失败: %s \n", err) + os.Exit(0) + } + fmt.Println("[+]重新加载配置文件完成") + }) + if err = v.Unmarshal(&global.AppConf); err != nil { + fmt.Printf("[-]解析配置文件失败: %s \n", err) + os.Exit(0) + } + fmt.Println("[+]加载配置文件完成") + + //dump.P(global.AppConf) + +} diff --git a/global/logger.go b/global/logger.go new file mode 100644 index 0000000..93ce028 --- /dev/null +++ b/global/logger.go @@ -0,0 +1,14 @@ +package global + +import "github.com/sirupsen/logrus" + +type Logger struct { + *logrus.Logger + Heads []string +} + +func (l *Logger) AddHead(args ...string) { + for _, v := range args { + l.Heads = append(l.Heads, v) + } +} diff --git a/global/var.go b/global/var.go new file mode 100644 index 0000000..242fd68 --- /dev/null +++ b/global/var.go @@ -0,0 +1,19 @@ +package global + +import ( + "DT/config" + "gorm.io/gorm" +) + +var ( + // AppConf 配置信息 + AppConf *config.Config + // Db 数据库 + Db *gorm.DB + + //InFluxDb influxdb2.Client + + Log *Logger + + //Cron *cron.Cron +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9eb0a65 --- /dev/null +++ b/go.mod @@ -0,0 +1,66 @@ +module DT + +go 1.23.3 + +require ( + github.com/fsnotify/fsnotify v1.8.0 + github.com/gin-gonic/gin v1.10.0 + github.com/gookit/goutil v0.6.18 + github.com/gorilla/websocket v1.5.3 + github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/viper v1.19.0 + gorm.io/driver/mysql v1.5.7 + gorm.io/gorm v1.25.12 +) + +require ( + github.com/bytedance/sonic v1.11.6 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.20.0 // indirect + github.com/go-sql-driver/mysql v1.7.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/gookit/color v1.5.4 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/jonboulle/clockwork v0.4.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/lestrrat-go/strftime v1.1.0 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sagikazarmark/locafero v0.4.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/arch v0.8.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d3beaf1 --- /dev/null +++ b/go.sum @@ -0,0 +1,162 @@ +github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= +github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= +github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= +github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= +github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= +github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= +github.com/gookit/goutil v0.6.18 h1:MUVj0G16flubWT8zYVicIuisUiHdgirPAkmnfD2kKgw= +github.com/gookit/goutil v0.6.18/go.mod h1:AY/5sAwKe7Xck+mEbuxj0n/bc3qwrGNe3Oeulln7zBA= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= +github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA= +github.com/lestrrat-go/strftime v1.1.0 h1:gMESpZy44/4pXLO/m+sL0yBd1W6LjgjrrD4a68Gapyg= +github.com/lestrrat-go/strftime v1.1.0/go.mod h1:uzeIB52CeUJenCo1syghlugshMysrqUT51HlxphXVeI= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= +github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= +github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= +github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= +github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= +github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= +github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= +github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= +golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= +gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/httpserver/server.go b/httpserver/server.go new file mode 100644 index 0000000..a321803 --- /dev/null +++ b/httpserver/server.go @@ -0,0 +1,194 @@ +package httpserver + +import ( + "DT/tcpserver" + "DT/ws" + "context" + "fmt" + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "log" + "net/http" +) + +// Server HTTP服务器结构 +type Server struct { + tcpServer *tcpserver.Server + httpServer *http.Server + router *gin.Engine + hub *ws.Hub +} + +// NewServer 创建一个新的HTTP服务器 +func NewServer(addr string, tcpServer *tcpserver.Server) *Server { + router := gin.Default() + hub := ws.NewHub() + + server := &Server{ + tcpServer: tcpServer, + router: router, + httpServer: &http.Server{ + Addr: addr, + Handler: router, + }, + hub: hub, + } + + // 启动 hub + go hub.Run() + + // 注册路由 + server.registerRoutes() + return server +} + +// 添加请求结构体 +type SendCommandRequest struct { + Imei string `json:"Imei" binding:"required"` + Command string `json:"command" binding:"required"` +} + +// registerRoutes 方法 +func (s *Server) registerRoutes() { + // 获取在线客户端列表 + s.router.GET("/clients", s.handleGetClients) + + // 添加发送命令的路由 + s.router.POST("/clients_send", s.handleSendCommand) + + // WebSocket 路由 - 使用查询参数获取 IMEI + s.router.GET("/ws/:imei", s.handleWebSocket) +} + +// 处理发送命令的方法 +func (s *Server) handleSendCommand(c *gin.Context) { + var req SendCommandRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "msg": "invalid request: " + err.Error(), + }) + return + } + + // 查找目标客户端 + clients := s.tcpServer.GetOnlineClients() + var targetClient *tcpserver.Client + found := false + + for _, client := range clients { + if client["imei"] == req.Imei { + if clientObj, ok := s.tcpServer.GetClient(client["id"].(string)); ok { + targetClient = clientObj + found = true + break + } + } + } + + if !found { + c.JSON(http.StatusNotFound, gin.H{ + "code": 404, + "msg": "client not found", + }) + return + } + + // 发送命令 + _, err := targetClient.Conn.Write(append([]byte(req.Command), '\r', '\n')) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 500, + "msg": "failed to send command: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 200, + "msg": "success", + "data": gin.H{ + "imei": req.Imei, + "command": req.Command, + }, + }) +} + +// handleGetClients 处理获取客户端列表的请求 +func (s *Server) handleGetClients(c *gin.Context) { + clients := s.tcpServer.GetOnlineClients() + c.JSON(http.StatusOK, gin.H{ + "code": 200, + "msg": "success", + "data": gin.H{ + "total": len(clients), + "clients": clients, + }, + }) +} + +// 添加 WebSocket 处理函数 +func (s *Server) handleWebSocket(c *gin.Context) { + imei := c.Param("imei") + //imei := c.Query("imei") + if imei == "" { + log.Printf("IMEI 参数为空") + c.String(http.StatusBadRequest, "IMEI parameter is required") + return + } + + //devId, err := strconv.Atoi(deviceId) + //if err != nil { + // log.Printf("设备ID转换失败: %v", err) + // c.String(http.StatusBadRequest, "Invalid device ID") + // return + //} + + log.Printf("WebSocket 连接请求: imei=%s", imei) + + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + log.Printf("WebSocket 升级失败: %v", err) + return + } + + client := &ws.Client{ + Hub: s.hub, + Conn: conn, + Send: make(chan *ws.WsMessage, 256), + Imei: imei, + //DeviceId: devId, + } + + log.Printf("WebSocket 客户端创建: imei=%s", imei) + client.Hub.Register <- client + + go client.WritePump() + go client.ReadPump() +} + +// Start 启动HTTP服务器 +func (s *Server) Start() error { + return s.httpServer.ListenAndServe() +} + +// Stop 优雅关闭HTTP服务器 +func (s *Server) Stop(ctx context.Context) error { + fmt.Println("Stopping HTTP server...") + if err := s.httpServer.Shutdown(ctx); err != nil { + return fmt.Errorf("HTTP server forced to shutdown: %w", err) + } + fmt.Println("HTTP server stopped gracefully") + return nil +} + +// GetHub 添加获取 Hub 的方法 +func (s *Server) GetHub() *ws.Hub { + return s.hub +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..8aeba83 --- /dev/null +++ b/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "DT/core/gorm" + "DT/core/logger" + "DT/core/viper" + "DT/global" + "DT/httpserver" + "DT/tcpserver" + "context" + "errors" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" +) + +func init() { + viper.InitViper() + logger.InitLogger() + gorm.InitGorm() + //gorm.AutoMigrate(global.Db) +} + +func main() { + handler := &tcpserver.TCPHandler{} + + tcpHost := global.AppConf.Service.Tcp.Host + tcpPort := global.AppConf.Service.Tcp.Port + tcpServer := tcpserver.NewServer(tcpHost+":"+tcpPort, handler.HandleClient) + handler.Server = tcpServer + + httpHost := global.AppConf.Service.Http.Host + httpPort := global.AppConf.Service.Http.Port + httpServer := httpserver.NewServer(httpHost+":"+httpPort, tcpServer) + + handler.Hub = httpServer.GetHub() + + go func() { + if err := tcpServer.Start(); err != nil { + fmt.Printf("TCP server error: %v\n", err) + } + }() + + go func() { + fmt.Printf("Listening and serving HTTP on %s:%s\r\n", httpHost, httpPort) + if err := httpServer.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + fmt.Printf("HTTP server error: %v\n", err) + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + fmt.Println("Shutting down servers...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := httpServer.Stop(ctx); err != nil { + fmt.Printf("Error stopping HTTP server: %v\n", err) + } + tcpServer.Stop() + fmt.Println("All servers stopped") +} + +// 客户端发送 {"Type":"reg","Imei":"8610123456","Pwd":"123456","Ver":"v1.0.0"} +// 服务器回复 {"Type":"reg","Time":1734948442} + +// 心跳 +// 客户端{"Type":"ping"} +// 服务端{"Type":"pong"} + +// 服务端发 {"Type":"ota","Ip":"192.168.31.1:80","File":"/xxx/1.bin"} +// 服务端发 {\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"} +// 客户端回复 {"Type":"ota","State":"1"} + +// 开始实时上传数据 +// 服务端发 {"Type":"start"} +// 客户端回复 {"Type":"start","Data":"{\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"}"} +// 客户端回复 {"Type":"start","Data":"{\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"}"} + +// 停止实时上传数据 +// 服务端发 {"Type":"stop"} +// 客户端回复 {"Type":"stop","State":"1"} diff --git a/model/device.go b/model/device.go new file mode 100644 index 0000000..6810fda --- /dev/null +++ b/model/device.go @@ -0,0 +1,25 @@ +package model + +import ( + "DT/global" + "gorm.io/gorm" + "time" +) + +type Device struct { + Id int `gorm:"column:id;primaryKey" json:"id"` + //DriverId string `gorm:"column:driver_id;comment:设备ID;type:varchar(255)" json:"driver_id"` + Imei string `gorm:"column:imei;comment:IMEI;type:varchar(255)" json:"imei"` + //DriverName string `gorm:"column:driver_name;comment:设备名称;type:varchar(255)" json:"driver_name"` + DriverPass string `gorm:"column:driver_pass;comment:设备密码;type:varchar(255)" json:"driver_pass"` + DriverVer string `gorm:"column:driver_ver;comment:固件版本;type:varchar(255)" json:"driver_ver"` + //DriverFd string `gorm:"column:driver_fd;comment:设备FD;type:varchar(255)" json:"driver_fd"` + Remark string `gorm:"column:remark;comment:备注;type:varchar(255)" json:"remark"` + Created time.Time `gorm:"column:created;autoCreateTime;comment:创建时间" json:"created"` + Updated time.Time `gorm:"column:updated;autoUpdateTime;comment:修改时间" json:"updated"` + DeletedAt gorm.DeletedAt `gorm:"index;comment:删除时间" json:"-"` +} + +func (r *Device) TableName() string { + return global.AppConf.Db.TablePrefix + "devices" +} diff --git a/repository/device.go b/repository/device.go new file mode 100644 index 0000000..a96b5f4 --- /dev/null +++ b/repository/device.go @@ -0,0 +1,81 @@ +package repository + +import ( + "DT/global" + "DT/model" +) + +type Device struct { +} + +//func (r *Device) GetDeviceList(qr map[string]interface{}) (p []*model.Device, err error) { +// db := global.Db +// for key, value := range qr { +// db = db.Where(key, value) +// } +// err = db.Order("id DESC").Find(&p).Error +// return +//} + +//func (r *Device) GetDevicePage(req *form.DevicePageReq) (count int64, list []*device_model.Device, err error) { +// db := global.Db.Model(&device_model.Device{}) +// //for key, value := range qr { +// // db = db.Where(key, value) +// //} +// //if req.DriverId > 0 { +// // db = db.Where("driver_id = ?", req.DriverId) +// //} +// err = db.Count(&count).Error +// if err != nil { +// return +// } +// //err = db.Offset((req.Page.GetPageIndex() - 1) * req.Page.GetPageSize()).Limit(req.Page.GetPageSize()).Order("id desc").Preload("Rule").Find(&list).Error +// err = db.Offset((req.Page.GetPageIndex() - 1) * req.Page.GetPageSize()).Limit(req.Page.GetPageSize()).Order("id desc").Find(&list).Error +// return +//} + +func (r *Device) CreateDevice(d *model.Device) error { + return global.Db.Create(d).Error +} +func (r *Device) UpdateDevice(d *model.Device) error { + return global.Db.Save(d).Error +} + +func (r *Device) GetDevice(qr map[string]interface{}) (d *model.Device, err error) { + db := global.Db + for key, value := range qr { + db = db.Where(key, value) + } + err = db.First(&d).Error + return +} + +//func (r *User) GetPeakValleyQuarterPage(req *form.PeakValleyQuarterListReq) (count int64, list []*peak_valley_model.PeakValleyQuarter, err error) { +// db := global.Db.Model(&peak_valley_model.PeakValleyQuarter{}) +// +// //for key, value := range qr { +// // db = db.Where(key, value) +// //} +// err = db.Count(&count).Error +// if err != nil { +// return +// } +// err = db.Offset((req.Page.GetPageIndex() - 1) * req.Page.GetPageSize()).Limit(req.Page.GetPageSize()).Order("id desc").Preload("Rule").Find(&list).Error +// return +//} + +//func (r *User) CreatePeakValleyQuarter(d *user_model.User) error { +// return global.Db.Create(d).Error +//} +//func (r *User) UpdatePeakValleyQuarter(d *user_model.User) error { +// return global.Db.Save(d).Error +//} + +//func (r *User) GetUser(qr map[string]interface{}) (d *user_model.User, err error) { +// db := global.Db +// for key, value := range qr { +// db = db.Where(key, value) +// } +// err = db.First(&d).Error +// return +//} diff --git a/repository/enter.go b/repository/enter.go new file mode 100644 index 0000000..d00ebba --- /dev/null +++ b/repository/enter.go @@ -0,0 +1,7 @@ +package repository + +type groupRepository struct { + Device +} + +var GroupRepositorys = new(groupRepository) diff --git a/tcpserver/handler.go b/tcpserver/handler.go new file mode 100644 index 0000000..5407694 --- /dev/null +++ b/tcpserver/handler.go @@ -0,0 +1,267 @@ +package tcpserver + +import ( + "DT/repository" + "DT/ws" + "bufio" + "bytes" + "encoding/json" + "fmt" + "net" + "time" +) + +type TCPHandler struct { + Server *Server + Hub *ws.Hub +} + +func (h *TCPHandler) HandleClient(conn net.Conn) { + reader := bufio.NewReader(conn) + clientID := conn.RemoteAddr().String() + var client *Client + if value, ok := h.Server.GetClient(clientID); ok { + client = value + } else { + fmt.Println("找不到客户端:", clientID) + return + } + + broadcastMessage := func(data []byte) { + if h.Hub != nil { + h.Hub.Broadcast <- &ws.WsMessage{IMEI: client.Imei, Data: string(data)} + } + } + + for { + message, err := reader.ReadBytes('\n') + if err != nil { + fmt.Println("客户端已断开连接:", err) + break + } + + // 去除末尾的换行符 + message = bytes.TrimSpace(message) + fmt.Printf("收到消息来自 %s: %s\n", conn.RemoteAddr(), string(message)) + + if !json.Valid(message) { + fmt.Printf("来自客户端的数据非法 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + + var fullMsg map[string]interface{} + if err := json.Unmarshal(message, &fullMsg); err != nil { + fmt.Printf("Error parsing message: %v\n", err) + continue + } + + msgType, _ := fullMsg["Type"].(string) + if msgType == "" { + fmt.Printf("接收到的消息类型为空 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + + switch msgType { + case "reg": + // 处理登录请求 + if err := h.Server.HandleAuth(client, message); err != nil { + fmt.Printf("客户端授权失败: %v\n", err) + conn.Close() + return + } + fmt.Printf("客户端已授权: %s\n", client.Imei) + // 广播登录消息 + broadcastMessage(message) + + case "ping": + // 处理心跳 + if !client.IsAuth { + fmt.Printf("来自未授权客户端的心跳 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + if err := h.Server.HandleHeartbeat(client, message); err != nil { + fmt.Printf("心跳错误: %v\n", err) + continue + } + // 广播心跳消息 + broadcastMessage(message) + + case "ota": + // 处理 OTA + if !client.IsAuth { + fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + if err := h.Server.HandleOta(client, message); err != nil { + fmt.Printf("OTA 错误: %v\n", err) + continue + } + // 广播 OTA 消息 + broadcastMessage(message) + + case "start": + // 处理 客户端实时上报数据 + if !client.IsAuth { + fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + if err := h.Server.RealTimeReporting(client, message); err != nil { + fmt.Printf("OTA 错误: %v\n", err) + continue + } + // 广播 OTA 消息 + broadcastMessage(message) + + default: + // 处理其他消息类型 + if !client.IsAuth { + fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) + conn.Close() + return + } + // 广播其他类型的消息 + broadcastMessage(message) + } + } +} + +func (s *Server) HandleHeartbeat(client *Client, message []byte) error { + if !client.IsAuth { + return fmt.Errorf("unauthorized") + } + + var msg Message + if err := json.Unmarshal(message, &msg); err != nil { + return err + } + + if msg.Type == "ping" { + client.LastPing = time.Now() + response := Message{ + MessageType: MessageType{Type: "pong"}, + } + + responseData, err := json.Marshal(response) + if err != nil { + return err + } + _, err = client.Conn.Write(append(responseData, '\r', '\n')) + return err + } + return nil +} + +func (s *Server) HandleAuth(client *Client, message []byte) error { + var msg Message + if err := json.Unmarshal(message, &msg); err != nil { + return err + } + if msg.Type != "reg" { + return fmt.Errorf("unauthorized") + } + + model, err := repository.GroupRepositorys.Device.GetDevice(map[string]interface{}{"imei": msg.Imei}) + if err != nil { + return fmt.Errorf("设备不存在") + } + if msg.Pwd != model.DriverPass { + return fmt.Errorf("设备密码不正确") + } + + // 更新版本号 + model.DriverVer = msg.Ver + err = repository.GroupRepositorys.Device.UpdateDevice(model) + if err != nil { + return fmt.Errorf("更新设备版本号失败") + } + + // 认证成功,停止登录超时定时器 + if client.authTimer != nil { + client.authTimer.Stop() + client.authTimer = nil + } + + // 认证成功 + client.Imei = msg.Imei + client.IsAuth = true + + // 发送响应 + response := Message{ + MessageType: MessageType{Type: "reg"}, + MessageTime: MessageTime{Time: time.Now().Unix()}, + } + responseData, err := json.Marshal(response) + if err != nil { + return err + } + + _, err = client.Conn.Write(append(responseData, '\r', '\n')) + return err +} + +func (s *Server) HandleOta(client *Client, message []byte) error { + var msg Message + if err := json.Unmarshal(message, &msg); err != nil { + return err + } + if msg.Type != "ota" { + return fmt.Errorf("unauthorized") + } + + fmt.Printf("设备升级结果:%s\r\n", msg.State) + + //model, err := repository.GroupRepositorys.Device.GetDevice(map[string]interface{}{"imei": msg.Imei}) + //if err != nil { + // return fmt.Errorf("设备不存在") + //} + //if msg.Pwd != model.DriverPass { + // return fmt.Errorf("设备密码不正确") + //} + + // 更新版本号 + //model.DriverVer = msg.Ver + //err = repository.GroupRepositorys.Device.UpdateDevice(model) + //if err != nil { + // return fmt.Errorf("更新设备版本号失败") + //} + + // 认证成功,停止登录超时定时器 + //if client.authTimer != nil { + // client.authTimer.Stop() + // client.authTimer = nil + //} + + // 认证成功 + //client.Imei = msg.Imei + //client.IsAuth = true + + // 发送响应 + //response := Message{ + // MessageType: MessageType{Type: "reg"}, + // MessageTime: MessageTime{Time: time.Now().Unix()}, + //} + //responseData, err := json.Marshal(response) + //if err != nil { + // return err + //} + // + //_, err = client.Conn.Write(append(responseData, '\r', '\n')) + return nil +} + +func (s *Server) RealTimeReporting(client *Client, message []byte) error { + var msg Message + if err := json.Unmarshal(message, &msg); err != nil { + return err + } + if msg.Type != "start" { + return fmt.Errorf("unauthorized") + } + fmt.Printf("设备实时上报数据:%s\r\n", msg.Data) + return nil +} diff --git a/tcpserver/message.go b/tcpserver/message.go new file mode 100644 index 0000000..a994051 --- /dev/null +++ b/tcpserver/message.go @@ -0,0 +1,35 @@ +package tcpserver + +type MessageType struct { + Type string `json:"Type"` +} + +type MessagePassword struct { + Pwd string `json:"Pwd,omitempty"` +} +type MessageImei struct { + Imei string `json:"Imei,omitempty"` +} +type MessageVer struct { + Ver string `json:"Ver,omitempty"` +} +type MessageTime struct { + Time int64 `json:"Time,omitempty"` +} +type MessageState struct { + State string `json:"State,omitempty"` +} + +type MessageData struct { + Data string `json:"Data,omitempty"` +} + +type Message struct { + MessageType + MessageImei + MessagePassword + MessageVer + MessageTime + MessageState + MessageData +} diff --git a/tcpserver/tcpserver.go b/tcpserver/tcpserver.go new file mode 100644 index 0000000..44ec66a --- /dev/null +++ b/tcpserver/tcpserver.go @@ -0,0 +1,225 @@ +package tcpserver + +import ( + "fmt" + "net" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +// Server 定义 TCP 服务器结构 +type Server struct { + Address string // 监听地址 + Handler func(net.Conn) // 客户端连接处理函数 + listener net.Listener // TCP 监听器 + connections sync.Map // 活跃的客户端连接 + wg sync.WaitGroup // 等待所有 Goroutine 完成 + stopChan chan struct{} // 关闭信号 + + clients map[string]*Client + + clientsMux sync.RWMutex + stopOnce sync.Once // 添加 sync.Once 来确保 Stop 只被执行一次 +} + +// Client 定义客户端结构 +type Client struct { + ID string + Imei string // 添加 IMEI + Conn net.Conn + ConnectedAt time.Time + LastPing time.Time + Done chan struct{} + IsAuth bool // 添加认证状态 + authTimer *time.Timer // 添加登录超时定时器 +} + +// NewServer 创建一个新的 TCP 服务器 +func NewServer(address string, handler func(net.Conn)) *Server { + return &Server{ + Address: address, + Handler: handler, + stopChan: make(chan struct{}), + clients: make(map[string]*Client), + } +} + +// Start 启动服务器 +func (s *Server) Start() error { + var err error + s.listener, err = net.Listen("tcp", s.Address) + if err != nil { + return fmt.Errorf("failed to start server: %w", err) + } + + fmt.Println("Listening and serving TCP on", s.Address) + + // 捕获终止信号 + go s.handleShutdown() + + for { + // 非阻塞检查关闭信号 + select { + case <-s.stopChan: + return nil + default: + } + + // 接受新连接 + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.stopChan: + return nil // 服务器已停止 + default: + fmt.Println("Error accepting connection:", err) + continue + } + } + + // 记录活跃连接并添加到 clients 映射 + s.connections.Store(conn.RemoteAddr(), conn) + client := s.addClient(conn) + fmt.Printf("客户端已连接: %s\n", client.ID) + + // 使用 Goroutine 处理客户端 + s.wg.Add(1) + go func(c net.Conn, clientID string) { + defer s.wg.Done() + defer func() { + s.connections.Delete(c.RemoteAddr()) + s.removeClient(clientID) + c.Close() + fmt.Printf("客户端已断开连接: %s\n", clientID) + }() + s.Handler(c) + }(conn, client.ID) + } +} + +// handleShutdown 处理优雅关闭 +func (s *Server) handleShutdown() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + + <-c + fmt.Println("\nReceived shutdown signal...") + s.Stop() +} + +// GetOnlineClients 获取所有在线客户端信息 +func (s *Server) GetOnlineClients() []map[string]interface{} { + s.clientsMux.RLock() + defer s.clientsMux.RUnlock() + + clients := make([]map[string]interface{}, 0, len(s.clients)) + for _, client := range s.clients { + clientInfo := map[string]interface{}{ + "id": client.ID, + "imei": client.Imei, + "addr": client.Conn.RemoteAddr().String(), + "connected_at": client.ConnectedAt, + "last_ping": client.LastPing, + "is_auth": client.IsAuth, + } + clients = append(clients, clientInfo) + } + return clients +} + +// addClient 添加新客户端 +func (s *Server) addClient(conn net.Conn) *Client { + s.clientsMux.Lock() + defer s.clientsMux.Unlock() + + client := &Client{ + ID: conn.RemoteAddr().String(), + Conn: conn, + ConnectedAt: time.Now(), + LastPing: time.Now(), + Done: make(chan struct{}), + } + s.clients[client.ID] = client + + // 启动心跳检测 + go client.startHeartbeat(s) + + // 添加登录超时检测 + client.authTimer = time.AfterFunc(time.Minute, func() { + if !client.IsAuth { + fmt.Printf("Client %s authentication timeout\n", client.ID) + client.Conn.Close() // 强制关闭连接 + } + }) + + return client +} + +// removeClient 移除客户端 +func (s *Server) removeClient(id string) { + s.clientsMux.Lock() + defer s.clientsMux.Unlock() + if client, ok := s.clients[id]; ok { + if client.authTimer != nil { + client.authTimer.Stop() + client.authTimer = nil + } + close(client.Done) // 停止心跳检测 + delete(s.clients, id) + } +} + +// Stop 添加一个新方法用于外部调用关闭服务 +func (s *Server) Stop() { + s.stopOnce.Do(func() { + fmt.Println("Stopping TCP server...") + + // 关闭监听器 + close(s.stopChan) + if s.listener != nil { + s.listener.Close() + } + + // 关闭所有连接 + s.clientsMux.Lock() + for id, client := range s.clients { + client.Conn.Close() + delete(s.clients, id) + } + s.clientsMux.Unlock() + + // 等待所有 Goroutine 完成 + s.wg.Wait() + fmt.Println("TCP server stopped.") + }) +} + +// 添加心跳检测方法 +func (c *Client) startHeartbeat(s *Server) { + ticker := time.NewTicker(20 * time.Second) // 每10秒检查一次 + defer ticker.Stop() + + for { + select { + case <-c.Done: + return + case <-ticker.C: + if time.Since(c.LastPing) > 120*time.Second { + fmt.Printf("客户端 %s 心跳超时 \n", c.ID) + c.Conn.Close() // 强制关闭连接 + return + } + } + } +} + +// GetClient 获取指定ID的客户端 +func (s *Server) GetClient(id string) (*Client, bool) { + s.clientsMux.RLock() + defer s.clientsMux.RUnlock() + client, ok := s.clients[id] + return client, ok +} diff --git a/ws/client.go b/ws/client.go new file mode 100644 index 0000000..fcc0d6f --- /dev/null +++ b/ws/client.go @@ -0,0 +1,116 @@ +package ws + +import ( + "encoding/json" + "github.com/gorilla/websocket" + "log" + "time" +) + +const ( + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + maxMessageSize = 512 +) + +// Client WebSocket 客户端连接 +type Client struct { + Hub *Hub + Conn *websocket.Conn + Send chan *WsMessage + Imei string + DeviceId int +} + +func (c *Client) ReadPump() { + defer func() { + c.Hub.Unregister <- c + c.Conn.Close() + }() + + c.Conn.SetReadLimit(maxMessageSize) + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, message, err := c.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("读取错误: %v", err) + } + return + } + + // 检查是否是字符串 "PING" + if string(message) == "PING" { + c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.Conn.WriteMessage(websocket.TextMessage, []byte("PONG")) + if err != nil { + log.Printf("发送 PONG 响应失败: %v", err) + return + } + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + continue + } + + // 处理 JSON 消息 + var msg struct { + Type string `json:"Type"` + } + if err := json.Unmarshal(message, &msg); err == nil && msg.Type == "ping" { + c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + response := struct { + Type string `json:"Type"` + }{ + Type: "pong", + } + responseData, _ := json.Marshal(response) + err := c.Conn.WriteMessage(websocket.TextMessage, responseData) + if err != nil { + log.Printf("发送 pong 响应失败: %v", err) + return + } + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + } + } +} + +func (c *Client) WritePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.Conn.Close() + }() + + for { + select { + case message, ok := <-c.Send: + if !ok { + c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + log.Printf("准备发送消息到 WebSocket: IMEI=%s, Data=%s", message.IMEI, message.Data) + + // 直接发送原始消息字符串 + err := c.Conn.WriteMessage(websocket.TextMessage, []byte(message.Data)) + if err != nil { + log.Printf("发送消息失败: %v", err) + return + } + log.Printf("消息发送成功: IMEI=%s", message.IMEI) + + case <-ticker.C: + c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { + log.Printf("发送 ping 失败: %v", err) + return + } + } + } +} diff --git a/ws/hub.go b/ws/hub.go new file mode 100644 index 0000000..0f6ea06 --- /dev/null +++ b/ws/hub.go @@ -0,0 +1,88 @@ +package ws + +import ( + "log" + "sync" +) + +// Hub 维护活动客户端的集合并广播消息 +type Hub struct { + // 按 IMEI 分组的客户端连接 + Clients map[string]map[*Client]bool + + // 广播消息的通道(添加 IMEI) + Broadcast chan *WsMessage + + // 注册请求 + Register chan *Client + + // 注销请求 + Unregister chan *Client + + // 互斥锁保护 clients map + mu sync.RWMutex +} + +// Message WebSocket 消息结构 +type WsMessage struct { + IMEI string `json:"imei"` + Data string `json:"data"` +} + +func NewHub() *Hub { + return &Hub{ + Broadcast: make(chan *WsMessage), + Register: make(chan *Client), + Unregister: make(chan *Client), + Clients: make(map[string]map[*Client]bool), + } +} + +func (h *Hub) Run() { + for { + select { + case client := <-h.Register: + h.mu.Lock() + if _, ok := h.Clients[client.Imei]; !ok { + h.Clients[client.Imei] = make(map[*Client]bool) + } + h.Clients[client.Imei][client] = true + log.Printf("客户端注册: IMEI=%s", client.Imei) + h.mu.Unlock() + + case client := <-h.Unregister: + h.mu.Lock() + if _, ok := h.Clients[client.Imei]; ok { + if _, ok := h.Clients[client.Imei][client]; ok { + delete(h.Clients[client.Imei], client) + close(client.Send) + if len(h.Clients[client.Imei]) == 0 { + delete(h.Clients, client.Imei) + } + log.Printf("客户端注销: IMEI=%s", client.Imei) + } + } + h.mu.Unlock() + + case broadcast := <-h.Broadcast: + h.mu.RLock() + log.Printf("收到广播消息: IMEI=%s, Data=%s", broadcast.IMEI, broadcast.Data) + if clients, ok := h.Clients[broadcast.IMEI]; ok { + log.Printf("找到目标客户端组: IMEI=%s, 客户端数=%d", broadcast.IMEI, len(clients)) + for client := range clients { + select { + case client.Send <- broadcast: + log.Printf("消息已发送到客户端: IMEI=%s", client.Imei) + default: + log.Printf("发送失败,关闭客户端: IMEI=%s", client.Imei) + close(client.Send) + delete(clients, client) + } + } + } else { + log.Printf("未找到目标客户端组: IMEI=%s", broadcast.IMEI) + } + h.mu.RUnlock() + } + } +}