Bounded Channel: Sendr

To start, we define the Lua table that represents the sending half of our channel.

local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender

This table has the same shape as our BoundedChannelReceiver: a _link property and a timeout property. Our constructor is nearly identical.

function BoundedChannelSender.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelSender)
end

We will also define a settimeout method that looks exactly the same.

function BoundedChannelSender:settimeout(timeout)
  self.timeout = timeout
end

Our close method is also very close to the receiver's implementation.

function BoundedChannelSender:close()
  self._link:close()
  -- If anything is waiting to receive, it should wake up with an error
  self._link:try_wake("recvr")
end

The only change is the argument to self._link:try_wake, which went from "sendr" to "recvr".

Now we start to see things become quite different. The first thing to note is that we are not going to define a setwaker method for this table. This may seem strange, since it is one of the few things that we need to do to make a "cosock aware" table, but if we were to use the same setwaker for every call to BoundedChannelSender:send, we would end up gumming up the internal workings of cosock. To see how we get around this, let's go over the implementation of send.

function BoundedChannelSender:send(msg)
  while true do
    local can_send, err = self._link:can_send()
    if can_send then
      self._link:push_back(msg)
      -- Wake any receivers who might be waiting to receive
      self._link:try_wake("recvr")
      return 1
    end
    if err == "closed" then
      return nil, "closed"
    end
    if err == "full" then
      local wake_t = {
        setwaker = function(t, kind, waker)
          assert(kind == "sendr")
          self._link:set_waker_sendr(t, waker)
        end
      }
      local _r, _s, err = coroutine.yield(nil, {wake_t}, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end

To start, things look a lot like our BoundedChannelReceiver:receive implementation: we have a long-running loop that first calls self._link:can_send. If that returns 1, we use the push_back helper to add the message to our queue, try to wake up any yielded "recvr"s, and return 1 to indicate success. If can_send instead returned the error message "closed", we return nil, "closed". If it returned the error message "full", we want to yield until we can try again.
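The three outcomes of can_send can be sketched with a small stand-in for the shared queue. This is only an illustration: MockLink, its fields, and the max_depth argument are hypothetical names, and the real link may track its state differently.

```lua
-- Hypothetical stand-in for the shared queue; it models only what send relies on.
local MockLink = {}
MockLink.__index = MockLink

function MockLink.new(max_depth)
  return setmetatable({ _queue = {}, _max_depth = max_depth, _closed = false }, MockLink)
end

-- Returns 1 when there is room, otherwise nil plus "closed" or "full".
function MockLink:can_send()
  if self._closed then return nil, "closed" end
  if #self._queue >= self._max_depth then return nil, "full" end
  return 1
end

function MockLink:push_back(msg)
  table.insert(self._queue, msg)
end

function MockLink:close()
  self._closed = true
end

local link = MockLink.new(1)
local first_ok = link:can_send()       -- room for one message
link:push_back("hello")
local _, full_err = link:can_send()    -- queue is now at capacity
link:close()
local _, closed_err = link:can_send()  -- closing takes priority over "full"
```

In send, only the "full" case leads to a yield; the other two return to the caller right away.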

To prepare for yielding, we first create a table called wake_t, which represents a single call to BoundedChannelSender:send that is yielding. On wake_t we set one property, the setwaker method, which uses assert to raise an error if it is called with any kind other than "sendr" and then uses the BoundedChannel:set_waker_sendr method to associate the waker argument with wake_t. This temporary table gives each yielded call to send its own unique waker. If we were to use a single BoundedChannel._wakers.sendr function, we would lose the ability to wake any yield other than the last, because calling waker always calls setwaker(kind, nil) to avoid potential "double wakes".
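To make the difference concrete, here is a minimal sketch that keeps one waker per temporary table. The Link table and its try_wake_sendr method are hypothetical stand-ins; only the idea of keying wakers by wake_t comes from the text above.

```lua
-- Hypothetical stand-in for the shared queue; only waker bookkeeping is modeled.
local Link = {}
Link.__index = Link

function Link.new()
  return setmetatable({ _sendr_wakers = {} }, Link)
end

-- Associate a waker with the temporary table for one yielded send call.
function Link:set_waker_sendr(wake_t, waker)
  self._sendr_wakers[wake_t] = waker
end

-- Wake one pending sender, if any, and forget its waker to avoid a double wake.
function Link:try_wake_sendr()
  local wake_t, waker = next(self._sendr_wakers)
  if not wake_t then return false end
  self._sendr_wakers[wake_t] = nil
  waker()
  return true
end

local link = Link.new()
local woken = {}

-- Three yielded sends, each with its own wake_t and its own waker.
for i = 1, 3 do
  local wake_t = {}
  link:set_waker_sendr(wake_t, function() woken[#woken + 1] = i end)
end

-- Every pending sender can still be reached, one wake at a time.
local wakes = 0
while link:try_wake_sendr() do
  wakes = wakes + 1
end
```

With a single shared slot, each of the three registrations would overwrite the previous one, so only the last sender could ever be woken. Note that next does not guarantee any particular order, so this sketch makes no promise about which sender wakes first.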

Now that we have set up our wake_t, we can call coroutine.yield, this time with the arguments nil, {wake_t}, self.timeout. Since we put wake_t in the sendt argument, we will wait until either self.timeout elapses or someone calls BoundedChannel:try_wake("sendr") and our wake_t is returned from next.
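The hand-off around that yield can be traced with a toy driver written in plain Lua. This is not cosock itself: the driver, the stored pending_waker, and the values passed back on resume are assumptions modeled on the three values send reads from coroutine.yield.

```lua
-- Toy driver standing in for the scheduler; none of this is cosock's real code.
local pending_waker

local co = coroutine.create(function()
  local wake_t = {
    setwaker = function(t, kind, waker)
      assert(kind == "sendr")
      pending_waker = waker -- the real code hands this to the shared link
    end
  }
  -- Same argument shape as in send: recvt, sendt, timeout
  local _r, _s, err = coroutine.yield(nil, { wake_t }, 5)
  return err or "woken"
end)

-- The first resume runs the coroutine up to the yield and receives its arguments.
local ok, recvt, sendt, timeout = coroutine.resume(co)
assert(ok)

-- The driver registers a waker on every table in sendt.
local ready = false
for _, t in ipairs(sendt) do
  t:setwaker("sendr", function() ready = true end)
end

-- Later, a receiver frees up space and the stored waker fires.
pending_waker()

-- The driver resumes the coroutine with no error, so the send loop would retry.
local _, result = coroutine.resume(co, nil, { sendt[1] }, nil)
```

If the timeout had elapsed first, the driver would instead resume with an error string in the third position, and send would return nil along with that error.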

This is probably the easiest way to create these unique wakers, but it does come with a potential issue. If you are interested in using this implementation, please review Appendix-A.

Now, let's finish up our implementation and see our bounded channel working.