Andrey Listopadov

Fennel Async

~5 minutes read

Fennel Async Features

  • A small and simple library for asynchronous programming.
  • Linear code.
  • Channels, Promises, and Agents.
  • All operations are blocking on the main thread and non-blocking inside the async thread.
  • TCP support via luasocket.
  • Asynchronous Network REPL.

Simple example

  • Requiring a library and creating a channel:

    (local async (require :async))
    (local c (async.chan))
    
  • Spawning an asynchronous thread:

    (async
     #(let [data (async.take c)]
        (print (.. "got: " data))
        (async.put c (+ 1 data))))
    
  • Communicating with the channel:

    (async.put c 41)
    
    (async.take c)
    

When threads advance

Threads advance when:

  • Creating a new Thread via async function.
  • Putting onto or taking from a Channel.
  • Awaiting or delivering a Promise.

Synchronization primitives:

  • Channel.
  • Promise.

Channels and backpressure

  • Channels are buffered by default with an unlimited buffer size.

  • There are three kinds of limited-size buffers:

    • parking/blocking buffer,
    • dropping-buffer,
    • sliding-buffer.
  • Blocking channel example:

    (local bc (async.chan 1))
    
    (async #(for [i 1 5]
              (async.put bc i)
              (print (.. "done putting: " i))))
    
  • Taking from such buffer advances the loop:

    (async.take bc 100 :no-more-values)
    

Dropping buffer

  • Dropping buffer is meant for high throughput when you don’t care about missing data.
  • Only keeps the data that came first:
    (local dc (async.chan (async.dropping-buffer 1)))
    
    All puts succeed:
    (async #(for [i 1 5]
              (async.put dc i)
              (print (.. "done putting: " i))))
    
    The loop is completed but only the first-ever message is left:
    (async.take dc 100 :no-more-values)
    

Sliding buffer

  • Sliding buffer keeps the most recent data instead:
    (local sc (async.chan (async.sliding-buffer 3)))
    
    All puts succeed:
    (async #(for [i 1 5]
              (async.put sc i)
              (print (.. "done putting: " i))))
    
    The loop is completed but only the last-ever messages are left:
    (async.take sc 100 :no-more-data)
    

Promises

  • Non-blocking puts.

  • Single put.

  • Can contain errors.

    A basic promise:

    (async.promise)
    
  • Advances threads on awaiting the result:

    (local c (async.chan))
    (async #(do (async.sleep 100) (async.put c 41)))
    
    (async.await (async #(+ 1 (async.take c))))
    

Agents

  • Agents are stateful entities.
  • Support sending tasks to run on the agent thread-pool.
  • Each Agent has its own task queue.
  • Tasks run in the order they’re given.
    (local a (async.agent 0))
    
    (async.send a #(do (async.sleep 100) (+ $ 1)))
    (async.send a #(* $ 2))
    
    (a:deref)
    
    Manually advancing all threads:
    (do (async.run :once)
        (a:deref))
    

A simple TCP server

  • Asynchronously accepts connections.
  • Uses an asynchronous handler for requests.

Handler example

  • Accepts data as a string.
  • Runs on an asynchronous thread-pool.
  • Can use all async ops, like sleep:
(local async (require :async))

(fn handler [data]
  (async.sleep 1000)
  (->> data tonumber (+ 1) tostring))

Running the server

  • Creating a server thread:

    (async.tcp.start-server
     handler
     {:host "127.0.0.1" :port 8080})
    
  • Telling the scheduler to start managing tasks:

    (async.run)
    

Connecting a client to the TCP server

  • Clients are socket-channels.
  • Socket channels support non-blocking operations.

Simple client

  • Connects to the server, creating a channel.
  • Creates a promise for delivering response asynchronously.
  • Spawns a thread that puts data to a channel, and waits for the response:
(local async (require :async))

(fn send [conn data]
  (let [c (async.tcp.connect conn)]
    (async #(if (async.put c data)
                (async.take c)
                (error "unable to put data to connection")))))

Sending and receiving requests sequentially

  • Spawn a thread.
  • Immediately await the promise.
  • Repeat:
(let [start (async.time)
      res (fcollect [data 1 3]
            (async.await (send {:host "127.0.0.1" :port 8080} data)))]
  (print (: "; Elapsed %s ms" :format (- (async.time) start)))
  res)

Sending and receiving requests simultaneously

  • Spawn all threads at once.
  • Await on all promises at once:
(let [start (async.time)
      promises (fcollect [data 1 10]
                 (send {:host "127.0.0.1" :port 8080} data))
      res (async.zip* promises)]
  (print (: "; Elapsed %s ms" :format (- (async.time) start)))
  (doto res (tset :n nil)))

Asynchronous HTTP example

In this section:

  • Implementing an URL and response parsers.
  • Creating a TCP connection and sending HTTP as plain text.
  • Receiving a response from the server.

URL parser

  • Parses basic http(s):// style URLs:
    (fn parse-authority [authority]
      (let [userinfo (authority:match "([^@]+)@")
            port (authority:match ":(%d+)")
            host (if userinfo
                     (authority:match (.. "@([^:]+)" (if port ":" "")))
                     (authority:match (.. "([^:]+)" (if port ":" ""))))]
        {: userinfo : port : host}))
    
    (fn parse-url [url]
      "Parses http(s) URLs."
      (let [scheme (url:match "^([^:]+)://")
            {: host : port : userinfo} (parse-authority
                                        (if scheme
                                            (url:match "//([^/]+)/")
                                            (url:match "^([^/]+)/")))
            scheme (or scheme "http")
            path (url:match "//[^/]+/([^?#]+)")
            query (url:match "%?([^#]+)#?")
            fragment (url:match "#([^?]+)%??")]
        (-> {: scheme : host : port : userinfo : path : query : fragment})))
    
    (fn format-path [{: path : query : fragment}]
      (.. "/" (or path "") (if query (.. "?" query) "") (if fragment (.. "?" fragment) "")))
    

Response parser

  • Parses HTTP response headers and body
    (fn parse-headers [headers]
      (let [[resp & headers] (icollect [s (headers:gmatch "(.-)\r\n")] s)]
        (values (tonumber (resp:match " (%d+) "))
                (collect [_ s (ipairs headers)]
                  (let [key (s:match "(.-): ")
                        val (s:match ".-: (.*)")]
                    (values key val))))))
    
    (fn parse-response [data]
      (let [headers (data:match "^(.-)\r\n\r\n")
            body (data:match "^.-\r\n\r\n(.*)")
            (code headers) (parse-headers headers)]
        {: headers : code : body}))
    

GET request implementation

  • Connects to the server via TCP.

  • Formats a request string, with necessary headers.

  • Puts the string into the socket channel.

  • Loops until socket channel was closed:

    (fn get [url]
      (let [{: host : port &as parsed} (parse-url url)
            c (async.tcp.connect {:host host :port (or port 80)})
            request (.. "GET " (format-path parsed) " HTTP/1.1\r\n"
                        "Host: " host (if port (.. ":" port) "") "\r\n"
                        "Connection: close\r\n"
                        "User-Agent: Mozilla/5.0\r\n"
                        "\r\n")]
        (async #(if (async.put c request)
                    ((fn loop [data]
                       (match (async.take c)
                         false (parse-response (table.concat data))
                         data* (loop (doto data (table.insert data*)))))
                     [])
                    (error "unable to put request")))))
    
  • Requesting a file from the www.w3.org:

    (async.await (get "https://www.w3.org/TR/1999/REC-html401-19991224/html40.txt"))
    

Network REPL

  • Basic REPL:

    (global async (require :async))
    (async.tcp.start-repl {:host :localhost :port 1234})
    (async.run)
    
  • Basic client:

    (local async (require :async))
    (local repl (async.tcp.connect {:host :localhost :port 1234}))
    
    (async.put repl "(+ 1 2 3)")
    (async.take repl)
    

Thanks!

Questions?