Fennel Async at its Core
Fennel Async at its Core
- channels
- buffered or unbuffered
- go-threads
That’s it!
Main features
- Aim for ClojureScript compatibility.
- Almost all tests were ported from the ClojureScript library and all the ones ported pass.
- Cancellation of actions.
- Limited detection of deadlocks.
- Macros and functions provided via a single file.
- Seamless scheduling - no need to run the scheduler manually.
- Self-adjusting scheduler.
- Additional experimental stuff: TCP, HTTP.
Clojure compatibility (Clojure example)
;; Clojure
(require '[clojure.core.async :as a :refer [>!]])
(let [n 1000
cs (repeatedly n a/chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (a/go (>! c "hi")))
(dotimes [i n]
(let [[v c] (a/alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
Read 1000 msgs in 163 ms
Clojure compatibility (Fennel example)
;; Fennel
(local {: >! &as a} (require :src.async))
(local {: gettime} (require :socket))
(import-macros {: go} (doto :src.async require))
(let [n 1000
cs (fcollect [_ 1 n] (a.chan))
begin (gettime)]
(each [_ c (ipairs cs)] (go (>! c "hi")))
(for [_ 1 n]
(let [[v c] (a.alts!! cs)]
(assert (= "hi" v))))
(print "Read" n "msgs in" (* (- (gettime) begin) 1000) "ms"))
Read 1000 msgs in 430.63902854919 ms
Differences from Clojure
- Limitations in go blocks
- Reasoning behind limitations
- Real threads
Limitations in go blocks
Reasoning behind limitations
Real threads
(local {: >!! : <!! : <!} a)
(fn map [f coll]
(icollect [_ v (ipairs coll)]
(f v)))
(local cs (fcollect [i 1 10] (doto (a.chan 1) (>!! i))))
(<!! (go (map <! cs)))
[1 2 3 4 5 6 7 8 9 10]
Differences from my previous library
- Proper cancellation of actions
- Limited detection of deadlocks
- Seamless Scheduling
- Self-adjusting scheduler
Proper cancellation of actions (takes)
(local c (a.chan))
(go (print "original go block:" (<! c)) (a.close! c))
(let [t (a.timeout 100)]
(match (a.alts!! [c t])
[_ t] (print "timeout waiting for c")
[?val c] (print "alts!! got val:" ?val "from c")))
(>!! c "hi")
(print (<!! c))
timeout waiting for c
original go block: hi
nil
Proper cancellation of actions (puts)
(local c (a.chan))
(go (>! c 42) (print "done putting") (a.close! c))
(let [t (a.timeout 100)]
(match (a.alts!! [[c 27] t])
[_ t] (print "timeout waiting for c")
[_ c] (print "alts!! managed to put 27 to c")))
(print :val (<!! c))
(print (<!! c))
timeout waiting for c
val 42
done putting
nil
Limited detection of deadlocks
(local c (a.chan))
(<!! c)
The ManyToManyChannel: 0x55a3f7753720 is not ready and there are no scheduled tasks. Value will never arrive.
stack traceback:
./src/async.fnl:877: in function ?
/var/home/alist/.local/bin/fennel:880: in function ?
[C]: in function 'xpcall'
/var/home/alist/.local/bin/fennel:885: in function ?
/var/home/alist/.local/bin/fennel:912: in function ?
/var/home/alist/.local/bin/fennel:922: in function ?
[C]: in ?
(local c (a.chan))
(go (<! (a.timeout 3000)))
(<!! c)
The ManyToManyChannel: 0x55a3f7878900 is not ready and there are no scheduled tasks. Value will never arrive.
stack traceback:
./src/async.fnl:877: in function ?
/var/home/alist/.local/bin/fennel:880: in function ?
[C]: in function 'xpcall'
/var/home/alist/.local/bin/fennel:885: in function ?
/var/home/alist/.local/bin/fennel:912: in function ?
/var/home/alist/.local/bin/fennel:922: in function ?
[C]: in ?
(local c (a.chan))
(go (<! (a.timeout 3000)) (>! c "hello there!"))
(<!! c)
hello there!
Limited detection of deadlocks (problems)
(import-macros {: go-loop} (doto :src.async require))
(local log-ch (a.chan))
(go-loop [msg (<! log-ch)]
(when (not= nil msg)
(print msg)
(recur (<! log-ch))))
(go-loop []
(when (>! log-ch (os.date))
(<! (a.timeout 30000))
(recur)))
(<!! c)
Seamless Scheduling
(go
(for [i 1 10]
(<! (a.timeout 100))
(print :i i)))
(print :loop :start)
(for [i 1 1000000000] nil)
(print :loop :done)
loop start
i 1
i 2
i 3
i 4
i 5
i 6
i 7
i 8
i 9
i 10
loop done
Event Loops
Seamless Scheduling (in ClojureScript)
;; ClojureScript
(defn queue-dispatcher []
(when-not (and queued? running?)
(set! queued? true)
(goog.async.nextTick process-messages)))
Seamless Scheduling
;; back to Fennel
(fn process-messages [event]
{:private true}
(let [took (- (os/clock) register-time)
(_ n) (cancel-hook process-messages)]
(set n-instr
(if (not= event :count) n
(m/floor (/ 0.01 (/ took n)))))
(faccumulate [done nil _ 1 1024 :until done]
(case (next dispatched-tasks)
f (tset dispatched-tasks (doto f pcall) nil)
nil true))
(each [t ch (pairs timeouts)]
(when (>= 0 (difftime t (time)))
(tset timeouts t (ch:close!))))
(when (or (next dispatched-tasks) (next timeouts))
(schedule-hook process-messages n-instr))))
Self-adjusting scheduler
(var (n-instr register-time orig-hook orig-mask orig-n)
1_000_000)
(fn schedule-hook [hook n]
{:private true}
(when (and gethook sethook)
(let [(hook* mask n*) (gethook)]
(when (not= hook hook*)
(set (register-time orig-hook orig-mask orig-n)
(values (os/clock) hook* mask n*))
(sethook main-thread hook "" n)))))
(fn cancel-hook [hook]
{:private true}
(when (and gethook sethook)
(match (gethook main-thread)
(hook ?mask ?n)
(do (sethook main-thread orig-hook orig-mask orig-n)
(values ?mask ?n)))))
Problems with this approach
- Interfering with other hooks
- Calculations are imprecise and can get off by calling other things that affect CPU time
- Dependency on the debug library.
Extras
- TCP
- HTTP, maybe
TCP
(require 'server)
(local {: <! : <!! : alts! : >! &as a} (require :src.async))
(require-macros (doto :src.async require))
(local [c1 c2 c3]
[(a.tcp.chan {:host "localhost" :port 12345})
(a.tcp.chan {:host "localhost" :port 12345})
(a.tcp.chan {:host "localhost" :port 12345})])
(go (>! c1 "1")
(>! c2 "2")
(>! c3 "3"))
(a.alts!! [c1 c2 c3 (a.timeout 2000)])
["2 done" #<SocketChannel: 0x55a3f7456860>]
(<!! c1)
(<!! c2)
(<!! c3)
HTTP, maybe
(<!! (a.http.get "https://andreyor.st"))
{:body #<ManyToManyChannel: 0x55a3f796dff0>
:headers {:cache-control "max-age=600"
:connection "close"
:content-length "13058"
:content-type "text/html; charset=utf-8"
:date "Sun, 31 Dec 2023 17:55:37 GMT"
:etag "\"e1ac3c9ef30d35d48af8268c8f839e6141d57ce86027da2c1342897a0011341a\""
:expires "Sun, 31 Dec 2023 18:05:37 UTC"
:last-modified "Sun, 31 Dec 2023 12:54:17 GMT"
:permissions-policy "interest-cohort=()"
:vary "Origin"}
:status 200}
(a.alts!! [(a.http.get "https://andreyor.st/posts/feed.xml")
(a.http.get "https://andreyor.st/tags/css/feed.xml")])
Thanks
Resources:
- The async.fnl library: https://github.com/andreyorst/async.fnl
- Clojure core.async: https://github.com/clojure/core.async
- A post about the library with some additional details: https://andreyor.st/posts/2023-05-15-clojures-coreasync-port-for-the-fennel-language/