Cosock
Cosock is a coroutine runtime written in pure Lua and based on the popular luasocket library.
The goal of the project is to provide the same interfaces that luasocket provides but wrapped up in coroutines to allow for concurrent IO.
Note: these docs use the terms coroutine, task, and thread interchangeably; they all mean a Lua coroutine
For example, the following 2 lua programs use luasocket to define a tcp client and server.
--client.lua
local socket = require "socket"
local client = socket.tcp()
client:connect("0.0.0.0", 9999)
while true do
print("sending ping")
client:send("ping\n")
local response = assert(client:receive())
assert(response == "pong")
end
--server.lua
local socket = require "socket"
local server = socket.tcp()
server:bind("0.0.0.0", 9999)
server:listen()
print("listening", server:getsockname())
local client = server:accept()
while true do
local request = assert(client:receive())
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
If you were to run lua ./server.lua first and then lua ./client.lua, you should see each terminal
print out its "sending ..." messages forever.
Using cosock, we can actually write the same thing as a single application.
-- client_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
--- Since the client and server are in the same application
--- we can use an OS-assigned port and share it across the
--- two tasks. To coordinate the tasks to start in the order
--- we want, we use a cosock channel to make sure both tasks
--- have the same port number.
local port_tx, port_rx = cosock.channel.new()
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
local client = server:accept()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
end, "server task")
--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
--- wait for the server to be ready.
local port = assert(port_rx:receive())
local client = socket.tcp()
client:connect(ip, port)
while true do
print("sending ping")
client:send("ping\n")
local request = assert(client:receive())
assert(request == "pong")
end
end, "client task")
--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()
Now if we run this with lua ./client_server.lua we should see the messages alternate.
Notice that we called cosock.spawn twice, once for the server task and
once for the client task; we are going to dig into that next. We also added a call to cosock.run
at the bottom of our example. This function runs our tasks until there is no more work to do,
so it is important you don't forget it or nothing will happen.
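To make this concrete, here is about the smallest cosock program one can write (a toy sketch, not one of the examples from this book):
local cosock = require "cosock"
cosock.spawn(function()
  print("hello from a task")
end, "hello-task")
-- without this call, the task above never runs and the
-- program exits without printing anything
cosock.run()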
Getting Started
The easiest way to use cosock is to install it with luarocks.
luarocks install cosock
cosock depends on both luasocket
and luasec. When running the above command,
luarocks will attempt to compile both of these libraries, which have some system
dependencies.
Luasocket
The version of luasocket we are using requires that the Lua development package is
available.
Linux
For Debian-based systems, you would need to run the following
sudo apt-get install liblua-dev
For Fedora
sudo dnf install lua-devel
Windows
Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.
MacOS
These can be installed on MacOS via brew
brew install lua
Luasec
Luasec depends on OpenSSL, so you will need its development libraries
Linux
For Debian-based systems, you would need to run the following
sudo apt-get install libssl-dev
For Fedora
sudo dnf install openssl-devel
Windows
Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.
MacOS
These can be installed on MacOS via brew
brew install openssl
Spawn
At the core of cosock is the ability to wrap any operation in a coroutine and
register it with cosock. For this, cosock exports the function cosock.spawn. This function takes
2 arguments: the first is a function that will be run as our coroutine, and the second is a name for that coroutine.
For example, this is a simple program that will spawn a single coroutine, which will print the current timestamp and the word "tick" and then sleep for 1 second in a loop forever.
--basic_spawn.lua
local cosock = require "cosock"
cosock.spawn(function()
while true do
print(cosock.socket.gettime(), "tick")
cosock.socket.sleep(1)
end
end, "clock")
cosock.run()
The act of calling cosock.spawn allows us to use the non-blocking cosock.socket.sleep function. This means
we could extend our application to not only print this message every second but also use the time this coroutine
is sleeping to perform some other work. Let's extend our little example a bit.
--less_basic_spawn.lua
local cosock = require "cosock"
local function tick_task()
while true do
print(cosock.socket.gettime(), "tick")
cosock.socket.sleep(2)
end
end
local function tock_task()
cosock.socket.sleep(1)
while true do
print(cosock.socket.gettime(), "tock")
cosock.socket.sleep(2)
end
end
cosock.spawn(tick_task, "tick-task")
cosock.spawn(tock_task, "tock-task")
cosock.run()
Very similar to our last example, this time we are spawning 2 coroutines: one will print "tick" every two seconds,
the other will wait 1 second and then print "tock" every two seconds. This should result in a line getting
printed to the terminal once a second, alternating between our two strings. Notice, though, that there is a fair amount
of code duplication, as tick_task and tock_task are nearly identical. This is mostly driven by the fact
that the first argument to cosock.spawn is a function that takes no arguments and returns no values, which
means we can't ask cosock to pass in any arguments. One way we can get around this is by using
closures: instead of passing a function directly to
cosock.spawn, we can return a function from another function and use that as the argument to cosock.spawn.
For example:
--even_less_basic_spawn.lua
local cosock = require "cosock"
local function create_task(name, should_sleep_first)
return function()
if should_sleep_first then
cosock.socket.sleep(1)
end
while true do
print(cosock.socket.gettime(), name)
cosock.socket.sleep(2)
end
end
end
cosock.spawn(create_task("tick", false), "tick-task")
cosock.spawn(create_task("tock", true), "tock-task")
cosock.run()
Notice here that create_task returns a function but takes a name argument and a should_sleep_first
argument, both of which are available to our returned function.
Now, let's consider our first example, which may not look like it but is very similar to our tick/tock example.
Instead of using cosock.socket.sleep to tell cosock we are waiting around for something, it uses
the receive method on a cosock.socket.tcp. Let's break down what is happening in that example.
To start, both tasks will be resumed, which means that cosock has selected them to run; we can't say
for sure which task will get resumed first, which is why we used a cosock.channel to make the
client task wait until the server was ready. Shortly after resuming, each task eventually calls
some method that will yield, which means it is waiting on something, so cosock can run
another task. For the server, the first time we yield is in the call to accept: if the client
hasn't already called connect we would end up blocking, so instead of blocking we let another
task work, and when we finally have a client connected, cosock wakes us back up again. On the
client side, we first yield on the call to channel:receive: if the server hasn't sent the port
number yet, blocking there would prevent the server task from ever running (and thus from calling bind),
so we let the other task work until we finally have a port number and then cosock wakes us back up.
This pattern continues, each task running exclusively until it needs to wait for something, at which point it yields control back to cosock. When the thing we were waiting for is ready, we can continue running again.
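To build some intuition for what cosock is doing for us here, the scheduler can be pictured roughly like this (a heavily simplified sketch, not cosock's actual implementation; task_fn stands in for any function passed to cosock.spawn):
-- a rough sketch of the resume/yield cycle; the real runtime in
-- cosock.run juggles many coroutines and waits with socket.select
local co = coroutine.create(task_fn)
while coroutine.status(co) ~= "dead" do
  -- resume returns whatever the task passed to coroutine.yield:
  -- a list of receivers, a list of senders, and a timeout
  local ok, recvt, sendt, timeout = coroutine.resume(co)
  -- here the runtime would wait (via socket.select) until something
  -- in recvt/sendt is ready or timeout expires, then loop to resume
end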
In both our tick/tock examples and our client/server example, we reach a point where cosock is just handing control from task 1 to task 2 and back again in an infinite loop. In a more real-world program, you might see any number of tasks that need to be juggled. In our next example, we will extend the client/server example to handle any number of clients.
-- clients_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local number_of_clients = 10
--- Since the clients and server are in the same application
--- we can use an OS-assigned port and share it across the
--- tasks. To coordinate the tasks to start in the order
--- we want, we use a cosock channel to make sure all tasks
--- have the same port number.
local port_tx, port_rx = cosock.channel.new()
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
while true do
local client = server:accept()
cosock.spawn(function()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
if request:match("ping") then
print("sending pong")
client:send("pong\n")
else
client:close()
break
end
end
end)
end
end, "server task")
--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
print("spawn_client", id, port)
local client = socket.tcp()
client:connect(ip, port)
while true do
print("sending ping", id)
client:send(string.format("ping %s\n", id))
local request = assert(client:receive())
assert(request == "pong")
socket.sleep(0.5)
end
end
--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
local port = assert(port_rx:receive())
for i=1,number_of_clients do
cosock.spawn(function()
spawn_client(i, port)
end, string.format("client-task-%s", i))
end
end
--- Spawn a bunch of client tasks
cosock.spawn(function()
spawn_clients()
end, "client task")
--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()
Surprisingly little has changed. First, we updated the server task to call accept more than once,
passing each returned client into its own task to receive/send in a loop there.
For the client side, we broke the client send/receive loop into its own function and added
a parent task to wait for the port number and then cosock.spawn a bunch of client tasks.
If you were to run this example, you would see that the print statements end up in random order!
Channels
Coordinating coroutines can be a huge pain; to ease this pain, cosock offers a synchronization primitive
called channels. A cosock.channel is a "multiple producer, single consumer" message queue: you can
have one coroutine own the receiving half of the queue and pass the sender out to as many
coroutines as you'd like. We've already seen how they are used: in our
first example, we used a cosock.channel to coordinate which port
the client should connect to. What if we wanted to re-write that example without using a channel? That might look
something like this:
--client_server_no_channel.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local shared_port
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
server:bind(ip, 0)
local _ip, p = server:getsockname()
shared_port = p
server:listen()
local client = server:accept()
while true do
local request = assert(client:receive())
print(string.format("received %q", request))
assert(request == "ping")
print("sending pong")
client:send("pong\n")
end
end, "server task")
--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
--- wait for the server to be ready.
while shared_port == nil do
socket.sleep(1)
end
local client = socket.tcp()
client:connect(ip, shared_port)
while true do
print("sending ping")
client:send("ping\n")
local request = assert(client:receive())
assert(request == "pong")
end
end, "client task")
--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()
That update removed our channel in favor of polling shared_port ~= nil once every second. This will
absolutely work; however, we have introduced a timing problem. What if we start sleeping for 1 full second right before
the server is issued its port? That second would be wasted; we could have already been creating our client.
While the consequence in this example isn't dire, it does show that relying on shared mutable state without
some way to synchronize it between tasks can be problematic.
As a note, we could also solve this problem by having the server task spawn the client task, but that wouldn't be nearly as interesting for our current subject.
Another place that could benefit from a channel is our tick/tock example; there, we hard-coded the synchronization: the first task would print and then sleep for 2 seconds, while the second task would sleep for 1 second and then print and sleep for 2 seconds. Let's take a look at what that might have looked like if we had used channels.
--tick_tock_channels.lua
local cosock = require "cosock"
-- We create 2 pairs of channels so our two tasks can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local function task(tx, rx, name)
while true do
-- First wait for the other task to tell us it is done
rx:receive()
-- print our name
print(cosock.socket.gettime(), name)
-- sleep for 1 second
cosock.socket.sleep(1)
-- tell the other task we are done
tx:send()
end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
task(tick_tx, tock_rx, "tock")
end)
-- prime the tick task to start first
tick_tx:send()
cosock.run()
In this version, we only have one definition for a task: looping forever, it will first wait to receive the signal,
then it prints its name, sleeps for 1 second, and then sends the signal on to the other task. The key to
making this all work is that we need to kick the process off by telling one of the tasks to start. Since a
channel allows for multiple senders, it is ok that we call send in more than one place.
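For instance, here is a minimal demonstration of the "multiple producer, single consumer" rule (a toy sketch): the sending half is used from two tasks, while only one task ever touches the receiving half.
local cosock = require "cosock"
local tx, rx = cosock.channel.new()
-- the sending half can be handed to any number of tasks
cosock.spawn(function() tx:send("from task 1") end)
cosock.spawn(function() tx:send("from task 2") end)
cosock.spawn(function()
  -- but only this one task ever calls receive
  print(rx:receive())
  print(rx:receive())
end)
cosock.run()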
Now say we wanted to extend our little clock emulator to print "clack" every 60 seconds to simulate the
minute hand moving. That might look something like this:
--tick_tock_clack.lua
local cosock = require "cosock"
-- We create 3 pairs of channels so our three tasks can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local clack_tx, clack_rx = cosock.channel.new()
local function task(tx, rx, name)
while true do
-- First wait for the other task to tell us the count
local count = rx:receive()
-- print our name
print(cosock.socket.gettime(), name)
-- sleep for 1 second
cosock.socket.sleep(1)
-- pass the count along to the next task
if count >= 59 then
clack_tx:send(0)
else
tx:send(count + 1)
end
end
end
-- spawn the task to print tick
cosock.spawn(function()
task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock
cosock.spawn(function()
task(tick_tx, tock_rx, "tock")
end)
cosock.spawn(function()
task(tick_tx, clack_rx, "clack")
end)
-- prime the tick task to start first
tick_tx:send(1)
cosock.run()
Here we have updated the tasks to share a counter across our channels. At the
start of each loop iteration, we first get the current count from the other task. We again print our name
and then sleep for 1 second, but now, if the count is >= 59, we send a count of 0 to our "clack" task, which
will always then send a 1 to the "tick" task to start the whole process over again. Just to make sure it
is clear: we use the send half of the "tick" task's channel in 3 places, the main thread
to "prime" the clock, the "tock" task, and the "clack" task.
It is very important that we don't try to use the receiving half of a channel in more than one task; that would lead to potentially unexpected behavior. Let's look at an example of how that might go wrong.
-- bad_news_channels.lua
local cosock = require "cosock"
local tx, rx = cosock.channel.new()
cosock.spawn(function()
rx:receive()
print("task 1")
end)
cosock.spawn(function()
rx:receive()
print("task 2")
end)
tx:send()
cosock.run()
In this example, we create one channel pair and spawn two tasks which both call receive on our
channel, and just before run we call send. Since the choice of which task
should run at which time is left entirely up to cosock, we can't say for sure which of these tasks
will actually receive the message. It might print "task 1" or "task 2".
In actuality, cosock assumes that
receive will only ever be called from the same coroutine. Calling receive from multiple coroutines will (eventually) raise an error.
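If you do need to wake multiple tasks, one pattern that stays within the single-receiver rule is to give each consumer its own channel (a sketch of the example above, corrected):
local cosock = require "cosock"
-- one channel per consumer keeps each receiving half in a single task
local tx1, rx1 = cosock.channel.new()
local tx2, rx2 = cosock.channel.new()
cosock.spawn(function()
  rx1:receive()
  print("task 1")
end)
cosock.spawn(function()
  rx2:receive()
  print("task 2")
end)
-- sending halves may be used anywhere, so signal both tasks
tx1:send()
tx2:send()
cosock.run()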
Select
Now that we have covered how to spawn and run coroutines using cosock, let's talk about how we
could handle multiple IO sources in a single coroutine. For this kind of work, cosock provides
cosock.socket.select. This function works in a very similar way to luasocket's socket.select;
a call to it would look something like local recvr, sendr, err = cosock.socket.select(recvt, sendt, timeout).
Its arguments are
- recvt: a list of cosock sockets that are waiting to be ready to receive
- sendt: a list of cosock sockets that are waiting to be ready to send
- timeout: the maximum number of seconds to wait for one or more entries in recvt or sendt to be ready
  - If this value is nil or negative, the timeout is treated as infinity
Note: the list entries for sendt and recvt can be other "cosock aware" tables, like the lustre WebSocket; for specifics on how to make a table "cosock aware", see the Integrating with Cosock chapter.
Its return values are
- recvr: a list of ready receivers; any entry here should be free to call receive and immediately be ready
- sendr: a list of ready senders; any entry here should be free to call send and immediately be ready
- err: if this value is not nil it represents an error message
  - The most common error message here would be "timeout", which occurs if the timeout argument provided is not nil and is positive
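Before the larger example below, here is a small self-contained illustration of a select loop. It leans on the note above that other "cosock aware" tables can be list entries, assuming channel receivers qualify; rx1 and rx2 are channel receiving halves here:
local cosock = require "cosock"
local tx1, rx1 = cosock.channel.new()
local tx2, rx2 = cosock.channel.new()
cosock.spawn(function()
  cosock.socket.sleep(1)
  tx1:send("one")
  cosock.socket.sleep(1)
  tx2:send("two")
end, "sender")
cosock.spawn(function()
  local pending = 2
  while pending > 0 do
    local recvr, _sendr, err = cosock.socket.select({rx1, rx2}, {}, 5)
    if err == "timeout" then break end
    for _, rx in ipairs(recvr) do
      -- anything in recvr should be ready to receive immediately
      print(rx:receive())
      pending = pending - 1
    end
  end
end, "selector")
cosock.run()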
So, how would we use something like this? Let's consider our clients_server.lua example
from the spawn chapter, where we called cosock.spawn every time a new
client was accepted. This works, but we don't have much control over how many tasks we end
up spawning, in large part because we don't know how long each task will run. To put a limit
on that, we need to handle all of the client connections on the same task as the
server, and to do that, we can use select.
-- clients_server_select.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local number_of_clients = 10
--- Since the clients and server are in the same application
--- we can use an OS assigned port and share it across the
--- two tasks, to coordinate the two tasks to start in the order
--- we want, we can use a cosock channel to make sure both tasks
--- have the same port number
local port_tx, port_rx = cosock.channel.new()
--- Handle a client being ready to receive
--- @param client cosock.socket.tcp the client socket returned from accept
--- @param clients table the map of connected clients (keys are sockets, values are true)
function handle_recv(client, clients)
local request, err = client:receive()
if not request then
if err == "closed" then
clients[client] = nil
end
return
end
print(string.format("received %q", request))
if request:match("ping") then
print("sending pong")
local s, err = client:send("pong\n")
if err == "closed" then
clients[client] = nil
elseif err then
print("error in recv: " .. tostring(err))
end
else
client:close()
clients[client] = nil
end
end
--- Handle a server being ready to accept
--- @param server cosock.socket.tcp the listening server socket
--- @param clients table the map of connected clients (keys are sockets, values are true)
function handle_accept(server, clients)
local client, err = server:accept()
if err and err ~= "timeout" then
error("error in accept: " .. tostring(err))
end
if client then
clients[client] = true
end
end
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
local server = socket.tcp()
server:bind(ip, 0)
local _ip, p = server:getsockname()
port_tx:send(p)
server:listen()
local clients = {}
server:settimeout(0)
while true do
local recvt = {}
for client, _ in pairs(clients) do
table.insert(recvt, client)
end
if #recvt < 5 then
table.insert(recvt, server)
end
local recvr, _sendr, err = cosock.socket.select(recvt, {}, 5)
if err == "timeout" then
return
elseif err then
error("Error in select: "..tostring(err))
end
for _, sock in ipairs(recvr) do
if sock == server then
print("accepting new client")
handle_accept(server, clients)
elseif clients[sock] then
handle_recv(sock, clients)
end
end
end
end, "server task")
--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
print("spawn_client", id, port)
local client = socket.tcp()
client:connect(ip, port)
for _=1,10 do
print("sending ping", id)
client:send(string.format("ping %s\n", id))
local request = assert(client:receive())
assert(request == "pong")
socket.sleep(0.5)
end
client:close()
end
--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
local port = assert(port_rx:receive())
for i=1,number_of_clients do
cosock.spawn(function()
spawn_client(i, port)
end, string.format("client-task-%s", i))
end
end
--- Spawn a bunch of client tasks
cosock.spawn(function()
spawn_clients()
end, "clients task")
--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()
The above is an updated version of our clients/server example, changed to limit the total number of concurrent connections to 5. Let's go over the changes.
First, we've added a few helper functions to handle the different events in our system.
The first is for when a client connection is ready to receive: handle_recv takes 2 arguments,
client, which is a cosock.socket.tcp that was returned from a call
to accept, and clients, which is a table where the keys are cosock.socket.tcp clients
and the values are true. We first call client:receive to get the bytes from
the client, and if that returns a string that contains "ping" then we send our
"pong" message. There are a few places where this can go wrong: the call to receive
could return nil and an error message, the request might not contain "ping", or the call to send could
return nil and an error message. If the error message is "closed" or the request
didn't contain "ping" then we want to remove client from clients, and if it was
the latter then we also want to call client:close.
Next up we have handle_accept; this also takes 2 arguments, server, which is a
cosock.socket.tcp socket that is listening, and the same map of clients. If a
call to accept returns a client then we add that client into our clients map.
If accept returns nil and err isn't "timeout" then we raise an error.
Alright, with these two helper functions we can now update the "server" task to
handle all of the connected clients without having to call spawn. Our task starts
out the same as before: creating a server socket, binding it to a random port,
getting that port and sending it to our "clients task", and then calling listen.
At this point, things start to change. First we define our clients map as
empty, then we call server:settimeout(0) so that a call to accept will never
end up yielding forever.
Inside of our long-running loop, we start out by defining a new table recvt, which
will match the argument to select that has the same name. We then loop over our
clients table, inserting each of the keys into recvt. We keep these as separate
tables because we want to be able to remove a client from our readiness check
once it has closed. Next, we check to see how large recvt is; if it is below 5,
we add server into it. By only including server when recvt has fewer than
5 clients, we have enforced our max connections limit.
With recvt defined we can finally call cosock.socket.select; we use recvt as
the first argument, an empty table as the sendt argument, and finally a timeout of 5 seconds.
We assign the result of select into recvr, _sendr, err. We would expect
recvr to contain any of our clients that are ready to receive and, if
we are below the limit, server. If recvr is nil we would expect err to be
a string describing the error. If err is "timeout" then we exit our server
task, which should exit the application. If we don't have an err then we loop over
all the entries in recvr and check each one: if it is our server we call
handle_accept, if not we call handle_recv. Each of our helpers will update
the clients map to ensure that we service all of the client requests before exiting.
The last change we've made is to spawn_client: where it previously would loop forever,
it now loops 10 times before exiting and closing the client.
If we were to run this, we would see each of the tasks spawn in a random order and
the first 5 of those would begin sending their "ping" messages. Once 1 of them
completes, we would accept the next connection, but not before that point, which means
we have limited our total number of connected clients to 5!
Integrating with Cosock
So far we have covered what cosock provides, but what if we want to integrate our own libraries directly into cosock? What would that look like?
To start, the general interface for a "cosock aware" Lua table is to define a method setwaker,
which takes 2 arguments, kind: string and waker: fun()|nil. The general idea is
that a "waker" function can be provided that will get called when that task is ready
to be woken again.
Let's try to build an example Timer that defines this setwaker method to make
it "cosock aware".
local cosock = require "cosock"
local Timer = {}
Timer.__index = Timer
function Timer.new(secs)
return setmetatable({
secs = secs,
waker = nil,
}, Timer)
end
function Timer:wait()
coroutine.yield({self}, {}, self.secs)
end
function Timer:setwaker(kind, waker)
print("setwaker", kind, waker)
if waker then
self.waker = function()
print("waking up!")
waker()
end
else
self.waker = nil
end
end
cosock.spawn(function()
local t = Timer.new(2)
print("waiting")
t:wait()
print("waited")
end)
cosock.run()
To start, we create a Lua metatable Timer, which has the properties secs: number and
waker: fun()|nil. There is a constructor Timer.new(secs) which takes the number of
seconds we want to wait for. Finally, we define Timer:wait, which is where our magic happens.
This method calls coroutine.yield with 3 arguments: {self}, an empty table, and self.secs.
These arguments match exactly what would be passed to socket.select; the first is a list of any
receivers, the second is a list of any senders, and the last is the timeout. Since we pass {self} as the
first argument, we are treating Timer as a receiver. Ultimately what we are doing here
is asking cosock to call socket.select({self}, {}, self.secs). While we don't end up calling self.waker
ourselves, cosock uses setwaker to register tasks to be resumed, so we need to conform to that. Just to
illustrate what is happening, a print statement has been added to setwaker; if we run this
we would see something like the following.
waiting
setwaker recvr function: 0x5645e6410770
setwaker recvr nil
waited
We can see that cosock calls setwaker once with a function and a second time with nil. Notice, though,
that self.waker never actually gets called, since we don't see a "waking up" message. That
is because we don't really need to be woken up; our timer yields the whole coroutine until
we have waited for self.secs, and nothing can interrupt that. Let's extend our Timer to have a reason
to call self.waker; we can do that by adding the ability to cancel a Timer.
local cosock = require "cosock"
local Timer = {}
Timer.__index = Timer
function Timer.new(secs)
return setmetatable({
secs = secs,
waker = nil,
}, Timer)
end
function Timer:wait()
local r, s, err = coroutine.yield({self}, {}, self.secs)
if err == "timeout" then
return 1
end
return nil, "cancelled"
end
function Timer:setwaker(kind, waker)
print("setwaker", kind, waker)
if waker then
self.waker = function()
print("waking up!")
waker()
end
else
self.waker = nil
end
end
function Timer:cancel()
if self.waker then
self.waker()
end
end
cosock.spawn(function()
local t = Timer.new(10)
cosock.spawn(function()
cosock.socket.sleep(3)
t:cancel()
end)
print("waiting")
local s = os.time()
local success, err = t:wait()
local e = os.time()
print("waited", os.difftime(e, s), success, err)
end)
cosock.run()
In this example, we create a timer that will wait 10 seconds, but before we call wait we
spawn a new task that will sleep for 3 seconds and then call cancel. If we look over the
changes made to wait, we can see that we still call coroutine.yield({self}, {}, self.secs),
but this time we are assigning its result to r, s, err. Cosock calls coroutine.resume
with the same return values we would get from select, that is, a list of ready receivers,
a list of ready senders, and an optional error string. If the timer expires, we would expect
to get back nil, nil, "timeout"; if someone calls the waker before our timer expires,
we would expect to get back {self}, {}, nil. This means we can treat any err == "timeout"
as a normal timer expiration, but if err ~= "timeout" then we can safely assume our timer was canceled.
If we were to run this code we would see something like the following.
waiting
setwaker recvr function: 0x556d39beb6d0
waking up!
setwaker recvr nil
setwaker recvr nil
waited 3.0 nil cancelled
Notice we only slept for 3 seconds instead of 10, and wait returned nil, "cancelled"!
One thing we can take away from this new example is that the waker API is designed to allow
one coroutine to signal cosock that another coroutine is ready to wake up. With that in mind,
let's try to build something a little more useful: a version of the
cosock.channel API that allows for a maximum queue size. Looking over the
existing channels,
to implement this we are going to need 3 parts: a shared table for queueing messages and
setting the appropriate wakers, a receiver table, and a sender table. Let's start by
defining the shared table.
Bounded Channel
To start we are going to define a Lua table that will represent our shared bounded queue.
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel
This table should have the following properties
- _wakers: This is a table with 2 keys
  - sendr: This is a map of potential waker functions where each key is a table representing a waiting sender
  - recvr: An optional function that takes no arguments; this will wake our receiver
- _max_depth: This is the integer value that our queue should not grow larger than
- _msg_queue: This is the message queue we will use to hold pending messages
- _closed: This is a boolean to indicate if we have been explicitly closed
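For reference, a value with these properties (as the constructor at the end of this chapter will build it) would look something like this; the max depth of 5 is just an example:
-- illustrative shape only; BoundedChannel.new is defined later
local example = {
  _wakers = {
    sendr = {},   -- map of waiting sender tables to waker functions
    recvr = nil,  -- the single receiver's waker, set while it waits
  },
  _max_depth = 5, -- allow at most 5 queued messages
  _msg_queue = {},
  _closed = false,
}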
To make our lives easier, next we will add a couple of methods that will enforce the
queue nature of our _msg_queue: one for removing the oldest message and one for
adding a new message.
--- Remove and return the oldest element from this channel
function BoundedChannel:pop_front()
return table.remove(self._msg_queue, 1)
end
--- Add a new element to the back of this channel
function BoundedChannel:push_back(ele)
table.insert(self._msg_queue, ele)
end
Next, let's add a method for testing if we can send a new message.
function BoundedChannel:can_send()
if self._closed then
-- Check first that we are not closed, if we are
-- return an error message
return nil, "closed"
end
if #self._msg_queue >= self._max_depth then
-- Check next if our queue is full, if it is
-- return an error message
return nil, "full"
end
-- The queue is not full and we are not closed, return 1
return 1
end
This method first checks that we haven't been closed; if we have, then it returns
nil, "closed". If we are still open, it next checks to see if we have any space in
our queue; if not, it returns nil, "full". So, if we are not closed and not full,
it returns 1.
Now we should create a similar method for checking if we can receive a new message.
function BoundedChannel:can_recv()
if self._closed and #self._msg_queue == 0 then
-- Check first that we haven't closed, if so
-- return an error message
return nil, "closed"
end
if #self._msg_queue == 0 then
-- Check next that we have at least 1 message,
-- if not, return an error message
return nil, "empty"
end
-- We are not closed and we have at least 1 pending message, return 1
return 1
end
Again, we begin by checking for the closed case, returning nil, "closed"; then we
check to see if the queue is empty, and if so we return nil, "empty". If there is
at least 1 message and we aren't closed, then we return 1.
Now we can define a few methods for interacting with our self._wakers property.
First up is the set_waker_recvr method.
function BoundedChannel:set_waker_recvr(waker)
self._wakers.recvr = waker
if waker and self:can_recv() then
-- Check if receiving is currently available, if
-- so call the waker to wake up the yielded receiver
waker()
end
end
Ok, so this one is pretty simple now that we have our helpers. First, we populate the
value in self._wakers.recvr. If waker is not nil and self:can_recv returns 1, we
immediately call the waker, because it means the receiver is already ready to be woken up.
For our next method, we are going to define set_waker_sendr.
function BoundedChannel:set_waker_sendr(sendr, waker)
self._wakers.sendr[sendr] = waker
end
This looks quite a bit different from set_waker_recvr! First of all, we have an extra argument,
sendr, which will represent a unique call to coroutine.yield and is how we allow for
multiple senders. The second thing to notice is that we do not check self:can_send;
this is because we don't know if another waker has already been woken up for the current state.
This is all a bit hand-wavy right now, but when we implement the sender things should become clear.
Now that we can set a waker, it is time to add a method for calling those waker functions.
function BoundedChannel:try_wake(kind)
local waker
if kind == "sendr" then
_, waker = next(self._wakers.sendr)
else
waker = self._wakers.recvr
end
if type(waker) == "function" then
waker()
end
end
Our new try_wake method takes 1 argument, which will be either the string "sendr" or "recvr".
If we are trying to wake a "sendr", then we use
the next function to find 1 entry in the
table self._wakers.sendr; if that table has at least 1 entry, we assign its value to waker.
If we are trying to wake a "recvr", we assign self._wakers.recvr to waker. If waker is
a function (aka not nil), then we call that function.
function BoundedChannel:close()
self._closed = true
end
close will just set our _closed property
to true
Ok, now that we have our shared channel defined, let's implement our receiver.
Bounded Recvr
Next, we will define a Lua table that will represent the receiving half of our channel. This
table will have 2 properties: _link, which will be our BoundedChannel, and timeout, which will
be an optional number.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver
Ok, let's create a constructor for it; this will take 1 argument, which should populate
its _link property.
function BoundedChannelReceiver.new(shared_queue)
return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end
Notice we didn't include a timeout property at all; this is because
we want it to be nil by default. In order to match the API that
cosock.channel uses, let's add a method for setting our timeout.
function BoundedChannelReceiver:settimeout(timeout)
self.timeout = timeout
end
Next, we want to define the setwaker method used by this side of the queue.
function BoundedChannelReceiver:setwaker(kind, waker)
if kind ~= "recvr" then
error("Unsupported wake kind for receiver: " .. tostring(kind))
end
self._link:set_waker_recvr(waker)
end
In this method, we have added a check to make sure it isn't getting called
with a kind of "sendr" since that would be pretty much meaningless. If
we have a valid kind then we pass the waker down to self._link:set_waker_recvr.
Ok, now for the good stuff: the receive method.
function BoundedChannelReceiver:receive()
while true do
local can_recv, err = self._link:can_recv()
if can_recv then
local element = self._link:pop_front()
self._link:try_wake("sendr")
return element
end
if err == "closed" then
return nil, err
end
if err == "empty" then
local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
if err then
return nil, err
end
end
end
end
Alright, there is a lot going on here, so let's unpack it. To start, we have a long-running
loop that will only stop when we have reached either an error or a new message. Each iteration
of the loop first checks self._link:can_recv(); if that returns 1, we call
self._link:pop_front to capture our eventual return value. Next, we want
to alert any senders that more space has just been made on our queue, so we call
self._link:try_wake("sendr"); finally, we return the element we popped off, ending our loop.
If can_recv returned nil, we check to see which err was provided; if it was "closed",
we return that in the error position, also ending our loop. If can_recv returned nil, "empty",
then we want to yield until either we get woken by a sender or we have waited for the duration
of self.timeout. We do this by calling coroutine.yield({self}, nil, self.timeout); this
gives up control to cosock until either someone calls BoundedChannel:try_wake("recvr") or our
timeout is reached.
Recall that coroutine.yield returns a list of ready receivers and a list of ready senders,
or nil, nil and an error message. This means that if coroutine.yield returns {self}, then
we have a new message, so we go to the top of the loop and the next call to self._link:can_recv
should return 1 or nil, "closed". If coroutine.yield returns nil, nil, "timeout", that
means we have yielded for self.timeout.
One final thing we want to make sure of is that we are keeping our end of that nil, "closed"
bargain, so let's define a close method.
function BoundedChannelReceiver:close()
self._link:close()
--- If anything is waiting to send, it should wake up with an error
self._link:try_wake("sendr")
end
For this, we first call self._link:close, which will set our shared table's _closed property
to true, which in turn makes both can_send and can_recv return nil, "closed". Next,
we want to wake up any sending tasks, since they are reliant on us to tell them something has
changed, so we call self._link:try_wake("sendr").
With that, we have the complete receiver side of our channel; now let's write up the sender.
Bounded Channel: Sendr
To start, we define our Lua table which represents the sending half of our channel.
local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender
This table will have the same shape as our BoundedChannelReceiver: it will have a
_link property and a timeout property, and our constructor is nearly identical.
function BoundedChannelSender.new(shared_queue)
return setmetatable({_link = shared_queue}, BoundedChannelSender)
end
We will also define a settimeout method that looks exactly the same.
function BoundedChannelSender:settimeout(timeout)
self.timeout = timeout
end
Our close method is also very close to the receiver's implementation.
function BoundedChannelSender:close()
self._link:close()
-- If anything is waiting to receive it should wake up with an error
self._link:try_wake("recvr")
end
Here, just the argument to self._link:try_wake changed from "sendr" to "recvr".
Now we start to see things become quite different. The first thing to note is that we are
not going to define a setwaker method for this table. This may seem strange since it is
one of the few things that we need to do to make a "cosock aware" table, but if we were
to use the same setwaker for all of the places that we call BoundedChannelSender:send, we would
end up gumming up the internal workings of cosock. To see how we get around this, let's
go over the implementation of send.
function BoundedChannelSender:send(msg)
while true do
local can_send, err = self._link:can_send()
if can_send then
self._link:push_back(msg)
-- Wake the receiver if it is waiting to receive
self._link:try_wake("recvr")
return 1
end
if err == "closed" then
return nil, "closed"
end
if err == "full" then
local wake_t = {
setwaker = function(t, kind, waker)
assert(kind == "sendr")
self._link:set_waker_sendr(t, waker)
end
}
local _r, _s, err = coroutine.yield(nil, {wake_t}, self.timeout)
if err then
return nil, err
end
end
end
end
To start, things look a lot like our BoundedChannelReceiver:receive implementation: we have
a long-running loop that first calls self._link:can_send; if that returns 1, we
use the push_back helper to add the message to our queue, then we try to wake up
any yielded "recvr", returning 1 to indicate success. If can_send returned
an error message and that message was "closed", we return nil, "closed". If can_send returned
the error message "full", we want to yield until we can try again.
To prepare for yielding, we first create a table called wake_t; this will represent a single
call to BoundedChannelSender:send that is yielding. On wake_t we set 1 property, the
setwaker method, which uses assert to raise an error if it was called with a kind
other than "sendr" and then uses the BoundedChannel:set_waker_sendr method to associate the
waker argument with wake_t. By creating this temporary table, we are allowing
for a unique waker function to be defined for each yield that needs waking. If we were to use a
single BoundedChannel._wakers.sendr function, we would lose the ability
to wake any yields beyond the last one, because calling waker always calls setwaker(kind, nil) to
avoid potential "double wakes".
Now that we have set up our wake_t, we can call coroutine.yield, this time with the arguments
nil, {wake_t}, self.timeout. Since we put wake_t in the sendt
argument, we will wait until we either reach the duration of self.timeout or someone
calls BoundedChannel:try_wake("sendr") and our wake_t is returned from next.
This is probably the easiest way to create these unique
wakers, but it does come with a potential issue; if you are interested in using this implementation, please review Appendix A.
Now, let's finish up our implementation and see if we can see our bounded channel working.
Bounded Channel: Finish
At this point, we have most of our BoundedChannel and all of our BoundedChannelReceiver
and BoundedChannelSender set up so the last thing we need to do is add a constructor
to BoundedChannel.
function BoundedChannel.new(max_depth)
local link = setmetatable({
_max_depth = max_depth,
_wakers = {
sendr = {},
},
_msg_queue = {},
_closed = false,
}, BoundedChannel)
return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end
This constructor takes 1 argument: the number telling us how large our queue can get.
It returns 2 tables, the first a BoundedChannelSender and the second
a BoundedChannelReceiver, both of which have the same shared BoundedChannel as
their _link property.
Now let's see our new channel in action!
local cosock = require "cosock"
local BoundedChannel = require "examples.bounded_channel"
local tx, rx = BoundedChannel.new(2)
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=1,10 do
tx:send(i)
local e = cosock.socket.gettime()
print(string.format("sent %s in %.1fs", i, e - s))
s = e
cosock.socket.sleep(0.2)
end
end, "sendr1")
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=11,20 do
tx:send(i)
local e = cosock.socket.gettime()
print(string.format("sent %s in %.1fs", i, e - s))
s = e
cosock.socket.sleep(0.2)
end
end, "sendr2")
cosock.spawn(function()
local s = cosock.socket.gettime()
for i=1,20 do
local msg = rx:receive()
local e = cosock.socket.gettime()
print(string.format("recd %s in %.1fs", msg, e - s))
s = e
cosock.socket.sleep(1)
end
end, "recvr")
cosock.run()
After we import both cosock and our BoundedChannel, we create a new channel pair
with a maximum queue size of 2. We then spawn 2 new tasks for the senders; in these tasks
we loop 10 times, sending a message and then sleeping for 0.2 seconds.
We have a call to cosock.socket.gettime before and after the send to see if there is any
delay.
Next, we spawn a task for our receiver; this receives a message and then sleeps for 1 second, 20 times.
Since we are sending a lot faster than we are receiving, we would expect that after the first few messages the amount of time it takes to send a message reaches about 1 second, indicating that our queue has hit its maximum of 2. If we were to run this we should see something like the following.
sent 1 in 0.0s
sent 11 in 0.0s
recd 1 in 0.0s
sent 2 in 0.2s
recd 11 in 1.0s
sent 12 in 1.0s
recd 2 in 1.0s
sent 13 in 1.0s
recd 12 in 1.0s
sent 14 in 1.0s
recd 13 in 1.0s
sent 15 in 1.0s
recd 14 in 1.0s
sent 16 in 1.0s
recd 15 in 1.0s
sent 17 in 1.0s
recd 16 in 1.0s
sent 18 in 1.0s
recd 17 in 1.0s
sent 19 in 1.0s
recd 18 in 1.0s
sent 20 in 1.0s
recd 19 in 1.0s
sent 3 in 9.8s
recd 20 in 1.0s
sent 4 in 1.0s
recd 3 in 1.0s
sent 5 in 1.0s
recd 4 in 1.0s
sent 6 in 1.0s
recd 5 in 1.0s
sent 7 in 1.0s
recd 6 in 1.0s
sent 8 in 1.0s
recd 7 in 1.0s
sent 9 in 1.0s
recd 8 in 1.0s
sent 10 in 1.0s
recd 9 in 1.0s
recd 10 in 1.0s
From this output, we can determine the exact order things played out. First,
we can see that sendr1 is able to push 1 onto the queue, then it sleeps for 0.2
seconds, which allows sendr2 to push 11 onto the queue and also sleep for 0.2
seconds; then recvr pops 1 off the queue, followed by a 1-second sleep.
At this point, for the first time, we have 3 sleeping tasks. Since sendr1 went first,
it will be the first to wake from its sleep; it pushes 2 onto the queue and
then goes back to sleep, which allows sendr2 to wake up and try to send 12, but the queue is
full ({11, 2}), so it has to wait until recvr wakes up to pop 11 off the queue. Once recvr
pops 11 off the queue, we see that sendr2 is able to push 12, but it took 1 full second
to do so! Now we stay in a state where both sendr1 and sendr2 are waiting to send
for ~1 second until recvr is able to receive, at which point either sendr1 or sendr2
again pushes a new value. Once we reach the last 2 values, we see that our sendrs go quiet
because they are done with their work, but recvr still takes another 2 seconds to complete.
Did you notice that sendr2 gets to go far more often than sendr1 at the start, and that it takes sendr1 9.8 seconds to send 3? This is because of our waker scheme; Appendix A has more on that.
Appendix A: Bounded Channel Limits
In our BoundedChannelSender:send, BoundedChannel:set_waker_sendr and
BoundedChannel:try_wake we have made some decisions that make our implementation a little easier
to read/write but might cause some problems if we end up depending on it.
To review, any time BoundedChannelSender:send would yield we create a temporary table
to handle our setwaker calls and in BoundedChannel:set_waker_sendr
we use that table as the key to _wakers.sendr to store/remove the waker function.
In BoundedChannel:try_wake we use the next function to choose which entry of _wakers.sendr to
call. The next function will always return the "first" key/value pair, but what does that mean?
How can Lua tables be ordered? When a table is created, it is assigned a memory address unique
to that table; we can see what the address is by using the default __tostring metamethod.
lua -e "print({})"
table: 0x5581bc0236a0
In the above, we can see the memory address of an empty table is 0x5581bc0236a0; this address
would change if we ran it again. If we were to use this table as the key in another table, that
table would look something like this.
{
[0x5581bc0236a0] = "first element"
}
So, let's look at how Lua might order a table like this with more than one key.
local tables = {}
for i=1, 10 do
tables[{}] = i
end
for i=1, 10 do
local t, v = next(tables)
local tp = tonumber(tostring(t):match("table: 0x(.+)"), 16)
print(tp, v)
tables[t] = nil
end
This example creates a table named tables, then loops 10 times assigning tables[{}] = i.
In a second loop of 10, we use the next function to pull the "first" key/value pair from
tables. We convert t into a hex string and then back into a number, which may be
easier to read than trying to tell which hex value
is larger than another. We print that number and the i assigned to it,
then remove the pair from tables by assigning tables[t] = nil. If we run this once we might
see something like.
93877018273488 1
93877018274240 9
93877018274016 7
93877018273792 5
93877018273616 3
93877018274352 10
93877018274128 8
93877018273904 6
93877018273680 4
93877018273552 2
At first, it looks like it might go in order because we get 1 but then we
see the second return from next is 9. If we run it again we might see something like:
93837605766864 2
93837605767120 6
93837605767376 10
93837605766928 3
93837605767184 7
93837605766992 4
93837605767248 8
93837605766800 1
93837605767056 5
93837605767312 9
This time, we get 2 first and 9 last, which means that we can expect the order to change
from run to run. We can also see that the order is driven by the keys' memory addresses
rather than by insertion order, so for a fixed set of keys, next will deterministically
return the waker for whichever wake_t happens to come first. So what happens if one
coroutine's wake_t always ends up in that spot?
It could starve the other coroutines from being able to send.
This might not be all bad, though: randomness can be good, since we don't want to show a preference for any one sender. For that to work, however, the ordering would need to be close to uniformly distributed, which would mean it should take a very long time to see the same order twice. Let's see how random our temporary table keys actually are.
local m = {
sets = {}
}
local table_size = 9
function m:add_set(set)
for idx, existing in ipairs(self.sets) do
local exact_match = false
local left_t, left_v, right_t, right_v
for i=1,table_size do
left_t, left_v = next(set, left_t)
right_t, right_v = next(existing, right_t)
exact_match = left_v == right_v
if not exact_match then
break
end
end
if exact_match then
table.insert(self.sets, set)
return {
first = existing,
first_idx = idx,
second = set,
second_idx = #self.sets
}
end
end
table.insert(self.sets, set)
end
local function gen_set()
local tables = {}
for i=1, table_size do
tables[{}] = i
end
return tables
end
local result
repeat
result = m:add_set(gen_set())
until result
print("RESULT")
print(result.first_idx, result.second_idx)
print("f,s")
for i=1,table_size do
local t1, v1 = next(result.first)
local t2, v2 = next(result.second)
print(string.format("%s,%s", v1, v2))
result.first[t1] = nil
result.second[t2] = nil
end
Here we have extended our example to allow for repeatedly creating tables and then checking
to see if the new version matches any of the previous versions. We reduced the number of entries
to 9 to make it easier to read the results, but otherwise gen_set creates the same table
as our original example. We have defined a table named m to hold all of our sets and defined a
method on it, add_set, which will return nil if the set argument isn't already in the
list, or a results table if a match was found. So what happens if we run this?
RESULT
1 4
f,s
4,4
8,8
1,1
5,5
9,9
2,2
6,6
3,3
7,7
It looks like the 4th set we generated exactly matched the 1st. Considering that 9 entries can be ordered in over 300,000 different ways (9! = 362,880), it seems that our distribution is not very uniform.
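If the starvation and uneven distribution described above matter for your use case, one way around them (a sketch, not part of cosock or the chapter's implementation) is to store the sender wakers in an array and wake them first-in, first-out instead of relying on next:
-- a hypothetical FIFO-fair variant of the two waker methods
function BoundedChannel:set_waker_sendr(sendr, waker)
  if waker then
    table.insert(self._wakers.sendr, { sendr = sendr, waker = waker })
    return
  end
  -- a nil waker means "unset"; remove this sender's entry
  for i, entry in ipairs(self._wakers.sendr) do
    if entry.sendr == sendr then
      table.remove(self._wakers.sendr, i)
      break
    end
  end
end

function BoundedChannel:try_wake(kind)
  if kind == "sendr" then
    -- wake the sender that has been waiting the longest
    local entry = self._wakers.sendr[1]
    if entry then
      entry.waker()
    end
  elseif type(self._wakers.recvr) == "function" then
    self._wakers.recvr()
  end
end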
Advanced
In the following chapters, we will discuss in detail the inner workings of cosock. The subject
matter is going to shift focus from "how do I use cosock" to "how does cosock work" which may
not be of interest to everyone.
Internals
This section will go over the contents of the cosock.socket.internals module and how that
interacts with the cosock runtime and the underlying Luasocket library.
Run
This is a step-by-step explanation of what happens in each pass through the main loop
of cosock.run.
Internals
The module cosock.socket.internals is where luasocket gets wrapped into a "cosock aware"
table. Initially, a call to passthroughbuilder is used to create a "builder" function.
passthroughbuilder takes 2 arguments, recvmethods and sendmethods, which are both
tables where the keys are method names and the values are set-like tables of
error messages that it would be appropriate to yield for. A good example of one of these is
the tcp module's recvmethods.
local recvmethods = {
receive = {timeout = true},
accept = {timeout = true},
}
For both of the methods defined here, a return value of nil, "timeout"
is a signal to call coroutine.yield and try again. The return value of
passthroughbuilder is a function that we will call builder. builder takes 2 arguments:
method, which is a string, and an optional transformsrc, which is a table,
or a function that returns a table, with the following properties.
- input: an optional function that takes the method inputs and returns those inputs, potentially transformed
  - This is only called once, just before we call the luasocket method for the first time
- blocked: an optional function that takes the return values from the method called and returns the input arguments for the next call to the same method
- output: an optional function that will be called with the return values of the method
  - This is only called once, just before we return from the method
Let's use a few examples to go over each of these starting with the input property.
The method receive on a luasocket takes 2 arguments, a pattern string or number indicating
how many bytes to try and read for and an optional prefix to put at the front of what was
received.
local socket = require "luasocket"
local t = socket.tcp()
t:connect("0.0.0.0", 8000)
print(t:receive("*l", "next line:"))
Assuming that some server is listening on port 8000 of the machine we run this on, we would
receive 1 line, for example "ping\n", and this would print "next line: ping". As we will get
into later, a call to cosock's receive may end up calling luasocket's receive repeatedly
until we get to a newline character. So what if our server sent 1 byte at a time? We would end
up printing "next line: pnext line: inext line: nnext line: g" if we passed the second argument
to the underlying luasocket on every call. To avoid this we can use the input property to store this prefix
and only add it once to the eventual return value.
Now let's consider the blocked property, continuing to use receive as our example method.
What happens if we call t:receive(10) and again our server returns 1 byte at a time?
We can't call the underlying luasocket method with 10 over and over; that would result in us
requesting too many bytes from the socket. Instead, we need a way to capture the partial value
we received and reduce the number of bytes requested accordingly. Thankfully, luasocket returns
any partial data on error as the 3rd return value, so we could do something like
{
blocked = function(success, err, partial)
table.insert(shared_buffer, partial)
remaining_recv = remaining_recv - #partial
return remaining_recv
end
}
This example assumes that shared_buffer and remaining_recv exist somewhere, but
you can see that we are appropriately reducing the number of bytes we ask for; the value
returned here will eventually be the argument provided to the next call to the luasocket method.
Here is a longer-form example of how a response of 1 byte at a time would look for our
luasocket.
local shared_buffer = {}
local remaining_recv = 5
--p
local _, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--i
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--n
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--g
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--\n
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
Finally we have the output property which gets called with the last
return values from our method. If we complete our example, this is where
we would end up calling table.concat(shared_buffer) to add all
the chunks together before returning.
Continuing to use receive as an example, this is what the transform
argument might look like.
local recvmethods = {
  receive = {timeout = true}
}
local sendmethods = {}
-- First we define a builder injecting the method<->error message maps
local builder = passthroughbuilder(recvmethods, sendmethods)
-- Now we can use the builder to define a method that doesn't do any
-- transformations
m.bind = builder("bind")
-- Here we define a method that performs some transformations
m.receive = builder("receive", function()
local shared_buffer = {}
local remaining_recv
local pattern
return {
input = function(pat, prefix)
-- insert the prefix as the first part of our return
-- value if present
if prefix then
table.insert(shared_buffer, prefix)
end
if type(pat) == "number" then
-- we know how many bytes to wait for, set this
-- for use in blocked
remaining_recv = pat
else
-- store this for use in blocked
pattern = pat
end
-- return only pattern to avoid duplicate prefixes
return pattern
end,
blocked = function(_, err, partial)
if type(partial) == "string" and #partial > 0 then
table.insert(shared_buffer, partial)
-- only reduce remaining_recv if it is a number
if remaining_recv then
remaining_recv = remaining_recv - #full
-- returning the updated remaining receive
return remaining_recv
end
-- otherwise we return the pattern provided to input
return pattern
end
end,
output = function(full, err, partial)
-- if the first return is a string with a length > 0 then
-- add it to the buffer
if type(full) == "string" and #full > 0 then
table.insert(shared_buffer, full)
end
-- if the third return is a string with a length > 0 then
-- add it to the buffer
if type(partial) == "string" and #partial > 0 then
table.insert(shared_buffer, partial)
end
-- concatenate all the strings together
local all = table.concat(shared_buffer)
if err then
-- if ther was an error it should go in the 3rd return
-- position
return nil, err, all
else
-- if not error then it should go in the 1st return
-- position
return all
end
end
}
end)
With the arguments defined, we can now discuss the return value of builder,
which is a third function: the method's implementation. Its first argument is
self and varargs are used to allow for any additional arguments.
Let's pause here and go over this, because 3 levels of functions can be a bit difficult
to follow. Our goal is to re-use as much as possible across the methods of a cosock socket,
and since the yield -> retry loop is a common pattern we can define all of it in 1 place.
The key is that these methods need access to a few extra pieces of state, which works
because each enclosing function's arguments remain available to the function it returns.
This means that the receive method would have the following environment.
local recvmethods = {
  receive = { timeout = true }
}
local sendmethods = {}
local method = "receive"
local transformsrc = function() --[[see above]] end
Now let's go over what actually happens in this shared method implementation.
First, we capture all of the varargs into a table named inputparams. If the
transform object has an input property defined, we then overwrite that variable
with {input(table.unpack(inputparams))}. Now that we have our inputs the way
they need to be, we begin a long-running repeat/until loop.
At the top of the loop we call self.inner_sock[method]; inner_sock is
the property name for the underlying luasocket on all of the cosock sockets. If the
first return value from that call is nil, we check to see if the second
return value can be found in recvmethods or sendmethods;
if so, we know that we need to yield, so we check if blocked
is defined and call it if it is, again overwriting inputparams
with the return value.
Now we determine what kind of yield we are going to do: if the second
return value was found in recvmethods the kind is "recvr"; if it was
found in sendmethods it is "sendr". Next we set up our arguments
for coroutine.yield, putting self into recvt if our kind is "recvr"
or into sendt if our kind is "sendr". Then we call coroutine.yield(recvt, sendt, self.timeout),
assigning the return values to recvr, sendr, and rterr. If rterr is not nil, we are going
to return early; if its value matches the error from our method call (i.e. "timeout" for both)
then we return the values from our call to that method.
The last thing we do in this case, before heading back to the top of the loop,
is to assert that our kind and the result of cosock.socket.select match, meaning
that with a kind of "sendr" the sendr variable is populated and the recvr variable
is not, or vice versa.
If the first return value from our method call was not nil, then we can
exit early, transforming the return values with output if that is defined.
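Putting that all together, a condensed sketch of the shared method implementation
might look like the following. It assumes the recvmethods, sendmethods, method, and
transformsrc environment shown above and simplifies some details; it is not the exact
cosock source.
return function(self, ...)
  -- resolve the transform table, which may be provided lazily
  local transform = transformsrc or {}
  if type(transform) == "function" then
    transform = transform()
  end
  local inputparams = {...}
  if transform.input then
    inputparams = {transform.input(table.unpack(inputparams))}
  end
  repeat
    local first, err, partial =
      self.inner_sock[method](self.inner_sock, table.unpack(inputparams))
    -- determine if this error means "yield and retry"
    local kind = (recvmethods[method] and recvmethods[method][err] and "recvr")
      or (sendmethods[method] and sendmethods[method][err] and "sendr")
    if first ~= nil or not kind then
      -- success, or an error we don't retry: return, transforming if requested
      if transform.output then
        return transform.output(first, err, partial)
      end
      return first, err, partial
    end
    if transform.blocked then
      inputparams = {transform.blocked(first, err, partial)}
    end
    -- yield until the runtime reports this socket is ready
    local recvt = kind == "recvr" and {self} or nil
    local sendt = kind == "sendr" and {self} or nil
    local recvr, sendr, rterr = coroutine.yield(recvt, sendt, self.timeout)
    if rterr then
      if rterr == err then
        -- e.g. a runtime "timeout" matching luasocket's "timeout"
        if transform.output then
          return transform.output(first, err, partial)
        end
        return first, err, partial
      end
      return nil, rterr
    end
    -- sanity check: we were woken for the kind of event we asked for
    if kind == "recvr" then
      assert(recvr and not sendr, "woke for the wrong kind of event")
    else
      assert(sendr and not recvr, "woke for the wrong kind of event")
    end
  until false
end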
The only other function provided by cosock.socket.internals is
setuprealsocketwaker, which completes the wrapping of our cosock socket.
This function takes the socket table and an optional list of kinds; if
kinds is not provided then the default is both "sendr" and "recvr".
It then defines a method on socket called setwaker, which is used
by cosock to wake up sleeping coroutines
(see the integrating chapter for more info).
This setwaker will assign the provided waker function to
a self.wakers table based on the kind of waker. It also defines
a method _wake which takes an argument kind and varargs for any
additional arguments. This method will see if self.wakers[kind] is
not nil and if so call that with the varargs. It then replaces
self.wakers[kind] with nil.
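A rough sketch of that wiring, assuming the shapes described above (illustrative,
not the exact cosock source):
local function setuprealsocketwaker(socket, kinds)
  kinds = kinds or {"sendr", "recvr"}
  local valid = {}
  for _, kind in ipairs(kinds) do
    valid[kind] = true
  end
  socket.wakers = {}
  -- registered by the runtime; pass nil to clear a waker
  function socket:setwaker(kind, waker)
    assert(valid[kind], "unsupported waker kind: " .. tostring(kind))
    self.wakers[kind] = waker
  end
  -- called when select reports this socket is ready
  function socket:_wake(kind, ...)
    local waker = self.wakers[kind]
    if waker then
      waker(...)
      self.wakers[kind] = nil
    end
  end
end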
Advanced Overview of cosock.run
Global Variables
Cosock utilizes a few global variables to allow for the potentially recursive nature of lua coroutines.
- threads: A list of all coroutines cosock is aware of. This is populated by the first argument to cosock.spawn.
- threadnames: A map of coroutine<->name pairs. This is populated by the second argument to cosock.spawn.
- threadswaitingfor: A map of coroutine<->select args. Select args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, timeout: float?}. This is populated by the values provided to coroutine.yield for cosock tasks from a call to cosock.socket.select.
- readythreads: A map of coroutine<->resume args that will be ready on the next pass. Resume args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, err: string?}. This is populated by coroutine wake-ups that occur during the current pass.
- socketwrappermap: A map of luasocket<->cosock socket pairs. This map is keyed with the table pointer of the luasocket, for easily getting back to a cosock socket when you only have a luasocket. It gets populated when a cosock socket is included in a select args table.
- threaderrorhandler: A potential error handler function. Not currently settable.
- timers: see Timers.
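For reference, the shapes of these variables might look roughly like the following;
the values are illustrative only.
-- illustrative shapes, not the exact cosock source
local threads = {}           -- array of coroutines: threads[1] = co
local threadnames = {}       -- threadnames[co] = "server task"
local threadswaitingfor = {} -- threadswaitingfor[co] = {recvr = {skt}, sendr = {}, timeout = 1.5}
local readythreads = {}      -- readythreads[co] = {recvr = {skt}, sendr = {}, err = nil}
local socketwrappermap = {}  -- socketwrappermap[skt.inner_sock] = skt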
Run Loop
To start the loop we define a few local variables. First up is wakethreads; this table is
populated by removing all of the elements from readythreads, which frees readythreads up to
be added to by any tasks we are about to wake. Next are the two list tables sendt and recvt,
along with the optional number timeout; these will eventually be passed to luasocket's socket.select
when we run out of ready threads. Now we move all of our readythreads into wakethreads and then
loop over all of the ready threads.
For each ready thread, we first check that its coroutine.status is "suspended"; if it isn't, we
skip the thread. For any "suspended" thread, we first cancel any timers that might be scheduled
for that thread by calling timers.cancel. Next we pull out the values we stored in threadswaitingfor
and call skt:setwaker with nil for any sendr or recvr properties; this prevents any potential
"double wakes" from occurring.
Now we call coroutine.resume with our thread and the recvr, sendr, and err values that were stored
in wakethreads. When coroutine.resume completes, we have 2 pieces of information that drive our next
path: the first return value from coroutine.resume, which indicates whether our thread raised an error,
and the result of calling coroutine.status on our thread. If our thread's status is "dead" and
coroutine.resume returned false, something has gone terribly wrong, so we raise an error (with a
traceback if the debug library is available). If our thread's status is "dead" and coroutine.resume
returned true, we simply remove our thread from threads and threadswaitingfor. If our thread's status
is "suspended" and coroutine.resume returned true, then we first update threadswaitingfor with the
remaining return values from coroutine.resume. We also call skt:setwaker for any sendr and recvr in
those values, passing a function that will clear itself and call the local function wake_thread, as
sketched below. At this point we also update our local tables recvt and sendt to include the
skt.inner_sock values from threadswaitingfor, which are the luasockets associated with each cosock.socket.
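That waker might look something like this sketch; the exact signature of wake_thread
is a guess for illustration.
-- for each kind ("recvr"/"sendr") the thread is now waiting on
skt:setwaker(kind, function()
  -- wakers are one-shot: clear ourselves to avoid double wakes
  skt:setwaker(kind, nil)
  wake_thread(thread, kind) -- hypothetical signature
end)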
Now that we have resumed all of the wakethreads and filled in our eventual arguments to socket.select,
we determine if we are still running. We do this by looping over all of our threads and checking whether
at least 1 has a coroutine.status that is not "dead". If all threads are "dead" and nothing got added to
readythreads, we exit the run loop. Next we update our socketwrappermap by looping over all of the values
in threadswaitingfor and inserting each recvr and sendr under the key skt.inner_sock.
With all the bookkeeping done, we run all of the timers that have reached their deadline and update our
local variable timeout to the duration until the earliest remaining deadline. If at least one thread was
added to readythreads, we set our timeout to 0, because we already know we have new work to do. At this
point, if timeout is nil and both sendt and recvt are empty, we raise an error, because we would otherwise
call socket.select({}, {}, nil), which would block forever. Finally, we call socket.select, capturing the
return values in recvr, sendr, and err. If err isn't nil and isn't the value "timeout", we raise that error.
If err is nil or "timeout", we loop over all of the values in recvr and sendr, looking up each cosock socket
in socketwrappermap and calling skt:_wake, which invokes the function we provided to setwaker above.
With that complete, we now have a fully updated readythreads and we start the loop again.
Outline Version
- Define wakethreads
- Define an empty list of senders (sendt), receivers (recvt) and a timeout
- Pop all readythreads entries into wakethreads
- Loop over all threads in wakethreads
  - If coroutine.status for that thread returns "suspended"
    - Clear any timers for that thread
    - Clear any wakers registered for that thread
    - Call coroutine.resume with the stored recvr, sendr and err arguments
    - If coroutine.resume returned true in the first position and coroutine.status returns "suspended"
      - Re-populate threadswaitingfor[thread] with the 3 other return values from coroutine.resume
        - These should be the recvt, sendt and timeout values that will populate select args
      - Set the waker for all sockets in recvt and sendt to call wake_thread and then unset themselves
      - If coroutine.resume returned a timeout, create a new timer for this thread which will call wake_thread_err on expiration with the value "timeout"
    - If coroutine.status returned "dead"
      - If coroutine.resume returned false in the first position and no threaderrorhandler has been set
        - Raise an error
          - If the debug library is available, include a debug.traceback and the second return value from coroutine.resume
          - Else just raise an error with the second return value from coroutine.resume
        - Exit the application
          - This calls os.exit(-1)
  - Else, print a warning message if printing is turned on
- Initialize a variable running to false
- Loop over all threads, calling coroutine.status on each; if at least 1 doesn't return "dead", set running to true
- If running is false and readythreads is empty
  - Exit the run loop
- Loop over all the values in threadswaitingfor
  - Insert the luasockets from any sendr or recvr parameters into the loop-local variables sendt and recvt
  - Populate socketwrappermap with any sendr or recvr sockets
- Call timers.run
- If readythreads is not empty
  - Set timeout to 0
- If timeout is falsy and recvt is empty and sendt is empty
  - Raise an error that cosock.select was called with no sockets and no timeouts
- Call luasocket's socket.select with our loop's recvt, sendt and timeout
- If socket.select returns a value in the 3rd position and that value is not "timeout"
  - Raise an error with that return value
- Loop over the recvr (1st) return from socket.select
  - Look up the cosock.socket in socketwrappermap
  - Call skt:_wake("recvr")
- Loop over the sendr (2nd) return from socket.select
  - Look up the cosock.socket in socketwrappermap
  - Call skt:_wake("sendr")
Timers
Internally, we keep a list of timer objects to determine when any thread has reached the maximum time
it should be running/yielding for. We interact with these through a module-local variable timers, which
has a few associated functions.
Inside of a do block, we create 2 local variables, timeouts and refs, for use in the timer-associated
functions. The first associated function worth discussing is timers.set, which takes the arguments
timeout: float, callback: fun() and ref: table. When called, we first capture the current timestamp via
socket.gettime(), then calculate the deadline for this timer by adding timeout to that timestamp, storing
it in a variable timeoutat. We then table.insert the table { timeoutat = timeoutat, callback = callback, ref = ref }
into timeouts. If ref isn't nil we also populate refs[ref] with that same table.
Next up is timers.cancel, which takes the argument ref: table. When called, we first look up the timeout
info in refs[ref]; if we find something there we remove the values in the callback and ref properties and
finally we remove the entry from refs. By removing the callback we avoid ever executing the consequence of
that timer; the entry itself will be removed from timeouts on the next call to run.
Finally we have timers.run; this function takes no arguments. When called, it first sorts the timeouts
table in ascending order by timeoutat, where nil values are treated as the smallest. We then capture the
current timestamp by calling socket.gettime. Now we consult the first element of timeouts: if it has a
timeoutat of nil or one less than now, we pop it off the list, call its callback property if there is one,
and remove it from refs if it has a ref property, repeating until the front of the list has not yet expired.
Now that all the pending timers are done, we use the new first element of timeouts' timeoutat property to
calculate the next relative timeout (timeoutat - now) and return that as the earliest timeout. If timeouts
is empty, we return nil.
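A condensed sketch of the timers module as described above (illustrative, not the
exact cosock source):
local socket = require "socket"

local timers = {}
do
  local timeouts = {}
  local refs = {}

  function timers.set(timeout, callback, ref)
    local timeoutat = socket.gettime() + timeout
    local timer = { timeoutat = timeoutat, callback = callback, ref = ref }
    table.insert(timeouts, timer)
    if ref then
      refs[ref] = timer
    end
  end

  function timers.cancel(ref)
    local timer = refs[ref]
    if timer then
      -- strip the callback so the expired timer becomes a no-op
      timer.callback = nil
      timer.ref = nil
      refs[ref] = nil
    end
  end

  function timers.run()
    -- sort ascending by deadline, treating nil as the smallest value
    table.sort(timeouts, function(a, b)
      if a.timeoutat == b.timeoutat then return false end
      if not a.timeoutat then return true end
      if not b.timeoutat then return false end
      return a.timeoutat < b.timeoutat
    end)
    local now = socket.gettime()
    -- pop and fire every timer that has already expired
    while timeouts[1] and (not timeouts[1].timeoutat or timeouts[1].timeoutat < now) do
      local timer = table.remove(timeouts, 1)
      if timer.callback then timer.callback() end
      if timer.ref then refs[timer.ref] = nil end
    end
    -- report how long until the next deadline, or nil if none remain
    if timeouts[1] then
      return timeouts[1].timeoutat - now
    end
  end
end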
Outline Version
- A timer has the shape {timeoutat: float, callback: fun(), ref: table?}
- timers.set
  - Updates timeouts to include that timer. Also updates a private scoped table named refs
    - refs is a map of table pointer<->timer which is used for cancellation of a timer
- timers.cancel
  - If the provided table pointer is in refs, remove the callback and ref properties from that timer
  - Set the table pointer key in refs to nil
- timers.run
  - Sort all timeouts by deadline (earliest first)
  - While the timer at the front of the timeouts list has a timeoutat that is nil or < socket.gettime()
    - Pop it off the list
    - Call timer.callback if present
    - Remove this timer from refs
  - If there are any more timeouts left, return how long before the earliest should expire
  - If there are no more timeouts, return nil