Naive async implementation in Fennel
Lately, I’ve been into understanding asynchronous programming. Since my background is mostly bare-metal C, which has no asynchronous programming whatsoever (apart from running on multiple chips and communicating via shared memory), and I’ve done only a little C++ and Rust, it’s fair to say that async is pretty new to me. And because I now work in Clojure, I sometimes have to use asynchronous programming libraries like core.async and manifold. Both are great libraries, providing ways to write asynchronous applications and stay sane.
Clojure has really good facilities for writing concurrent programs, as it was designed with that in mind. Fennel, on the other hand, wasn’t, as it targets Lua, and Lua has no multithreading capabilities at all. Because of that, there are no concurrency primitives in the Lua language besides coroutines. Coroutines provide a certain degree of multitasking, but they run on a single thread. It is possible to implement some kind of multitasking via coroutines, but it requires writing the program in a certain way.
I’ve decided to try implementing something similar to Clojure’s core.async for Fennel, though naive and much simpler, and see where I can get with it. This article will go into detail on how to implement a rather simple event loop, with a way of spawning concurrent tasks, relying only on coroutines.
Writing an event loop
The event loop in our case will be an infinite loop that cycles through all tasks over and over, executing each in a blocking way. The only way of writing non-blocking code in Lua is to use coroutine.create and then coroutine.yield to temporarily stop execution.
Let’s start with a task queue:
(local queue [])
For our purposes, just a sequential table will do. The queue will hold coroutines, and we need to check their statuses in the loop.
Let’s write the loop itself:
(fn worker []
  (while true
    (each [i task (ipairs queue)]
      (match (coroutine.status task)
        :suspended (coroutine.resume task)
        _ (table.remove queue i)))))
If the coroutine is suspended, we resume it and go to the next one once it yields. If the coroutine has died, we remove it from the task queue to keep the queue clean.
Now we need a way to spawn coroutines. Let’s write a function that accepts a function, creates the coroutine, and puts it into our queue:
(fn spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))
That’s pretty much it!
Upon invocation, this function returns something like #<thread: 0x564c2c5e2ae8>, which is a newly created thread object. Not to be confused with real threads: this is just the way Lua refers to coroutines.
We now have a very, very simple event loop that we can use to spawn coroutines like this:
(spawn (fn [] (for [i 1 5] (print "foo" i) (coroutine.yield))))
;; #(...) is the same as (fn [] ...) but shorter
(spawn #(for [i 1 5] (print "bar" i) (coroutine.yield)))
Right now nothing happens, because we need to start the worker to see the result:
(worker)
We should see something like this:
foo 1
bar 1
foo 2
bar 2
foo 3
bar 3
foo 4
bar 4
foo 5
bar 5
As can be seen, both tasks were executed concurrently: each task pauses itself on every iteration step, so the other task gets a chance to run. This is an example of cooperative (non-preemptive) multitasking. We can’t spawn real threads in the stock Lua runtime without special libraries that provide process-based multithreading.
But these libraries spawn separate lua instances that communicate via message passing. Coroutines, on the other hand, can share state via global variables, which is a bit more performant, as you don’t have to serialize and deserialize data. Although, because everything runs in a single process, there’s no real performance gain compared to real threading, so where performance matters, message passing between real threads is the way to go.
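For example, two spawned tasks can share a simple counter directly through Lua's global table (a sketch; counter is a name made up for this illustration):

```fennel
;; Both tasks live in the same Lua state, so they can share data
;; through the global table directly, with no serialization step.
(tset _G :counter 0)
(spawn #(for [_ 1 3]
          (tset _G :counter (+ _G.counter 1))
          (coroutine.yield)))
(spawn #(while (< _G.counter 3)
          (print "counter is now" _G.counter)
          (coroutine.yield)))
```

Once the worker runs, the second task observes the values the first one writes, with no copying involved.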
Let’s leave message passing aside for now, as we don’t yet have a nice way of working with these tasks interactively.
Since we’ve started the worker, our tasks have completed, and the worker now loops in an infinite empty cycle, blocking our REPL. This makes interactive work quite cumbersome, so let’s write a REPL that runs asynchronously in our event loop alongside other tasks.
Writing an asynchronous REPL
The REPL code below is based on Fennel’s REPL test code, slightly simplified for the purpose of this article. For those unfamiliar with Lisps, REPL stands for Read Eval Print Loop and is something like an interactive shell for working with a running Lisp process. Fennel has an inbuilt REPL, and we can reuse it, but we need to make sure that its looping aspect doesn’t block our task queue while we wait for input.
Let’s start by declaring prompt variables:
(local prompt-ready "repl>> ")
(local prompt-multi ".. ")
(var prompt prompt-ready)
I’m deliberately changing the standard ">> " prompt to "repl>> ", so we can see a bit more easily that we’re in our special kind of REPL. There are two separate prompts here, and we will switch between them when our REPL receives an unfinished expression, indicating that the REPL still waits for input.
Next, let’s set up the REPL itself:
(local fennel (require :fennel))
(fn repl [options]
  (fn send []
    (var output []) ; ➊
    (fennel.repl
     {:readChunk (fn [{: stack-size}]
                   (set prompt (if (< 0 stack-size) ; ➋
                                   prompt-multi
                                   prompt-ready))
                   (when (> (length output) 0)
                     (io.write (table.concat output "\t") "\n"))
                   (let [chunk (coroutine.yield)] ; ➌
                     (set output [])
                     (and chunk (.. chunk "\n"))))
      :onValues (fn [x]
                  (table.insert output (table.concat x "\t")))
      :onError (fn [_ e]
                 (table.insert output (.. "error: " e)))
      :pp (fn [x]
            (fennel.view x))}))
  (let [send (coroutine.wrap send)]
    (send) ; ➍
    send))
Let’s look at it step by step.
First, we define an output variable ➊. It will store the current output, which we use in the readChunk method, seen in the table passed to the fennel.repl call. This method accepts a table with the REPL’s state, and we’re interested in the stack-size value. If it is greater than 0, we set the prompt to the multiline prompt .., and to repl>> otherwise ➋. Next, we check if the output table contains anything. If it does, we concatenate the elements and print them to standard out. After that we yield ➌ from the REPL until new input arrives. This yield right there is what allows us to use the REPL in a non-blocking way. The remaining methods handle values and errors that happen in the REPL, and pp pretty-prints values for display. Lastly, we wrap our function, execute it to start the REPL ➍, and return the wrapped function. The resulting function is called send, as it sends input to the REPL for evaluation.
But how can we get input, especially since we don’t want to block other tasks from running?
Input polling
We can get the input with the io.read function, and luckily for us, this function works with the stdin buffer:
>> (io.read 0)
1234
""
>> (io.read "l*")
"1234"
We’ve called io.read(0), which reads 0 characters from standard input. It returned an empty string, because that’s what zero characters are. Next, we called it again, but with the "l*" argument, which means reading a complete line, and the result was "1234" - the data we previously sent via stdin, now successfully read back.
But this has a problem: we don’t know whether stdin already has something in the buffer, so if the buffer is empty, the (io.read "l*") call will block until input is supplied. Unfortunately, the Lua runtime doesn’t have any kind of input polling, and its io module is very minimalist. So we need a way to check whether stdin has anything in its buffer.
For this task we can use the luaposix library, which can be installed via luarocks:
$ luarocks install --local luaposix
This library provides the rpoll function, which can be used to check whether a file descriptor has anything ready to be read. Let’s write a task for our worker that first sets up a REPL and then goes into an infinite loop, polling for input:
(local p (require :posix))

(fn async-repl []
  (let [send (repl)]
    (io.write prompt)
    (io.flush)
    (while true
      (when (< 0 (p.rpoll p.STDIN_FILENO 0))
        (send (io.read "l*"))
        (io.write "\r" prompt)
        (io.flush))
      (coroutine.yield))))
(spawn async-repl)
(worker)
The p.rpoll function checks whether p.STDIN_FILENO has any data in its buffer. When it returns a non-zero value, we read the whole line from stdin and send it to the REPL. Then we display the prompt, flush, and finally yield so other spawned processes can cooperate.
Let’s try it out.
The code up to this point can be saved to a file called async.fnl, so we can run it from the shell more easily:
$ fennel async.fnl
repl>> (+ 1 2 3)
6
repl>> (+ 1
.. 2
.. 3)
6
repl>>
Now that we have the REPL working, we can spawn some processes!
repl>> (spawn #(for [i 1 5] (print i) (coroutine.yield)))
error: global 'spawn' is not callable (a nil value)
Uh oh, the spawn function is not known to our REPL. But that’s easy to fix. As you may remember, coroutines can access global state, so we just need to make the spawn function and the queue globals:
(global queue [])

(fn _G.spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))
With this change we can start spawning processes from our REPL:
$ fennel async.fnl
repl>> (spawn #(for [i 1 5] (print i) (for [i 1 1000000] (coroutine.yield))))
#<thread: 0x56055ef88ae8>
repl>> 1
2
3
(print :foo)
foo
repl>> 4
5
repl>>
As you can see, I’ve added an empty loop with a coroutine.yield call to slow down the process, so I could input the (print :foo) expression. Because our system toggles between processes, we were able to input the expression and see the result without waiting for the first task to complete. This basically means that our non-blocking REPL works.
Though, unfortunately, the prompt gets in the way when printing from different threads. As a workaround, the print function can be redefined to clear the line before printing, but that’s not a proper solution either.
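One such workaround might look like this (a sketch, assuming an ANSI-capable terminal; "\027[K" is the escape sequence that erases from the cursor to the end of the line):

```fennel
;; Keep a reference to the original print and wrap it so every call
;; first returns the cursor to the start of the line and erases it,
;; overwriting the prompt instead of printing after it.
(local raw-print print)
(fn _G.print [...]
  (io.write "\r\027[K")
  (raw-print ...))
```

The prompt then disappears until the REPL task redraws it, which is exactly why this isn’t a proper solution.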
Communicating between threads
The last missing piece of our system is a way to communicate between tasks/threads. While global variables can be used by coroutines as a way of sharing state, they are a really poor choice. Global variables don’t have any concurrency semantics and are especially hard to debug. Another problem is that we would have to implement synchronization primitives ourselves each time we interact with globals, which is not good.
There are two things that can help us solve this problem. The first are so-called promises, reading from which blocks until a value is delivered, and the other is a channel. Promises are easy to implement, and we can actually write one like this:
(fn _G.promise []
  {:val nil
   :deliver (fn [self val]
              (if (= self.val nil)
                  (do (set self.val val)
                      true)
                  false))
   :deref (fn [self]
            (while (= nil self.val)
              (coroutine.yield))
            self.val)})
It’s a really simple implementation, but we can see that it works as expected:
repl>> (local p (promise))
nil
repl>> (spawn #(print (.. "promise: " (p:deref))))
#<thread: 0x561ae3b49568>
repl>> (p:deliver 42)
true
repl>> promise: 42
repl>>
As we can see, we’ve created a thread that parked itself when it tried to dereference the promise, and the moment we delivered the promise, we saw the message from that thread.
Note: Once again, the repl>> prompt got in the way, and this will be a common thing in all the following examples. I thought about removing the prompt from the output manually but decided not to: if someone tries this code, they might think something is wrong when their output differs from the one in this article.
This is a handy construct when we need to deliver a value from one thread to another without blocking our program, but after the delivery is complete, the promise becomes useless, because we can’t deliver to a promise twice - once it was delivered, the value is set in stone1.
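We can see this in the session above: p already holds 42, so a second deliver is simply rejected:

```
repl>> (p:deliver 43)
false
```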
What if we need to pass several values from one thread to another?
Of course, we can deliver a table of values to our promise, but what if we don’t yet have all the data necessary to produce these values?
Channels are really good for this: we can put as many values as we want into a channel and take them from it in another thread, parking the reading thread if the channel is empty, or parking the writing thread if the channel is full, as a form of backpressure.
Let’s create the chan, take, and put functions:
(fn _G.chan [size]
  {:size (or size math.huge)
   :buf []})
For now, chan simply creates a table with our channel buffer buf, stored as a sequential table, and an optional size, indicating the channel’s capacity and acting as backpressure.
Now we can write a non-blocking put and take:
(fn _G.put [{: buf : size} value]
  (assert (not= nil value) "value must not be nil")
  (while (>= (length buf) size)
    (coroutine.yield))
  (table.insert buf value)
  true)

(fn _G.take [{: buf}]
  (while (= 0 (length buf))
    (coroutine.yield))
  (table.remove buf 1))
You can see that our put will wait until the length of the channel’s buffer is less than its maximum size. Similarly, take will wait for elements to appear. These operations are non-blocking, which means they must be used only in processes created via the spawn function.
Let’s try this out:
repl>> (local c (chan 3))
nil
repl>> (spawn #(for [i 1 10] (put c i) (print (.. "put: " i))))
#<thread: 0x564b48408608>
repl>> put: 1
put: 2
put: 3
First, we’ve created a channel c with a size of 3. Next, we’ve spawned a numeric for loop that tries to put indexes into the channel, printing the values afterward. You can see that right after that, the put: 1 to put: 3 messages appeared, indicating that we’ve successfully put 3 values into our channel c. However, the values 4 to 10 were not put into the channel because of the backpressure.
Let’s spawn a single take from the channel:
repl>> (spawn #(print (.. "take: " (take c))))
#<thread: 0x564b483f0ae8>
repl>> take: 1
put: 4
As can be seen, we’ve taken the value 1 from the channel, and immediately the next value was put into it, as shown by the put: 4 message.
Let’s spawn another loop that repeatedly takes values from our channel and prints them:
repl>> (spawn #(while true (print "take: " (take c))))
#<thread: 0x564b483dbeb8>
repl>> take: 2
take: 3
take: 4
put: 5
put: 6
put: 7
take: 5
take: 6
take: 7
put: 8
put: 9
put: 10
take: 8
take: 9
take: 10
Aha! Now we can see that this spawned task took 3 elements from the channel and parked itself, waiting for elements. Then our other loop inserted values 5, 6, and 7 into the channel, and parked itself because of the back pressure. And the cycle repeated itself until the putting loop was exhausted.
We can spawn another put, and we’ll immediately see a take:
repl>> (spawn #(put c 42))
#<thread: 0x564b48408608>
repl>> take: 42
repl>>
Because the size of the channel was set to 3, we saw puts and takes in groups of three. If we create a channel with an unlimited buffer size, we can potentially block our program if some process decides to put an infinite number of values into the channel. So it is better to have channels with some bounded buffering, to prevent that from happening.
We can create two more channels to illustrate that:
repl>> (local c-no-bp (chan))
nil
repl>> (do (spawn #(for [i 1 5] (put c-no-bp i) (print (.. "put: " i))))
.. (spawn #(while true (print (.. "take: " (take c-no-bp))))))
#<thread: 0x5607d13a0eb8>
repl>> put: 1
put: 2
put: 3
put: 4
put: 5
take: 1
take: 2
take: 3
take: 4
take: 5
repl>>
As can be seen, the first for loop puts all of its elements into the channel c-no-bp, which has no backpressure, and only then does the next loop take each of these elements. On the other hand, if we create a channel with a size of 1, we get this output:
repl>> (local c1 (chan 1))
nil
repl>> (do (spawn #(for [i 1 5] (put c1 i) (print (.. "put: " i))))
.. (spawn #(while true (print (.. "take: " (take c1))))))
#<thread: 0x5607d1365f78>
repl>> put: 1
take: 1
put: 2
take: 2
put: 3
take: 3
put: 4
take: 4
put: 5
take: 5
repl>>
A channel with a fixed size can act as a synchronization primitive in your program. Alternatively, if you really need an unlimited buffer, you can call coroutine.yield directly in the putting loop, to put one value at a time.
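For instance, with the unbuffered c-no-bp channel from above, an explicit coroutine.yield after each put restores the interleaving (a sketch):

```fennel
;; put never parks on an unlimited channel, so we park manually
;; after each value, giving the taking loop a chance to run in between.
(spawn #(for [i 1 5]
          (put c-no-bp i)
          (print (.. "put: " i))
          (coroutine.yield)))
```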
Practical example
Now that our system is mostly ready, let’s create a practical example with our channels!
For this, I’ve decided to use Love2d - a game engine that can run Lua directly. Love2d actually has its own implementation of threads, which works similarly to what I’ve described at the beginning of this article - by passing messages, serializing, and deserializing data. The real reason is that I want to draw something using cooperative multitasking, and I could just as well have taken Fengari, but I don’t want to mess with the web until necessary.
So let’s put all our code into a file and set up the LÖVE project; we’ll only need one file, main.fnl, for this. First, let’s add our spawn, worker, and channel-related code to the main.fnl file:
(local queue [])

(fn spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))

(fn chan [size]
  {:size (or size math.huge)
   :buf []})

(fn put [{: buf : size} value]
  (assert (not= nil value) "value must not be nil")
  (while (>= (length buf) size)
    (coroutine.yield))
  (table.insert buf value)
  true)

(fn take [{: buf}]
  (while (= 0 (length buf))
    (coroutine.yield))
  (table.remove buf 1))

(fn worker []
  (each [i task (ipairs queue)]
    (match (coroutine.status task)
      :suspended (coroutine.resume task)
      _ (table.remove queue i))))
Note that the worker no longer has the while loop in it, because looping will be handled by the love.draw function.
Now, let’s set up our channel, and some threads:
(local c (chan 1))

(let [(width height) (love.graphics.getDimensions)]
  (for [i 0 (/ width 8)]
    (for [j 0 (/ height 8)]
      (spawn #(while true
                (put c [(math.random)
                        (math.random)
                        (math.random)])))
      (spawn #(while true
                (love.graphics.setColor (take c))
                (love.graphics.rectangle :fill i j 1 1))))))
We’ve created a channel with a buffer of size 1. Next, for each point on the canvas, we spawn a thread that tries to put a random color into the channel. Our channel will refuse to hold more than 1 color at a time, so these threads will park once it’s full. The second thread reads one color from the channel and draws a rectangle with this color.
Next, we need to draw all of this:
(fn love.load []
  (love.window.setTitle "Async")
  (love.window.setMode 640 480 {:resizable false :vsync false}))

(fn love.draw []
  (love.graphics.scale 8 8)
  (worker))
The love.load simply sets the window parameters, and love.draw sets the scaling and calls our worker, which handles our threads. Compiling this code with fennel -c main.fnl > main.lua and running love . in this directory should result in something like this:
The color for each square is set by a separate thread, and the squares are also drawn by separate threads.
As you can see here, each thread is a while loop that is parked by calls to put and take each time our channel c is full or empty. Thus, we’ve almost fully abstracted the coroutines away.
Further up
This was a very basic implementation that can be expanded upon if one wants to. Obviously, it’s far from complete, but even in its current state, you can write code with some sort of multitasking in mind.
One thing to improve is our implementation of the worker function, which loops over all threads. Right now it’s just an each loop, but a proper scheduler might reorder tasks in a more efficient way, depending on how tasks perform.
The key problem with this system is that your main application must run inside this worker as one of the threads, because if you want to block waiting for a promise, you can’t use a conventional while loop, as it will stop the worker from running, and your program will hang. This just needs to be kept in mind, though, and I believe it is possible to write programs in such a way.
In addition to parking on channel or promise operations, a set of macros can be created to replace while, for, each, and other blocking constructs with non-blocking variants. Such macros allow running loops in threads without blocking other threads:
(macro pwhile [condition ...]
  `(while ,condition
     (do ,...)
     (coroutine.yield)))

(macro pfor [bindings ...]
  `(for ,bindings
     (do ,...)
     (coroutine.yield)))

(macro peach [bindings ...]
  `(each ,bindings
     (do ,...)
     (coroutine.yield)))
Alternatively, a single macro that wraps user code can be created, which analyzes the code for iteration constructs and upgrades them to use coroutine.yield. This is somewhat similar to what Clojure’s core.async does with its go macro, which transforms the code.
Overall, I think this is an interesting topic to explore, and I might create a library like this if there is enough interest.
1. Nothing is really set in stone, as this is a mutable object, and you can break this guarantee by mutating it. This can be avoided by using proxy tables with a special __newindex metamethod, but I’ve left this out for the purpose of this article. ↩︎
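A minimal sketch of such a proxy (write-once is a name made up here): reads are forwarded to a hidden storage table via __index, and __newindex rejects writes to keys that have already been set.

```fennel
(fn write-once []
  (let [store {}]
    (setmetatable
     {}
     {:__index store
      :__newindex (fn [_ key value]
                    (if (= nil (. store key))
                        (tset store key value)
                        (error (.. "key already set: " (tostring key)))))})))
```

Note that this only guards top-level keys; nested tables stored inside remain mutable.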