diff options
-rw-r--r-- | script/brave/work.lua | 2 | ||||
-rw-r--r-- | script/proto/proto.lua | 4 | ||||
-rw-r--r-- | script/service/net.lua | 207 |
3 files changed, 92 insertions, 121 deletions
diff --git a/script/brave/work.lua b/script/brave/work.lua index 47cc9459..a6a7a41e 100644 --- a/script/brave/work.lua +++ b/script/brave/work.lua @@ -54,7 +54,7 @@ brave.on('loadProtoBySocket', function (param) function lsmaster:on_data(data) lsclient:write(data) - lsclient:update() + net.update() end while true do diff --git a/script/proto/proto.lua b/script/proto/proto.lua index 991a3c60..d01c8f36 100644 --- a/script/proto/proto.lua +++ b/script/proto/proto.lua @@ -57,7 +57,7 @@ function m.send(data) io.write(buf) elseif m.mode == 'socket' then m.client:write(buf) - m.client:update() + net.update() end end @@ -265,7 +265,7 @@ function m.listen(mode, socketPort) t:remove() m.client = client client:write(dummyClient.buf) - client:update() + net.update() end pub.task('loadProtoBySocket', { diff --git a/script/service/net.lua b/script/service/net.lua index 86edc9a6..2019406e 100644 --- a/script/service/net.lua +++ b/script/service/net.lua @@ -1,42 +1,39 @@ local socket = require "bee.socket" +local select = require "bee.select" +local selector = select.create() +local SELECT_READ <const> = select.SELECT_READ +local SELECT_WRITE <const> = select.SELECT_WRITE -local readfds = {} -local writefds = {} -local map = {} - -local function FD_SET(set, fd) - for i = 1, #set do - if fd == set[i] then - return - end +local function fd_set_read(s) + if s._flags & SELECT_READ ~= 0 then + return end - set[#set+1] = fd + s._flags = s._flags | SELECT_READ + selector:event_mod(s._fd, s._flags) end -local function FD_CLR(set, fd) - for i = 1, #set do - if fd == set[i] then - set[i] = set[#set] - set[#set] = nil - return - end +local function fd_clr_read(s) + if s._flags & SELECT_READ == 0 then + return end + s._flags = s._flags & (~SELECT_READ) + selector:event_mod(s._fd, s._flags) end -local function fd_set_read(fd) - FD_SET(readfds, fd) -end - -local function fd_clr_read(fd) - FD_CLR(readfds, fd) -end - -local function fd_set_write(fd) - FD_SET(writefds, fd) +local function fd_set_write(s) + if s._flags & SELECT_WRITE ~= 0 then + return + end + s._flags = s._flags | SELECT_WRITE + selector:event_mod(s._fd, s._flags) end -local function fd_clr_write(fd) - FD_CLR(writefds, fd) +local function fd_clr_write(s) + if s._flags & SELECT_WRITE == 0 then + return + end + s._flags = s._flags & (~SELECT_WRITE) + selector:event_mod(s._fd, s._flags) end local function on_event(self, name, ...) @@ -49,8 +46,8 @@ end local function close(self) local fd = self._fd on_event(self, "close") + selector:event_del(fd) fd:close() - map[fd] = nil end local stream_mt = {} @@ -69,7 +66,7 @@ function stream:write(data) return end if self._writebuf == "" then - fd_set_write(self._fd) + fd_set_write(self) end self._writebuf = self._writebuf .. data end @@ -79,35 +76,17 @@ end function stream:close() if not self.shutdown_r then self.shutdown_r = true - fd_clr_read(self._fd) + fd_clr_read(self) end if self.shutdown_w or self._writebuf == "" then self.shutdown_w = true - fd_clr_write(self._fd) + fd_clr_write(self) close(self) end end -function stream:update(timeout) - local fd = self._fd - local r = {fd} - local w = r - if self._writebuf == "" then - w = nil - end - local rd, wr = socket.select(r, w, timeout or 0) - if rd then - if #rd > 0 then - self:select_r() - end - if #wr > 0 then - self:select_w() - end - end -end local function close_write(self) - fd_clr_write(self._fd) + fd_clr_write(self) if self.shutdown_r then - fd_clr_read(self._fd) close(self) end end @@ -133,26 +112,43 @@ function stream:select_w() end end end +local function update_stream(s, event) + if event & SELECT_READ ~= 0 then + s:select_r() + end + if event & SELECT_WRITE ~= 0 then + s:select_w() + end +end local function accept_stream(fd) - local self = setmetatable({ + local s = setmetatable({ _fd = fd, + _flags = SELECT_READ, _event = {}, _writebuf = "", shutdown_r = false, shutdown_w = false, }, stream_mt) - map[fd] = self - fd_set_read(fd) - return self -end -local function connect_stream(self) - setmetatable(self, stream_mt) - fd_set_read(self._fd) - if self._writebuf ~= "" then - self:select_w() + selector:event_add(fd, SELECT_READ, function (event) + update_stream(s, event) + end) + return s +end +local function connect_stream(s) + setmetatable(s, stream_mt) + selector:event_del(s._fd) + if s._writebuf ~= "" then + s._flags = SELECT_READ | SELECT_WRITE + selector:event_add(s._fd, SELECT_READ | SELECT_WRITE, function (event) + update_stream(s, event) + end) + s:select_w() else - fd_clr_write(self._fd) + s._flags = SELECT_READ + selector:event_add(s._fd, SELECT_READ, function (event) + update_stream(s, event) + end) end end @@ -170,35 +166,32 @@ function listen:is_closed() end function listen:close() self.shutdown_r = true - fd_clr_read(self._fd) close(self) end -function listen:update(timeout) - local fd = self._fd - local r = {fd} - local rd = socket.select(r, nil, timeout or 0) - if rd then - if #rd > 0 then - self:select_r() - end - end -end -function listen:select_r() - local newfd = self._fd:accept() - if newfd:status() then - local news = accept_stream(newfd) - on_event(self, "accept", news) - end -end local function new_listen(fd) local s = { _fd = fd, + _flags = SELECT_READ, _event = {}, shutdown_r = false, shutdown_w = true, } - map[fd] = s - fd_set_read(fd) + selector:event_add(fd, SELECT_READ, function () + local newfd, err = fd:accept() + if not newfd then + on_event(s, "error", err) + return + end + local ok, err = newfd:status() + if not ok then + on_event(s, "error", err) + return + end + if newfd:status() then + local news = accept_stream(newfd) + on_event(s, "accept", news) + end + end) return setmetatable(s, listen_mt) end @@ -221,39 +214,27 @@ function connect:is_closed() end function connect:close() self.shutdown_w = true - fd_clr_write(self._fd) close(self) end -function connect:update(timeout) - local fd = self._fd - local w = {fd} - local rd, wr = socket.select(nil, w, timeout or 0) - if rd then - if #wr > 0 then - self:select_w() - end - end -end -function connect:select_w() - local ok, err = self._fd:status() - if ok then - connect_stream(self) - on_event(self, "connect") - else - on_event(self, "error", err) - self:close() - end -end local function new_connect(fd) local s = { _fd = fd, + _flags = SELECT_WRITE, _event = {}, _writebuf = "", shutdown_r = false, shutdown_w = false, } - map[fd] = s - fd_set_write(fd) + selector:event_add(fd, SELECT_WRITE, function () + local ok, err = fd:status() + if ok then + connect_stream(s) + on_event(s, "connect") + else + on_event(s, "error", err) + s:close() + end + end) return setmetatable(s, connect_mt) end @@ -293,18 +274,8 @@ function m.connect(protocol, ...) end function m.update(timeout) - local rd, wr = socket.select(readfds, writefds, timeout or 0) - if rd then - for i = 1, #rd do - local fd = rd[i] - local s = map[fd] - s:select_r() - end - for i = 1, #wr do - local fd = wr[i] - local s = map[fd] - s:select_w() - end + for func, event in selector:wait(timeout or 0) do + func(event) end end |