Andrey Listopadov

Implementing SSE with fnl-http and async.fnl

@programming http fennel ~7 minutes read

Some time ago, I was working on an HTTP library for Fennel. As a proof of concept, I added a module that implements a simple web server and wanted to experiment with it. The server can serve files from a directory using this simple handler:

(fn handler [{: method : path &as request}]
  (case method
    :GET (let [file (io.open (path:gsub "^/" "") :r)]
           (or file {:status 404 :body "not found"}))
    _ {:status 405
       :body "Method not allowed"}))

This handler is simple: when the client sends a GET request with a given path, for example GET http://localhost:8080/some/file.txt, the handler removes the leading / and tries to open the given path as a file handle. If the file is present, we return the handle as is, and the server takes care of closing it once the client disconnects. Otherwise, we return a table with the 404 status code, indicating that the file wasn’t found. Other methods are not allowed, and we respond with a 405 error by matching on method with case.

Now we can require the server module and start a small file-serving server:

(local server
  (require :io.gitlab.andreyorst.fnl-http.server))
(local s
  (server.start handler {:port 8080}))

(s:wait)

The (s:wait) call simply blocks the current thread; without it, the process would just exit.
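As a quick manual test, we can fetch a file from another process with the fnl-http client module (the same client we’ll use again later in this post). This is just a sketch; it assumes a file named some/file.txt exists relative to the server’s working directory:

(local http
  (require :io.gitlab.andreyorst.fnl-http.client))

;; request a file from the server started above;
;; :as :stream returns the body as a stream we can read from
(local resp
  (http.get "http://localhost:8080/some/file.txt" {:as :stream}))

(print resp.status)          ;; 200
(print (resp.body:read 10))  ;; the first ten bytes of the file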

This is, of course, the simplest web server there is; it just accesses files in the current directory, returning them as application/octet-stream. The handler function can be more sophisticated, handling different methods, parsing arguments, etc. For our purposes today, we won’t need most of these features, but I thought it would be better to familiarize you with how the server is set up.

The topic of today’s post is Server-Sent Events, or SSE for short. It is a protocol for communicating between a server and multiple clients that uses a persistent connection and pushes messages from the server to the client. The client can then process these messages at their own pace and react accordingly.
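On the wire, SSE is just text flowing over a long-lived HTTP response: each event is a group of field: value lines, such as event:, id:, and data:, and a blank line marks the end of an event. For example, a stream carrying two events looks like this:

event: message
id: 42
data: something happened

data: a second, minimal event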

We’ll need an asynchronous library to handle SSE for multiple clients, so let’s add it to the mix:

(import-macros {: go : go-loop}
  :io.gitlab.andreyorst.async)

(local {: <! : >! : timeout : alts! : pub : sub : chan : close! &as async}
  (require :io.gitlab.andreyorst.async))

(local json
  (require :io.gitlab.andreyorst.json))

I’ve also included a JSON library; both are already used by fnl-http. Now, let’s begin!

SSE stream

We’ll send events to our clients via a channel:

(local event-ch (chan))

This event channel will be our main way to send events to the clients, but we won’t read from it directly. Instead, we can create a “publication” or pub that will allow clients to subscribe to the messages:

(local event-pub
  (pub event-ch #:sse #(async.sliding-buffer 100)))

For the purposes of this post, there’s only one topic in this event-pub, but in a more complex scenario, you can separate messages into different topics and subscribe to them individually. Speaking of subscribing, here’s a helper function:

(fn subscribe-to-events [client-ch]
  (sub event-pub :sse client-ch))
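If we ever wanted more than one topic, we could instead build the publication with a topic function that looks at the event itself, rather than always returning :sse. Here’s a hypothetical sketch (typed-event-pub and subscribe-to-heartbeats aren’t used anywhere else in this post):

;; hypothetical: route events to a topic based on their :event field,
;; falling back to a generic :sse topic
(local typed-event-pub
  (pub event-ch #(or (. $ :event) :sse) #(async.sliding-buffer 100)))

(fn subscribe-to-heartbeats [client-ch]
  (sub typed-event-pub "heartbeat" client-ch))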

Finally, we need a function to register an event:

(fn register-event [event]
  (>! event-ch event))

Now the fun part. Our event system is ready to be accessed by clients, but the events themselves don’t have any defined shape yet; they’re just arbitrary objects. We’ll define events as tables with the following structure:

{:event "optional event type"
 :id    "optional event id"
 :data  "event data"}

Since our register-event function simply writes to the event-ch, we need a way to intercept that. We can do so by writing middleware that accepts a channel with events as tables and returns a new channel that will have all of the events formatted:

(fn wrap-format-sse-event [ch]
  (let [out (chan)]
    (go-loop [e (<! ch)]
      (case e
        {: event : id : data}
        (doto out
          (>! (.. "event: " (tostring event) "\n"))
          (>! (.. "id: " (tostring id) "\n"))
          (>! (.. "data: " (tostring data) "\n")))
        {: event : data}
        (doto out
          (>! (.. "event: " (tostring event) "\n"))
          (>! (.. "data: " (tostring data) "\n")))
        {: data}
        (doto out
          (>! (.. "data: " (tostring data) "\n"))))
      (if e
          (recur (<! ch))
          (close! out)))
    out))
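As a quick sanity check (a sketch, assuming it’s run at the REPL alongside the definitions above), putting a plain event table on a wrapped channel yields the formatted lines on the other end:

;; a throwaway input channel wrapped with the formatter
(local raw-events (chan))
(local formatted (wrap-format-sse-event raw-events))

;; write one event and read back the two formatted lines
(go (>! raw-events {:event "message" :data "hi"}))
(go (let [line1 (<! formatted)
          line2 (<! formatted)]
      (io.write line1 line2)))
;; prints:
;; event: message
;; data: hi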

Now, the general mechanism is as follows:

  1. When a client connects, we create a channel and subscribe to events.
  2. We wrap this channel with our formatter and some other middleware if needed.
  3. Finally, we return this channel as the response body.

Before we write an HTTP handler that does this, we need to consider one other thing: the connection. If the connection sits idle for too long, it can be closed automatically. Because of that, it is common to send meaningless heartbeat events every now and then. Let’s write another wrapper that watches the client’s event stream and, if no events appear for some time, emits a heartbeat:

(fn wrap-heartbeat [ch timeout-ms]
  (let [out (chan)]
    (go-loop []
      (let [tout (timeout timeout-ms)]
        (match (alts! [tout ch])
          [_ tout]
          (do (>! out {:event "heartbeat" :data 0})
              (recur))
          [event ch]
          (do (>! out event)
              (recur))
          [nil ch] (close! out))))
    out))

Similarly to our previous wrapper, we accept a channel with events and return a new channel. The asynchronous go-loop, which I used earlier without explaining it, runs in a separate virtual thread, trying to read from both the given channel and a timeout channel. Whichever delivers a value first determines what gets sent to the client. If a normal event comes through, we send it to the out channel as is and restart the timer. If no events arrive before the timer expires, we send a heartbeat and restart the timer again. If the events channel returns nil, it means it was closed, so we close the out channel as well.

Finally, we can implement our handler:

(fn handler [{: path : method : headers : content}]
  (case path
    (where "/events" (= :GET method))
    (let [events (chan)]
      (subscribe-to-events events)
      {:status 200
       :headers {:content-type "text/event-stream"}
       :body (-> events
                 (wrap-heartbeat 10_000)
                 wrap-format-sse-event)})
    (where "/notify" (= :POST method))
    (case (tonumber headers.Content-Length)
      len (let [event (json.decode (content:read len))]
            (register-event event)
            {:status 200
             :body "OK"})
      _ (error "malformed Content-Length"))
    _
    {:status 405
     :headers {:content-type "application/json"}
     :body (json.encode {:error "Method not allowed"})}))

This handler has two main entry points: /events and /notify. The /events entry is for subscribing to the event stream with a GET request. The /notify entry is for sending events from a client to the server (for now, we don’t want to complicate the server by making it generate events on its own).

Same as before, we can start a server:

(local s
  (server.start handler {:port 12345}))

(s:wait)

Reading the event stream and triggering events

With the server up and running, we can connect a pair of clients.

The first client will subscribe to the event stream by executing a GET request:

(local http
  (require :io.gitlab.andreyorst.fnl-http.client))

(local resp
  (http.get "http://localhost:12345/events" {:as :stream}))

If we look at the response, we’ll see the headers containing text/event-stream as the content type, as per the response from our server, and the body as a stream, as requested by the client:

{:body #<Reader: 0x564ae85ca7d0>
 :headers {:Connection "keep-alive"
           :Content-Type "text/event-stream"
           :Transfer-Encoding "chunked"}
 :http-client #<tcp-client: 0x564ae812fa10>
 :protocol-version {:major 1 :minor 1 :name "HTTP"}
 :reason-phrase "OK"
 :request-time 38
 :status 200
 :trace-redirects {}}

We can consume events with a helper function:

(fn consume-events [stream]
  (fn read-line [stream buf]
    (let [c (stream:read 1)]
      (when (not= "\n" c)
        (table.insert buf c)
        (read-line stream buf))))
  (while true
    (let [buf []]
      (read-line stream buf)
      (io.write "[" (os.date) "]\t" (table.concat buf) "\n"))))

(consume-events resp.body)

In the process output, we’ll be able to see heartbeat events appearing every ten seconds:

[Wed Jul 30 00:34:35 2025]	event: heartbeat
[Wed Jul 30 00:34:45 2025]	data: 0
[Wed Jul 30 00:34:45 2025]	event: heartbeat
[Wed Jul 30 00:34:55 2025]	data: 0

Next, from another process, we can send an event with a POST request:

(local http
  (require :io.gitlab.andreyorst.fnl-http.client))
(local json
  (require :io.gitlab.andreyorst.json))

(http.post "http://localhost:12345/notify"
           {:body (json.encode {:event "message" :id "client-2" :data "Hi from client-2"})})

We immediately see the event in the first client’s output, followed by heartbeat events:

[Wed Jul 30 00:35:03 2025]	event: message
[Wed Jul 30 00:35:03 2025]	id: client-2
[Wed Jul 30 00:35:03 2025]	data: Hi from client-2
[Wed Jul 30 00:35:03 2025]	event: heartbeat
[Wed Jul 30 00:35:13 2025]	data: 0

As you can see, the time when the heartbeat event arrived is exactly ten seconds after the last non-heartbeat event. Of course, we can connect multiple clients subscribed to the event stream, and they will all receive events.

Conclusion¹

This should give you an idea of how to implement an SSE stream in your server using the fnl-http library. Of course, this implementation is only loosely based on the specification, so I wouldn’t suggest it as a general implementation guide, but I hope it provides a clear enough explanation of how things work.

The server module is still very much in a proof-of-concept stage, but as this article shows, it is working quite well. All in all, I believe the ability to return an asynchronous channel as a body from the server is a nice way to handle this kind of event system. The code can be written as usual, and no specific handling for returning asynchronous data via HTTP is required.

Thanks for reading, and I hope you’ll find fnl-http interesting enough to try!


  1. As much as I don’t like “Conclusion” as the last heading in an article, I can’t think of a different one here. ↩︎