ConcurrentLua
介绍
ConcurrentLua 是一个无共享异步消息传递模型的实现.该模型来自Erlang语言.
她改编了Erlang的并发元素并整合进Lua里.
ConcurrentLua的一个核心元素是 process(进程).一个进程是一个轻量级
虚拟机
线程,扮演和操作系统的进程同样的角色;他们不共享内存而是使用某这进程间通讯
机制.这些进程能够根据需要被创建和销毁,并通过一个简单的环罗宾(轮训)算法来
调度他们.
每一个进程关联到一个邮箱(临时存储消息的队列),通过邮箱来接受别的进程发来
的消息.进程可以在任何时候检查自己的邮箱有没有新消息抵达,如果有,进程可以
按照抵达的顺序依次读取.
每个进程都有一个唯一的数字作为进程标识,叫 PID(process identifier).也可以
给进程取一个名字,并用名字来指代进程.进程名和进程的对应关系被保存到一个
中心储藏室–registry(注册表).进程可以编辑注册表,添加或者删除表项.
错误捕捉机制也被实现成 monitors 和 links .通过 monitors 进程能够监视其他
进程,并在被监视进程异常终止的时候获得通知.通过 linker 进程绑定进程到一起,
当一个进程异常终止的时候,其他进程也被通知并终止.
本系统还支持分布式编程和所有相关的组件.分布式的进程通过和本地进程一样的
方式来通讯.
分布是基于 node(节点) 组件的.一个节点代表一个运行着很多进程的运行时环境.
节点们可以相互连接和通讯,于是建立了一个虚拟网络.分布的进程使用这个网络来
顺序交换信息.
每个节点有个名字.其他的节点可以通过这个名字来连接.一个端口映射器精灵进程
(译注:就是服务进程,类似Erlang提供的名字服务)提供了名字解析服务.端口映射器
知晓虚拟网络中所有的节点的信息.
正如进程可以在本地创建,进程已可以在远端节点被创建.一个远程进程能够被视同
为一个本地进程来操作.
如果虚拟网络中的节点是全互联的(每一个节点双向连接其他的节点),那么可以使用
全局进程名.节点们相互交流和保养虚拟全局注册表并保持自己本地的注册表时时
更新.
monitors 和 links 以同样的语义支持分布进程和本地进程.节点可以透明的处理
分布进程的错误.另外,进程可以像监视整个节点.
节点可以在通讯前进行鉴权.一个已鉴权的节点才可以成为虚拟网络的一部分.这些
策略通过一个简单安全机制来保证.
实现
ConcurrentLua的实现是基于Lua组件系统实现的.这个系统负责组织和管理Lua的模块和子模块.主模块有两个,分别提供了并发功能和分布式编程功能.并发模块可以单独加载,每个模块都可以选择性加载需要使用的子模块.独立的端口映射器精灵进程也是实现的一部分.
系统中的进程是通过Lua的协程机制来实现的.一个进程其实就是一个Lua协程,
通过 yield 来挂起一个进程,通过 resume 来继续执行一个进程.
进程的调度机制仍然是基于Lua使用的 协作式多线程模型. 进程自愿挂起自己,
从而让其它的进程获得运行的机会.然而,挂起和恢复进程被部分隐藏于高层机制
之下;当一个进程去等待消息抵达的时候挂起,而在消息抵达进程邮箱后准备恢复.
一个简单的环罗宾(轮训)调度器用来恢复进程的执行.
任何类型的Lua数据,除了内存引用外,都可以通过消息来发送.消息可以是布尔值,
数字,字符串,表或者函数,或者他们的混合.数据自动在发送时被序列化,并在接受
时反序列化,所以的数据都是值传递.
节点间的分布式进程间通讯机制是基于异步socket的.映射到网络层是非阻塞
socket和定时轮训.这是如今大部分Lua模块采用的方法,非阻塞语义也应该被用
在例如文件和管道的IO操作上.
用法
一些例子提供了系统不要组件的用法,例如,创建进程,分布式进程的消息传递和错误捕获.
创建进程
spawn()函数可以创建进程.spawn()函数接受至少一个参数,该参数标志进程的入口函数.其它附加参数则被直接转交给入口函数.
下面的例子示范创建一个进程.该进程输出指定次数的消息:
require 'concurrent'
function hello_world(times)
for i = 1, times do print('hello world') end
print('done')
end
concurrent.spawn(hello_world, 3)
concurrent.loop()
输出应该是:
hello world
hello world
hello world
done
首先加载系统:
require 'concurrent'
进程入口函数:
function hello_world(times)
for i = 1, times do print('hello world') end
print('done')
end
创建一个新进程:
concurrent.spawn(hello_world, 3)
最后调用系统无限循环:
concurrent.loop()消息交互进程通过 send() 和 receive() 函数来交换消息.同样,self()函数也被用来获取本进程ID.
下面的程序实现了两个进程交换消息然后终止:
require 'concurrent'
function pong()
while true do
local msg = concurrent.receive()
if msg.body == 'finished' then
break
elseif msg.body == 'ping' then
print('pong received ping')
concurrent.send(msg.from, { body = 'pong' })
end
end
print('pong finished')
end
function ping(n, pid)
for i = 1, n do
concurrent.send(pid, {
from = concurrent.self(),
body = 'ping'
})
local msg = concurrent.receive()
if msg.body == 'pong' then
print('ping received pong')
end
end
concurrent.send(pid, {
from = concurrent.self(),
body = 'finished'
})
print('ping finished')
end
pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
输出应该是:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
ping finished
在 pong 进程被创建后, ping 进程获得了 pong 进程的 PID:
pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
ping 进程发送一个消息:
concurrent.send(pid, {
from = concurrent.self(),
body = 'ping'
})
pong 进程等待消息抵达,然后把接收到的消息保存到一个变量中:
local msg = concurrent.receive()
pong 进程回复:
concurrent.send(msg.from, { body = 'pong' })
pong 进程在接收到 ping 进程发来的一个提示后终结.
注册进程名可以用进程名替代PID来指定消息接收方. register() 函数可以用来在注册表
(译注:指系统的名字对应表,而不是Windows的注册表,顺便鄙视一下Windows. 😃 )
创建一个进程的名字:
require 'concurrent'
function pong()
while true do
local msg = concurrent.receive()
if msg.body == 'finished' then
break
elseif msg.body == 'ping' then
print('pong received ping')
concurrent.send(msg.from, { body = 'pong' })
end
end
print('pong finished')
end
function ping(n)
for i = 1, n do
concurrent.send('pong', {
from = concurrent.self(),
body = 'ping'
})
local msg = concurrent.receive()
if msg.body == 'pong' then
print('ping received pong')
end
end
concurrent.send('pong', {
from = concurrent.self(),
body = 'finished'
})
print('ping finished')
end
pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.spawn(ping, 3)
concurrent.loop()
相对前一个版本的改变就是 ping 进程发送消息的地方:
concurrent.send('pong', {
from = concurrent.self(),
body = 'ping'
})
和:
concurrent.send('pong', {
from = concurrent.self(),
body = 'finished'
})
以及现在 pong 进程注册了它的名字:
concurrent.register('pong', pid)
因此 ping 进程不需要知道 pong 进程的 PID 了.
分布式消息传递不同节点上的进程仍然可以使用同样的消息传递机制.远程进程通过 PID或进程名 加上
节点名来指定.先前的例子可以改造成两个程序,分别是一个独立进程.
pong 进程的代码如下:
require 'concurrent'
function pong()
while true do
local msg = concurrent.receive()
if msg.body == 'finished' then
break
elseif msg.body == 'ping' then
print('pong received ping')
concurrent.send(msg.from, { body = 'pong' })
end
end
print('pong finished')
end
concurrent.init('pong@gaia')
pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.loop()
concurrent.shutdown()
ping 进程的代码如下:
require 'concurrent'
function ping(n)
for i = 1, n do
concurrent.send({ 'pong', 'pong@gaia' }, {
from = { concurrent.self(), concurrent.node() },
body = 'ping'
})
local msg = concurrent.receive()
if msg.body == 'pong' then
print('ping received pong')
end
end
concurrent.send({ 'pong', 'pong@gaia' }, {
from = { concurrent.self(), concurrent.node() },
body = 'finished'
})
print('ping finished')
end
concurrent.spawn(ping, 3)
concurrent.init('ping@selene')
concurrent.loop()
concurrent.shutdown()
(译注: 如果你想自己跑这个例子需要修改上面的节点名后半部分的机器名部分,使之和你的网络环境相匹配.)pong 进程的输出应该是:
pong received ping
pong received ping
pong received ping
pong finished
ping 进程的输出应该是:
ping received pong
ping received pong
ping received pong
ping finished
在这个例子里,运行时系统运行在分布式模式.为了看到结果,端口映射器必须先运行:
$ clpmd
初始化 pong 进程所在节点的代码:
concurrent.init('pong@gaia')
初始化 ping 进程所在节点的代码:
concurrent.init('ping@selene')
上面两句代码注册节点到端口映射器.去注册是通过:
concurrent.shutdown()
这个例子的唯一改动是消息发送的目的地.node()函数会返回调用进程坐在节点的名字:
concurrent.send({ 'pong', 'pong@gaia' }, {
from = { concurrent.self(), concurrent.node() },
body = 'ping'
})
接下来:
concurrent.send({ 'pong', 'pong@gaia' }, {
from = { concurrent.self(), concurrent.node() },
body = 'finished'
})
错误处理一个捕获进程间错误的方法是连接进程.两个进程被绑定到一起,一个异常终止的后另一个也会终止.link()函数用来绑定进程:
require 'concurrent'
function ping(n, pid)
concurrent.link(pid)
for i = 1, n do
concurrent.send(pid, {
from = concurrent.self(),
body = 'ping'
})
local msg = concurrent.receive()
if msg.body == 'pong' then
print('ping received pong')
end
end
print('ping finished')
concurrent.exit('finished')
end
function pong()
while true do
local msg = concurrent.receive()
if msg.body == 'ping' then
print('pong received ping')
concurrent.send(msg.from, { body = 'pong' })
end
end
print('pong finished')
end
pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
输出应该是:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
– 译注:这里应该是: ping fininshedpong 进程永远不会运行到最后一行,因为他在接收到 ping 进程退出信号的时候会终止.
连接进程的代码如下:
concurrent.link(pid)
也可以捕获进程终止导致的exit信号.被捕获的exit信号会转换成一个特殊的消息:
require 'concurrent'
concurrent.setoption('trapexit', true)
function pong()
while true do
local msg = concurrent.receive()
if msg.signal == 'EXIT' then
break
elseif msg.body == 'ping' then
print('pong received ping')
concurrent.send(msg.from, { body = 'pong' })
end
end
print('pong finished')
end
function ping(n, pid)
concurrent.link(pid)
for i = 1, n do
concurrent.send(pid, {
from = concurrent.self(),
body = 'ping'
})
local msg = concurrent.receive()
if msg.body == 'pong' then
print('ping received pong')
end
end
print('ping finished')
concurrent.exit('finished')
end
pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)
concurrent.loop()
输出应该是:
pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
ping finished可以通过 setoption() 函数来设置进程链接的选项,这里是 trapexit 选项:
concurrent.setoption('trapexit', true)
pong 进程会接收到一个退出消息:
if msg.signal == 'EXIT' then
break
基于提示消息的monitor, 也可以用来处理错误.
原文连接:
http://floss.qiniucdn.com/data/20110831112702/index.html