Life Beyond Relational Database in Haskell - The case for Event Sourcing

Posted on May 12, 2016

This post contains the code I demonstrated as part of my talk at nCrafts. This work is based on the following references (stepping on the shoulders of giants, as always…):

Imports, stuff to make the compiler happy

We first add the usual LANGUAGE extension incantations… Note they should probably go in to the .cabal file.

{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
module Hevents.Eff.Demo where

Then a whole bunch of imports… I never managed to choose a definite course of action on whether to import full module, import qualified, import only selected symbols. Seems to me this is pretty much a team-level convention.

import           Control.Category
import           Control.Concurrent.Async
import           Control.Concurrent.STM
import qualified Control.Eff                as E
import           Control.Eff.Exception
import           Control.Eff.Lift           as E hiding (lift)
import           Control.Exception          (finally)
import           Control.Monad.Except
import qualified Control.Monad.State        as ST
import           Control.Monad.Trans.Either
import qualified Data.ByteString.Builder    as BS
import           Data.Either                (rights)
import           Data.Proxy
import           Data.Serialize             (Serialize, get, put)
import           Data.Typeable
import           Data.Void
import           Hevents.Eff                as W
import           Prelude                    hiding (init, (.))
import           Servant
import           Servant.Client
import           System.Environment
import           Test.Hspec
import           Test.QuickCheck            as Q
import           Test.QuickCheck.Monadic    as Q

The “Business Domain”

We want to implement an event-sourced service that will allow us to manipulate a simple integer counter:

Let’s start writing a test…

We’ll first write some property describing the behaviour of our model for a single command. We expect that an Increment command shall set an initialized counter to the same value than the command, and that a Decrement command will decrease the value of a counter. Note here we anticipate a bit on bounds requirement checking by setting the counter to some value which is greater than any Decrement command we are supposed to issue.

prop_shouldApplySingleCommandRespectingBounds :: Command Counter*> Bool
prop_shouldApplySingleCommandRespectingBounds c@(Increment n) =
    let OK result = init `act` c
    in  init `apply` result == Counter n
prop_shouldApplySingleCommandRespectingBounds c@(Decrement n) =
    let bounderCounter = Counter singleCommandUpperBound
        OK result = bounderCounter `act` c
    in  bounderCounter `apply` result == Counter (singleCommandUpperBound - n)

This property requires some way to generate Arbitrary instances of our commands, which is straightforward:

instance Arbitrary (Command Counter) where
  arbitrary = oneof [ Increment <$> singleCommandValue
                    , Decrement <$> singleCommandValue
                    ]
    where
        singleCommandValue = choose (0,singleCommandUpperBound)

singleCommandUpperBound :: Int
singleCommandUpperBound = 20

Another useful property we want to define that our counter respects its bounds, no matter which sequence of events we send to it:

prop_shouldNotApplyCommandsOverBounds :: [ Command Counter ] -> Bool
prop_shouldNotApplyCommandsOverBounds commands =
  let finalCounter = ST.execState (mapM updateModel commands) init
  in  isWithinBounds finalCounter

isWithinBounds :: Counter -> Bool
isWithinBounds (Counter value) = value >= 0 && value <= 100

And of course we need some implementation of our Commands and Counter. The latter is pretty much a simple wrapping of Int but to define the former we need our Counter type to be an instance of Model typeclass, which defines the basic structure of an event-sourced component (or aggregate).

newtype Counter = Counter { counter :: Int } deriving (Eq,Show)

instance Model Counter where

We define the “component” types of our model: Commands, events and errors which here are very simple.

  data Command Counter = Increment Int
                       | Decrement Int
                       deriving (Eq, Show)
  data Event Counter = Added Int deriving (Eq,Show)
  data Error Counter = OutOfBounds deriving (Eq,Show)

Then comes our initial value…

  init = Counter 0

act computes the effect of applying a command to current state of our counter…


  Counter k `act` Increment n = if k + n <= 100
                                then OK $ Added n
                                else KO OutOfBounds

  Counter k `act` Decrement n = if k - n >= 0
                                then OK $ Added (-n)
                                else KO OutOfBounds

Then apply actually “updates” (or more precisely, create a new updated instance of) the counter by applying the value to add.

  Counter k `apply` Added n = Counter $ k + n

When we check the behaviour of applying a sequence of Commands to our counter, we make use of a library function which runs in the State monad and allow us to “fold” the application of a sequence of commands to a Counter:

updateModel :: (Model a) => Command a -> State a (Result a)

Exposing services built on our model

As always, we start with the testing part but this time we expect our tests to have side effects and model interactions of the outside world with our system’s fragment. We shall start with very simple modelling of client’s behaviour:

data CounterAction = GetCounter
                   | IncCounter Int
                   | DecCounter Int
                   deriving (Show)

In order to generate samples for our actions we assume some frequency distribution, giving more weight to actions that get state than to actions that update it.


instance Arbitrary CounterAction where
  arbitrary = frequency [ (3, return GetCounter)
                        , (2, IncCounter <$> choose (0,10))
                        , (1, DecCounter <$> choose (0,10))
                        ]

Then we use monadic QuickCheck to run an arbitrary sequence of user actions on an “effectful” model which is initialised with some state holder and a storage backend.

prop_servicesRespectCounterBounds :: [ CounterAction ] -> Property
prop_servicesRespectCounterBounds actions = Q.monadicIO $ do
  results <- Q.run $ do
    (model, storage) <- prepareContext
    mapM (effect storage model . interpret) actions

  assert $ all isWithinBounds (rights results)

This test might be considered to be a little weak, and we could probably enhance it with testing error conditions. That’s something definitely worth doing for production code, however for the sake of simplicity we will not add more tests here.

Prparation step is simple but deserve some explanations:

prepareContext = (,)           <$>
  newTVarIO (W.init :: Counter) <*>
  atomically W.makeMemoryStore

effect is actually a natural transformation that composes all the small little effects we need in our sample and “lift” them in the IO monad. Note the Eff type which exposes explicitly all the effects our code is allowed to make thus constraining its behaviour to a limited subset of possible interactions with outside world. The ServantErr type is the type of exceptions we can “throw” using Exc effect: This anticipates on the needs of the REST API we shall expose later on. Actually it could have been any kind of Exception instance but once again, this makes things simpler and removes a layer of transformation from custom exceptions to Servant errors, something we would probably want to do in production code.

type EventSourced s e a =
  E.Eff (State s E.:> Store E.:> Exc e E.:> Lift STM E.:> Void) a

effect :: (Typeable m, Typeable e, Storage STM s, Registrar STM m reg)
         => s -> reg
         -> E.Eff (EventSourced Counter ServantErr) a
         -> IO (Either e a)
effect s m = atomically . runSync . runExc . W.runStore s .  W.runState m

The definition of our services is pretty straightforward:

type CounterService a = EventSourced Counter ServantErr a

getCounter :: CounterService Int
getCounter = counter <$> getState

increment :: Int -> CounterService Int
increment n = applyCommand (Increment n) >>= storeEvent

decrement :: Int -> CounterService Int
decrement n = applyCommand (Decrement n) >>= storeEvent

The storeEvent function is where most of the grunt work happens and notably where we do error handling:

storeEvent :: Either (Error Counter) (Event Counter)
             -> CounterService Int
storeEvent (Left e)  = throwExc $ fromModelError e
storeEvent (Right e) = store e >>= either (throwExc . fromDBError) (const $ counter <$> getState)
  where
    fromModelError e = err400 { errBody = makeBody $ "Invalid command " ++ show e }
    fromDBError    e = err500 { errBody = makeBody $ "DB Error " ++ show e }
    makeBody         = BS.toLazyByteString . BS.stringUtf8

Note that because of the Store effect we need to be able to serialize our events:

instance Serialize (Event Counter) where
  put (Added i) = put i
  get           = Added <$> get

The last missing piece is the interpret function which turns our QuickCheck generated actions into actual effectful actions to be run against our system.

interpret GetCounter     = getCounter
interpret (IncCounter n) = increment n
interpret (DecCounter n) = decrement n

Expose our counter services through a REST API

The last step of our Counter “microservice” is to expose it as a REST interface. We will leverage the excellent work done on the Servant and firstly define out API’s type which is a simple exposition of the previously defined services:


type CounterApi =
 "counter" :> (Get '[JSON] Int
              :<|> "increment" :> Capture "inc" Int :> Get '[JSON] Int
              :<|> "decrement" :> Capture "dec" Int :> Get '[JSON] Int)

counterApi :: Proxy CounterApi
counterApi = Proxy

Writing our test is pretty straightforward and mostly repeats the previous test at the service level layer, the main difference being the effectful services are run within an actual web server on some predefined port. Note this is makes our test brittle and non parallelizable: It would be better to let the server select a free port and return it as part of its startup.

The only noteworthy part is that we can use our previously defined effect “interpreter” natural transformation and wrap it inside an EitherT transformer which is the type expected by Servant.

prop_counterServerImplementsCounterApi :: [ CounterAction ] -> Property
prop_counterServerImplementsCounterApi actions = Q.monadicIO $ do
  results <- Q.run $ do
    (model, storage) <- prepareContext
    server <- W.runWebServerErr 8082 counterApi
                  (Nat $ EitherT . effect storage model) handler
    mapM runClient actions `finally` cancel server

  assert $ all isWithinBounds (rights results)

The client-side services are obtained from a destructured assignment using Servant’s (:<|>) operator which is overloaded at the type and value level and then used to interpret user actions.

counterState :<|> incCounter :<|> decCounter = client counterApi (BaseUrl Http "localhost" 8082)

runClient GetCounter     = runEitherT $ counterState
runClient (IncCounter n) = runEitherT $ incCounter n
runClient (DecCounter n) = runEitherT $ decCounter n

It is also noteworthy we can simply build our REST server composing our already defined services. Servant’s type wizardy ensures the expected type for the whole API is matched by actual functions composed with (:<|>).

handler = getCounter :<|> increment :<|> decrement

Writing a main server that is able to listen on some port and runs our services is left as an exercise for the reader.

Conclusion

The goal of this post and the associated talk was to demonstrate how Haskell’s type system and some well designed and though out libraries made it easy to build type-safe “microservices”. In particular, I would like to emphasize the following points: