-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbus.go
More file actions
124 lines (109 loc) · 2.95 KB
/
bus.go
File metadata and controls
124 lines (109 loc) · 2.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package golwpush
import (
"context"
"encoding/binary"
"github.com/NullpointerW/golwpush/errs"
"github.com/NullpointerW/golwpush/logger"
"github.com/NullpointerW/golwpush/pkg"
"github.com/NullpointerW/golwpush/protocol"
"github.com/NullpointerW/golwpush/utils"
"net"
"time"
)
type Content struct {
Id uint64
Msg string
}
func (c Content) pkg() *pkg.Package {
return &pkg.Package{
Data: utils.Scb(c.Msg),
Mode: pkg.MSG,
}
}
var (
connAddCh0 chan *Conn = make(chan *Conn)
ConnAddCh chan<- *Conn = connAddCh0
connRmCh0 chan *Conn = make(chan *Conn)
ConnRmCh chan<- *Conn = connRmCh0
broadcast0 chan string = make(chan string, 2048)
Broadcast chan<- string = broadcast0
mergedMsg chan *pkg.Package = make(chan *pkg.Package, 1024)
multiPushCh0 chan *Contents = make(chan *Contents, 1024)
MultiPushCh chan<- *Contents = multiPushCh0
conns map[uint64]*Conn = make(map[uint64]*Conn)
pushCh0 chan Content = make(chan Content, 1024)
PushCh chan<- Content = pushCh0
bizCh0 chan BizReq = make(chan BizReq, 1024)
BizCh chan<- BizReq = bizCh0
)
func Handle() {
go lingerProcess()
for {
select {
case content := <-pushCh0:
if _, exist := conns[content.Id]; exist {
conns[content.Id].write(content.pkg())
}
case conn := <-connAddCh0:
if _, exist := conns[conn.Uid]; exist {
conn.errMsg <- errs.NewDuplicateConnIdErr(conn.Uid)
continue
}
conns[conn.Uid] = conn
now := time.Now().Format(utils.TimeParseLayout)
storeConnNum(uint64(len(conns)))
connInfos[conn.Uid] = ConnInfo{conn.Uid, conn.tcpConn.RemoteAddr().String(), now}
case conn := <-connRmCh0:
delete(conns, conn.Uid)
storeConnNum(uint64(len(conns)))
delete(connInfos, conn.Uid)
case msg := <-mergedMsg:
broadcaster(msg)
case contents := <-multiPushCh0:
multiSend(contents.pkg(), contents.Ids, contents.Res)
case req := <-bizCh0:
switch req.Typ {
case Info:
if info, exist := connInfos[req.Uid]; exist {
req.Res <- info
} else {
req.Res <- nil
}
case Kick:
//TODO
}
}
}
}
func InitConn(tcpConn net.Conn) {
uid, err := AuthCli(tcpConn)
if err != nil {
logger.Error(err)
return
}
newConn(tcpConn, uid)
}
func AuthCli(conn net.Conn) (uid uint64, err error) {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
t := time.NewTimer(time.Minute * 1)
defer t.Stop()
select {
case <-ctx.Done():
case <-t.C:
conn.Close()
logger.PlnNUid(logger.L_Err|logger.Host, conn.RemoteAddr().String(), errs.SendUidTimeOut)
}
}(ctx)
//接收客户端uid
data, err := protocol.UnPackByteStream(conn)
if err != nil {
logger.PfNUid(logger.CliErr, conn.RemoteAddr().String(), "read error:%v", err)
cancel()
return
}
cancel()
uid = binary.BigEndian.Uint64(data)
logger.PfNUid(logger.Cli|logger.Login|logger.Host, conn.RemoteAddr().String(), "recv uid:%d", uid)
return
}