Cosock
Cosock is a coroutine runtime written in pure Lua and based on the popular luasocket library.
The goal of the project is to provide the same interfaces that luasocket provides but wrapped up in coroutines to allow for concurrent IO.
Note: these docs will use the terms coroutine, task, and thread interchangeably to all mean a Lua coroutine
For example, the following two Lua programs use luasocket to define a TCP client and server.
--client.lua
local socket = require "socket"
local client = socket.tcp()
client:connect("0.0.0.0", 9999)
while true do
print("sending ping")
client:send("ping\n")
local response = assert(client:receive())
assert(response == "pong")
end
--server.lua
local socket = require "socket"
local server = socket.tcp()
server:bind("0.0.0.0", 9999)
server:listen()
print("listening", server:getsockname())
local client = server:accept()
while true do
local request = assert(client:receive())
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
If you were to run lua ./server.lua first and then run lua ./client.lua, you should see each terminal print out its "sending ..." messages forever.
Using cosock, we can actually write the same thing as a single application.
-- client_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
--- Since the client and server are in the same application,
--- we can use an OS-assigned port and share it across the
--- two tasks. To coordinate the two tasks so they start in
--- the order we want, we use a cosock channel to make sure
--- both tasks have the same port number.
local port_tx, port_rx = cosock.channel.new()
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
local client = server:accept()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
end, "server task")
--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
--- wait for the server to be ready.
local port = assert(port_rx:receive())
local client = socket.tcp()
client:connect(ip, port)
while true do
print("sending ping")
client:send("ping\n")
local request = assert(client:receive())
assert(request == "pong")
end
end, "client task")
--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()
Now if we run this with lua ./client_server.lua, we should see the messages alternate.
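The exact interleaving is up to the scheduler, but based on the print calls in both tasks the output should look something like this.

sending ping
received "ping"
sending pong
sending ping
received "ping"
sending pong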
Notice that we called cosock.spawn twice, once for the server task and once for the client task; we are going to dig into that next. We also added a call to cosock.run at the bottom of our example. This function will run our tasks until there is no more work left to do, so it is important you don't forget it, or nothing will happen.
Getting Started
The easiest way to use cosock is to install it with luarocks.
luarocks install cosock
cosock depends on both luasocket and luasec. When running the above command, luarocks will attempt to compile both of these libraries, which have some system dependencies.
Luasocket
The version of luasocket we are using requires that the Lua development package is available.
Linux
For Debian-based systems, you would need to run the following
sudo apt-get install liblua-dev
For Fedora
sudo dnf install lua-devel
Windows
Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.
MacOS
These can be installed on macOS via brew
brew install lua
Luasec
Luasec depends on OpenSSL, so you will need its development libraries
Linux
For Debian-based systems, you would need to run the following
sudo apt-get install libssl-dev
For Fedora
sudo dnf install openssl-devel
Windows
Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.
MacOS
These can be installed on macOS via brew
brew install openssl
Spawn
At the core of cosock is the ability to wrap any operation in a coroutine and register that with cosock. For this, cosock exports the function cosock.spawn. This function takes 2 arguments: the first is a function that will be our coroutine, and the second is a name for that coroutine.
For example, this is a simple program that will spawn a single coroutine, which will print the current timestamp and the word "tick" and then sleep for 1 second, in a loop forever.
--basic_spawn.lua
local cosock = require "cosock"
cosock.spawn(function()
while true do
print(cosock.socket.gettime(), "tick")
cosock.socket.sleep(1)
end
end, "clock")
cosock.run()
The act of calling cosock.spawn allows us to use the non-blocking cosock.socket.sleep function. This means we could extend our application to not only print this message every second but use the time this coroutine is sleeping to perform some other work. Let's extend our little example a bit.
--less_basic_spawn.lua
local cosock = require "cosock"
local function tick_task()
while true do
print(cosock.socket.gettime(), "tick")
cosock.socket.sleep(2)
end
end
local function tock_task()
cosock.socket.sleep(1)
while true do
print(cosock.socket.gettime(), "tock")
cosock.socket.sleep(2)
end
end
cosock.spawn(tick_task, "tick-task")
cosock.spawn(tock_task, "tock-task")
cosock.run()
Very similar to our last example, this time we are spawning 2 coroutines: one will print "tick" every two seconds, the other will wait 1 second and then print "tock" every two seconds. This should result in a line getting printed to the terminal once a second, alternating between our two strings. Notice, though, there is a fair amount of code duplication, as tick_task and tock_task are nearly identical. This is mostly driven by the fact that the first argument to cosock.spawn is a function that takes no arguments and returns no values, which means we can't ask cosock to pass in any arguments. One way we can get around this is by using closures. So instead of passing a function to cosock.spawn, we can return a function from another function and use it as the argument to cosock.spawn.
For example:
For example:
--even_less_basic_spawn.lua
local cosock = require "cosock"
local function create_task(name, should_sleep_first)
return function()
if should_sleep_first then
cosock.socket.sleep(1)
end
while true do
print(cosock.socket.gettime(), name)
cosock.socket.sleep(2)
end
end
end
cosock.spawn(create_task("tick", false), "tick-task")
cosock.spawn(create_task("tock", true), "tock-task")
cosock.run()
Notice here that create_task returns a function but takes a name argument and a should_sleep_first argument, which are available to our returned function.
Now, let's consider our first example, which may not look like it but is very similar to our tick/tock example. Instead of using cosock.socket.sleep to tell cosock we are waiting around for something, it uses the receive method on a cosock.socket.tcp. Let's break down what is happening in that example.
To start, both tasks will be resumed, which means that cosock has selected them to run; we can't say for sure which task will get resumed first, which is why we used a cosock.channel to make the client task wait until the server was ready. Shortly after resuming, each task eventually calls some method that will yield, which means that it is waiting on something, so cosock can run another task. For the server, the first time we yield is in a call to accept: if the client hasn't already called connect, we would end up blocking, so instead of blocking we let another task work; when we finally have a client connected, cosock will wake us back up again. On the client side, we first yield on a call to channel:receive: if the server hasn't sent the port number, we would end up blocking that task before it can call connect, so we let the other task work until we finally have a port number, and then cosock will wake us back up.
This pattern continues, each task running exclusively until it needs to wait for something, at which point it yields control back to cosock; when the thing it was waiting for is ready, it can continue running again.
In both our tick/tock examples and our client/server example, we reach a point where cosock is just handing control from task 1 to task 2 and back again in an infinite loop. In a more real-world program, you might see any number of tasks that need to be juggled. In our next example, we will extend the client/server example to handle any number of clients.
-- clients_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local number_of_clients = 10
--- Since the clients and server are in the same application,
--- we can use an OS-assigned port and share it across the
--- tasks. To coordinate the tasks so they start in the order
--- we want, we use a cosock channel to make sure they all
--- have the same port number.
local port_tx, port_rx = cosock.channel.new()
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
while true do
local client = server:accept()
cosock.spawn(function()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
if request:match("ping") then
print("sending pong")
client:send("pong\n")
else
client:close()
break
end
end
end)
end
end, "server task")
--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
print("spawn_client", id, port)
local client = socket.tcp()
client:connect(ip, port)
while true do
print("sending ping", id)
client:send(string.format("ping %s\n", id))
local request = assert(client:receive())
assert(request == "pong")
socket.sleep(0.5)
end
end
--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
local port = assert(port_rx:receive())
for i=1,number_of_clients do
cosock.spawn(function()
spawn_client(i, port)
end, string.format("client-task-%s", i))
end
end
--- Spawn a bunch of client tasks
cosock.spawn(function()
spawn_clients()
end, "client task")
--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()
Surprisingly little has changed. First, we updated the server task to call accept more than once and then pass the returned client into its own task to receive/send in a loop there. For the client side, we broke the client send/receive loop into its own task and added a parent task to wait for the port number and then cosock.spawn a bunch of client tasks.
If you were to run this example, you would see that the print statements end up in random order!
Channels
Coordinating coroutines can be a huge pain; to ease this pain, cosock offers a synchronization primitive called channels. A cosock.channel is a "multiple producer, single consumer" message queue. This means you can have one coroutine own the receiving half of the queue and pass the sender out to however many coroutines you'd like. We've already seen how they are used: in our first example, we used a cosock.channel to coordinate which port the client should connect to. What if we wanted to re-write that example without using a channel? That might look something like this:
--client_server_no_channel.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local shared_port
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
shared_port = p
server:listen()
local client = server:accept()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
end, "server task")
--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
--- wait for the server to be ready.
while shared_port == nil do
socket.sleep(1)
end
local client = socket.tcp()
client:connect(ip, shared_port)
while true do
print("sending ping")
client:send("ping\n")
local request = assert(client:receive())
assert(request == "pong")
end
end, "client task")
--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()
That update removed our channel in favor of polling that shared_port ~= nil once every second. This will absolutely work; however, we have introduced a race condition. What if we sleep for 1 full second right before the server is issued its port? That second would be wasted; we could have already been creating our client. While the consequence in this example isn't dire, it does show that relying on shared mutable state without some way to synchronize it between tasks can be problematic.
As a note, we could also solve this problem by having the server task spawn the client task, but that wouldn't be nearly as interesting to our current subject.
Another place we have seen that could benefit from a channel is our tick/tock example; in that example, we hard-coded the synchronization. The first task would print then sleep for 2 seconds, and the second task would sleep for 1 second and then print and sleep for 2 seconds. Let's take a look at what that might have looked like if we had used channels.
--tick_tock_channels.lua
local cosock = require "cosock"
-- We create 2 pairs of channels so our two tasks can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local function task(tx, rx, name)
while true do
-- First wait for the other task to tell us it is done
rx:receive()
-- print our name
print(cosock.socket.gettime(), name)
-- sleep for 1 second
cosock.socket.sleep(1)
-- tell the other task we are done
tx:send()
end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
task(tick_tx, tock_rx, "tock")
end)
-- prime the tick task to start first
tick_tx:send()
cosock.run()
In this version, we only have one definition for a task: looping forever, it will first wait to receive the signal, then it prints its name, sleeps for 1 second, and then sends the signal on to the next task. The key to making this all work is that we need to kick the process off by telling one of the tasks to start. Since a channel allows for multiple senders, it is ok that we call send in more than one place.
Now say we wanted to extend our little clock emulator to print "clack" every 60 seconds to simulate the minute hand moving. That might look something like this:
--tick_tock_clack.lua
local cosock = require "cosock"
-- We create 3 pairs of channels so our three tasks can send
-- messages around the loop
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local clack_tx, clack_rx = cosock.channel.new()
local function task(tx, rx, name)
while true do
-- First wait for the other task to tell us the count
local count = rx:receive()
-- print our name
print(cosock.socket.gettime(), name)
-- sleep for 1 second
cosock.socket.sleep(1)
-- tell the other task we are done
if count >= 59 then
clack_tx:send(0)
else
tx:send(count + 1)
end
end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
task(tick_tx, tock_rx, "tock")
end)
cosock.spawn(function()
task(tick_tx, clack_rx, "clack")
end)
-- prime the tick task to start first
tick_tx:send(1)
cosock.run()
Here we have updated the tasks to share a counter across our channels. At the start of each loop iteration, we first get the current count from our other task. We again print our name and then sleep for 1 second, but now if the count is >= 59 we send a count of 0 to our "clack" task, which will always then send a 1 to the "tick" task to start the whole process over again. Just to make sure it is clear: we use the send half of the "tick" task's channel in 3 places, the main thread to "prime" the clock, the "tock" task, and the "clack" task.
It is very important that we don't try to use the receiving half of a channel in more than one task; that would lead to potentially unexpected behavior. Let's look at an example of how that might go wrong.
-- bad_news_channels.lua
local cosock = require "cosock"
local tx, rx = cosock.channel.new()
cosock.spawn(function()
rx:receive()
print("task 1")
end)
cosock.spawn(function()
rx:receive()
print("task 2")
end)
tx:send()
cosock.run()
In this example, we create one channel pair and spawn two tasks, both of which call receive on our channel, and just before run we call send. Since the choice for which task should run at which time is left entirely up to cosock, we can't say for sure which of these tasks will actually receive; it might print "task 1" or "task 2". In actuality, cosock assumes that receive will only ever be called from the same coroutine. Calling receive in multiple coroutines will (eventually) raise an error.
Select
Now that we have covered how to spawn and run coroutines using cosock, let's talk about how we could handle multiple IO sources in a single coroutine. For this kind of work, cosock provides cosock.socket.select. This function works in a very similar way to luasocket's socket.select; a call to it would look something like local recvr, sendr, err = cosock.socket.select(recvt, sendt, timeout).
Its arguments are

- recvt: a list of cosock sockets that are waiting to be ready to receive
- sendt: a list of cosock sockets that are waiting to be ready to send
- timeout: the maximum number of seconds to wait for one or more entries in recvt or sendt to be ready
  - If this value is nil or negative, the timeout is treated as infinity

Note: The list entries for sendt and recvt can be other "cosock aware" tables like the lustre WebSocket; for specifics on how to make a table "cosock aware" see the chapter on it.

Its return values are

- recvr: a list of ready receivers; any entry here should be free to call receive and immediately be ready
- sendr: a list of ready senders; any entry here should be free to call send and immediately be ready
- err: if this value is not nil, it represents an error message
  - The most common error message here would be "timeout", which can occur when the timeout argument provided is not nil and positive
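Before we get to a full example, here is a minimal sketch of the call shape; the single listening socket and the 5-second timeout are arbitrary choices for illustration.

local cosock = require "cosock"
local socket = require "cosock.socket"
cosock.spawn(function()
  local server = socket.tcp()
  server:bind("0.0.0.0", 0)
  server:listen()
  while true do
    -- wait up to 5 seconds for our only socket to be ready to receive
    local recvr, _sendr, err = socket.select({server}, {}, 5)
    if err == "timeout" then
      print("no connections for 5 seconds, exiting")
      return
    end
    -- recvr[1] can only be our server here, so accept will not block
    local client = assert(server:accept())
    client:close()
  end
end, "select-sketch")
cosock.run()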
So, how would we use something like this? Let's consider our clients_server.lua example from the spawn chapter, where we called cosock.spawn every time a new client was accepted. This works, but we don't have much control over how many tasks we end up spawning, in large part because we don't know how long each task will run. To get that control, we would need to handle all of the client connections on the same task as the server, and to do that we can use select.
-- clients_server_select.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local number_of_clients = 10
--- Since the clients and server are in the same application,
--- we can use an OS-assigned port and share it across the
--- tasks. To coordinate the tasks so they start in the order
--- we want, we use a cosock channel to make sure they all
--- have the same port number.
local port_tx, port_rx = cosock.channel.new()
--- Handle a client being ready to receive
--- @param client cosock.socket.tcp
--- @param clients table<cosock.socket.tcp,boolean> the set of connected clients
function handle_recv(client, clients)
local request, err = client:receive()
if not request then
if err == "closed" then
clients[client] = nil
end
return
end
print(string.format("received %q", request))
if request:match("ping") then
print("sending pong")
local s, err = client:send("pong\n")
if err == "closed" then
clients[client] = nil
elseif err then
print("error in recv: " .. tostring(err))
end
else
client:close()
clients[client] = nil
end
end
--- Handle a server being ready to accept
--- @param server cosock.socket.tcp a listening socket
--- @param clients table<cosock.socket.tcp,boolean> the set of connected clients
function handle_accept(server, clients)
local client, err = server:accept()
if err and err ~= "timeout" then
error("error in accept: " .. tostring(err))
end
if client then
clients[client] = true
end
end
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
local server = socket.tcp()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
local clients = {}
server:settimeout(0)
while true do
local recvt = {}
for client, _ in pairs(clients) do
table.insert(recvt, client)
end
if #recvt < 5 then
table.insert(recvt, server)
end
local recvr, _sendr, err = cosock.socket.select(recvt, {}, 5)
if err == "timeout" then
return
elseif err then
error("Error in select: "..tostring(err))
end
for _, sock in ipairs(recvr) do
if sock == server then
print("accepting new client")
handle_accept(server, clients)
elseif clients[sock] then
handle_recv(sock, clients)
end
end
end
end, "server task")
--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
print("spawn_client", id, port)
local client = socket.tcp()
client:connect(ip, port)
for _=1,10 do
print("sending ping", id)
client:send(string.format("ping %s\n", id))
local request = assert(client:receive())
assert(request == "pong")
socket.sleep(0.5)
end
client:close()
end
--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
local port = assert(port_rx:receive())
for i=1,number_of_clients do
cosock.spawn(function()
spawn_client(i, port)
end, string.format("client-task-%s", i))
end
end
--- Spawn a bunch of client tasks
cosock.spawn(function()
spawn_clients()
end, "clients task")
--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()
The above is an updated version of our clients/server example with some changes to limit the total number of connections to 5; let's go over the changes.
First, we've added a few helper functions to handle the different events in our system. The first is for when a client connection is ready to receive: handle_recv takes 2 arguments, client, which is a cosock.socket.tcp that was returned from a call to accept, and clients, which is a table where the keys are cosock.socket.tcp clients and the values are true. We first call client:receive to get the bytes from the client, and if that returns a string that contains "ping" then we send our "pong" message. There are a few places where this can go wrong: the call to receive could return nil and an error message, or something other than "ping", or the call to send could return nil and an error message. If the error message is "closed" or the request didn't contain "ping", then we want to remove client from clients, and if it was the latter then we also want to call client:close.
Next up we have handle_accept; this also takes 2 arguments: server, which is a cosock.socket.tcp socket that is listening, and the same map of clients. If a call to accept returns a client, then we add that client into our clients map. If accept returns nil and err isn't "timeout", then we raise an error.
Alright, with these two helper functions we can now update the "server" task to handle all of the connected clients without having to call spawn. Our task starts out the same as before: creating a server socket, binding it to a random port, getting that port and sending it to our "clients task", and then calling listen. At this point things start to change: we define our clients map as empty and call server:settimeout(0) so that a call to accept won't yield forever.
Inside of our long-running loop, we start out by defining a new table recvt, which matches the select argument of the same name. We then loop over our clients table, inserting each of the keys into recvt. We keep these as separate tables because we want to be able to remove a client from our readiness check once it has closed. Next, we check to see how large recvt is: if it is below 5, we add server into it. By only including server when recvt has fewer than 5 clients, we have enforced our max connections limit.
With recvt defined, we can finally call cosock.socket.select; we use recvt as the first argument, an empty table as the sendt argument, and finally a timeout of 5 seconds. We assign the result of select to recvr, _sendr, err; we would expect recvr to contain any of our clients that are ready to receive and, if we are below the limit, server. If recvr is nil, we would expect err to be a string describing the error. If err is "timeout", then we exit our server task, which should exit the application. If we don't have an err, then we loop over all the entries in recvr and check to see if each is our server; if so we call handle_accept, if not we call handle_recv. Each of our helpers will update the clients map to ensure that we service all of the client requests before exiting.
The last change we've made is to spawn_client, which previously would loop forever; it now loops 10 times before exiting and closing the client.
If we were to run this, you would see each of the tasks spawn in a random order, and the first 5 of those would begin sending their "ping" messages. Once one of them completes, we would accept the next connection but not before that point, which means we have limited our total number of connected clients to 5!
Integrating with Cosock
So far we have covered what cosock provides, but what if we want to integrate our own libraries directly into cosock? What would that look like?
To start, the general interface for a "cosock aware" Lua table is to define a method setwaker, which takes 2 arguments, kind: str and waker: fun()|nil. The general idea here is that a "waker" function can be provided that will get called when that task is ready to be woken again.
Let's try to build an example Timer that will define this setwaker method to make it "cosock aware".
local cosock = require "cosock"
local Timer = {}
Timer.__index = Timer
function Timer.new(secs)
return setmetatable({
secs = secs,
waker = nil,
}, Timer)
end
function Timer:wait()
coroutine.yield({self}, {}, self.secs)
end
function Timer:setwaker(kind, waker)
print("setwaker", kind, waker)
if waker then
self.waker = function()
print("waking up!")
waker()
end
else
self.waker = nil
end
end
cosock.spawn(function()
local t = Timer.new(2)
print("waiting")
t:wait()
print("waited")
end)
cosock.run()
To start we create a Lua metatable Timer, which has the properties secs: number and waker: fun()|nil. There is a constructor Timer.new(secs), which takes the number of seconds we want to wait for. Finally, we define Timer:wait, which is where our magic happens. This method calls coroutine.yield with 3 arguments: {self}, an empty table, and self.secs. These arguments match exactly what would be passed to socket.select; the first is a list of any receivers, the second is a list of any senders, and finally the timeout. Since we pass {self} as the first argument, we are treating Timer as a receiver. Ultimately what we are doing here is asking cosock to call socket.select({self}, {}, self.secs). While we don't end up calling self.waker ourselves, cosock uses setwaker to register tasks to be resumed, so we need to conform to that. Just to illustrate what is happening, a print statement has been added to setwaker; if we run this we would see something like the following.
waiting
setwaker recvr function: 0x5645e6410770
setwaker recvr nil
waited
We can see that cosock calls setwaker once with a function and a second time with nil. Notice, though, that self.waker never actually gets called, since we don't see a "waking up" message. That is because we don't really need to be woken up: our timer yields the whole coroutine until we have waited for self.secs, and nothing can interrupt that. Let's extend our Timer to have a reason to call self.waker; we can do that by adding the ability to cancel a Timer.
local cosock = require "cosock"
local Timer = {}
Timer.__index = Timer
function Timer.new(secs)
return setmetatable({
secs = secs,
waker = nil,
}, Timer)
end
function Timer:wait()
local r, s, err = coroutine.yield({self}, {}, self.secs)
if err == "timeout" then
return 1
end
return nil, "cancelled"
end
function Timer:setwaker(kind, waker)
print("setwaker", kind, waker)
if waker then
self.waker = function()
print("waking up!")
waker()
end
else
self.waker = nil
end
end
function Timer:cancel()
if self.waker then
self.waker()
end
end
cosock.spawn(function()
local t = Timer.new(10)
cosock.spawn(function()
cosock.socket.sleep(3)
t:cancel()
end)
print("waiting")
local s = os.time()
local success, err = t:wait()
local e = os.time()
print("waited", os.difftime(e, s), success, err)
end)
cosock.run()
In this example, we create our timer that will wait 10 seconds, but before we call wait we spawn a new task that will sleep for 3 seconds and then call cancel. If we look over the changes made to wait, we can see that we still call coroutine.yield({self}, {}, self.secs), but this time we are assigning its result to r, s, err. Cosock calls coroutine.resume with the same return values we would get from select, that is, a list of ready receivers, a list of ready senders, and an optional error string. If the timer expires, we would expect to get back nil, nil, "timeout"; if someone calls the waker before our timer expires, we would expect to get back {self}, {}, nil. This means we can treat any err == "timeout" as a normal timer expiration, but if err ~= "timeout" then we can safely assume our timer was canceled.
If we were to run this code we would see something like the following.
waiting
setwaker recvr function: 0x556d39beb6d0
waking up!
setwaker recvr nil
setwaker recvr nil
waited 3.0 nil cancelled
Notice we only slept for 3 seconds instead of 10, and wait returned nil, "cancelled"!
One thing we can take away from this new example is that the waker API is designed to allow one coroutine to signal cosock that another coroutine is ready to wake up. With that in mind, let's try to build something a little more useful: a version of the cosock.channel API that allows for a maximum queue size. Looking over the existing channels, to implement this we are going to need 3 parts: a shared table for queueing and setting the appropriate wakers, a receiver table, and a sender table. Let's start by defining the shared table.
Bounded Channel
To start we are going to define a Lua table that will represent our shared bounded queue.
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel
This table should have the following properties

- _wakers: a table with 2 keys
  - sendr: a map of pending waker functions where the keys are tables representing the waiting senders
  - recvr: an optional function that takes no arguments; this will wake our receiver
- _max_depth: the integer value that our queue should not grow larger than
- _msg_queue: the message queue we will use to hold pending messages
- _closed: a boolean to indicate if we have been explicitly closed
To make our lives easier, next we will add a couple of methods that will enforce the queue nature of our _msg_queue: one for removing the oldest message and one for adding a new message.
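--- Remove and return the oldest message in this channel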
function BoundedChannel:pop_front()
return table.remove(self._msg_queue, 1)
end
--- Add a new element to the back of this channel
function BoundedChannel:push_back(ele)
table.insert(self._msg_queue, ele)
end
Next, let's add a method for testing if we can send a new message.
function BoundedChannel:can_send()
if self._closed then
-- Check first that we are not closed, if we are
-- return an error message
return nil, "closed"
end
if #self._msg_queue >= self._max_depth then
-- Check next if our queue is full, if it is
-- return an error message
return nil, "full"
end
-- The queue is not full and we are not closed, return 1
return 1
end
This method first checks that we haven't been closed; if we have, then it returns nil, "closed". If we are still open, it next checks to see if we have any space in our queue; if not, it returns nil, "full". So if we are not closed and not full, it returns 1.
Now we should create a similar method for checking if we can receive a new message.
function BoundedChannel:can_recv()
if self._closed and #self._msg_queue == 0 then
-- If we are closed and there are no more pending
-- messages, return an error message
return nil, "closed"
end
if #self._msg_queue == 0 then
-- Check next that we have at least 1 message,
-- if not, return an error message
return nil, "empty"
end
-- We are not closed and we have at least 1 pending message, return 1
return 1
end
Again, we begin by checking for the closed case, returning nil, "closed"; then we check to see if the queue is empty, and if so we return nil, "empty". If there is at least 1 message and we aren't closed, then we return 1.
Now we can define a few methods for interacting with our self._wakers
property.
First up is the set_waker_recvr
method.
function BoundedChannel:set_waker_recvr(waker)
self._wakers.recvr = waker
if waker and self:can_recv() then
-- Check if receiving is currently available, if
-- so call the waker to wake up the yielded receiver
waker()
end
end
Ok, so this one is pretty simple now that we have our helpers. First, we populate the value in self._wakers.recvr. If waker is not nil and self:can_recv returns 1, we want to immediately call the waker, because it means we are ready to be woken up.
For our next method, we are going to define set_waker_sendr.
function BoundedChannel:set_waker_sendr(sendr, waker)
self._wakers.sendr[sendr] = waker
end
This looks quite a bit different from set_waker_recvr! First of all, we have an extra argument, sendr, which will represent a unique call to coroutine.yield and is how we can allow for multiple senders. The second thing to notice is that we are not checking self:can_send; this is because we don't know if another waker has already been woken up for the current state. This is all a bit hand-wavy right now, but when we implement the sender, things should become clearer.
Now that we can set a waker, it is time to add a method for calling those waker functions.
function BoundedChannel:try_wake(kind)
local waker
if kind == "sendr" then
_, waker = next(self._wakers.sendr)
else
waker = self._wakers.recvr
end
if type(waker) == "function" then
waker()
end
end
Our new try_wake method takes 1 argument, which will either be the string "sendr" or "recvr". If we are trying to wake a "sendr", then we use the next function to find one entry in the table self._wakers.sendr; if there is at least 1 entry in that table, we assign its value to waker. If we are trying to wake a "recvr", we assign self._wakers.recvr to waker. If waker is a function (aka not nil), then we call that function.
function BoundedChannel:close()
self._closed = true
end
close will just set our _closed property to true.
Ok, now that we have our shared channel defined, let's implement our receiver.
Bounded Channel: Recvr
Next, we will define a Lua table that will represent the receiving half of our channel. This table will have 2 properties: _link, which will be our BoundedChannel, and timeout, which will be an optional number.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver
Ok, let's create a constructor for it; this will take 1 argument, which should populate its _link property.
function BoundedChannelReceiver.new(shared_queue)
return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end
Notice we didn't include a timeout property at all; this is because we want it to be nil by default. In order to match the same API that cosock.channel uses, let's add a method for setting our timeout.
function BoundedChannelReceiver:settimeout(timeout)
self.timeout = timeout
end
Next, we want to define the setwaker
method used by this side of the queue.
function BoundedChannelReceiver:setwaker(kind, waker)
if kind ~= "recvr" then
error("Unsupported wake kind for receiver: " .. tostring(kind))
end
self._link:set_waker_recvr(waker)
end
In this method, we have added a check to make sure it isn't getting called with a kind of "sendr", since that would be pretty much meaningless. If we have a valid kind, then we pass the waker down to self._link:set_waker_recvr.
Ok, now for the good stuff: the receive method.
function BoundedChannelReceiver:receive()
while true do
local can_recv, err = self._link:can_recv()
if can_recv then
local element = self._link:pop_front()
self._link:try_wake("sendr")
return element
end
if err == "closed" then
return nil, err
end
if err == "empty" then
local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
if err then
return nil, err
end
end
end
end
Alright, there is a lot going on here, so let's unpack it. To start, we have a long-running loop that will only stop when we have reached either an error or a new message. Each iteration of the loop first checks self._link:can_recv(); if that returns 1, then we call self._link:pop_front to capture our eventual return value. Next, we want to alert any senders that more space has just been made in our queue, so we call self._link:try_wake("sendr"); finally we return the element we popped off, ending our loop.
If can_recv returned nil, we check to see which err was provided: if it was "closed", we return that in the error position, also ending our loop. If can_recv returns nil, "empty", then we want to yield until either we get woken by a sender or we have waited for the duration of self.timeout. We do this by calling coroutine.yield({self}, nil, self.timeout); this will give up control to cosock until either someone calls BoundedChannel:try_wake("recvr") or our timeout is reached.
Recall that coroutine.yield returns a list of ready receivers and a list of ready senders, or nil, nil and an error message. This means that if coroutine.yield returns {self}, then we have a new message, so we go to the top of the loop and the next call to self._link:can_recv should return 1 or nil, "closed". If coroutine.yield returns nil, nil, "timeout", that means we have yielded for the duration of self.timeout.
One final thing we want to make sure of is that we are keeping our end of that nil, "closed" bargain, so let's define a close method.
function BoundedChannelReceiver:close()
self._link:close()
--- If anything is waiting to send, it should wake up with an error
self._link:try_wake("sendr")
end
For this, we first call self._link:close, which will set our shared table's _closed property to true, ultimately making both can_send and can_recv return nil, "closed". Next, we want to wake up any sending tasks, since they are reliant on us to tell them something has changed, so we call self._link:try_wake("sendr").
With that, we have the complete receiver side of our channel; now let's write up the sender.
Bounded Channel: Sendr
To start, we define our Lua table which represents the sending half of our channel.
local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender
This table will have the same shape as our BoundedChannelReceiver: it will have a _link property and a timeout property, and our constructor is nearly identical.
function BoundedChannelSender.new(shared_queue)
return setmetatable({_link = shared_queue}, BoundedChannelSender)
end
We will also define a settimeout method that looks exactly the same.
function BoundedChannelSender:settimeout(timeout)
self.timeout = timeout
end
Our close method is also very close to the receiver's implementation.
function BoundedChannelSender:close()
self._link:close()
-- If anything is waiting to receive, it should wake up with an error
self._link:try_wake("recvr")
end
Here, just the argument to self._link:try_wake changed from "sendr" to "recvr".
Now we start to see things become quite different. The first thing to note is that we are not going to define a setwaker method for this table. This may seem strange, since it is one of the few things that we need to do to make a "cosock aware" table, but if we were to use the same setwaker for all of the places that we call BoundedChannelSender:send, we would end up gumming up the internal workings of cosock. To see how we get around this, it would be good to go over the implementation of send.
function BoundedChannelSender:send(msg)
while true do
local can_send, err = self._link:can_send()
if can_send then
self._link:push_back(msg)
-- Wake any receivers who might be waiting to receive
self._link:try_wake("recvr")
return 1
end
if err == "closed" then
return nil, "closed"
end
if err == "full" then
local wake_t = {
setwaker = function(t, kind, waker)
assert(kind == "sendr")
self._link:set_waker_sendr(t, waker)
end
}
local _r, _s, err = coroutine.yield(nil, {wake_t}, self.timeout)
if err then
return nil, err
end
end
end
end
To start, things look a lot like our BoundedChannelReceiver:receive implementation: we have a long-running loop that first calls self._link:can_send. If that returns 1, we use the push_back helper to add the message to our queue, then we try to wake up any yielded "recvr"s, returning 1 to indicate it was successful. If can_send returned the error message "closed", we return nil, "closed". If can_send returned the error message "full", we want to yield until we can try again.
To prepare for yielding, we first create a table called wake_t; this will represent a single call to BoundedChannelSender:send that is yielding. On wake_t we set 1 property, the setwaker method, which uses assert to raise an error if it was called with a kind of "recvr" and then uses the BoundedChannel:set_waker_sendr method to associate the waker argument with wake_t. By creating this temporary table, we are allowing a unique waker function to be defined for each yield that needs waking. If we were to use a single BoundedChannel._wakers.sendr function, we would lose the ability to wake any yields beyond the last, because calling waker always calls setwaker(kind, nil) to avoid potential "double wakes".
Now that we have set up our wake_t, we can call coroutine.yield, this time with the arguments nil, {wake_t}, self.timeout. Since we put wake_t in the sendt argument, we will wait until we either reach the duration of self.timeout or someone calls BoundedChannel:try_wake("sendr") and our wake_t is returned from next.
This is probably the easiest way to create these unique wakers, but it does come with a potential issue; if you are interested in using this implementation, please review Appendix A.
Now, let's finish up our implementation and see if we can see our bounded channel working.
Bounded Channel: Finish
At this point, we have most of our BoundedChannel and all of our BoundedChannelReceiver and BoundedChannelSender set up, so the last thing we need to do is add a constructor to BoundedChannel.
function BoundedChannel.new(max_depth)
local link = setmetatable({
_max_depth = max_depth,
_wakers = {
sendr = {},
},
_msg_queue = {},
_closed = false,
}, BoundedChannel)
return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end
This constructor takes 1 argument: the number telling us how large our queue can get. It returns 2 tables; the first is a BoundedChannelSender and the second is a BoundedChannelReceiver, both with the same shared BoundedChannel as their _link property.
Now let's see our new channel in action!
local cosock = require "cosock"
local BoundedChannel = require "examples.bounded_channel"
local tx, rx = BoundedChannel.new(2)
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=1,10 do
tx:send(i)
local e = cosock.socket.gettime()
print(string.format("sent %s in %.1fs", i, e - s))
s = e
cosock.socket.sleep(0.2)
end
end, "sendr1")
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=11,20 do
tx:send(i)
local e = cosock.socket.gettime()
print(string.format("sent %s in %.1fs", i, e - s))
s = e
cosock.socket.sleep(0.2)
end
end, "sendr2")
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=1,20 do
local msg = rx:receive()
local e = cosock.socket.gettime()
print(string.format("recd %s in %.1fs", msg, e - s))
s = e
cosock.socket.sleep(1)
end
end, "recvr")
cosock.run()
After we import both cosock and our BoundedChannel, we create a new channel pair with a maximum queue size of 2. We then spawn 2 new tasks for the senders; in these tasks we loop 10 times, sending a message and then sleeping for 0.2 seconds. We have a call to cosock.socket.gettime before and after the send to see if there is any delay.
Next, we spawn a task for our receiver; this receives a message and then sleeps for 1 second, 20 times.
Since we are sending a lot faster than we are receiving, we would expect that after the first few messages the amount of time it takes to send a message reaches about 1 second, indicating that our queue has hit its maximum of 2. If we were to run this we should see something like the following.
sent 1 in 0.0s
sent 11 in 0.0s
recd 1 in 0.0s
sent 2 in 0.2s
recd 11 in 1.0s
sent 12 in 1.0s
recd 2 in 1.0s
sent 13 in 1.0s
recd 12 in 1.0s
sent 14 in 1.0s
recd 13 in 1.0s
sent 15 in 1.0s
recd 14 in 1.0s
sent 16 in 1.0s
recd 15 in 1.0s
sent 17 in 1.0s
recd 16 in 1.0s
sent 18 in 1.0s
recd 17 in 1.0s
sent 19 in 1.0s
recd 18 in 1.0s
sent 20 in 1.0s
recd 19 in 1.0s
sent 3 in 9.8s
recd 20 in 1.0s
sent 4 in 1.0s
recd 3 in 1.0s
sent 5 in 1.0s
recd 4 in 1.0s
sent 6 in 1.0s
recd 5 in 1.0s
sent 7 in 1.0s
recd 6 in 1.0s
sent 8 in 1.0s
recd 7 in 1.0s
sent 9 in 1.0s
recd 8 in 1.0s
sent 10 in 1.0s
recd 9 in 1.0s
recd 10 in 1.0s
From this output, we can determine the exact order in which things played out. First, we can see that sendr1 is able to push 1 onto the queue, then it sleeps for 0.2 seconds, which allows sendr2 to push 11 onto the queue and also sleep for 0.2 seconds; then recvr pops 1 off the queue, followed by a 1-second sleep.
At this point, for the first time, we have 3 sleeping tasks. Since sendr1 went first, it will be the first to wake from its sleep; it pushes 2 onto the queue and then goes back to sleep, which allows sendr2 to wake up and try to send 12, but the queue is full ({11, 2}), so it has to wait until recvr wakes up to pop 11 off the queue. Once recvr pops 11 off the queue, we see that sendr2 is able to push 12, but it took 1 full second to do so! Now we stay in a state where both sendr1 and sendr2 are waiting to send for ~1 second until recvr is able to receive, at which point either sendr1 or sendr2 pushes a new value. Once we reach the last 2 values, we see that our sendrs go quiet because they are done with their work, but recvr still takes another 2 seconds to complete.
Did you notice that sendr2 gets to go far more often than sendr1 at the start, and that it takes sendr1 9.8 seconds to send 3? This is because of our waker scheme; Appendix A has more on that.
Appendix A: Bounded Channel Limits
In our BoundedChannelSender:send, BoundedChannel:set_waker_sendr and BoundedChannel:try_wake, we have made some decisions that make our implementation a little easier to read/write but might cause some problems if we end up depending on it.
To review: any time BoundedChannelSender:send would yield, we create a temporary table to handle our setwaker calls, and in BoundedChannel:set_waker_sendr we use that table as the key into _wakers.sendr to store/remove the waker function. In BoundedChannel:try_wake we use the next function to choose which entry of _wakers.sendr to call. The next function will always return the "first" key/value pair, but what does that mean; how can Lua tables be ordered? When a table is created, it is assigned a memory address, unique to that table; we can see what the address is by using the default __tostring metamethod.
lua -e "print({})"
table: 0x5581bc0236a0
In the above, we can see the memory address of an empty table is 0x5581bc0236a0, which would change if we were to run it again. If we were to use this table as the key in another table, that table would look something like this.
{
[0x5581bc0236a0] = "first element"
}
So, let's look at how Lua might order a table like this with more than one key.
local tables = {}
for i=1, 10 do
tables[{}] = i
end
for i=1, 10 do
local t, v = next(tables)
local tp = tonumber(tostring(t):match("table: 0x(.+)"), 16)
print(tp, v)
tables[t] = nil
end
This example will create a table named tables, then loop 10 times assigning tables[{}] = i. In a second loop to 10, we use the next function to pull the "first" key/value pair from tables. We then convert t into a plain number by extracting its hex address from tostring(t) and parsing that back into a number, which may be easier to read for some than trying to tell which hex value is larger than another. We print out that number and which i was assigned to it, then remove the pair from tables by assigning tables[t] = nil. If we run this once we might see something like.
93877018273488 1
93877018274240 9
93877018274016 7
93877018273792 5
93877018273616 3
93877018274352 10
93877018274128 8
93877018273904 6
93877018273680 4
93877018273552 2
At first, it looks like it might go in order because we get 1, but then we see the second return from next is 9. If we run it again we might see something like:
93837605766864 2
93837605767120 6
93837605767376 10
93837605766928 3
93837605767184 7
93837605766992 4
93837605767248 8
93837605766800 1
93837605767056 5
93837605767312 9
This time, we get 2 first and 9 last, which means that we can expect the order to be somewhat random. We can also see that Lua has ordered the keys with the lowest memory address first. That means that next will return the waker associated with the wake_t that has the lowest memory address. So what happens if one coroutine always gets the lowest value? It could starve other coroutines from being able to send.
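If this starvation mattered for your application, one possible mitigation (not part of this chapter's implementation; the _sendr_queue field and try_wake_sendr method are hypothetical) would be to store the sender wakers in a FIFO list, so the longest-waiting sender always wakes first.

-- Hypothetical variant: assumes the constructor also
-- initializes `_sendr_queue = {}` on the shared table.
function BoundedChannel:set_waker_sendr(sendr, waker)
  if waker then
    -- append so the oldest waiter stays at the front
    table.insert(self._sendr_queue, { sendr = sendr, waker = waker })
    return
  end
  -- a nil waker clears any entry for this sendr
  for i, entry in ipairs(self._sendr_queue) do
    if entry.sendr == sendr then
      table.remove(self._sendr_queue, i)
      break
    end
  end
end

function BoundedChannel:try_wake_sendr()
  -- pop the longest-waiting sender, if any
  local entry = table.remove(self._sendr_queue, 1)
  if entry then
    entry.waker()
  end
end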
This randomness might not be all bad, though: we don't want to show a preference between senders, and randomness avoids that. But avoiding preference well would require something closer to a uniform distribution, which would mean it should take a very long time to see the same order twice. Let's see how random our temporary table keys are.
local m = {
sets = {}
}
local table_size = 9
function m:add_set(set)
for idx, existing in ipairs(self.sets) do
local exact_match = false
local left_t, left_v, right_t, right_v
for i=1,table_size do
left_t, left_v = next(set, left_t)
right_t, right_v = next(existing, right_t)
exact_match = left_v == right_v
if not exact_match then
break
end
end
if exact_match then
table.insert(self.sets, set)
return {
first = existing,
first_idx = idx,
second = set,
second_idx = #self.sets
}
end
end
table.insert(self.sets, set)
end
local function gen_set()
local tables = {}
for i=1, table_size do
tables[{}] = i
end
return tables
end
local result
repeat
result = m:add_set(gen_set())
until result
print("RESULT")
print(result.first_idx, result.second_idx)
print("f,s")
for i=1,table_size do
local t1, v1 = next(result.first)
local t2, v2 = next(result.second)
print(string.format("%s,%s", v1, v2))
result.first[t1] = nil
result.second[t2] = nil
end
Here we have extended our example to allow for repeatedly creating tables and then checking to see if the new version matches any of the previous versions. We reduced the number of entries to 9 to make the results easier to read, but otherwise gen_set will create the same table as our original example. We have defined a table named m to hold all of our sets, and defined a method there, add_set, which will either return nil if the set argument isn't already in the list, or a results table if it was found. So what happens if we run this?
RESULT
1 4
f,s
4,4
8,8
1,1
5,5
9,9
2,2
6,6
3,3
7,7
It looks like it only took us 4 generated sets to find two with the exact same order (the 4th matched the 1st). Considering that the numbers 1 through 9 can be ordered in 362,880 (9!) different ways, it seems that our distribution is not very uniform.
Advanced
In the following chapters, we will discuss the inner workings of cosock in detail. The subject matter is going to shift focus from "how do I use cosock" to "how does cosock work", which may not be of interest to everyone.
Internals
This section will go over the contents of the cosock.socket.internals module and how it interacts with the cosock runtime and the underlying luasocket library.
Run
This is a step-by-step explanation of what happens in each pass through the main loop of cosock.run.
Internals
The module cosock.socket.internals is where luasocket gets wrapped into a "cosock aware" table. Initially, a call to passthroughbuilder is used to create a "builder" function. passthroughbuilder takes 2 arguments, recvmethods and sendmethods, which are both tables where the keys are method names and the values are set-like tables of error messages that it would be appropriate to yield for. A good example of one of these is the tcp module's recvmethods.
local recvmethods = {
receive = {timeout = true},
accept = {timeout = true},
}
For both of the methods defined here, getting the return value nil, "timeout" would be a signal to call coroutine.yield and try again. The return value of passthroughbuilder is a function that we will call builder. builder takes 2 arguments: method, which is a string, and an optional transformsrc, which is a table, or a function that returns a table, with the following properties.

- input: an optional function that takes the method inputs and returns those inputs, potentially transformed
  - This is only called once, just before we call the luasocket method for the first time
- blocked: an optional function that takes the return values from the method called and returns the input arguments for the next call of the same method
- output: an optional function that will be called with the return values of the method
  - This is only called once, just before we return from the method
Let's use a few examples to go over each of these, starting with the input property.
The method receive on a luasocket takes 2 arguments: a pattern string or a number indicating how many bytes to try to read, and an optional prefix to put at the front of what was received.
local socket = require "socket"
local t = socket.tcp()
t:connect("0.0.0.0", 8000)
print(t:receive("*l", "next line:"))
Assuming that some server is listening on port 8000 of the machine we run this on, we would receive 1 line, for example "ping\n", and this would print "next line: ping". As we will get into later, a call to cosock's receive may end up calling luasocket's receive repeatedly until we get to a newline character. So what if our server sent 1 byte at a time? We would end up printing "next line: pnext line: inext line: nnext line: g" if we passed the second argument to the underlying luasocket on every call. To avoid this, we can use the input property to store this prefix and only add it once to the eventual return value.
Now let's consider the blocked property. Continuing to use receive as our example method, what happens if we call t:receive(10) and again our server returns 1 byte at a time? We can't call the underlying luasocket method with 10 over and over; that would result in us requesting too many bytes from the socket. Instead, we need a way to capture the partial value we received and reduce the number of bytes requested accordingly. Thankfully, luasocket returns any partial data on error as the 3rd return value, so we could do something like
{
blocked = function(success, err, partial)
table.insert(shared_buffer, partial)
remaining_recv = remaining_recv - #partial
return remaining_recv
end
}
This example assumes that shared_buffer and remaining_recv exist somewhere, but you can see that we are appropriately reducing the number of bytes remaining. The value returned here will eventually be the argument provided to the next call to the luasocket method.
Here is a longer-form example of how a response of 1 byte at a time would look for our luasocket.
local shared_buffer = {}
local remaining_recv = 5
--p
local _, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--i
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--n
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--g
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--\n
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
Finally, we have the `output` property, which gets called with the last return values from our method. If we complete our example, this is where we would call `table.concat(shared_buffer)` to join all the chunks together before returning.
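Sticking with our byte-count example, a minimal sketch of an `output` transform might look like this (again assuming `shared_buffer` is in scope; error handling is omitted here for brevity):

```lua
{
  output = function(full, err, partial)
    -- the final successful read lands in the 1st return value
    if type(full) == "string" and #full > 0 then
      table.insert(shared_buffer, full)
    end
    -- join all the captured chunks into one string
    return table.concat(shared_buffer)
  end
}
```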
Continuing to use `receive` as an example, this is what the full `transformsrc` argument might look like.
```lua
local recvmethods = {
  receive = {timeout = true}
}
local sendmethods = {}

-- First we define a builder injecting the method<->error message maps
local builder = passthroughbuilder(recvmethods, sendmethods)

-- Now we can use the builder to define a method that doesn't do any
-- transformations
m.bind = builder("bind")

-- Here we define a method that performs some transformations
m.receive = builder("receive", function()
  local shared_buffer = {}
  local remaining_recv
  local pattern
  return {
    input = function(pat, prefix)
      -- insert the prefix as the first part of our return
      -- value if present
      if prefix then
        table.insert(shared_buffer, prefix)
      end
      if type(pat) == "number" then
        -- we know how many bytes to wait for, set this
        -- for use in blocked
        remaining_recv = pat
      else
        -- store this for use in blocked
        pattern = pat
      end
      -- return only the pattern or byte count to avoid
      -- duplicate prefixes
      return pat
    end,
    blocked = function(_, err, partial)
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
        -- only reduce remaining_recv if it is a number
        if remaining_recv then
          remaining_recv = remaining_recv - #partial
          -- return the updated remaining byte count
          return remaining_recv
        end
        -- otherwise we return the pattern provided to input
        return pattern
      end
      -- nothing was received, retry with the same argument
      return remaining_recv or pattern
    end,
    output = function(full, err, partial)
      -- if the first return is a string with a length > 0 then
      -- add it to the buffer
      if type(full) == "string" and #full > 0 then
        table.insert(shared_buffer, full)
      end
      -- if the third return is a string with a length > 0 then
      -- add it to the buffer
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
      end
      -- concatenate all the strings together
      local all = table.concat(shared_buffer)
      if err then
        -- if there was an error it should go in the 3rd return
        -- position
        return nil, err, all
      else
        -- if no error then it should go in the 1st return
        -- position
        return all
      end
    end
  }
end)
```
With the arguments defined, we can now discuss the return value of `builder`, which is a third function: the method's implementation. Its first argument is `self`, and varargs are used to allow for any additional arguments.
Let's pause here and go over this, because 3 levels of functions can be a bit difficult to follow. Our goal is to re-use as much as possible for each of the methods on a cosock socket, and since the yield-then-retry loop is going to be a common pattern, we can define all of it in one place. The key is that these methods need access to a few extra pieces, which is achieved by the fact that each enclosing function's arguments remain available to the returned function through its closure.
This means that the `receive` method would have the following environment.
```lua
local recvmethods = {
  receive = { timeout = true }
}
local sendmethods = {}
local method = "receive"
local transformsrc = function() --[[see above]] end
```
Now let's go over what actually happens in this shared method implementation. First, we capture all of the varargs into a table named `inputparams`; if the transform object had an `input` property defined, we then overwrite that variable with `{input(table.unpack(inputparams))}`. Now that we have our inputs the way they need to be, we begin a long-running `repeat`/`until` loop.

At the top of the loop we call `self.inner_sock[method]` (`inner_sock` is the property name for the luasocket on all of the cosock sockets). If the first return from that function is `nil`, we check to see if the second return value can be found in `recvmethods` or `sendmethods`; if so, we know that we need to yield, so we check if `blocked` is defined and call it if it is, again overwriting `inputparams` with the return value.
Next we determine what `kind` of yield we are going to do: if the second return value was found in `recvmethods` it will be `"recvr"`; if it was found in `sendmethods` it will be `"sendr"`. Then we set up our arguments for `coroutine.yield`, putting `self` into `recvt` if our `kind` is `"recvr"` or into `sendt` if our `kind` is `"sendr"`. Now we can call `coroutine.yield(sendt, recvt, self.timeout)`, assigning the returns to `recvr, sendr, rterr`. If `rterr` is not `nil`, we are going to return early; if its value matches the error from our method call (i.e. `"timeout"` for both), then we return the values from our call to that method.

The last thing we do in this case, before heading back to the top of the loop, is to assert that our `kind` and the result of `cosock.socket.select` match, meaning we have a `kind` of `"sendr"`, the `sendr` variable is populated, and the `recvr` variable is unpopulated, or vice versa.
If the first return value from our method call was not `nil`, then we can exit the loop early, transforming the return values with `output` if that is populated.
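Putting that all together, here is a minimal sketch of the method implementation that `builder` returns. This is a reconstruction from the description above, not the exact cosock source; it assumes `method`, `transformsrc`, `recvmethods`, and `sendmethods` are captured from the enclosing calls.

```lua
return function(self, ...)
  -- resolve the transform, which may be a table or a factory function
  local transform = transformsrc or {}
  if type(transform) == "function" then
    transform = transform()
  end
  local inputparams = {...}
  if transform.input then
    inputparams = {transform.input(table.unpack(inputparams))}
  end
  repeat
    local ret = {self.inner_sock[method](self.inner_sock, table.unpack(inputparams))}
    if ret[1] ~= nil then
      -- success: transform the return values if requested
      if transform.output then
        return transform.output(table.unpack(ret))
      end
      return table.unpack(ret)
    end
    local err = ret[2]
    -- decide which kind of yield this error calls for, if any
    local kind
    if (recvmethods[method] or {})[err] then
      kind = "recvr"
    elseif (sendmethods[method] or {})[err] then
      kind = "sendr"
    else
      -- not a "would block" error, hand it back to the caller
      return table.unpack(ret)
    end
    if transform.blocked then
      inputparams = {transform.blocked(table.unpack(ret))}
    end
    local sendt, recvt
    if kind == "recvr" then
      recvt = {self}
    else
      sendt = {self}
    end
    local recvr, sendr, rterr = coroutine.yield(sendt, recvt, self.timeout)
    if rterr then
      if rterr == err then
        -- e.g. "timeout" for both: return this method's own error shape
        return table.unpack(ret)
      end
      return nil, rterr
    end
    -- sanity check: the select results should match our kind
    assert((kind == "recvr" and recvr and not sendr)
        or (kind == "sendr" and sendr and not recvr),
      "select result does not match yield kind")
  until false
end
```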
The only other function provided by `cosock.socket.internals` is `setuprealsocketwaker`, which completes the wrapping of our cosock socket. This function takes the socket table and an optional list of `kinds`; if `kinds` is not provided, the default will be both `sendr` and `recvr`. We then define a method on `socket` called `setwaker`, which is used by cosock to wake up sleeping coroutines (see the integrating chapter for more info). This `setwaker` will assign the provided `waker` function to a `self.wakers` table, keyed by the `kind` of waker. It also defines a method `_wake`, which takes an argument `kind` and varargs for any additional arguments. This method checks if `self.wakers[kind]` is not `nil` and, if so, calls it with the varargs. It then replaces `self.wakers[kind]` with `nil`.
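As a rough sketch, again reconstructed from the description rather than copied from the source, `setuprealsocketwaker` might look like this:

```lua
local function setuprealsocketwaker(socket, kinds)
  kinds = kinds or {"sendr", "recvr"}
  local waker_kinds = {}
  for _, kind in ipairs(kinds) do
    waker_kinds[kind] = true
  end

  -- store or clear the waker for a given kind
  function socket:setwaker(kind, waker)
    assert(waker_kinds[kind], "unsupported waker kind: " .. tostring(kind))
    self.wakers = self.wakers or {}
    self.wakers[kind] = waker
  end

  -- call the waker for `kind` (if any) exactly once, then clear it
  function socket:_wake(kind, ...)
    local waker = self.wakers and self.wakers[kind]
    if waker then
      waker(...)
      self.wakers[kind] = nil
    end
  end
end
```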
Advanced Overview of cosock.run
Global Variables
Cosock utilizes a few global variables to allow for the potentially recursive nature of Lua coroutines.
- `threads`: A list of all coroutines cosock is aware of
  - This is populated by the first argument to `cosock.spawn`
- `threadnames`: A map of coroutine<->name pairs
  - This is populated by the second argument to `cosock.spawn`
- `threadswaitingfor`: A map of coroutine<->select args
  - Select args have the type `{recvr: table<cosock.socket>, sendr: table<cosock.socket>, timeout: float?}`
  - This is populated by the values provided to `coroutine.yield` for cosock tasks from a call to `cosock.socket.select`
- `readythreads`: A map of coroutine<->resume args for threads that will be ready on the next pass
  - Resume args have the type `{recvr = table<cosock.socket>, sendr = table<cosock.socket>, err: string?}`
  - This is populated by coroutine wake-ups that occur during the current pass
- `socketwrappermap`: A map of luasocket<->cosock socket pairs
  - This map is keyed by the table pointer of the luasocket, making it easy to get back to a cosock socket when you only have a luasocket
  - This gets populated when a cosock socket is included in a select args table
- `threaderrorhandler`: A potential error handler function. Not currently settable.
- `timers`: see Timers below
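Declared at module scope, this state might look like the following sketch; the real module may initialize these differently (with weak tables, for example).

```lua
-- Sketch of cosock's module-local state, named as described above
local threads = {}            -- all coroutines cosock knows about
local threadnames = {}        -- coroutine -> human readable name
local threadswaitingfor = {}  -- coroutine -> select args
local readythreads = {}       -- coroutine -> resume args for the next pass
local socketwrappermap = {}   -- luasocket -> cosock socket
local threaderrorhandler = nil
```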
Run Loop
To start the loop, we define a few local variables. First up is `wakethreads`; this table will be populated by removing all of the elements from `readythreads`, which frees up `readythreads` to be added to by any tasks we are about to wake. Next are the two list tables `sendt` and `recvt`, along with the optional number `timeout`; these will end up being passed to luasocket's `socket.select` when we run out of ready threads. Now we can move all of our `readythreads` into `wakethreads` and then loop over all of the ready threads.
For each ready thread, we first check if the `coroutine.status` for it is `"suspended"`; if it isn't, we skip this thread. For any `"suspended"` thread, we first cancel any timers that might be scheduled for that thread by calling `timers.cancel`. Next, we pull out the values we stored in `threadswaitingfor` and call `skt:setwaker` with `nil` for any `sendr` or `recvr` properties; this prevents any potential "double wakes" from occurring.
Now we call `coroutine.resume` with our `thread` and the `recvr`, `sendr`, and `err` values that were stored in `wakethreads`. When `coroutine.resume` completes, we have 2 pieces of information that drive our next path: the first return value from `coroutine.resume`, which indicates whether our `thread` raised an error, and the result of calling `coroutine.status` on our `thread`.

If our `thread`'s status is `"dead"` and `coroutine.resume` returned `false`, something has gone terribly wrong, so we raise an error (with a traceback if `debug` is available). If our `thread`'s status is `"dead"` and `coroutine.resume` returned `true`, we just remove our `thread` from `threads` and `threadswaitingfor`. If our `thread`'s status is `"suspended"` and `coroutine.resume` returned `true`, then we first update `threadswaitingfor` with the remaining return values from `coroutine.resume`. We then call `skt:setwaker` for any `sendr` and `recvr` in those values, setting each waker to a function that will clear itself and call the local function `wake_thread`. At this point we also update our local tables `recvt` and `sendt` to include the `skt.inner_sock` values from `threadswaitingfor`, which are the luasockets associated with each `cosock.socket`.
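The waker wiring for a still-suspended thread might look roughly like this sketch; `wake_thread` is the run loop's local helper that moves a thread into `readythreads`, and its exact signature here is an assumption.

```lua
-- for each socket the thread is now waiting on, attach a one-shot waker
local waitingfor = threadswaitingfor[thread]
for _, kind in ipairs({"recvr", "sendr"}) do
  for _, skt in ipairs(waitingfor[kind] or {}) do
    skt:setwaker(kind, function()
      -- clear self first to avoid double wakes
      skt:setwaker(kind, nil)
      wake_thread(thread, kind)
    end)
  end
end
```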
Now that we have resumed all of the `wakethreads` and filled in our eventual arguments to `socket.select`, we determine if we are still running. We do this by looping over all of our `threads` and checking that at least 1 has a `coroutine.status` that is not `"dead"`; if all `threads` are `"dead"` and nothing got added to `readythreads`, then we exit the run loop (see the sketch below). Next, we update our `socketwrappermap` by looping over all of the values in `threadswaitingfor` and inserting each `recvr` and `sendr` socket under its `skt.inner_sock` key.
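In code, the liveness check might look something like this sketch (`still_running` is a hypothetical helper name):

```lua
-- returns true if at least one known thread is still alive
local function still_running(threads)
  for _, thread in pairs(threads) do
    if coroutine.status(thread) ~= "dead" then
      return true
    end
  end
  return false
end

-- in the run loop: stop when nothing is alive and nothing is queued
if not still_running(threads) and next(readythreads) == nil then
  return
end
```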
With all the bookkeeping done, we run all of the `timers` that have reached their deadline and update our local variable `timeout` to the duration until the shortest remaining timer. If at least one thread was added to `readythreads`, we set our `timeout` to 0, because we already know we have new work to do. At this point, if `timeout` is `nil` and both `sendt` and `recvt` are empty, we raise an error, because we would otherwise call `socket.select({}, {}, nil)`, which would just block forever. Now we call `socket.select`, capturing the return values in `recvr`, `sendr`, and `err`. If `err` is present and isn't the value `"timeout"`, we raise that error. If `err` is `nil` or `"timeout"`, we loop over all of the values in `recvr` and `sendr`, looking up the cosock socket in `socketwrappermap` and calling `skt:_wake`, which calls the function we provided to `setwaker` above. With that complete, we now have a fully updated `readythreads`, and we start the loop again.
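The tail of the loop might look something like this sketch, assuming the locals described above are in scope:

```lua
if next(readythreads) then
  timeout = 0 -- more work is already queued, don't block in select
end
if not timeout and #recvt == 0 and #sendt == 0 then
  error("cosock tried to call select with no sockets and no timeout")
end
local recvr, sendr, err = socket.select(recvt, sendt, timeout)
if err and err ~= "timeout" then
  error(err)
end
-- wake every cosock socket whose luasocket is ready
for _, lskt in ipairs(recvr or {}) do
  socketwrappermap[lskt]:_wake("recvr")
end
for _, lskt in ipairs(sendr or {}) do
  socketwrappermap[lskt]:_wake("sendr")
end
```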
Outline Version
- Define `wakethreads`
- Define an empty list of senders (`sendt`), receivers (`recvt`), and a `timeout`
- Pop all `readythreads` entries into `wakethreads`
- Loop over all threads in `wakethreads`
  - If `coroutine.status` for that thread returns `"suspended"`
    - Clear any timers
    - Clear any wakers registered for the thread's `sendr`/`recvr` sockets
    - Call `coroutine.resume` with the stored `recvr`, `sendr`, and `err` arguments
    - If `coroutine.resume` returned `true` in the first position and `coroutine.status` returns `"suspended"`
      - Re-populate `threadswaitingfor[thread]` with the 3 other return values from `coroutine.resume`
        - These should be the `recvt`, `sendt`, and `timeout` values that will populate select args
      - Set the waker for all sockets in `recvt` and `sendt` to call `wake_thread` and then unset themselves
      - If `coroutine.resume` returned a `timeout`, create a new timer for this thread which will call `wake_thread_err` on expiration with the value `"timeout"`
    - If `coroutine.status` returned `"dead"`
      - If `coroutine.resume` returned `false` in the first position and no `threaderrorhandler` has been set
        - Raise an error
          - If the `debug` library is available, include a `debug.traceback` and the second return value from `coroutine.resume`
          - Else just raise an error with the second return value from `coroutine.resume`
        - Exit the application
          - This calls `os.exit(-1)`
  - Else, print a warning message if printing is turned on
- Initialize a variable `running` to `false`
- Loop over all `threads`, calling `coroutine.status` on each; if at least 1 doesn't return `"dead"`, set `running` to `true`
- If `running` is `false` and `readythreads` is empty
  - Exit the run loop
- Loop over all the values in `threadswaitingfor`
  - Insert the luasockets from any `sendr` or `recvr` parameters into the loop-local variables `sendt` and `recvt`
  - Populate `socketwrappermap` with any `sendr` or `recvr` sockets
- Call `timers.run`
- If `readythreads` is not empty
  - Set `timeout` to `0`
- If `timeout` is falsy and `recvt` is empty and `sendt` is empty
  - Raise an error that cosock tried to select with no sockets and no timeout
- Call luasocket's `socket.select` with our loop's `recvt`, `sendt`, and `timeout`
- If `socket.select` returns a value in the 3rd position and that value is not `"timeout"`
  - Raise an error with that return value
- Loop over the `recvr` (1st) return from `socket.select`
  - Look up the `cosock.socket` in `socketwrappermap`
  - Call `skt:_wake("recvr")`
- Loop over the `sendr` (2nd) return from `socket.select`
  - Look up the `cosock.socket` in `socketwrappermap`
  - Call `skt:_wake("sendr")`
Timers
Internally, we keep a list of timer objects to determine when any thread has reached the maximum time it should be running/yielding for. We interact with these through a module-local variable `timers`, which has a few associated functions. Inside of a `do` block, we create 2 local variables, `timeouts` and `refs`, for use in the `timers` associated functions.
The first associated function worth discussing is `timers.set`, which takes the arguments `timeout: float`, `callback: fun()`, and `ref: table`. When called, we first capture the current timestamp via `socket.gettime()`; we then calculate the deadline for this timer by adding `timeout` to that timestamp, storing the result in a variable `timeoutat`. We then `table.insert` the table `{ timeoutat = timeoutat, callback = callback, ref = ref }` into `timeouts`. If `ref` isn't `nil`, we also populate `refs[ref]` with that same table.
Next up is `timers.cancel`, which takes a single argument, `ref: table`. When called, we first look up the timer info in `refs[ref]`; if we find something there, we remove the values in the `callback` and `ref` properties and finally remove the entry from `refs`. By removing the `callback`, we avoid ever invoking the consequence of that timer; the entry itself will be removed from `timeouts` during the next call to `timers.run`.
Finally, we have `timers.run`; this function takes no arguments. When called, it first sorts the `timeouts` table in ascending order by `timeoutat`, where `nil` values are treated as the smallest. We then capture the current timestamp by calling `socket.gettime`. Now we consult the first element of `timeouts`: if that table has a `timeoutat` of `nil`, or one that is less than now, we pop it off the list; if it has a `callback` property, we call that; if it has a `ref` property, we remove it from `refs`. This repeats until the front of the list is a timer that hasn't yet expired. Now that all the pending timers are done, we use the new first element of `timeouts`' `timeoutat` property to calculate the next relative timeout (`timeoutat - now`) and return that as the earliest timeout. If `timeouts` is empty, we return `nil`.
Outline Version
- A timer has the shape `{timeoutat: float, callback: fun(), ref: table?}`
- `timers.set`
  - Updates `timeouts` to include that value; also updates a privately scoped table named `refs`
    - `refs` is a map of table pointer<->timer, which is used for cancelling a timer
- `timers.cancel`
  - If the provided table pointer is in `refs`, remove the `callback` and `ref` properties from that timer
  - Set the table pointer key in `refs` to `nil`
- `timers.run`
  - Sort all timeouts by deadline (earliest first)
  - While the timer at the front of the `timeouts` list has a `timeoutat` that is `nil` or `< socket.gettime()`
    - Pop it off the list
    - Call `timer.callback`
    - Remove this `timer` from `refs`
  - If there are any more timeouts left, return how long before the earliest should expire
  - If there are no more timeouts, return `nil`