Andrey Listopadov

Ad-hoc async in emacs-lisp via generators

@programming @emacs emacs-lisp async ~12 minutes read

TIL that Emacs Lisp has had generators since 2016, and generators are a cool feature in any language in my opinion! In short, Emacs has shipped the generator.el package since version 25.1. Generators are implemented by transforming code and act like iterators; most of the functions are even called iter-something. This means we can’t yield across an iterator boundary without creating another iterator, so they’re not proper coroutines, but we can still use them as such with these limitations in mind.
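To make the iterator-like behavior concrete, here is a minimal sketch of a generator and two ways of consuming it. The demo/ names are made up for illustration; iter-defun, iter-yield, iter-next, and iter-do come from generator.el.

```elisp
;;; -*- lexical-binding: t -*-
;; generator.el only works with lexical binding, hence the cookie above.
(require 'generator)

(iter-defun demo/countdown (n)
  "Yield N, N-1, ..., 1, one value per `iter-next' call."
  (let ((i n))
    (while (> i 0)
      (iter-yield i)
      (setq i (1- i)))))

(defun demo/collect (iterator)
  "Drain ITERATOR and return a list of everything it yielded, in order."
  (let ((acc '()))
    (iter-do (x iterator)
      (push x acc))
    (nreverse acc)))

;; Stepping by hand: each `iter-next' resumes the body until the next yield.
(defvar demo/it (demo/countdown 2))
(iter-next demo/it) ; => 2
(iter-next demo/it) ; => 1
;; A third `iter-next' would signal `iter-end-of-sequence'.

(demo/collect (demo/countdown 3)) ; => (3 2 1)
```

The body of demo/countdown reads like an ordinary loop, but every iter-yield is a point where control returns to the caller, which is exactly what we will use later to park a process.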

As you may have noticed, lately I’ve been fiddling with Clojure’s core.async library and with an implementation of a similar concept for the Fennel language, fennel-async. Fennel, being a language that compiles to Lua, allows us to use Lua’s coroutines directly for implementing non-blocking operations on channels. Clojure, on the other hand, doesn’t have proper coroutines, so to implement core.async they transform the code into a state machine, which then gets executed on a thread pool. Much like what generator.el does! Except for the thread pool part.

I really like the concept of CSP (Communicating Sequential Processes), and when I learned that Emacs has generators I wanted to see if it is possible to implement something like core.async in emacs-lisp. This isn’t a proper implementation, but rather a quick, simplified prototype of what can be done. I know that there are other implementations, like emacs-aio, which is built on top of generators, but it uses promises instead of channels, and I like channels more. I must say that I’m not great at writing Emacs Lisp code that mutates things, because I mostly try to write code in a functional style to avoid the errors that mutation can cause, but in this case mutation is the only way to achieve the desired result.

Layout

First, let’s describe the overall layout. We will have:

  • Channels - mutable places that store data in a sequential buffer;
  • Primitives to create lightweight threads that are executed by the scheduler;
    • Such threads are also channels.
  • Primitives to park threads when we do operations on channels.

That’s basically all we need to implement a core.async-style approach to writing code.

Also, I should note that generator.el requires lexical-binding to be enabled and refuses to work otherwise, so before we begin, let’s set it up and require the library:

;;; -*- lexical-binding: t -*-

(require 'generator)
(require 'subr-x) ; provides `named-let', which is used below

Let’s start with channels.

Channel

We start simple. Let’s create a function that returns a new channel, with an optional size that limits how many values the channel can buffer.

(defun a/chan (&optional size)
  "Create a channel with optional SIZE attribute."
  (when (and size (< size 1))
    (error "size must be greater than 0"))
  (list size))

I know, I know. This is definitely not a performant way of creating a queue, but it will do for the purposes of our experiment. We can always remake this in a proper way later, and it will probably require only very slight internal API changes.

Next, we need a way to put items to the channel, take items off the channel, and close the channel:

(defun a/chan-status (port)
  "Return the status of the PORT or its size if any."
  (car port))

(defun a/put! (port val)
  "Put value VAL to the channel PORT."
  (when (null val) (error "value can't be nil"))
  (let ((size (a/chan-status port)))
    (when (or (null size)
              (and (numberp size)
                   (length< port (1+ size))))
      (nconc port (list val))
      t)))

(defun a/take! (port)
  "Take a value off the PORT.

Return nil if the buffer is empty.  Note that the most recently
put value is taken first."
  (when (length> port 1)
    (let ((res (car (last port))))
      (nbutlast port)
      res)))

(defun a/close! (port)
  "Close the PORT."
  (setcar port :closed))

These functions never block. If we can’t put a value onto the channel, we simply bail out and don’t do it. If we can’t take a value off the channel, again, we bail out. So we can use these functions without ever worrying that they will block, but in reality we shouldn’t, because they don’t respect the fundamental property of a channel: backpressure.
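One more thing worth knowing about the functions above: a/put! appends to the end of the list and a/take! removes from the same end, so the buffer behaves like a stack. If first-in-first-out order matters, a take that pops from the front is enough. The sketch below is my own variant under hypothetical demo/ names, keeping the same list layout; it is not a drop-in replacement for the code in this post.

```elisp
;;; -*- lexical-binding: t -*-

;; Same layout as in the post: the car holds the size (or a status),
;; and the cdr holds the buffered items.
(defun demo/chan (&optional size)
  "Create a channel buffering at most SIZE items (unbounded when nil)."
  (list size))

(defun demo/put! (port val)
  "Append VAL to PORT; return t on success, nil if PORT is full or closed."
  (let ((size (car port)))
    (when (or (null size)
              (and (numberp size) (< (length (cdr port)) size)))
      (nconc port (list val))
      t)))

(defun demo/take! (port)
  "Remove and return the oldest item of PORT, or nil when the buffer is empty."
  (when (cdr port)
    (pop (cdr port))))

(let ((c (demo/chan 2)))
  (demo/put! c 'a)
  (demo/put! c 'b)
  (demo/put! c 'c)            ; the buffer is full, so this returns nil
  (list (demo/take! c) (demo/take! c) (demo/take! c))) ; => (a b nil)
```

Like the originals, these never block: the third put is silently refused, and the third take returns nil.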

In reality, channels should have a separate queue of puts, a queue of takes, and an optional buffer, thus these functions should not drop any data, but to keep things simple, I’ve decided to only have the buffer. We can fix this by introducing a few macros:

(defmacro a/<! (port)
  "Parking take.

Takes a value off the PORT if it is not closed, and has something
in the buffer."
  (let ((loop (gensym)) (p (gensym)) (res (gensym)))
    `(let ((,p ,port))
       (named-let ,loop ((,res (a/take! ,p)))
         (cond (,res ,res)
               ((eq :closed (a/chan-status ,p)) nil)
               (t (iter-yield nil)
                  (,loop (a/take! ,p))))))))

(defmacro a/>! (port x)
  "Parking put.

Puts X onto the PORT if the port is not closed and has room in
the buffer.  X must not be NIL."
  (let ((loop (gensym)) (res (gensym)) (p (gensym)) (val (gensym)))
    `(let ((,p ,port) (,val ,x))
       (named-let ,loop ((,res (a/put! ,p ,val)))
         (cond (,res ,res)
               ((eq :closed (a/chan-status ,p)) nil)
               (t (iter-yield nil)
                  (,loop (a/put! ,p ,val))))))))

Why are these macros, you might ask? Well, because we will only be using them in the generator context, and we can’t call iter-yield from anywhere but the iterator’s body. That, in turn, means we can’t make these two operations functions: iter-yield is a transformation point for the iterator’s body, and thus it must be lexically present there. A macro expands in place, so the iter-yield ends up right where it needs to be. Now, let’s look at how we can use these.
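The same trick works for any parking operation, not just channel ones. Here’s a stripped-down sketch, with made-up demo/ names, of a macro that parks the enclosing generator until some condition becomes true:

```elisp
;;; -*- lexical-binding: t -*-
(require 'generator)

(defmacro demo/park-until (form)
  "Yield from the enclosing generator until FORM is non-nil; return its value.
The macro expands in place, so `iter-yield' lands inside the generator body."
  (let ((res (gensym)))
    `(let ((,res nil))
       (while (not (setq ,res ,form))
         (iter-yield nil))
       ,res)))

(defvar demo/flag nil "Hypothetical condition the generator waits for.")
(defvar demo/seen nil "Value observed by the generator once it resumes.")

(defvar demo/waiter
  (funcall (iter-lambda ()
             (setq demo/seen (demo/park-until demo/flag)))))

(iter-next demo/waiter)            ; the flag is nil, so the generator parks
(setq demo/flag 'ready)
(condition-case nil
    (iter-next demo/waiter)        ; resumes, sees the flag, and finishes
  (iter-end-of-sequence nil))
demo/seen                          ; => ready
```

Had demo/park-until been a function, its iter-yield would sit outside of any generator body, and generator.el would refuse it.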

To test this out, we can create a channel and an iterator, which tries to take a value off the channel, and print it:

(defvar c (a/chan 1))

Code Snippet 1: the channel with a buffer of 1

(defvar g (funcall (iter-lambda ()
                     (message "val: %s" (a/<! c)))))

Code Snippet 2: an iterator that uses our custom macro

Advancing the iterator appears to do nothing, no matter how many times we call it, because a/<! parks the iterator while the channel is empty:

(iter-next g) ; => nil, and nothing is printed

However, if we put the value to the channel…

(a/put! c 42)

…and then advance the iterator…

(iter-next g)

…we’ll see the message "val: 42" and receive the signal iter-end-of-sequence.

So, as you can see, the g iterator parked itself at the a/<! call, and the channel was acting as the synchronization point between the main process and our iterator. Such an iterator really is like a lightweight thread, with cooperative scheduling, which we hid in the a/<! macro. However, manually advancing iterators isn’t great, so we can build a really simple scheduler that will do that for us.

Scheduler

Our scheduler will use a simple list of processes and it will simply go through and execute each one after another. Before we can write it though, we need a way to spawn our lightweight processes:

(defvar a/processes '()
  "List of iterators that the scheduler advances.")

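(defun a/spawn (f &rest args)
  "Start process F by supplying optional ARGS and `push' it to the task pool."
  (let ((p (apply f args)))
    (push p a/processes)
    ;; Run the process up to its first park.  If it finishes right away,
    ;; `iter-next' signals `iter-end-of-sequence', so drop it from the pool.
    (condition-case nil
        (iter-next p)
      (iter-end-of-sequence
       (setq a/processes (delq p a/processes))))
    p))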

Now we can rewrite our previous example like this:

(let ((c (a/chan 1)))
  (a/spawn (iter-lambda ()
             (message "val: %s" (a/<! c))))
  (a/put! c 42))

But we still need to run the process and maintain its state, e.g. when the iterator is exhausted the process should be removed from the processes list. This function takes care of this:

(defun a/run ()
  "Run each process in the task-pool, and remove those that have finished."
  (dolist (proc a/processes)
    (condition-case nil
        (iter-next proc)
      (iter-end-of-sequence
       (setq a/processes (delq proc a/processes)))))
  (length> a/processes 0))

The return value of this function is either t or nil, indicating whether there are processes left. The simplest event loop can then be defined like this:

(while (a/run))

And, combined with the previous example:

(let ((c (a/chan 1)))
  (a/spawn (iter-lambda ()
             (message "val: %s" (a/<! c))))
  (a/put! c 42)
  (while (a/run)))

Again, this is the simplest scheduler possible, but we don’t care for now, as it is just a proof of concept. There should be error handling, the ability to cancel processes, and ways to avoid running processes when they don’t want to be run, like when the process is sleeping. A more sophisticated implementation can do all of this, but it’s a task for another day.
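As an illustration of the first item on that list, here’s a rough sketch of a run function that also survives errors inside a process. The demo/ names are hypothetical, and this is just one way to do it: a failing process is logged and dropped, while the rest keep running.

```elisp
;;; -*- lexical-binding: t -*-
(require 'generator)

(defvar demo/processes '()
  "Iterators waiting to be advanced (a stand-in for the task pool).")

(defvar demo/log '()
  "Trace of what the demo processes did, newest first.")

(defun demo/run ()
  "Advance every process once; drop those that finished or signaled an error.
Return non-nil while there are processes left."
  ;; Iterate over a copy so that removing entries cannot disturb the loop.
  (dolist (proc (copy-sequence demo/processes))
    (condition-case err
        (iter-next proc)
      ;; Listed first, because `iter-end-of-sequence' is itself an `error'.
      (iter-end-of-sequence
       (setq demo/processes (delq proc demo/processes)))
      (error
       (push (list 'failed (error-message-string err)) demo/log)
       (setq demo/processes (delq proc demo/processes)))))
  (consp demo/processes))

;; One well-behaved process and one that fails after its first park.
(push (funcall (iter-lambda ()
                 (push 'step-1 demo/log)
                 (iter-yield nil)
                 (push 'step-2 demo/log)))
      demo/processes)
(push (funcall (iter-lambda ()
                 (iter-yield nil)
                 (error "boom")))
      demo/processes)

(while (demo/run))
```

After the loop exits, demo/log contains both steps of the first process plus a failed entry for the second one, and demo/processes is empty.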

Still, spawning tasks like this is not convenient enough, so we can create another macro to automate some stuff:

(defmacro a/go (&rest body)
  "Create a lightweight thread that runs the BODY.

Thread is automatically parked when operations on channels occur.
The return value itself is a channel."
  (declare (indent 0))
  (let ((ch (gensym)) (task (gensym)))
    `(let* ((,ch (a/chan 1))
            (,task (iter-lambda ()
                     (a/>! ,ch (progn ,@body))
                     (a/close! ,ch))))
       (a/spawn ,task)
       ,ch)))

You can see that it does a bit more than just wrapping the body in an iter-lambda: it also creates a channel and splices an a/>! operation into the iter-lambda body. This is done to make sure that we can await the a/go block on its own, using the same API as with channels, and actually get the result of the body. We can do so in a non-blocking way, by using one a/go block inside another and awaiting the result via a/<!, or we can define two blocking operations, a/<!! and a/>!!, to use from the main thread:

(defun a/>!! (port val)
  "Blocking put.

Puts VAL onto the PORT if the port is not closed and has room in
the buffer.  VAL must not be NIL.  If the buffer is full, block
until the buffer is ready to receive the value.  Should not be
used inside the `a/go' blocks."
  (named-let loop ((res (a/put! port val)))
    (cond (res res)
          ((eq :closed (a/chan-status port)) nil)
          (t (a/run)
             (loop (a/put! port val))))))

(defun a/<!! (port)
  "Blocking take.

Takes a value off the PORT if it is not closed, and has something
in the buffer.  If the buffer is empty, will block until the
buffer receives a value.  Should not be used inside the `a/go'
blocks."
  (named-let loop ((res (a/take! port)))
    (cond (res res)
          ((eq :closed (a/chan-status port)) nil)
          (t (a/run)
             (loop (a/take! port))))))

Note that we’re calling a/run if the channel operation was not successful, either because the channel was full or because it was empty, to allow other threads to advance and put or take data from it. This allows us to avoid blocking indefinitely, and since we don’t have a thread pool, this is our only choice.

Except, we do have threads in Emacs. So, we can create our own event loop inside Emacs’s own event loop like this:

(make-thread (lambda () (while t (a/run) (thread-yield))))

Personally, I wouldn’t recommend this unless the scheduler is sophisticated enough to avoid affecting Emacs’ performance. The thread will run forever and advance all tasks that we’ve spawned with the a/go macro:

(let ((ch (a/chan 1)))
  (a/go
   (dotimes (x 10)
     (a/>! ch x)
     (message "a put %s" x))
   (message "a done")
   (a/close! ch))
  (a/go
   (while-let ((x (a/<! ch)))
     (message "b got %s" x))
   (message "b done")))
Log of the evaluation
a put 0
b got 0
a put 1
b got 1
a put 2
b got 2
a put 3
b got 3
a put 4
b got 4
a put 5
b got 5
a put 6
b got 6
a put 7
b got 7
a put 8
b got 8
a put 9
a done
b got 9
b done

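Tweaking the channel’s buffer size affects how these processes communicate. E.g., here’s the log with a buffer size of 3. Note that b receives each batch in reverse order, because a/take! removes the most recently added value first: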

a put 0
a put 1
a put 2
b got 2
b got 1
b got 0
a put 3
a put 4
a put 5
b got 5
b got 4
b got 3
a put 6
a put 7
a put 8
b got 8
b got 7
b got 6
a put 9
a done
b got 9
b done

This code spawns two lightweight processes communicating through the channel ch. The first process puts values into the channel and logs after the value is accepted. The second one waits until values arrive in the channel, and logs the values it receives.

When the first process finishes its internal dotimes loop, it closes the channel with a/close!. The second process then completes once the channel is closed, because a/<! returns nil once the channel is closed and exhausted.
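If a dedicated thread spinning in a loop feels too heavy, a repeating timer is another way to keep the scheduler ticking. This is a different technique from the make-thread loop shown earlier, and only a sketch: the demo/ names and the 0.05 second default interval are my assumptions, while run-with-timer and cancel-timer are regular Emacs functions.

```elisp
;;; -*- lexical-binding: t -*-

(defvar demo/scheduler-timer nil
  "Timer driving the scheduler, or nil when it is stopped.")

(defun demo/start-scheduler (run-fn &optional interval)
  "Call RUN-FN every INTERVAL seconds (default 0.05) from a repeating timer.
Does nothing if the scheduler timer is already active."
  (unless demo/scheduler-timer
    (setq demo/scheduler-timer
          (run-with-timer 0 (or interval 0.05) run-fn))))

(defun demo/stop-scheduler ()
  "Cancel the scheduler timer, if there is one."
  (when demo/scheduler-timer
    (cancel-timer demo/scheduler-timer)
    (setq demo/scheduler-timer nil)))
```

With the functions from this post loaded, (demo/start-scheduler #'a/run) would advance all spawned tasks a few times per second, and (demo/stop-scheduler) would turn that off again. Unlike the infinite loop, the timer only runs when Emacs is waiting for input.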

And that’s basically it!

Examples

So, what can we do with this kind of system?

We can retrieve a URL in an asynchronous way with url-retrieve:

(let ((ch (a/chan 1)))
  (url-retrieve
   "https://andreyor.st/2022-09-26-reproducible-research-with-org-mode-fennel-and-love.org"
   (lambda (_)
     (a/>!! ch (length (buffer-string)))
     (a/close! ch)))
  (a/go
    (message "page size: %s" (a/<! ch))))

In the callback passed to url-retrieve, we put the total length of the resulting buffer onto the ch channel. Then we close the channel, so it won’t be used again by anyone else. The a/go block runs asynchronously and waits until the channel has a value. If we have the scheduler running in the background, the message will appear once the operation is done.

The main point is that we can convert a callback-based interface to a channel-based interface with ease:

(defun a/url-retrieve (url)
  "Asynchronously retrieve the URL.

Returns a channel, containing the resulting string once the
operation is complete."
  (let ((ch (a/chan 1)))
    (url-retrieve url (lambda (_)
                        (a/put! ch (buffer-string))
                        (a/close! ch)))
    ch))
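The same conversion can be written once for any API that takes a single-argument callback. The following is a self-contained sketch under hypothetical demo/ names; it manipulates the channel list directly, doing what a/put! and a/close! would do, only so that it runs on its own.

```elisp
;;; -*- lexical-binding: t -*-

(defun demo/callback->chan (start-fn)
  "Call START-FN with a one-argument callback; return a channel for its result.
The channel uses the list layout from this post and is closed after the
first value arrives."
  (let ((ch (list 1)))                  ; same shape as (a/chan 1)
    (funcall start-fn
             (lambda (result)
               (nconc ch (list result)) ; what a/put! does on success
               (setcar ch :closed)))    ; what a/close! does
    ch))

;; A stand-in for a callback-based API.  It calls back immediately,
;; whereas a real one, like `url-retrieve', would call back later.
(defun demo/fake-fetch (callback)
  "Pretend to fetch something and hand it to CALLBACK."
  (funcall callback "payload"))

(demo/callback->chan #'demo/fake-fetch) ; => (:closed "payload")
```

The closure is the only glue needed: it captures the channel, and whoever holds the returned channel can park on it without knowing a callback was ever involved.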

Then our system built on top of channels will be able to work with this kind of function in a simple way. E.g., we can define an operation that will wait on multiple channels:

(defun a/alts! (&rest channels)
  "Wait for multiple CHANNELS.

Return the first one that completes along with the result."
  (a/go
    (catch 'done
      (while t
        (dolist (ch channels)
          (when-let ((res (a/take! ch)))
            (throw 'done (list ch res))))
        (iter-yield nil)))))

Yes, we may be required to manually yield from the a/go at times, and this ruins the abstraction a bit, but it can be fixed with a proper channel implementation. Anyhow, we can now do multiple requests simultaneously, and print the size of the fastest one:

(let ((c1 (a/url-retrieve "https://andreyor.st/posts/2023-01-09-comparison-of-manifold-and-clojurecoreasync/"))
      (c2 (a/url-retrieve "https://andreyor.st/posts/2022-12-09-fixed-version-of-pipeline-async-and-unordered-pipeline-variants/"))
      (c3 (a/url-retrieve "https://andreyor.st/posts/2022-11-21-clojures-coreasync-pipeline-async-off-by-two-error-explained/")))
  (a/go
    (seq-let (_ str) (a/<! (a/alts! c1 c2 c3))
      (message "page size: %s" (length str)))))

Will I make a library for this?

Probably not. I already have one for Fennel, and I don’t want to maintain another one for Emacs; plus, I’m not that good at Emacs Lisp. I like the idea, but in its current state it has a lot of room for improvement. The already mentioned emacs-aio is probably what you’d want to use if you want this kind of lightweight process, and I suggest you read the author’s explanation of the library’s principles here: https://nullprogram.com/blog/2019/03/10/. Still, I’d like to have such a library as a part of Emacs’ core, tightly integrated with other core packages, such as comint.

Again, all of the above is rather an experiment, an exploration of an alternative way of organizing asynchronous cooperative processes. The channel interface described here was inspired by Clojure’s core.async, and core.async was in turn inspired by Go’s channels. I, personally, like the idea of everything being a channel, and as you’ve seen in the last examples with url-retrieve, channels can also act like a promise, except that, unlike a promise, you can’t keep re-reading the channel.

Don’t get me wrong, promises are fine, though I like them a bit less because you have to chain a lot of callbacks in a clever way, almost having a dedicated DSL for that. The code with channels looks quite linear in my opinion, and I highly value this property when working with asynchronous code.

Anyway, let’s hope Emacs will continue to advance, and its support for multithreading and concurrency will grow to be better and simpler to use.