diff options
Diffstat (limited to 'server/src/async')
-rw-r--r-- | server/src/async/async.lua | 89 | ||||
-rw-r--r-- | server/src/async/init.lua | 1 | ||||
-rw-r--r-- | server/src/async/proto.lua | 59 |
3 files changed, 149 insertions, 0 deletions
diff --git a/server/src/async/async.lua b/server/src/async/async.lua new file mode 100644 index 00000000..8c75540c --- /dev/null +++ b/server/src/async/async.lua @@ -0,0 +1,89 @@ +local thread = require 'bee.thread' +local errlog = thread.channel 'errlog' +local taskId = 0 +local idlePool = {} +local runningList = {} + +local function createTask() + taskId = taskId + 1 + local id = taskId + local requestName = 'request' .. tostring(id) + local responseName = 'response' .. tostring(id) + thread.newchannel(requestName) + thread.newchannel(responseName) + local buf = ([[ +package.cpath = %q +package.path = %q +local thread = require 'bee.thread' +local request = thread.channel(%q) +local response = thread.channel(%q) +local errlog = thread.channel 'errlog' + +local function task() + local dump, env = request:bpop() + if env then + setmetatable(env, { __index = _ENV }) + else + env = _ENV + end + local f, err = load(dump, '=task', 't', env) + if not f then + errlog:push(err) + return + end + local results = table.pack(pcall(f)) + local ok = table.remove(results, 1) + if not ok then + local err = table.remove(results, 1) + errlog:push(err) + return + end + results.n = results.n - 1 + response:push(results) +end + +while true do + task() +end +]]):format(package.cpath, package.path, requestName, responseName) + log.debug('Create thread, id: ', id) + return { + id = id, + thread = thread.thread(buf), + request = thread.channel(requestName), + response = thread.channel(responseName), + } +end + +local function call(dump, env, callback) + local task = table.remove(idlePool) + if not task then + task = createTask() + end + runningList[task.id] = { + task = task, + callback = callback, + } + task.request:push(dump, env) +end + +local function onTick() + local ok, msg = errlog:pop() + if ok then + log.error(msg) + end + for id, running in pairs(runningList) do + local ok, results = running.task.response:pop() + if ok then + runningList[id] = nil + idlePool[#idlePool+1] = running.task + xpcall(running.callback, log.debug, table.unpack(results)) + end + end +end + +return { + onTick = onTick, + call = call, + require = require, +} diff --git a/server/src/async/init.lua b/server/src/async/init.lua new file mode 100644 index 00000000..3d75a720 --- /dev/null +++ b/server/src/async/init.lua @@ -0,0 +1 @@ +return require 'async.async' diff --git a/server/src/async/proto.lua b/server/src/async/proto.lua new file mode 100644 index 00000000..4570de2e --- /dev/null +++ b/server/src/async/proto.lua @@ -0,0 +1,59 @@ +local thread = require 'bee.thread' +local json = require 'json' +local proto = thread.channel 'proto' +local errlog = thread.channel 'errlog' + +local function pushError(...) + local t = table.pack(...) + for i = 1, t.n do + t[i] = tostring(t[i]) + end + local buf = table.concat(t, '\t') + errlog:push(buf) +end + +local function readProtoHeader() + local header = io.read 'l' + if header:sub(1, #'Content-Length') == 'Content-Length' then + return header + elseif header:sub(1, #'Content-Type') == 'Content-Type' then + return nil + else + pushError('Proto header error:', header) + return nil + end +end + +local function readProtoContent(header) + local len = tonumber(header:match('%d+')) + if not len then + pushError('Proto header error:', header) + return nil + end + local buf = io.read(len+2) + if not buf then + return nil + end + local suc, res = pcall(json.decode, buf) + if not suc then + pushError('Proto error:', buf) + return nil + end + return res +end + +local function readProto() + local header = readProtoHeader() + if not header then + return + end + local data = readProtoContent(header) + if not data then + return + end + proto:push(data) +end + +while true do + readProto() +end |