Select
Now that we have covered how to spawn and run coroutines using cosock, let's talk about how we
could handle multiple IO sources in a single coroutine. For this kind of work, cosock provides
cosock.socket.select, which works in much the same way as luasocket's socket.select. A call to it
looks something like this:

local recvr, sendr, err = cosock.socket.select(recvt, sendt, timeout)

Its arguments are:
- recvt: This is a list of cosock sockets that are waiting to be ready to receive
- sendt: This is a list of cosock sockets that are waiting to be ready to send
- timeout: This is the maximum number of seconds to wait for one or more entries in recvt or sendt to be ready
  - If this value is nil or negative it will treat the timeout as infinity
Note: The list entries for sendt and recvt can be other "cosock aware" tables like the lustre
WebSocket; for specifics on how to make a table "cosock aware", see the chapter on it.
Its return values are:
- recvr: A list of ready receivers; any entry here should be free to call receive and immediately be ready
- sendr: A list of ready senders; any entry here should be free to call send and immediately be ready
- err: If this value is not nil, it represents an error message
  - The most common error message here would be "timeout", if the timeout argument provided is not nil and positive
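
As a rough sketch of how the three return values might be inspected, the snippet below uses a hypothetical helper, summarize_select, which is not part of cosock. It is fed plain tables as stand-ins for sockets, and the nil, nil, "timeout" shape of a timed-out call is an assumption based on the description above rather than a confirmed detail of the library.

```lua
--- Hypothetical helper (not part of cosock): describe the result of a select call.
--- @param recvr table|nil list of ready receivers
--- @param sendr table|nil list of ready senders
--- @param err string|nil error message, if any
--- @return string
local function summarize_select(recvr, sendr, err)
  if err == "timeout" then
    -- nothing became ready before the timeout elapsed
    return "timed out"
  elseif err then
    return "failed: " .. tostring(err)
  end
  return string.format("%d ready to receive, %d ready to send",
    #(recvr or {}), #(sendr or {}))
end

-- Plain strings stand in for sockets here (an assumption for illustration)
print(summarize_select(nil, nil, "timeout"))
print(summarize_select({ "client-a", "client-b" }, {}, nil))
```

Checking err before touching recvr or sendr keeps the timeout case separate from genuine failures, which is the same order the server example later in this chapter uses.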
So, how would we use something like this? Let's consider our clients_server.lua example from the
spawn chapter, where we called cosock.spawn every time a new client was accepted. This works, but
we don't have much control over how many tasks we end up spawning, in large part because we don't
know how long each task will run. To gain that control, we would need to handle all of the client
connections on the same task as the server, and to do that we can use select.
-- clients_server_select.lua
local cosock = require "cosock"
local socket = require "cosock.socket"
local ip = "0.0.0.0"
local number_of_clients = 10
--- Since the clients and server are in the same application
--- we can use an OS assigned port and share it across the
--- two tasks, to coordinate the two tasks to start in the order
--- we want, we can use a cosock channel to make sure both tasks
--- have the same port number
local port_tx, port_rx = cosock.channel.new()
--- Handle a client being ready to receive
--- @param client cosock.socket.tcp
--- @param clients table map of connected clients, keyed by socket
local function handle_recv(client, clients)
  local request, err = client:receive()
  if not request then
    -- forget the client once the other side has hung up
    if err == "closed" then
      clients[client] = nil
    end
    return
  end
  print(string.format("received %q", request))
  if request:match("ping") then
    print("sending pong")
    local _, send_err = client:send("pong\n")
    if send_err == "closed" then
      clients[client] = nil
    elseif send_err then
      print("error in recv: " .. tostring(send_err))
    end
  else
    -- anything other than a ping ends the conversation
    client:close()
    clients[client] = nil
  end
end
--- Handle a server being ready to accept
--- @param server cosock.socket.tcp
--- @param clients table map of connected clients, keyed by socket
local function handle_accept(server, clients)
  local client, err = server:accept()
  -- a timeout only means nobody was waiting to connect
  if err and err ~= "timeout" then
    error("error in accept: " .. tostring(err))
  end
  if client then
    clients[client] = true
  end
end
--- Spawn a task for handling the server side of the socket
cosock.spawn(function()
  local server = socket.tcp()
  server:bind(ip, 0)
  local _ip, p = server:getsockname()
  port_tx:send(p)
  server:listen()
  local clients = {}
  server:settimeout(0)
  while true do
    -- rebuild the readiness list from the currently connected clients
    local recvt = {}
    for client, _ in pairs(clients) do
      table.insert(recvt, client)
    end
    -- only watch the server for new connections while below the limit
    if #recvt < 5 then
      table.insert(recvt, server)
    end
    local recvr, _sendr, err = cosock.socket.select(recvt, {}, 5)
    if err == "timeout" then
      return
    elseif err then
      error("Error in select: "..tostring(err))
    end
    for _, sock in ipairs(recvr) do
      if sock == server then
        print("accepting new client")
        handle_accept(server, clients)
      elseif clients[sock] then
        handle_recv(sock, clients)
      end
    end
  end
end, "server task")
--- A single client task
---@param id integer The task's identifier
---@param port integer The server's port number
local function spawn_client(id, port)
  print("spawn_client", id, port)
  local client = socket.tcp()
  client:connect(ip, port)
  for _=1,10 do
    print("sending ping", id)
    client:send(string.format("ping %s\n", id))
    local request = assert(client:receive())
    assert(request == "pong")
    socket.sleep(0.5)
  end
  client:close()
end
--- Wait for the port from the server task and then
--- spawn the `number_of_clients` client tasks
local function spawn_clients()
  local port = assert(port_rx:receive())
  for i=1,number_of_clients do
    cosock.spawn(function()
      spawn_client(i, port)
    end, string.format("client-task-%s", i))
  end
end
--- Spawn a bunch of client tasks
cosock.spawn(function()
  spawn_clients()
end, "clients task")
--- Finally we tell cosock to run all our coroutines until they are done;
--- the server task returns once select times out with nothing ready
cosock.run()

The above is an updated version of our clients/server example that limits the total number of connections to 5. Let's go over the changes.
First, we've added a few helper functions to handle the different events in our system. The
first, handle_recv, is for when a client connection is ready to receive. It takes 2 arguments:
client, which is a cosock.socket.tcp that was returned from a call to accept, and clients, which
is a table where the keys are cosock.socket.tcp clients and the values are true. We first call
client:receive to get the bytes from the client, and if that returns a string that contains
"ping" then we send our "pong" message. There are a few places where this can go wrong: the call
to receive could return nil and an error message, the request might not contain "ping", or the
call to send could return nil and an error message. If the error message is "closed" or the
request didn't contain "ping", we remove client from clients, and in the latter case we also
call client:close.
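
To see those outcomes without a network, the sketch below runs a compact copy of handle_recv (printing removed) against a hypothetical fake_client table. fake_client is only a stand-in that replays one scripted receive result; it is not a cosock API, and real sockets may report other errors as well.

```lua
-- Compact copy of the chapter's receive handler, minus the printing
local function handle_recv(client, clients)
  local request, err = client:receive()
  if not request then
    if err == "closed" then clients[client] = nil end
    return
  end
  if request:match("ping") then
    local _, send_err = client:send("pong\n")
    if send_err == "closed" then clients[client] = nil end
  else
    client:close()
    clients[client] = nil
  end
end

-- Hypothetical stand-in for a cosock socket: replays one scripted receive result
local function fake_client(request, err)
  local c = { sent = {}, closed = false }
  function c:receive() return request, err end
  function c:send(data) table.insert(self.sent, data); return #data end
  function c:close() self.closed = true end
  return c
end

local ping = fake_client("ping 1")       -- a well-behaved client
local bye = fake_client("bye")           -- sends something other than a ping
local gone = fake_client(nil, "closed")  -- the peer has already hung up
local clients = { [ping] = true, [bye] = true, [gone] = true }
for _, c in ipairs({ ping, bye, gone }) do
  handle_recv(c, clients)
end

-- Only the client that sent a ping is still tracked
print(clients[ping], clients[bye], clients[gone])
print(ping.sent[1] == "pong\n", bye.closed)
```

Only the bye client has close called on it; the gone client is simply forgotten because its connection was already closed by the peer.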
Next up we have handle_accept, which also takes 2 arguments: server, which is a cosock.socket.tcp
socket that is listening, and the same map of clients. If a call to accept returns a client then
we add that client into our clients map. If accept returns nil and err isn't "timeout" then we
raise an error.
Alright, with these two helper functions we can now update the "server" task to handle all of
the connected clients without having to call spawn. Our task starts out the same as before: it
creates a server socket, binds it to a random port, gets that port and sends it to our
"clients task", and then calls listen.
At this point, things start to change. First we define our clients map as empty, and then we
call server:settimeout(0) so that a call to accept on the server can never yield forever.
Inside of our long-running loop, we start out by defining a new table, recvt, which matches the
select argument of the same name. We then loop over our clients table, inserting each of its
keys into recvt. We keep these as separate tables because we want to be able to remove a client
from our readiness check once it has closed. Next, we check how large recvt is, and if it holds
fewer than 5 entries we add server to it. By only including server when recvt has fewer than
5 clients we have enforced our max connections limit.
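
The limit check can be tried on its own with plain tables. The sketch below pulls that step into a hypothetical build_recvt helper; the helper name and the max_clients parameter are illustrative and do not appear in the example above, and the empty-ish tables are stand-ins for sockets.

```lua
--- Hypothetical helper: build the list handed to select as recvt.
--- @param clients table map of connected clients, keyed by socket
--- @param server table the listening socket
--- @param max_clients integer connection limit
--- @return table
local function build_recvt(clients, server, max_clients)
  local recvt = {}
  for client in pairs(clients) do
    table.insert(recvt, client)
  end
  -- the server is only watched for new connections while below the limit
  if #recvt < max_clients then
    table.insert(recvt, server)
  end
  return recvt
end

-- Plain tables stand in for sockets (an assumption for illustration)
local server = { name = "server" }
local clients = {}
for i = 1, 4 do
  clients[{ name = "client-" .. i }] = true
end

local below = build_recvt(clients, server, 5)
print(#below, below[#below] == server) -- 4 clients plus the server

clients[{ name = "client-5" }] = true
local full = build_recvt(clients, server, 5)
print(#full) -- 5 clients, server left out
```

Because the server is missing from the list once the limit is reached, select never reports it as ready, so no accept call is attempted until a client is removed from the map.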
With recvt defined we can finally call cosock.socket.select. We pass recvt as the first
argument, an empty table as the sendt argument, and finally a timeout of 5 seconds. We assign
the result of select into recvr, _sendr, err. We would expect recvr to contain any of our
clients that are ready to receive and, if we are below the limit, server. If recvr is nil we
would expect err to be a string describing the error. If err is "timeout" then we exit our
server task, which should exit the application. If we don't have an err then we loop over all
the entries in recvr and check whether each one is our server; if so we call handle_accept, and
if not we call handle_recv. Each of our helpers will update the clients map to ensure that we
service all of the client requests before exiting.
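
The dispatch step can be sketched in isolation as well. Here a hypothetical dispatch function takes the two handlers as callbacks so it can run against plain tables; the function name, the callback parameters, and the stale entry are illustrative assumptions and not part of the example above.

```lua
--- Hypothetical helper: route each ready entry to the matching handler.
local function dispatch(recvr, server, clients, on_accept, on_recv)
  for _, sock in ipairs(recvr) do
    if sock == server then
      on_accept(server, clients)
    elseif clients[sock] then
      -- entries that are no longer in the clients map are skipped
      on_recv(sock, clients)
    end
  end
end

-- Plain tables stand in for sockets (an assumption for illustration)
local server, a, b, stale = {}, {}, {}, {}
local clients = { [a] = true, [b] = true }
local accepted, received = 0, 0

dispatch({ a, server, stale, b }, server, clients,
  function() accepted = accepted + 1 end,
  function() received = received + 1 end)

print(accepted, received)
```

The clients[sock] guard matters because a handler that runs earlier in the same pass may have already removed a socket from the map.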
The last change we've made is to spawn_client, which previously would loop forever; it now
loops 10 times before exiting and closing the client.
If we were to run this, we would see each of the tasks spawn in a random order, and the first 5
of those would begin sending their "ping" messages. Once 1 of them completes, the server accepts
the next connection, but not before that point, which means we have limited our total number of
connected clients to 5!