新增处理

This commit is contained in:
2024-12-24 14:07:17 +08:00
parent 6e4b37776b
commit 2a2e15c3d0
5 changed files with 36 additions and 121 deletions

View File

@@ -6,14 +6,7 @@ import (
) )
var ( var (
// AppConf 配置信息
AppConf *config.Config AppConf *config.Config
// Db 数据库
Db *gorm.DB Db *gorm.DB
//InFluxDb influxdb2.Client
Log *Logger Log *Logger
//Cron *cron.Cron
) )

View File

@@ -8,12 +8,9 @@ import (
type Device struct { type Device struct {
Id int `gorm:"column:id;primaryKey" json:"id"` Id int `gorm:"column:id;primaryKey" json:"id"`
//DriverId string `gorm:"column:driver_id;comment:设备ID;type:varchar(255)" json:"driver_id"`
Imei string `gorm:"column:imei;comment:IMEI;type:varchar(255)" json:"imei"` Imei string `gorm:"column:imei;comment:IMEI;type:varchar(255)" json:"imei"`
//DriverName string `gorm:"column:driver_name;comment:设备名称;type:varchar(255)" json:"driver_name"`
DriverPass string `gorm:"column:driver_pass;comment:设备密码;type:varchar(255)" json:"driver_pass"` DriverPass string `gorm:"column:driver_pass;comment:设备密码;type:varchar(255)" json:"driver_pass"`
DriverVer string `gorm:"column:driver_ver;comment:固件版本;type:varchar(255)" json:"driver_ver"` DriverVer string `gorm:"column:driver_ver;comment:固件版本;type:varchar(255)" json:"driver_ver"`
//DriverFd string `gorm:"column:driver_fd;comment:设备FD;type:varchar(255)" json:"driver_fd"`
Remark string `gorm:"column:remark;comment:备注;type:varchar(255)" json:"remark"` Remark string `gorm:"column:remark;comment:备注;type:varchar(255)" json:"remark"`
Created time.Time `gorm:"column:created;autoCreateTime;comment:创建时间" json:"created"` Created time.Time `gorm:"column:created;autoCreateTime;comment:创建时间" json:"created"`
Updated time.Time `gorm:"column:updated;autoUpdateTime;comment:修改时间" json:"updated"` Updated time.Time `gorm:"column:updated;autoUpdateTime;comment:修改时间" json:"updated"`

View File

@@ -35,28 +35,22 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
} }
for { for {
//message, err := reader.ReadBytes('\n')
//if err != nil {
// fmt.Println("客户端已断开连接:", err)
// break
//}
message, err := readUntilDelimiter(reader, []byte("\r\n")) message, err := readUntilDelimiter(reader, []byte("\r\n"))
if err != nil { if err != nil {
fmt.Println("Error reading message:", err) fmt.Println("Error reading message:", err)
break break
} }
// 去除末尾的换行符
message = bytes.TrimSpace(message)
message = bytes.TrimSpace(message)
output := strings.ReplaceAll(string(message), "\t", "") output := strings.ReplaceAll(string(message), "\t", "")
output = strings.ReplaceAll(output, "\n", "") output = strings.ReplaceAll(output, "\n", "")
message = []byte(output) message = []byte(output)
fmt.Printf("收到消息来自 %s: %s\n", conn.RemoteAddr(), message) fmt.Printf("收到消息来自 %s: %s\n", conn.RemoteAddr(), message)
//if !json.Valid(message) { if !json.Valid(message) {
// fmt.Printf("来自客户端的数据非法 %s\n", conn.RemoteAddr()) fmt.Printf("来自客户端的数据非法 %s\n", conn.RemoteAddr())
// conn.Close() conn.Close()
// return return
//} }
var fullMsg map[string]interface{} var fullMsg map[string]interface{}
if err := json.Unmarshal(message, &fullMsg); err != nil { if err := json.Unmarshal(message, &fullMsg); err != nil {
@@ -73,18 +67,15 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
switch msgType { switch msgType {
case "reg": case "reg":
// 处理登录请求
if err := h.Server.HandleAuth(client, message); err != nil { if err := h.Server.HandleAuth(client, message); err != nil {
fmt.Printf("客户端授权失败: %v\n", err) fmt.Printf("客户端授权失败: %v\n", err)
conn.Close() conn.Close()
return return
} }
fmt.Printf("客户端已授权: %s\n", client.Imei) fmt.Printf("客户端已授权: %s\n", client.Imei)
// 广播登录消息
broadcastMessage(message) broadcastMessage(message)
case "ping": case "ping":
// 处理心跳
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的心跳 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的心跳 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
@@ -94,11 +85,9 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
fmt.Printf("心跳错误: %v\n", err) fmt.Printf("心跳错误: %v\n", err)
continue continue
} }
// 广播心跳消息
broadcastMessage(message) broadcastMessage(message)
case "ota": case "ota":
// 处理 OTA
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
@@ -108,11 +97,9 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
fmt.Printf("OTA 错误: %v\n", err) fmt.Printf("OTA 错误: %v\n", err)
continue continue
} }
// 广播 OTA 消息
broadcastMessage(message) broadcastMessage(message)
case "start": case "start":
// 处理 客户端实时上报数据
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
@@ -122,11 +109,9 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
fmt.Printf("OTA 错误: %v\n", err) fmt.Printf("OTA 错误: %v\n", err)
continue continue
} }
// 广播 OTA 消息
broadcastMessage(message) broadcastMessage(message)
case "stop": case "stop":
// 处理 客户端停止实时上报数据
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
@@ -136,11 +121,9 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
fmt.Printf("客户端停止实时上报数据 错误: %v\n", err) fmt.Printf("客户端停止实时上报数据 错误: %v\n", err)
continue continue
} }
// 广播 OTA 消息
broadcastMessage(message) broadcastMessage(message)
case "up": case "up":
// 处理 客户端定时上报数据
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
@@ -150,17 +133,14 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
fmt.Printf("客户端定时上报数据 错误: %v\n", err) fmt.Printf("客户端定时上报数据 错误: %v\n", err)
continue continue
} }
// 广播 OTA 消息
broadcastMessage(message) broadcastMessage(message)
default: default:
// 处理其他消息类型
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr()) fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close() conn.Close()
return return
} }
// 广播其他类型的消息
broadcastMessage(message) broadcastMessage(message)
} }
} }
@@ -209,24 +189,20 @@ func (s *Server) HandleAuth(client *Client, message []byte) error {
return fmt.Errorf("设备密码不正确") return fmt.Errorf("设备密码不正确")
} }
// 更新版本号
model.DriverVer = msg.Ver model.DriverVer = msg.Ver
err = repository.GroupRepositorys.Device.UpdateDevice(model) err = repository.GroupRepositorys.Device.UpdateDevice(model)
if err != nil { if err != nil {
return fmt.Errorf("更新设备版本号失败") return fmt.Errorf("更新设备版本号失败")
} }
// 认证成功,停止登录超时定时器
if client.authTimer != nil { if client.authTimer != nil {
client.authTimer.Stop() client.authTimer.Stop()
client.authTimer = nil client.authTimer = nil
} }
// 认证成功
client.Imei = msg.Imei client.Imei = msg.Imei
client.IsAuth = true client.IsAuth = true
// 发送响应
response := Message{ response := Message{
MessageType: MessageType{Type: "reg"}, MessageType: MessageType{Type: "reg"},
MessageTime: MessageTime{Time: time.Now().Unix()}, MessageTime: MessageTime{Time: time.Now().Unix()},
@@ -248,45 +224,7 @@ func (s *Server) HandleOta(client *Client, message []byte) error {
if msg.Type != "ota" { if msg.Type != "ota" {
return fmt.Errorf("unauthorized") return fmt.Errorf("unauthorized")
} }
fmt.Printf("设备升级结果:%s\r\n", msg.State) fmt.Printf("设备升级结果:%s\r\n", msg.State)
//model, err := repository.GroupRepositorys.Device.GetDevice(map[string]interface{}{"imei": msg.Imei})
//if err != nil {
// return fmt.Errorf("设备不存在")
//}
//if msg.Pwd != model.DriverPass {
// return fmt.Errorf("设备密码不正确")
//}
// 更新版本号
//model.DriverVer = msg.Ver
//err = repository.GroupRepositorys.Device.UpdateDevice(model)
//if err != nil {
// return fmt.Errorf("更新设备版本号失败")
//}
// 认证成功,停止登录超时定时器
//if client.authTimer != nil {
// client.authTimer.Stop()
// client.authTimer = nil
//}
// 认证成功
//client.Imei = msg.Imei
//client.IsAuth = true
// 发送响应
//response := Message{
// MessageType: MessageType{Type: "reg"},
// MessageTime: MessageTime{Time: time.Now().Unix()},
//}
//responseData, err := json.Marshal(response)
//if err != nil {
// return err
//}
//
//_, err = client.Conn.Write(append(responseData, '\r', '\n'))
return nil return nil
} }

View File

@@ -3,7 +3,6 @@ package tcpserver
type MessageType struct { type MessageType struct {
Type string `json:"Type"` Type string `json:"Type"`
} }
type MessagePassword struct { type MessagePassword struct {
Pwd string `json:"Pwd,omitempty"` Pwd string `json:"Pwd,omitempty"`
} }
@@ -24,12 +23,6 @@ type MessageData struct {
UpDataStruct UpDataStruct
} }
type UpDataStruct struct {
Sum int `json:"sum"`
Time int `json:"time"`
Mile int `json:"mile"`
}
type Message struct { type Message struct {
MessageType MessageType
MessageImei MessageImei
@@ -39,3 +32,9 @@ type Message struct {
MessageState MessageState
MessageData `json:"Data"` MessageData `json:"Data"`
} }
type UpDataStruct struct {
Sum int `json:"sum"`
Time int `json:"time"`
Mile int `json:"mile"`
}

View File

@@ -12,31 +12,30 @@ import (
"time" "time"
) )
// Server 定义 TCP 服务器结构
type Server struct { type Server struct {
Address string // 监听地址 Address string
Handler func(net.Conn) // 客户端连接处理函数 Handler func(net.Conn)
listener net.Listener // TCP 监听器 listener net.Listener
connections sync.Map // 活跃的客户端连接 connections sync.Map
wg sync.WaitGroup // 等待所有 Goroutine 完成 wg sync.WaitGroup
stopChan chan struct{} // 关闭信号 stopChan chan struct{}
clients map[string]*Client clients map[string]*Client
clientsMux sync.RWMutex clientsMux sync.RWMutex
stopOnce sync.Once // 添加 sync.Once 来确保 Stop 只被执行一次 stopOnce sync.Once
} }
// Client 定义客户端结构 // Client 定义客户端结构
type Client struct { type Client struct {
ID string ID string
Imei string // 添加 IMEI Imei string
Conn net.Conn Conn net.Conn
ConnectedAt time.Time ConnectedAt time.Time
LastPing time.Time LastPing time.Time
Done chan struct{} Done chan struct{}
IsAuth bool // 添加认证状态 IsAuth bool
authTimer *time.Timer // 添加登录超时定时器 authTimer *time.Timer
} }
// NewServer 创建一个新的 TCP 服务器 // NewServer 创建一个新的 TCP 服务器
@@ -49,7 +48,6 @@ func NewServer(address string, handler func(net.Conn)) *Server {
} }
} }
// Start 启动服务器
func (s *Server) Start() error { func (s *Server) Start() error {
var err error var err error
s.listener, err = net.Listen("tcp", s.Address) s.listener, err = net.Listen("tcp", s.Address)
@@ -59,35 +57,31 @@ func (s *Server) Start() error {
fmt.Println("Listening and serving TCP on", s.Address) fmt.Println("Listening and serving TCP on", s.Address)
// 捕获终止信号
go s.handleShutdown() go s.handleShutdown()
for { for {
// 非阻塞检查关闭信号
select { select {
case <-s.stopChan: case <-s.stopChan:
return nil return nil
default: default:
} }
// 接受新连接
conn, err := s.listener.Accept() conn, err := s.listener.Accept()
if err != nil { if err != nil {
select { select {
case <-s.stopChan: case <-s.stopChan:
return nil // 服务器已停止 return nil
default: default:
fmt.Println("Error accepting connection:", err) fmt.Println("Error accepting connection:", err)
continue continue
} }
} }
// 记录活跃连接并添加到 clients 映射
s.connections.Store(conn.RemoteAddr(), conn) s.connections.Store(conn.RemoteAddr(), conn)
client := s.addClient(conn) client := s.addClient(conn)
fmt.Printf("客户端已连接: %s\n", client.ID) fmt.Printf("客户端已连接: %s\n", client.ID)
// 使用 Goroutine 处理客户端
s.wg.Add(1) s.wg.Add(1)
go func(c net.Conn, clientID string) { go func(c net.Conn, clientID string) {
defer s.wg.Done() defer s.wg.Done()
@@ -112,7 +106,6 @@ func (s *Server) handleShutdown() {
s.Stop() s.Stop()
} }
// GetOnlineClients 获取所有在线客户端信息
func (s *Server) GetOnlineClients() []map[string]interface{} { func (s *Server) GetOnlineClients() []map[string]interface{} {
s.clientsMux.RLock() s.clientsMux.RLock()
defer s.clientsMux.RUnlock() defer s.clientsMux.RUnlock()
@@ -146,10 +139,8 @@ func (s *Server) addClient(conn net.Conn) *Client {
} }
s.clients[client.ID] = client s.clients[client.ID] = client
// 启动心跳检测
go client.startHeartbeat(s) go client.startHeartbeat(s)
// 添加登录超时检测
client.authTimer = time.AfterFunc(time.Minute, func() { client.authTimer = time.AfterFunc(time.Minute, func() {
if !client.IsAuth { if !client.IsAuth {
fmt.Printf("Client %s authentication timeout\n", client.ID) fmt.Printf("Client %s authentication timeout\n", client.ID)
@@ -169,7 +160,7 @@ func (s *Server) removeClient(id string) {
client.authTimer.Stop() client.authTimer.Stop()
client.authTimer = nil client.authTimer = nil
} }
close(client.Done) // 停止心跳检测 close(client.Done)
delete(s.clients, id) delete(s.clients, id)
} }
} }
@@ -179,13 +170,11 @@ func (s *Server) Stop() {
s.stopOnce.Do(func() { s.stopOnce.Do(func() {
fmt.Println("Stopping TCP server...") fmt.Println("Stopping TCP server...")
// 关闭监听器
close(s.stopChan) close(s.stopChan)
if s.listener != nil { if s.listener != nil {
s.listener.Close() s.listener.Close()
} }
// 关闭所有连接
s.clientsMux.Lock() s.clientsMux.Lock()
for id, client := range s.clients { for id, client := range s.clients {
client.Conn.Close() client.Conn.Close()
@@ -193,7 +182,6 @@ func (s *Server) Stop() {
} }
s.clientsMux.Unlock() s.clientsMux.Unlock()
// 等待所有 Goroutine 完成
s.wg.Wait() s.wg.Wait()
fmt.Println("TCP server stopped.") fmt.Println("TCP server stopped.")
}) })
@@ -211,7 +199,7 @@ func (c *Client) startHeartbeat(s *Server) {
case <-ticker.C: case <-ticker.C:
if time.Since(c.LastPing) > 120*time.Second { if time.Since(c.LastPing) > 120*time.Second {
fmt.Printf("客户端 %s 心跳超时 \n", c.ID) fmt.Printf("客户端 %s 心跳超时 \n", c.ID)
c.Conn.Close() // 强制关闭连接 c.Conn.Close()
return return
} }
} }