DESIGN PATTERNS

Concurrency Patterns

Overview

This page focuses on concurrency patterns applicable to embedded software written in C or C++, that is, software running on anything from an 8-bit microcontroller to embedded Linux.

The names thread 1 and thread 2 refer to two imaginary threads that need to share data in the following examples. They are treated as persistent threads that each carry out sporadic, important tasks, rather than as a master/worker thread arrangement.

Some people, when confronted with a problem, think, “I know, I’ll use threads!” Now they have 10 problems. –Bill Schindler

Message Queues

Example

#include <iostream>
#include <memory>
#include <stdexcept> // For std::runtime_error
#include <string>
#include <thread>
    
#include "ThreadSafeQueue.hpp"
    
using namespace mn::CppUtils;
    
struct Cmd {
    std::string name;
    std::shared_ptr<void> data;
};
    
// We require a thread-safe queue, which is not part of the standard!
// See https://github.com/gbmhunter/CppUtils/blob/master/include/CppUtils/ThreadSafeQueue.hpp
ThreadSafeQueue<Cmd> queue_;
    
void ThreadFn() {
    Cmd cmd;
    
    while(true) {
        queue_.Pop(cmd);
        std::cout << "Received command. cmd.name = " << cmd.name << std::endl;
        
        if(cmd.name == "CMD_1") {          
            auto data = std::static_pointer_cast<std::string>(cmd.data); // Cast back to exact data type
            std::cout << "Received data (as string) = \"" << *data << "\"" << std::endl;
        } else if(cmd.name == "CMD_2") {
            // Do stuff with cmd.data
        } else if(cmd.name == "QUIT") {
            // Break from infinite while loop, which will mean that
            // this function will return and then thread.join() will
            // return
            break;  
        } else
            throw std::runtime_error("Command name not recognized.");
    }
}
    
int main() {
    std::thread t(ThreadFn);
    
    auto data = std::make_shared<std::string>("hello");
    
    Cmd cmd;
    cmd.name = "CMD_1";
    cmd.data = std::static_pointer_cast<void>(data); // Cast away the exact data type
    queue_.Push(cmd);
    
    cmd.name = "QUIT";
    cmd.data = nullptr; // Some commands may not need data!
    queue_.Push(cmd);
    
    t.join();
}

Run this example online at https://wandbox.org/permlink/HtrZL147mUQGs4vK.
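
The example above relies on an external ThreadSafeQueue class. As a rough idea of what such a queue involves, the following is a minimal sketch built only from the standard library (std::mutex and std::condition_variable). The names SimpleBlockingQueue and RunQueueDemo are hypothetical and chosen for illustration; this is not the CppUtils implementation, which may differ.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal blocking queue (an assumption for illustration,
// not the CppUtils ThreadSafeQueue).
template <typename T>
class SimpleBlockingQueue {
public:
    // Adds an item, then wakes one thread waiting in Pop().
    void Push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        cv_.notify_one();
    }

    // Blocks until an item is available, then moves it into 'item'.
    void Pop(T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        item = std::move(queue_.front());
        queue_.pop();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> queue_;
};

// Demo: a consumer thread collects command names until it sees "QUIT".
std::vector<std::string> RunQueueDemo() {
    SimpleBlockingQueue<std::string> queue;
    std::vector<std::string> received;

    std::thread consumer([&] {
        std::string msg;
        while (true) {
            queue.Pop(msg);
            if (msg == "QUIT") break;
            received.push_back(msg);
        }
    });

    queue.Push("CMD_1");
    queue.Push("CMD_2");
    queue.Push("QUIT");
    consumer.join(); // After join(), 'received' is safe to read from this thread
    return received;
}
```

Calling RunQueueDemo() from main() returns the commands in the order they were pushed, excluding the final "QUIT".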

Pros/Cons

  • Easier to prove that deadlocks do not exist
  • Thread 1 cannot easily get data back from thread 2, as the only way to communicate is through the message queues. Thread 1 would have to send a message to thread 2 requesting data, and then thread 2 would have to send a message back to thread 1 with the data. This can break the “flow” of the code for thread 1.
  • Difficulties in safely handling multiple “types” of data sent on the message queue. The example above creates the data on the heap, creates a shared pointer to it and then casts away the type to std::shared_ptr<void>. You then have to make sure the receiving thread casts back to the correct type depending on the message.

Synchronization Objects

The most basic form of synchronization object is a mutex. When using C, popular operating systems such as FreeRTOS or Linux provide OS-specific mutexes. If you are using C++ and have the standard library available, you can use std::mutex (available since C++11).

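Why do we have to use synchronization objects? Because if more than one thread accesses the same memory at the same time, and at least one of those accesses is a write, we have a data race, and the result is unpredictable (in C++ it is undefined behaviour).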
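
As a small illustration of the problem and its fix, the sketch below has two threads increment the same counter, with a std::mutex guarding every access. The names SharedCounter, IncrementMany and RunCounterDemo are hypothetical. Without the lock, the two read-modify-write sequences could interleave and increments could be lost.

```cpp
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical shared state: a counter written by more than one thread.
struct SharedCounter {
    std::mutex mutex;
    long value = 0;
};

void IncrementMany(SharedCounter& counter, int times) {
    for (int i = 0; i < times; ++i) {
        // lock_guard locks the mutex here and unlocks it at the end of
        // each loop iteration, so only one thread modifies 'value' at a time.
        std::lock_guard<std::mutex> lock(counter.mutex);
        ++counter.value;
    }
}

long RunCounterDemo() {
    SharedCounter counter;
    std::thread t1(IncrementMany, std::ref(counter), 100000);
    std::thread t2(IncrementMany, std::ref(counter), 100000);
    t1.join();
    t2.join();
    return counter.value; // 200000, because every increment was protected
}
```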

Pros/Cons

  • Enables thread 1 to call a standard public function belonging to thread 2, along with all the benefits that go along with this, such as type-safe input arguments and return values.
  • Threads still require some notification object to block on
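  • It is harder for thread 1 to tell thread 2 to do some “work”. Whilst in a message queue system thread 1 can just send thread 2 a “do work” message, with synchronization objects alone thread 2 needs a separate notification mechanism to wake it up.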

A Hybrid Approach

What if we used a message queue when the sending thread wants to tell the receiving thread to perform some work, and a synchronization object when the sending thread just wants to access some data belonging to the receiving thread?

This is possible with the use of a message queue for incoming messages and a synchronization object to synchronize the receiving message loop with the data accesses.
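
A minimal sketch of this hybrid arrangement, using only the standard library, might look like the following. The class HybridWorker, its "INCREMENT" and "EXIT" commands and the function RunHybridDemo are all hypothetical names chosen for illustration. Work requests go through a queue, while GetCount() reads the worker's data directly under a mutex.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical worker that combines both approaches.
class HybridWorker {
public:
    HybridWorker() : thread_(&HybridWorker::Process, this) {}
    ~HybridWorker() { Stop(); }

    // Message queue path: ask the worker to do some work asynchronously.
    void Post(std::string cmd) {
        {
            std::lock_guard<std::mutex> lock(queueMutex_);
            cmds_.push(std::move(cmd));
        }
        queueCv_.notify_one();
    }

    // Synchronization object path: read the data directly, under a mutex.
    int GetCount() {
        std::lock_guard<std::mutex> lock(dataMutex_);
        return count_;
    }

    // Sends "EXIT" and waits for all earlier messages to be processed.
    void Stop() {
        if (thread_.joinable()) {
            Post("EXIT");
            thread_.join();
        }
    }

private:
    void Process() {
        while (true) {
            std::string cmd;
            {
                std::unique_lock<std::mutex> lock(queueMutex_);
                queueCv_.wait(lock, [this] { return !cmds_.empty(); });
                cmd = std::move(cmds_.front());
                cmds_.pop();
            }
            if (cmd == "EXIT") break;
            if (cmd == "INCREMENT") {
                // The same mutex as GetCount() guards the shared data.
                std::lock_guard<std::mutex> lock(dataMutex_);
                ++count_;
            }
        }
    }

    std::mutex queueMutex_;
    std::condition_variable queueCv_;
    std::queue<std::string> cmds_;
    std::mutex dataMutex_;
    int count_ = 0;
    std::thread thread_; // Declared last so the other members exist before it starts
};

int RunHybridDemo() {
    HybridWorker worker;
    worker.Post("INCREMENT");
    worker.Post("INCREMENT");
    worker.Stop();            // The queue is FIFO, so both increments run before "EXIT"
    return worker.GetCount(); // 2
}
```

Note that calling GetCount() before Stop() could return 0, 1 or 2, since the posted work is processed asynchronously. The mutex only guarantees that the read itself is safe.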

A Message Queue That Can Wait For Return Data

One way to solve the “no return data” issue with message queues is for the sending thread to include a thread-safe data object along with the rest of the message data it sends to the receiving thread. When the receiving thread processes this message, it calculates the return data and passes it back through the thread-safe data object, which also notifies the sending thread that the data is ready.

This can be implemented in C++ by using std::future and std::promise. These are synchronization objects that allow a value to be passed from one thread to another. The code below shows an example of this. Note how the main() function can call thread1.SetData() and then block and wait for return data by calling thread1.GetData(). Both of these calls result in messages arriving in thread 1’s message queue.

///
/// \file 				MsgQueueTests.cpp
/// \author 			Geoffrey Hunter <gbmhunter@gmail.com>
/// \edited             n/a
/// \created			2017-10-24
/// \last-modified		2017-10-25
/// \brief 				Contains tests for the MsgQueue class.
/// \details
///		

// System includes
#include <future>
#include <iostream>
#include <memory>
#include <stdexcept> // For std::runtime_error
#include <string>
#include <thread>

// User includes
#include "MsgQueue.hpp"

using namespace mn::CppUtils::MsgQueue;


class Thread1 {
    public:
    Thread1() {
        thread_ = std::thread(&Thread1::Process, this);
    }

    ~Thread1() {
        if(thread_.joinable()) {
            queue_.Push(TxMsg("EXIT"));
            thread_.join();
        }
    }

    void SetData(std::string data) {
        auto dataOnHeap = std::make_shared<std::string>(data);
        queue_.Push(TxMsg("SET_DATA", dataOnHeap));
    }

    std::string GetData() {
        TxMsg msg("GET_DATA", ReturnType::RETURN_DATA);
        queue_.Push(msg);
        auto retVal = msg.WaitForData();
        return *std::static_pointer_cast<std::string>(retVal);
    }

    private:

    void Process() {

        RxMsg msg;

        // This loop can be broken by sending the "EXIT" msg!
        while(true) {
            queue_.Pop(msg);

            //==============================================//
            //============= MSG PROCESSING LOOP ============//
            //==============================================//
            if(msg.GetId() == "SET_DATA") {
                auto data = std::static_pointer_cast<std::string>(msg.GetData()); // Cast back to exact data type
                data_ = *data;
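            } else if(msg.GetId() == "GET_DATA") {
                auto retData = std::make_shared<std::string>(data_);
                msg.ReturnData(retData);
                // No break here: keep looping so that later messages
                // (including further GET_DATA requests) are still processed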
            } else if(msg.GetId() == "EXIT") {
                // Break from infinite while loop, which will mean that
                // this function will return and then thread.join() will
                // return
                break;
            } else
                throw std::runtime_error("Command name not recognized.");
        }
    }

    std::thread thread_;
    MsgQueue queue_;

    std::string data_;
};

int main() {
    Thread1 thread1;
    std::cout << "Sending \"Hello\" data to thread1." << std::endl;
    thread1.SetData("Hello");
    auto returnedData = thread1.GetData();
    std::cout << "Returned data from thread1 = \"" << returnedData << "\"." << std::endl;
    return 0;
}

The above code can be run online at https://wandbox.org/permlink/gAq8i3HMcPMe6AMS.
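
The example above uses the MsgQueue class, so the promise and future themselves are not visible in it. As a rough standard-library-only sketch of the underlying idea, the request below carries a std::promise to the receiving thread while the sender keeps the matching std::future. The names GetDataRequest, HandleRequest and RunPromiseDemo are hypothetical, and the message queue itself is left out to keep the sketch short.

```cpp
#include <functional>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical request message: it carries a promise that the
// receiving thread fulfils with the return data.
struct GetDataRequest {
    std::promise<std::string> reply;
};

// Runs on the receiving thread: computes the return data and hands it
// back through the promise, which unblocks the waiting sender.
void HandleRequest(GetDataRequest request, const std::string& data) {
    request.reply.set_value(data);
}

std::string RunPromiseDemo() {
    const std::string data = "Hello";

    GetDataRequest request;
    // The sender keeps the future, the receiver gets the promise.
    std::future<std::string> result = request.reply.get_future();

    // std::promise is move-only, so the request is moved into the thread.
    std::thread receiver(HandleRequest, std::move(request), std::cref(data));

    std::string value = result.get(); // Blocks until set_value() is called
    receiver.join();
    return value;
}
```

In a full implementation the request would be pushed onto the receiving thread's message queue rather than passed to a newly created thread.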

