-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync.lua
More file actions
130 lines (126 loc) · 2.54 KB
/
async.lua
File metadata and controls
130 lines (126 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async={}
async.threads=setmetatable({},{__mode="kv"})
local resume
function async.new(func,err)
local co=coroutine.create(func)
local sfunc
function sfunc(...)
local prev=async.current
async.current=sfunc
hook.del(sfunc)
if coroutine.status(co)=="dead" then
return
end
local p={coroutine.resume(co,...)}
async.current=prev
if coroutine.status(co)=="dead" then
hook.queue("async_dead",sfunc)
if not p[1] then
(err or error)((p[2] or "").."\nin async resume\n"..debug.traceback(co))
end
end
return unpack(p,2)
end
async.threads[sfunc]=co
sfunc()
return sfunc
end
function async.pull(...)
hook.new({...},async.current)
return coroutine.yield()
end
function async.wait(n)
hook.new(hook.timer(n),async.current)
coroutine.yield()
end
function async.join(...)
local r=setmetatable({},{__mode="k"})
for k,v in pairs({...}) do
r[v]=true
end
while next(r) do
local f=async.pull("async_dead",hook.timer(5))
if r[f] then
return f
end
end
end
function async.socket(sk,err)
assert(sk,err)
local out
out=setmetatable({
ip=sk:getpeername(),
receive=function(len,pfx)
local tmo=out.timeout
if type(len)=="number" and len<1 then
return "",nil
elseif type(tmo)=="number" and tmo<=0 then
return nil,"timeout"
end
hook.newsocket(sk)
local resume=async.current
local stop=false
if tmo then
hook.new(hook.timer(tmo),function()
stop=true
resume(nil,"timeout")
end)
end
hook.new("select",function()
local txt,err,str=sk:receive(len)
txt=txt or str
if err~="timeout" or (txt~="" and len=="*a") then
resume(txt,err)
stop=true
hook.stop()
elseif stop then
hook.stop()
end
end)
local txt,err=coroutine.yield()
hook.remsocket(sk)
return (pfx or "")..txt,err
end,
send=function(txt)
local ind=1
local _,l=sk:getstats()
local bse=l
while true do
local t,err,c=sk:send(txt,ind)
if not t and err~="timeout" then
return false,err
end
if not c then
break
end
local _,sl=sk:getstats()
ind=ind+(sl-l)
l=sl
hook.newrsocket(sk)
async.pull("select")
hook.remrsocket(sk)
end
return true
end,
accept=function()
hook.newsocket(sk)
while true do
local cl,err=sk:accept()
if err~="timeout" then
hook.remsocket(sk)
return async.socket(cl,err)
end
async.pull("select")
end
end,
close=function()
while hook.remsocket(sk) do end
sk:close()
end,
sk=sk,
},{__gc=function()
while hook.remsocket(sk) do end
sk:close()
end})
return out
end