Bounded Recvr

Next, we will define a Lua table that will represent the receiving half of our channel. This table will have 2 properties _link which will be our BoundedChannel and a timeout which will be an optional number.

local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

Ok, let's create a constructor for it, this will take 1 argument which should populate its _link property.

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

Notice we didn't include a timeout property at all, this is because we want it to be nil by default. In order to match the same API that cosock.channel uses let's add a method for setting our timeout.

function BoundedChannelReceiver:settimeout(timeout)
  self.timeout = timeout
end

Next, we want to define the setwaker method used by this side of the queue.

function BoundedChannelReceiver:setwaker(kind, waker)
  if kind ~= "recvr" then
    error("Unsupported wake kind for receiver: " .. tostring(kind))
  end
  self._link:set_waker_recvr(waker)
end

In this method, we have added a check to make sure it isn't getting called with a kind of "sendr" since that would be pretty much meaningless. If we have a valid kind then we pass the waker down to self._link:set_waker_recvr.

Ok, now for the good stuff: the receive method.

function BoundedChannelReceiver:receive()
  while true do
    local can_recv, err = self._link:can_recv()
    if can_recv then
      local element = self._link:pop_front()
      self._link:try_wake("sendr")
      return element
    end
    if err == "closed" then
      return nil, err
    end
    if err == "empty" then
      local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end

Alright, there is a lot going on here so let's unpack it. To start we have a long-running loop that will only stop when we have reached either an error or a new message. Each iteration of the loop first checks if self._link:can_recv(), if that returns 1, then we call self._link:pop_front to capture our eventual return value, next we want to alert any senders that more space has just been made on our queue so we call self._link:try_wake("sendr"), finally we return the element we popped off ending our loop. If can_recv returned nil we check to see which err was provided, if it was "closed" we return that in the error position also ending our loop. If can_recv returns nil, "empty" then we want to yield until either we get woken by a sender or we have waited for the duration of self.timeout. We do this by calling coroutine.yield({self}, nil, self.timeout), this will give up control to cosock until either someone calls BoundedChannel:try_wake("recvr") or our timeout is reached.

If we recall that coroutine.yield returns a list of ready receivers and a list of senders or nil, nil and an error message. This means if coroutine.yield returns {self} then we have a new message so we go to the top of the loop and the next call to self.link:can_recv should return 1 or nil, "closed". If coroutine.yield returns nil, nil, "timeout" that means we have yielded for self.timeout.

One final thing we want to make sure is that we are keeping our end of that nil "closed" bargain, so let's define a closed method.

function BoundedChannelReceiver:close()
  self._link:close()
  --- If anything is waiting to send, it should wake up with an error
  self._link:try_wake("sendr")
end

For this, we first call self.link:close which will set our shared table's _closed property to true which will ultimately make both can_send and can_recv return nil, "closed". Next, we want to wake up any sending tasks since they are reliant on us to tell them something has changed, so we call self._link:try_wake("sendr").

With that we have the complete receiver side of our channel, now let's write up the sender.