Andrey Listopadov

Fennel Async at its Core

~5 minutes read

Fennel Async at its Core

  • channels
    • buffered or unbuffered
  • go-threads

That’s it!

Main features

  • Aim for ClojureScript compatibility.
    • Almost all tests were ported from the ClojureScript library and all the ones ported pass.
  • Cancellation of actions.
  • Limited detection of deadlocks.
  • Macros and functions provided via a single file.
  • Seamless scheduling - no need to run the scheduler manually.
  • Self-adjusting scheduler.
  • Additional experimental stuff: TCP, HTTP.

Clojure compatibility (Clojure example)

;; Clojure
(require '[clojure.core.async :as a :refer [>!]])

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
Read 1000 msgs in 163 ms

Clojure compatibility (Fennel example)

;; Fennel
(local {: >! &as a} (require :src.async))
(local {: gettime} (require :socket))
(import-macros {: go} (doto :src.async require))

(let [n 1000
      cs (fcollect [_ 1 n] (a.chan))
      begin (gettime)]
  (each [_ c (ipairs cs)] (go (>! c "hi")))
  (for [_ 1 n]
    (let [[v c] (a.alts!! cs)]
      (assert (= "hi" v))))
  (print "Read" n "msgs in" (* (- (gettime) begin) 1000) "ms"))
Read	1000	msgs in	430.63902854919	ms

Differences from Clojure

  • Limitations in go blocks
  • Reasoning behind limitations
  • Real threads

Limitations in go blocks

Reasoning behind limitations

Real threads

(local {: >!! : <!! : <!} a)

(fn map [f coll]
  (icollect [_ v (ipairs coll)]
    (f v)))

(local cs (fcollect [i 1 10] (doto (a.chan 1) (>!! i))))

(<!! (go (map <! cs)))

[1 2 3 4 5 6 7 8 9 10]

Differences from my previous library

  • Proper cancellation of actions
  • Limited detection of deadlocks
  • Seamless Scheduling
  • Self-adjusting scheduler

Proper cancellation of actions (takes)

(local c (a.chan))

(go (print "original go block:" (<! c)) (a.close! c))

(let [t (a.timeout 100)]
  (match (a.alts!! [c t])
    [_ t] (print "timeout waiting for c")
    [?val c] (print "alts!! got val:" ?val "from c")))

(>!! c "hi")

(print (<!! c))
timeout waiting for c
original go block:	hi
nil

Proper cancellation of actions (puts)

(local c (a.chan))

(go (>! c 42) (print "done putting") (a.close! c))

(let [t (a.timeout 100)]
  (match (a.alts!! [[c 27] t])
    [_ t] (print "timeout waiting for c")
    [_ c] (print "alts!! managed to put 27 to c")))

(print :val (<!! c))
(print (<!! c))
timeout waiting for c
val	42
done putting
nil

Limited detection of deadlocks

(local c (a.chan))

(<!! c)
The ManyToManyChannel: 0x55a3f7753720 is not ready and there are no scheduled tasks. Value will never arrive.

stack traceback:
  ./src/async.fnl:877: in function ?
  /var/home/alist/.local/bin/fennel:880: in function ?
  [C]: in function 'xpcall'
  /var/home/alist/.local/bin/fennel:885: in function ?
  /var/home/alist/.local/bin/fennel:912: in function ?
  /var/home/alist/.local/bin/fennel:922: in function ?
  [C]: in ?
(local c (a.chan))

(go (<! (a.timeout 3000)))

(<!! c)
The ManyToManyChannel: 0x55a3f7878900 is not ready and there are no scheduled tasks. Value will never arrive.

stack traceback:
  ./src/async.fnl:877: in function ?
  /var/home/alist/.local/bin/fennel:880: in function ?
  [C]: in function 'xpcall'
  /var/home/alist/.local/bin/fennel:885: in function ?
  /var/home/alist/.local/bin/fennel:912: in function ?
  /var/home/alist/.local/bin/fennel:922: in function ?
  [C]: in ?
(local c (a.chan))

(go (<! (a.timeout 3000)) (>! c "hello there!"))

(<!! c)
hello there!

Limited detection of deadlocks (problems)

(import-macros {: go-loop} (doto :src.async require))
(local log-ch (a.chan))

(go-loop [msg (<! log-ch)]
  (when (not= nil msg)
    (print msg)
    (recur (<! log-ch))))
(go-loop []
  (when (>! log-ch (os.date))
    (<! (a.timeout 30000))
    (recur)))
(<!! c)

Seamless Scheduling

(go
  (for [i 1 10]
    (<! (a.timeout 100))
    (print :i i)))

(print :loop :start)
(for [i 1 1000000000] nil)
(print :loop :done)
loop	start
i	1
i	2
i	3
i	4
i	5
i	6
i	7
i	8
i	9
i	10
loop	done

Event Loops

Seamless Scheduling (in ClojureScript)

;; ClojureScript
(defn queue-dispatcher []
  (when-not (and queued? running?)
    (set! queued? true)
    (goog.async.nextTick process-messages)))

Seamless Scheduling

;; back to Fennel
(fn process-messages [event]
  {:private true}
  (let [took (- (os/clock) register-time)
        (_ n) (cancel-hook process-messages)]
    (set n-instr
      (if (not= event :count) n
          (m/floor (/ 0.01 (/ took n)))))
    (faccumulate [done nil _ 1 1024 :until done]
      (case (next dispatched-tasks)
        f (tset dispatched-tasks (doto f pcall) nil)
        nil true))
    (each [t ch (pairs timeouts)]
      (when (>= 0 (difftime t (time)))
        (tset timeouts t (ch:close!))))
    (when (or (next dispatched-tasks) (next timeouts))
      (schedule-hook process-messages n-instr))))

Self-adjusting scheduler

(var (n-instr register-time orig-hook orig-mask orig-n)
      1_000_000)

(fn schedule-hook [hook n]
  {:private true}
  (when (and gethook sethook)
    (let [(hook* mask n*) (gethook)]
      (when (not= hook hook*)
        (set (register-time orig-hook orig-mask orig-n)
          (values (os/clock) hook* mask n*))
        (sethook main-thread hook "" n)))))

(fn cancel-hook [hook]
  {:private true}
  (when (and gethook sethook)
    (match (gethook main-thread)
      (hook ?mask ?n)
      (do (sethook main-thread orig-hook orig-mask orig-n)
          (values ?mask ?n)))))

Problems with this approach

  • Interfering with other hooks
  • Calculations are imprecise and can get off by calling other things that affect CPU time
  • Dependency on the debug library.

Extras

  • TCP
  • HTTP, maybe

TCP

(require 'server)
(local {: <! : <!! : alts! : >! &as a} (require :src.async))
(require-macros (doto :src.async require))

(local [c1 c2 c3]
  [(a.tcp.chan {:host "localhost" :port 12345})
   (a.tcp.chan {:host "localhost" :port 12345})
   (a.tcp.chan {:host "localhost" :port 12345})])

(go (>! c1 "1")
    (>! c2 "2")
    (>! c3 "3"))

(a.alts!! [c1 c2 c3 (a.timeout 2000)])
["2 done" #<SocketChannel: 0x55a3f7456860>]
(<!! c1)
(<!! c2)
(<!! c3)

HTTP, maybe

(<!! (a.http.get "https://andreyor.st"))
{:body #<ManyToManyChannel: 0x55a3f796dff0>
 :headers {:cache-control "max-age=600"
           :connection "close"
           :content-length "13058"
           :content-type "text/html; charset=utf-8"
           :date "Sun, 31 Dec 2023 17:55:37 GMT"
           :etag "\"e1ac3c9ef30d35d48af8268c8f839e6141d57ce86027da2c1342897a0011341a\""
           :expires "Sun, 31 Dec 2023 18:05:37 UTC"
           :last-modified "Sun, 31 Dec 2023 12:54:17 GMT"
           :permissions-policy "interest-cohort=()"
           :vary "Origin"}
 :status 200}
(a.alts!! [(a.http.get "https://andreyor.st/posts/feed.xml")
           (a.http.get "https://andreyor.st/tags/css/feed.xml")])

Thanks

Resources: