summaryrefslogtreecommitdiff
path: root/server/src/async
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/async')
-rw-r--r--server/src/async/async.lua89
-rw-r--r--server/src/async/init.lua1
-rw-r--r--server/src/async/proto.lua59
3 files changed, 149 insertions, 0 deletions
diff --git a/server/src/async/async.lua b/server/src/async/async.lua
new file mode 100644
index 00000000..8c75540c
--- /dev/null
+++ b/server/src/async/async.lua
@@ -0,0 +1,89 @@
+local thread = require 'bee.thread'
+local errlog = thread.channel 'errlog'
+local taskId = 0
+local idlePool = {}
+local runningList = {}
+
+local function createTask()
+ taskId = taskId + 1
+ local id = taskId
+ local requestName = 'request' .. tostring(id)
+ local responseName = 'response' .. tostring(id)
+ thread.newchannel(requestName)
+ thread.newchannel(responseName)
+ local buf = ([[
+package.cpath = %q
+package.path = %q
+local thread = require 'bee.thread'
+local request = thread.channel(%q)
+local response = thread.channel(%q)
+local errlog = thread.channel 'errlog'
+
+local function task()
+ local dump, env = request:bpop()
+ if env then
+ setmetatable(env, { __index = _ENV })
+ else
+ env = _ENV
+ end
+ local f, err = load(dump, '=task', 't', env)
+ if not f then
+ errlog:push(err)
+ return
+ end
+ local results = table.pack(pcall(f))
+ local ok = table.remove(results, 1)
+ if not ok then
+ local err = table.remove(results, 1)
+ errlog:push(err)
+ return
+ end
+ results.n = results.n - 1
+ response:push(results)
+end
+
+while true do
+ task()
+end
+]]):format(package.cpath, package.path, requestName, responseName)
+ log.debug('Create thread, id: ', id)
+ return {
+ id = id,
+ thread = thread.thread(buf),
+ request = thread.channel(requestName),
+ response = thread.channel(responseName),
+ }
+end
+
+local function call(dump, env, callback)
+ local task = table.remove(idlePool)
+ if not task then
+ task = createTask()
+ end
+ runningList[task.id] = {
+ task = task,
+ callback = callback,
+ }
+ task.request:push(dump, env)
+end
+
+local function onTick()
+ local ok, msg = errlog:pop()
+ if ok then
+ log.error(msg)
+ end
+ for id, running in pairs(runningList) do
+ local ok, results = running.task.response:pop()
+ if ok then
+ runningList[id] = nil
+ idlePool[#idlePool+1] = running.task
+ xpcall(running.callback, log.debug, table.unpack(results))
+ end
+ end
+end
+
+return {
+ onTick = onTick,
+ call = call,
+ require = require,
+}
diff --git a/server/src/async/init.lua b/server/src/async/init.lua
new file mode 100644
index 00000000..3d75a720
--- /dev/null
+++ b/server/src/async/init.lua
@@ -0,0 +1 @@
+return require 'async.async'
diff --git a/server/src/async/proto.lua b/server/src/async/proto.lua
new file mode 100644
index 00000000..4570de2e
--- /dev/null
+++ b/server/src/async/proto.lua
@@ -0,0 +1,59 @@
+local thread = require 'bee.thread'
+local json = require 'json'
+local proto = thread.channel 'proto'
+local errlog = thread.channel 'errlog'
+
+local function pushError(...)
+ local t = table.pack(...)
+ for i = 1, t.n do
+ t[i] = tostring(t[i])
+ end
+ local buf = table.concat(t, '\t')
+ errlog:push(buf)
+end
+
+local function readProtoHeader()
+ local header = io.read 'l'
+ if header:sub(1, #'Content-Length') == 'Content-Length' then
+ return header
+ elseif header:sub(1, #'Content-Type') == 'Content-Type' then
+ return nil
+ else
+ pushError('Proto header error:', header)
+ return nil
+ end
+end
+
+local function readProtoContent(header)
+ local len = tonumber(header:match('%d+'))
+ if not len then
+ pushError('Proto header error:', header)
+ return nil
+ end
+ local buf = io.read(len+2)
+ if not buf then
+ return nil
+ end
+ local suc, res = pcall(json.decode, buf)
+ if not suc then
+ pushError('Proto error:', buf)
+ return nil
+ end
+ return res
+end
+
+local function readProto()
+ local header = readProtoHeader()
+ if not header then
+ return
+ end
+ local data = readProtoContent(header)
+ if not data then
+ return
+ end
+ proto:push(data)
+end
+
+while true do
+ readProto()
+end