From 4ffc826d7ce5ca57c2929e040ed5eefb53f9d115 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Tue, 22 Jan 2019 15:18:27 +0800 Subject: =?UTF-8?q?=E6=95=B4=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/async/async.lua | 89 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 server/src/async/async.lua (limited to 'server/src/async/async.lua') diff --git a/server/src/async/async.lua b/server/src/async/async.lua new file mode 100644 index 00000000..8c75540c --- /dev/null +++ b/server/src/async/async.lua @@ -0,0 +1,89 @@ +local thread = require 'bee.thread' +local errlog = thread.channel 'errlog' +local taskId = 0 +local idlePool = {} +local runningList = {} + +local function createTask() + taskId = taskId + 1 + local id = taskId + local requestName = 'request' .. tostring(id) + local responseName = 'response' .. tostring(id) + thread.newchannel(requestName) + thread.newchannel(responseName) + local buf = ([[ +package.cpath = %q +package.path = %q +local thread = require 'bee.thread' +local request = thread.channel(%q) +local response = thread.channel(%q) +local errlog = thread.channel 'errlog' + +local function task() + local dump, env = request:bpop() + if env then + setmetatable(env, { __index = _ENV }) + else + env = _ENV + end + local f, err = load(dump, '=task', 't', env) + if not f then + errlog:push(err) + return + end + local results = table.pack(pcall(f)) + local ok = table.remove(results, 1) + if not ok then + local err = table.remove(results, 1) + errlog:push(err) + return + end + results.n = results.n - 1 + response:push(results) +end + +while true do + task() +end +]]):format(package.cpath, package.path, requestName, responseName) + log.debug('Create thread, id: ', id) + return { + id = id, + thread = thread.thread(buf), + request = thread.channel(requestName), + response = thread.channel(responseName), + } +end + +local function call(dump, env, callback) + local task = table.remove(idlePool) + if not task then + task = createTask() + end + runningList[task.id] = { + task = task, + callback = callback, + } + task.request:push(dump, env) +end + +local function onTick() + local ok, msg = errlog:pop() + if ok then + log.error(msg) + end + for id, running in pairs(runningList) do + local ok, results = running.task.response:pop() + if ok then + runningList[id] = nil + idlePool[#idlePool+1] = running.task + xpcall(running.callback, log.debug, table.unpack(results)) + end + end +end + +return { + onTick = onTick, + call = call, + require = require, +} -- cgit v1.2.3