Cosock

Cosock is a coroutine runtime written in pure Lua and based on the popular luasocket library.

The goal of the project is to provide the same interfaces that luasocket provides but wrapped up in coroutines to allow for concurrent IO.

Note: these docs use the terms coroutine, task, and thread interchangeably; they all mean a Lua coroutine

For example, the following two Lua programs use luasocket to define a TCP client and server.

--client.lua
local socket = require "socket"
local client = socket.tcp()
client:connect("0.0.0.0", 9999)
while true do
  print("sending ping")
  client:send("ping\n")
  local response = assert(client:receive())
  assert(response == "pong")
end
--server.lua
local socket = require "socket"
local server = socket.tcp()
server:bind("0.0.0.0", 9999)
server:listen()
print("listening", server:getsockname())
local client = server:accept()
while true do
  local request = assert(client:receive())
  assert(request == "ping")
  print("sending pong")
  client:send("pong\n")
end

If you were to run lua ./server.lua first and then run lua ./client.lua, you should see each terminal print out its "sending ..." messages forever.

Using cosock, we can actually write the same thing as a single application.

-- client_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
--- Since the client and server are in the same application
--- we can use an OS-assigned port and share it across the
--- two tasks. To make sure both tasks agree on the port number
--- (and start in the order we want), we use a cosock channel.
local port_tx, port_rx = cosock.channel.new()

--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  port_tx:send(p)
  server:listen()
  local client = server:accept()
  while true do
    local request = assert(client:receive())
    print(string.format("received %q", request))
    assert(request == "ping")
    print("sending pong")
    client:send("pong\n")
  end
end, "server task")

--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
  --- wait for the server to be ready.
  local port = assert(port_rx:receive())
  local client = socket.tcp()
  client:connect(ip, port)
  while true do
    print("sending ping")
    client:send("ping\n")
    local request = assert(client:receive())
    assert(request == "pong")
  end
end, "client task")

--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()

Now if we run this with lua ./client_server.lua we should see the messages alternate.

Notice that we called cosock.spawn twice: once for the server task and once for the client task; we are going to dig into that next. We also added a call to cosock.run at the bottom of our example. This function runs our tasks until there is no more work left to do, so it is important you don't forget it, or nothing will happen.

Getting Started

The easiest way to use cosock is to install it with luarocks.

luarocks install cosock

cosock depends on both luasocket and luasec. When running the above command, luarocks will attempt to compile both of these libraries, which have some system dependencies.

Luasocket

The version of luasocket we are using requires that the Lua development package is available.

Linux

For Debian-based systems, you would need to run the following

sudo apt-get install liblua-dev

For Fedora

sudo dnf install lua-devel

Windows

Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.

MacOS

On macOS, the Lua development files can be installed via Homebrew

brew install lua

Luasec

Luasec depends on OpenSSL, so you will need its development libraries

Linux

For Debian-based systems, you would need to run the following

sudo apt-get install libssl-dev

For Fedora

sudo dnf install openssl-devel

Windows

Help Wanted: please open a PR with info here if you have successfully gotten this working on Windows.

MacOS

On macOS, OpenSSL can be installed via Homebrew

brew install openssl

Spawn

At the core of cosock is the ability to wrap any operation in a coroutine and register that with cosock. For this, cosock exports the function cosock.spawn. This function takes 2 arguments: the first is a function that will be run as our coroutine, and the second is a name for that coroutine.

For example, this is a simple program that spawns a single coroutine, which loops forever, printing the current timestamp and the word "tick" and then sleeping for 1 second.

--basic_spawn.lua
local cosock = require "cosock"

cosock.spawn(function()
  while true do
    print(cosock.socket.gettime(), "tick")
    cosock.socket.sleep(1)
  end
end, "clock")
cosock.run()

The act of calling cosock.spawn allows us to use the non-blocking cosock.socket.sleep function. This means we could extend our application to not only print this message every second but also use the time this coroutine spends sleeping to perform some other work. Let's extend our little example a bit.

--less_basic_spawn.lua
local cosock = require "cosock"

local function tick_task()
  while true do
    print(cosock.socket.gettime(), "tick")
    cosock.socket.sleep(2)
  end
end

local function tock_task()
  cosock.socket.sleep(1)
  while true do
    print(cosock.socket.gettime(), "tock")
    cosock.socket.sleep(2)
  end
end

cosock.spawn(tick_task, "tick-task")
cosock.spawn(tock_task, "tock-task")
cosock.run()

Very similar to our last example, this time we are spawning 2 coroutines: one will print "tick" every two seconds, the other will wait 1 second and then print "tock" every two seconds. This should result in a line getting printed to the terminal once a second, alternating between our two strings. Notice, though, that there is a fair amount of code duplication: tick_task and tock_task are nearly identical. This is mostly driven by the fact that the first argument to cosock.spawn is a function that takes no arguments and returns no values, which means we can't ask cosock to pass in any arguments. One way we can get around this is by using closures: instead of passing a function to cosock.spawn, we can return a function from another function and use that as the argument to cosock.spawn. For example:

--even_less_basic_spawn.lua
local cosock = require "cosock"

local function create_task(name, should_sleep_first)
  return function()
    if should_sleep_first then
      cosock.socket.sleep(1)
    end
    while true do
      print(cosock.socket.gettime(), name)
      cosock.socket.sleep(2)
    end
  end
end

cosock.spawn(create_task("tick", false), "tick-task")
cosock.spawn(create_task("tock", true), "tock-task")
cosock.run()

Notice here that create_task returns a function but takes a name argument and a should_sleep_first argument which are available to our returned function.

Now, let's consider our first example, which may not look like it but is very similar to our tick/tock example.

Instead of using cosock.socket.sleep to tell cosock we are waiting around for something, it uses the receive method on a cosock.socket.tcp. Let's break down what is happening in that example.

To start, both tasks will be resumed, which means that cosock has selected them to run; we can't say for sure which task will get resumed first, which is why we used a cosock.channel to make the client task wait until the server was ready. Shortly after resuming, each task calls some method that yields, which means it is waiting on something, so cosock can run another task. For the server, the first yield happens in the call to accept: if the client hasn't already called connect we would end up blocking, so instead we let another task work, and when we finally have a client connected cosock will wake us back up. On the client side, we first yield on the call to channel:receive: if the server hasn't sent the port number yet, blocking here would prevent the server task from ever reaching bind, so we let the other task work until we finally have a port number, and then cosock wakes us back up.

This pattern continues: each task runs exclusively until it needs to wait for something, at which point it yields control back to cosock. When the thing we were waiting for is ready, we continue running again.

In both our tick/tock examples and our client/server example, we reach a point where cosock is just handing control from task 1 to task 2 and back again in an infinite loop. In a more real-world program, you might see any number of tasks that need to be juggled. In our next example, we will extend the client/server example to handle any number of clients.

-- clients_server.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()

local number_of_clients = 10

--- Since the clients and server are in the same application
--- we can use an OS-assigned port and share it across tasks.
--- To make sure every task agrees on the port number, we use
--- a cosock channel.
local port_tx, port_rx = cosock.channel.new()

--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  port_tx:send(p)
  server:listen()
  while true do
    local client = server:accept()
    cosock.spawn(function()
      while true do
        local request = assert(client:receive())
        print(string.format("received %q", request))
        if request:match("ping") then
          print("sending pong")
          client:send("pong\n")
        else
          client:close()
          break
        end
      end
    end)
  end
end, "server task")

--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
  print("spawn_client", id, port)
  local client = socket.tcp()
  client:connect(ip, port)
  while true do
    print("sending ping", id)
    client:send(string.format("ping %s\n", id))
    local request = assert(client:receive())
    assert(request == "pong")
    socket.sleep(0.5)
  end
end

--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
  local port = assert(port_rx:receive())
  for i=1,number_of_clients do
    cosock.spawn(function()
      spawn_client(i, port)
    end, string.format("client-task-%s", i))
  end
end

--- Spawn a bunch of client tasks
cosock.spawn(function()
  spawn_clients()
end, "client task")

--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()

Surprisingly little has changed. First, we updated the server task to call accept more than once, passing each returned client into its own task that receives and sends in a loop.

For the client side, we broke the client send/receive loop into its own function and added a parent task that waits for the port number and then uses cosock.spawn to start a bunch of client tasks.

If you were to run this example, you would see that the print statements end up in random order!

Channels

Coordinating coroutines can be a huge pain; to ease this pain, cosock offers a synchronization primitive called channels. A cosock.channel is a "multiple producer, single consumer" message queue: one coroutine owns the receiving half of the queue, while the sending half can be passed out to as many coroutines as you'd like. We've already seen how they are used: in our first example, we used a cosock.channel to coordinate which port the client should connect to. What if we wanted to rewrite that example without using a channel? That might look something like this:

--client_server_no_channel.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local shared_port
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  shared_port = p
  server:listen()
  local client = server:accept()
  while true do
    local request = assert(client:receive())
    print(string.format("received %q", request))
    assert(request == "ping")
    print("sending pong")
    client:send("pong\n")
  end
end, "server task")

--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
  --- wait for the server to be ready.
  while shared_port == nil do
    socket.sleep(1)
  end
  local client = socket.tcp()
  client:connect(ip, shared_port)
  while true do
    print("sending ping")
    client:send("ping\n")
    local request = assert(client:receive())
    assert(request == "pong")
  end
end, "client task")

--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()

That update removed our channel in favor of polling shared_port ~= nil once every second. This will absolutely work; however, we have introduced a race condition. What if we sleep for 1 full second right before the server is issued its port? That second would be wasted; we could have already been creating our client. While the consequence in this example isn't dire, it does show that relying on shared mutable state without some way to synchronize it between tasks can be problematic.

As a note, we could also solve this problem by having the server task spawn the client task, but that wouldn't be nearly as interesting for our current subject.
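
For reference, here is a rough sketch of that alternative (not part of the original example; the elided loops are the same as in the code above):

--- Sketch: the server task spawns the client task once it
--- knows its own port, so no shared state or channel is needed
cosock.spawn(function()
  server:bind(ip, 0)
  local _ip, port = server:getsockname()
  server:listen()
  -- now that the port is known, start the client from here
  cosock.spawn(function()
    local client = socket.tcp()
    client:connect(ip, port)
    -- ...the same ping/pong send loop as before...
  end, "client task")
  local client = server:accept()
  -- ...the same receive/pong loop as before...
end, "server task")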

Another place we have seen that could benefit from a channel is our tick/tock example; there, we hard-coded the synchronization: the first task would print and then sleep for 2 seconds, while the second task would sleep for 1 second and then print and sleep for 2 seconds. Let's take a look at what that might have looked like if we had used channels.

--tick_tock_channels.lua
local cosock = require "cosock"

-- We create 2 pairs of channels so our two tasks can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()

local function task(tx, rx, name)
  while true do
    -- First wait for the other task to tell us it is done
    rx:receive()
    -- print our name
    print(cosock.socket.gettime(), name)
    -- sleep for 1 second
    cosock.socket.sleep(1)
    -- tell the other task we are done
    tx:send()
  end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
  task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
  task(tick_tx, tock_rx, "tock")
end)
-- prime the tick task to start first
tick_tx:send()

cosock.run()

In this version, we only have one definition for a task: looping forever, it first waits to receive the signal, then prints its name, sleeps for 1 second, and finally sends the signal on to the other task. The key to making this all work is that we need to kick the process off by telling one of the tasks to start. Since a channel allows for multiple senders, it is ok that we call send in more than one place.

Now say we wanted to extend our little clock emulator to print "clack" every 60 seconds to simulate the minute hand moving. That might look something like this:

--tick_tock_clack.lua
local cosock = require "cosock"

-- We create 3 pairs of channels so our three tasks can send messages
-- around the loop
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local clack_tx, clack_rx = cosock.channel.new()

local function task(tx, rx, name)
  while true do
    -- First wait for the other task to tell us the count
    local count = rx:receive()
    -- print our name
    print(cosock.socket.gettime(), name)
    -- sleep for 1 second
    cosock.socket.sleep(1)
    -- pass the count along; once we reach 59, hand
    -- the count off to the "clack" task instead
    if count >= 59 then
      clack_tx:send(0)
    else
      tx:send(count + 1)
    end
  end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
  task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
  task(tick_tx, tock_rx, "tock")
end)
cosock.spawn(function()
  task(tick_tx, clack_rx, "clack")
end)
-- prime the tick task to start first
tick_tx:send(1)

cosock.run()

Here we have updated the tasks to share a counter across our channels. At the start of each loop iteration, we first get the current count from the previous task. We again print our name and then sleep for 1 second, but now, if the count is >= 59, we send a count of 0 to our "clack" task, which will then always send a 1 to the "tick" task to start the whole process over again. Just to make sure it is clear: we use the send half of the "tick" task's channel in 3 places: the main thread (to "prime" the clock), the "tock" task, and the "clack" task.

It is very important that we don't try to use the receiving half of a channel in more than one task; that would lead to potentially unexpected behavior. Let's look at an example of how that might go wrong.

-- bad_news_channels.lua
local cosock = require "cosock"

local tx, rx = cosock.channel.new()

cosock.spawn(function()
  rx:receive()
  print("task 1")
end)

cosock.spawn(function()
  rx:receive()
  print("task 2")
end)

tx:send()

cosock.run()

In this example, we create one channel pair and spawn two tasks, both of which call receive on our channel; just before run, we call send. Since the choice of which task runs at which time is left entirely up to cosock, we can't say for sure which of these tasks will actually receive the message. It might print "task 1" or it might print "task 2".

In actuality, cosock assumes that receive will only ever be called from the same coroutine. Calling receive from multiple coroutines will (eventually) raise an error.

Select

Now that we have covered how to spawn and run coroutines using cosock, let's talk about how to handle multiple IO sources in a single coroutine. For this kind of work, cosock provides cosock.socket.select. This function works in a very similar way to luasocket's socket.select; a call looks something like local recvr, sendr, err = cosock.socket.select(recvt, sendt, timeout). Its arguments are

  • recvt: This is a list of cosock sockets that are waiting to be ready to receive
  • sendt: This is a list of cosock sockets that are waiting to be ready to send
  • timeout: This is the maximum amount of seconds to wait for one or more entries in recvt or sendt to be ready
    • If this value is nil or negative, the timeout is treated as infinite

Note: The list entries for sendt and recvt can be other "cosock aware" tables, like the lustre WebSocket; for specifics on how to make a table "cosock aware", see the chapter on integrating with cosock

Its return values are as follows; a bare-bones call is sketched just after this list.

  • recvr: A list of ready receivers; any entry here should be able to call receive without blocking
  • sendr: A list of ready senders; any entry here should be able to call send without blocking
  • err: If this value is not nil it represents an error message
    • The most common error message here is "timeout", which can only occur when a non-nil, non-negative timeout argument is provided
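
As a bare-bones sketch of the call shape (assuming a and b are hypothetical, already-connected cosock TCP sockets):

local recvr, _sendr, err = cosock.socket.select({a, b}, {}, 10)
if err == "timeout" then
  print("nothing was ready within 10 seconds")
elseif err then
  error(err)
else
  for _, sock in ipairs(recvr) do
    -- every entry in recvr should now be able to receive without waiting
    print(sock:receive())
  end
end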

So, how would we use something like this? Let's consider our clients_server.lua example from the spawn chapter, where we called cosock.spawn every time a new client was accepted. That works, but we don't have much control over how many tasks we end up spawning, in large part because we don't know how long each task will run. To limit the number of concurrent connections, we would need to handle all of the client connections on the same task as the server, and to do that we can use select.

-- clients_server_select.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"

local number_of_clients = 10

--- Since the clients and server are in the same application
--- we can use an OS-assigned port and share it across tasks.
--- To make sure every task agrees on the port number, we use
--- a cosock channel.
local port_tx, port_rx = cosock.channel.new()

--- Handle a client being ready to receive
--- @param client cosock.socket.tcp A connected client socket
--- @param clients table A map whose keys are the connected client sockets
function handle_recv(client, clients)
  local request, err = client:receive()
  if not request then
    if err == "closed" then
      clients[client] = nil
    end
    return
  end
  print(string.format("received %q", request))
  if request:match("ping") then
    print("sending pong")
    local s, err = client:send("pong\n")
    if err == "closed" then
      clients[client] = nil
    elseif err then
      print("error in recv: " .. tostring(err))
    end
  else
    client:close()
    clients[client] = nil
  end
end

--- Handle a server being ready to accept
--- @param server cosock.socket.tcp The listening socket
--- @param clients table A map whose keys are the connected client sockets
function handle_accept(server, clients)
  local client, err = server:accept()
  if err and err ~= "timeout" then
    error("error in accept: " .. tostring(err))
  end
  if client then
    clients[client] = true
  end
end

--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  local server = socket.tcp()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  port_tx:send(p)
  server:listen()
  local clients = {}
  server:settimeout(0)
  while true do
    local recvt = {}
    for client, _ in pairs(clients) do
      table.insert(recvt, client)
    end
    if #recvt < 5 then
      table.insert(recvt, server)
    end
    local recvr, _sendr, err = cosock.socket.select(recvt, {}, 5)
    if err == "timeout" then
      return
    elseif err then
      error("Error in select: "..tostring(err))
    end

    for _, sock in ipairs(recvr) do
      if sock == server then
        print("accepting new client")
        handle_accept(server, clients)
      elseif clients[sock] then
        handle_recv(sock, clients)
      end
    end
  end
end, "server task")

--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
  print("spawn_client", id, port)
  local client = socket.tcp()
  client:connect(ip, port)
  for _=1,10 do
    print("sending ping", id)
    client:send(string.format("ping %s\n", id))
    local request = assert(client:receive())
    assert(request == "pong")
    socket.sleep(0.5)
  end
  client:close()
end

--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
  local port = assert(port_rx:receive())
  for i=1,number_of_clients do
    cosock.spawn(function()
      spawn_client(i, port)
    end, string.format("client-task-%s", i))
  end
end

--- Spawn a bunch of client tasks
cosock.spawn(function()
  spawn_clients()
end, "clients task")

--- Finally we tell cosock to run all our coroutines until they are done
--- which should be forever
cosock.run()

The above is an updated version of our clients/server example with some changes to limit the total number of connections to 5; let's go over the changes.

First, we've added a few helper functions to handle the different events in our system. The first, handle_recv, is called when a client connection is ready to receive; it takes 2 arguments: client, a cosock.socket.tcp that was returned from a call to accept, and clients, a table where the keys are cosock.socket.tcp clients and the values are true. We first call client:receive to get the bytes from the client, and if that returns a string containing "ping" then we send our "pong" message. There are a few places where this can go wrong: the call to receive could return nil and an error message, the request might not contain "ping", or the call to send could return nil and an error message. If the error message is "closed", or the request didn't contain "ping", then we want to remove client from clients, and in the latter case we also want to call client:close.

Next up we have handle_accept; this also takes 2 arguments: server, a cosock.socket.tcp socket that is listening, and the same map of clients. If a call to accept returns a client, we add that client to our clients map. If accept returns nil and err isn't "timeout", we raise an error.

Alright, with these two helper functions we can now update the "server" task to handle all of the connected clients without having to call spawn. Our task starts out the same as before: creating a server socket, binding it to an OS-assigned port, getting that port and sending it to our "clients task", and then calling listen. At this point things start to change: first we define our clients map as empty, and then we call server:settimeout(0) so that a call to accept can never end up yielding forever.

Inside our long-running loop, we start by building a new table, recvt, which matches the select argument of the same name. We then loop over our clients table, inserting each of the keys into recvt. We keep these as separate tables because we want to be able to remove a client from our readiness check once it has closed. Next, we check how large recvt is; if it is below 5, we add server to it. By only including server when recvt has fewer than 5 clients, we have enforced our max connections limit.

With recvt defined we can finally call cosock.socket.select, using recvt as the first argument, an empty table as the sendt argument, and a timeout of 5 seconds. We assign the results of select to recvr, _sendr, err; we expect recvr to contain any of our clients that are ready to receive and, if we are below the limit, possibly server. If recvr is nil, we expect err to be a string describing the error. If err is "timeout", we exit our server task, which should exit the application. If we don't have an err, we loop over the entries in recvr and check whether each one is our server: if so we call handle_accept, otherwise we call handle_recv. Each of our helpers updates the clients map to ensure that we service all of the client requests before exiting.

The last change we've made is to spawn_client, which previously looped forever; it now loops 10 times before closing the client and exiting.

If we were to run this you would see each of the tasks spawn in a random order and the first 5 of those would begin sending their "ping" messages. Once 1 of them completes, we would accept the next connection but not before that point which means we have limited our total number of connected clients to 5!

Integrating with Cosock

So far we have covered what cosock provides, but what if we want to integrate our own libraries directly into cosock? What would that look like?

To start, the general interface for a "cosock aware" Lua table is to define a method setwaker, which takes 2 arguments: kind: string and waker: fun()|nil. The general idea is that a "waker" function can be provided that will get called when the waiting task is ready to be woken again.

Let's try to build an example Timer that defines this setwaker method to make it "cosock aware".

local cosock = require "cosock"

local Timer = {}
Timer.__index = Timer

function Timer.new(secs)
  return setmetatable({
    secs = secs,
    waker = nil,
  }, Timer)
end

function Timer:wait()
  coroutine.yield({self}, {}, self.secs)
end

function Timer:setwaker(kind, waker)
  print("setwaker", kind, waker)
  if waker then
    self.waker = function()
      print("waking up!")
      waker()
    end
  else
    self.waker = nil
  end
end

cosock.spawn(function()
  local t = Timer.new(2)
  print("waiting")
  t:wait()
  print("waited")
end)

cosock.run()

To start, we create a Lua metatable Timer, which has the properties secs: number and waker: fun()|nil. There is a constructor Timer.new(secs), which takes the number of seconds we want to wait. Finally, we define Timer:wait, which is where our magic happens. This method calls coroutine.yield with 3 arguments: {self}, an empty table, and self.secs. These arguments match exactly what would be passed to socket.select: the first is a list of receivers, the second is a list of senders, and the third is the timeout. Since we pass {self} as the first argument, we are treating Timer as a receiver. Ultimately, what we are doing here is asking cosock to call socket.select({self}, {}, self.secs). While we don't end up calling self.waker ourselves, cosock uses setwaker to register tasks to be resumed, so we need to conform to that. Just to illustrate that this is happening, a print statement has been added to setwaker; if we run this we would see something like the following.

waiting
setwaker        recvr   function: 0x5645e6410770
setwaker        recvr   nil
waited

We can see that cosock calls setwaker once with a function and a second time with nil. Notice, though, that self.waker never actually gets called, since we don't see a "waking up!" message. That is because we don't really need to be woken up: our timer yields the whole coroutine until self.secs have elapsed, and nothing can interrupt that. Let's extend our Timer to have a reason to call self.waker; we can do that by adding the ability to cancel a Timer.

local cosock = require "cosock"

local Timer = {}
Timer.__index = Timer

function Timer.new(secs)
  return setmetatable({
    secs = secs,
    waker = nil,
  }, Timer)
end

function Timer:wait()
  local r, s, err = coroutine.yield({self}, {}, self.secs)
  if err == "timeout" then
    return 1
  end
  return nil, "cancelled"
end

function Timer:setwaker(kind, waker)
  print("setwaker", kind, waker)
  if waker then
    self.waker = function()
      print("waking up!")
      waker()
    end
  else
    self.waker = nil
  end
end

function Timer:cancel()
  if self.waker then
    self.waker()
  end
end

cosock.spawn(function()
  local t = Timer.new(10)
  cosock.spawn(function()
    cosock.socket.sleep(3)
    t:cancel()
  end)
  print("waiting")
  local s = os.time()
  local success, err = t:wait()
  local e = os.time()
  print("waited", os.difftime(e, s), success, err)
end)

cosock.run()

In this example, we create a timer that will wait 10 seconds, but before we call wait we spawn a new task that sleeps for 3 seconds and then calls cancel. Looking over the changes made to wait, we still call coroutine.yield({self}, {}, self.secs), but this time we assign its results to r, s, err. Cosock calls coroutine.resume with the same return values we would get from select: a list of ready receivers, a list of ready senders, and an optional error string. If the timer expires, we expect to get back nil, nil, "timeout"; if someone calls the waker before our timer expires, we expect to get back {self}, {}, nil. This means we can treat err == "timeout" as a normal timer expiration, and if err ~= "timeout" we can safely assume our timer was canceled. If we were to run this code we would see something like the following.

waiting
setwaker        recvr   function: 0x556d39beb6d0
waking up!
setwaker        recvr   nil
setwaker        recvr   nil
waited  3.0     nil     cancelled

Notice we only slept for 3 seconds instead of 10, and wait returned nil, "cancelled"! One thing we can take away from this new example is that the waker API is designed to allow one coroutine to signal cosock that another coroutine is ready to wake up. With that in mind, let's try to build something a little more useful: a version of the cosock.channel API that allows for a maximum queue size. Looking over the existing channels, we are going to need 3 parts to implement this: a shared table for queueing messages and setting the appropriate wakers, a receiver table, and a sender table. Let's start by defining the shared table.

Bounded Channel

To start we are going to define a Lua table that will represent our shared bounded queue.

local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

This table should have the following properties; a sketch of the resulting shape follows the list.

  • _wakers: This is a table with 2 keys
    • sendr: This is a map of waker functions where each key is a table representing a waiting sender
    • recvr: An optional function that takes no arguments, this will wake our receiver
  • _max_depth: This is the integer value that our queue should not grow larger than
  • _msg_queue: This is the message queue we will use to hold pending messages
  • _closed: This is a boolean to indicate if we have been explicitly closed
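
For illustration only (the real constructor is defined at the end of this chapter), a freshly created channel with a _max_depth of 4 would be shaped something like this:

{
  _wakers = {
    sendr = {}, -- map of waiting sender tables to their waker functions
    recvr = nil, -- the receiver's waker, only set while it is waiting
  },
  _max_depth = 4,
  _msg_queue = {},
  _closed = false,
}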

To make our lives easier, next we will add a couple of methods that enforce the queue nature of our _msg_queue: one for removing the oldest message and one for adding a new message.

--- Remove and return the oldest message in this channel
function BoundedChannel:pop_front()
  return table.remove(self._msg_queue, 1)
end

--- Add a new element to the back of this channel
function BoundedChannel:push_back(ele)
  table.insert(self._msg_queue, ele)
end

Next, let's add a method for testing if we can send a new message.

function BoundedChannel:can_send()
  if self._closed then
    -- Check first that we are not closed, if we are
    -- return an error message
    return nil, "closed"
  end
  if #self._msg_queue >= self._max_depth then
    -- Check next if our queue is full, if it is
    -- return an error message
    return nil, "full"
  end
  -- The queue is not full and we are not closed, return 1
  return 1
end

This method first checks that we haven't been closed; if we have, it returns nil, "closed". If we are still open, it next checks whether we have any space left in our queue; if not, it returns nil, "full". If we are not closed and not full, it returns 1.

Now we should create a similar method for checking if we can receive a new message.

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    -- If we are closed and there are no pending messages,
    -- return an error message
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    -- Check next that we have at least 1 message,
    -- if not, return an error message
    return nil, "empty"
  end
  -- We are not closed and we have at least 1 pending message, return 1
  return 1
end

Again, we begin by checking for the closed case (closed with nothing left to read), returning nil, "closed"; then we check whether the queue is empty, returning nil, "empty" if so. If there is at least 1 pending message, we return 1.

Now we can define a few methods for interacting with our self._wakers property. First up is the set_waker_recvr method.

function BoundedChannel:set_waker_recvr(waker)
  self._wakers.recvr = waker
  if waker and self:can_recv() then
    -- Check if receiving is currently available, if
    -- so call the waker to wake up the yielded receiver
    waker()
  end
end

Ok, so this one is pretty simple now that we have our helpers. First, we store the value in self._wakers.recvr. Then, if waker is not nil and self:can_recv returns 1, we immediately call the waker, because we are already ready to be woken up.

For our next method, we are going to define set_waker_sendr.

function BoundedChannel:set_waker_sendr(sendr, waker)
  self._wakers.sendr[sendr] = waker
end

This looks quite a bit different from set_waker_recvr! First of all, we have an extra argument, sendr, which represents a unique call to coroutine.yield and is how we allow for multiple senders. The second thing to notice is that we are not checking self:can_send; this is because we don't know whether another waker has already been woken for the current state. This is all a bit hand-wavy right now, but when we implement the sender things should become clear.

Now that we can set a waker, it is time to add a method for calling those waker functions.

function BoundedChannel:try_wake(kind)
  local waker
  if kind == "sendr" then
    _, waker = next(self._wakers.sendr)
  else
    waker = self._wakers.recvr
  end
  if type(waker) == "function" then
    waker()
  end
end

Our new try_wake method takes 1 argument, which will be either the string "sendr" or "recvr". If we are trying to wake a "sendr", we use the next function to find one entry in the table self._wakers.sendr; if that table has at least 1 entry, we assign its value to waker. If we are trying to wake a "recvr", we assign self._wakers.recvr to waker. If waker is a function (that is, not nil), we call it.

function BoundedChannel:close()
  self._closed = true
end

close just sets our _closed property to true.

Ok, now that we have our shared channel defined, let's implement our receiver.

Bounded Channel: Recvr

Next, we will define a Lua table that represents the receiving half of our channel. This table will have 2 properties: _link, which will be our BoundedChannel, and timeout, which will be an optional number.

local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

Ok, let's create a constructor for it; this takes 1 argument, which populates the _link property.

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

Notice we didn't include a timeout property at all; this is because we want it to be nil by default. To match the API that cosock.channel uses, let's add a method for setting our timeout.

function BoundedChannelReceiver:settimeout(timeout)
  self.timeout = timeout
end

Next, we want to define the setwaker method used by this side of the queue.

function BoundedChannelReceiver:setwaker(kind, waker)
  if kind ~= "recvr" then
    error("Unsupported wake kind for receiver: " .. tostring(kind))
  end
  self._link:set_waker_recvr(waker)
end

In this method, we have added a check to make sure it isn't called with a kind of "sendr", since that would be pretty much meaningless. If we have a valid kind, we pass the waker down to self._link:set_waker_recvr.

Ok, now for the good stuff: the receive method.

function BoundedChannelReceiver:receive()
  while true do
    local can_recv, err = self._link:can_recv()
    if can_recv then
      local element = self._link:pop_front()
      self._link:try_wake("sendr")
      return element
    end
    if err == "closed" then
      return nil, err
    end
    if err == "empty" then
      local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end

Alright, there is a lot going on here, so let's unpack it. To start, we have a long-running loop that only stops when we reach either an error or a new message. Each iteration of the loop first checks self._link:can_recv(); if that returns 1, we call self._link:pop_front to capture our eventual return value, then we alert any senders that more space has just been made on our queue by calling self._link:try_wake("sendr"), and finally we return the element we popped off, ending our loop. If can_recv returned nil, we check which err was provided: if it was "closed", we return that in the error position, also ending our loop. If can_recv returned nil, "empty", we want to yield until either we get woken by a sender or we have waited for the duration of self.timeout. We do this by calling coroutine.yield({self}, nil, self.timeout), which gives up control to cosock until either someone calls BoundedChannel:try_wake("recvr") or our timeout is reached.

Recall that coroutine.yield returns a list of ready receivers and a list of ready senders, or nil, nil and an error message. This means that if coroutine.yield returns {self} then we have a new message, so we go to the top of the loop and the next call to self._link:can_recv should return 1 or nil, "closed". If coroutine.yield returns nil, nil, "timeout", that means we have yielded for self.timeout.
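
Putting the receiving half to use on its own might look like this (a sketch, assuming rx is a receiver produced by the constructor we will add at the end of the chapter, running inside a cosock.spawn-ed task):

rx:settimeout(0.5)
local msg, err = rx:receive()
if msg then
  print("got a message:", msg)
elseif err == "timeout" then
  print("no message within half a second")
elseif err == "closed" then
  print("the channel was closed")
end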

One final thing we want to make sure of is that we keep our end of that nil, "closed" bargain, so let's define a close method.

function BoundedChannelReceiver:close()
  self._link:close()
  --- If anything is waiting to send, it should wake up with an error
  self._link:try_wake("sendr")
end

For this, we first call self._link:close, which sets our shared table's _closed property to true and ultimately makes both can_send and can_recv return nil, "closed". Next, we want to wake up any sending tasks, since they rely on us to tell them something has changed, so we call self._link:try_wake("sendr").

With that, we have the complete receiver side of our channel; now let's write up the sender.

Bounded Channel: Sendr

To start, we define our Lua table which represents the sending half of our channel.

local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender

This table has the same shape as our BoundedChannelReceiver: it has a _link property and a timeout property, and its constructor is nearly identical.

function BoundedChannelSender.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelSender)
end

We will also define a settimeout method that looks exactly the same.

function BoundedChannelSender:settimeout(timeout)
  self.timeout = timeout
end

Our close method is also very close to the receiver's implementation.

function BoundedChannelSender:close()
  self._link:close()
  -- If anything is waiting to receive, it should wake up with an error
  self._link:try_wake("recvr")
end

Here, just the argument to self._link:try_wake changed from "sendr" to "recvr".

Now things start to become quite different. The first thing to note is that we are not going to define a setwaker method for this table. This may seem strange, since it is one of the few things we need to do to make a table "cosock aware", but if we used the same setwaker for every place that calls BoundedChannelSender:send, we would end up gumming up the internal workings of cosock. To see how we get around this, let's go over the implementation of send.

function BoundedChannelSender:send(msg)
  while true do
    local can_send, err = self._link:can_send()
    if can_send then
      self._link:push_back(msg)
      -- Wake any receiver who might be waiting to receive
      self._link:try_wake("recvr")
      return 1
    end
    if err == "closed" then
      return nil, "closed"
    end
    if err == "full" then
      local wake_t = {
        setwaker = function(t, kind, waker)
          assert(kind == "sendr")
          self._link:set_waker_sendr(t, waker)
        end
      }
      local _r, _s, err = coroutine.yield(nil, {wake_t}, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end

To start, things look a lot like our BoundedChannelReceiver:receive implementation: we have a long-running loop that first calls self._link:can_send. If that returns 1, we use the push_back helper to add the message to our queue, try to wake up any yielded "recvr", and return 1 to indicate success. If can_send returned the error message "closed", we return nil, "closed". If can_send returned the error message "full", we want to yield until we can try again.

To prepare for yielding, we first create a table called wake_t; this represents a single call to BoundedChannelSender:send that is yielding. On wake_t we set 1 property: the setwaker method, which uses assert to raise an error if it was called with any kind other than "sendr" and then uses the BoundedChannel:set_waker_sendr method to associate the waker argument with wake_t. By creating this temporary table, we allow a unique waker function to be defined for each yielding call that needs waking. If we were to use a single BoundedChannel._wakers.sendr function, we would lose the ability to wake any yields beyond the last one, because calling waker always calls setwaker(kind, nil) to avoid potential "double wakes".

Now that we have set up our wake_t, we can call coroutine.yield, this time with the arguments nil, {wake_t}, self.timeout. Since we put wake_t in the sendt argument, we will wait until we either reach the duration of self.timeout or someone calls BoundedChannel:try_wake("sendr") and our wake_t is returned from next.

This is probably the easiest way to create these unique wakers, but it does come with a potential issue; if you are interested in using this implementation, please review Appendix A.

Now, let's finish up our implementation and see if we can see our bounded channel working.

Bounded Channel: Finish

At this point, we have most of our BoundedChannel and all of our BoundedChannelReceiver and BoundedChannelSender set up, so the last thing we need to do is add a constructor to BoundedChannel.

function BoundedChannel.new(max_depth)
  local link = setmetatable({
    _max_depth = max_depth,
    _wakers = {
      sendr = {},
    },
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
  return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end

This constructor takes 1 argument: the number telling us how large our queue can get. It returns 2 tables: the first is a BoundedChannelSender and the second is a BoundedChannelReceiver, both of which share the same BoundedChannel as their _link property.
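
One housekeeping note: for the require call in the demo below to work, all of the code from this chapter needs to live in a single module file that returns the BoundedChannel table. Assuming that file is saved as examples/bounded_channel.lua, it would end with:

-- at the very bottom of examples/bounded_channel.lua
return BoundedChannel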

Now let's see our new channel in action!

local cosock = require "cosock"
local BoundedChannel = require "examples.bounded_channel"

local tx, rx = BoundedChannel.new(2)

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,10 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr1")

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=11,20 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr2")

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,20 do
    local msg = rx:receive()
    local e = cosock.socket.gettime()
    print(string.format("recd %s in %.1fs", msg, e - s))
    s = e
    cosock.socket.sleep(1)
  end
end, "recvr")

cosock.run()

After we require both cosock and our BoundedChannel, we create a new channel pair with a maximum queue size of 2. We then spawn 2 tasks for the senders; in these tasks we loop 10 times, sending a message and then sleeping for 0.2 seconds. We call cosock.socket.gettime before and after each send to see whether there is any delay.

Next, we spawn a task for our receiver; this receives a message and then sleeps for 1 second, 20 times.

Since we are sending a lot faster than we are receiving, we would expect that after the first few messages the time it takes to send a message hits about 1 second, indicating that our queue has reached its maximum of 2. If we run this we should see something like the following.

sent 1 in 0.0s
sent 11 in 0.0s
recd 1 in 0.0s
sent 2 in 0.2s
recd 11 in 1.0s
sent 12 in 1.0s
recd 2 in 1.0s
sent 13 in 1.0s
recd 12 in 1.0s
sent 14 in 1.0s
recd 13 in 1.0s
sent 15 in 1.0s
recd 14 in 1.0s
sent 16 in 1.0s
recd 15 in 1.0s
sent 17 in 1.0s
recd 16 in 1.0s
sent 18 in 1.0s
recd 17 in 1.0s
sent 19 in 1.0s
recd 18 in 1.0s
sent 20 in 1.0s
recd 19 in 1.0s
sent 3 in 9.8s
recd 20 in 1.0s
sent 4 in 1.0s
recd 3 in 1.0s
sent 5 in 1.0s
recd 4 in 1.0s
sent 6 in 1.0s
recd 5 in 1.0s
sent 7 in 1.0s
recd 6 in 1.0s
sent 8 in 1.0s
recd 7 in 1.0s
sent 9 in 1.0s
recd 8 in 1.0s
sent 10 in 1.0s
recd 9 in 1.0s
recd 10 in 1.0s

From this output, we can determine the exact order things played out. First, sendr1 pushes 1 onto the queue and then sleeps for 0.2 seconds, which allows sendr2 to push 11 onto the queue and also sleep for 0.2 seconds; then recvr pops 1 off the queue, followed by a 1-second sleep.

At this point, for the first time, we have 3 sleeping tasks. Since sendr1 went to sleep first, it is the first to wake: it pushes 2 onto the queue and goes back to sleep, which allows sendr2 to wake up and try to send 12. But the queue is full ({11, 2}), so it has to wait until recvr wakes up and pops 11 off the queue. Once recvr pops 11 off the queue, we see that sendr2 is able to push 12, but it took 1 full second to do so! From here we stay in a state where both sendr1 and sendr2 are waiting to send for ~1 second until recvr is able to receive, at which point either sendr1 or sendr2 pushes a new value. Once we reach the last 2 values, we see that our sendrs go quiet because they are done with their work, but recvr still takes another 2 seconds to complete.

Did you notice that sendr2 gets to go far more often than sendr1 at the start, and that it takes sendr1 9.8 seconds to send 3? This is because of our waker scheme; Appendix A has more on that.

Appendix A: Bounded Channel Limits

In BoundedChannelSender:send, BoundedChannel:set_waker_sendr, and BoundedChannel:try_wake we have made some decisions that make our implementation a little easier to read and write but that might cause some problems if we end up depending on it.

To review: any time BoundedChannelSender:send would yield, we create a temporary table to handle our setwaker calls, and in BoundedChannel:set_waker_sendr we use that table as the key into _wakers.sendr to store or remove the waker function.

In BoundedChannel:try_wake we use the next function to choose which entry of _wakers.sendr to call. The next function will always return the "first" key/value pair, but what does that mean? How can Lua tables be ordered? When a table is created, it is assigned a memory address unique to that table; we can see what that address is by using the default __tostring metamethod.

lua -e "print({})"
table: 0x5581bc0236a0

In the above, we can see the memory address of an empty table is 0x5581bc0236a0; this value would change if we ran it again. If we were to use this table as the key in another table, that table would look something like this.

{
  [0x5581bc0236a0] = "first element"
}

So, let's look at how Lua might order a table like this with more than one key.

local tables = {}
for i=1, 10 do
  tables[{}] = i
end

for i=1, 10 do
  local t, v = next(tables)
  local tp = tonumber(tostring(t):match("table: 0x(.+)"), 16)
  print(tp, v)
  tables[t] = nil
end

This example creates a table tables, then loops 10 times assigning tables[{}] = i. In a second loop of 10, we use the next function to pull the "first" key/value pair out of tables. We convert t's address into a plain number by matching the hex portion of its tostring value and passing that to tonumber with a base of 16, which may be easier to read than trying to tell which hex value is larger than another. We print that number along with the i assigned to it, then remove the key from tables by assigning tables[t] = nil. If we run this once we might see something like.

93877018273488  1
93877018274240  9
93877018274016  7
93877018273792  5
93877018273616  3
93877018274352  10
93877018274128  8
93877018273904  6
93877018273680  4
93877018273552  2

At first, it looks like it might go in order, because we get 1, but then we see the second return from next is 9. If we run it again we might see something like:

93837605766864  2
93837605767120  6
93837605767376  10
93837605766928  3
93837605767184  7
93837605766992  4
93837605767248  8
93837605766800  1
93837605767056  5
93837605767312  9

This time, we get 2 first and 9 last, which means we can expect the order to be somewhat random. We can also see that the iteration order follows the keys' memory addresses rather than the order they were inserted. That means next will return the waker associated with whichever wake_t happens to come first in the table's internal layout. So what happens if one coroutine's wake_t always comes back first? It could starve the other coroutines from being able to send.

This might not be all bad, though: randomness can be good, since we don't want to show a preference, and randomness does just that. But for that to be fair, the ordering would need something like a uniform distribution, which would mean it should take a very long time to see the same order twice. Let's see how random our temporary table keys actually are.

local m = {
  sets = {}
}

local table_size = 9

function m:add_set(set)
  for idx, existing in ipairs(self.sets) do
    local exact_match = false
    local left_t, left_v, right_t, right_v
    for i=1,table_size do
      left_t, left_v = next(set, left_t)
      right_t, right_v = next(existing, right_t)
      exact_match = left_v == right_v
      if not exact_match then
        break
      end
    end
    
    if exact_match then
      table.insert(self.sets, set)
      return {
        first = existing,
        first_idx = idx,
        second = set,
        second_idx = #self.sets
      }
    end
  end
  table.insert(self.sets, set)
end

local function gen_set()
  local tables = {}
  for i=1, table_size do
    tables[{}] = i
  end
  return tables
end

local result

repeat
  result = m:add_set(gen_set())
until result

print("RESULT")
print(result.first_idx, result.second_idx)
print("f,s")
for i=1,table_size do
  local t1, v1 = next(result.first)
  local t2, v2 = next(result.second)
  print(string.format("%s,%s", v1, v2))
  result.first[t1] = nil
  result.second[t2] = nil
end

Here we have extended our example to allow repeatedly creating these tables and then checking whether the new version matches any of the previous versions. We reduced the number of entries to 9 to make it easier to read the results, but otherwise gen_set creates the same kind of table as our original example. We have defined a table named m to hold all of our sets, with a method add_set, which returns nil if the set argument doesn't match any set already in the list, or a results table if a match was found. So what happens if we run this?

RESULT
1       4
f,s
4,4
8,8
1,1
5,5
9,9
2,2
6,6
3,3
7,7

It looks like it took only four sets to find the exact same order (the fourth set matched the first). Considering that the digits 1 through 9 have a potential number of orderings greater than 300,000, it seems that our distribution is not very uniform.
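
One way to avoid this starvation, not used in the implementation above, is to keep the pending senders in an array and wake them first-in-first-out. A minimal sketch of that alternative (assuming _wakers.sendr is still initialized to {}):

-- Hypothetical replacement: store sender wakers in arrival order
function BoundedChannel:set_waker_sendr(sendr, waker)
  if waker then
    -- remember this sender at the back of the line
    table.insert(self._wakers.sendr, { sendr = sendr, waker = waker })
    return
  end
  -- a nil waker means this sender no longer needs waking
  for i, entry in ipairs(self._wakers.sendr) do
    if entry.sendr == sendr then
      table.remove(self._wakers.sendr, i)
      break
    end
  end
end

function BoundedChannel:try_wake(kind)
  if kind == "sendr" then
    -- wake the sender that has been waiting the longest
    local entry = table.remove(self._wakers.sendr, 1)
    if entry then
      entry.waker()
    end
  elseif type(self._wakers.recvr) == "function" then
    self._wakers.recvr()
  end
end

With this scheme, senders are woken in the order they started waiting, so no single wake_t can starve the others, at the cost of a linear scan whenever a waker is cleared.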

Advanced

In the following chapters, we will discuss the inner workings of cosock in detail. The subject matter shifts focus from "how do I use cosock" to "how does cosock work", which may not be of interest to everyone.

Internals

This section will go over the contents of the cosock.socket.internals module and how that interacts with the cosock runtime and the underlying Luasocket library.

Run

This is a step-by-step explanation of what happens in each pass through the main loop of cosock.run.

Internals

The module cosock.socket.internals is where luasocket gets wrapped into a "cosock aware" table. Initially, a call to passthroughbuilder is used to create a "builder" function. passthroughbuilder takes 2 arguments, recvmethods and sendmethods, which are both tables where the keys are method names and the values are set-like tables of the error messages it would be appropriate to yield for. A good example of one of these is the tcp module's recvmethods.

local recvmethods = {
  receive = {timeout = true},
  accept = {timeout = true},
}

For both of the methods defined here, a return value of nil, "timeout" is a signal to call coroutine.yield and try again. The return value of passthroughbuilder is a function that we will call builder. builder takes 2 arguments: method, which is a string, and an optional transformsrc, which is a table (or a function that returns a table) with the following properties.

  • input: this is an optional function that takes the method inputs and returns those inputs potentially transformed.
    • This is only called once, just before we call the luasocket method for the first time
  • blocked: this is an optional function that takes the return values from the method just called and returns the input arguments for the next call to the same method
  • output: This is an optional function that will be called with the return values of the method
    • This is only called once, just before we return from the method

Let's use a few examples to go over each of these starting with the input property.

The method receive on a luasocket takes 2 arguments: a pattern string (or a number indicating how many bytes to read) and an optional prefix to put at the front of what was received.

local socket = require "socket"
local t = socket.tcp()
t:connect("0.0.0.0", 8000)
print(t:receive("*l", "next line:"))

Assuming that some server is listening on port 8000 of the machine we run this on, we would receive 1 line, for example "ping\n", and this would print "next line:ping". As we will get into later, a call to cosock's receive may end up calling luasocket's receive repeatedly until we reach a newline character. So what if our server sent 1 byte at a time? If we passed the prefix to the underlying luasocket on every call, it would be stitched in before every chunk, producing something like "next line:pnext line:inext line:nnext line:g". To avoid this we can use the input property to capture the prefix and only add it once to the eventual return value.
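
A minimal sketch of such an input transform (assuming stored_prefix is an upvalue that output could later prepend exactly once):

{
  input = function(pattern, prefix)
    -- capture the prefix so it is only applied once, at the end
    stored_prefix = prefix
    -- pass only the pattern through to the underlying luasocket
    return pattern
  end
}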

Now let's consider the blocked property, continuing to use receive as our example method: what happens if we call t:receive(10) and again our server returns 1 byte at a time?

We can't call the underlying luasocket method with 10 over and over; that would result in us requesting too many bytes from the socket. Instead, we need a way to capture the partial value we received and reduce the number of bytes requested accordingly. Thankfully luasocket returns any partial data on error as the 3rd return value so we could do something like

{
  blocked = function(_, err, partial)
    -- buffer the partial data and shrink the number of bytes
    -- we ask for on the next call
    table.insert(shared_buffer, partial)
    remaining_recv = remaining_recv - #partial
    return remaining_recv
  end
}

This example assumes that shared_buffer and remaining_recv exist somewhere, but you can see that we are appropriately reducing the number of bytes we request; the return value here becomes the argument provided to the next call to the luasocket method. Here is a longer-form example of how a response of 1 byte at a time would look for our luasocket.

local shared_buffer = {}
local remaining_recv = 5
-- receive "p"
local _, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
-- receive "i"
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
-- receive "n"
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
-- receive "g"
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
-- receive "\n"
_, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)

Finally, we have the output property, which gets called with the final return values from our method. To complete our example, this is where we would call table.concat(shared_buffer) to join all the chunks together before returning.

Continuing to use receive as an example, this is what the transform argument might look like.

local recvmethods = {
  receive = {timeout = true}
}
local sendmethods = {}
-- First we define a builder injecting the method<->error message maps
local builder = passthroughbuilder(recvmethods, sendmethods)
-- Now we can use the builder to define a method that doesn't do any
-- transformations
m.bind = builder("bind")
-- Here we define a method that performs some transformations
m.receive = builder("receive", function()
  local shared_buffer = {}
  local remaining_recv
  local pattern
  return {
    input = function(pat, prefix)
      -- insert the prefix as the first part of our return
      -- value if present
      if prefix then
        table.insert(shared_buffer, prefix)
      end
      if type(pat) == "number" then
        -- we know how many bytes to wait for, set this
        -- for use in blocked
        remaining_recv = pat
      else
        -- store this for use in blocked
        pattern = pat
      end
      -- return only the pattern (without the prefix) to avoid
      -- duplicate prefixes
      return pat
    end,
    blocked = function(_, err, partial)
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
        -- only reduce remaining_recv if it is a number
        if remaining_recv then
          remaining_recv = remaining_recv - #partial
          -- returning the updated remaining receive
          return remaining_recv
        end
        -- otherwise we return the pattern provided to input
        return pattern
      end
    end,
    output = function(full, err, partial)
      -- if the first return is a string with a length > 0 then
      -- add it to the buffer
      if type(full) == "string" and #full > 0 then
        table.insert(shared_buffer, full)
      end
      -- if the third return is a string with a length > 0 then
      -- add it to the buffer
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
      end
      -- concatenate all the strings together
      local all = table.concat(shared_buffer)
      if err then
        -- if there was an error it should go in the 3rd return
        -- position
        return nil, err, all
      else
        -- if there was no error then it should go in the 1st return
        -- position
        return all
      end
    end
  }
end)

With the arguments defined, we can now discuss the return value of builder, which is a third function: the method's implementation. Its first argument is self and varargs are used to allow for any additional arguments.

Let's pause here and go over this, because 3 levels of functions can be a bit difficult to follow. Our goal is to re-use as much as possible for each of the methods on a cosock socket, and since a yield -> retry loop is going to be a common pattern we can define all of that in 1 place. The key is that these methods need access to a few extra pieces of information, which works because each enclosing function's arguments are captured as upvalues by the returned function.

This means that the receive method would have the following environment.

local recvmethods = {
  receive = { timeout = true }
}
local sendmethods = {}
local method = "receive"
local transformsrc = function() --[[see above]] end

Now let's go over what actually happens in this shared method implementation (a condensed sketch follows these paragraphs). First, we capture all of the varargs into a table named inputparams; if the transform object has an input property defined, we then overwrite that variable with {input(table.unpack(inputparams))}. Now that we have our inputs the way they need to be, we begin a long-running repeat/until loop.

At the top of the loop we call self.inner_sock[method]; inner_sock is the property holding the luasocket on all of the cosock sockets. If the first return from that call is nil, we check to see if the second return value can be found in recvmethods or sendmethods; if so, we know that we need to yield, so we call blocked if it is defined, again overwriting inputparams with its return values.

Next we determine what kind of yield we are going to do: if the second return was found in recvmethods the kind is "recvr", if it was found in sendmethods it is "sendr". We then set up our arguments for coroutine.yield, putting self into recvt if our kind is "recvr" or into sendt if our kind is "sendr", and call coroutine.yield(recvt, sendt, self.timeout), assigning the returns to recvr, sendr, rterr. If rterr is not nil, we are going to return early; if its value matches the error from our method call (i.e. "timeout" for both), then we return the values from our call to that method.

The last thing we do in this case, before heading back to the top of the loop, is to assert that our kind and the result of cosock.socket.select match: meaning if we have a kind of "sendr", the sendr variable is populated and the recvr variable is unpopulated, or vice versa.


If the first return value from our method call was not nil, then we can exit the loop early, transforming the return values with output if that is defined.
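
Putting those paragraphs together, here is a condensed sketch of the method implementation that builder returns. This is a simplification for illustration, not the exact cosock source.

-- inside builder(method, transformsrc), roughly:
return function(self, ...)
  local transform = transformsrc or {}
  if type(transform) == "function" then
    transform = transform()
  end
  -- capture the varargs, applying `input` if it was provided
  local inputparams = {...}
  if transform.input then
    inputparams = {transform.input(table.unpack(inputparams))}
  end
  repeat
    local ret = {self.inner_sock[method](self.inner_sock, table.unpack(inputparams))}
    local status, err = ret[1], ret[2]
    -- determine if this error means "yield and retry"
    local kind
    if status == nil and recvmethods[method] and recvmethods[method][err] then
      kind = "recvr"
    elseif status == nil and sendmethods[method] and sendmethods[method][err] then
      kind = "sendr"
    else
      -- success or a non-yieldable error: apply `output` if
      -- provided and return
      if transform.output then
        return transform.output(table.unpack(ret))
      end
      return table.unpack(ret)
    end
    -- let `blocked` adjust the arguments for the retry
    if transform.blocked then
      inputparams = {transform.blocked(table.unpack(ret))}
    end
    local recvt, sendt = {}, {}
    if kind == "recvr" then recvt[1] = self else sendt[1] = self end
    local recvr, sendr, rterr = coroutine.yield(recvt, sendt, self.timeout)
    if rterr then
      if rterr == err then
        -- e.g. a runtime "timeout" matching the socket's own
        -- "timeout" error: return the method's error values
        return table.unpack(ret)
      end
      return nil, rterr
    end
    -- exactly one of recvr/sendr should match our kind
    assert((kind == "recvr" and recvr ~= nil)
        or (kind == "sendr" and sendr ~= nil),
      "unexpected select result")
  until false
end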

The only other function provided by cosock.socket.internals is setuprealsocketwaker, which completes the wrapping of our cosock socket.

This function takes the socket table and an optional list of kinds; if kinds is not provided then the default will be both sendr and recvr.

We then define a method on socket called setwaker, which is used by cosock to wake up sleeping coroutines (see the integrating chapter for more info). setwaker assigns the provided waker function to a self.wakers table based on the kind of waker. It also defines a method _wake, which takes an argument kind and varargs for any additional arguments. This method checks whether self.wakers[kind] is not nil and, if so, calls it with the varargs and then replaces self.wakers[kind] with nil.
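
A rough sketch of that plumbing (illustrative, not the verbatim source):

-- sketch of the waker plumbing added by setuprealsocketwaker
function socket:setwaker(kind, waker)
  self.wakers = self.wakers or {}
  self.wakers[kind] = waker
end

function socket:_wake(kind, ...)
  local waker = self.wakers and self.wakers[kind]
  if waker then
    -- call the waker once, then clear it
    waker(...)
    self.wakers[kind] = nil
  end
end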

Advanced Overview of cosock.run

Global Variables

Cosock utilizes a few global variables to allow for the potentially recursive nature of lua coroutines.

  • threads: List of all coroutines cosock is aware of
    • This is populated by the first argument to cosock.spawn
  • threadnames: A map of coroutine<->name pairs
    • This is populated by the second argument to cosock.spawn
  • threadswaitingfor: A map of coroutine<->select args
    • Select args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, timeout: float?}
    • This is populated by the values provided to coroutine.yield for cosock tasks from a call to cosock.socket.select
  • readythreads: A map of coroutine<->resume args that will be ready on the next pass
    • Resume args have the type {recvr: table<cosock.socket>, sendr: table<cosock.socket>, err: string?}
    • This is populated by coroutine wake-ups that occur on the current pass
  • socketwrappermap: A map of luasocket<->cosock socket pairs
    • This map is keyed by the luasocket table, making it easy to get back to a cosock socket when you only have the luasocket
    • This gets populated when a cosock socket is included in a select args table
  • threaderrorhandler: Potential error handler function. Not currently settable.
  • timers: see Timers

Run Loop

To start each pass of the loop we define a few local variables. First up is wakethreads; this table is populated by removing all of the elements from readythreads, which frees up readythreads to be added to by any tasks we are about to wake. Next are the two list tables sendt and recvt, along with the optional number timeout; these will end up being passed to luasocket's socket.select when we run out of ready threads. Now we can move all of our readythreads into wakethreads and then loop over all of the ready threads.
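
In code, the start of a pass looks roughly like this sketch (simplified from the description above):

-- sketch: drain readythreads into a pass-local table
local wakethreads = {}
local sendt, recvt, timeout = {}, {}, nil
for thread, args in pairs(readythreads) do
  wakethreads[thread] = args
  readythreads[thread] = nil
end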

For each ready thread, we first check if coroutine.status for it is "suspended"; if it isn't, we skip this thread. For any "suspended" thread, we first cancel any timers that might be scheduled for it by calling timers.cancel. Next we pull out the values we stored in threadswaitingfor and call skt:setwaker with nil for any sendr or recvr properties; this prevents any potential "double wakes" from occurring.

Now we call coroutine.resume with our thread and the recvr, sendr, and err values that were stored in wakethreads. When coroutine.resume completes, we have 2 pieces of information that drive our next path: the first return value from coroutine.resume indicates whether our thread raised an error, and we also call coroutine.status on our thread. If our thread's status is "dead" and coroutine.resume returned false, something has gone terribly wrong, so we raise an error (with a traceback if debug is available). If our thread's status is "dead" and coroutine.resume returned true, we just remove our thread from threads and threadswaitingfor. If our thread's status is "suspended" and coroutine.resume returned true, then we first update threadswaitingfor with the remaining return values from coroutine.resume. We also call skt:setwaker for any sendr and recvr in those values, providing a function that will clear itself and call the local function wake_thread. At this point we also update our local tables recvt and sendt to include the skt.inner_sock values from threadswaitingfor, which are the luasockets associated with each cosock.socket.
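
The waker registered for each socket is essentially a one-shot closure; something like this sketch, where wake_thread is the local helper named above (its exact signature here is an assumption):

-- hypothetical sketch of the one-shot waker set for each socket
skt:setwaker(kind, function(...)
  -- clear ourselves first to avoid double wakes
  skt:setwaker(kind, nil)
  wake_thread(thread, kind, ...)
end)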

Now that we have resumed all of the wakethreads and filled in our eventual arguments to socket.select, we determine whether we are still running: we loop over all of our threads and check that at least 1 has a coroutine.status that is not "dead". If all threads are "dead" and nothing got added to readythreads, then we exit the run loop. Next we update our socketwrappermap by looping over all of the values in threadswaitingfor and inserting each recvr and sendr, keyed by its skt.inner_sock.
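
As a sketch, the liveness check described above amounts to something like:

-- sketch: exit the run loop once every thread is dead and
-- nothing new became ready during this pass
local running = false
for _, thread in pairs(threads) do
  if coroutine.status(thread) ~= "dead" then
    running = true
    break
  end
end
if not running and next(readythreads) == nil then
  return
end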

With all the bookkeeping done, we run all of the timers that have reached their deadline and update our local variable timeout to the duration until the shortest remaining timer. If at least one thread was added to readythreads, we set our timeout to 0 because we already know we have new work to do. At this point, if timeout is nil and both sendt and recvt are empty, we raise an error because we are about to call socket.select({}, {}, nil), which would just block forever. Finally, we call socket.select, capturing the returns in recvr, sendr, and err. If err isn't nil and isn't the value "timeout", we raise that error. If err is nil or "timeout", we loop over all of the values in recvr and sendr, looking up each cosock socket in socketwrappermap and calling skt:_wake, which calls the function we provided to setwaker above.
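
The tail of each pass then boils down to something like this sketch (names as described above):

-- sketch: block in select, then wake the owners of ready sockets
local recvr, sendr, err = socket.select(recvt, sendt, timeout)
if err and err ~= "timeout" then
  error(err)
end
for _, lskt in ipairs(recvr or {}) do
  socketwrappermap[lskt]:_wake("recvr")
end
for _, lskt in ipairs(sendr or {}) do
  socketwrappermap[lskt]:_wake("sendr")
end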

With that complete, we now have a fully updated readythreads and we start the loop again.

Outline Version
  1. Define wakethreads
  2. Define an empty list of senders (sendt), receivers (recvt) and a timeout
  3. Pop all readythreads entries into the wakethreads
  4. Loop over all threads in wakethreads
    1. If coroutine.status for that thread returns "suspended"
      1. Clear any timers
      2. Clear any wakers registered on the sockets stored in threadswaitingfor
      3. coroutine.resume with the stored recvr, sendr and err arguments
      4. If coroutine.resume returned true in the first position and coroutine.status returns "suspended"
        1. Re-populate threadswaitingfor[thread] with the 3 other return values from coroutine.resume
          1. These should be the recvt, sendt and timeout values that will populate select args
        2. Set the waker for all sockets in recvt and sendt to a function that unsets itself and then calls wake_thread
        3. If coroutine.resume returned a timeout, create a new timer for this thread which will call wake_thread_err on expiration with the value "timeout"
      5. If coroutine.status returned "dead"
        1. If coroutine.resume returned false in the first position and no threaderrorhandler has been set
          1. Raise an error
            1. If the debug library is available, include a debug.traceback and the second return value from coroutine.resume
            2. Else just raise an error with the second return value from coroutine.resume
          2. Exit the application
            1. This calls os.exit(-1)
    2. Else, print a warning message if printing is turned on
  5. Initialize a variable running to false
  6. Loop over all threads, calling coroutine.status on each, if at least 1 doesn't return "dead", set running to true
  7. If running is false and readythreads is empty
    1. Exit the run loop
  8. Loop over all the values in threadswaitingfor
    1. Insert the luasockets from any sendr or recvr parameters into the loop-local variables sendt and recvt
    2. Populate socketwrappermap with any sendr or recvrs
  9. Call timers.run
  10. If readythreads is not empty
    1. Set timeout to 0
  11. If timeout is falsy and recvt is empty and sendt is empty
    1. Raise an error that cosock.select was called with no sockets and no timeouts
  12. Call luasocket's socket.select with our loop-local recvt, sendt and timeout
  13. If socket.select returns a value in the 3rd position and that value is not "timeout"
    1. Raise an error with that return value
  14. Loop over the recvr (1st) return from socket.select
    1. Look up the cosock.socket from socketwrappermap
    2. Call skt:_wake("recvr")
  15. Loop over the sendr (2nd) return from socket.select
    1. Look up the cosock.socket from socketwrappermap
    2. Call skt:_wake("sendr")

Timers

Internally, we keep a list of timer objects to determine when any thread has reached the maximum time it should be running/yielding for. We interact with these through a module-local variable timers, which has a few associated functions.

Inside of a do block, we create 2 local variables timeouts and refs for use in the timer associated functions.

The first associated function worth discussing is timers.set, which takes the arguments timeout: float, callback: fun() and ref: table. When called, we first capture the current timestamp via socket.gettime(), then calculate the deadline for this timer by adding timeout to that timestamp, storing the result in a variable timeoutat. We then table.insert the table { timeoutat = timeoutat, callback = callback, ref = ref } into timeouts. If ref isn't nil we also populate refs[ref] with that same table.

Next up is timers.cancel, which takes the argument ref: table. When called, we first look up the timeout info from refs[ref]; if we find something there, we remove the values in the callback and ref properties and finally remove the entry from refs. By removing the callback we avoid ever running the consequence of that timer; the entry itself will eventually be removed from timeouts in a later call to timers.run.

Finally we have timers.run; this function takes no arguments. When called, it first sorts the timeouts table in ascending order by timeoutat, where nil values are the smallest. We then capture the current timestamp by calling socket.gettime. Now we repeatedly consult the first element of timeouts; if that table has a timeoutat that is nil or less than now, we pop it off the list, call its callback property if present, and remove it from refs if it has a ref property.

Now that all the pending timers are done, we use the new first element of timeouts to calculate the next relative timeout (timeoutat - now) and return that as the earliest timeout. If timeouts is empty, we return nil.
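
Pulling those three functions together, a condensed sketch of the timers module might look like this (illustrative; the real code lives in a do block inside cosock and may differ in its details):

local socket = require "socket"
local timeouts, refs = {}, {}
local timers = {}

function timers.set(timeout, callback, ref)
  -- deadline is relative timeout plus the current timestamp
  local timer = {
    timeoutat = socket.gettime() + timeout,
    callback = callback,
    ref = ref,
  }
  table.insert(timeouts, timer)
  if ref ~= nil then
    refs[ref] = timer
  end
end

function timers.cancel(ref)
  local timer = refs[ref]
  if timer then
    -- defuse the timer; it is dropped from `timeouts` on a
    -- later call to timers.run
    timer.callback = nil
    timer.ref = nil
    refs[ref] = nil
  end
end

function timers.run()
  -- earliest deadline first, treating nil as the smallest value
  table.sort(timeouts, function(a, b)
    if a.timeoutat == nil then
      return b.timeoutat ~= nil
    elseif b.timeoutat == nil then
      return false
    end
    return a.timeoutat < b.timeoutat
  end)
  local now = socket.gettime()
  -- pop and fire every timer that has already expired
  while timeouts[1] and (timeouts[1].timeoutat == nil or timeouts[1].timeoutat < now) do
    local timer = table.remove(timeouts, 1)
    if timer.callback then timer.callback() end
    if timer.ref then refs[timer.ref] = nil end
  end
  -- return the relative time until the next deadline, or nil
  if timeouts[1] then
    return timeouts[1].timeoutat - now
  end
end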

Outline Version
  • A timer has the shape {timeoutat: float, callback: fun(), ref: table?}
    • timers.set
      • Updates timeouts to include that value. Also updates a privately scoped table named refs
        • refs is a map of table pointer<->timer which is used for cancellation of a timer
    • timers.cancel
      • If the provided table pointer is in refs, remove the callback and ref properties from that table
      • Set the table pointer key in refs to nil
    • timers.run
      • Sort all timeouts by deadline (earliest first)
      • Pop the timer off the front of the timers list
      • If that timer.timeoutat is nil or < socket.gettime()
        • Call timer.callback
        • Remove this timer from refs
      • If there are any more timeouts left, return how long before that timeout should expire
      • If there are no more timeouts, return nil