From 039b11fc0b7387ca5e19b20b33d839d42d3068aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=80=E8=90=8C=E5=B0=8F=E6=B1=90?= Date: Thu, 19 Sep 2019 19:59:22 +0800 Subject: =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server-beta/main.lua | 2 +- server-beta/src/pub/brave.lua | 22 ++++- server-beta/src/pub/client.lua | 46 ++++++--- server-beta/src/service.lua | 26 +++-- server-beta/src/task.lua | 51 +++++++++- server-beta/src/timer.lua | 213 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 333 insertions(+), 27 deletions(-) create mode 100644 server-beta/src/timer.lua (limited to 'server-beta') diff --git a/server-beta/main.lua b/server-beta/main.lua index 79af5014..32fb213c 100644 --- a/server-beta/main.lua +++ b/server-beta/main.lua @@ -15,4 +15,4 @@ ac = {} xpcall(dofile, log.debug, rootPath .. '/debugger.lua') local service = require 'service' -service:start() +service.start() diff --git a/server-beta/src/pub/brave.lua b/server-beta/src/pub/brave.lua index c9fd0afe..7662a39e 100644 --- a/server-beta/src/pub/brave.lua +++ b/server-beta/src/pub/brave.lua @@ -5,7 +5,23 @@ local m = {} m.type = 'pub.brave' --- 注册成为勇者 -function m:register(id) - self.taskpad = thread.channel('taskpad' .. id) - self.waiter = thread.channel('waiter' .. id) +function m.register(id) + m.taskpad = thread.channel('taskpad' .. id) + m.waiter = thread.channel('waiter' .. id) + m.start() end + +--- 开始找工作 +function m.start() + while true do + local suc, name, id, params = m.taskpad:pop() + if not suc then + -- 找不到工作的勇者,只好睡觉 + thread.sleep(0.01) + end + local result = require(name)(params) + m.waiter:push(id, result) + end +end + +return m diff --git a/server-beta/src/pub/client.lua b/server-beta/src/pub/client.lua index 5ac24684..005bf9ce 100644 --- a/server-beta/src/pub/client.lua +++ b/server-beta/src/pub/client.lua @@ -1,13 +1,14 @@ local thread = require 'bee.thread' local utility = require 'utility' local task = require 'task' +local type = require 'type' local braveTemplate = [[ package.path = %q package.cpath = %q local brave = require 'pub.brave' -brave:register(%d) +brave.register(%d) ]] ---@class pub_client @@ -17,13 +18,13 @@ m.braves = {} --- 招募勇者,勇者会从公告板上领取任务,完成任务后到看板娘处交付任务 ---@param num integer -function m:recruitBraves(num) +function m.recruitBraves(num) for _ = 1, num do - local id = #self.braves + 1 + local id = #m.braves + 1 log.info('Create pub brave:', id) thread.newchannel('taskpad' .. id) thread.newchannel('waiter' .. id) - self.braves[id] = { + m.braves[id] = { id = id, taskpad = thread.channel('taskpad' .. id), waiter = thread.channel('waiter' .. id), @@ -40,42 +41,57 @@ function m:recruitBraves(num) end --- 勇者是否有空 -function m:isIdle(brave) - return brave.currentTask == nil and not next(brave.taskList) +function m.isIdle(brave) + return next(brave.taskList) == nil end --- 给勇者推送任务 -function m:pushTask(brave, name, ...) +function m.pushTask(brave, name, params) local taskID = brave.counter() local co = coroutine.running() - brave.taskpad:push(name, taskID, ...) + brave.taskpad:push(name, taskID, params) brave.taskList[taskID] = co return coroutine.yield(co) end --- 从勇者处接收任务反馈 -function m:popTask(brave, id, ...) +function m.popTask(brave, id, params) local co = brave.taskList[id] if not co then log.warn(('Brave pushed unknown task result: [%d] => [%d]'):format(brave.id, id)) return end brave.taskList[id] = nil - coroutine.resume(co, ...) + coroutine.resume(co, params) end --- 发布任务 ---@parma name string -function m:task(name, ...) +function m.task(name, params) local _, main = coroutine.running() if main then - error('不能在主协程中发布任务') + error('不能在主线程中发布任务') end - for _, brave in ipairs(self.braves) do - if self:isIdle(brave) then - return self:pushTask(brave, name, ...) + for _, brave in ipairs(m.braves) do + if m.isIdle(brave) then + return m.pushTask(brave, name, params) end end end +--- 接收反馈 +function m.recieve() + local _, main = coroutine.running() + if main then + error('不能在主线程中接收反馈') + end + for _, brave in ipairs(m.braves) do + local suc, id, result + if not suc then + goto CONTINUE + end + ::CONTINUE:: + end +end + return m diff --git a/server-beta/src/service.lua b/server-beta/src/service.lua index 0aaf5a23..639e2ae2 100644 --- a/server-beta/src/service.lua +++ b/server-beta/src/service.lua @@ -3,26 +3,40 @@ local subprocess = require 'bee.subprocess' local thread = require 'bee.thread' local task = require 'task' local utility = require 'utility' +local timer = require 'timer' local m = {} m.type = 'service' -function m:listenProto() +function m.listenProto() subprocess.filemode(io.stdin, 'b') subprocess.filemode(io.stdout, 'b') io.stdin:setvbuf 'no' io.stdout:setvbuf 'no' - coroutine.wrap(function () + task.create(function () while true do - local proto = client:task('loadProto') + local proto = client.task('loadProto') log.debug('proto:', utility.dump(proto)) end end) end -function m:start() - client:recruitBraves(4) - self:listenProto() +function m.startTimer() + local last = os.clock() + while true do + thread.sleep(0.01) + local current = os.clock() + local delta = current - last + last = current + m.update(delta) + end +end + +function m.start() + client.recruitBraves(4) + m.listenProto() + + m.startTimer() end return m diff --git a/server-beta/src/task.lua b/server-beta/src/task.lua index fa44b7c6..23b6ae2f 100644 --- a/server-beta/src/task.lua +++ b/server-beta/src/task.lua @@ -1,7 +1,54 @@ -local thread = require 'bee.thread' +local timer = require 'timer' ---@class task local m = {} m.type = 'task' -return m +--- 设置错误处理器 +---@param errHandle function {comment = '当有错误发生时,会以错误堆栈为参数调用该函数'} +function m.setErrorHandle(errHandle) + m.errorHandle = errHandle +end + +--- 创建一个任务 +function m.create(callback) + coroutine.create(callback) +end + +--- 休眠一段时间 +---@param time number +function m.sleep(time) + local co, main = coroutine.running() + if main then + if m.errorHandle then + m.errorHandle(debug.traceback('Cant sleep in main thread')) + end + return + end + timer.wait(time, function () + local suc, err = coroutine.resume(co) + if not suc and m.errHandle then + m.errHandle(debug.traceback(co, err)) + end + end) + coroutine.yield() +end + +--- 等待直到唤醒 +---@param waker function +function m.wait(waker) + local co, main = coroutine.running() + if main then + if m.errorHandle then + m.errorHandle(debug.traceback('Cant wait in main thread')) + end + return + end + waker(function () + local suc, err = coroutine.resume(co) + if not suc and m.errHandle then + m.errHandle(debug.traceback(co, err)) + end + end) + coroutine.yield() +end diff --git a/server-beta/src/timer.lua b/server-beta/src/timer.lua new file mode 100644 index 00000000..0add4c22 --- /dev/null +++ b/server-beta/src/timer.lua @@ -0,0 +1,213 @@ +local setmetatable = setmetatable +local mathMax = math.max +local mathFloor = math.floor + +_ENV = nil + +local curFrame = 0 +local maxFrame = 0 +local curIndex = 0 +local freeQueue = {} +local timer = {} + +local function allocQueue() + local n = #freeQueue + if n > 0 then + local r = freeQueue[n] + freeQueue[n] = nil + return r + else + return {} + end +end + +local function mTimeout(self, timeout) + if self._pauseRemaining or self._running then + return + end + local ti = curFrame + timeout + local q = timer[ti] + if q == nil then + q = allocQueue() + timer[ti] = q + end + self._timeoutFrame = ti + self._running = true + q[#q + 1] = self +end + +local function mWakeup(self) + if self._removed then + return + end + self._running = false + if self._onTimer then + self:_onTimer() + end + if self._removed then + return + end + if self._timerCount then + if self._timerCount > 1 then + self._timerCount = self._timerCount - 1 + mTimeout(self, self._timeout) + else + self._removed = true + end + else + mTimeout(self, self._timeout) + end +end + +local function getRemaining(self) + if self._removed then + return 0 + end + if self._pauseRemaining then + return self._pauseRemaining + end + if self._timeoutFrame == curFrame then + return self._timeout or 0 + end + return self._timeoutFrame - curFrame +end + +local function onTick() + local q = timer[curFrame] + if q == nil then + curIndex = 0 + return + end + for i = curIndex + 1, #q do + local callback = q[i] + curIndex = i + q[i] = nil + if callback then + mWakeup(callback) + end + end + curIndex = 0 + timer[curFrame] = nil + freeQueue[#freeQueue + 1] = q +end + +local m = {} +local mt = {} +mt.__index = mt +mt.type = 'timer' + +function mt:__tostring() + return '[table:timer]' +end + +function mt:__call() + if self._onTimer then + self:_onTimer() + end +end + +function mt:remove() + self._removed = true +end + +function mt:pause() + if self._removed or self._pauseRemaining then + return + end + self._pauseRemaining = getRemaining(self) + self._running = false + local ti = self._timeoutFrame + local q = timer[ti] + if q then + for i = #q, 1, -1 do + if q[i] == self then + q[i] = false + return + end + end + end +end + +function mt:resume() + if self._removed or not self._pauseRemaining then + return + end + local timeout = self._pauseRemaining + self._pauseRemaining = nil + mTimeout(self, timeout) +end + +function mt:restart() + if self._removed or self._pauseRemaining or not self._running then + return + end + local ti = self._timeoutFrame + local q = timer[ti] + if q then + for i = #q, 1, -1 do + if q[i] == self then + q[i] = false + break + end + end + end + self._running = false + mTimeout(self, self._timeout) +end + +function mt:remaining() + return getRemaining(self) / 1000.0 +end + +function mt:onTimer() + self:_onTimer() +end + +function m.wait(timeout, onTimer) + local t = setmetatable({ + ['_timeout'] = mathMax(mathFloor(timeout * 1000.0), 1), + ['_onTimer'] = onTimer, + ['_timerCount'] = 1, + }, mt) + mTimeout(t, t._timeout) + return t +end + +function m.loop(timeout, onTimer) + local t = setmetatable({ + ['_timeout'] = mathFloor(timeout * 1000.0), + ['_onTimer'] = onTimer, + }, mt) + mTimeout(t, t._timeout) + return t +end + +function m.timer(timeout, count, onTimer) + if count == 0 then + return m.loop(timeout, onTimer) + end + local t = setmetatable({ + ['_timeout'] = mathFloor(timeout * 1000.0), + ['_onTimer'] = onTimer, + ['_timerCount'] = count, + }, mt) + mTimeout(t, t._timeout) + return t +end + +function m.clock() + return curFrame / 1000.0 +end + +function m.update(delta) + if curIndex ~= 0 then + curFrame = curFrame - 1 + end + maxFrame = maxFrame + delta * 1000.0 + while curFrame < maxFrame do + curFrame = curFrame + 1 + onTick() + end +end + +return m -- cgit v1.2.3