Bounded Channel: Finish
At this point, we have most of our BoundedChannel and all of our BoundedChannelReceiver and BoundedChannelSender set up, so the last thing we need to do is add a constructor to BoundedChannel.
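function BoundedChannel.new(max_depth)
  -- The shared state that both halves of the channel will point to
  local link = setmetatable({
    _max_depth = max_depth, -- maximum number of queued messages
    _wakers = {
      sendr = {}, -- wakers registered by sending tasks
    },
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
  -- Hand the same link to both the sender and the receiver
  return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end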
This constructor takes one argument: the maximum number of messages our queue can hold.
It returns two tables: the first is a BoundedChannelSender and the second is a BoundedChannelReceiver. Both have the same shared BoundedChannel as their _link property.
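To make the shared-link idea concrete, here is a minimal sketch in plain Lua that does not use cosock at all. The new_sender and new_receiver functions are hypothetical stand-ins for BoundedChannelSender.new and BoundedChannelReceiver.new; they only store the link, which is enough to show that both halves refer to the same table rather than to copies of it.

```lua
-- Hypothetical stand-ins for the real sender/receiver constructors;
-- they only keep a reference to the link they are given.
local function new_sender(link) return { _link = link } end
local function new_receiver(link) return { _link = link } end

-- A bare table standing in for the BoundedChannel state.
local link = { _max_depth = 2, _msg_queue = {}, _closed = false }
local tx, rx = new_sender(link), new_receiver(link)

-- Both halves hold the very same table, not a copy of it.
assert(rawequal(tx._link, rx._link))

-- So a message queued through one half is visible through the other.
table.insert(tx._link._msg_queue, "hello")
print(#rx._link._msg_queue)   --> 1
print(rx._link._msg_queue[1]) --> hello
```

Because Lua tables are passed by reference, no extra synchronization is needed for the two halves to agree on the queue contents.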
Now let's see our new channel in action!
local cosock = require "cosock"
local BoundedChannel = require "examples.bounded_channel"

-- A channel that can hold at most 2 queued messages
local tx, rx = BoundedChannel.new(2)

-- First sender: sends 1 through 10, pausing 0.2s between sends
cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,10 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr1")

-- Second sender: sends 11 through 20, pausing 0.2s between sends
cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=11,20 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr2")

-- Receiver: takes 20 messages, pausing 1s between receives
cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,20 do
    local msg = rx:receive()
    local e = cosock.socket.gettime()
    print(string.format("recd %s in %.1fs", msg, e - s))
    s = e
    cosock.socket.sleep(1)
  end
end, "recvr")

cosock.run()
After we import both cosock and our BoundedChannel, we create a new channel pair with a maximum queue size of 2. We then spawn 2 new tasks for the sender. Each of these tasks loops 10 times, sending a message and then sleeping for 0.2 seconds; the first sends 1 through 10 and the second sends 11 through 20. We call cosock.socket.gettime before and after each send to see if there is any delay. Because the previous timestamp is carried over from one iteration to the next, every measurement after the first also includes the preceding sleep.
Next, we spawn a task for our receiver, which receives a message and then sleeps for 1 second, 20 times.
Since we are sending a lot faster than we are receiving, we would expect that after the first few messages the time it takes to send a message reaches about 1 second, indicating that our queue has reached its maximum of 2. If we were to run this, we should see something like the following.
sent 1 in 0.0s
sent 11 in 0.0s
recd 1 in 0.0s
sent 2 in 0.2s
recd 11 in 1.0s
sent 12 in 1.0s
recd 2 in 1.0s
sent 13 in 1.0s
recd 12 in 1.0s
sent 14 in 1.0s
recd 13 in 1.0s
sent 15 in 1.0s
recd 14 in 1.0s
sent 16 in 1.0s
recd 15 in 1.0s
sent 17 in 1.0s
recd 16 in 1.0s
sent 18 in 1.0s
recd 17 in 1.0s
sent 19 in 1.0s
recd 18 in 1.0s
sent 20 in 1.0s
recd 19 in 1.0s
sent 3 in 9.8s
recd 20 in 1.0s
sent 4 in 1.0s
recd 3 in 1.0s
sent 5 in 1.0s
recd 4 in 1.0s
sent 6 in 1.0s
recd 5 in 1.0s
sent 7 in 1.0s
recd 6 in 1.0s
sent 8 in 1.0s
recd 7 in 1.0s
sent 9 in 1.0s
recd 8 in 1.0s
sent 10 in 1.0s
recd 9 in 1.0s
recd 10 in 1.0s
From this output, we can determine the exact order in which things played out. First, sendr1 pushes 1 onto the queue and then sleeps for 0.2 seconds, which allows sendr2 to push 11 onto the queue before it also sleeps for 0.2 seconds. Then recvr pops 1 off the queue and begins a 1-second sleep.
At this point, for the first time, we have 3 sleeping tasks. Since sendr1 went first, it is the first to wake from its sleep; it pushes 2 onto the queue and goes back to sleep. That allows sendr2 to wake up and try to send 12, but the queue is full ({11, 2}), so it has to wait until recvr wakes up to pop 11 off the queue. Once recvr pops 11 off the queue, sendr2 is able to push 12, but it took 1 full second to do so! From here on, we stay in a state where both sendr1 and sendr2 wait ~1 second to send, until recvr is able to receive, at which point either sendr1 or sendr2 pushes a new value. Once the last 2 values have been sent, our sendrs go quiet because they are done with their work, but recvr still takes another 2 seconds to complete.
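The moment where 12 cannot be sent can be replayed with a small sketch in plain Lua. The new_queue, try_push, and pop helpers are hypothetical and are not part of cosock or of our BoundedChannel. The real channel also suspends the sending task until there is room, whereas this sketch simply returns false so that it can run without a scheduler.

```lua
-- Hypothetical bounded queue used only to replay the walkthrough above.
local function new_queue(max_depth)
  return { max_depth = max_depth, items = {} }
end

-- Returns true if the value was queued, false if the queue is full.
local function try_push(q, value)
  if #q.items >= q.max_depth then
    return false
  end
  table.insert(q.items, value)
  return true
end

-- Removes and returns the oldest value (nil if the queue is empty).
local function pop(q)
  return table.remove(q.items, 1)
end

local q = new_queue(2)
print(try_push(q, 11)) --> true   (queue is {11})
print(try_push(q, 2))  --> true   (queue is {11, 2}, now full)
print(try_push(q, 12)) --> false  (no room, sendr2 would have to wait)
print(pop(q))          --> 11     (recvr wakes and frees a slot)
print(try_push(q, 12)) --> true   (queue is {2, 12})
```

In the real channel, the refused push corresponds to the ~1 second that sendr2 spends waiting for recvr to wake up.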
Did you notice that sendr2 gets to go far more often than sendr1 at the start, and that it takes sendr1 9.8 seconds to send 3? This is because of our waker scheme; Appendix A has more on that.