Andrey Listopadov

async.fnl enchancements

@programming async fennel ~15 minutes read

Lately, I’ve been working on async.fnl in my spare time and realized that the previous code that I used to schedule timers was terrible. Well, realized isn’t an appropriate word here, because I knew it back when I first implemented it, I just didn’t bother to make it better until I had a fully working library. So, a few weeks ago I had an idea on how to make things both more efficient and prevent excessive CPU usage when there’s no need to.

Lua debug hooks

Lua comes with the debug library. This library provides a set of relatively low-level facilities to implement your own debugger, and one such things are hooks.

All hook manipulations are done with debug.sethook and debug.gethook functions - one to set/unset the hook, and the other to retrieve existing hook, respectively. A hook is a simple Lua function, that accepts the event which triggered it. Lua gives us five types of events we can check for in our hook: "call", "tail call", "return", "line", and "count", but we’re only really interested in one - the "count".

But before we discuss hooks, I’d like to do a small detour into how my old async library handled things. It will both allow you to understand why I decided to move away from the old implementation, and how we can use hooks to achieve similar facilities later. Feel free to skip if you’re not interested.

Scheduler

Here’s how asynchronous threads were handled in an older implementation of my other async library.

  • The library had a special scheduler object, that kept track of all tasks to be run.
  • Each task that wanted to stop itself for some time is called the async.park function, usually indirectly via something like async.take or async.put.
    • additionally, If a task wants to sleep for some time, it can call async.sleep, which, when called on the asynchronous thread, parks it with a special condition, and tells the scheduler that this task should not be touched for the specified amount of time.
  • Then, once the internal scheduler.run is called, all tasks are examined, and the shortest amount of sleep is chosen, if any.
    • If there are tasks that aren’t sleeping, the system resumes those tasks;
    • If, however, all tasks want to sleep, the system determines the minimum amount of time it needs to sleep and puts the main thread to sleep.
  • Once all tasks ran, the scheduler.sleep exits unless the user requested it to run until all tasks are fully complete.

This worked reasonably fine but required someone to call the scheduler.run periodically. The library tried to make it transparent for the program, as any operation that created threads, pushed or polled for values, and so on, called shceduler.run internally, to give an illusion that the system actually works in parallel. Thus, if you made a blocking take on some channel, the scheduler would run until that channel is ready to return the value (though, this might never happen).

The main downside of this was that the whole system depends on a central scheduling mechanism, and the more tasks you have spawned, the less performant the scheduler becomes. For example, here’s a function, that tracks the amount of time it took for value to be put to a channel and then read back in another thread:

(local {: gettime} (require :socket))
(local {: put : close! : take : chan &as async}
  (require :fennel-async.async))

(fn bench []
  (let [c (chan 1)
        lock (chan)]
    (var put-time 0)
    ;; a thread that loops putting the values one by one, and tracks
    ;; the time when it completed each put
    (async
     (fn []
       (for [i 1 10]
         (set put-time (gettime))
         (put c i))
       (close! c)))
    ;; a thread that takes values, and measures the amount of time it took
    ;; for value to be transferred.  Loops for more until the channel
    ;; is closed
    (async
     (fn loop []
       (let [val (take c)]
         (if val
             (do (print (- (gettime) put-time))
                 (loop))
             (close! lock)))))
    ;; waits til both threads are done
    (take lock)))

Let’s call it:

>> (bench)
6.0796737670898e-05
1.1205673217773e-05
2.8848648071289e-05
9.0599060058594e-06
7.8678131103516e-06
9.0599060058594e-06
8.1062316894531e-06
9.0599060058594e-06
7.8678131103516e-06
2.6941299438477e-05
nil

Pretty fast. Now, let’s add a few tasks that will immediately park because there’s no one to produce values, and will simply exist in a scheduler:

;; create 10000 channels
(local channels (fcollect [i 1 10000] (async.chan)))

;; spawn 10000 tasks waiting on these channels
(each [_ chan (ipairs channels)]
  (async (fn [] (async.take chan))))

This seemingly innocent each loop took a whopping 148 seconds to execute on my machine! It takes so much time because the library re-runs the scheduler over an ever-increasing number of tasks on every single call to async. So, it’s basically O(n²) loop even though it looks like O(n).

Now, let’s call our function again:

>> (bench)
0.033475875854492
0.040903806686401
0.044415950775146
0.043467998504639
0.039731979370117
0.039532899856567
0.039582014083862
0.039039134979248
0.039515972137451
0.039596080780029
nil

So, even though all of the threads were parked, and our bench function doesn’t interact with any of them directly, its execution was slowed down severely. For a good measure, let’s release all those tasks by putting some values onto the channels with another loop (each [_ c (ipairs channels)] (async.put c 42)) (another 163 seconds to complete), and then re-run the bench function, to check that it is fast again.

>> (bench)
0.00029706954956055
8.8214874267578e-06
9.7751617431641e-06
9.0599060058594e-06
9.0599060058594e-06
9.0599060058594e-06
6.1988830566406e-06
8.8214874267578e-06
1.5974044799805e-05
2.4080276489258e-05
nil

And we’re back to being fast again. Yay!..

This is a major downside of the system that keeps track of all tasks - there’s no way of optimizing it because it has to check for all tasks every time - what if some tasks are no longer blocked and can advance? We can’t even partition those tasks, because, as in the example above, the task is not related to timed events at all. Because of that, I decided to re-implement the library from scratch, but I already blogged about it.

Fake it til you actually need it

In the new version of the library, I avoided the scheduler altogether. Well, there is a scheduler still, but unless you’re using timeout channels, you can live completely without it (for the most part, that is). BTW, I shouldn’t call it a new version of the library, it is simply a different library, but the facilities are largely the same for both, so we can compare them.

First things first, let’s test the code from the above with the new library:

(import-macros {: go : go-loop} (doto :async require))
(local {: >! : <! : <!! : close! : chan : take!} (require :async))
(local {: gettime} (require :socket))

(fn bench []
  (let [c (chan)
        lock (chan)]
    (var put-time 0)
    ;; a thread that loops putting the values one by one, and tracks
    ;; the time when it completed each put
    (go
      (for [i 1 10]
        (set put-time (gettime))
        (>! c i))
      (close! c))
    ;; a thread that takes values, and measures the amount of time it took
    ;; for value to be transferred.  Loops for more until the channel
    ;; is closed
    (go-loop [val (<! c)]
      (if val
          (do (print (- (gettime) put-time))
              (recur (<! c)))
          (close! lock)))
    ;; waits til both threads are done
    (<!! lock)))
>> (bench)
8.9883804321289e-05
4.6968460083008e-05
0.00019192695617676
4.6014785766602e-05
0.00013494491577148
1.8835067749023e-05
9.7990036010742e-05
2.0027160644531e-05
9.8943710327148e-05
3.0994415283203e-05
nil

The overall speed is more or less the same but now, let’s repeat the same trick, again creating 10000 channels, and registering a task on each of them:

;; create 10000 channels
(local channels (fcollect [i 1 10000] (chan)))

;; spawn 10000 tasks waiting on these channels
(each [_ c (ipairs channels)]
  (go (<! c)))

Not only this code executes almost immediately, taking about a second, but this amount of channels also don’t affect our bench function at all:

>> (bench)
0.00028419494628906
9.2029571533203e-05
0.00049304962158203
6.6041946411133e-05
0.00028800964355469
5.1021575927734e-05
0.00026488304138184
5.6982040405273e-05
0.00023293495178223
7.9870223999023e-05
nil

So, what’s the trick?

If we take a look at the internals of the library, there’s also a queue of dispatched tasks, called, well dispatched-tasks. We can load the library as a whole into the REPL and poke around:

>> (local channels (fcollect [i 1 10000] (chan)))
nil
>> (each [_ c (ipairs channels)] (go (<! c)))
nil
>> dispatched-tasks
{}
>> (length channels)
10000
>> (. channels 1)
#<ManyToManyChannel: 0x55fbd1d328d0>
>> (. channels 1 :takes)
[#<reify: 0x55fbd2cbb690: Handler>]

So, even though we’ve actually created 10000 channels plus a 10000 of go blocks, each of which is also a channel, and registered a pending take (<!) on each one, no tasks were actually added to the dispatched-tasks at all. A task is only added to the scheduler when the channel is used! In other words, once we put something onto a channel, that has a pending take, only then the task will be scheduled to execute:

>> (put! (. channels 1) 42)
true
>> dispatched-tasks
{#<function: 0x55dcaf30ecb0> true}

This means that the work done by the scheduler is limited to the amount of resources really used by the program at a given point in time. So our partitioning problem is gone. There’s still another problem though - we need to run the scheduler somehow.

Now we can talk about hooks.

Hooks (the wrong way)

Initially, when implementing this system I realized, that while the scheduler actually isn’t necessary for plain channels at all, because you can always delay the execution of a pending operation until the channel is ready this won’t work if I add timeout channels. And actually, before I introduced timers to the library, there wasn’t any dispatched-tasks kind of thing at all, so the library was truly scheduler-less. But after I ported the whole thing from ClojureScript, I had to add timeout channels to be fully compatible, thus the scheduling had to be introduced too.

For a brief note, a timeout function returns a channel that closes automatically after the specified period of time:

>> (let [start (time)]
     (<!! (timeout 300))
     (print (.. "Elapsed: " (- (time) start) " s")))
Elapsed: 0.3021879196167 s
nil

But how exactly it knows when to close? While we can block our main thread with <!! OP and inspect the timeout object for the time it needs to sleep, thus putting the main thread to sleep for that time it will also stop all other background tasks from happening. And if the timeout is awaited asynchronously in a go block, we can’t do even that.

So, we do need a scheduler that will run regularly and will check for any timeouts that are ready to be released. My initial thought was that I once again need some kind of scheduler.run function that will need to be manually called by each function in the library, but this approach has other problems too.

For example, what if the user will write a tight loop that will run for too long and we’ll miss the time we need to release a timeout? Well, that isn’t that big of a deal, but I’d like not to have a system that is this unreliable. The user can even forget to call scheduler.run in their loop, wondering why the timer doesn’t even count. So I started searching for other approaches.

The solution was to use debug.sethook with some options, to run the hook regularly enough that it will not cause serious performance hit for the system as a whole, and will update timers with enough precision. And that’s exactly what the ClojureScript version of the library does! Except it uses the goog.async thing, which I have no idea how it works, but the solution is simple and effective. As far as I know, JavaScript has some facilities to register callbacks to be run after a certain amount of time, so maybe it’s something along the line there.

So I decided to try this out and made a simple hook that runs every time we return from functions, and every one thousand VM instructions:

;; ----8<---- piece of old code ahead -----8<-----
(local -timeouts {})

(fn -advance-timeouts []
  "Close any timeout channels whose closing time is less than or equal
to current time."
  (when (next -timeouts)
    (let [to-remove (icollect [t ch (pairs -timeouts)]
                      (when (> 0 (-difftime t (-time)))
                        (ch:close)
                        t))]
      (each [_ t (ipairs to-remove)]
        (tset -timeouts t nil)))))

(fn -register-hook [f]
  "Run function `f` on each function return and every 1000 instructions.
Appends the hook to existing one, if found, unless the hook is `f`."
  (when (and gethook sethook)
    (match (gethook)
      f nil
      nil (sethook f :r 1000)
      (hook mask count)
      (sethook
       (fn [...] (hook ...) (f ...))
       mask
       count))))

(-register-hook -advance-timeouts)
;; ----8<----

And it worked! Satisfied with the result, and not paying any attention to the slowdown in the REPL, I moved on. But the bitter taste of this code still followed me, as I knew that this isn’t a proper solution by any means. This had many problems.

  • The first and obvious one is that the hook runs too frequently, but I didn’t think about it too hard because I had other stuff to port to the library.
  • Second, and a bit less obvious, the hook runs all the time, even if there are no timeout channels at all. Unlike the ClojureScript solution, which schedules the hook once and only when needed, this one runs til the end of time. Well, if there are no timeout channels it does nothing but the VM still stops to execute it each time the condition is met, which is bad.
  • And third, I tried to keep the old hooks around, by using composition and thus made a foreign hook run possibly too often or even not often enough depending on the original settings.

Later, when I added the mentioned dispatched-tasks table to the code, I started noticing the slowdown even more. So I knew I need to do something.

Hooks (the less wrong way)

So, I’m pleased to say that all these problems were rectified recently.

One morning, or a late night, I don’t really remember, I thought that it would be nice to actually measure how much time it takes for Lua to execute these 1000 instructions. I then could adjust the parameter to only run the hook approximately 10ms after it was registered. Ten milliseconds feels like a decent enough precision for timeout channels. I mean no one in a sane mind will try to implement a hard real-time system with my library, or in Lua for that matter. Even a firm one.

Plus, to save up some performance and avoid keeping track of too many timers, I made an optimization to only create a new timer if the time passed since the previous one was created is greater than 10ms. Thus, the timers at minimum are 10 milliseconds apart from each other.

After a bit of testing, it turned out that a million and a half instructions approximately take 10 ms to complete on my machine. And while 1 million felt like a more appropriate amount to use for advancing timers, I knew that it would change on a per-machine basis, so I decided to tweak the system once more.

Inspired by the ClojureScript approach, I thought that there was no need to run the hook every N instructions. Instead, I can run it after N instructions, and then disable it from within the hook itself. Moreover, I can calculate how much time it took to execute this amount of instructions, and recalculate N to set the hook to run 10ms in the future.

Here’s the code behind that:

And the main interface for hook scheduling.

(var (n-instr register-time orig-hook orig-mask orig-n) 1_000_000)

(fn schedule-hook [hook n]
  (when (and gethook sethook)
    (let [(hook* mask n*) (gethook)]
      (when (not= hook hook*)
        (set (register-time orig-hook orig-mask orig-n)
          (values (os/clock) hook* mask n*))
        (sethook main-thread hook "" n)))))
(fn cancel-hook [hook]
  (when (and gethook sethook)
    (match (gethook main-thread)
      (hook ?mask ?n)
      (do (sethook main-thread orig-hook orig-mask orig-n)
          (values ?mask ?n)))))
Code Snippet 1: Hook cancellation. Returns the hook's settings for future analysis, and resets to the old hook if specified.
(fn process-messages [event]
  (let [took (- (os/clock) register-time)
        (_ n) (cancel-hook process-messages)]
    (set n-instr
      (if (not= event :count) n
          (// 0.01 (/ took n))))
    (faccumulate [done nil _ 1 1024 :until done]
      (case (next dispatched-tasks)
        f (tset dispatched-tasks (doto f pcall) nil)
        nil true))
    (each [t ch (pairs timeouts)]
      (when (>= 0 (difftime t (time)))
        (tset timeouts t (ch:close!))))
    (when (or (next dispatched-tasks) (next timeouts))
      (schedule-hook process-messages n-instr))))
Code Snippet 2: The main body of the hook, that processes the messages. Re-registers itself if there's more work to be done. Calculates the amount of time it takes 1 instruction to complete, and uses it to reschedule itself 10ms after the execution.

In addition to that I opted out of combining several hooks into a single one. This feels like a Lua problem - I don’t really understand why the VM only allows for a single hook to be registered. Lua has five different events to hook up to, so why not allow registering five hooks? And why isn’t it possible to register more than one hook for a single event type? I mean, if there were many hooks registered, just run them in succession.

I guess I can implement my own sethook and replace the one that comes with the debug library with my own version after someone loads my library. Then register a handler that will keep track of all other hooks registered and run them accordingly. Maybe I’ll try it.

Two problems though - there’s no way of removing a specific hook in stock Lua, and I will need to somehow determine what hook to call on count events if more than one was registered. Especially when each specifies a different amount of instructions. I guess this is why there’s this kind of limitation in the stock debug library, but I don’t think it’s impossible to do it in the VM itself with an ever-growing instruction counter. Probably it’s just an optimization.

In any case, the new system balances itself much better than before keeping 10ms intervals between tasks, and the slowdowns are gone.

And while I was on it, I realized, that given the fact that I have both scheduked-tasks and timeouts storages, instead of checking for them in a hook, I can also do the check when I do a blocking operation. This means that if the blocking operation only needs to wait for a timeout to close, it can actually sleep instead of doing a busy loop:

>> (let [start (time)
         cpu-start (os.clock)]
     (<!! (timeout 1000))
     (print (.. "CPU time: " (- (os.clock) cpu-start) " s"))
     (print (.. "Elapsed: " (- (time) start) " s")))
CPU time: 0.169319 s
Elapsed: 1.0089449882507 s
nil

Another realization came in that there is no point in doing any waiting if there are no active timeouts or tasks dispatched. Because blocking the only thread means that no other thread can dispatch its task. And a task can only be dispatched by another task, thus we can exit the waiting loop and throw an error without blocking the execution forever. This doesn’t mean that we’re free from data races, far from it, but at least we can detect that the data will never arrive on a channel if we block. In practice this shouldn’t happen too often.

Probably, there are more clean ways of handling hooks than the one I’ve came up with, so feel free to suggest how to change the approach in the project’s repository.