很早就知道春哥有这个 lua-resty-websocket 的包了,可以让openresty具备websocket的通信能力,一直也以为这个包和node.js或者golang等其他语言的websocket包一样好用,可以很容易的实现全双工的操作,单播,多播,广播等功能。

但是事实证明,我的想法错误了,简单的搭建了 openresty 的websocket服务之后,发现我们只能进行简单的单播,甚至我一开始连挂起连接,等待客户端继续发送消息都不会。至于想要根据其他数据源来主动推送给某个链接,更加不敢想了。

挂起ws连接

我们需要解决的问题就是,如何挂起连接,让or能够持续的接受客户端的ws消息,并作出不同的响应。比如,发送一个ping,我响应一个pong,发送一个hello,我响应一个world之类的。代码其实很简单,只需要在春哥的示例代码上加一段 while 1 do … end 即可,如下:

1
2
3
4
while 1 do
local data, typ, err = wb:recv_frame()
...
end

这个 wb:recv_frame() 方法会自动帮我挂起链接,非阻塞式的挂起,这样我们就能够持续的接收客户端的消息。

单进程推送

现在需要解决的就是,如何在挂起请求的时候,通过外部的另外一个HTTP请求来对这个链接进行推送消息。 因为openresty是不允许lua跨进程读取变量的,所以我们先把nginx配置开成1核。 我首先想到的是,可以通过保存 wbServer 这个实例,来做到在Http请求示例的推送,预想的架构如下图:

clipboard.png

但是现实是残酷的,经过测试发现,其他Coroutine里是无法去操作ws的Coroutine里的推送消息的。于是方案1宣告失败了。

后来,突然想起来,春哥新发布了一个 semaphore 的包,这个包能解决单worker进程内的各个coroutine的通信问题。于是我们的伪代码,就写成了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local sendText = function()
while 1 do
local ok, err = sema:wait(60) -- wait for a second at most
if ok then
wb:send_text(string.format("[%s][%s] ycfuck", uuid, ngx.now()))
wb:set_timeout(6000)
end
end
end
ngx.thread.spawn(sendText)

-- 下面还是不变
while 1 do
local data, typ, err = wb:recv_frame()
...
end

这时候,我们只需要对全局的 sema 实例,进行推送,就可以实现消息推送了,当然对于多播,广播,单播的各种需求都可以通过这样的思路来解决。然后是我们接受http请求的伪代码:

1
2
3
4
5
6
...
local c = g.sema:count()
if c < 0 then
g.sema:post(math.abs(c))
end
...

多Worker推送

我们现在已经实现了单Worker的Http推送ws的功能了,问题又来了,openresty是多Worker的模型,所以单Worker的模型还不足以能够支持业务,因为非常有可能,客户端的WS连接连接到了worker1上,而我们推送的HTTP请求被worker2接收了。

对于跨Worker通信,我们能想到的肯定就是ngx.shared.dict了,我们通过在 initWorker 阶段启动一个永远不退出的方法,循环的去判断 sharedict 中的某一个key数组是否存在内容,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
function loopWS(premature)
local wsBc = require('wsBc')

if premature then -- 如果提前触发,忽略掉
return
end

local key = string.format('ws_%s', ngxWorkerId)

local stateDict = ngx.shared.stateDict
local data = stateDict:rpop(key)
if data then
local wb = wsBc.getServer(data)

if wb then
local c = g.sema:count()
if c < 0 then
g.sema:post(math.abs(c))
end

end
end

return ngx.timer.at(0.001, loopWS)
end

而我们接收用户Http的请求的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local args = ngx.req.get_uri_args()
local uuid = args['uuid']
local stateDict = ngx.shared.stateDict

if uuid then

local stateDict = ngx.shared.stateDict

local count = ngx.worker.count() - 1
for i=0, count do
local key = string.format('ws_%s', i)
stateDict:lpush(key, uuid)
end

ngx.say(string.format('broadcast ok [%s]', ngx.now()))

else
ngx.say('invalid uuid')
end

这样,如果想要单播,我们就需要对每一个websocket的链接进行编号,然后控制者通过发送uuid这个编号,来推送信息。所以我们最终的模型如下图:

clipboard.png

总结

我们实现了在多Worker下,不依赖Redis等其他第三方依赖,可以自由的收发 WS 的消息了。简单了下压测,这套模型,在配置文件仅开16cpu的情况下,C100K的连接没有任何问题,其中还夹杂着单播,同样配置的NETTY服务器,能支持4W5左右,感觉openresty还没有被压到头。对于大量的多播和广播性能应该会有所下降,就没有做性能压测了

性能压测结果如下图:

clipboard.png

查看原文