Fennel Async
Fennel Async Features
- A small and simple library for asynchronous programming.
- Linear code.
- Channels, Promises, and Agents.
- All operations are blocking on the main thread and non-blocking inside the async thread.
- TCP support via
luasocket
. - Asynchronous Network REPL.
Simple example
-
Requiring a library and creating a channel:
(local async (require :async)) (local c (async.chan))
-
Spawning an asynchronous thread:
(async #(let [data (async.take c)] (print (.. "got: " data)) (async.put c (+ 1 data))))
-
Communicating with the channel:
(async.put c 41)
(async.take c)
When threads advance
Threads advance when:
- Creating a new Thread via
async
function. - Putting onto or taking from a Channel.
- Awaiting or delivering a Promise.
Synchronization primitives:
- Channel.
- Promise.
Channels and backpressure
-
Channels are buffered by default with an unlimited buffer size.
-
There are three kinds of limited-size buffers:
- parking/blocking
buffer
, dropping-buffer
,sliding-buffer
.
- parking/blocking
-
Blocking channel example:
(local bc (async.chan 1))
(async #(for [i 1 5] (async.put bc i) (print (.. "done putting: " i))))
-
Taking from such buffer advances the loop:
(async.take bc 100 :no-more-values)
Dropping buffer
- Dropping buffer is meant for high throughput when you don’t care about missing data.
- Only keeps the data that came first:
All puts succeed:
(local dc (async.chan (async.dropping-buffer 1)))
The loop is completed but only the first-ever message is left:(async #(for [i 1 5] (async.put dc i) (print (.. "done putting: " i))))
(async.take dc 100 :no-more-values)
Sliding buffer
- Sliding buffer keeps the most recent data instead:
All puts succeed:
(local sc (async.chan (async.sliding-buffer 3)))
The loop is completed but only the last-ever messages are left:(async #(for [i 1 5] (async.put sc i) (print (.. "done putting: " i))))
(async.take sc 100 :no-more-data)
Promises
-
Non-blocking puts.
-
Single put.
-
Can contain errors.
A basic promise:
(async.promise)
-
Advances threads on awaiting the result:
(local c (async.chan)) (async #(do (async.sleep 100) (async.put c 41)))
(async.await (async #(+ 1 (async.take c))))
Agents
- Agents are stateful entities.
- Support sending tasks to run on the agent thread-pool.
- Each Agent has its own task queue.
- Tasks run in the order they’re given.
Manually advancing all threads:
(local a (async.agent 0)) (async.send a #(do (async.sleep 100) (+ $ 1))) (async.send a #(* $ 2)) (a:deref)
(do (async.run :once) (a:deref))
A simple TCP server
- Asynchronously accepts connections.
- Uses an asynchronous handler for requests.
Handler example
- Accepts data as a string.
- Runs on an asynchronous thread-pool.
- Can use all async ops, like
sleep
:
(local async (require :async))
(fn handler [data]
(async.sleep 1000)
(->> data tonumber (+ 1) tostring))
Running the server
-
Creating a server thread:
(async.tcp.start-server handler {:host "127.0.0.1" :port 8080})
-
Telling the scheduler to start managing tasks:
(async.run)
Connecting a client to the TCP server
- Clients are socket-channels.
- Socket channels support non-blocking operations.
Simple client
- Connects to the server, creating a channel.
- Creates a promise for delivering response asynchronously.
- Spawns a thread that puts data to a channel, and waits for the response:
(local async (require :async))
(fn send [conn data]
(let [c (async.tcp.connect conn)]
(async #(if (async.put c data)
(async.take c)
(error "unable to put data to connection")))))
Sending and receiving requests sequentially
- Spawn a thread.
- Immediately await the promise.
- Repeat:
(let [start (async.time)
res (fcollect [data 1 3]
(async.await (send {:host "127.0.0.1" :port 8080} data)))]
(print (: "; Elapsed %s ms" :format (- (async.time) start)))
res)
Sending and receiving requests simultaneously
- Spawn all threads at once.
- Await on all promises at once:
(let [start (async.time)
promises (fcollect [data 1 10]
(send {:host "127.0.0.1" :port 8080} data))
res (async.zip* promises)]
(print (: "; Elapsed %s ms" :format (- (async.time) start)))
(doto res (tset :n nil)))
Asynchronous HTTP example
In this section:
- Implementing an URL and response parsers.
- Creating a TCP connection and sending HTTP as plain text.
- Receiving a response from the server.
URL parser
- Parses basic
http(s)://
style URLs:(fn parse-authority [authority] (let [userinfo (authority:match "([^@]+)@") port (authority:match ":(%d+)") host (if userinfo (authority:match (.. "@([^:]+)" (if port ":" ""))) (authority:match (.. "([^:]+)" (if port ":" ""))))] {: userinfo : port : host})) (fn parse-url [url] "Parses http(s) URLs." (let [scheme (url:match "^([^:]+)://") {: host : port : userinfo} (parse-authority (if scheme (url:match "//([^/]+)/") (url:match "^([^/]+)/"))) scheme (or scheme "http") path (url:match "//[^/]+/([^?#]+)") query (url:match "%?([^#]+)#?") fragment (url:match "#([^?]+)%??")] (-> {: scheme : host : port : userinfo : path : query : fragment}))) (fn format-path [{: path : query : fragment}] (.. "/" (or path "") (if query (.. "?" query) "") (if fragment (.. "?" fragment) "")))
Response parser
- Parses HTTP response headers and body
(fn parse-headers [headers] (let [[resp & headers] (icollect [s (headers:gmatch "(.-)\r\n")] s)] (values (tonumber (resp:match " (%d+) ")) (collect [_ s (ipairs headers)] (let [key (s:match "(.-): ") val (s:match ".-: (.*)")] (values key val)))))) (fn parse-response [data] (let [headers (data:match "^(.-)\r\n\r\n") body (data:match "^.-\r\n\r\n(.*)") (code headers) (parse-headers headers)] {: headers : code : body}))
GET request implementation
-
Connects to the server via TCP.
-
Formats a request string, with necessary headers.
-
Puts the string into the socket channel.
-
Loops until socket channel was closed:
(fn get [url] (let [{: host : port &as parsed} (parse-url url) c (async.tcp.connect {:host host :port (or port 80)}) request (.. "GET " (format-path parsed) " HTTP/1.1\r\n" "Host: " host (if port (.. ":" port) "") "\r\n" "Connection: close\r\n" "User-Agent: Mozilla/5.0\r\n" "\r\n")] (async #(if (async.put c request) ((fn loop [data] (match (async.take c) false (parse-response (table.concat data)) data* (loop (doto data (table.insert data*))))) []) (error "unable to put request")))))
-
Requesting a file from the
www.w3.org
:(async.await (get "https://www.w3.org/TR/1999/REC-html401-19991224/html40.txt"))
Network REPL
-
Basic REPL:
(global async (require :async)) (async.tcp.start-repl {:host :localhost :port 1234}) (async.run)
-
Basic client:
(local async (require :async)) (local repl (async.tcp.connect {:host :localhost :port 1234})) (async.put repl "(+ 1 2 3)") (async.take repl)
Thanks!
- Project repository: https://gitlab.com/andreyorst/fennel-async
- Article about the implementation: https://andreyorst.gitlab.io/posts/2021-10-27-naive-async-implementation-in-fennel/
Questions?