Skip to content

Event Driven State Machines

One of my favourite design patterns for embedded software is the event driven hierarchical state machine. I’ve used many different patterns out there, including super-loops, many threads with sequential steps and blocking, async/await functionality and more. Event driven HSMs offer the following benefits:

  • It is easy to write complex logic that needs to check many things and respond accordingly.
  • It is easy to add new requirements.
  • States intuitively map to how we humans think about the problem.
  • You can easily have a lot of complexity in a single thread, keeping in sequential but responsive.

It has the following design goals:

  • Event handlers must not block, i.e. they run to completion. This keeps the state machine responsive.
  • States have to be flexible. Each state is implemented as an object which has a entry, handleEvent and exit function. Using a function to handle events rather than a state transition table makes it much more flexible to add new requirements.
  • States can be children of other states, i.e. a hierarchical state machine. This allows for code reuse and generalization as you can install common event handlers to parent states.
  • Each state machine could run it’s own thread, or all state machines could be run sequentially in a single thread. That is not an important part of the design pattern.

Let’s take a look at a simple example in Python style pseudo code. This ignores much of the internals of the state machine (which would typically be implemented in a library that you would re-use across projects, e.g. see my C++ NinjaHSM library on GitHub), and just shows the core idea of what the implementation would look like for a specific application.

class StateMachine:
def handleEvent(event):
# Logic to dispatch to the correct state handler function...
def risingControlRods_Entry():
# Entry functions for states typically start timers whose expiry events
# are used to do things during the state.
controlRodsUpTimer.startMs(2000)
checkTemperatureTimer.startMs(10)
def risingControlRods_HandleEvent(event):
if event == ControlRodsUpTimerExpired:
transitionTo(State.RUNNING)
elif event == EmergencyStopButtonPressed:
transitionTo(State.EMERGENCY_STOP)
elif event == CheckTemperatureTimerExpired:
temperature = getTemperature()
if temperature > 1000:
transitionTo(State.EMERGENCY_STOP)
if __name__ == "__main__":
myStateMachine = StateMachine()
while True:
# waitForEvent() would contain the logic to calculate when the next timer expiry
# occurs, and block on a message queue for that long. When the block returns, you
# know you have either received an external event or the timer expired (the OS API
# will let you check this).
event = waitForEvent()
myStateMachine.handleEvent(event)

Downsides

Complex Sequential Logic Becomes Tedious

I once had to configure a motor controller over I2C. The motor controller had a lot of different settings that needed to be configured, and each one required it’s own I2C transaction. I needed to send about 20 commands in total. After filling up the TX buffer with the command you wanted to send, each I2C transaction required waiting (blocking) for command to be send and for the response to be received back. And in between some commands there were extra delays needed to allow the motor controller to do various things. In a app where you allowed blocking and/or async/await, this would have been just (we assume here that write() and read() are both blocking):

i2c.write(motorAdd, cmd1);
i2c.read(motorAdd, response1);
thread.sleepMs(100);
i2c.write(motorAdd, cmd2);
i2c.read(motorAdd, response2);
thread.sleepMs(100);
i2c.write(motorAdd, cmd3);
i2c.read(motorAdd, response3);
// ...

This gets tricky in a event driven architecture, as you can’t just block if you want the event loop to be responsive and handle other events in the interim. Creating 2 separate states for each command (cmd1_sending, cmd1_receiving, cmd2_sending, cmd2_receiving, etc.) would be tedious! Each handler would have to listen for a send finished and command received event emitted from an I2C interrupt or similar, and then transition to the next state. It would be easy to accidentally transition to the wrong next state.

One solution which prevents you from having so many states is to capture the “where I am up to” in a counter (stored as part of the state machine’s data) rather than a separate state. The following example shows pseudo code for how this could be done. It assumes that the I2C calls startWrite() and startRead() are non-blocking and just initiate the transaction. Events are received from an I2C interrupt or similar when they are finished. The cmdIdx variable prevents the need to have 20 different states just to configure the motor driver!

# This would normally be a member variable of the state machine class
cmdIdx = 0
def stateConfiguringMotorDriver_Entry():
cmdIdx = 0
# Start the first command
i2c.startWrite(motorAdd, cmd1)
def stateConfiguringMotorDriver_HandleEvent(event)
if event == I2cResponseReceived:
# We've finished sending a command, so start sending the next one
cmdIdx += 1
if cmdIdx == 1:
i2c.startWrite(motorAdd, cmd2)
elif cmdIdx == 2:
i2c.startWrite(motorAdd, cmd3)
# ...
elif event == I2cCmdSent:
# We've finished sending the command, so initiate the response
if cmdIdx == 0:
i2c.startRead(motorAdd, response1)
elif cmdIdx == 1:
i2c.startRead(motorAdd, response2)
elif cmdIdx == 2:
i2c.startRead(motorAdd, response3)
# ...

Another solution would be to use blocking code and just accept that the state machine will not be responsive while the blocking code is running. This may be ok in certain cases. However, even if it is ok now, it might cause problems later. What happens when you want another part of the application to be able to immediately stop the motor driver in the event of an emergency? If the state machine has started configuring the motor driver, your state machine will ignore any stop event until the motor driver is fully configured, which is probably unacceptable behaviour.

Sometimes You Have To Call Libraries That Block

Sometimes you are left with no choice --- you have to call code outside of your control that will block (be it a peripheral driver function, OS call, library call e.t.c.). In this case, you are left with a few options:

  1. Move the blocking code into a separate worked thread which is started by the state machine. An event is posted back to the state machine when the blocking code is finished.
  2. Just accept that the state machine will not be responsive while the blocking code is running. This may be ok in certain cases. If you have other code that does need to be responsive, perhaps you could move that into a different state machine in a different thread?

Comparison to Other Approaches

Both the traditional “many threads with blocking operations” and async/await approaches suffer from the sequential problem — your code may only block or await for one condition at a time. It becomes tricky to handle multiple conditions at once, which means developers typically resort to waiting for very small amounts of time and then checking “why did I wake up”?