Bounded Recvr
Next, we will define a Lua table that will represent the receiving half of our channel. This
table will have two properties: _link, which will be our BoundedChannel, and timeout, which will
be an optional number.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver
Ok, let's create a constructor for it. It takes one argument, which populates
its _link property.
function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end
Notice we didn't include a timeout property at all. This is because
we want it to be nil by default. To match the API that
cosock.channel uses, let's add a method for setting our timeout.
function BoundedChannelReceiver:settimeout(timeout)
  self.timeout = timeout
end
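As a quick check of the constructor and settimeout, the sketch below repeats the definitions above so it runs on its own. The fake_link table is a hypothetical stand-in for the BoundedChannel, used only for illustration; any table works here because neither function touches the link's methods.

```lua
-- Definitions repeated from above so the snippet is self-contained.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

function BoundedChannelReceiver:settimeout(timeout)
  self.timeout = timeout
end

-- Hypothetical stand-in for the BoundedChannel (an assumption, not the real type).
local fake_link = {}
local rx = BoundedChannelReceiver.new(fake_link)

local timeout_before = rx.timeout  -- nil, since new() never sets it
rx:settimeout(0.5)
local timeout_after = rx.timeout   -- 0.5
rx:settimeout(nil)                 -- passing nil clears the timeout again

print(timeout_before, timeout_after, rx.timeout)
```

Because the timeout lives on the receiver table itself, each receiver created with new can carry its own timeout.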
Next, we want to define the setwaker method used by this side of the queue.
function BoundedChannelReceiver:setwaker(kind, waker)
  if kind ~= "recvr" then
    error("Unsupported wake kind for receiver: " .. tostring(kind))
  end
  self._link:set_waker_recvr(waker)
end
In this method, we have added a check to make sure it is only called
with a kind of "recvr". Any other kind, such as "sendr", would be pretty much
meaningless on this half of the channel, so we raise an error. If
we have a valid kind, we pass the waker down to self._link:set_waker_recvr.
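To see both branches of that check, here is a minimal sketch that can be run without cosock. The fake_link table is a hypothetical stand-in that only records the waker it is given; the real BoundedChannel does more than this.

```lua
-- Definitions repeated from above so the snippet is self-contained.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

function BoundedChannelReceiver:setwaker(kind, waker)
  if kind ~= "recvr" then
    error("Unsupported wake kind for receiver: " .. tostring(kind))
  end
  self._link:set_waker_recvr(waker)
end

-- Hypothetical stand-in for BoundedChannel that just stores the waker.
local fake_link = {}
function fake_link:set_waker_recvr(waker)
  self.waker = waker
end

local rx = BoundedChannelReceiver.new(fake_link)
local waker = function() end

-- A "recvr" waker is accepted and handed to the link.
rx:setwaker("recvr", waker)

-- A "sendr" waker raises an error; pcall lets us inspect it.
local ok, msg = pcall(rx.setwaker, rx, "sendr", waker)
print(ok, msg)
```

Raising an error here, rather than silently ignoring the call, surfaces a wiring mistake as soon as it happens.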
Ok, now for the good stuff: the receive method.
function BoundedChannelReceiver:receive()
  while true do
    local can_recv, err = self._link:can_recv()
    if can_recv then
      local element = self._link:pop_front()
      self._link:try_wake("sendr")
      return element
    end
    if err == "closed" then
      return nil, err
    end
    if err == "empty" then
      local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end
Alright, there is a lot going on here, so let's unpack it. To start, we have a long-running
loop that only stops when we have either an error or a new message. Each iteration
of the loop first checks self._link:can_recv(). If that returns 1, we call
self._link:pop_front to capture our eventual return value. Next, we want
to alert any senders that more space has just been made on our queue, so we call
self._link:try_wake("sendr"). Finally, we return the element we popped off, ending our loop.
If can_recv returned nil, we check which err was provided. If it was "closed",
we return that in the error position, also ending our loop. If can_recv returned nil, "empty",
we want to yield until either we get woken by a sender or we have waited for the duration
of self.timeout. We do this by calling coroutine.yield({self}, nil, self.timeout), which
gives up control to cosock until either someone calls BoundedChannel:try_wake("recvr") or our
timeout is reached.
Recall that coroutine.yield returns either a list of ready receivers and a list of ready senders,
or nil, nil and an error message. This means that if coroutine.yield returns {self}, then
we have been woken, so we go back to the top of the loop, and the next call to self._link:can_recv
should return 1 or nil, "closed". If coroutine.yield returns nil, nil, "timeout", that
means we have yielded for self.timeout without being woken, so we return nil, "timeout".
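The two outcomes of the yield can be simulated without cosock. In the sketch below, plain coroutine.resume calls play the role of the scheduler, and fake_link is a hypothetical bare queue with no wakers. Both are assumptions made for illustration; they are not how cosock or the real BoundedChannel work internally.

```lua
-- Definitions repeated from above so the snippet is self-contained.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

function BoundedChannelReceiver:receive()
  while true do
    local can_recv, err = self._link:can_recv()
    if can_recv then
      local element = self._link:pop_front()
      self._link:try_wake("sendr")
      return element
    end
    if err == "closed" then
      return nil, err
    end
    if err == "empty" then
      local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end

-- Hypothetical stand-in for BoundedChannel: a queue that records wake requests.
local fake_link = {queue = {}, woken = {}}
function fake_link:can_recv()
  if #self.queue == 0 then return nil, "empty" end
  return 1
end
function fake_link:pop_front() return table.remove(self.queue, 1) end
function fake_link:try_wake(kind) table.insert(self.woken, kind) end

local rx = BoundedChannelReceiver.new(fake_link)

-- Case 1: the queue is empty, so receive yields {rx}, nil, rx.timeout.
local co = coroutine.create(function() return rx:receive() end)
local _, recvt, sendt, timeout = coroutine.resume(co)
-- Acting as the scheduler: a message arrives, then the receiver is resumed as ready.
table.insert(fake_link.queue, "hello")
local _, got = coroutine.resume(co, {rx}, {})

-- Case 2: the queue is still empty when the timeout expires.
local co2 = coroutine.create(function() return rx:receive() end)
coroutine.resume(co2)
local _, val, err = coroutine.resume(co2, nil, nil, "timeout")

print(got, val, err)
```

In the first case the loop runs can_recv a second time after being resumed, which is why receive never trusts the wake-up alone to mean a message is available.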
One final thing we want to make sure of is that we are keeping our end of that nil, "closed"
bargain, so let's define a close method.
function BoundedChannelReceiver:close()
  self._link:close()
  --- If anything is waiting to send, it should wake up with an error
  self._link:try_wake("sendr")
end
For this, we first call self._link:close, which sets our shared table's _closed property
to true; this ultimately makes both can_send and can_recv return nil, "closed". Next,
we want to wake up any sending tasks since they are reliant on us to tell them something has
changed, so we call self._link:try_wake("sendr").
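The effect of close on the sending side can be sketched as follows. The fake_link table is a hypothetical, simplified stand-in for the BoundedChannel: its can_send only checks the _closed flag, whereas the real one also has to account for how full the queue is.

```lua
-- Definitions repeated from above so the snippet is self-contained.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

function BoundedChannelReceiver.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end

function BoundedChannelReceiver:close()
  self._link:close()
  --- If anything is waiting to send, it should wake up with an error
  self._link:try_wake("sendr")
end

-- Hypothetical, simplified stand-in for BoundedChannel.
local fake_link = {_closed = false, woken = {}}
function fake_link:close() self._closed = true end
function fake_link:can_send()
  if self._closed then return nil, "closed" end
  return 1
end
function fake_link:try_wake(kind) table.insert(self.woken, kind) end

local rx = BoundedChannelReceiver.new(fake_link)

local before = fake_link:can_send()      -- 1 while the channel is open
rx:close()
local after, err = fake_link:can_send()  -- nil, "closed" once it is closed

print(before, after, err, fake_link.woken[1])
```

The order matters: the link is marked closed before the sender is woken, so a woken sender that checks can_send again sees "closed" rather than a stale state.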
With that, we have the complete receiver side of our channel. Now let's write up the sender.