summaryrefslogtreecommitdiff
path: root/script/async/async.lua
diff options
context:
space:
mode:
Diffstat (limited to 'script/async/async.lua')
-rw-r--r--script/async/async.lua126
1 files changed, 126 insertions, 0 deletions
diff --git a/script/async/async.lua b/script/async/async.lua
new file mode 100644
index 00000000..34716502
--- /dev/null
+++ b/script/async/async.lua
@@ -0,0 +1,126 @@
+local thread = require 'bee.thread'
+local errlog = thread.channel 'errlog'
+
+local TaskId = 0
+local IdlePool = {}
+local RunningList = {}
+local GCInfo = {}
+
+thread.newchannel 'gc'
+
+local function createTask(name)
+ TaskId = TaskId + 1
+ GCInfo[TaskId] = false
+ local id = TaskId
+ local requestName = 'request' .. tostring(id)
+ local responseName = 'response' .. tostring(id)
+ thread.newchannel(requestName)
+ thread.newchannel(responseName)
+ local buf = ([[
+ID = %d
+package.cpath = %q
+package.path = %q
+local thread = require 'bee.thread'
+local request = thread.channel(%q)
+local response = thread.channel(%q)
+local errlog = thread.channel 'errlog'
+local gc = thread.channel 'gc'
+
+local function task()
+ local dump, arg = request:bpop()
+ local env = setmetatable({
+ IN = request,
+ OUT = response,
+ ERR = errlog,
+ GC = gc,
+ }, { __index = _ENV })
+ local f, err = load(dump, '=task', 't', env)
+ if not f then
+ errlog:push(err .. '\n' .. dump)
+ return
+ end
+ local result = f(arg)
+ response:push(result)
+end
+
+while true do
+ local ok, result = xpcall(task, debug.traceback)
+ if not ok then
+ errlog:push(result)
+ end
+ collectgarbage()
+ gc:push(ID, collectgarbage 'count')
+end
+]]):format(id, package.cpath, package.path, requestName, responseName)
+ log.debug('Create thread, id: ', id, 'task: ', name)
+ return {
+ id = id,
+ thread = thread.thread(buf),
+ request = thread.channel(requestName),
+ response = thread.channel(responseName),
+ }
+end
+
+local function run(name, arg, callback)
+ local dump = io.load(ROOT / 'src' / 'async' / (name .. '.lua'))
+ if not dump then
+ error(('找不到[%s]'):format(name))
+ end
+ local task = table.remove(IdlePool)
+ if not task then
+ task = createTask(name)
+ end
+ RunningList[task.id] = {
+ task = task,
+ callback = callback,
+ }
+ task.request:push(dump, arg)
+ -- TODO 线程回收后禁止外部再使用通道
+ return task.request, task.response
+end
+
+local function callback(id, running)
+ if running.callback then
+ while true do
+ local results = table.pack(running.task.response:pop())
+ if not results[1] then
+ break
+ end
+ -- TODO 封装成对象
+ local suc, destroy = xpcall(running.callback, log.error, table.unpack(results, 2))
+ if not suc or destroy then
+ RunningList[id] = nil
+ IdlePool[#IdlePool+1] = running.task
+ break
+ end
+ end
+ end
+end
+
+local function checkGC()
+ local gc = thread.channel 'gc'
+ while true do
+ local ok, id, count = gc:pop()
+ if not ok then
+ break
+ end
+ GCInfo[id] = count
+ end
+end
+
+local function onTick()
+ local ok, msg = errlog:pop()
+ if ok then
+ log.error(msg)
+ end
+ for id, running in pairs(RunningList) do
+ callback(id, running)
+ end
+ checkGC()
+end
+
+return {
+ onTick = onTick,
+ run = run,
+ info = GCInfo,
+}