OpenIM JSSDK (一)
WobSockets 学习
导入包 你导入了各种包,其中有一些关于 OpenIM SDK 的包、websocket、日志和HTTP相关的包。
主函数 在
main()
函数中,你的代码首先启动了一个 HTTP 服务,主要用于支持pprof
(一个 Go 语言内建的性能分析工具)。命令行参数解析 使用
flag
包,你解析了以下命令行参数:openIM_api_address
:OpenIM 的 API 服务器地址。msgParseopenIM_ws_address
:OpenIM 的 websocket 服务器地址。sdk_ws_port
:SDK websocket 的监听端口。openIM_log_level
:OpenIM 的日志输出级别。openIMDbDir
:OpenIM 的数据库目录。
这些参数允许你在启动应用程序时进行自定义配置。
日志初始化 使用
log.NewPrivateLog
初始化了日志系统,其中参数constant.LogFileName
是日志文件名,第二个参数是日志的输出级别。判断操作系统类型 使用
runtime.GOOS
判断操作系统类型。根据系统类型,代码选择性地调用ws_local_server.InitServer
。但在此代码中,不论系统类型如何,都执行了相同的逻辑。- 如果是 Windows:
函数定义
InitServer
函数接受一个指向sdk_struct.IMConfig
的指针参数,名为config
。这个结构体可能包含了 OpenIM SDK 的初始化配置信息。序列化配置
使用
json.Marshal
将config
结构体序列化成 JSON 格式。这样可以方便地将配置信息转化为字符串,用于日志输出或其他目的。data, _ := json.Marshal(config) ConfigSvr = string(data)
注意:在这里,错误被忽略了(用
_
表示)。在实际的生产环境中,错误处理是很重要的,应该检查json.Marshal
是否返回了一个错误,并相应地处理。初始化用户路由映射
UserRouteMap = make(map[string]RefRouter, 0)
初始化了一个名为
UserRouteMap
的映射(map)。这个映射可能是用来记录用户的路由信息。其中,键(key)是字符串类型(可能是用户的 ID 或用户名),值(value)是RefRouter
类型(该类型在这段代码中没有给出定义,但它可能是一个引用到路由或会话的结构或接口)。初始化 OpenIM SDK
open_im_sdk.InitOnce(config)
调用
open_im_sdk.InitOnce
函数来初始化 OpenIM SDK,并传入前面提到的config
作为参数。日志输出
log.Info("", "InitServer ", ConfigSvr)
使用日志库输出一条信息,表示服务器已初始化,并附带了序列化后的配置信息
ConfigSvr
。
Websocket 服务器初始化和启动 调用了
ws_local_server.WS.OnInit
和ws_local_server.WS.Run
来初始化并运行websocket服务器。
关于你的问题:
1. 登录和退出登录的问题 代码中并没有涉及具体的登录和退出登录的逻辑。但一般来说,使用WebSockets进行登录的流程大致是:
- 客户端向服务器发送一个包含登录信息(如用户名和密码或token)的websocket消息。
- 服务器验证这些信息,并回复一个确认消息,告诉客户端是否成功登录。
转换为网络调用并使用 WebSockets
,你需要确保客户端和服务器之间的消息格式和逻辑都明确并且一致。对于登录和退出登录,你可能需要定义不同的消息类型,并在服务器端进行相应的处理。
Run 代码:
func (ws *WServer) Run() {
go ws.getMsgAndSend()
go func() {
http.ListenAndServe("0.0.0.0:45000", nil)
}()
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
err := http.ListenAndServe(ws.wsAddr, nil) //Start listening
if err != nil {
log.Info("", "Ws listening err", "", "err", err.Error())
}
}
开启协程处理消息发送:
go ws.getMsgAndSend()
这里开启了一个新的协程(goroutine)来运行
ws.getMsgAndSend()
方法。在 Go 语言中,使用go
关键字可以异步运行一个函数,使其在单独的协程中执行。这意味着getMsgAndSend
方法是并行执行的,可能用于持续地从某个通道或队列中获取消息并发送它们。开启另一个 HTTP 服务器监听特定端口:
go func() { http.ListenAndServe("0.0.0.0:45000", nil) }()
这段代码在一个新的协程中启动了一个 HTTP 服务器,监听
0.0.0.0:45000
地址。这可能是为了特定的目的,例如监控或其他服务,但在此代码片段中没有详细说明。为 websocket 连接设置请求处理器:
http.HandleFunc("/", ws.wsHandler)
使用
http.HandleFunc
方法为 root path(即“/”)设置了一个处理函数ws.wsHandler
。当 HTTP 服务器收到一个针对这个路径的请求时,它将由ws.wsHandler
方法处理。很可能,这个方法用于处理来自客户端的 websocket 连接请求。启动 websocket 服务器:
err := http.ListenAndServe(ws.wsAddr, nil)
这行代码启动了一个 HTTP 服务器,监听在之前定义的
ws.wsAddr
地址上(例如:10003
)。由于它不在协程中,所以这是一个阻塞性调用,意味着它会阻塞直到服务器停止或发生错误。错误处理:
if err != nil { log.Info("", "Ws listening err", "", "err", err.Error()) }
如果
http.ListenAndServe
返回一个错误,这段代码会记录该错误。在这里,它使用了log.Info
方法来输出错误信息。
继续:
func (ws *WServer) getMsgAndSend() {
defer func() {
if r := recover(); r != nil {
log.Info("", "getMsgAndSend panic", " panic is ", r, debug.Stack())
ws.getMsgAndSend()
log.Info("", "goroutine getMsgAndSend restart")
}
}()
for {
select {
case r := <-ws.ch:
go func() {
operationID := utils2.OperationIDGenerator()
log.Info(operationID, "getMsgAndSend channel: ", string(r.data), r.uid)
// conns := ws.getUserConn(r.uid + " " + "Web")
conns := ws.getUserConn(r.uid + " " + utils.PlatformIDToName(sdk_struct.SvrConf.Platform))
if conns == nil {
log.Error(operationID, "uid no conn, failed ", r.uid+" "+utils.PlatformIDToName(sdk_struct.SvrConf.Platform))
r.data = nil
}
log.Info(operationID, "conns ", conns, r.uid+" "+utils.PlatformIDToName(sdk_struct.SvrConf.Platform))
for _, conn := range conns {
if conn != nil {
err := WS.writeMsg(conn, websocket.TextMessage, r.data)
if err != nil {
log.Error(operationID, "WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", r.uid, "error", err.Error())
} else {
log.Info(operationID, "writeMsg ", conn.RemoteAddr(), string(r.data), r.uid)
}
} else {
log.Error(operationID, "Conn is nil, failed")
}
}
r.data = nil
}()
}
}
}
异常恢复:
defer func() { if r := recover(); r != nil { // logging and restarting the goroutine } }()
这是一个
defer
语句中的匿名函数,它会在getMsgAndSend
方法结束时执行。这里的目的是捕获和处理任何在该方法中产生的panic,防止程序崩溃。如果发生panic,该函数将记录错误并尝试重新启动getMsgAndSend
方法。无限循环获取消息:
for {
使用一个无限循环确保方法持续地从通道获取消息。
从通道获取消息:
select { case r := <-ws.ch:
使用
select
语句从ws.ch
通道中读取消息。在这个例子中,select
只有一个case
,所以它将阻塞直到从ws.ch
通道中接收到一个消息。处理和发送消息:
- 为每个消息生成一个唯一的
operationID
。 - 从
ws.getUserConn
方法获取与消息相关的所有websocket连接。果r.uid
是"12345",sdk_struct.SvrConf.Platform
是iOS的平台ID,那么最终传递给ws.getUserConn
的字符串可能是"12345 iOS"。 - 对于每个websocket连接,使用
WS.writeMsg
方法发送消息。 - 如果发送消息时出现错误,记录错误信息。
- 如果连接为nil,记录错误信息。
- 为每个消息生成一个唯一的
注意几个点:
- 使用了一个内部的goroutine(
go func() {...}
)来处理和发送每个消息。这意味着消息的发送是并行执行的,不会阻塞其他消息的处理。 - 这个函数关心的主要是将消息分发到正确的websocket连接,而不是消息的内容或格式。
r.data
是消息的内容,它是一个字节切片。在发送完消息后,将其设为nil,可能是为了帮助垃圾收集器回收内存。
继续:
func PlatformIDToName(num int32) string {
return PlatformID2Name[num]
}
var PlatformID2Name = map[int32]string{
IOSPlatformID: IOSPlatformStr,
AndroidPlatformID: AndroidPlatformStr,
WindowsPlatformID: WindowsPlatformStr,
OSXPlatformID: OSXPlatformStr,
WebPlatformID: WebPlatformStr,
MiniWebPlatformID: MiniWebPlatformStr,
LinuxPlatformID: LinuxPlatformStr,
}
PlatformIDToName 函数:
func PlatformIDToName(num int32) string { return PlatformID2Name[num] }
这是一个简单的辅助函数,其接受一个
int32
类型的平台ID作为输入参数,并返回与之对应的平台名称。这个函数通过查找PlatformID2Name
映射(map)来获取相应的平台名称。PlatformID2Name 映射:
var PlatformID2Name = map[int32]string{ IOSPlatformID: IOSPlatformStr, AndroidPlatformID: AndroidPlatformStr, ... }
这是一个全局映射变量,其中的键是
int32
类型的平台ID,值是相应的平台名称字符串。在代码片段中,具体的ID和名称的值没有给出,但从变量命名可以推测它们分别代表不同的平台(如iOS、Android、Windows等)。
wsHandler
继续从 wsHandler 开始:
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
operationID := utils2.OperationIDGenerator()
defer func() {
if r := recover(); r != nil {
log.Info(operationID, "wsHandler panic recover", " panic is ", r)
buf := make([]byte, 1<<20)
runtime.Stack(buf, true)
log.Info(operationID, "panic", "call", string(buf))
}
}()
//var mem runtime.MemStats
//runtime.ReadMemStats(&mem)
//if mem.Alloc > 2*1024*1024*1024 {
// panic("Memory leak " + int64ToString(int64(mem.Alloc)))
//}
//log.Info(operationID, "wsHandler ", r.URL.Query(), "js sdk svr mem: ", mem.Alloc, mem.TotalAlloc, "all: ", mem)
if ws.headerCheck(w, r, operationID) {
query := r.URL.Query()
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
if err != nil {
log.Info(operationID, "upgrade http conn err", "", "err", err)
return
} else {
sendIDAndPlatformID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
newConn := &UserConn{conn, new(sync.Mutex)}
ws.addUserConn(sendIDAndPlatformID, newConn, operationID)
go ws.readMsg(newConn, sendIDAndPlatformID)
}
} else {
log.NewError(operationID, "headerCheck failed")
}
}
函数定义:
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request)
这是一个属于
WServer
结构体的方法。该方法有两个参数:HTTP响应写入器w
和HTTP请求r
。处理panic: 这里的
defer
函数与recover
配合,用于捕获并记录函数内部的任何panic,确保服务的正常运行。头部检查(headerCheck):
if ws.headerCheck(w, r, operationID) {
之前,
headerCheck
函数可能用于验证来自客户端的请求头部是否满足某些要求或包含有效的令牌。只有当此检查通过时,代码才会继续执行后续逻辑。升级HTTP连接为WebSocket:
conn, err := ws.wsUpGrader.Upgrade(w, r, nil)
使用
wsUpGrader
,该代码尝试将HTTP请求升级为WebSocket连接。处理WebSocket连接:
- 如果升级失败,则记录错误并返回。
- 如果成功,它将获取
sendID
和platformID
来识别用户,并组合它们以创建一个唯一标识符sendIDAndPlatformID
。 - 创建一个新的
UserConn
结构体实例,表示用户的WebSocket连接。 - 使用
addUserConn
方法将新的WebSocket连接添加到内部数据结构中。 - 使用
readMsg
方法在新的goroutine中异步读取来自该WebSocket连接的消息。
如果头部检查失败: 如果
headerCheck
失败,它将记录一条错误消息。
继续进行 headerCheck:
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool {
status := http.StatusUnauthorized
query := r.URL.Query()
log.Info(operationID, "headerCheck: ", query["token"], query["platformID"], query["sendID"], r.RemoteAddr)
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
if ws.getConnNum(SendID) >= POINTNUM {
log.Info(operationID, "Over quantity failed", query, ws.getConnNum(SendID), SendID)
w.Header().Set("Sec-Websocket-Version", "13")
http.Error(w, "Over quantity", status)
return false
}
//if utils.StringToInt(query["platformID"][0]) != utils.WebPlatformID {
// log.Info("check platform id failed", query["sendID"][0], query["platformID"][0])
// w.Header().Set("Sec-Websocket-Version", "13")
// http.Error(w, http.StatusText(status), StatusBadRequest)
// return false
//}
checkFlag := open_im_sdk.CheckToken(query["sendID"][0], query["token"][0], operationID)
if checkFlag != nil {
log.Info(operationID, "check token failed", query["sendID"][0], query["token"][0], checkFlag.Error())
w.Header().Set("Sec-Websocket-Version", "13")
http.Error(w, http.StatusText(status), status)
return false
}
log.Info(operationID, "Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0], "platformID", query["platformID"][0])
return true
} else {
log.Info(operationID, "Args err", "", "query", query)
w.Header().Set("Sec-Websocket-Version", "13")
http.Error(w, http.StatusText(status), StatusBadRequest)
return false
}
}
函数定义:
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool
这是一个属于
WServer
结构体的方法。该方法有三个参数:HTTP响应写入器w
、HTTP请求r
,以及operationID
,一个用于日志记录的标识符。日志记录:
log.Info(operationID, "headerCheck: ", query["token"], query["platformID"], query["sendID"], r.RemoteAddr)
为了方便跟踪和调试,该方法首先记录了传入请求的一些参数。
验证查询参数: 该方法检查请求URL的查询参数中是否存在
token
,sendID
和platformID
。这些参数是WebSocket连接的必需参数。检查连接数量:
if ws.getConnNum(SendID) >= POINTNUM
此处限制了与特定用户关联的WebSocket连接数。如果超过了限制(
POINTNUM
),则返回错误。验证平台ID: 虽然代码中注释掉了此部分,但原来的意图是检查
platformID
是否等于特定的平台ID,例如Web。Token验证:
checkFlag := open_im_sdk.CheckToken(query["sendID"][0], query["token"][0], operationID)
使用提供的SDK函数
CheckToken
来验证用户的token
。处理失败的请求: 如果验证失败,该方法将在响应头部设置
Sec-Websocket-Version
并发送一个错误消息。返回值: 方法最后返回一个布尔值,表示请求是否已成功验证。
总结:headerCheck
方法的主要目的是验证WebSocket连接请求。它确保请求具有所有必要的查询参数,并且这些参数都是有效的。如果所有检查都通过,则该请求被认为是有效的,方法将返回true
;否则返回false
。
继续看看检查 token:
func CheckToken(userID, token string, operationID string) (error, uint32) {
if operationID == "" {
operationID = utils.OperationIDGenerator()
}
log.Debug(operationID, utils.GetSelfFuncName(), userID, token)
p := ws.NewPostApi(token, sdk_struct.SvrConf.ApiAddr)
user := user.NewUser(nil, p, userID, nil)
//_, err := user.GetSelfUserInfoFromSvr(operationID)
//if err != nil {
// return utils.Wrap(err, "GetSelfUserInfoFromSvr failed "+operationID), 0
//}
exp, err := user.ParseTokenFromSvr(operationID)
return err, exp
}
函数定义:
func CheckToken(userID, token string, operationID string) (error, uint32)
此函数接受三个参数:用户ID(
userID
)、token(token
)和用于日志记录的操作ID(operationID
)。它返回一个错误对象和一个无符号的32位整数。生成操作ID:
if operationID == "" { operationID = utils.OperationIDGenerator() }
如果传入的操作ID是空的,则会生成一个新的操作ID。
日志记录:
log.Debug(operationID, utils.GetSelfFuncName(), userID, token)
记录调试信息,包括函数名和传入的参数。
创建Post API和用户对象:
p := ws.NewPostApi(token, sdk_struct.SvrConf.ApiAddr) user := user.NewUser(nil, p, userID, nil)
为了进一步与服务器通信,这里首先创建了一个新的Post API对象。然后,使用该API对象创建了一个新的用户对象。
API请求的一种常见方式是使用HTTP POST请求。这种请求可以发送数据(例如用户名和密码)到服务器,并期望返回某些数据(例如用户的详细信息)。
在给定的代码中,
ws.NewPostApi(token, sdk_struct.SvrConf.ApiAddr)
很可能是创建一个新的API客户端的方法,用于向地址sdk_struct.SvrConf.ApiAddr
发送POST请求,并使用token
进行身份验证。Token验证: 虽然已经注释掉了这部分代码,但原意是通过
GetSelfUserInfoFromSvr
方法从服务器获取用户信息来验证token。此验证已被新的ParseTokenFromSvr
方法替代。解析Token:
exp, err := user.ParseTokenFromSvr(operationID)
使用
ParseTokenFromSvr
方法解析token。这可能是为了检查token是否还有效或是否与给定的用户ID匹配。返回值: 函数最后返回由
ParseTokenFromSvr
方法产生的错误和无符号的32位整数值。
继续看 Parse Token 部分:
func (u *User) ParseTokenFromSvr(operationID string) (uint32, error) {
apiReq := api.ParseTokenReq{}
apiReq.OperationID = operationID
apiResp := api.ParseTokenResp{}
err := u.p.PostReturn(constant.ParseTokenRouter, apiReq, &apiResp.ExpireTime)
if err != nil {
return 0, utils.Wrap(err, apiReq.OperationID)
}
log.Info(operationID, "apiResp.ExpireTime.ExpireTimeSeconds ", apiResp.ExpireTime)
return apiResp.ExpireTime.ExpireTimeSeconds, nil
}
此代码片段定义了User
结构的方法ParseTokenFromSvr
,它旨在与服务器通信以解析(或验证)令牌。下面是该函数的详细步骤及解释:
函数签名:
func (u *User) ParseTokenFromSvr(operationID string) (uint32, error)
这是一个
User
结构的方法。它接受一个名为operationID
的字符串参数,并返回一个uint32
类型的值和一个error
。创建请求对象:
apiReq := api.ParseTokenReq{}
该代码初始化一个名为
apiReq
的ParseTokenReq
类型的对象。这个对象很可能是用于API调用的请求体。设置操作ID:
apiReq.OperationID = operationID
将传递给函数的
operationID
分配给apiReq
的OperationID
字段。创建响应对象:
apiResp := api.ParseTokenResp{}
此代码初始化一个预期的API响应对象。此对象将被填充,当服务器响应API请求时。
API请求:
err := u.p.PostReturn(constant.ParseTokenRouter, apiReq, &apiResp.ExpireTime)
使用
PostReturn
方法,该方法发送一个POST请求到服务器。它需要三个参数:constant.ParseTokenRouter
:API的路由或端点。apiReq
:请求的数据。&apiResp.ExpireTime
:响应的数据应被填充在这里。
错误检查:
if err != nil { return 0, utils.Wrap(err, apiReq.OperationID) }
如果在API请求中出现错误,将错误包装(使用
utils.Wrap
函数)并返回。记录响应:
log.Info(operationID, "apiResp.ExpireTime.ExpireTimeSeconds ", apiResp.ExpireTime)
将API响应记录到日志中。
返回结果:
return apiResp.ExpireTime.ExpireTimeSeconds, nil
返回从服务器获取的到期时间。如果没有错误,第二个返回值将为
nil
。
这个函数的目的是与服务器通信,验证(或解析)令牌,并从服务器获取令牌的到期时间。
OperationID;
operationID
是一个通常在日志、监控、调试和跟踪中使用的标识符。它的目的是在分布式系统中提供一个独特的、可识别的标记,使得特定的操作或请求可以跨多个组件、服务或日志条目进行追踪。
在给定的代码中,operationID
被用作一个标识符,其值随请求被传递,并可能被用于日志记录或错误处理。这种方式使得开发人员和运维人员能够轻松地跟踪或查找与特定请求或操作相关的所有活动或问题。
使用 operationID
的优势包括:
- 跟踪能力:在分布式系统中,一个请求可能会穿越多个服务或组件。使用唯一的
operationID
可以帮助你跟踪整个请求的生命周期。 - 简化问题诊断:当问题发生时,可以使用
operationID
快速定位所有与特定请求或操作相关的日志条目,从而更容易地确定问题的根源。 - 提高日志的有用性:
operationID
可以帮助将相关的日志条目分组,使其更容易分析。
在给定的代码中,似乎 operationID
被用于日志记录,从而能够跟踪请求、操作或特定的
继续看 headcheck 检查成功部分:
sendIDAndPlatformID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
newConn := &UserConn{conn, new(sync.Mutex)}
ws.addUserConn(sendIDAndPlatformID, newConn, operationID)
go ws.readMsg(newConn, sendIDAndPlatformID)
添加用户链接:
func (ws *WServer) addUserConn(uid string, conn *UserConn, operationID string) {
rwLock.Lock()
var flag int32
if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
flag = 1
oldConnMap[conn.RemoteAddr().String()] = conn
ws.wsUserToConn[uid] = oldConnMap
log.Info(operationID, "this user is not first login", "", "uid", uid)
//err := oldConn.Close()
//delete(ws.wsConnToUser, oldConn)
//if err != nil {
// log.Info("", "close err", "", "uid", uid, "conn", conn)
//}
} else {
i := make(map[string]*UserConn)
i[conn.RemoteAddr().String()] = conn
ws.wsUserToConn[uid] = i
log.Info(operationID, "this user is first login", "", "uid", uid)
}
if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
oldStringMap[conn.RemoteAddr().String()] = uid
ws.wsConnToUser[conn] = oldStringMap
log.Info(operationID, "find failed", "", "uid", uid)
//err := oldConn.Close()
//delete(ws.wsConnToUser, oldConn)
//if err != nil {
// log.Info("", "close err", "", "uid", uid, "conn", conn)
//}
} else {
i := make(map[string]string)
i[conn.RemoteAddr().String()] = uid
ws.wsConnToUser[conn] = i
log.Info(operationID, "this user is first login", "", "uid", uid)
}
log.Info(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid, "online_num", len(ws.wsUserToConn))
rwLock.Unlock()
//log.Info("", "after add, wsConnToUser map ", ws.wsConnToUser)
// log.Info("", "after add, wsUserToConn map ", ws.wsUserToConn)
if flag == 1 {
// DelUserRouter(uid)
}
}
这是一个函数,主要用于处理 WebSocket 服务器中用户连接的添加逻辑。我们来逐行解析它。
首先,声明函数:
func (ws *WServer) addUserConn(uid string, conn *UserConn, operationID string) {
这是一个方法,其接收者类型为 *WServer
,方法名为 addUserConn
。该方法接受三个参数:uid
(用户ID),conn
(用户连接信息)和operationID
(操作ID,可能用于日志记录)。
接下来:
rwLock.Lock()
它用一个读写锁(rwLock
)来加锁,保证在对数据结构进行修改时不会发生并发冲突。
var flag int32
定义了一个 flag
变量,初始化为0。此变量稍后用于标记某些情况。
接下来检查用户是否已经存在:
if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
如果用户 uid
已经在 wsUserToConn
映射中,则 ok
为 true
。
flag = 1
将 flag
设为1,表示用户不是首次登录。
接着,函数更新该用户的连接映射,并记录日志:
oldConnMap[conn.RemoteAddr().String()] = conn
ws.wsUserToConn[uid] = oldConnMap
log.Info(operationID, "this user is not first login", "", "uid", uid)
注释部分似乎是之前的一些逻辑,现在已被注释掉:
//err := oldConn.Close()
//delete(ws.wsConnToUser, oldConn)
//if err != nil {
// log.Info("", "close err", "", "uid", uid, "conn", conn)
//}
如果用户 uid
不在 wsUserToConn
映射中,那么将创建新的映射,并记录日志:
} else {
i := make(map[string]*UserConn)
i[conn.RemoteAddr().String()] = conn
ws.wsUserToConn[uid] = i
log.Info(operationID, "this user is first login", "", "uid", uid)
}
接着,检查当前连接是否已存在:
if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
如果存在,更新映射并记录日志:
oldStringMap[conn.RemoteAddr().String()] = uid
ws.wsConnToUser[conn] = oldStringMap
log.Info(operationID, "find failed", "", "uid", uid)
同样,注释部分是一些之前的逻辑。
如果连接不存在,创建新的映射并记录日志:
} else {
i := make(map[string]string)
i[conn.RemoteAddr().String()] = uid
ws.wsConnToUser[conn] = i
log.Info(operationID, "this user is first login", "", "uid", uid)
}
之后,记录一条总体的日志信息:
log.Info(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "uid", uid, "online_num", len(ws.wsUserToConn))
然后解锁:
rwLock.Unlock()
最后,根据之前的 flag
判断,如果用户不是首次登录,可能需要执行一些额外的操作(这里也被注释掉了):
if flag == 1 {
// DelUserRouter(uid)
}
整体上,这个函数的目的是在 WebSocket 服务器中添加或更新用户连接。
从 WebSockets 不断读取链接:
func (ws *WServer) readMsg(conn *UserConn, sendIDAndPlatformID string) {
for {
msgType, msg, err := conn.ReadMessage()
if err != nil {
log.Info("", "ReadMessage error", "", "userIP", conn.RemoteAddr().String(), "userUid", sendIDAndPlatformID, "error", err.Error())
//log.Info("debug memory delUserConn begin ")
//time.Sleep(1 * time.Second)
ws.delUserConn(conn)
//log.Info("debug memory delUserConn end ")
//time.Sleep(1 * time.Second)
return
} else {
log.Info("", "ReadMessage ok ", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", sendIDAndPlatformID)
}
m := Req{}
json.Unmarshal(msg, &m)
//log.Info("debug memory msgParse begin ", m)
//time.Sleep(1 * time.Second)
ws.msgParse(conn, msg)
//log.Info("debug memory msgParse end ", m)
//time.Sleep(1 * time.Second)
}
}
这段代码定义了一个方法 readMsg
,其接收者类型为 *WServer
。这个方法的主要目的是从一个 UserConn
(似乎是一个 WebSocket 连接)中不断地读取消息。
让我们继续逐行分析:
func (ws *WServer) readMsg(conn *UserConn, sendIDAndPlatformID string) {
该方法接受两个参数:conn
(用户连接信息)和 sendIDAndPlatformID
(一个字符串,可能包含发送者ID和平台ID,但从这个名称来看,具体含义不太明确)。
开始无限循环,意味着它会持续地读取消息,直到发生错误或其他退出条件:
for {
尝试从 conn
读取消息:
msgType, msg, err := conn.ReadMessage()
检查是否出现读取错误:
if err != nil {
如果出现错误,记录错误日志:
log.Info("", "ReadMessage error", "", "userIP", conn.RemoteAddr().String(), "userUid", sendIDAndPlatformID, "error", err.Error())
接着,删除该用户连接:
ws.delUserConn(conn)
此处还有一些被注释掉的代码,看起来像是之前用于调试内存问题的:
//log.Info("debug memory delUserConn begin ")
//time.Sleep(1 * time.Second)
//...
//log.Info("debug memory delUserConn end ")
//time.Sleep(1 * time.Second)
因为读取消息出错,所以直接返回,结束函数:
return
如果没有错误,则记录日志:
} else {
log.Info("", "ReadMessage ok ", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", sendIDAndPlatformID)
}
将读取到的消息解析为 Req
结构:
m := Req{}
json.Unmarshal(msg, &m)
此处假设消息是JSON格式的,并尝试将其反序列化到 Req
类型的变量中。
然后,解析该消息:
ws.msgParse(conn, msg)
同样,有一些被注释掉的代码,可能用于调试:
//log.Info("debug memory msgParse begin ", m)
//time.Sleep(1 * time.Second)
//...
//log.Info("debug memory msgParse end ", m)
//time.Sleep(1 * time.Second)
整个循环结束后,它将回到开始,继续读取下一个消息。
func (ws *WServer) msgParse(conn *UserConn, jsonMsg []byte) {
m := Req{}
if err := json.Unmarshal(jsonMsg, &m); err != nil {
SendOneConnMessage(EventData{"error", 100, "Unmarshal failed", "", ""}, conn)
return
}
defer func() {
if r := recover(); r != nil {
SendOneConnMessage(EventData{m.ReqFuncName, StatusBadParameter, StatusText(StatusBadParameter), "", m.OperationID}, conn)
log.Info("", "msgParse", "bad request, panic is ", r)
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
log.Info("", "msgParse", "call", string(buf))
}
}()
log.Info("", "msgParse", "recv request from web: ", "reqFuncName ", m.ReqFuncName, "data ", m.Data, "recv jsonMsg: ", string(jsonMsg))
if m.ReqFuncName == "Login" {
// rwLock.Lock()
ws.DoLogin(m, conn)
log.Info(m.OperationID, "msgParse", m)
// rwLock.Unlock()
return
}
UserRouteRwLock.RLock()
defer UserRouteRwLock.RUnlock()
// rwLock.RLock()
// defer rwLock.RUnlock()
urm, ok := UserRouteMap[m.UserID]
if !ok {
log.Info("", "msgParse", "user not login failed, must login first: ", m.UserID)
SendOneConnMessage(EventData{"Login", StatusNoLogin, StatusText(StatusNoLogin), "", m.OperationID}, conn)
return
}
parms := []reflect.Value{reflect.ValueOf(m.Data), reflect.ValueOf(m.OperationID)}
vf, ok := (urm.refName)[m.ReqFuncName]
if ok {
vf.Call(parms)
} else {
log.Info("", "msgParse", "no func ", m.ReqFuncName)
SendOneConnMessage(EventData{m.ReqFuncName, StatusBadParameter, StatusText(StatusBadParameter), "", m.OperationID}, conn)
}
}
这段代码定义了一个方法 msgParse
,它的主要任务是对传入的JSON消息进行解析,并根据解析出的请求函数名称 (ReqFuncName
) 执行相应的操作。让我们再进行逐行解析。
func (ws *WServer) msgParse(conn *UserConn, jsonMsg []byte) {
这是 *WServer
的方法,名为 msgParse
。它接收一个 UserConn
类型的 conn
和一个字节切片 jsonMsg
(包含JSON消息内容)。
m := Req{}
if err := json.Unmarshal(jsonMsg, &m); err != nil {
尝试将 jsonMsg
反序列化为 Req
类型的变量 m
。如果出错,向连接发送一个错误消息,并退出函数。
SendOneConnMessage(EventData{"error", 100, "Unmarshal failed", "", ""}, conn)
return
接下来,有一个延迟执行的函数来捕获可能的 panic:
defer func() {
if r := recover(); r != nil {
...
}
}()
这是一个常见的Go错误处理模式,用于捕获函数中的任何panic,并对其进行处理。
log.Info("", "msgParse", "recv request from web: ", "reqFuncName ", m.ReqFuncName, "data ", m.Data, "recv jsonMsg: ", string(jsonMsg))
记录从Web接收到的请求日志。
if m.ReqFuncName == "Login" {
ws.DoLogin(m, conn)
log.Info(m.OperationID, "msgParse", m)
return
}
如果请求的函数名称是“Login”,则调用 DoLogin
方法,并记录日志。
接下来,锁定 UserRouteRwLock
:
UserRouteRwLock.RLock()
defer UserRouteRwLock.RUnlock()
这是一个读写锁,用于保护可能由多个goroutine并发访问的数据。
urm, ok := UserRouteMap[m.UserID]
尝试从 UserRouteMap
中获取用户的路由信息。
if !ok {
...
SendOneConnMessage(EventData{"Login", StatusNoLogin, StatusText(StatusNoLogin), "", m.OperationID}, conn)
return
}
如果用户不在UserRouteMap
中,发送一个需要登录的消息。
parms := []reflect.Value{reflect.ValueOf(m.Data), reflect.ValueOf(m.OperationID)}
vf, ok := (urm.refName)[m.ReqFuncName]
这里使用了Go的 reflect
包。根据请求的函数名称从用户的路由信息中查找函数。
if ok {
vf.Call(parms)
} else {
...
SendOneConnMessage(EventData{m.ReqFuncName, StatusBadParameter, StatusText(StatusBadParameter), "", m.OperationID}, conn)
}
如果函数存在,则调用它。否则,发送一个错误消息。
总体来说,这个 msgParse
方法是一个中心处理点,它基于传入的JSON消息内容决定应执行哪些操作。这个模式允许您在不更改服务器代码的情况下,通过向客户端发送不同的JSON请求来扩展或更改行为。
继续解析 Dologin 代码
func (ws *WServer) DoLogin(m Req, conn *UserConn) {
UserRouteRwLock.RLock()
defer UserRouteRwLock.RUnlock()
urm, ok := UserRouteMap[m.UserID]
if !ok {
log.Info(m.OperationID, "login", "user first login: ", m)
refR := GenUserRouterNoLock(m.UserID, m.Batch, m.OperationID)
params := []reflect.Value{reflect.ValueOf(m.Data), reflect.ValueOf(m.OperationID)}
vf, ok := refR.refName[m.ReqFuncName]
if ok {
vf.Call(params)
} else {
log.Info("", "login", "no func name: ", m.ReqFuncName, m)
SendOneConnMessage(EventData{m.ReqFuncName, StatusBadParameter, StatusText(StatusBadParameter), "", m.OperationID}, conn)
}
} else {
if urm.wsRouter.getMyLoginStatus() == constant.LoginSuccess {
//send ok
SendOneConnMessage(EventData{"Login", 0, "ok", "", m.OperationID}, conn)
} else {
log.Info("", "login", "login status pending, try after 5 second ", urm.wsRouter.getMyLoginStatus(), m.UserID)
SendOneConnMessage(EventData{"Login", StatusLoginPending, StatusText(StatusLoginPending), "", m.OperationID}, conn)
}
}
}
- 定义
DoLogin
方法:
func (ws *WServer) DoLogin(m Req, conn *UserConn) {
此方法接受两个参数:一个 Req
类型的 m
(表示请求)和一个 UserConn
类型的 conn
(表示用户连接)。
- 锁定
UserRouteRwLock
(读锁):
UserRouteRwLock.RLock()
defer UserRouteRwLock.RUnlock()
这是一个读写锁,用于保护可能由多个 goroutine 并发访问的 UserRouteMap
数据。
- 检查用户是否已在
UserRouteMap
中:
urm, ok := UserRouteMap[m.UserID]
尝试从 UserRouteMap
中获取用户的路由信息。
- 如果用户不在
UserRouteMap
中,即他们是首次登录:
if !ok {
log.Info(m.OperationID, "login", "user first login: ", m)
记录用户首次登录的日志。
- 使用
GenUserRouterNoLock
函数生成用户的路由信息:
refR := GenUserRouterNoLock(m.UserID, m.Batch, m.OperationID)
- 使用
reflect
包来调用相应的函数:
params := []reflect.Value{reflect.ValueOf(m.Data), reflect.ValueOf(m.OperationID)}
vf, ok := refR.refName[m.ReqFuncName]
基于请求中的函数名称,从用户的路由信息中查找函数。
- 如果函数存在,调用它:
if ok {
vf.Call(params)
}
- 如果函数不存在,发送一个错误消息:
else {
log.Info("", "login", "no func name: ", m.ReqFuncName, m)
SendOneConnMessage(EventData{m.ReqFuncName, StatusBadParameter, StatusText(StatusBadParameter), "", m.OperationID}, conn)
}
- 如果用户已经在
UserRouteMap
中,即他们之前已经登录过:
} else {
- 检查用户的登录状态。如果登录成功,则发送一个成功的消息:
if urm.wsRouter.getMyLoginStatus() == constant.LoginSuccess {
SendOneConnMessage(EventData{"Login", 0, "ok", "", m.OperationID}, conn)
}
- 如果登录状态是待定的,发送一个消息告知用户稍后重试:
else {
log.Info("", "login", "login status pending, try after 5 second ", urm.wsRouter.getMyLoginStatus(), m.UserID)
SendOneConnMessage(EventData{"Login", StatusLoginPending, StatusText(StatusLoginPending), "", m.OperationID}, conn)
}
总结:这个 DoLogin
方法处理登录请求,根据用户是否已经登录以及他们的登录状态来执行不同的操作。如果用户是首次登录,会为其生成路由信息并尝试调用相关的函数。如果用户已经登录,会根据其登录状态返回相应的消息。
GenUserRouterNoLock:
func GenUserRouterNoLock(uid string, batchMsg int, operationID string) *RefRouter {
_, ok := UserRouteMap[uid]
if ok {
return nil
}
RouteMap1 := make(map[string]reflect.Value, 0)
var wsRouter1 WsFuncRouter
wsRouter1.uId = uid
vf := reflect.ValueOf(&wsRouter1)
vft := vf.Type()
mNum := vf.NumMethod()
for i := 0; i < mNum; i++ {
mName := vft.Method(i).Name
log.Info(operationID, "index:", i, " MethodName:", mName)
RouteMap1[mName] = vf.Method(i)
}
wsRouter1.InitSDK(ConfigSvr, operationID)
log.Info(operationID, "SetAdvancedMsgListener() ", uid)
wsRouter1.SetAdvancedMsgListener()
if batchMsg == 1 {
log.Info(operationID, "SetBatchMsgListener() ", uid)
wsRouter1.SetBatchMsgListener()
}
wsRouter1.SetConversationListener()
log.Info(operationID, "SetFriendListener() ", uid)
wsRouter1.SetFriendListener()
log.Info(operationID, "SetGroupListener() ", uid)
wsRouter1.SetGroupListener()
log.Info(operationID, "SetUserListener() ", uid)
wsRouter1.SetUserListener()
log.Info(operationID, "SetSignalingListener() ", uid)
wsRouter1.SetSignalingListener()
log.Info(operationID, "setWorkMomentsListener()", uid)
wsRouter1.SetWorkMomentsListener()
log.Info(operationID, "SetOrganizationListener()", uid)
wsRouter1.SetOrganizationListener()
var rr RefRouter
rr.refName = RouteMap1
rr.wsRouter = &wsRouter1
UserRouteMap[uid] = rr
log.Info(operationID, "insert UserRouteMap: ", uid)
return &rr
}
这段代码定义了一个名为 GenUserRouterNoLock
的函数,它的任务是为特定的用户生成路由信息。具体来说,它为用户的WebSocket功能生成了一个路由映射。这里是详细的步骤和代码分析:
- 定义
GenUserRouterNoLock
函数:
func GenUserRouterNoLock(uid string, batchMsg int, operationID string) *RefRouter {
此函数接收三个参数:uid
(用户ID)、batchMsg
(批处理消息标识符)和operationID
(操作ID)。
- 检查用户是否已经存在于
UserRouteMap
中:
_, ok := UserRouteMap[uid]
if ok {
return nil
}
如果用户已经存在,则返回 nil
。
- 初始化本地路由映射和WebSocket函数路由器实例:
RouteMap1 := make(map[string]reflect.Value, 0)
var wsRouter1 WsFuncRouter
wsRouter1.uId = uid
- 使用
reflect
包获取wsRouter1
的所有方法,并将其名称和方法值添加到RouteMap1
中:
vf := reflect.ValueOf(&wsRouter1)
vft := vf.Type()
mNum := vf.NumMethod()
for i := 0; i < mNum; i++ {
mName := vft.Method(i).Name
log.Info(operationID, "index:", i, " MethodName:", mName)
RouteMap1[mName] = vf.Method(i)
}
- 初始化SDK并设置不同的监听器。这些监听器可能用于处理各种WebSocket消息或事件:
wsRouter1.InitSDK(ConfigSvr, operationID)
...
wsRouter1.SetAdvancedMsgListener()
...
wsRouter1.SetBatchMsgListener()
...
- 为用户创建一个新的
RefRouter
实例,其中包含了路由映射和WebSocket路由器的引用:
var rr RefRouter
rr.refName = RouteMap1
rr.wsRouter = &wsRouter1
- 将新的
RefRouter
实例添加到UserRouteMap
中,以用户ID为键:
UserRouteMap[uid] = rr
log.Info(operationID, "insert UserRouteMap: ", uid)
- 最后,返回新创建的
RefRouter
实例的引用:
return &rr
总体说,此函数的目的是为用户生成一个WebSocket函数的路由信息。如果用户已经有路由信息,函数将返回nil。否则,它将为用户的每个WebSocket功能方法创建一个路由映射,并将其添加到全局的UserRouteMap
中。