Advanced Overview of cosock.run
Global Variables
Cosock uses a few global variables to allow for the potentially recursive nature of Lua coroutines.
threads
: List of all coroutines cosock is aware of
  - This is populated by the first argument to cosock.spawn
threadnames
: A map of coroutine<->name pairs
  - This is populated by the second argument to cosock.spawn
threadswaitingfor
: A map of coroutine<->select args
  - Select args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, timeout: float?}
  - This is populated by the values provided to coroutine.yield for cosock tasks from a call to cosock.socket.select
readythreads
: A map of coroutine<->resume args that will be ready on the next pass
  - Resume args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, err: string?}
  - This is populated by coroutine wake-ups that occur on the current pass
socketwrappermap
: A map of luasocket<->cosock socket pairs
  - This map is keyed with the table pointer for the luasocket for easily getting back to a cosock socket when you only have a luasocket
  - This gets populated when a cosock socket is included in a select args table
threaderrorhandler
: Potential error handler function. Not currently settable.
timers
: see Timers
Run Loop
To start the loop we define a few local variables. First up is wakethreads. This table is populated by removing all of the elements from readythreads, which frees up readythreads to be added to by any tasks we are about to wake up. Next are the two list tables sendt and recvt, along with the optional integer timeout; these will end up being passed to luasocket's socket.select when we run out of ready threads. Now we can move all our readythreads into wakethreads and then loop over all of the ready threads.
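The drain step described above can be sketched as follows. This is a minimal illustration, assuming readythreads is a plain Lua table keyed by coroutine; the helper name drain_ready is hypothetical and is not part of cosock.

```lua
-- Hypothetical stand-in for cosock's module-level readythreads table.
local readythreads = {}

-- Move every entry out of readythreads into a fresh table. Afterwards
-- readythreads is empty, so tasks resumed on this pass can queue new wake-ups
-- without disturbing the set we are about to iterate.
local function drain_ready()
  local wakethreads = {}
  for thread, resumeargs in pairs(readythreads) do
    wakethreads[thread] = resumeargs
    readythreads[thread] = nil -- clearing an existing field during traversal is allowed
  end
  return wakethreads
end

local co = coroutine.create(function() end)
readythreads[co] = { recvr = {}, sendr = {} }
local wakethreads = drain_ready()
```

After the call, wakethreads holds the entry for co and readythreads is empty again.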
For each ready thread, we first check if the coroutine.status for it is "suspended"; if it isn't, we skip this thread. For any "suspended" thread, we first cancel any timers that might be scheduled for that thread by calling timers.cancel. Next we pull out the values we stored in threadswaitingfor and call skt:setwaker with nil for any sendr or recvr properties; this prevents any potential "double wakes" from occurring.
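To illustrate why clearing a waker prevents a double wake, the sketch below uses a hypothetical stub socket. Only the method names setwaker and _wake come from the text; the (kind, waker) argument shape, the return value of _wake and the stub itself are assumptions made for illustration, not cosock's implementation.

```lua
-- Hypothetical stub socket exposing only the two methods named in the text.
-- The (kind, waker) signature is an assumption.
local function new_stub_socket()
  local skt = { wakers = {} }
  function skt:setwaker(kind, waker)
    self.wakers[kind] = waker
  end
  function skt:_wake(kind)
    local waker = self.wakers[kind]
    if waker then waker() end
    return waker ~= nil -- report whether anything was woken
  end
  return skt
end

local wakes = 0
local skt = new_stub_socket()
-- The waker unregisters itself before doing its work, so a second readiness
-- notification for the same socket cannot wake the thread twice.
skt:setwaker("recvr", function()
  skt:setwaker("recvr", nil)
  wakes = wakes + 1
end)

local first = skt:_wake("recvr")  -- runs the waker
local second = skt:_wake("recvr") -- no waker registered any more
```

Here only the first call reaches the waker; the second finds nothing registered and does nothing.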
Now we call coroutine.resume with our thread and the recvr, sendr and err values that were stored in wakethreads. When coroutine.resume completes, we have 2 pieces of information that drive our next path. The first is the first return value from coroutine.resume, which indicates whether our thread raised an error; the second is the result of calling coroutine.status on our thread. If our thread's status is "dead" and coroutine.resume returned false, something has gone terribly wrong so we raise an error (with a traceback if debug is available). If our thread's status is "dead" and coroutine.resume returned true, we just remove our thread from threads and threadswaitingfor. If our thread's status is "suspended" and coroutine.resume returned true, then we first update threadswaitingfor with the remaining return values from coroutine.resume. We then call skt:setwaker for any sendr and recvr in those values, setting the waker to a function that will clear itself and call the local function wake_thread. At this point we also update our local tables recvt and sendt to include the skt.inner_sock values from threadswaitingfor, which are the luasockets associated with a cosock.socket.
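The branching described above can be sketched with plain coroutines. The helper step and the labels it returns are hypothetical names chosen for this illustration; cosock performs the bookkeeping inline rather than returning labels.

```lua
-- Resume a thread once and report which branch applies.
-- `step` is a hypothetical helper, not a cosock function.
local function step(thread, recvr, sendr, err)
  if coroutine.status(thread) ~= "suspended" then
    return "skipped"
  end
  local ok, a, b, c = coroutine.resume(thread, recvr, sendr, err)
  local status = coroutine.status(thread)
  if status == "dead" and not ok then
    return "errored", a -- a is the error value raised inside the thread
  elseif status == "dead" then
    return "finished"
  end
  -- Still suspended: the values passed to coroutine.yield become the new
  -- select args for this thread.
  return "waiting", { recvr = a, sendr = b, timeout = c }
end

-- A task that yields once, as if waiting in select with a 0.5 second timeout.
local task = coroutine.create(function()
  coroutine.yield({}, {}, 0.5)
end)
local kind1, waitingfor = step(task)
local kind2 = step(task)

-- A task that fails on its first resume.
local bad = coroutine.create(function() error("boom") end)
local kind3, message = step(bad)
```

The first call leaves task suspended with new select args, the second lets it run to completion, and the failing task is reported as errored together with its error value.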
Now that we have resumed all of the wakethreads and filled in our eventual arguments to socket.select, we determine if we are still running. We do this by looping over all of our threads and checking that at least 1 has a coroutine.status that is not "dead". If all threads are "dead" and nothing got added to readythreads, then we exit the run loop. Next we update our socketwrappermap by looping over all of the values in threadswaitingfor and inserting each recvr and sendr under the key skt.inner_sock.
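The "still running" check can be sketched as below. The helper names any_alive and should_exit are hypothetical, and the sketch assumes the values stored in threads are the coroutines themselves.

```lua
-- Return true if at least one known thread is not "dead".
-- Assumes the values in `threads` are coroutines.
local function any_alive(threads)
  for _, thread in pairs(threads) do
    if coroutine.status(thread) ~= "dead" then
      return true
    end
  end
  return false
end

-- The loop exits only when nothing is alive and nothing is queued to wake.
local function should_exit(threads, readythreads)
  return not any_alive(threads) and next(readythreads) == nil
end

local done = coroutine.create(function() end)
coroutine.resume(done) -- runs to completion, status becomes "dead"

local pending = coroutine.create(function() coroutine.yield() end)
coroutine.resume(pending) -- stops at the yield, status is "suspended"
```

With these two threads, should_exit({ done }, {}) is true, while adding pending to the list keeps the loop running.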
With all the bookkeeping done, we run all of the timers that have reached their deadline, and update our local variable timeout to the duration until the shortest remaining timeout. If at least one thread was added to readythreads, we set our timeout to 0, because we already know we have new work to do. At this point if timeout is nil and both sendt and recvt are empty, we raise an error because we are about to call socket.select({}, {}, nil) which would just block forever. Otherwise, we call socket.select, capturing the return values in recvr, sendr and err. If err is neither nil nor the value "timeout", we raise that error. If err is nil or "timeout", we loop over all of the values in recvr and sendr, looking up the cosock socket in socketwrappermap and calling skt:_wake, which calls the function we provided to setwaker above.
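The timeout decision made just before socket.select can be sketched as a small pure function. The name pick_timeout and its parameter list are hypothetical; the sketch only mirrors the rules stated above and does not call luasocket.

```lua
-- Decide which timeout to hand to select.
-- timer_timeout: seconds until the next timer deadline, or nil if there is none
-- readythreads:  table of threads already queued to wake on the next pass
-- recvt, sendt:  lists of sockets that select would wait on
local function pick_timeout(timer_timeout, readythreads, recvt, sendt)
  local timeout = timer_timeout
  if next(readythreads) ~= nil then
    timeout = 0 -- work is already waiting, so select must not block
  end
  if not timeout and #recvt == 0 and #sendt == 0 then
    -- select({}, {}, nil) would block forever
    error("select would block forever: no sockets and no timeout")
  end
  return timeout
end

local from_timer = pick_timeout(1.5, {}, {}, {})
local from_ready = pick_timeout(nil, { task = true }, {}, {})
local would_block = not pcall(pick_timeout, nil, {}, {}, {})
```

A pending timer supplies the timeout, queued work forces it to 0, and the combination of no timer, no queued work and no sockets is rejected.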
With that complete, we now have a fully updated readythreads and we start the loop again.
Outline Version
- Define wakethreads
- Define an empty list of senders (sendt), receivers (recvt) and a timeout
- Pop all readythreads entries into the wakethreads
- Loop over all threads in wakethreads
  - If coroutine.status for that thread returns "suspended"
    - Clear any timers
    - Clear any wakers registered with a timeout
    - coroutine.resume with the stored recvr, sendr and err arguments
    - If coroutine.resume returned true in the first position and coroutine.status returns "suspended"
      - Re-populate threadswaitingfor[thread] with the 3 other return values from coroutine.resume
        - These should be the recvt, sendt and timeout values that will populate select args
      - Set the waker for all sockets in recvt and sendt to call wake_thread and then unset themselves
      - If coroutine.resume returned a timeout, create a new timer for this thread which will call wake_thread_err on expiration with the value "timeout"
    - If coroutine.status returned "dead"
      - If coroutine.resume returned false in the first position and no threaderrorhandler has been set
        - Raise an error
          - If the debug library is available, include a debug.traceback and the second return value from coroutine.resume
          - Else just raise an error with the second return value from coroutine.resume
        - Exit the application
          - This calls os.exit(-1)
  - Else, print a warning message if printing is turned on
- Initialize a variable running to false
- Loop over all threads, calling coroutine.status on each; if at least 1 doesn't return "dead", set running to true
- If running is false and readythreads is empty
  - Exit the run loop
- Loop over all the values in threadswaitingfor
  - Insert the luasockets on any sendr or recvr parameters to the loop local variables sendt and recvt
  - Populate socketwrappermap with any sendr or recvr sockets
- Call timers.run
- If readythreads is not empty
  - Set timeout to 0
- If timeout is falsy and recvt is empty and sendt is empty
  - Raise an error that cosock.select was called with no sockets and no timeouts
- Call luasocket's socket.select with our loop's recvt, sendt and timeout
- If socket.select returns a value in the 3rd position and that value is not "timeout"
  - Raise an error with that return value
- Loop over the recvr (1st) return from socket.select
  - Look up the cosock.socket from socketwrappermap
  - Call skt:_wake("recvr")
- Loop over the sendr (2nd) return from socket.select
  - Look up the cosock.socket from socketwrappermap
  - Call skt:_wake("sendr")
Timers
Internally, we keep a list of timer objects to determine when any thread would have reached the maximum time it should be running/yielding for. We can interact with these through a module local variable timers, which has a few associated functions.
Inside of a do block, we create 2 local variables timeouts and refs for use in the timer associated functions.
The first associated function worth discussing is timers.set, which takes the arguments timeout: float, callback: fun() and ref: table. When called, we first capture the current timestamp via socket.gettime(). We then calculate the deadline for this timer by adding timeout to that timestamp and store it in a variable timeoutat. We then table.insert the table { timeoutat = timeoutat, callback = callback, ref = ref } into timeouts. If ref isn't nil we also populate refs[ref] with that same table.
Next up is timers.cancel, which takes the argument ref: table. When called, we first look up the timeout info from refs[ref]. If we find something there, we remove the values in the properties callback and ref, and finally we remove the value from refs. By removing the callback we avoid ever calling the consequence of that timer. The entry itself will eventually be removed from timeouts by a later call to timers.run.
Finally we have timers.run; this function takes no arguments. When called, it first sorts the timeouts table in ascending order by timeoutat, where nil values are the smallest values. We then capture the current timestamp by calling socket.gettime. Now we consult the first element of timeouts: if that table has a timeoutat that is nil or less than now, we pop it off the list; if it has a callback property we call that, and if it has a ref property we remove it from refs. This repeats until the first element is not yet due.
Now that all the pending timers are done, we use the timeoutat property of the new first element of timeouts to calculate the next relative timeout (timeoutat - now) and return that as the earliest timeout. If timeouts is empty, we return nil.
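The three functions described above can be sketched together as follows. This is an illustration under stated assumptions rather than cosock's source: the factory new_timers and its now parameter are hypothetical, and the clock is injected so the example runs without luasocket (the text says cosock reads the time from socket.gettime).

```lua
-- Build a timers table around an injected clock function `now`.
local function new_timers(now)
  local timers = {}
  local timeouts, refs = {}, {}

  function timers.set(timeout, callback, ref)
    local info = { timeoutat = now() + timeout, callback = callback, ref = ref }
    table.insert(timeouts, info)
    if ref ~= nil then refs[ref] = info end
  end

  function timers.cancel(ref)
    local info = refs[ref]
    if info then
      info.callback = nil -- the entry stays in timeouts but can no longer fire
      info.ref = nil
      refs[ref] = nil
    end
  end

  -- Order by deadline, treating a nil deadline as the smallest value.
  local function earlier(a, b)
    if a.timeoutat == nil then return b.timeoutat ~= nil end
    if b.timeoutat == nil then return false end
    return a.timeoutat < b.timeoutat
  end

  function timers.run()
    table.sort(timeouts, earlier)
    local t = now()
    while timeouts[1] and (timeouts[1].timeoutat == nil or timeouts[1].timeoutat < t) do
      local info = table.remove(timeouts, 1)
      if info.callback then info.callback() end
      if info.ref then refs[info.ref] = nil end
    end
    if timeouts[1] then return timeouts[1].timeoutat - t end
    return nil
  end

  return timers
end

-- Usage with a fake clock that the example advances by hand.
local clock = 0
local timers = new_timers(function() return clock end)
local fired = {}
local ref_a, ref_b = {}, {}
timers.set(1, function() table.insert(fired, "a") end, ref_a)
timers.set(2, function() table.insert(fired, "b") end, ref_b)
timers.set(5, function() table.insert(fired, "c") end)
timers.cancel(ref_b)
clock = 3
local next_timeout = timers.run()
```

At clock 3 the first timer fires, the cancelled one is discarded silently, and next_timeout is the 2 seconds remaining until the third timer's deadline.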
Outline Version
- A timer has the shape {timeoutat: float, callback: fun(), ref: table?}
- timers.set
  - Updates timers to include that value. Also updates a private scoped table named refs
    - refs is a map of table pointer<->timer which is used for cancellation of a timer
- timers.cancel
  - If the provided table pointer is in refs, remove the callback and ref properties from that table
  - Set the table pointer key in refs to nil
- timers.run
  - Sort all timeouts by deadline (earliest first)
  - Pop the timer off the front of the timers list
  - If that timer.timeoutat is nil or < socket.gettime()
    - Call timer.callback
    - Remove this timer from refs
  - If there are any more timeouts left, return how long before that timeout should expire
  - If there are no more timeouts, return nil