Bounded Channel: Finish

At this point, we have most of our BoundedChannel and all of our BoundedChannelReceiver and BoundedChannelSender set up, so the last thing we need to do is add a constructor to BoundedChannel.

function BoundedChannel.new(max_depth)
  local link = setmetatable({
    _max_depth = max_depth,
    _wakers = {
      sendr = {},
    },
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
  return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end

This constructor takes 1 argument: the maximum number of messages our queue can hold. It returns 2 tables: the first is a BoundedChannelSender and the second is a BoundedChannelReceiver. Both have the same shared BoundedChannel as their _link property.
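To make the shared _link concrete, here is a minimal sketch that can be run without cosock. The sender and receiver types below are hypothetical stand-ins that only store _link; the real ones built earlier in this chapter carry the send and receive methods as well.

```lua
-- Hypothetical stand-ins for the real types; only the constructor shape
-- and the shared _link property are illustrated here.
local BoundedChannel = {}
BoundedChannel.__index = BoundedChannel
local BoundedChannelSender = {}
BoundedChannelSender.__index = BoundedChannelSender
local BoundedChannelReceiver = {}
BoundedChannelReceiver.__index = BoundedChannelReceiver

function BoundedChannelSender.new(link)
  return setmetatable({ _link = link }, BoundedChannelSender)
end

function BoundedChannelReceiver.new(link)
  return setmetatable({ _link = link }, BoundedChannelReceiver)
end

function BoundedChannel.new(max_depth)
  local link = setmetatable({
    _max_depth = max_depth,
    _wakers = { sendr = {} },
    _msg_queue = {},
    _closed = false,
  }, BoundedChannel)
  return BoundedChannelSender.new(link), BoundedChannelReceiver.new(link)
end

local tx, rx = BoundedChannel.new(2)

-- Both halves hold the very same table, so a change made through one
-- half is visible through the other.
print(rawequal(tx._link, rx._link))  --> true
table.insert(tx._link._msg_queue, "hello")
print(#rx._link._msg_queue)          --> 1
print(rx._link._max_depth)           --> 2
```

Because there is only one underlying table, closing the channel or filling the queue from one side is immediately observable from the other.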

Now let's see our new channel in action!

local cosock = require "cosock"
local BoundedChannel = require "examples.bounded_channel"

local tx, rx = BoundedChannel.new(2)

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,10 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr1")

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=11,20 do
    tx:send(i)
    local e = cosock.socket.gettime()
    print(string.format("sent %s in %.1fs", i, e - s))
    s = e
    cosock.socket.sleep(0.2)
  end
end, "sendr2")

cosock.spawn(function()
  local s = cosock.socket.gettime()
  for i=1,20 do
    local msg = rx:receive()
    local e = cosock.socket.gettime()
    print(string.format("recd %s in %.1fs", msg, e - s))
    s = e
    cosock.socket.sleep(1)
  end
end, "recvr")

cosock.run()

After we import both cosock and our BoundedChannel, we create a new channel pair with a maximum queue size of 2. We then spawn 2 new tasks for the sender. Each of these tasks loops 10 times, sending a message and then sleeping for 0.2 seconds. We call cosock.socket.gettime before and after each send to see if there is any delay.

Next, we spawn a task for our receiver. It loops 20 times, receiving a message and then sleeping for 1 second.

Since we are sending a lot faster than we are receiving, we would expect that after the first few messages the time it takes to send a message climbs to about 1 second, indicating that our queue has reached its maximum of 2. If we were to run this, we should see something like the following.

sent 1 in 0.0s
sent 11 in 0.0s
recd 1 in 0.0s
sent 2 in 0.2s
recd 11 in 1.0s
sent 12 in 1.0s
recd 2 in 1.0s
sent 13 in 1.0s
recd 12 in 1.0s
sent 14 in 1.0s
recd 13 in 1.0s
sent 15 in 1.0s
recd 14 in 1.0s
sent 16 in 1.0s
recd 15 in 1.0s
sent 17 in 1.0s
recd 16 in 1.0s
sent 18 in 1.0s
recd 17 in 1.0s
sent 19 in 1.0s
recd 18 in 1.0s
sent 20 in 1.0s
recd 19 in 1.0s
sent 3 in 9.8s
recd 20 in 1.0s
sent 4 in 1.0s
recd 3 in 1.0s
sent 5 in 1.0s
recd 4 in 1.0s
sent 6 in 1.0s
recd 5 in 1.0s
sent 7 in 1.0s
recd 6 in 1.0s
sent 8 in 1.0s
recd 7 in 1.0s
sent 9 in 1.0s
recd 8 in 1.0s
sent 10 in 1.0s
recd 9 in 1.0s
recd 10 in 1.0s

From this output, we can determine the exact order in which things played out. First, sendr1 pushes 1 onto the queue and then sleeps for 0.2 seconds. That allows sendr2 to push 11 onto the queue, after which it also sleeps for 0.2 seconds. Then recvr pops 1 off the queue and starts a 1-second sleep.

At this point, for the first time, we have 3 sleeping tasks. Since sendr1 went first, it is the first to wake from its sleep; it pushes 2 onto the queue and goes back to sleep. sendr2 then wakes up and tries to send 12, but the queue is full ({11, 2}), so it has to wait until recvr wakes up and pops 11 off the queue. Once that happens, sendr2 is able to push 12, but it took 1 full second to do so! From here on we stay in a state where both sendr1 and sendr2 wait ~1 second to send, until recvr receives a message and either sendr1 or sendr2 pushes a new value. Once the last 2 values are in the queue, our sendrs go quiet because they are done with their work, but recvr still takes another 2 seconds to complete.
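The first few steps of that trace can be replayed with a plain table standing in for the queue. This is only a sketch of the queue contents: try_push and pop are hypothetical helpers, nothing here yields or sleeps, and the full check against max_depth is an assumption about how the real send decides to wait.

```lua
-- A plain-table model of the bounded queue; no cosock, no yielding.
local max_depth = 2
local queue = {}

-- Hypothetical helper: returns true if the value was queued, false if
-- the queue is already at max_depth.
local function try_push(value)
  if #queue >= max_depth then
    return false
  end
  table.insert(queue, value)
  return true
end

-- Hypothetical helper: removes and returns the oldest value.
local function pop()
  return table.remove(queue, 1)
end

local function show(label, ok)
  print(string.format("%s ok=%s queue={%s}",
    label, tostring(ok), table.concat(queue, ", ")))
end

assert(try_push(1))   -- sendr1 sends 1
assert(try_push(11))  -- sendr2 sends 11
assert(pop() == 1)    -- recvr receives 1, then sleeps for 1 second
assert(try_push(2))   -- sendr1 wakes after 0.2s and sends 2

local ok = try_push(12)  -- sendr2 wakes and tries to send 12
show("push 12", ok)      --> push 12 ok=false queue={11, 2}

assert(pop() == 11)      -- recvr wakes about 1 second later
ok = try_push(12)        -- now there is room for sendr2's message
show("push 12", ok)      --> push 12 ok=true queue={2, 12}
```

In the real channel the failed push does not return false to the caller; the sending task waits until it is woken, which is where the 1-second send times in the output come from.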

Did you notice that sendr2 gets to go far more often than sendr1 at the start, and that it takes sendr1 9.8 seconds to send 3? This is because of our waker scheme; Appendix A has more on that.