beego框架算是golang比较成熟的一个框架了,最近看了下这个框架其中的一个在线聊天室的例子,觉得还是有很多可以学习借鉴的地方,所以就总结下。
这个例子的源码在这里,该例子配合bee工具可以很简单的进行运行。
首先看下这个项目的结构:
标准的beego框架,各个文件夹包含了不同的功能。
然后我们从main.go(这里是WebIM.go)看起:
package main
import (
"github.com/astaxie/beego"
"github.com/beego/i18n"
"github.com/beego/samples/WebIM/controllers"
)
const (
APP_VER = "0.1.1.0227"
)
func main() {
beego.Info(beego.BConfig.AppName, APP_VER)
// Register routers.
beego.Router("/", &controllers.AppController{})
// Indicate AppController.Join method to handle POST requests.
beego.Router("/join", &controllers.AppController{}, "post:Join")
// Long polling.
beego.Router("/lp", &controllers.LongPollingController{}, "get:Join")
beego.Router("/lp/post", &controllers.LongPollingController{})
beego.Router("/lp/fetch", &controllers.LongPollingController{}, "get:Fetch")
// WebSocket.
beego.Router("/ws", &controllers.WebSocketController{})
beego.Router("/ws/join", &controllers.WebSocketController{}, "get:Join")
// Register template functions.
beego.AddFuncMap("i18n", i18n.Tr)
beego.Run()
}
虽然是从该文件看起,但是并非是从该文件开始执行的,而是import中的package中的init方法执行的,有关init方法的执行顺序可以看这里,好了,先收回来看这个文件,WebIM.go中主要是定义了路由,那我们来看下路由的细节:
请求/
会跳转到controllers.AppController
对应的Get方法,该方法直接返回welcome.html页面,页面如下:
然后在此处输入用户名并选择使用的连接技术,用户名很简单就是用户ID,使用的技术这里采用长轮询或者WebSocket的方式,稍后会专门谈谈这两种方式。
点击‘进入聊天室’后,就会请求/join
,同时会携带两个参数,一个是用户名,另外一个参数是连接的方式(技术),该请求为post请求,根据beego.Router("/join", &controllers.AppController{}, "post:Join")
可知,该请求会被AppController的Join方法:
func (this *AppController) Join() {
// Get form value.
uname := this.GetString("uname")
tech := this.GetString("tech")
// Check valid.
if len(uname) == 0 {
this.Redirect("/", 302)
return
}
switch tech {
case "longpolling":
this.Redirect("/lp?uname="+uname, 302)
case "websocket":
this.Redirect("/ws?uname="+uname, 302)
default:
this.Redirect("/", 302)
}
// Usually put return after redirect.
return
}
该方法首先获取到post请求的两个参数,然后判断用户名是否为空,若为空重新跳回欢迎页面,如不为空,开始判断当前连接方式(技术)是什么,根据不同分别跳转/lp?uname=uname
和/ws?uname=uname
,然后WebIM.go中的路由路由就会根据本次请求的方式分别请求不同的控制器,/lp
将会请求controllers.LongPollingController{}
的Join
方法,/ws
将会请求controllers.WebSocketController{}
默认的Get方法,这两种请求的方式返回的前端页面是相同的,但是后台的处理是不同的。
那么,我们先区分一下这两种获取数据的方式有什么不同,首先是websocket的方式,websocket实现了服务器端主动向客户端浏览器进行消息的推送,实现了消息的同步,实质上是建立了一条socket连接,通过该链接进行通信,而长轮询则依旧是采用http的方式去请求数据,区别于轮询的方式,轮询是客户端浏览器每隔一段时间发起一次http请求,而长轮询则是发起一次请求,然后服务器端将该连接保持住(hold,暂时不拒绝也不响应),等到服务器端到达指定的情况(收到另一个消息)将该消息作为内容进行响应,客户端收到响应后再次发起长轮询请求,这样就能保证每次有新的消息到达的时候都能及时的收到响应。
在这个WebIM中是通过chan来维持长轮询的,在介绍维持长轮询这块之前,我们按次序来看下在这个例子中使用的数据结构models:
archive.go
package models
import (
"container/list"
)
//用int类型重新定义客户端产生的事件类型
type EventType int
//三种事件类型:加入、离开、消息
const (
EVENT_JOIN = iota
EVENT_LEAVE
EVENT_MESSAGE
)
//定义事件结构(事件类型、用户名、事件、内容)
type Event struct {
Type EventType // JOIN, LEAVE, MESSAGE
User string
Timestamp int // Unix timestamp (secs)
Content string
}
//用来保存服务器上能够保存的消息记录,保存最新的20条
const archiveSize = 20
// 事件归档保存
var archive = list.New()
// 将一个新的事件保存在archive中,若事件的个数已经大于等于20则删除第一个,只保留最新的20个
func NewArchive(event Event) {
if archive.Len() >= archiveSize {
archive.Remove(archive.Front())
}
archive.PushBack(event)
}
// 根据传过来的时间戳返回该时间戳之后的所有事件消息
func GetEvents(lastReceived int) []Event {
events := make([]Event, 0, archive.Len())
for event := archive.Front(); event != nil; event = event.Next() {
e := event.Value.(Event)
if e.Timestamp > int(lastReceived) {
events = append(events, e)
}
}
return events
}
看完了models才想起来上面还说到WebIM.go并不是程序执行的其实,而应该是import包时调用的init方法,由于此处导入了github.com/beego/samples/WebIM/controllers
package,所以我们来看下controllers中的init方法。
在app.go中的init方法如下:
func init() {
// 从配置文件中获取语言类型列表
langTypes = strings.Split(beego.AppConfig.String("lang_types"), "|")
// 根据语言类型加载语言环境文件
for _, lang := range langTypes {
beego.Trace("Loading language: " + lang)
if err := i18n.SetMessage(lang, "conf/"+"locale_"+lang+".ini"); err != nil {
beego.Error("Fail to set message file:", err)
return
}
}
}
在chatroom.go中init方法如下:
func init() {
go chatroom()
}
该方法直接启动一个goroutine,让其独立运行,下面我们就来看下这个chatroom方法的作用。
首先在chatroom.go中定义了一系列的变量,如:subscribe
、unsubscribe
、publish
、waitingList
、subscribers
,其中subscribe
、unsubscribe
、publish
都是chan类型,三个变量代表的含义分别是:订阅者,未订阅者,以及要进行发布的消息,之所以设置为缓冲为10的chan,这是为了应对同时多个客户端发起的请求事件。watingList
表示的是当前等待长轮询的list,该list中存储的类型为无缓冲的chan类型,也就是通过这样一个无缓冲的chan类型来保证了长轮询的“保持(hold)”:当有长轮询请求到达时,该list添加一个空的chan,然后从该ch中读取数据,由于刚才是添加的空的chan,所以这里直接从chan中读取数据自然不能读取到,那么执行到此处的无缓冲的chan自然就会阻塞,这也就是长轮询可以保持住的原因!
那么,我们在看下当事件触发的情况下,长轮询返回响应的情况:
// Notify waiting list.
for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
ch.Value.(chan bool) <- true
waitingList.Remove(ch)
}
当有事件触发时,此处循环waitingList,往list中的每个chan填入元素true,那么这时在上面使用chan读取数据来保持长轮询的部分将会被激活(启动),此时也将返回响应信息,完成一次长轮询的过程,进而开始下次的长轮询。此处使用chan来保持长轮询还是设计的很巧妙的!
我们再看下使用websocket方式来进行通信的场景。
wobsocket和longpolling共用了较多的数据,如所有的事件存档,当有新的websocket用户加入时,会调用如下函数:
websocket.go
// Join method handles WebSocket requests for WebSocketController.
//当有新的用户通过websocket方式加入时,调用执行该函数
func (this *WebSocketController) Join() {
//获取加入的用户的用户名并进行是否为空的校验
uname := this.GetString("uname")
if len(uname) == 0 {
this.Redirect("/", 302)
return
}
// 从http请求升级到WebSocket。
ws, err := websocket.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil, 1024, 1024)
if _, ok := err.(websocket.HandshakeError); ok {
http.Error(this.Ctx.ResponseWriter, "Not a websocket handshake", 400)
return
} else if err != nil {
beego.Error("Cannot setup WebSocket connection:", err)
return
}
// 将该请求转换成的websocket联通用户名一起加入chatroom
Join(uname, ws)
defer Leave(uname)
// 循环从websocket中读取数据,无数据时阻塞,有数据到达时往publish chan中添加事件,从而引起其他事件的响应
for {
_, p, err := ws.ReadMessage()
if err != nil {
return
}
publish <- newEvent(models.EVENT_MESSAGE, uname, string(p))
}
}
websocket.go中还有如下函数:
func broadcastWebSocket(event models.Event) {
//将要进行广播的事件json格式化
data, err := json.Marshal(event)
if err != nil {
beego.Error("Fail to marshal event:", err)
return
}
//循环遍历通过websocket方式加入聊天室的用户,广播该事件(单条)
for sub := subscribers.Front(); sub != nil; sub = sub.Next() {
// Immediately send event to WebSocket users.
ws := sub.Value.(Subscriber).Conn
//若是通过longpolling方式加入的,则ws为nil
if ws != nil {
//如下是将事件消息写入websocket中,若写入失败(返回err)则证明客户端已关闭websocket,此时从订阅列表中将该用户删除
if ws.WriteMessage(websocket.TextMessage, data) != nil {
// User disconnected.
unsubscribe <- sub.Value.(Subscriber).Name
}
}
}
}
在chatroom.go中源码中有一段如下:
chatroom.go
//当有新的事件消息到达时,执行如下
case event := <-publish:
// Notify waiting list.
for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
ch.Value.(chan bool) <- true
waitingList.Remove(ch)
}
broadcastWebSocket(event)
models.NewArchive(event)
if event.Type == models.EVENT_MESSAGE {
beego.Info("Message from", event.User, ";Content:", event.Content)
}
此时,我发现了如下的问题,这里的chatroom实际上已经是一个单独的goroutine,即就是此处的添加消息事件和长轮询的响应处理函数是两个goroutine,所以这里就存在了同步的问题,我们先来看下这里有的两个goroutine的执行情况:
从图中可以看出,当chatroom的goroutine中收到事件消息时,首先是激活了长轮询的等待,然后依次是广播事件、添加事件消息到消息归档中,而在另一个goroutine中激活了长轮询的等待之后立刻就会去获取lastReceived之后新的事件消息,这里就有可能产生不同步,即就是事件消息在chatroom的goroutine中还未加入事件消息归档,这里就开始去获取,这样的话当然是不能获取到消息,也就是返回的消息响应为空!虽然这样理解但是并非每次都是这样,而且即便此时此刻返回的响应消息为空,依旧不会影响消息的获取,因为当前的请求返回空之后,立刻回发起一次新的fetch请求,这次请求的lastReceived依旧是上次请求的timestamp,所以消息不会遗漏,但是,消息事件响应的灵敏度就不够高,需要发起两次请求。
为了测试,我添加了如下的代码,用来模拟chatroom的goroutine运行较慢的情况:
// Notify waiting list.
for ch := waitingList.Front(); ch != nil; ch = waitingList.Front() {
ch.Value.(chan bool) <- true
waitingList.Remove(ch)
}
time.Sleep(time.Second * 2)
broadcastWebSocket(event)
models.NewArchive(event)
如下是收到空的事件消息的情况:
从请求中也可以看出,当返回响应消息为空时,下次请求的lastRecived依旧为上次请求的时间戳,所以消息并不会丢失。
关于两个goroutine的运行差不多就这样了,在这个例子中,源码的作者有这样一处小失误,代码如下:
// Notify waiting list.
for ch := waitingList.Back(); ch != nil; ch = ch.Prev() {
ch.Value.(chan bool) <- true
waitingList.Remove(ch)
}
本来是想要循环遍历所有的长轮询的等待队列,遍历之后删除节点,但是由于循环条件出错,并不能达到预想的情况,而是遗漏了很多的节点。修改如下:
// Notify waiting list.
for ch := waitingList.Front(); ch != nil; ch = waitingList.Front() {
ch.Value.(chan bool) <- true
waitingList.Remove(ch)
}
关于前端是如何请求以及后台模板的响应这里并没有介绍,还有前端的websocket以及长轮询自动发起的请求还需要自己去体会。。。
好了,就这些了,算是对这个例子的一点认识!!!