forked from luapower/luapower
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathluapower_rpc.lua
246 lines (205 loc) · 4.9 KB
/
luapower_rpc.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
--Simple generic RPC server that allows uploading and executing Lua functions
--into separate Lua states, one state per connection.
--Written by Cosmin Apreutesei. Public Domain.
local glue = require'glue'
local pp = require'pp'
local luastate = require'luastate'
local rpc = {}
--logging
rpc.verbose = false
local function log(...)
if not rpc.verbose then return end
print(...)
end
--transport
rpc.bind_ip = '*'
rpc.default_ip = '127.0.0.1'
rpc.default_port = '19994'
local function packt(...)
return {n = select('#', ...), ...}
end
local function unpackt(t)
return unpack(t, 1, t.n)
end
local function parse(s)
local f, err = loadstring('return '..s)
if not f then return nil, err end
return glue.protect(f)()
end
local function format(t)
return pp.format(t)
end
local function send(skt, t)
local s = format(t)
return skt:send(#s..'\n'..s)
end
local function receive(skt)
local s, err = skt:receive'*l'
local sz = tonumber(s)
if not sz then return nil, 'message size expected, got '..tostring(s)..', '..tostring(err) end
local s = skt:receive(sz)
if not s then return nil, sz..' bytes message expected' end
return parse(s)
end
local function enum(t)
if t.error then
return 'error: '..t.error
end
local dt = {n=t.n}
for i=1,t.n do
dt[i] = pp.format(t[i])
end
return table.concat(dt, ', ', 1, t.n)
end
function rpc.server(ip, port)
local loop = require'socketloop'
local ffi = require'ffi'
local socket = require'socket'
ip = ip or rpc.bind_ip
port = port or rpc.default_port
local srv_skt
local function handler(skt)
local state = luastate.open()
state:openlibs()
local function pass(ok, ...)
return ok and packt(...) or {error = ...}
end
local function handle(msg) --{cmd=name, args = packt}
if msg.cmd == 'stop' then
skt:close() --important on Linux!
srv_skt:close()
log('rpc server: stopped.')
os.exit()
elseif msg.cmd == 'restart' then
local cmd =
(ffi.os == 'Windows' and 'start ' or '')..
arg[-1]..' '..arg[0]..' '..table.concat(arg, ' ')..
(ffi.os == 'Windows' and '' or ' &')
skt:close() --important on Linux!
srv_skt:close()
log('rpc server: restarting: '..cmd)
os.execute(cmd)
os.exit()
elseif msg.cmd == 'exec' then
local top = state:gettop()
state:getglobal'debug'
state:getfield(-1, 'traceback')
assert(state:type(-1) == 'function')
local args = msg.args
local func = args[1]
if type(func) == 'function' then
state:push(func)
elseif type(func) == 'string' then
state:getglobal(func)
end
local retvals = pass(state:xpcall(-2, select(2, unpackt(args))))
state:pop(2) --'debug', debug.traceback
assert(state:gettop() == top)
if not send(skt, retvals) then return end
log('rpc server: '..msg.cmd..'('..enum(args)..')')
return true
end
end
while true do
local msg = receive(skt)
if not msg then break end
if not handle(msg) then break end
end
skt:close()
state:close()
end
srv_skt = loop.newserver(ip, port, handler)
log('listening on '..ip..':'..port)
return srv_skt
end
function rpc.connect(ip, port, connect)
ip = ip or rpc.default_ip
port = port or rpc.default_port
if not connect then
local loop = require'socketloop'
connect = loop.connect
end
local skt, err = connect(ip, port)
if not skt then return nil, err end
local function close()
skt:close()
end
local function rpcall(cmd, ...)
assert(cmd)
local msg = {cmd = cmd, args = packt(...)}
assert(send(skt, msg))
if cmd == 'restart' or cmd == 'stop' then
skt:close()
return
end
local retvals = assert(receive(skt))
if retvals.error then
error(retvals.error, 2)
end
return unpackt(retvals)
end
return setmetatable({
socket = skt,
close = close,
}, {
__index = function(t, k)
t[k] = function(...)
return rpcall(k, ...)
end
return t[k]
end,
})
end
--loaded as module
if ... == 'luapower_rpc' then
return rpc
end
--cmdline interface
local loop = require'socketloop'
local v, ip, port = ...
if v ~= '-v' then
v, ip, port = false, v, ip
end
rpc.verbose = v and true
if ip == '--help' then
print('Usage: '..arg[-1]..' '..arg[0]..' [-v] [ip [port]]')
os.exit()
end
if ip == '--test' then
local srv = rpc.server()
loop.newthread(function()
--two connections, two states
local c1 = assert(rpc.connect())
local c2 = assert(rpc.connect())
--inject the inc function in each state
local function inject_inc()
function inc(x)
i = (i or 0) + x
return i
end
end
c1.exec(inject_inc)
c2.exec(inject_inc)
--exec inc 1000 times in each state
local function inc1000(x)
local n
for i = 1,1000 do
n = inc(x)
end
return n
end
local i1 = c1.exec(inc1000, 1)
local i2 = c2.exec(inc1000, 2)
assert(i1 == 1000)
assert(i2 == 2000)
--close connections
c1.close()
c2.close()
loop.stop()
print'ok'
end)
loop.start(1)
os.exit()
end
local srv = rpc.server(ip, port)
loop.start(1)