Internals

The module cosock.socket.internals is where luasocket gets wrapped into a "cosock aware" table. Initially, a call to passthroughbuilder is used to create a "builder" function. passthroughbuilder takes 2 arguments, recvmethods and sendmethods which are both a table where the keys are a method name and the values are a set-like table of error messages that it would be appropriate to yield for. A good example of one of these is the tcp module's recvmethods.

local recvmethods = {
  receive = {timeout = true},
  accept = {timeout = true},
}

In both of the methods defined here, if we were to get the return value of nil, "timeout" would be a signal to call coroutine.yield and try again. The return value of passthroughbuilder is a function that we will call builder. builder takes 2 arguments method which is a string and an optional transformsrc which is a table, or a function that returns a table, with the following properties.

  • input: this is an optional function that takes the method inputs and returns those inputs potentially transformed.
    • This is only called once, just before we call the luasocket method for the first time
  • blocked: this is an optional function that takes the return values from the method called and returns the input arguments to the next call of the same method
  • output: This is an optional function that will be called with the return values of the method
    • This is only called once, just before we return from the method

Let's use a few examples to go over each of these starting with the input property.

The method receive on a luasocket takes 2 arguments, a pattern string or number indicating how many bytes to try and read for and an optional prefix to put at the front of what was received.

local socket = require "luasocket"
local t = socket.tcp()
t:connect("0.0.0.0", 8000)
print(t:receive("*l", "next line:"))

Assuming that some server is listening on port 8080 of the machine we run this on, we would receive 1 line, for example, "ping\n" this would print "next line: ping". As we will get into later, a call to cosock's receive may end up calling luasocket's receive until we get to a new line character. So what if our server sent 1 byte at a time? We would end up printing "next line: pnext line: inext line: nnext line: g" if we passed the second argument to the underlying luasocket. To avoid this we can use the input property to store this pattern and only add it once to the eventual return value.

Now let's consider the blocked property, continuing to use receive as our example method, what happens if we call t:receive(10) and again our server returns 1 byte at at time?

We can't call the underlying luasocket method with 10 over and over, that would result in us requesting too many bytes from the socket. Instead, we need a way to capture the partial value we received and reduce the number of bytes accordingly. Thankfully luasocket returns any partial data on error as the 3rd return argument so we could do something like

{
  blocked = function(success, err, partial)
    table.insert(shared_buffer, partial)
    remaining_recv = remaining_recv - #partial
    return remaining_recv 
  end
}

This example assumes that shared_buffer and remaining_recv exist somewhere but you can see that we are appropriately reducing the number of bytes we return here. This will eventually be the argument provided to the next call to the luasocket method. Here is a longer-form example of how a response of 1 byte at a time would look for our luasocket.

local shared_buffer = {}
local remaining_recv = 5
--p
local _, err, chunk = t:receive(remaining_recv)
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--i
err, chunk = t:receive(remaining_recv) --i
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--n
err, chunk = t:receive(remaining_recv) --n
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--g
err, chunk = t:receive(remaining_recv) --g
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)
--\n
err, chunk = t:receive(remaining_recv) --\n
remaining_recv = remaining_recv - #chunk
table.insert(shared_buffer, chunk)

Finally we have the output property which gets called with the last return values from our method. If we complete our example, this is where we would end up calling table.concat(shared_buffer) to add all the chunks together before returning.

Continuing to use receive as an example, this is what the transform argument might look like.

local recvmethods = {
  receive = {timeout = true}
}
local sendmethods = {}
-- First we define a builder injecting the method<->error message maps
local builder = passthroughbuilder(recvmethods, sendmethods)
-- Now we can use the builder to define a method that doesn't do any
-- transformations
m.bind = builder("bind")
-- Here we define a method that performs some transformations
m.receive = builder("receive", function()
  local shared_buffer = {}
  local remaining_recv
  local pattern
  return {
    input = function(pat, prefix)
      -- insert the prefix as the first part of our return
      -- value if present
      if prefix then
        table.insert(shared_buffer, prefix)
      end
      if type(pat) == "number" then
        -- we know how many bytes to wait for, set this
        -- for use in blocked
        remaining_recv = pat
      else
        -- store this for use in blocked
        pattern = pat
      end
      -- return only pattern to avoid duplicate prefixes
      return pattern
    end,
    blocked = function(_, err, partial)
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
        -- only reduce remaining_recv if it is a number
        if remaining_recv then
          remaining_recv = remaining_recv - #full
          -- returning the updated remaining receive
          return remaining_recv
        end
        -- otherwise we return the pattern provided to input
        return pattern
      end
    end,
    output = function(full, err, partial)
      -- if the first return is a string with a length > 0 then
      -- add it to the buffer
      if type(full) == "string" and #full > 0 then
        table.insert(shared_buffer, full)
      end
      -- if the third return is a string with a length > 0 then
      -- add it to the buffer
      if type(partial) == "string" and #partial > 0 then
        table.insert(shared_buffer, partial)
      end
      -- concatenate all the strings together
      local all = table.concat(shared_buffer)
      if err then
        -- if ther was an error it should go in the 3rd return
        -- position
        return nil, err, all
      else
        -- if not error then it should go in the 1st return
        -- position
        return all
      end
    end
  }
end)

With the arguments defined, we can now discuss the return value of builder which will be a third function, this one being the method's implementation, its first argument is self and varargs are used to allow for any additional arguments.

Let's pause here and go over this because 3 levels of functions can be a bit difficult to follow. Our goal here is to re-use as much as possible for each of the methods on a cosock socket and since yield -> retry loop is going to be a common pattern we can define all of that in 1 place. The key is that these methods are going to need to know about a few extra pieces which is achieved by the fact that each function's arguments are available to the returned function.

Which means that the receive method would have the following environment.

local recvmethods = {
  receive = { timeout = true }
}
local sendmethods = {}
local method = "receive"
local transformsrc = function() --[[see above]] end

Now let's go over what actually happens in this shared method implementation. First, we capture all of the varargs into a table named inputparams, if the transform object had an input property defined, we then overwrite the variable with {input(table.unpack(inputparams))}. Now that we have our inputs the way they need to be we begin a long-running repeat/until loop.

At the top of the loop we call self.inner_sock[method], inner_sock is the property name for the luasocket on all of the cosock sockets. If the first return from that function is nil we check to see if the second return value can be found in receivemethods or sendmethods, if so we know that we need to yield, so we check if blocked is defined and call that if it is, again overwriting inputparams with the return value.

Now we determine what kind of yield we are going to do, if the second return was found in receivemethods it would be "recvr" if it was found in sendmethods it would be "sendr". Now we set up our arguments for coroutine.yield putting self into recvt if our kind is "recvr" or into sendt if our kind is "sendr". Now we can call coroutine.yield(sendt, recvt, self.timeout) assigning the returns there to recvr, sendr, rterr. If rterr is not nil, we are going to return early, if its value matches the error from our method call (i.e. "timeout" for both) then we return the values from our call to that method.

The last thing we do in this case before heading back to the top of the loop is to assert our kind and the result of cosock.socket.select match, meaning we have a kind of "sendr", the sendr variable is populated, and the recvr variable is unpopulated; or vice versa.


If the first return argument to our method call was not nil then we can exit early transforming the return value with output if that is populated.

The only other function provided by cosock.socket.internals is setuprealsocketwaker which completes the wrapping of our cosock socket.

This function takes the socket table and an optional list of kinds, if kinds is not provided then the default will be both sendr and recvr.

We then define a method on socket called setwaker which is used by cosock to wake up sleeping coroutines (see the integrating chapter for more info). This setwaker will assign the provided waker function to a self.wakers table based on the kind of waker. It also defines a method _wake which takes an argument kind and varargs for any additional arguments. This method will see if self.wakers[kind] is not nil and if so call that with the varargs. It then replaces self.wakers[kind] with nil.