Bounded Channel

To start, we are going to define a Lua table that will represent our shared bounded queue.

local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

Instances of this table should have the following properties:

  • _wakers: This is a table with 2 keys
    • sendr: This is a map where each key is a table representing a waiting sender and each value is that sender's waker function, if it has one
    • recvr: An optional function that takes no arguments; calling it will wake our receiver
  • _max_depth: This is the integer number of messages that our queue should never grow beyond
  • _msg_queue: This is the message queue we will use to hold pending messages
  • _closed: This is a boolean to indicate if we have been explicitly closed
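To make the shape of these properties concrete, here is a minimal sketch of a constructor. The name BoundedChannel.new and its max_depth parameter are assumptions made for illustration; nothing above says how the table actually gets created.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

--- Hypothetical constructor, assumed for illustration only
--- @param max_depth integer the largest number of pending messages allowed
function BoundedChannel.new(max_depth)
  return setmetatable({
    -- sendr starts as an empty map, recvr starts unset
    _wakers = { sendr = {}, recvr = nil },
    _max_depth = max_depth,
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
end

local chan = BoundedChannel.new(2)
print(chan._max_depth, #chan._msg_queue, chan._closed)
```

Because of the __index line, any method we later define on BoundedChannel is available on chan.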

To make our lives easier, next we will add a couple of methods that will enforce the queue nature of our _msg_queue, one for removing the oldest message and one for adding a new message.

--- Remove and return the oldest element of this channel,
--- returns nil if the queue is empty
function BoundedChannel:pop_front()
  return table.remove(self._msg_queue, 1)
end

--- Add a new element to the back of this channel
function BoundedChannel:push_back(ele)
  table.insert(self._msg_queue, ele)
end
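As a quick check of the first-in, first-out behavior, the sketch below repeats the two methods so it can run on its own. The channel is built by hand with setmetatable, which is an assumption made only for this example.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:pop_front()
  return table.remove(self._msg_queue, 1)
end

function BoundedChannel:push_back(ele)
  table.insert(self._msg_queue, ele)
end

-- Hand-built channel; only _msg_queue matters for this example
local chan = setmetatable({ _msg_queue = {} }, BoundedChannel)

chan:push_back("first")
chan:push_back("second")

local a = chan:pop_front() -- the oldest message comes out first
local b = chan:pop_front()
local c = chan:pop_front() -- the queue is empty now, so this is nil
print(a, b, c)
```

Note that table.remove(t, 1) shifts every remaining element down by one, so popping is linear in the queue length. With a queue capped at _max_depth that cost stays small.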

Next, let's add a method for testing if we can send a new message.

function BoundedChannel:can_send()
  if self._closed then
    -- Check first that we are not closed, if we are
    -- return an error message
    return nil, "closed"
  end
  if #self._msg_queue >= self._max_depth then
    -- Check next if our queue is full, if it is
    -- return an error message
    return nil, "full"
  end
  -- The queue is not full and we are not closed, return 1
  return 1
end

This method first checks that we haven't been closed; if we have, it returns nil, "closed". If we are still open, it next checks whether there is any space left in our queue; if not, it returns nil, "full". If we are neither closed nor full, it returns 1.
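The three outcomes can be walked through with a short sketch. It repeats can_send in compact form so it runs on its own, and the hand-built channel with a _max_depth of 1 is an assumption made for this example.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_send()
  if self._closed then
    return nil, "closed"
  end
  if #self._msg_queue >= self._max_depth then
    return nil, "full"
  end
  return 1
end

-- Hand-built channel that can hold a single message
local chan = setmetatable({
  _max_depth = 1,
  _msg_queue = {},
  _closed = false,
}, BoundedChannel)

local ok_open, err_open = chan:can_send()     -- open with space: 1
table.insert(chan._msg_queue, "msg")
local ok_full, err_full = chan:can_send()     -- open but full: nil, "full"
chan._closed = true
local ok_closed, err_closed = chan:can_send() -- closed wins over full: nil, "closed"
print(ok_open, err_full, err_closed)
```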

Now we should create a similar method for checking if we can receive a new message.

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    -- Check first whether we are closed and fully drained,
    -- if so return an error message
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    -- Check next that we have at least 1 message,
    -- if not, return an error message
    return nil, "empty"
  end
  -- We have at least 1 pending message (even if we have
  -- been closed), return 1
  return 1
end

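Again, we begin by checking for the closed case, but this time we only return nil, "closed" if the queue is also empty; a closed channel can still hand out the messages that were queued before it was closed. Then we check to see if the queue is empty; if so, we return nil, "empty". If there is at least 1 message, we return 1.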
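The interaction between _closed and a non-empty queue is the subtle part, so here is a small sketch of it. It repeats can_recv so it runs on its own and builds the channel by hand, which is an assumption made for this example.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    return nil, "empty"
  end
  return 1
end

-- Hand-built channel; can_recv only reads _msg_queue and _closed
local chan = setmetatable({ _msg_queue = {}, _closed = false }, BoundedChannel)

local ok_empty, err_empty = chan:can_recv()     -- open and empty: nil, "empty"
table.insert(chan._msg_queue, "last message")
chan._closed = true
local ok_pending = chan:can_recv()              -- closed with a pending message: 1
table.remove(chan._msg_queue, 1)
local ok_drained, err_drained = chan:can_recv() -- closed and drained: nil, "closed"
print(err_empty, ok_pending, err_drained)
```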

Now we can define a few methods for interacting with our self._wakers property. First up is the set_waker_recvr method.

function BoundedChannel:set_waker_recvr(waker)
  self._wakers.recvr = waker
  if waker and self:can_recv() then
    -- Check if receiving is currently available, if
    -- so call the waker to wake up the yielded receiver
    waker()
  end
end

Ok, so this one is pretty simple now that we have our helpers. First, we populate the value in self._wakers.recvr. If waker is not nil and self:can_recv() returns 1, we want to immediately call the waker because it means a message is already waiting and the receiver is ready to be woken up.
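A small sketch shows when the waker fires. It repeats can_recv and set_waker_recvr so it runs on its own; the counter named woke and the hand-built channel are assumptions made for this example.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    return nil, "empty"
  end
  return 1
end

function BoundedChannel:set_waker_recvr(waker)
  self._wakers.recvr = waker
  if waker and self:can_recv() then
    waker()
  end
end

local chan = setmetatable({
  _wakers = { sendr = {} },
  _msg_queue = {},
  _closed = false,
}, BoundedChannel)

-- Hypothetical waker that just counts how often it was called
local woke = 0
local function waker() woke = woke + 1 end

chan:set_waker_recvr(waker)    -- queue is empty: stored but not called
local after_empty = woke
table.insert(chan._msg_queue, "hello")
chan:set_waker_recvr(waker)    -- a message is pending: called right away
local after_pending = woke
chan:set_waker_recvr(nil)      -- clears the waker and calls nothing
print(after_empty, after_pending, chan._wakers.recvr)
```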

For our next method, we are going to define set_waker_sendr.

function BoundedChannel:set_waker_sendr(sendr, waker)
  self._wakers.sendr[sendr] = waker
end

This looks quite a bit different from set_waker_recvr! First of all, we have an extra argument, sendr, which will represent a unique call to coroutine.yield and is how we can allow for multiple senders. The second thing to notice is that we are not checking self:can_send(). This is because we don't know if another waker has already been woken up for the current state. This is all a bit hand-wavy right now, but when we implement the sender, things should become clear.
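To see how table keys keep multiple senders apart, here is a minimal sketch. It repeats set_waker_sendr so it runs on its own; the empty tables sender_a and sender_b are stand-ins assumed for this example, since the real sender is implemented later.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:set_waker_sendr(sendr, waker)
  self._wakers.sendr[sendr] = waker
end

local chan = setmetatable({ _wakers = { sendr = {} } }, BoundedChannel)

-- Hypothetical stand-ins for two waiting senders. Tables are compared
-- by identity, so each one is a distinct key.
local sender_a, sender_b = {}, {}
chan:set_waker_sendr(sender_a, function() end)
chan:set_waker_sendr(sender_b, function() end)

-- Count the registered sender wakers
local function count_wakers()
  local n = 0
  for _ in pairs(chan._wakers.sendr) do n = n + 1 end
  return n
end

local registered = count_wakers()  -- 2
chan:set_waker_sendr(sender_a, nil) -- passing nil removes the entry
local remaining = count_wakers()   -- 1
print(registered, remaining)
```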

Now that we can set a waker, it is time to add a method for calling those waker functions.

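function BoundedChannel:try_wake(kind)
  local waker
  if kind == "sendr" then
    -- Grab any 1 waiting sender; keep the unused key local so
    -- we don't assign to a global named `_`
    local _, sendr_waker = next(self._wakers.sendr)
    waker = sendr_waker
  else
    waker = self._wakers.recvr
  end
  if type(waker) == "function" then
    waker()
  end
end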

Our new try_wake method takes 1 argument, which will either be the string "sendr" or "recvr". If we are trying to wake a "sendr", we use the next function to find 1 entry in the table self._wakers.sendr; if there is at least 1 entry in that table, we assign its value to waker. If we are trying to wake a "recvr", we assign self._wakers.recvr to waker. If waker is a function (aka not nil), then we call that function.
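Here is a short sketch of try_wake in action. It repeats the method (keeping the unused key from next in a local) so it runs on its own. The log table is an assumption made for this example, and so is the sender waker that removes its own entry when called; how wakers actually get cleared is not covered above.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:try_wake(kind)
  local waker
  if kind == "sendr" then
    local _, sendr_waker = next(self._wakers.sendr)
    waker = sendr_waker
  else
    waker = self._wakers.recvr
  end
  if type(waker) == "function" then
    waker()
  end
end

local chan = setmetatable({ _wakers = { sendr = {} } }, BoundedChannel)
local log = {}

chan:try_wake("recvr") -- nothing registered yet, so this is a no-op

chan._wakers.recvr = function() log[#log + 1] = "recvr" end
chan:try_wake("recvr")

local sender = {}
chan._wakers.sendr[sender] = function()
  -- Assumption: the waker unregisters itself once it has fired
  chan._wakers.sendr[sender] = nil
  log[#log + 1] = "sendr"
end
chan:try_wake("sendr")
chan:try_wake("sendr") -- the entry is gone, so this is a no-op too

print(table.concat(log, ", "))
```

Since next makes no promise about which entry it returns, try_wake("sendr") wakes an arbitrary waiting sender, not necessarily the one that has waited longest.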

function BoundedChannel:close()
  self._closed = true
end

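Last, close will just set our _closed property to true. From then on, can_send returns nil, "closed" right away, while can_recv keeps returning 1 until the queue has been drained.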

Ok, now that we have our shared channel defined, let's implement our receiver.