Bounded Channel: Sendr
To start, we define our Lua table which represents the sending half of our channel.
local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender
This table will have the same shape as our BoundedChannelReceiver: it will have a _link property and a timeout property, and our constructor is nearly identical.
function BoundedChannelSender.new(shared_queue)
  return setmetatable({_link = shared_queue}, BoundedChannelSender)
end
We will also define a settimeout method that looks exactly the same.
function BoundedChannelSender:settimeout(timeout)
  self.timeout = timeout
end
Our close method is also very close to the receiver's implementation.
function BoundedChannelSender:close()
  self._link:close()
  -- If anything is waiting to receive it should wake up with an error
  self._link:try_wake("recvr")
end
Here, only the argument to self._link:try_wake changed, from "sendr" to "recvr".
Now things start to become quite different. The first thing to note is that we are not going to define a setwaker method for this table. This may seem strange, since it is one of the few things we need to do to make a "cosock aware" table, but if we were to use the same setwaker for all of the places that we call BoundedChannelSender:send, we would end up gumming up the internal workings of cosock. To see how we get around this, it helps to go over the implementation of send.
function BoundedChannelSender:send(msg)
  while true do
    local can_send, err = self._link:can_send()
    if can_send then
      self._link:push_back(msg)
      -- Wake any receivers who might be waiting to receive
      self._link:try_wake("recvr")
      return 1
    end
    if err == "closed" then
      return nil, "closed"
    end
    if err == "full" then
      -- A fresh table per yielding call gives each call its own waker
      local wake_t = {
        setwaker = function(t, kind, waker)
          assert(kind == "sendr")
          self._link:set_waker_sendr(t, waker)
        end
      }
      local _r, _s, err = coroutine.yield(nil, {wake_t}, self.timeout)
      if err then
        return nil, err
      end
    end
  end
end
To start, things look a lot like our BoundedChannelReceiver:receive implementation. We have a long-running loop that first calls self._link:can_send. If that returns 1, we use the push_back helper to add the message to our queue and then try to wake up any yielded "recvr"s, returning 1 to indicate success. If can_send returned the error message "closed", we return nil, "closed". If can_send returned the error message "full", we want to yield until we can try again.
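The branches that do not yield can be exercised without cosock by substituting a stub for the shared queue. This is a minimal sketch: stub_link and send_no_yield are hypothetical names introduced here, and the stub mimics only the can_send, push_back, and try_wake methods used above. The order in which it checks "closed" and "full" is an assumption, and the real BoundedChannel may differ.

```lua
-- Hypothetical stand-in for the shared queue; NOT the real BoundedChannel.
local function stub_link(max_depth)
  local link = { _queue = {}, _closed = false, _woken = {} }
  function link:can_send()
    -- Assumed order: report "closed" before "full"
    if self._closed then return nil, "closed" end
    if #self._queue >= max_depth then return nil, "full" end
    return 1
  end
  function link:push_back(msg) table.insert(self._queue, msg) end
  -- Record which kind was woken instead of waking a real task
  function link:try_wake(kind) table.insert(self._woken, kind) end
  return link
end

-- Only the non-yielding branches of send; "full" is returned
-- to the caller here instead of yielding.
local function send_no_yield(link, msg)
  local can_send, err = link:can_send()
  if can_send then
    link:push_back(msg)
    link:try_wake("recvr")
    return 1
  end
  return nil, err
end

local link = stub_link(1)
local ok1 = send_no_yield(link, "first")        -- room available
local ok2, err2 = send_no_yield(link, "second") -- queue is at capacity
link._closed = true
local ok3, err3 = send_no_yield(link, "third")  -- channel is closed
```

In the real send, the "full" result is the only one that leads to a yield; the other two results return to the caller immediately.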
To prepare for yielding, we first create a table called wake_t, which represents a single call to BoundedChannelSender:send that is yielding. On wake_t we set one property, the setwaker method, which uses assert to raise an error if it was called with a kind other than "sendr" (for example "recvr") and then uses the BoundedChannel:set_waker_sendr method to associate the waker argument with wake_t. By creating this temporary table, we allow a unique waker function to be defined for each thread that needs waking. If we were to use a single BoundedChannel._wakers.sendr function, we would lose the ability to wake any yields beyond the last, because calling waker always calls setwaker(kind, nil) to avoid potential "double wakes".
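The difference between one shared waker slot and one waker per yielding call can be sketched in plain Lua. The registries below (single_slot, per_call) and the make_waker helper are hypothetical and model only the bookkeeping, not cosock's internals or the real BoundedChannel.

```lua
-- Build a waker that records its name when called.
local function make_waker(log, name)
  return function() log[#log + 1] = name end
end

local single_log, per_call_log = {}, {}

-- One shared slot: registering a second waker overwrites the first.
local single_slot = {}
single_slot.sendr = make_waker(single_log, "send A")
single_slot.sendr = make_waker(single_log, "send B") -- "send A" is lost

-- One entry per yielding call, keyed by that call's temporary table.
local per_call = {}
local wake_a, wake_b = {}, {}
per_call[wake_a] = make_waker(per_call_log, "send A")
per_call[wake_b] = make_waker(per_call_log, "send B")

-- Wake everything each registry still knows about, clearing
-- each entry as it is used to avoid a double wake.
single_slot.sendr()
single_slot.sendr = nil
for t, waker in pairs(per_call) do
  waker()
  per_call[t] = nil
end

-- The shared slot reached one yielded send; the keyed registry reached both.
print(#single_log, #per_call_log)
```

With the shared slot, the first yielded send would only resume once its timeout expired, if it had one.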
Now that we have set up our wake_t, we can call coroutine.yield, this time with the arguments nil, {wake_t}, self.timeout. Since we put wake_t in the sendt argument, we will wait until either self.timeout elapses or someone calls BoundedChannel:try_wake("sendr") and our wake_t is returned from next.
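The shape of this yield and resume exchange can be traced with plain coroutines. The driver below is a toy written for illustration, not cosock's scheduler: the registered and ready tables are hypothetical, and the only things taken from the listing above are the argument order (recvt, sendt, timeout) and the three values received back from the yield.

```lua
-- Toy driver, NOT cosock's scheduler: it only mimics the yield/resume shape.
local registered = {}

local co = coroutine.create(function()
  local wake_t = {
    setwaker = function(t, kind, waker)
      assert(kind == "sendr")
      registered[t] = waker
    end
  }
  -- Same argument order as in send: recvt, sendt, timeout
  local _r, s, err = coroutine.yield(nil, {wake_t}, 5)
  return s, err
end)

-- The first resume runs the body up to the yield and hands back its arguments.
local ok1, recvt, sendt, timeout = coroutine.resume(co)

-- The driver registers a waker on every table in sendt.
local ready = {}
for _, t in ipairs(sendt) do
  t:setwaker("sendr", function() ready[#ready + 1] = t end)
end

-- Later, space frees up in the queue and the stored wakers are called.
for t, waker in pairs(registered) do
  registered[t] = nil
  waker()
end

-- Resume the yielded call with the tables that became ready and no error.
local ok2, s, err = coroutine.resume(co, nil, ready, nil)
```

Had the timeout elapsed first, the driver would instead resume with an error value in the third position, which is the case send handles by returning nil, err.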
This is probably the easiest way to create these unique wakers, but it does come with a potential issue. If you are interested in using this implementation, please review Appendix-A.
Now, let's finish up our implementation and see if we can see our bounded channel working.