Event-driven finite state machine for a distributed trading system

One problem I had when building my distributed trading system was managing state asynchronously from multiple triggers. For example, when the alpha engine says buy, it needs confirmation from the position engine that it is safe to enter a new position. I could chain one check after another imperatively or via callbacks. However, the underlying constraint is that these triggers:

  1. are resource-intensive to generate,
  2. often need to be composed, many at a time,
  3. are not sequential and have no one-to-one dependency, and
  4. most importantly, live in separate programs or on different machines.

Thus I opted to abstract this problem into its own module of the system, an event-driven finite state machine (FSM) that keeps track of state transitions. The term is intimidating, but my first implementation was nothing more than a set of if-else statements, and that is enough to qualify as one. The benefit is that each of my system's components only needs to push signals to and pull states from a central interface, without worrying about what to call next or polling anything else to see if the stars are aligned. That drastically simplified development and maintenance.

The responsibilities of my FSM module are to:

  1. listen to all the signals,
  2. figure out all the transitions, and
  3. publish the latest states for the rest of the system.

Handling asynchronous events

I use RabbitMQ as the message transport layer between my system's modules. All I need to do here is associate an appropriate message handler with each triggering input for the FSM. Here's one example of an event handler using the Clojure RabbitMQ library, Langohr. The rest of this part is just standard RabbitMQ publish/subscribe stuff.

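(defn- event-message-handler
  "Update the cached states for a user when a position event arrives."
  [ch {:keys [headers delivery-tag redelivery?]} ^bytes payload]
  (let [{:keys [message-type user-id instrument quantity]} (read-payload payload)]
    (when (= :position-event message-type)
      ;; fetch current states -> apply the event -> write the new states back
      (-> (get-cached-states user-id)
          (update-position-state instrument quantity)
          (cache-states user-id))))
  ;; acknowledge every message, including the ones this handler ignores
  (lbc/ack ch delivery-tag))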

This is called when a position event is received, carrying information such as user, instrument, and quantity. The handler threads this information through three steps: fetch the current states for that user, evaluate the next state from the input, and cache the new states for the user.
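To make the fetch, evaluate, and cache flow concrete, here is a minimal sketch that can be run at a REPL without RabbitMQ or Redis. It is not the production code: the atom `state-store` stands in for the Redis cache, `handle-position-event` is a hypothetical name, the states are assumed to be a map of instrument to colour, and treating a non-zero quantity as an open position is my assumption for illustration. The `:open` and `:close` rows are copied from the transition table later in this post.

```clojure
;; Hypothetical in-memory stand-in for the Redis-backed cache.
(defonce state-store (atom {}))

(defn get-cached-states
  "Fetch the states map for a user (empty map if none cached yet)."
  [user-id]
  (get @state-store user-id {}))

(defn cache-states
  "Store the states map for a user and return it."
  [states user-id]
  (swap! state-store assoc user-id states)
  states)

;; :open and :close rows of the transition table, keyed by input then
;; current colour. nil means no colour has been recorded yet.
(def position-transitions
  {:open  {nil :green, :white :green, :yellow :green, :green :green, :red :red}
   :close {nil :white, :white :white, :yellow :yellow, :green :yellow, :red :white}})

(defn update-position-state
  "Assumption: states is a map of instrument -> colour, and a non-zero
  quantity means the position is open."
  [states instrument quantity]
  (let [input (if (zero? quantity) :close :open)
        cur   (get states instrument)]
    (assoc states instrument (get-in position-transitions [input cur]))))

(defn handle-position-event
  "Fetch current states, evaluate the next state, cache the result.
  Returns the new states map, or nil for other message types."
  [{:keys [message-type user-id instrument quantity]}]
  (when (= :position-event message-type)
    (-> (get-cached-states user-id)
        (update-position-state instrument quantity)
        (cache-states user-id))))
```

Feeding it an event with quantity 100 followed by one with quantity 0 for the same instrument moves that instrument from no colour to green and then to yellow, following the table rows.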

State transitions

Below is one of my system's state transition diagrams.

[Figure: state transition example]

There are 4 states, represented by 4 colours, with 4 triggers signalling state transitions. The program is expected to handle up to hundreds of independent states concurrently, with event triggers coming in a couple of times per second.

As I was saying, my first implementation was just a set of if-else functions. For example, an engage trigger would call the engaging function to determine the next state, given the implicit input engage and the current state.

(defn engaging
  [current]
  (condp = current
    "white" "yellow"
    "yellow" "white"
    "green" "red"
    "red" "green"))

There was a handful of these boilerplate functions, so after I deployed my system I came back to refactor them. I've been meaning to give core.logic a try for a while, so this seemed like a good place to start using it.

Before we can ask the logic solver a question, we need to define relations. Here I define a transition relation to specify all the state transition definitions conveniently in one place.

(defrel transition from input to)
(facts transition [[nil :open :green]
                   [nil :close :white]
                   [:white :engage :yellow]
                   [:white :disengage :white]
                   [:white :open :green]
                   [:white :close :white]
                   [:yellow :engage :white]
                   [:yellow :disengage :white]
                   [:yellow :open :green]
                   [:yellow :close :yellow]
                   [:green :engage :red]
                   [:green :disengage :red]
                   [:green :open :green]
                   [:green :close :yellow]
                   [:red :engage :green]
                   [:red :disengage :red]
                   [:red :open :red]
                   [:red :close :white]])

And the event handler functions are just wrappers around a one-liner logic expression asking the question: given the current state, cur-state, and the input trigger, input, what state can q take to satisfy this constraint?

(defn next-state
  "Solver for next state"
  [input cur-state]
  (first (run 1 [q] (transition cur-state input q))))

(def colour-clicked (partial next-state :engage))
(def colour-deactivate (partial next-state :disengage))
(defn next-position-colour [cur open?]
  (if open?
    (next-state :open cur)
    (next-state :close cur)))

Not the most illustrative core.logic example but it does the job.
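For readers without core.logic at hand, the question being asked can be sketched in plain Clojure as pattern matching over the fact rows. This is a comparison under my own assumptions, not how the system does it: `solve`, `lookup-next-state`, and the `:_` wildcard are hypothetical names, and only a subset of the facts is copied from the table.

```clojure
;; A subset of the transition facts from the table above, as plain data.
(def transitions
  [[:white  :engage    :yellow]
   [:yellow :engage    :white]
   [:green  :engage    :red]
   [:red    :engage    :green]
   [:white  :disengage :white]
   [:green  :close     :yellow]])

(defn solve
  "Return every [from input to] fact matching the pattern.
  The keyword :_ is a wildcard that matches any value."
  [from input to]
  (filter (fn [fact]
            (every? (fn [[p v]] (or (= p :_) (= p v)))
                    (map vector [from input to] fact)))
          transitions))

(defn lookup-next-state
  "First matching target state for input and cur-state, or nil if none."
  [input cur-state]
  (last (first (solve cur-state input :_))))

;; Forward query: where does :engage take :white?
(lookup-next-state :engage :white)   ; => :yellow

;; Reverse query: which facts end in :yellow?
(solve :_ :_ :yellow)
;; => ([:white :engage :yellow] [:green :close :yellow])
```

The reverse query hints at what a relation offers over the if-else version: the same facts answer questions in any direction, which is the kind of query core.logic is built for.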

Getting started with core.logic is surprisingly easy. I went through the Primer and tutorial and got this working in one try.

State caching and sharing

Now that the state transitions have been taken care of, states are cached and served from Redis for other parts of the system. I use Redis for this because it is fast and easy. Values are stored in edn format instead of something more popular like JSON, to preserve data structures over the wire.

This is my first time using edn in production. All inter-process messages in this trading system are edn-formatted. It works seamlessly with Clojure by simply using str to write and clojure.edn/read-string to read. Besides my other Clojure components in the system, my trade broker interface is written in Java. My Java program uses edn-java to parse and unparse complex Clojure data structures (e.g. nested maps with keywords).
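As a small illustration of that round trip, the sketch below writes a nested map to a string and reads it back. The `states` value is made-up sample data, and I use pr-str rather than str as a precaution, since str does not produce readable edn for lazy sequences.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical sample data: nested maps with keyword and string keys.
(def states
  {:user-id   42
   :positions {"EURUSD" {:colour :green :quantity 100}}})

;; Write: serialize to an edn string that can go over the wire.
(def wire (pr-str states))

;; Read: parse the string back into Clojure data.
(def decoded (edn/read-string wire))

;; The structure survives intact, keywords included.
(= states decoded)                                    ; => true
(keyword? (get-in decoded [:positions "EURUSD" :colour])) ; => true

;; Why pr-str: str on a lazy seq prints the object, not its contents.
(pr-str (map inc [1 2]))                              ; => "(2 3)"
```

Unlike a JSON round trip, nothing has to be converted back by hand: keywords stay keywords and string keys stay strings.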

(def pool         (car/make-conn-pool))
(def spec-server1 (car/make-conn-spec))
(defmacro with-car [& body] `(car/with-conn pool spec-server1 ~@body))

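(defn get-cached-states
  "Read a user's cached states from Redis and parse them from edn."
  [id]
  (edn/read-string (with-car (car/get (str "states:" id)))))

(defn cache-states
  "Serialize the states map m and store it in Redis under the user's key."
  [m id]
  ;; str is fine for plain maps; pr-str is safer if values may hold lazy seqs
  (with-car (car/set (str "states:" id) (str m))))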

I find that coupling edn with Redis is a fantastic choice: it's almost like working with Clojure's native concurrency primitives, such as atom, while also enabling external programs to access the data.

Simple and quick

The entire event-driven FSM program is less than 200 lines of Clojure code and took no more than a few hours to write. However, I did give it some pondering time for a few days. I haven't run any benchmarks, so all I can say about performance is that this setup handles my simplistic use case with barely any load on the server, and I'm happy with that.

A few years ago, I would have set a whole bunch of flags to switch states. In fact, that's what I did. The biggest satisfaction here for me isn't the implementation or the technologies; it is seeing through to the underlying problem at hand and solving it with a common pattern that made my work simpler.
