Event-driven finite state machine for a distributed trading system

One problem I had when building my distributed trading system was managing states asynchronously from multiple triggers. For example, when the alpha engine says buy, it needs confirmation from the position engine that it is safe to enter a new position. I could chain one check after another, either imperatively or via callbacks. However, the underlying constraint is that these triggers:

  1. are resource-intensive to generate,
  2. often need to be combined, many at a time,
  3. are not sequential and don't have one-to-one dependencies, and
  4. most importantly, live in separate programs or on different machines.

Thus I opted to abstract this problem into its own module of the system: an event-driven finite state machine (FSM) that keeps track of state transitions. It sounds intimidating, but my first implementation was just a set of if-else statements, which is enough to qualify as one. The benefit is that each of my system's components only needs to push signals to, and pull states from, a central interface, without having to worry about what it should call next or poll anything else to see if the stars are aligned. That drastically simplified development and maintenance.

The responsibilities of my FSM module are to:

  1. listen to all the signals,
  2. figure out all the transitions, and
  3. publish the latest states for the rest of the system.

Handling asynchronous events

I use RabbitMQ as the message transport layer between my system's modules. All I need to do here is associate an appropriate message handler with each triggering input for the FSM. Here's one of the event handlers, written with Langohr, the Clojure RabbitMQ library. The rest of this part is just standard RabbitMQ publish/subscribe stuff.

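;; Assumes (:require [langohr.basic :as lbc]) in the ns form.
(defn- event-message-handler [ch {:keys [headers delivery-tag redelivery?]} ^bytes payload]
  (let [{:keys [message-type user-id instrument quantity]} (read-payload payload)]
    ;; Only position events drive this FSM; other message types are ignored.
    (when (= :position-event message-type)
      ;; fetch the user's states -> apply the transition -> cache the result
      (-> (get-cached-states user-id)
          (update-position-state instrument quantity)
          (cache-states user-id))))
  ;; Acknowledge every delivery so RabbitMQ doesn't redeliver it.
  (lbc/ack ch delivery-tag))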

This handler is called when a position event is received, carrying information such as the user, instrument, and quantity. It threads this information through three steps: fetch the user's current states, evaluate the next state given the input, and cache the new states for the user.
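To make the shape of that handler concrete outside Clojure, here is a minimal Python sketch under stated assumptions: messages arrive already decoded as dicts, a plain dict stands in for the Redis cache, and `update_position_state` is a hypothetical pure function supplied by the caller. None of these names come from Langohr or RabbitMQ.

```python
from typing import Any, Callable, Dict, MutableMapping, Optional

# Hypothetical signature: (current states or None, instrument, quantity) -> new states
Updater = Callable[[Optional[Dict[str, Any]], str, int], Dict[str, Any]]


def make_event_handler(store: MutableMapping[Any, Any], update_position_state: Updater):
    """Build a message callback mirroring the Clojure handler (sketch only).

    `store` is any dict-like cache standing in for Redis.
    """
    def on_message(message: Dict[str, Any], ack: Callable[[], None]) -> None:
        if message.get("message_type") == "position-event":
            user_id = message["user_id"]
            states = store.get(user_id)  # None for a user we haven't seen yet
            store[user_id] = update_position_state(
                states, message["instrument"], message["quantity"])
        ack()  # acknowledge every delivery, including ones we ignore
    return on_message
```

For example, a toy updater that only records the latest quantity per instrument would be `lambda s, inst, qty: {**(s or {}), inst: qty}`. Acking after the cache write means that if the update raises, the message stays unacknowledged and the broker can redeliver it once the consumer's channel closes.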

State transitions

Below is one of my system's state transition diagrams.

[Figure: state transition example]

There are 4 states, represented by 4 colours, and 4 triggers (engage, disengage, open, and close) that signal state transitions. The program is expected to handle up to hundreds of independent states concurrently, with event triggers coming in a couple of times per second.

As I was saying, my first implementation was just a set of if-else functions. For example, an engage trigger would call the engaging function to determine the next state given the current state and the implicit input, engage.

(defn engaging
  [current]
  (condp = current
    "white" "yellow"
    "yellow" "white"
    "green" "red"
    "red" "green"))

There were a handful of these boilerplate functions, so after I deployed my system I came back to refactor them. I'd been meaning to give core.logic a try for a while, and this seemed like a good place to start.

Before we can ask the logic solver a question, we need to define relations. Here I define a transition relation that specifies all the state transitions conveniently in one place, as facts of the form [from input to].

(defrel transition from input to)
(facts transition [[nil :open :green]
                   [nil :close :white]
                   [:white :engage :yellow]
                   [:white :disengage :white]
                   [:white :open :green]
                   [:white :close :white]
                   [:yellow :engage :white]
                   [:yellow :disengage :white]
                   [:yellow :open :green]
                   [:yellow :close :yellow]
                   [:green :engage :red]
                   [:green :disengage :red]
                   [:green :open :green]
                   [:green :close :yellow]
                   [:red :engage :green]
                   [:red :disengage :red]
                   [:red :open :red]
                   [:red :close :white]])

And the event handler functions are just wrappers around a one-line logic expression asking the question: given the current state, cur-state, and the input trigger, input, what value can q take to satisfy the transition relation?

(defn next-state 
  "Solver for next state"
  [input cur-state]
  (first (run 1 [q] (transition cur-state input q))))

(def colour-clicked (partial next-state :engage))
(def colour-deactivate (partial next-state :disengage))
(defn next-position-colour [cur open?]
  (if open?
    (next-state :open cur)
    (next-state :close cur)))
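For readers without core.logic at hand, here is a rough Python analogue of the relation and the wrappers above. It is a plain linear scan over a copy of the facts, not a logic solver, with None standing in for nil. `predecessors` and `is_deterministic` are hypothetical helpers added to show two things a facts table makes easy: querying it in reverse, and checking that no (state, input) pair has two targets.

```python
from functools import partial

# Python mirror of the core.logic facts: (from, input, to).
# None plays the role of nil, i.e. a user with no cached state yet.
TRANSITIONS = [
    (None, "open", "green"), (None, "close", "white"),
    ("white", "engage", "yellow"), ("white", "disengage", "white"),
    ("white", "open", "green"), ("white", "close", "white"),
    ("yellow", "engage", "white"), ("yellow", "disengage", "white"),
    ("yellow", "open", "green"), ("yellow", "close", "yellow"),
    ("green", "engage", "red"), ("green", "disengage", "red"),
    ("green", "open", "green"), ("green", "close", "yellow"),
    ("red", "engage", "green"), ("red", "disengage", "red"),
    ("red", "open", "red"), ("red", "close", "white"),
]


def next_state(input_, cur_state):
    """Forward query, like (first (run 1 [q] (transition cur-state input q)))."""
    for frm, inp, to in TRANSITIONS:
        if frm == cur_state and inp == input_:
            return to
    return None  # no matching fact, like (first ()) returning nil


def predecessors(input_, to_state):
    """Reverse query: which states move to `to_state` on `input_`?"""
    return [frm for frm, inp, to in TRANSITIONS if inp == input_ and to == to_state]


def is_deterministic(table):
    """True if no (from, input) pair maps to two different targets."""
    seen = {}
    for frm, inp, to in table:
        if seen.setdefault((frm, inp), to) != to:
            return False
    return True


colour_clicked = partial(next_state, "engage")
colour_deactivate = partial(next_state, "disengage")


def next_position_colour(cur, is_open):
    return next_state("open" if is_open else "close", cur)
```

The reverse query is where the relational version pays off: in core.logic the same `transition` relation answers it with no extra code, e.g. `(run* [q] (transition q :engage :white))`.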

Not the most illustrative core.logic example, but it does the job.

Getting started with core.logic is surprisingly easy. I went through the Primer and tutorial and got this working in one try.

State caching and sharing

Now that the state transitions have been taken care of, states are cached and served on Redis for other parts of the system. I use Redis for this because it is fast and easy. Values are stored in edn format, instead of something more popular like JSON, to preserve data structures over the wire (JSON has no keywords or sets, for instance).
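Python's standard json module makes the point about preserving structure concrete. This stdlib-only sketch (nothing from the trading system) round-trips a small states map through JSON: the integer key comes back as a string, the tuple comes back as a list, and a set cannot be encoded at all. edn keeps keywords, sets, vectors, and non-string keys distinct, which is why it survives the wire intact for Clojure.

```python
import json


def json_round_trip(value):
    """Encode `value` as JSON text, then decode it again."""
    return json.loads(json.dumps(value))


# Integer key and tuple value: both change type on the way back.
states = {1: ("green", 100)}
restored = json_round_trip(states)  # {'1': ['green', 100]}


def json_can_encode(value):
    """Return False if json.dumps rejects the value (e.g. a set)."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False
```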

This is my first time using edn in production. All inter-process messages in this trading system are edn formatted. It works seamlessly with Clojure: simply use str to write and clojure.edn/read-string to read. Besides my Clojure components, the system has a trade broker interface written in Java, which uses edn-java to parse and write complex Clojure data structures (e.g. nested maps with keywords).

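;; Assumes (:require [taoensso.carmine :as car] [clojure.edn :as edn]).
(def pool         (car/make-conn-pool))
(def spec-server1 (car/make-conn-spec))
(defmacro with-car [& body] `(car/with-conn pool spec-server1 ~@body))

(defn get-cached-states
  "Read a user's states from Redis and parse them from edn.
  Returns nil when nothing is cached yet."
  [id]
  (edn/read-string (with-car (car/get (str "states:" id)))))

(defn cache-states
  "Write states map m to Redis as edn (via str) under \"states:<id>\"."
  [m id]
  (with-car (car/set (str "states:" id) (str m))))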
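The same cache can be sketched in Python as an analogy rather than a port: `repr` plays the role of Clojure's `str`, and `ast.literal_eval` the role of `clojure.edn/read-string`, since both pairs round-trip literal data such as nested dicts, tuples, and sets. `FakeRedis` is a hypothetical dict-backed stand-in with string get/set so the sketch runs without a server; a real client such as redis-py returns bytes by default.

```python
import ast


class FakeRedis:
    """Hypothetical dict-backed stand-in for a Redis connection."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


def get_cached_states(conn, user_id):
    """Read and parse a user's states; None if nothing is cached yet."""
    raw = conn.get("states:%s" % user_id)
    # Like (edn/read-string nil) returning nil, a missing key gives None.
    return None if raw is None else ast.literal_eval(raw)


def cache_states(conn, states, user_id):
    """Serialize states as a Python literal (the analogue of str) and store them."""
    conn.set("states:%s" % user_id, repr(states))
```

As in the Clojure version, a user with no cached state reads back as None, which lines up with the nil rows of the transition facts.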

I find coupling edn with Redis a fantastic choice: it's almost like working with one of Clojure's native concurrency primitives, such as an atom, while also letting external programs access the data.

Simple and quick

The entire event-driven FSM program is less than 200 lines of Clojure code and took no more than a few hours to write, though I did spend a few days pondering it beforehand. I haven't run any benchmarks, so all I can say is that this setup handles my simplistic use case with barely any load on the server, and I'm happy with it.

A few years ago, I would have set a whole bunch of flags to switch states. In fact, that's what I did. The biggest satisfaction for me here isn't the implementation or the technologies; it's seeing through the underlying problem and solving it with a common pattern that made my work simpler.

Posted 20 May 2013 in computing.

What should I work on next for Cascalog?

Too many things to do, too little time. I figured we could do this in a data-driven way, so here's a poll. Please only submit an entry if you use Cascalog, and only one per person. Let's see if an honour system works.

Poll result one week later

[Figure: poll result]

Votes  Choice
15     Self-contained documentation site, e.g. http://cascalog.quantisan.com (demo address only)
10     Improve and consolidate guides into GitHub wiki
11     Bring Cascalog to Cascading 2.1/2.2
8      Fix open issues
13     Add features and performance increase, e.g. new logic solver, make use of new Cascading features since 2.0
2      Other

One person voted for integrating other machine learning libraries into Cascalog, and another for isolating the system library.

Posted 08 May 2013 in computing.

Securing a fresh Ubuntu server with Fabric tasks

An administrative task that I've done countless times is spinning up new servers to host applications like a web dashboard or a trading engine. At uSwitch my colleagues use Puppet and other smart devops tools that I have no idea about to automate our infrastructure. For my own work I just need an easy tool to run some commands over SSH. I chose Fabric to automate the repetitive but necessary task of hardening a fresh Ubuntu server. Now that I have this set of Fabric tasks, after I create a new instance I can run a single Fabric task and the server will be configured properly for use.

I chose Fabric because I'd already been using it for some time to perform simple deployment tasks. Then I stumbled on an open-source project that configures an Ubuntu server using Fabric, and many of these scripts are based on that source. Automating these tasks saves me a lot of time and keeps configurations consistent. Perhaps, when the need arises, I should look into other tools like Vagrant too.

Here are some basic tasks that I perform on a new Ubuntu 12.04 server, each with its corresponding Fabric task.

Create an administrator account

from fabric.api import abort, env, prompt, sudo
from fabric.contrib.files import append, exists, sed

def create_admin_account(admin, default_password=None):
    """Create an account for an admin to use to access the server."""
    env.user = "root"

    opts = dict(
        admin=admin,
        default_password=default_password or env.get('default_password') or 'secret',
    )

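    # create user unless it exists (id -u matches the exact login name;
    # egrep on /etc/passwd would also match substrings of other names)
    sudo('id -u %(admin)s >/dev/null 2>&1 || adduser %(admin)s --disabled-password --gecos ""' % opts)

    # add public key for SSH access
    if not exists('/home/%(admin)s/.ssh' % opts):
        sudo('mkdir /home/%(admin)s/.ssh' % opts)

    opts['pub'] = prompt("Paste %(admin)s's public key: " % opts)
    sudo("echo '%(pub)s' > /home/%(admin)s/.ssh/authorized_keys" % opts)

    # hand the key files to the admin and keep them private
    sudo('chown -R %(admin)s:%(admin)s /home/%(admin)s/.ssh' % opts)
    sudo('chmod 700 /home/%(admin)s/.ssh && chmod 600 /home/%(admin)s/.ssh/authorized_keys' % opts)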

    # allow this user in sshd_config
    append("/etc/ssh/sshd_config", 'AllowUsers %(admin)s@*' % opts, use_sudo=True)

    # allow sudo for maintenance user by adding it to 'sudo' group
    sudo('gpasswd -a %(admin)s sudo' % opts)

    # set default password for initial login
    sudo('echo "%(admin)s:%(default_password)s" | chpasswd' % opts)

Harden the SSH server

def harden_sshd():
    """Security harden sshd."""

    # Disable password authentication
    sed('/etc/ssh/sshd_config',
        '#PasswordAuthentication yes',
        'PasswordAuthentication no',
        use_sudo=True)

    # Deny root login
    sed('/etc/ssh/sshd_config',
        'PermitRootLogin yes',
        'PermitRootLogin no',
        use_sudo=True)

    sudo("restart ssh")
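One property worth checking is that the hardening is idempotent, so re-running the tasks is safe. This pure-Python sketch (hypothetical helpers, no Fabric or SSH involved) applies equivalent edits to an sshd_config string. It is a slightly more tolerant variant of the `sed` calls above: it matches each directive whether or not it is commented out, and appends it if absent. The `AllowUsers` line is only added when missing, the same guard `fabric.contrib.files.append` applies.

```python
import re


def append_line(text, line):
    """Append `line` unless an identical line is already present."""
    if line in text.splitlines():
        return text
    if text and not text.endswith("\n"):
        text += "\n"
    return text + line + "\n"


def set_sshd_option(text, key, value):
    """Set `key value`, rewriting the first (possibly commented) line for key.

    sshd uses the first value it reads for a keyword, so the first match is
    the one that matters. If the key is absent, the line is appended.
    """
    pattern = re.compile(r"^#?[ \t]*%s[ \t]+.*$" % re.escape(key), re.MULTILINE)
    line = "%s %s" % (key, value)
    if pattern.search(text):
        return pattern.sub(lambda _m: line, text, count=1)
    return append_line(text, line)


def harden_sshd_text(text, admin):
    """Disable password and root logins, then allow only `admin`."""
    text = set_sshd_option(text, "PasswordAuthentication", "no")
    text = set_sshd_option(text, "PermitRootLogin", "no")
    return append_line(text, "AllowUsers %s@*" % admin)
```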

Set up a firewall

def install_ufw(rules=None):
    """Install and configure Uncomplicated Firewall."""
    sudo('apt-get update')
    sudo('apt-get -yq install ufw')
    configure_ufw(rules)

def configure_ufw(rules=None):
    """Configure Uncomplicated Firewall."""
    # reset rules so we start from scratch
    sudo('ufw --force reset')

    rules = rules or env.get('rules') or abort("env.rules must be set")
    for rule in rules:
        sudo(rule)

    # re-enable firewall and print rules
    sudo('ufw --force enable')
    sudo('ufw status verbose')

Time synchronisation daemon

def set_system_time(timezone=None):
    """Set timezone and install ``ntp`` to keep time accurate."""

    opts = dict(
        timezone=timezone or env.get('timezone') or '/usr/share/zoneinfo/UTC',
    )

    # set timezone
    sudo('cp %(timezone)s /etc/localtime' % opts)

    # install NTP
    sudo('apt-get -yq install ntp')

Enable unattended upgrades

def install_unattended_upgrades(email=None):
    """Configure Ubuntu to automatically install security updates."""
    opts = dict(
        email=email or env.get('email') or abort('env.email must be set'),
    )

    sudo('apt-get -yq install unattended-upgrades')
    sed('/etc/apt/apt.conf.d/50unattended-upgrades',
        '//Unattended-Upgrade::Mail "root@localhost";',
        'Unattended-Upgrade::Mail "%(email)s";' % opts,
        use_sudo=True)

    sed('/etc/apt/apt.conf.d/20auto-upgrades',
        'APT::Periodic::Update-Package-Lists "0";',
        'APT::Periodic::Update-Package-Lists "1";',
        use_sudo=True)

    append('/etc/apt/apt.conf.d/20auto-upgrades',
           'APT::Periodic::Unattended-Upgrade "1";',
           use_sudo=True)

Posted 06 May 2013 in computing.

Unlock Lisp sorcery in your data structure by implementing Clojure ISeq

People who have gone through The Little Schemer might not find this exciting. One of the things I discovered while patching Clatrix is that implementing the clojure.lang.ISeq interface in your custom data structure unlocks the magic of Lisp composition. Once primitive operations such as first, next, more, and cons are implemented, higher-level functions such as map and reduce just work on your data structure. I find it fascinating that a native Fortran matrix object (through jBLAS) can be made clojury with a few magic operations implemented.

However, getting a deftype implementation of Matrix correct took some effort as these operators are not as simple as they seem.

public interface ISeq extends IPersistentCollection {
    Object first();
    ISeq next();
    ISeq more();
    ISeq cons(Object o);
}

For example, say we have a matrix M like so.

=> (def M (matrix [[1 2 3] [4 5 6] [7 8 9]]))
=> M
A 3x3 matrix
-------------
1.00e+00  2.00e+00  3.00e+00 
4.00e+00  5.00e+00  6.00e+00 
7.00e+00  8.00e+00  9.00e+00

Reducing M over its rows with (map + ...) amounts to a column-wise operation: each step adds the next row element-wise.

=> (reduce #(map + %1 %2) M)
(12.0 15.0 18.0)
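Python's closest analogue to ISeq is the iterator protocol: a type that defines `__iter__` works with `map`, `zip`, and `functools.reduce` without those functions knowing anything else about it. The hypothetical `Rows` class below (not Clatrix, not jBLAS) implements only `__iter__`, and the same column-wise reduction just works.

```python
import operator
from functools import reduce


class Rows:
    """Hypothetical row-major matrix that only implements iteration."""

    def __init__(self, rows):
        self._rows = [tuple(r) for r in rows]

    def __iter__(self):
        # Yield one row at a time, the way first/next walk a 2-D matrix.
        return iter(self._rows)


def column_sums(matrix):
    # Same idea as (reduce #(map + %1 %2) M): add rows element-wise.
    return list(reduce(lambda acc, row: map(operator.add, acc, row), matrix))
```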

For a while, though, this didn't work, because I wasn't careful with my implementation of first.

Consider the case of a 2x2 matrix. A 2x2 matrix is structurally equivalent to a nested vector. Calling first on these would yield:

=> (first [[1 2] [3 4]])
[1 2]
=> (first (matrix [[1 2] [3 4]]))
A 1x2 matrix
-------------
1.00e+00  2.00e+00

And a 3x1 matrix, i.e. a one-dimensional column vector, behaves like a regular vector.

=> (first [1 2 3])
1
=> (first (matrix [1 2 3]))
1.0

But here's the tricky bit: what happens during reduce as it repeatedly calls first and next?

Let's step through (reduce #(map + %1 %2) M). %1 is the result accumulated so far and %2 is the first element of the remaining collection to be processed.

iteration   accumulated (%1)   first (%2)   remaining
0           nil                [1 2 3]      [[1 2 3] [4 5 6] [7 8 9]]
1           [1 2 3]            [4 5 6]      [[4 5 6] [7 8 9]]
2 (bad)     [5 7 9]            7            [[7 8 9]]
2 (good)    [5 7 9]            [7 8 9]      [[7 8 9]]

The problem arises in the second iteration. Calling (rest [[4 5 6] [7 8 9]]) returns [[7 8 9]]. However, (matrix [[7 8 9]]) is a row vector and (matrix [7 8 9]) is a column vector, and both are considered one-dimensional. In either case, first of a vector should return its first element, which is a number. So at this iteration reduce breaks, because you can't map + over a sequence and a number, (map + [5 7 9] 7), to get an accumulated value.

What we want instead is for first to return [7 8 9] at the second iteration, because the original matrix is not a vector. Luckily, my colleague Antonio Garrote had already solved this particular problem when he did this in Java a year ago: he kept a predicate field signifying whether the matrix is supposed to be a vector or not.
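The difference between the bad and good rows of the table can be reproduced in a small Python sketch. `Matrix`, `seq_reduce`, and `add_rows` are hypothetical names, not Clatrix's implementation: `Matrix` exposes Lisp-style `first`/`rest`, and `seq_reduce` drives a reduction using only those operations. With `track_vector=False`, each slice re-infers whether it is a vector from its shape, so the one-row remainder turns into a vector and `first` returns a number; with `track_vector=True`, slices inherit the original flag, in the spirit of the predicate field described above.

```python
class Matrix:
    """Toy row-major matrix with Lisp-style first/rest (hypothetical)."""

    def __init__(self, rows, is_vector=None, track_vector=True):
        self.rows = [list(r) for r in rows]
        self.track_vector = track_vector
        if is_vector is None:
            # Infer from shape: a single row or a single column is a vector.
            is_vector = len(self.rows) == 1 or all(len(r) == 1 for r in self.rows)
        self.is_vector = is_vector

    def items(self):
        # A vector is a sequence of numbers; a matrix is a sequence of rows.
        if self.is_vector:
            return [x for row in self.rows for x in row]
        return self.rows

    def empty(self):
        return not self.items()

    def first(self):
        return self.items()[0]

    def rest(self):
        if self.is_vector:
            # Drop the first number; what's left is still a vector.
            return Matrix([self.items()[1:]], True, self.track_vector)
        # Drop the first row, then either carry the flag or re-infer it.
        flag = self.is_vector if self.track_vector else None
        return Matrix(self.rows[1:], flag, self.track_vector)


def seq_reduce(f, coll):
    """reduce with no initial value, using only first/rest/empty."""
    acc, coll = coll.first(), coll.rest()
    while not coll.empty():
        acc, coll = f(acc, coll.first()), coll.rest()
    return acc


def add_rows(a, b):
    # Like #(map + %1 %2); zip raises TypeError if b is a number.
    return [x + y for x, y in zip(a, b)]
```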

So there you have it. If you find yourself needing deftype to build your own data structure in Clojure, do consider implementing clojure.lang.ISeq to leverage high-level Clojure functions, but be careful with those seemingly simple primitive operators.
