Ad-hoc async in emacs-lisp via generators
TIL that Emacs Lisp has had generators since 2016, and generators are a cool feature in any language in my opinion!
In short, Emacs has shipped the generator.el package as part of its core since version 25.1.
The generators are implemented via a code transformation and act like iterators; even most of the functions are named accordingly: iter-next, iter-yield, and so on.
Thus we can’t cross iterator boundaries without creating another iterator, so they’re not proper coroutines, but we can still use them as such with these limitations in mind.
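To get a feel for the generator API before building on it, here’s a minimal, self-contained example (separate from the code we’ll write below):

```emacs-lisp
;;; -*- lexical-binding: t -*-
(require 'generator)

;; A generator function; calling it returns an iterator object.
(iter-defun numbers ()
  (iter-yield 1)
  (iter-yield 2))

(let ((it (numbers)))
  (list (iter-next it)   ; the first `iter-next' runs to the first yield
        (iter-next it))) ; => (1 2)
```

A third iter-next call would signal iter-end-of-sequence, which is how we’ll later detect finished tasks.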
As you may have noticed, lately I’ve been fiddling with Clojure’s
core.async library, and with an implementation of a similar concept for the Fennel language via fennel-async.
Fennel, being a language that compiles to Lua, allows us to use Lua’s coroutines directly for implementing non-blocking operations on channels.
Clojure, on the other hand, doesn’t have proper coroutines, so to implement
core.async its authors transform the code into a state machine, which then gets executed on a thread pool.
Much like what generator.el does in Emacs, except for the thread pool part.
I really like the concept of CSP (Communicating Sequential Processes) and when I learned that Emacs has generators I wanted to see if it is possible to implement something like
core.async in emacs-lisp.
Now, this isn’t a proper implementation, but rather a simplified quick prototype of what can be done. I know that there are a lot of other implementations, like emacs-aio, which is built on top of generators, but it uses promises instead of channels, and I like channels more.
I must say that I’m not great at writing Emacs Lisp code that mutates things, because I mostly try to write code in a functional style, to avoid errors that can be caused by mutation, but in this case mutation is the only way to achieve the desired result.
First, let’s describe the overall layout. We will have:
- Channels - mutable places that store data in a sequential buffer;
- Primitives to create lightweight threads that are executed by the scheduler;
- Such threads are also channels.
- Primitives to park threads when we do operations on channels.
That’s basically all we need to implement
core.async-style approach for writing code.
Also, I should note that generator.el requires
lexical-binding to be true; otherwise, it refuses to work. So before we begin, let’s set it up and require the library:
```emacs-lisp
;;; -*- lexical-binding: t -*-
(require 'generator)
```
Let’s start with channels.
We start simple. Let’s create a function that returns a new channel, with an optional size indication, that basically affects the buffering.
```emacs-lisp
(defun a/chan (&optional size)
  "Create a channel with optional SIZE attribute."
  (when (and size (< size 1))
    (error "size must be greater than 0"))
  (list size))
```
I know, I know. This is definitively not a performant way of creating a queue, but it will do for our experimental purposes. We can always remake this in a proper way later, and it will probably require only very slight internal API changes.
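So a channel is just a list whose head stores the size (or, as we’ll see shortly, a :closed marker), and whose tail acts as the buffer. For illustration:

```emacs-lisp
(a/chan)   ; => (nil) - no size limit stored in the head
(a/chan 2) ; => (2)   - room for two buffered values after the head
;; (a/chan 0) would signal an error: size must be greater than 0
```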
Next, we need a way to put items to the channel, take items off the channel, and close the channel:
```emacs-lisp
(defun a/chan-status (port)
  "Return the status of the PORT or its size if any."
  (car port))

(defun a/put! (port val)
  "Put value VAL to the channel PORT."
  (when (null val)
    (error "value can't be nil"))
  (let ((size (a/chan-status port)))
    (when (or (null size)
              (and (numberp size)
                   (length< port (1+ size))))
      (nconc port (list val))
      t)))

(defun a/take! (port)
  "Take a value off the PORT."
  (when (length> port 1)
    (let ((res (car (last port))))
      (nbutlast port)
      res)))

(defun a/close! (port)
  "Close the PORT."
  (setcar port :closed))
```
These functions never block. If we can’t put a value onto the channel, we simply bail out and don’t do it. If we can’t take a value off the channel, again, we bail out. So we can use these functions without ever worrying that they will block, but in reality we shouldn’t use them directly, because they don’t respect the fundamental property of a channel: backpressure.
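Here’s how these non-blocking primitives behave in practice (note that a failed put silently drops the value):

```emacs-lisp
(let ((ch (a/chan 1)))
  (a/put! ch 42)  ; => t, the value fits in the buffer
  (a/put! ch 43)  ; => nil, the buffer is full, 43 is dropped
  (a/take! ch)    ; => 42
  (a/take! ch)    ; => nil, the buffer is empty
  (a/close! ch)
  (a/put! ch 44)) ; => nil, can't put onto a closed channel
```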
In reality, channels should have a separate queue of puts, a queue of takes, and an optional buffer, thus these functions should not drop any data, but to keep things simple, I’ve decided to only have the buffer. We can fix this by introducing a few macros:
```emacs-lisp
(defmacro a/<! (port)
  "Parking take.
Takes a value off the PORT if it is not closed, and has
something in the buffer."
  (let ((loop (gensym))
        (p (gensym))
        (res (gensym)))
    `(let ((,p ,port))
       (named-let ,loop ((,res (a/take! ,p)))
         (cond (,res ,res)
               ((eq :closed (a/chan-status ,p)) nil)
               (t (iter-yield nil)
                  (,loop (a/take! ,p))))))))

(defmacro a/>! (port x)
  "Parking put.
Puts X onto the PORT if the port is not closed and has room in
the buffer.  X must not be NIL."
  (let ((loop (gensym))
        (res (gensym))
        (p (gensym))
        (val (gensym)))
    `(let ((,p ,port)
           (,val ,x))
       (named-let ,loop ((,res (a/put! ,p ,val)))
         (cond (,res ,res)
               ((eq :closed (a/chan-status ,p)) nil)
               (t (iter-yield nil)
                  (,loop (a/put! ,p ,val))))))))
```
Why are these macros, you might ask?
Well, because we will only be using these in a generator context, and we can’t call
iter-yield from anywhere but the iterator’s body; in turn, we can’t make these two operations functions.
iter-yield is a transformation point for the iterator’s body, and thus it must be lexically present.
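We can see this restriction directly. If a yield is hidden behind an ordinary function, the generator transformation never sees it, and the call hits generator.el’s plain iter-yield function, which signals an error at run time (the exact error message may vary between Emacs versions):

```emacs-lisp
;; Not transformed: the `iter-yield' below is a regular function call.
(defun broken-take (ch)
  (iter-yield (a/take! ch)))

;; Evaluating this signals an error, even though the call
;; happens "while" an iterator is running:
;; (iter-next (funcall (iter-lambda () (broken-take c))))
```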
Now, let’s look at how we can use these.
To test this out, we can create a channel and an iterator, which tries to take a value off the channel, and print it:
```emacs-lisp
(defvar c (a/chan 1))

(defvar g
  (funcall
   (iter-lambda ()
     (message "val: %s" (a/<! c)))))
```
Advancing the iterator does nothing, even if we call it multiple times, because
a/<! handles the wait:
```emacs-lisp
(iter-next g) ; => nothing
```
However, if we put the value to the channel…
```emacs-lisp
(a/put! c 42)
```
…and then advance the iterator…

```emacs-lisp
(iter-next g)
```

…we’ll see the message
"val: 42" and receive the iter-end-of-sequence signal, because the iterator’s body has finished.
So, as you can see, the
g iterator parked itself at the
a/<! call, and the channel acted as the synchronization point between the main process and our iterator.
Such an iterator really is like a lightweight thread with cooperative scheduling, which we hid inside the
a/<! macro via iter-yield.
However, manually advancing iterators isn’t great, so we can build a really simple scheduler that will do that for us.
Our scheduler will use a simple list of processes and it will simply go through and execute each one after another. Before we can write it though, we need a way to spawn our lightweight processes:
```emacs-lisp
(defvar a/processes '()
  "Task pool of currently running lightweight processes.")

(defun a/spawn (f &rest args)
  "Start process F by supplying optional ARGS and `push' it to the task pool.
If the process finishes on its very first step, remove it right away."
  (let ((p (apply f args)))
    (push p a/processes)
    (condition-case nil
        (iter-next p)
      (iter-end-of-sequence
       (setq a/processes (delq p a/processes))))
    p))
```
Now we can rewrite our previous example like this:
```emacs-lisp
(let ((c (a/chan 1)))
  (a/spawn
   (iter-lambda ()
     (message "val: %s" (a/<! c))))
  (a/put! c 42))
```
But we still need to run the process and maintain its state, e.g. when the iterator is exhausted the process should be removed from the processes list. This function takes care of this:
```emacs-lisp
(defun a/run ()
  "Run each process in the task pool, and remove those that have finished."
  (dolist (proc a/processes)
    (condition-case nil
        (iter-next proc)
      (iter-end-of-sequence
       (setq a/processes (delq proc a/processes)))))
  (length> a/processes 0))
```
The return value of this function is either t or
nil, indicating whether there are processes left.
The simplest event loop can then be defined like this:

```emacs-lisp
(while (a/run))
```

And, combined with the previous example:
```emacs-lisp
(let ((c (a/chan 1)))
  (a/spawn
   (iter-lambda ()
     (message "val: %s" (a/<! c))))
  (a/put! c 42)
  (while (a/run)))
```
Again, this is the simplest scheduler possible, but we don’t care for now, as it is just a proof of concept. There should be error handling, the ability to cancel processes, and ways to avoid running processes when they don’t want to be run, like when the process is sleeping. A more sophisticated implementation can do all of this, but it’s a task for another day.
Still, spawning tasks like this is not convenient enough, so we can create another macro to automate some stuff:
```emacs-lisp
(defmacro a/go (&rest body)
  "Create a lightweight thread that runs the BODY.
Thread is automatically parked when operations on channels
occur.  The return value itself is a channel."
  (declare (indent 0))
  (let ((ch (gensym))
        (task (gensym)))
    `(let* ((,ch (a/chan 1))
            (,task (iter-lambda ()
                     (a/>! ,ch (progn ,@body))
                     (a/close! ,ch))))
       (a/spawn ,task)
       ,ch)))
```
You can see that it does a bit more than just wrapping the
BODY in an iter-lambda.
It also creates a channel and splices an
a/>! operation into the iterator’s body.
This is done to make sure that we can await on the
a/go block on its own, using the same API as with channels, and actually get the result of its body.
We can do so in a non-blocking way, by using one
a/go block inside another and awaiting the result via
a/<!, or we can define two blocking operations,
a/>!! and a/<!!, to use from the main thread:
```emacs-lisp
(defun a/>!! (port val)
  "Blocking put.
Puts VAL onto the PORT if the port is not closed and has room in
the buffer.  VAL must not be NIL.  If the buffer is full, block
until the buffer is ready to receive the value.  Should not be
used inside the `a/go' blocks."
  (named-let loop ((res (a/put! port val)))
    (cond (res res)
          ((eq :closed (a/chan-status port)) nil)
          (t (a/run)
             (loop (a/put! port val))))))

(defun a/<!! (port)
  "Blocking take.
Takes a value off the PORT if it is not closed, and has
something in the buffer.  If the buffer is empty, will block
until the buffer receives a value.  Should not be used inside
the `a/go' blocks."
  (named-let loop ((res (a/take! port)))
    (cond (res res)
          ((eq :closed (a/chan-status port)) nil)
          (t (a/run)
             (loop (a/take! port))))))
```
Note that we’re calling
a/run if the channel operation was not successful, either because the channel was full or empty, to allow other threads to advance and put data onto or take data off of it.
This allows us to avoid blocking indefinitely, and since we don’t have a thread pool, this is our only choice.
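As a quick sanity check of how the blocking and parking operations cooperate (the go block parks on its second put because the buffer is full, so the scheduler gets a chance to resume it later):

```emacs-lisp
(let ((ch (a/chan 1)))
  (a/go
    (a/>! ch 1)
    (a/>! ch 2))                ; parks here: the buffer is full
  (list (a/<!! ch) (a/<!! ch))) ; => (1 2)
```

The second a/<!! finds the buffer empty, calls a/run, which resumes the go block so it can put the second value, and then the take succeeds.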
Except, we do have threads in Emacs. So, we can create our own event loop inside Emacs’s own event loop like this:
```emacs-lisp
(make-thread
 (lambda ()
   (while t
     (a/run)
     (thread-yield))))
```
Personally, I wouldn’t recommend this unless the scheduler is sophisticated enough to avoid affecting emacs’ performance.
This will run forever, and advance all the tasks that we’ve spawned with the
a/go macro:
```emacs-lisp
(let ((ch (a/chan 1)))
  (a/go
    (dotimes (x 10)
      (a/>! ch x)
      (message "a put %s" x))
    (message "a done")
    (a/close! ch))
  (a/go
    (while-let ((x (a/<! ch)))
      (message "b got %s" x))
    (message "b done")))
```
Log of the evaluation:

```
a put 0
b got 0
a put 1
b got 1
a put 2
b got 2
a put 3
b got 3
a put 4
b got 4
a put 5
b got 5
a put 6
b got 6
a put 7
b got 7
a put 8
b got 8
a put 9
a done
b got 9
b done
```
Tweaking the channel’s buffer size affects how these processes communicate; e.g., here’s the log with a buffer size of 3:
```
a put 0
a put 1
a put 2
b got 2
b got 1
b got 0
a put 3
a put 4
a put 5
b got 5
b got 4
b got 3
a put 6
a put 7
a put 8
b got 8
b got 7
b got 6
a put 9
a done
b got 9
b done
```
This code spawns two lightweight processes communicating through the channel ch.
The first process puts values into the channel and logs after the value is accepted.
The second one waits until values arrive in the channel, and logs the values it receives.
When the first process finishes its internal
dotimes loop, it closes the channel with a/close!.
The second task will then complete once the channel is closed, because
a/<! will return
nil once the channel is exhausted.
And that’s basically it!
So, what can we do with this kind of system?
We can retrieve a URL in an asynchronous way with url-retrieve:
```emacs-lisp
(let ((ch (a/chan 1)))
  (url-retrieve
   "https://andreyor.st/2022-09-26-reproducible-research-with-org-mode-fennel-and-love.org"
   (lambda (_)
     (a/>!! ch (length (buffer-string)))
     (a/close! ch)))
  (a/go (message "page size: %s" (a/<! ch))))
```
In the callback, provided to the function, we set the value of the
ch channel to be the total length of the resulting buffer.
Then we close the channel, so it won’t be used again by anyone else.
The a/go block runs asynchronously and waits till the channel has the value.
If we have the scheduler running in the background, the message will appear once the operation is done.
The main point is that we can convert a callback-based interface to a channel-based interface with ease:
```emacs-lisp
(defun a/url-retrieve (url)
  "Asynchronously retrieve the URL.
Returns a channel, containing the resulting string once the
operation is complete."
  (let ((ch (a/chan 1)))
    (url-retrieve url (lambda (_)
                        (a/put! ch (buffer-string))
                        (a/close! ch)))
    ch))
```
Then our system built on top of channels will be able to work with this kind of function in a simple way. E.g., we can define an operation that will wait on multiple channels:
```emacs-lisp
(defun a/alts! (&rest channels)
  "Wait for multiple CHANNELS.
Return the first one that completes along with the result."
  (a/go
    (catch 'done
      (while t
        (dolist (ch channels)
          (when-let ((res (a/take! ch)))
            (throw 'done (list ch res))))
        (iter-yield nil)))))
```
Yes, we may be required to manually yield from the
a/go at times, and this ruins the abstraction a bit, but it can be fixed with a proper channel implementation.
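For example, we could sketch a core.async-style timeout channel on top of this. Here a/timeout is a hypothetical helper built on run-at-time, and some-chan stands for any channel you’re waiting on; it assumes the timer gets a chance to fire, e.g. with the background-thread scheduler running:

```emacs-lisp
(defun a/timeout (secs)
  "Return a channel that receives `:timeout' after SECS seconds.
Hypothetical helper; it puts a value before closing, because
`a/alts!' above only reacts to successful takes."
  (let ((ch (a/chan 1)))
    (run-at-time secs nil
                 (lambda ()
                   (a/put! ch :timeout)
                   (a/close! ch)))
    ch))

;; Wait for a result, but give up after five seconds:
;; (a/go
;;   (seq-let (_ res) (a/<! (a/alts! some-chan (a/timeout 5)))
;;     (if (eq res :timeout)
;;         (message "timed out")
;;       (message "got %s" res))))
```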
Anyhow, we can now do multiple requests simultaneously, and print the size of the fastest one:
```emacs-lisp
(let ((c1 (a/url-retrieve "https://andreyor.st/posts/2023-01-09-comparison-of-manifold-and-clojurecoreasync/"))
      (c2 (a/url-retrieve "https://andreyor.st/posts/2022-12-09-fixed-version-of-pipeline-async-and-unordered-pipeline-variants/"))
      (c3 (a/url-retrieve "https://andreyor.st/posts/2022-11-21-clojures-coreasync-pipeline-async-off-by-two-error-explained/")))
  (a/go
    (seq-let (_ str) (a/<! (a/alts! c1 c2 c3))
      (message "page size: %s" (length str)))))
```
Will I make a library for this?
Probably not, I already have one for Fennel, and I don’t want to maintain another one for Emacs, plus I’m not that good at Emacs Lisp.
I like the idea, but in its current state, it has a lot of places to be improved.
The already mentioned emacs-aio is probably what you’d want to use if you want this kind of lightweight processes.
Though, I’d like to have such a library as a part of Emacs’ core, tightly integrated with other core packages, such as
comint for example.
Still, I suggest you read the author’s explanation of their library principles here: https://nullprogram.com/blog/2019/03/10/.
Again, all of the above is rather an experiment, an exploration of an alternative way of organizing asynchronous cooperative processes.
The channel interface described here was inspired by Clojure’s
core.async, which was in turn inspired by Go’s channels.
I, personally, like the idea of everything being a channel, and as you’ve seen in the last examples with
url-retrieve, channels too can act like a promise, except that, unlike a promise, you can’t keep re-reading the value off the channel.
Don’t get me wrong, promises are fine, though I like them a bit less because you have to chain a lot of callbacks in a clever way, almost needing a dedicated DSL for that. Code with channels looks quite linear, in my opinion, and I highly value this property when working with asynchronous code.
Anyway, let’s hope Emacs will continue to advance, and its support for multithreading and concurrency will grow to be better and simpler to use.