Channels

Coordinating coroutines can be a huge pain, to ease this pain cosock offers a synchronization primitive called channels. A cosock.channel is a "multiple producer single consumer" message queue, this means you can have one coroutine own the receiving half of the queue and pass the sender out to however many coroutines you'd like. We've already seen how they are used, in our first example, we used a cosock.channel to coordinate which port the client should connect to. What if we wanted to re-write that example without using a channel, that might look something like this:

--client_server_no_channel.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local server = socket.tcp()
local shared_port
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  shared_port = p
  server:listen()
  local client = server:accept()
  while true do
    local request = assert(client:receive())
    print(string.format("received %q", request))
    assert(request == "ping")
    print("sending pong")
    client:send("pong\n")
  end
end, "server task")

--- Spawn a task for handling the client side of the socket
cosock.spawn(function()
  --- wait for the server to be ready.
  while shared_port == nil do
    socket.sleep(1)
  end
  local client = socket.tcp()
  client:connect(ip, shared_port)
  while true do    
    print("sending ping")
    client:send("ping\n")
    local request = assert(client:receive())
    assert(request == "pong")
  end
end, "client task")

--- Finally we tell cosock to run our 2 coroutines until they are done
--- which should be forever
cosock.run()

That update removed our channel in favor of polling that shared_port ~= nil once every second. This will absolutely work, however, we have introduced a race condition. What if we sleep for 1 full second right before the server is issued its port? That second would be wasted, we could have already been creating our client. While the consequence in this example isn't dire, it does show that relying on shared mutable state without some way to synchronize it between tasks can be problematic.

As a note, we could also solve this problem by having the server task spawn the client task but that wouldn't be nearly as interesting to our current subject.

Another place we have seen that could benefit from a channel is our tick/tock example, in that example, we hard-coded the synchronization. The first task would print then sleep for 2 seconds and the second task would sleep for 1 second and then print and sleep for 2 seconds. Let's take a look at what that might have looked like if we had used channels.

--tick_tock_channels.lua
local cosock = require "cosock"

-- We create 2 pairs of channels so our two task can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()

local function task(tx, rx, name)
  while true do
    -- First wait for the other task to tell us it is done
    rx:receive()
    -- print our name
    print(cosock.socket.gettime(), name)
    -- sleep for 1 second
    cosock.socket.sleep(1)
    -- tell the other task we are done
    tx:send()
  end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
  task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
  task(tick_tx, tock_rx, "tock")
end)
-- prime the tick task to start first
tick_tx:send()

cosock.run()

In this version, we only have one definition for a task, looping forever, it will first wait to receive the signal, and then it prints its name, sleeps for 1 second and then sends the signal on to the next task. The key to making this all work is that we need to kick the process off by telling one of the tasks to start. Since a channel allows for multiple senders, it is ok that we call send in more than one place.

Now say we wanted to extend our little clock emulator to print "clack" every 60 seconds to simulate the minute hand moving. That might look something like this:

--tick_tock_clack.lua
local cosock = require "cosock"

-- We create 2 pairs of channels so our two task can send messages
-- back and forth
local tick_tx, tick_rx = cosock.channel.new()
local tock_tx, tock_rx = cosock.channel.new()
local clack_tx, clack_rx = cosock.channel.new()

local function task(tx, rx, name)
  while true do
    -- First wait for the other task to tell us the count
    local count =  rx:receive()
    -- print our name
    print(cosock.socket.gettime(), name)
    -- sleep for 1 second
    cosock.socket.sleep(1)
    -- tell the other task we are done
    if count >= 59 then
      clack_tx:send(0)
    else 
      tx:send(count + 1)
    end
  end
end
-- spawn the task to print tick every two seconds
cosock.spawn(function()
  task(tock_tx, tick_rx, "tick")
end)
-- spawn the task to print tock every 2 seconds
cosock.spawn(function()
  task(tick_tx, tock_rx, "tock")
end)
cosock.spawn(function()
  task(tick_tx, clack_rx, "clack")
end)
-- prime the tick task tp start first
tick_tx:send(1)

cosock.run()

Here we have updated the tasks to now share a counter across our channel. So at the start of each loop iteration, we first get the current count from our other task. We again print our name and then sleep for 1 second but now if the count is >= 59 we send the count of 0 to our "clack" task which will always then send a 1 to the "tick" task to start the whole process over again. Just to make sure it is clear, we can use the send half of the "tick" task's channel in 3 places, the main thread to "prime" the clock, the "tock" task and the "clack" task.

It is very important that we don't try and use the receiving half of a channel in more than one task, that would lead to potentially unexpected behavior. Let's look at an example of how that might go wrong.

-- bad_news_channels.lua
local cosock = require "cosock"

local tx, rx = cosock.channel.new()

cosock.spawn(function()
  rx:receive()
  print("task 1")
end)

cosock.spawn(function()
  rx:receive()
  print("task 2")
end)

tx:send()

cosock.run()

In this example, we create one channel pair and spawn two tasks which both call receive on our channel and just before run we call send. Since the choice for which task should run at which time is left entirely up to cosock, we can't say for sure which of these tasks will actually receive. It might print "task 1" or "task 2".

In actuality, cosock assumes that receive will only ever be called from the same coroutine. Calling receive in multiple coroutines will (eventually) raise in a error.