Bounded Channel
To start we are going to define a Lua table that will represent our shared bounded queue.
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel
This table should have the following properties:
_wakers
: This is a table with 2 keys
  sendr
  : This is a map of potential functions where the keys are a table representing the waiting sender
  recvr
  : An optional function that takes no arguments, this will wake our receiver
_max_depth
: This is the integer value that our queue should not grow larger than
_msg_queue
: This is the message queue we will use to hold pending messages
_closed
: This is a boolean to indicate if we have been explicitly closed
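To make the shape of these properties concrete, here is a minimal sketch of a constructor. The name BoundedChannel.new and its max_depth argument are assumptions made for illustration; they are not defined in the text above.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

--- Hypothetical constructor (an assumption, not part of the text above)
--- @param max_depth integer The largest number of pending messages allowed
function BoundedChannel.new(max_depth)
  return setmetatable({
    _wakers = {
      sendr = {},  -- map of waiting sender (a table) -> waker function
      recvr = nil, -- optional waker function for the single receiver
    },
    _max_depth = max_depth,
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
end

local chan = BoundedChannel.new(2)
```

A freshly built channel starts open, with an empty queue and no wakers registered.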
To make our lives easier, next we will add a couple of methods that will enforce the
queue nature of our _msg_queue, one for removing the oldest message and one for
adding a new message.
--- Remove and return the oldest element from the front of this channel
function BoundedChannel:pop_front()
  return table.remove(self._msg_queue, 1)
end
--- Add a new element to the back of this channel
function BoundedChannel:push_back(ele)
  table.insert(self._msg_queue, ele)
end
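As a quick check of the first-in, first-out behaviour, the sketch below pushes three messages and pops two of them. The channel is built by hand with only a _msg_queue field, which is an assumption made to keep the example short.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:pop_front()
  return table.remove(self._msg_queue, 1)
end

function BoundedChannel:push_back(ele)
  table.insert(self._msg_queue, ele)
end

-- Build a bare channel by hand; only _msg_queue matters for this example
local chan = setmetatable({ _msg_queue = {} }, BoundedChannel)

chan:push_back("a")
chan:push_back("b")
chan:push_back("c")

local first = chan:pop_front()     -- "a", the oldest message
local second = chan:pop_front()    -- "b"
local remaining = #chan._msg_queue -- 1, only "c" is left
```

Note that table.remove(list, 1) shifts every remaining element down by one, which is fine for a small bounded queue like this one.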
Next, let's add a method for testing if we can send a new message.
function BoundedChannel:can_send()
  if self._closed then
    -- Check first that we are not closed, if we are
    -- return an error message
    return nil, "closed"
  end
  if #self._msg_queue >= self._max_depth then
    -- Check next if our queue is full, if it is
    -- return an error message
    return nil, "full"
  end
  -- The queue is not full and we are not closed, return 1
  return 1
end
This method first checks that we haven't been closed; if we have, it returns nil, "closed".
If we are still open, it next checks whether there is any space left in our queue; if not,
it returns nil, "full". If we are neither closed nor full, it returns 1.
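The three outcomes can be exercised with a small sketch. The hand-built channel and its _max_depth of 1 are assumptions chosen so the queue fills after a single message.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_send()
  if self._closed then
    return nil, "closed"
  end
  if #self._msg_queue >= self._max_depth then
    return nil, "full"
  end
  return 1
end

-- Hand-built channel for illustration, with room for a single message
local chan = setmetatable({
  _msg_queue = {},
  _max_depth = 1,
  _closed = false,
}, BoundedChannel)

local ok_open, err_open = chan:can_send()     -- 1, nil

table.insert(chan._msg_queue, "msg")
local ok_full, err_full = chan:can_send()     -- nil, "full"

chan._closed = true
-- The closed check runs first, so "closed" wins even though we are also full
local ok_closed, err_closed = chan:can_send() -- nil, "closed"
```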
Now we should create a similar method for checking if we can receive a new message.
function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    -- Check first whether we are closed and have no pending
    -- messages left, if so return an error message
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    -- Check next that we have at least 1 message,
    -- if not, return an error message
    return nil, "empty"
  end
  -- We have at least 1 pending message (even if closed), return 1
  return 1
end
Again, we begin by checking for the closed case, returning nil, "closed", but only when the
queue is also empty, so messages sent before the close can still be received. Then we
check to see if the queue is empty; if so, we return nil, "empty". If there is
at least 1 message, we return 1, whether or not we have been closed.
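The interaction between closing and pending messages is easy to miss, so here is a short sketch that walks through each state. The hand-built channel is an assumption for illustration.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    return nil, "empty"
  end
  return 1
end

-- Hand-built channel for illustration
local chan = setmetatable({ _msg_queue = {}, _closed = false }, BoundedChannel)

local ok_empty, err_empty = chan:can_recv()       -- nil, "empty"

table.insert(chan._msg_queue, "msg")
local ok_pending = chan:can_recv()                -- 1

chan._closed = true
-- Closed, but one message is still queued, so receiving is still allowed
local ok_closed_pending = chan:can_recv()         -- 1

table.remove(chan._msg_queue, 1)
-- Closed and drained: now the receiver sees "closed"
local ok_drained, err_drained = chan:can_recv()   -- nil, "closed"
```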
Now we can define a few methods for interacting with our self._wakers property.
First up is the set_waker_recvr method.
function BoundedChannel:set_waker_recvr(waker)
  self._wakers.recvr = waker
  if waker and self:can_recv() then
    -- Check if receiving is currently available, if
    -- so call the waker to wake up the yielded receiver
    waker()
  end
end
Ok, so this one is pretty simple now that we have our helpers. First, we populate the
value in self._wakers.recvr. If waker is not nil and self:can_recv() returns 1, we want to
immediately call the waker because it means we are ready to be woken up.
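To see the immediate wake-up in action, the sketch below registers a waker that counts how often it is called. The counter, the waker function, and the hand-built channel are all assumptions for illustration.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:can_recv()
  if self._closed and #self._msg_queue == 0 then
    return nil, "closed"
  end
  if #self._msg_queue == 0 then
    return nil, "empty"
  end
  return 1
end

function BoundedChannel:set_waker_recvr(waker)
  self._wakers.recvr = waker
  if waker and self:can_recv() then
    waker()
  end
end

-- Hand-built channel for illustration
local chan = setmetatable({
  _msg_queue = {},
  _closed = false,
  _wakers = { sendr = {} },
}, BoundedChannel)

local wake_count = 0
local function waker()
  wake_count = wake_count + 1
end

chan:set_waker_recvr(waker)      -- queue is empty: stored, but not called
local after_empty = wake_count   -- 0

table.insert(chan._msg_queue, "msg")
chan:set_waker_recvr(waker)      -- a message is pending: called right away
local after_pending = wake_count -- 1

chan:set_waker_recvr(nil)        -- clears the waker; nothing is called
```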
For our next method, we are going to define set_waker_sendr.
function BoundedChannel:set_waker_sendr(sendr, waker)
  self._wakers.sendr[sendr] = waker
end
This looks quite a bit different from set_waker_recvr! First of all, we have an extra argument,
sendr, which will represent a unique call to coroutine.yield and is how we can allow for
multiple senders. The second thing to notice is that we are not checking self:can_send();
this is because we don't know if another waker has already been woken up for the current state.
This is all a bit hand-wavy right now, but when we implement the sender things should become clear.
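Until the sender is implemented, a rough sketch can show why keying by a table allows multiple senders. The empty tables sender_a and sender_b are hypothetical stand-ins for whatever the real sender uses as its unique key.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:set_waker_sendr(sendr, waker)
  self._wakers.sendr[sendr] = waker
end

-- Hand-built channel for illustration
local chan = setmetatable({ _wakers = { sendr = {} } }, BoundedChannel)

-- Hypothetical keys: every table is a distinct key, even when empty
local sender_a, sender_b = {}, {}
chan:set_waker_sendr(sender_a, function() return "a" end)
chan:set_waker_sendr(sender_b, function() return "b" end)

local registered = 0
for _ in pairs(chan._wakers.sendr) do
  registered = registered + 1
end
-- registered is 2: each sender has its own slot

-- Passing nil as the waker removes that sender's entry
chan:set_waker_sendr(sender_a, nil)
local a_entry = chan._wakers.sendr[sender_a] -- nil
local b_result = chan._wakers.sendr[sender_b]() -- "b"
```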
Now that we can set a waker, it is time to add a method for calling those waker functions.
function BoundedChannel:try_wake(kind)
  -- Declare _ as a local too, so the discarded key below
  -- does not leak into a global variable
  local _, waker
  if kind == "sendr" then
    _, waker = next(self._wakers.sendr)
  else
    waker = self._wakers.recvr
  end
  if type(waker) == "function" then
    waker()
  end
end
Our new try_wake method takes 1 argument, which will be either the string "sendr" or "recvr".
If we are trying to wake a "sendr", we use the next function to find 1 entry in the
table self._wakers.sendr; if we have at least 1 entry in that table, we assign the value to waker.
If we are trying to wake a "recvr", we assign self._wakers.recvr to waker. If waker is
a function (aka not nil), we call that function.
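The sketch below exercises both branches. The woken list and the hand-built channel are assumptions for illustration. With two senders registered, exactly one waker runs per call; which one is unspecified, because next does not guarantee any traversal order.

```lua
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel

function BoundedChannel:try_wake(kind)
  local _, waker
  if kind == "sendr" then
    _, waker = next(self._wakers.sendr)
  else
    waker = self._wakers.recvr
  end
  if type(waker) == "function" then
    waker()
  end
end

-- Hand-built channel for illustration
local chan = setmetatable({ _wakers = { sendr = {}, recvr = nil } }, BoundedChannel)

local woken = {}

-- No wakers registered yet: both calls are harmless no-ops
chan:try_wake("recvr")
chan:try_wake("sendr")
local woken_before = #woken -- 0

chan._wakers.recvr = function() woken[#woken + 1] = "recvr" end
chan._wakers.sendr[{}] = function() woken[#woken + 1] = "sendr" end
chan._wakers.sendr[{}] = function() woken[#woken + 1] = "sendr" end

chan:try_wake("recvr") -- calls the receiver's waker
chan:try_wake("sendr") -- calls exactly one of the two sender wakers
```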
function BoundedChannel:close()
  self._closed = true
end
close will just set our _closed property to true.
Ok, now that we have our shared channel defined, let's implement our receiver.