新增处理

This commit is contained in:
2024-12-24 13:44:52 +08:00
parent 78d1dfeeca
commit 6e4b37776b
6 changed files with 108 additions and 17 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
u_boot

13
main.go
View File

@@ -21,7 +21,7 @@ func init() {
viper.InitViper() viper.InitViper()
logger.InitLogger() logger.InitLogger()
gorm.InitGorm() gorm.InitGorm()
//gorm.AutoMigrate(global.Db) gorm.AutoMigrate(global.Db)
} }
func main() { func main() {
@@ -77,10 +77,15 @@ func main() {
// 客户端回复 {"Type":"ota","State":"1"} // 客户端回复 {"Type":"ota","State":"1"}
// 开始实时上传数据 // 开始实时上传数据
// 服务端发 {"Type":"start"} // 服务端发 {\"Type\":\"start\"}
// 客户端回复 {"Type":"start","Data":"{\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"}"}
// 客户端回复 {"Type":"start","Data":"{\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"}"} // 客户端回复 {"Type":"start","Data":"{\"Type\":\"ota\",\"Ip\":\"192.168.31.1:80\",\"File\":\"/xxx/1.bin\"}"}
// 停止实时上传数据 // 停止实时上传数据
// 服务端发 {"Type":"stop"} // 服务端发 {\"Type\":\"stop\"}
// 客户端回复 {"Type":"stop","State":"1"} // 客户端回复 {"Type":"stop","State":"1"}
// 修改IP端口
// 服务端发 {\"Type\":\"server\",\"Ip\":\"1222\",\"Port\":\"1222\"}
// 客户端回复 {"Type":"server","State":"1"}
//{\n\t"Type":"ping"}

View File

@@ -8,6 +8,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
"strings"
"time" "time"
) )
@@ -34,21 +35,28 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
} }
for { for {
message, err := reader.ReadBytes('\n') //message, err := reader.ReadBytes('\n')
//if err != nil {
// fmt.Println("客户端已断开连接:", err)
// break
//}
message, err := readUntilDelimiter(reader, []byte("\r\n"))
if err != nil { if err != nil {
fmt.Println("客户端已断开连接:", err) fmt.Println("Error reading message:", err)
break break
} }
// 去除末尾的换行符 // 去除末尾的换行符
message = bytes.TrimSpace(message) message = bytes.TrimSpace(message)
fmt.Printf("收到消息来自 %s: %s\n", conn.RemoteAddr(), string(message))
if !json.Valid(message) { output := strings.ReplaceAll(string(message), "\t", "")
fmt.Printf("来自客户端的数据非法 %s\n", conn.RemoteAddr()) output = strings.ReplaceAll(output, "\n", "")
conn.Close() message = []byte(output)
return fmt.Printf("收到消息来自 %s: %s\n", conn.RemoteAddr(), message)
} //if !json.Valid(message) {
// fmt.Printf("来自客户端的数据非法 %s\n", conn.RemoteAddr())
// conn.Close()
// return
//}
var fullMsg map[string]interface{} var fullMsg map[string]interface{}
if err := json.Unmarshal(message, &fullMsg); err != nil { if err := json.Unmarshal(message, &fullMsg); err != nil {
@@ -117,6 +125,34 @@ func (h *TCPHandler) HandleClient(conn net.Conn) {
// 广播 OTA 消息 // 广播 OTA 消息
broadcastMessage(message) broadcastMessage(message)
case "stop":
// 处理 客户端停止实时上报数据
if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close()
return
}
if err := h.Server.StopRealTimeReporting(client, message); err != nil {
fmt.Printf("客户端停止实时上报数据 错误: %v\n", err)
continue
}
// 广播 OTA 消息
broadcastMessage(message)
case "up":
// 处理 客户端定时上报数据
if !client.IsAuth {
fmt.Printf("来自未授权客户端的消息 %s\n", conn.RemoteAddr())
conn.Close()
return
}
if err := h.Server.TimingReporting(client, message); err != nil {
fmt.Printf("客户端定时上报数据 错误: %v\n", err)
continue
}
// 广播 OTA 消息
broadcastMessage(message)
default: default:
// 处理其他消息类型 // 处理其他消息类型
if !client.IsAuth { if !client.IsAuth {
@@ -262,6 +298,31 @@ func (s *Server) RealTimeReporting(client *Client, message []byte) error {
if msg.Type != "start" { if msg.Type != "start" {
return fmt.Errorf("unauthorized") return fmt.Errorf("unauthorized")
} }
fmt.Printf("设备实时上报数据:%s\r\n", msg.Data) fmt.Printf("设备实时上报数据:%v\r\n", msg)
return nil
}
func (s *Server) StopRealTimeReporting(client *Client, message []byte) error {
var msg Message
if err := json.Unmarshal(message, &msg); err != nil {
return err
}
if msg.Type != "stop" {
return fmt.Errorf("unauthorized")
}
fmt.Printf("设备停止实时上报数据:%s\r\n", msg.State)
return nil
}
// TimingReporting 定时上报数据
func (s *Server) TimingReporting(client *Client, message []byte) error {
var msg Message
if err := json.Unmarshal(message, &msg); err != nil {
return err
}
if msg.Type != "up" {
return fmt.Errorf("unauthorized")
}
fmt.Printf("设备定时上报数据:%v\r\n", msg.MessageData)
return nil return nil
} }

View File

@@ -21,7 +21,13 @@ type MessageState struct {
} }
type MessageData struct { type MessageData struct {
Data string `json:"Data,omitempty"` UpDataStruct
}
type UpDataStruct struct {
Sum int `json:"sum"`
Time int `json:"time"`
Mile int `json:"mile"`
} }
type Message struct { type Message struct {
@@ -31,5 +37,5 @@ type Message struct {
MessageVer MessageVer
MessageTime MessageTime
MessageState MessageState
MessageData MessageData `json:"Data"`
} }

View File

@@ -1,6 +1,8 @@
package tcpserver package tcpserver
import ( import (
"bufio"
"bytes"
"fmt" "fmt"
"net" "net"
"os" "os"
@@ -223,3 +225,19 @@ func (s *Server) GetClient(id string) (*Client, bool) {
client, ok := s.clients[id] client, ok := s.clients[id]
return client, ok return client, ok
} }
func readUntilDelimiter(reader *bufio.Reader, delimiter []byte) ([]byte, error) {
var buffer bytes.Buffer
for {
chunk, err := reader.ReadBytes('\n')
if err != nil {
return nil, err
}
buffer.Write(chunk)
if bytes.HasSuffix(buffer.Bytes(), delimiter) {
break
}
}
data := buffer.Bytes()
return data[:len(data)-len(delimiter)], nil
}

View File

@@ -23,7 +23,7 @@ type Hub struct {
mu sync.RWMutex mu sync.RWMutex
} }
// Message WebSocket 消息结构 // WsMessage WebSocket 消息结构
type WsMessage struct { type WsMessage struct {
IMEI string `json:"imei"` IMEI string `json:"imei"`
Data string `json:"data"` Data string `json:"data"`