Last active
August 10, 2023 02:15
-
-
Save liutianpeng/85ce524452c8206396c94ab93506deda to your computer and use it in GitHub Desktop.
消息队列高手课 - 13节 - 大爷聊天初版
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
jsoniter "github.com/json-iterator/go" | |
"io" | |
"net" | |
"sync" | |
) | |
// HuTongMsg is one chat message relayed through the hutong entrance.
// From names the sender, To names the recipient, and Message is the text.
type HuTongMsg struct {
	From, To, Message string
}
func (m *HuTongMsg) Pack() []byte { | |
d, err := jsoniter.Marshal(m) | |
if err != nil { | |
fmt.Errorf(err.Error()) | |
} | |
data := make([]byte, len(d)+1, len(d)+1) | |
data[0] = byte(len(d)) | |
copy(data[1:], d) | |
//fmt.Printf("Pack:大爷·%s 对大爷·%s 说:%s\n", m.From, m.To, m.Message) | |
return data | |
} | |
func (m *HuTongMsg) UnPack(data []byte) error { | |
err := jsoniter.Unmarshal(data[1:], m) | |
if err != nil { | |
return fmt.Errorf(err.Error()) | |
} | |
//fmt.Printf("UnPack:大爷·%s 对大爷·%s 说:%s\n", m.From, m.To, m.Message) | |
return nil | |
} | |
/*
Hutong wire protocol:

	1 byte (payload length) | payload

The first message sent after connecting identifies the sender.
*/
var (
	// hotong is the TCP address the relay listens on and the clients dial.
	hotong = "127.0.0.1:9999"

	// wg is shared by the relay (to signal that it is listening) and by the
	// client goroutines (to signal that they have finished).
	wg sync.WaitGroup
)
// recvMessage reads one frame of the hutong protocol from conn: a single
// length byte followed by that many payload bytes.
//
// The returned slice is the whole frame, length byte included, which is the
// layout HuTongMsg.UnPack expects. Any read failure (including EOF when the
// peer has closed the connection) is returned wrapped.
func recvMessage(conn *net.TCPConn) ([]byte, error) {
	var header [1]byte
	if _, err := io.ReadFull(conn, header[:]); err != nil {
		return nil, fmt.Errorf("胡同故障:%w", err)
	}

	// Widen to int before adding 1: in byte arithmetic 255+1 wraps to 0,
	// which would allocate an empty slice and panic on frame[0].
	size := int(header[0])
	frame := make([]byte, size+1)
	frame[0] = header[0]

	// io.ReadFull returns a non-nil error whenever fewer than size bytes
	// were read, so no separate byte-count check is needed.
	if _, err := io.ReadFull(conn, frame[1:]); err != nil {
		return nil, fmt.Errorf("胡同故障:%w", err)
	}
	return frame, nil
}
//胡同口 | |
func HuTongKou() { | |
wg.Add(1) | |
daYeConn := make(map[string]*net.TCPConn) | |
daYeMailBox := make(map[string][][]byte) | |
daYesLock := &sync.Mutex{} | |
//大爷来了 | |
addDaYe := func(name string, conn *net.TCPConn) { | |
daYesLock.Lock() | |
defer daYesLock.Unlock() | |
daYeConn[name] = conn | |
if box, ok := daYeMailBox[name]; ok { | |
//fmt.Printf("大爷·%s 的邮箱有数据\n", name) | |
for _, data := range box { | |
n, e := conn.Write(data) | |
if n != len(data) || e != nil { | |
fmt.Println("大爷不收数据") | |
} | |
} | |
delete(daYeMailBox, name) | |
} | |
} | |
//阿爷离线 | |
delDaYe := func(name string) { | |
daYesLock.Lock() | |
defer daYesLock.Unlock() | |
delete(daYeConn, name) | |
} | |
//寻找大爷 | |
findDaYe := func(name string) *net.TCPConn { | |
daYesLock.Lock() | |
defer daYesLock.Unlock() | |
if dy, ok := daYeConn[name]; ok { | |
return dy | |
} | |
return nil | |
} | |
//写数据到大爷的邮箱(缓存) | |
sendDaYeMailBox := func(name string, data []byte) { | |
daYesLock.Lock() | |
defer daYesLock.Unlock() | |
//fmt.Printf("写入大爷·%s 的邮箱\n", name) | |
if box, ok := daYeMailBox[name]; ok == false { | |
box = make([][]byte, 0, 10) | |
daYeMailBox[name] = box | |
} | |
daYeMailBox[name] = append(daYeMailBox[name], data) | |
} | |
//胡同口转发函数 | |
hutongFunc := func(conn *net.TCPConn) { | |
daye := "" //大爷的名字 | |
for { | |
data, err := recvMessage(conn) | |
if err != nil { | |
//大爷离开胡同口 | |
delDaYe(daye) | |
return | |
} | |
msg := &HuTongMsg{} | |
msg.UnPack(data) | |
if msg.Message == "咳咳咳" { | |
//大爷来到胡同口 | |
//fmt.Println("大爷·" + msg.From + " 来了") | |
if len(daye) == 0 { | |
addDaYe(msg.From, conn) | |
daye = msg.From | |
} | |
} else { | |
msg.From = daye | |
data = msg.Pack() | |
if c := findDaYe(msg.To); c != nil { | |
c.Write(data) | |
} else { | |
sendDaYeMailBox(msg.To, data) | |
} | |
} | |
} | |
} | |
tcpAddr, _ := net.ResolveTCPAddr("tcp", hotong) | |
tcpListener, _ := net.ListenTCP("tcp", tcpAddr) | |
defer tcpListener.Close() | |
fmt.Println("胡同口开放 ...") | |
wg.Done() | |
for { | |
tcpConn, err := tcpListener.AcceptTCP() | |
if err != nil { | |
fmt.Println(err) | |
continue | |
} | |
fmt.Println("A client connected :" + tcpConn.RemoteAddr().String()) | |
go hutongFunc(tcpConn) | |
} | |
} | |
//大爷发言 | |
func daYeSpeak(fromName string, toName string, message string, conn *net.TCPConn) { | |
msg := &HuTongMsg{} | |
msg.To = toName | |
msg.From = fromName | |
msg.Message = message | |
//fmt.Printf("Speak:大爷·%s 对大爷·%s 说:%s\n", msg.From, msg.To, msg.Message) | |
conn.Write(msg.Pack()) | |
} | |
//大爷竖起耳朵 | |
func daYeListen(name string, conn *net.TCPConn) *HuTongMsg { | |
//fmt.Printf("大爷·%s 竖起了耳朵\n", name) | |
data, err := recvMessage(conn) | |
if err != nil { | |
fmt.Println("有个大爷没听清...") | |
return nil | |
} | |
msg := &HuTongMsg{} | |
msg.UnPack(data) | |
return msg | |
} | |
//大爷·张 | |
func DaYe_Zhang(conn *net.TCPConn) { | |
var msg *HuTongMsg | |
daYeSpeak("张", "胡同", "咳咳咳", conn) | |
daYeSpeak("张", "李", "吃了没,您呐?", conn) | |
msg = daYeListen("张", conn) | |
if msg == nil || msg.Message != "刚吃。" { | |
fmt.Println("大爷消息错乱了") | |
} | |
daYeSpeak("张", "李", "你这,嘛去?", conn) | |
msg = daYeListen("张", conn) | |
if msg == nil || msg.Message != "嗨,没事溜溜弯。" { | |
fmt.Println("大爷消息错乱了") | |
} | |
daYeSpeak("张", "李", "有空家里坐坐啊。", conn) | |
msg = daYeListen("张", conn) | |
if msg == nil || msg.Message != "回头去给老太太请安!" { | |
fmt.Println("大爷消息错乱了") | |
} | |
} | |
//大爷·李 | |
func DaYe_Li(conn *net.TCPConn) { | |
var msg *HuTongMsg | |
daYeSpeak("李", "胡同", "咳咳咳", conn) | |
msg = daYeListen("李", conn) | |
if msg == nil || msg.Message != "吃了没,您呐?" { | |
fmt.Println("大爷消息错乱了") | |
} | |
daYeSpeak("李", "张", "刚吃。", conn) | |
msg = daYeListen("李", conn) | |
if msg == nil || msg.Message != "你这,嘛去?" { | |
fmt.Println("大爷消息错乱了") | |
} | |
daYeSpeak("李", "张", "嗨,没事溜溜弯。", conn) | |
msg = daYeListen("李", conn) | |
if msg == nil || msg.Message != "有空家里坐坐啊。" { | |
fmt.Println("大爷消息错乱了") | |
} | |
daYeSpeak("李", "张", "回头去给老太太请安!", conn) | |
} | |
func HuTong() { | |
go HuTongKou() | |
wg.Wait() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
var tcpAddr *net.TCPAddr | |
tcpAddr, _ = net.ResolveTCPAddr("tcp", hotong) | |
conn, _ := net.DialTCP("tcp", nil, tcpAddr) | |
for i := 0; i < 10000; i++ { | |
DaYe_Zhang(conn) | |
} | |
conn.Close() | |
}() | |
//time.Sleep(time.Second) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
var tcpAddr *net.TCPAddr | |
tcpAddr, _ = net.ResolveTCPAddr("tcp", hotong) | |
conn, _ := net.DialTCP("tcp", nil, tcpAddr) | |
for i := 0; i < 10000; i++ { | |
DaYe_Li(conn) | |
} | |
conn.Close() | |
}() | |
wg.Wait() | |
fmt.Println("大爷们都回家去了") | |
} | |
//运行 HuTong() | |
//BenchmarkHuTong-12 1 3892092200 ns/op | |
//3892ms |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
看不懂 是我太弱鸡了...