Bounded Recvr
Next, we will define a Lua table that will represent the receiving half of our channel. This
table will have 2 properties _link
which will be our BoundedChannel
and a timeout
which will
be an optional number
.
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver
Ok, let's create a constructor for it, this will take 1 argument which should populate
its _link
property.
function BoundedChannelReceiver.new(shared_queue)
return setmetatable({_link = shared_queue}, BoundedChannelReceiver)
end
Notice we didn't include a timeout
property at all, this is because
we want it to be nil
by default. In order to match the same API that
cosock.channel
uses let's add a method for setting our timeout
.
function BoundedChannelReceiver:settimeout(timeout)
self.timeout = timeout
end
Next, we want to define the setwaker
method used by this side of the queue.
function BoundedChannelReceiver:setwaker(kind, waker)
if kind ~= "recvr" then
error("Unsupported wake kind for receiver: " .. tostring(kind))
end
self._link:set_waker_recvr(waker)
end
In this method, we have added a check to make sure it isn't getting called
with a kind
of "sendr"
since that would be pretty much meaningless. If
we have a valid kind
then we pass the waker down to self._link:set_waker_recvr
.
Ok, now for the good stuff: the receive
method.
function BoundedChannelReceiver:receive()
while true do
local can_recv, err = self._link:can_recv()
if can_recv then
local element = self._link:pop_front()
self._link:try_wake("sendr")
return element
end
if err == "closed" then
return nil, err
end
if err == "empty" then
local _r, _s, err = coroutine.yield({self}, nil, self.timeout)
if err then
return nil, err
end
end
end
end
Alright, there is a lot going on here so let's unpack it. To start we have a long-running
loop that will only stop when we have reached either an error or a new message. Each iteration
of the loop first checks if self._link:can_recv()
, if that returns 1
, then we call
self._link:pop_front
to capture our eventual return value, next we want
to alert any senders that more space has just been made on our queue so we call
self._link:try_wake("sendr")
, finally we return the element we popped off ending our loop.
If can_recv
returned nil
we check to see which err
was provided, if it was "closed"
we return that in the error position also ending our loop. If can_recv
returns nil, "empty"
then we want to yield until either we get woken by a sender or we have waited for the duration
of self.timeout
. We do this by calling coroutine.yield({self}, nil, self.timeout)
, this
will give up control to cosock until either someone calls BoundedChannel:try_wake("recvr")
or our
timeout
is reached.
If we recall that coroutine.yield
returns a list of ready receivers and a list of senders
or nil, nil
and an error message. This means if coroutine.yield
returns {self}
then
we have a new message so we go to the top of the loop and the next call to self.link:can_recv
should return 1
or nil, "closed"
. If coroutine.yield
returns nil, nil, "timeout"
that
means we have yielded for self.timeout
.
One final thing we want to make sure is that we are keeping our end of that nil "closed"
bargain, so let's define a closed
method.
function BoundedChannelReceiver:close()
self._link:close()
--- If anything is waiting to send, it should wake up with an error
self._link:try_wake("sendr")
end
For this, we first call self.link:close
which will set our shared table's _closed
property
to true
which will ultimately make both can_send
and can_recv
return nil, "closed"
. Next,
we want to wake up any sending tasks since they are reliant on us to tell them something has
changed, so we call self._link:try_wake("sendr")
.
With that we have the complete receiver side of our channel, now let's write up the sender.