Advanced Overview of cosock.run
Global Variables
Cosock uses a few global variables to allow for the potentially recursive nature of Lua coroutines.
- threads: List of all coroutines cosock is aware of
  - This is populated by the first argument to cosock.spawn
- threadnames: A map of coroutine<->name pairs
  - This is populated by the second argument to cosock.spawn
- threadswaitingfor: A map of coroutine<->select args
  - Select args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, timeout: float?}
  - This is populated by the values provided to coroutine.yield for cosock tasks from a call to cosock.socket.select
- readythreads: A map of coroutine<->resume args that will be ready on the next pass
  - Resume args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, err: string?}
  - This is populated by coroutine wake-ups that occur on the current pass
- socketwrappermap: A map of luasocket<->cosock socket pairs
  - This map is keyed with the table pointer for the luasocket for easily getting back to a cosock socket when you only have a luasocket
  - This gets populated when a cosock socket is included in a select args table
- threaderrorhandler: Potential error handler function. Not currently settable.
- timers: see Timers
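As a rough illustration of how the first two tables relate to the arguments of cosock.spawn, the sketch below mimics that bookkeeping in plain Lua. The spawn function and the tables here are simplified stand-ins written for this example, not cosock's implementation, and marking a new thread as ready is an assumption about how it gets its first resume.

```lua
-- Hypothetical stand-ins for cosock's module-level tables (illustration only).
local threads = {}      -- list of every coroutine we know about
local threadnames = {}  -- coroutine -> name
local readythreads = {} -- coroutine -> resume args for the next pass

-- A simplified spawn: the first argument becomes a coroutine stored in
-- `threads`, the second argument is recorded in `threadnames`.
local function spawn(fn, name)
  local thread = coroutine.create(fn)
  table.insert(threads, thread)
  threadnames[thread] = name
  -- Assumption: a new thread is marked ready so the next pass resumes it.
  readythreads[thread] = {}
  return thread
end

local t = spawn(function() return "done" end, "example-task")
print(#threads, threadnames[t])
```

Keying threadnames and readythreads by the coroutine value itself is what lets the run loop go from a thread to its name or resume args without any extra identifier.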
Run Loop
To start the loop we define a few local variables. First up is wakethreads. This table will be
populated by removing all of the elements from readythreads, which frees up readythreads to
be added to by any tasks we are about to wake up. Next are the two list tables sendt and recvt
along with the optional timeout; these will end up being passed to luasocket's socket.select
when we run out of ready threads. Now we can move all our readythreads into wakethreads and then
loop over all of the threads in wakethreads.
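The hand-off from readythreads to wakethreads could look roughly like the following. This is a minimal sketch, assuming readythreads is a plain table keyed by coroutine; drain_ready is a hypothetical helper name used only for this example.

```lua
-- Hypothetical module-level table: thread -> resume args.
local readythreads = {}

-- Move every entry out of `readythreads` into a fresh table, leaving
-- `readythreads` empty so tasks resumed on this pass can add to it.
local function drain_ready()
  local wakethreads = {}
  for thread, resumeargs in pairs(readythreads) do
    wakethreads[thread] = resumeargs
    -- Clearing an existing field while traversing with pairs is allowed.
    readythreads[thread] = nil
  end
  return wakethreads
end

local co = coroutine.create(function() end)
readythreads[co] = { recvr = {}, sendr = {} }

-- Per-pass locals, as described in the text.
local wakethreads = drain_ready()
local sendt, recvt, timeout = {}, {}, nil
```

Draining into a separate table means a thread that wakes another thread during this pass schedules it for the next pass instead of extending the current one.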
For each of these threads, we first check whether its coroutine.status is "suspended"; if it isn't, we
skip the thread. For any "suspended" thread, we first cancel any timers that might be scheduled
for that thread by calling timers.cancel. Next we pull out the values we stored in threadswaitingfor
and call skt:setwaker with nil for every socket in its sendr and recvr properties, which prevents any potential
"double wakes" from occurring.
Now we call coroutine.resume with our thread and the recvr, sendr and err values that were stored
in wakethreads. When coroutine.resume completes, two pieces of information drive our next step.
The first is the first return value from coroutine.resume, which indicates whether our thread raised an error; the second
is the result of calling coroutine.status on our thread. If our thread's status is "dead" and coroutine.resume
returned false, something has gone terribly wrong, so we raise an error (with a traceback if debug is available).
If our thread's status is "dead" and coroutine.resume returned true, we just remove our thread from threads
and threadswaitingfor. If our thread's status is "suspended" and coroutine.resume returned true, we first
update threadswaitingfor with the remaining return values from coroutine.resume. We then call skt:setwaker
for every sendr and recvr socket in those values, registering a function that will clear itself and call the local function
wake_thread. At this point we also update our local tables recvt and sendt to include the skt.inner_sock
values from threadswaitingfor, which are the luasockets associated with each cosock.socket.
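The branching on the result of coroutine.resume and coroutine.status can be sketched as below. This is an illustration under stated assumptions: resume_thread is a hypothetical helper, it returns a label instead of raising so the three outcomes are easy to inspect, and plain strings stand in for sockets.

```lua
-- Hypothetical stand-in for cosock's threadswaitingfor table.
local threadswaitingfor = {}

-- Resume one thread and decide what to do next from two facts:
-- whether coroutine.resume succeeded, and the thread's status afterwards.
local function resume_thread(thread, recvr, sendr, err)
  local ok, a, b, c = coroutine.resume(thread, recvr, sendr, err)
  local status = coroutine.status(thread)
  if status == "dead" then
    threadswaitingfor[thread] = nil
    if not ok then
      -- `a` holds the error value; add a traceback when debug exists.
      local msg = tostring(a)
      if debug and debug.traceback then
        msg = debug.traceback(thread, msg)
      end
      return "failed", msg
    end
    return "finished"
  end
  -- Still suspended: the yielded values become the next select args.
  threadswaitingfor[thread] = { recvr = a, sendr = b, timeout = c }
  return "waiting"
end

-- A task that yields select args (strings stand in for sockets).
local waiter = coroutine.create(function()
  coroutine.yield({ "sock-a" }, {}, 1.5)
end)
-- A task that raises immediately.
local crasher = coroutine.create(function() error("boom") end)

print(resume_thread(waiter))
print(resume_thread(crasher))
```

In the real loop the "failed" case raises instead of returning, and the "waiting" case goes on to register wakers for the sockets it just stored.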
Now that we have resumed all of the wakethreads and filled in our eventual arguments to socket.select, we
determine whether we are still running. We do this by looping over all of our threads and checking that at least 1 has
a coroutine.status that is not "dead". If all threads are "dead" and nothing got added to readythreads, then
we exit the run loop. Next we update our socketwrappermap by looping over all of the values in threadswaitingfor
and inserting each recvr and sendr socket under the key skt.inner_sock.
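A minimal sketch of the "are we still running" check and the socketwrappermap update follows. The helper names are hypothetical, and storing the cosock socket directly as the map value is an assumption made for this example.

```lua
-- True if at least one known thread has not finished.
local function any_alive(threads)
  for _, thread in ipairs(threads) do
    if coroutine.status(thread) ~= "dead" then
      return true
    end
  end
  return false
end

-- The loop exits only when every thread is dead and nothing is queued.
local function should_exit(threads, readythreads)
  return not any_alive(threads) and next(readythreads) == nil
end

-- Map each inner luasocket back to the cosock socket that wraps it.
local function build_wrapper_map(threadswaitingfor)
  local map = {}
  for _, selectargs in pairs(threadswaitingfor) do
    for _, kind in ipairs({ "recvr", "sendr" }) do
      for _, skt in ipairs(selectargs[kind] or {}) do
        map[skt.inner_sock] = skt
      end
    end
  end
  return map
end

local finished = coroutine.create(function() end)
coroutine.resume(finished) -- runs to completion, so its status is "dead"
local parked = coroutine.create(function() coroutine.yield() end)
coroutine.resume(parked)   -- stops at the yield, so its status is "suspended"

print(should_exit({ finished }, {}))
print(should_exit({ finished, parked }, {}))
```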
With all the bookkeeping done, we run all of the timers that have reached their deadline and update our
local variable timeout to the duration until the earliest remaining deadline. If at least one thread was added to
readythreads, we set our timeout to 0, because we already know we have new work to do. If timeout is then
nil and both sendt and recvt are empty, we raise an error because we are about to call
socket.select({}, {}, nil), which would just block forever. Otherwise, we call socket.select, capturing the
return values in recvr, sendr and err. If err is neither nil nor "timeout", we raise that error. If err is nil
or "timeout", we loop over all of the values in recvr and sendr, looking up the cosock socket in
socketwrappermap and calling skt:_wake, which calls the function we provided to setwaker above.
With that complete, we now have a fully updated readythreads and we start the loop again.
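The timeout selection and the error check around socket.select can be sketched without luasocket, as below. choose_timeout and check_select_err are hypothetical helper names, and the error message text is illustrative rather than the one cosock raises.

```lua
-- Pick the timeout to hand to select, following the rules in the text.
-- `timer_timeout` is whatever timers.run returned (a number or nil).
local function choose_timeout(timer_timeout, readythreads, recvt, sendt)
  local timeout = timer_timeout
  if next(readythreads) ~= nil then
    timeout = 0 -- work is already queued, so do not block
  end
  -- Note: 0 is truthy in Lua, so this only triggers for nil or false.
  if not timeout and #recvt == 0 and #sendt == 0 then
    error("select called with no sockets and no timeout")
  end
  return timeout
end

-- Treat nil and "timeout" as normal outcomes; anything else is fatal.
local function check_select_err(err)
  if err ~= nil and err ~= "timeout" then
    error(err)
  end
end

print(choose_timeout(2.5, {}, {}, {}))
print(pcall(check_select_err, "timeout"))
```

Forcing the timeout to 0 when readythreads is non-empty turns the select call into a poll, so sockets still get checked but queued threads are not delayed.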
Outline Version
- Define wakethreads
- Define an empty list of senders (sendt), receivers (recvt) and a timeout
- Pop all readythreads entries into the wakethreads
- Loop over all threads in wakethreads
  - If coroutine.status for that thread returns "suspended"
    - Clear any timers
    - Clear any wakers registered with a timeout
    - coroutine.resume with the stored recvr, sendr and err arguments
    - If coroutine.resume returned true in the first position and coroutine.status returns "suspended"
      - Re-populate threadswaitingfor[thread] with the 3 other return values from coroutine.resume
        - These should be the recvt, sendt and timeout values that will populate select args
      - Set the waker for all sockets in recvt and sendt to call wake_thread and then unset themselves
      - If coroutine.resume returned a timeout, create a new timer for this thread which will call wake_thread_err on expiration with the value "timeout"
    - If coroutine.status returned "dead"
      - If coroutine.resume returned false in the first position and no threaderrorhandler has been set
        - Raise an error
          - If the debug library is available, include a debug.traceback and the second return value from coroutine.resume
          - Else just raise an error with the second return value from coroutine.resume
        - Exit the application
          - This calls os.exit(-1)
      - Else, print a warning message if printing is turned on
- Initialize a variable running to false
- Loop over all threads, calling coroutine.status on each; if at least 1 doesn't return "dead", set running to true
- If running is false and readythreads is empty
  - Exit the run loop
- Loop over all the values in threadswaitingfor
  - Insert the luasockets on any sendr or recvr parameters to the loop local variables sendt and recvt
  - Populate socketwrappermap with any sendr or recvr sockets
- Call timers.run
- If readythreads is not empty
  - Set timeout to 0
- If timeout is falsy and recvt is empty and sendt is empty
  - Raise an error that cosock.select was called with no sockets and no timeouts
- Call luasocket's socket.select with our loop's recvt, sendt and timeout
- If socket.select returns a value in the 3rd position and that value is not "timeout"
  - Raise an error with that return value
- Loop over the recvr (1st) return from socket.select
  - Look up the cosock.socket from socketwrappermap
  - Call skt:_wake("recvr")
- Loop over the sendr (2nd) return from socket.select
  - Look up the cosock.socket from socketwrappermap
  - Call skt:_wake("sendr")
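The "call wake_thread and then unset themselves" step in the outline above can be illustrated with a one-shot waker. This is a sketch only: the fake socket, the (kind, waker) signature given to setwaker, the boolean returned by _wake, and the shape written into readythreads are all assumptions for the example rather than cosock's actual definitions.

```lua
-- A hypothetical stand-in for a cosock socket: it only stores wakers.
local function new_fake_socket()
  local skt = { wakers = {} }
  function skt:setwaker(kind, waker)
    self.wakers[kind] = waker
  end
  function skt:_wake(kind)
    local waker = self.wakers[kind]
    if waker then
      waker()
      return true
    end
    return false
  end
  return skt
end

local readythreads = {}

-- Stand-in for the run loop's local wake_thread: mark the thread ready
-- and remember which socket woke it, grouped by "recvr" or "sendr".
local function wake_thread(thread, kind, skt)
  readythreads[thread] = readythreads[thread] or {}
  readythreads[thread][kind] = readythreads[thread][kind] or {}
  table.insert(readythreads[thread][kind], skt)
end

-- Register a one-shot waker: it clears itself before waking the thread,
-- so a second _wake for the same socket does nothing.
local function arm(skt, kind, thread)
  skt:setwaker(kind, function()
    skt:setwaker(kind, nil)
    wake_thread(thread, kind, skt)
  end)
end

local skt = new_fake_socket()
arm(skt, "recvr", "thread-1") -- a string stands in for a coroutine
print(skt:_wake("recvr"), skt:_wake("recvr"))
```

Because the waker removes itself first, a socket reported ready twice before its thread runs again still produces a single entry in readythreads.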
Timers
Internally, we keep a list of timer objects to determine when any thread would have reached the maximum time
it should be running/yielding for. We can interact with these through a module local variable timers which
has a few associated functions.
Inside of a do block, we create 2 local variables, timeouts and refs, for use in the timer associated functions.
The first associated function worth discussing is timers.set, which takes the arguments timeout: float,
callback: fun() and ref: table. When called, we first capture the current timestamp via socket.gettime(),
then calculate the deadline for this timer by adding timeout to that timestamp and storing it in a variable timeoutat.
We then table.insert the table { timeoutat = timeoutat, callback = callback, ref = ref } into timeouts.
If ref isn't nil we also populate refs[ref] with that same table.
Next up is timers.cancel, which takes the argument ref: table. When called, we first look up the timeout info
from refs[ref]. If we find something there, we remove the values in its callback and ref properties and finally
remove the entry from refs. By removing the callback we ensure that the timer's callback is never called.
The entry itself will eventually be removed from timeouts during a later call to timers.run.
Finally we have timers.run, which takes no arguments. When called, it first sorts the timeouts table in
ascending order by timeoutat, where nil values are the smallest values. We then capture the current timestamp
by calling socket.gettime. Now we consult the first element of timeouts: if that table has a timeoutat that is nil
or less than now, we pop it off the list; if it has a callback property we call that, and if it has a ref property
we remove it from refs. We repeat this until the first element is no longer due.
Now that all the pending timers are done, we use the new first element of timeouts' timeoutat property to calculate
the next relative timeout (timeoutat - now) and return that as the earliest timeout. If timeouts is empty, we return
nil.
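Putting the three functions together, a self-contained sketch might look like the following. It follows the description above but is not cosock's source: a controllable fake clock (the hypothetical now function) replaces socket.gettime so the example runs without luasocket.

```lua
-- Assumption: `now` stands in for socket.gettime; `clock` is a fake
-- timestamp we advance by hand.
local clock = 0
local function now() return clock end

local timers = {}
do
  local timeouts = {} -- list of { timeoutat, callback, ref }
  local refs = {}     -- ref -> timer entry, used for cancellation

  function timers.set(timeout, callback, ref)
    local timeoutat = now() + timeout
    local info = { timeoutat = timeoutat, callback = callback, ref = ref }
    table.insert(timeouts, info)
    if ref ~= nil then refs[ref] = info end
  end

  function timers.cancel(ref)
    local info = refs[ref]
    if info then
      info.callback = nil -- the entry stays queued but can no longer fire
      info.ref = nil
      refs[ref] = nil
    end
  end

  function timers.run()
    -- Earliest deadline first; a nil deadline sorts before everything.
    table.sort(timeouts, function(a, b)
      return (a.timeoutat or -math.huge) < (b.timeoutat or -math.huge)
    end)
    local current = now()
    while timeouts[1]
      and (timeouts[1].timeoutat == nil or timeouts[1].timeoutat < current) do
      local info = table.remove(timeouts, 1)
      if info.callback then info.callback() end
      if info.ref then refs[info.ref] = nil end
    end
    if timeouts[1] then
      return timeouts[1].timeoutat - current
    end
    return nil
  end
end

local fired = {}
local ref_a, ref_b = {}, {}
timers.set(1, function() table.insert(fired, "a") end, ref_a)
timers.set(5, function() table.insert(fired, "b") end, ref_b)
timers.cancel(ref_b)
clock = 2
local remaining = timers.run()
```

In this sketch a cancelled entry keeps its deadline, so it still contributes to the returned timeout until it is popped; the worst case is one extra pass through the run loop with nothing to do.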
Outline Version
- A timer has the shape {timeoutat: float, callback: fun(), ref: table?}
- timers.set
  - Updates timers to include that value. Also updates a private scoped table named refs
    - refs is a map of table pointer<->timer which is used for cancellation of a timer
- timers.cancel
  - If the provided table pointer is in refs, remove the callback and ref properties from that table
  - Set the table pointer key in refs to nil
- timers.run
  - Sort all timeouts by deadline (earliest first)
  - Pop the timer off the front of the timers list
  - If that timer.timeoutat is nil or < socket.gettime()
    - Call timer.callback
    - Remove this timer from refs
  - If there are any more timeouts left, return how long before that timeout should expire
  - If there are no more timeouts, return nil