The Boost C++ Libraries

Synchronizing Threads

While using multiple threads can increase the performance of an application, it usually also increases complexity. If several functions execute at the same time, access to shared resources must be synchronized. This involves significant programming effort once the application reaches a certain size. This section introduces the classes provided by Boost.Thread to synchronize threads.

Example 44.7. Exclusive access with boost::mutex
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::mutex mutex;

void thread()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    mutex.lock();
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
    mutex.unlock();
  }
}

int main()
{
  boost::thread t1{thread};
  boost::thread t2{thread};
  t1.join();
  t2.join();
}

Multithreaded programs use mutexes for synchronization. Boost.Thread provides different mutex classes with boost::mutex being the simplest. The basic principle of a mutex is to prevent other threads from taking ownership while a particular thread owns the mutex. Once released, a different thread can take ownership. This causes threads to wait until the thread that owns the mutex has finished processing and releases its ownership of the mutex.

Example 44.7 uses a global mutex of type boost::mutex called mutex. The thread() function takes ownership of this object by calling lock(). This is done right before the function writes to the standard output stream. Once a message has been written, ownership is released by calling unlock().

main() creates two threads, both of which are executing the thread() function. Each thread counts to five and writes a message to the standard output stream in each iteration of the for loop. Because std::cout is a global object shared by the threads, access must be synchronized. Otherwise, messages could get mixed up. Synchronization guarantees that at any given time, only one thread has access to std::cout. Both threads try to acquire the mutex before writing to the standard output stream, but only one thread at a time actually accesses std::cout. No matter which thread successfully calls lock(), all other threads need to wait until unlock() has been called.
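The effect of the mutex can also be checked with a shared counter instead of std::cout. The following is a minimal sketch, not one of the book's examples. It assumes the standard library counterparts std::mutex and std::thread so that it builds without Boost; boost::mutex and boost::thread offer the same lock(), unlock(), and join() calls. The names counter, counter_mutex, add_many(), and run_two_threads() are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

std::mutex counter_mutex;  // hypothetical name, protects counter
int counter = 0;           // shared resource, stands in for std::cout

// Increments the shared counter n times, taking ownership of the
// mutex around every single increment.
void add_many(int n)
{
  for (int i = 0; i < n; ++i)
  {
    counter_mutex.lock();
    ++counter;
    counter_mutex.unlock();
  }
}

// Runs add_many() in two threads and returns the final counter value.
int run_two_threads(int n)
{
  counter = 0;
  std::thread t1{add_many, n};
  std::thread t2{add_many, n};
  t1.join();
  t2.join();
  assert(counter == 2 * n);  // no increment was lost
  return counter;
}
```

Because every increment happens while exactly one thread owns the mutex, run_two_threads(1000) returns 2000. Without the calls to lock() and unlock(), the two threads would modify counter concurrently, which is a data race.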

Acquiring and releasing mutexes is a typical scheme and is supported by Boost.Thread through different types. For example, instead of using lock() and unlock(), you can use boost::lock_guard.

Example 44.8. boost::lock_guard with guaranteed mutex release
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::mutex mutex;

void thread()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::lock_guard<boost::mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
  }
}

int main()
{
  boost::thread t1{thread};
  boost::thread t2{thread};
  t1.join();
  t2.join();
}

boost::lock_guard automatically calls lock() and unlock() in its constructor and its destructor, respectively. Access to the shared resource is synchronized in Example 44.8 just as it was when both member functions were called explicitly. The class boost::lock_guard is an example of the RAII idiom: because the destructor releases the mutex, the mutex is released whenever the lock goes out of scope, including when the scope is left early through a return statement or an exception.
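A practical consequence of RAII is that the mutex is released even if the protected code throws. The sketch below illustrates this under stated assumptions: it uses std::mutex and std::lock_guard from the standard library, which mirror boost::mutex and boost::lock_guard for the calls shown, and the names guard_mutex, guarded_throw(), and relock_after_exception() are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

std::mutex guard_mutex;  // hypothetical name

// Throws while the lock_guard owns the mutex.
void guarded_throw()
{
  std::lock_guard<std::mutex> lock{guard_mutex};
  throw std::runtime_error{"failure while holding the lock"};
}  // stack unwinding destroys lock, whose destructor calls unlock()

// Returns true once the mutex has been locked a second time.
bool relock_after_exception()
{
  bool caught = false;
  try
  {
    guarded_throw();
  }
  catch (const std::runtime_error&)
  {
    caught = true;
  }
  assert(caught);
  // This second lock can only be acquired because the first guard
  // released the mutex while the exception propagated.
  std::lock_guard<std::mutex> again{guard_mutex};
  return caught;
}
```

With explicit calls to lock() and unlock(), the exception would skip unlock() and the mutex would stay locked.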

Besides boost::mutex and boost::lock_guard, Boost.Thread provides additional classes to support variants of synchronization. One of the essential ones is boost::unique_lock which provides several helpful member functions.

Example 44.9. The versatile lock boost::unique_lock
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::timed_mutex mutex;

void thread1()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::unique_lock<boost::timed_mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
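    // release() detaches the mutex from the lock without unlocking it,
    // so the mutex must be unlocked through the returned pointer.
    boost::timed_mutex *m = lock.release();
    m->unlock();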
  }
}

void thread2()
{
  using boost::this_thread::get_id;
  for (int i = 0; i < 5; ++i)
  {
    wait(1);
    boost::unique_lock<boost::timed_mutex> lock{mutex,
      boost::try_to_lock};
    if (lock.owns_lock() || lock.try_lock_for(boost::chrono::seconds{1}))
    {
      std::cout << "Thread " << get_id() << ": " << i << std::endl;
    }
  }
}

int main()
{
  boost::thread t1{thread1};
  boost::thread t2{thread2};
  t1.join();
  t2.join();
}

Example 44.9 uses two variants of the thread() function. Both variants still write five numbers in a loop to the standard output stream, but they now use the class boost::unique_lock to lock a mutex.

thread1() passes the variable mutex to the constructor of boost::unique_lock, which calls lock() on the mutex and blocks until the mutex has been acquired. In this case boost::unique_lock behaves no differently than boost::lock_guard.

However, the destructor of boost::unique_lock doesn’t release the mutex in thread1(). In thread1() release() is called on the lock, which decouples the mutex from the lock. By default, the destructor of boost::unique_lock releases a mutex, like the destructor of boost::lock_guard – but not if the mutex is decoupled. That’s why there is an explicit call to unlock() in thread1().
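The difference between release() and unlock() on the lock is easy to miss. The sketch below contrasts the two. It is an illustration under assumptions: it uses std::mutex and std::unique_lock, whose release(), unlock(), lock(), owns_lock(), and mutex() member functions correspond to those of boost::unique_lock, and the names lock_mutex, release_decouples(), and unlock_keeps_association() are hypothetical.

```cpp
#include <cassert>
#include <mutex>

std::mutex lock_mutex;  // hypothetical name

// release() hands back the mutex pointer and leaves the mutex locked.
bool release_decouples()
{
  std::unique_lock<std::mutex> lock{lock_mutex};
  std::mutex *raw = lock.release();
  assert(raw == &lock_mutex);
  // The lock no longer refers to any mutex and owns nothing.
  bool decoupled = lock.mutex() == nullptr && !lock.owns_lock();
  raw->unlock();  // manual unlock, the destructor will not do it
  return decoupled;
}

// unlock() releases the mutex but keeps the lock associated with it.
bool unlock_keeps_association()
{
  std::unique_lock<std::mutex> lock{lock_mutex};
  lock.unlock();
  bool associated = lock.mutex() == &lock_mutex && !lock.owns_lock();
  lock.lock();  // the same lock object can acquire the mutex again
  return associated && lock.owns_lock();
}  // destructor unlocks because the lock owns the mutex again
```

Both functions return true. After release(), the caller is responsible for the mutex; after unlock(), the lock can still be used to acquire the mutex again.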

thread2() passes mutex and boost::try_to_lock to the constructor of boost::unique_lock. With this argument, the constructor calls try_lock() on the mutex instead of lock(). Thus, the constructor only tries to lock the mutex and returns immediately. If the mutex is owned by another thread, the attempt fails.

owns_lock() lets you detect whether boost::unique_lock was able to lock a mutex. If owns_lock() returns true, thread2() can access std::cout immediately. If owns_lock() returns false, try_lock_for() is called. This member function also tries to lock a mutex, but it waits for the mutex for a specified period of time before failing. In Example 44.9 the lock tries for one second to obtain the mutex. If try_lock_for() returns true, std::cout may be accessed. Otherwise, thread2() gives up and skips a number. Thus, it is possible that the second thread in the example won’t write five numbers to the standard output stream.
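The two-step attempt in thread2() can be observed in isolation when the mutex is known to be held by another thread. The following sketch assumes std::timed_mutex, std::unique_lock, and std::try_to_lock from the standard library, which correspond to the Boost names used in Example 44.9. The names probe_mutex, Attempt, and try_while_held() as well as the 50 millisecond timeout are hypothetical choices for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

std::timed_mutex probe_mutex;  // hypothetical name

struct Attempt
{
  bool immediate;  // result of the try_to_lock constructor
  bool timed;      // result after falling back to try_lock_for()
};

// The calling thread owns the mutex while a worker tries to lock it.
Attempt try_while_held()
{
  Attempt result{true, true};
  std::unique_lock<std::timed_mutex> owner{probe_mutex};
  assert(owner.owns_lock());
  std::thread worker{[&result] {
    std::unique_lock<std::timed_mutex> lock{probe_mutex, std::try_to_lock};
    result.immediate = lock.owns_lock();
    result.timed = lock.owns_lock() ||
      lock.try_lock_for(std::chrono::milliseconds{50});
  }};
  worker.join();  // owner is still alive here, so both attempts failed
  return result;
}
```

Both members of the returned Attempt are false: the constructor fails immediately, and try_lock_for() gives up after the timeout because the calling thread keeps the mutex until the worker has been joined.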

Please note that in Example 44.9, the type of mutex is boost::timed_mutex, not boost::mutex. The example uses boost::timed_mutex because boost::mutex doesn’t provide the member function try_lock_for(). This member function of the mutex is called when try_lock_for() is called on the lock. For acquiring ownership, boost::mutex provides only the member functions lock() and try_lock().

boost::unique_lock is an exclusive lock. An exclusive lock is always the sole owner of a mutex. Another lock can only get control of the mutex after the exclusive lock has released it. Boost.Thread also supports shared locks with the class boost::shared_lock, which is used with boost::shared_mutex.

Example 44.10. Shared locks with boost::shared_lock
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>

void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

boost::shared_mutex mutex;
std::vector<int> random_numbers;

void fill()
{
  std::srand(static_cast<unsigned int>(std::time(0)));
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::shared_mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    lock.unlock();
    wait(1);
  }
}

void print()
{
  for (int i = 0; i < 3; ++i)
  {
    wait(1);
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    std::cout << random_numbers.back() << '\n';
  }
}

int sum = 0;

void count()
{
  for (int i = 0; i < 3; ++i)
  {
    wait(1);
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    sum += random_numbers.back();
  }
}

int main()
{
  boost::thread t1{fill}, t2{print}, t3{count};
  t1.join();
  t2.join();
  t3.join();
  std::cout << "Sum: " << sum << '\n';
}

Non-exclusive locks of type boost::shared_lock can be used if threads only need read-only access to a specific resource. A thread modifying the resource needs write access and thus requires an exclusive lock. Since a thread with read-only access is unaffected by other threads reading the same resource at the same time, it can use a non-exclusive lock and share a mutex.

In Example 44.10, both print() and count() only read the variable random_numbers. The print() function writes the last value in random_numbers to the standard output stream, and the count() function adds it to the variable sum. Because neither function modifies random_numbers, both can access it at the same time using a non-exclusive lock of type boost::shared_lock. count() does modify sum, but sum is only accessed by that one thread until main() reads it after the call to join(), so it needs no additional protection.
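That two shared locks can coexist while an exclusive lock is refused can be checked directly. The sketch below is an illustration under assumptions: it uses std::shared_timed_mutex and std::shared_lock from the standard library (C++14), which play the role of boost::shared_mutex and boost::shared_lock, and the names reader_mutex, Probe, and probe_shared_access() are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>

std::shared_timed_mutex reader_mutex;  // hypothetical name

struct Probe
{
  bool second_reader_entered;  // a second shared lock was acquired
  bool writer_entered;         // an exclusive lock was acquired
};

// The calling thread holds a shared lock while a worker thread first
// takes a second shared lock and then tries to take an exclusive lock.
Probe probe_shared_access()
{
  Probe p{false, true};
  std::shared_lock<std::shared_timed_mutex> reader{reader_mutex};
  assert(reader.owns_lock());
  std::thread other{[&p] {
    {
      // Succeeds although the calling thread also holds a shared lock.
      std::shared_lock<std::shared_timed_mutex> second{reader_mutex};
      p.second_reader_entered = second.owns_lock();
    }  // the worker's shared lock is released here
    // Fails because the calling thread still holds its shared lock.
    std::unique_lock<std::shared_timed_mutex> writer{reader_mutex,
      std::try_to_lock};
    p.writer_entered = writer.owns_lock();
  }};
  other.join();
  return p;
}
```

The returned Probe has second_reader_entered set to true and writer_entered set to false: readers share the mutex, while a writer has to wait until all shared locks have been released.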

Inside the fill() function, an exclusive lock of type boost::unique_lock is required because it inserts new random numbers into random_numbers. fill() releases the mutex using the unlock() member function and then waits for one second. Unlike the previous examples, wait() is called at the end of the for loop so that a random number is placed in the container before fill() waits. Because print() and count() call wait() at the beginning of their for loops, the container normally holds at least one number by the time they access it. This relies on timing alone; nothing in the code enforces the order.

Looking at the individual calls to the wait() function from different locations, one potential issue becomes apparent: The order of the function calls is directly affected by the order in which the CPU actually executes the individual threads. Using condition variables, the individual threads can be synchronized so that values added to random_numbers are immediately processed by a different thread.

Example 44.11. Condition variables with boost::condition_variable_any
#include <boost/thread.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>

boost::mutex mutex;
boost::condition_variable_any cond;
std::vector<int> random_numbers;

void fill()
{
  std::srand(static_cast<unsigned int>(std::time(0)));
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    cond.notify_all();
    // wait() unlocks the mutex while blocking and locks it again
    // before returning.
    cond.wait(mutex);
  }
}

void print()
{
  std::size_t next_size = 1;
  for (int i = 0; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    while (random_numbers.size() != next_size)
      cond.wait(mutex);
    std::cout << random_numbers.back() << '\n';
    ++next_size;
    cond.notify_all();
  }
}

int main()
{
  boost::thread t1{fill};
  boost::thread t2{print};
  t1.join();
  t2.join();
}

Example 44.11 removes the wait() and count() functions. Threads no longer wait for one second in every iteration; rather, they execute as fast as possible. In addition, no total is calculated; numbers are just written to the standard output stream.

To ensure correct processing of the random numbers, the individual threads are synchronized using a condition variable. A condition variable lets a thread block until another thread signals that the condition it is waiting for may have changed.

As before, the fill() function generates a random number with each iteration and places it in the random_numbers container. To block other threads from accessing the container at the same time, an exclusive lock is used. Instead of waiting for one second, this example uses a condition variable. Calling notify_all() will wake up every thread that has been waiting for this notification with wait().

Looking at the for loop of the print() function, you can see that the member function wait() is called for the same condition variable. When the thread is woken up by a call to notify_all(), it tries to acquire the mutex, which will only succeed after the mutex has been successfully released in the fill() function.

The trick here is that calling wait() also releases the mutex which was passed as a parameter. After calling notify_all(), the fill() function releases the mutex by calling wait(). It then blocks and waits for some other thread to call notify_all(), which happens in the print() function once the random number has been written to the standard output stream.

Notice that the call to the wait() member function inside the print() function actually happens within a separate while loop. This is done to handle the scenario where a random number has already been placed in the container before the wait() member function is called for the first time in print(). By comparing the number of stored elements in random_numbers with the expected number of elements, print() skips the wait in this scenario and writes the random number to the standard output stream right away. The loop also protects against spurious wakeups, where wait() returns although no thread has called notify_all().
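The recheck that print() performs in its while loop can also be expressed with the overload of wait() that takes a predicate, and it can be applied on both sides of the exchange. The following is a sketch of that variation, not the book's example. It assumes std::mutex, std::unique_lock, and std::condition_variable_any from the standard library, whose wait(lock, predicate) and notify_all() correspond to those of boost::condition_variable_any. It stores fixed values instead of random numbers so the result is predictable, and the names exchange_mutex, numbers, consumed, seen, produce(), consume(), and run_exchange() are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

std::mutex exchange_mutex;         // hypothetical names throughout
std::condition_variable_any cond;
std::vector<int> numbers;          // values handed from producer to consumer
std::size_t consumed = 0;          // how many values the consumer has processed
std::vector<int> seen;             // what the consumer observed, in order

void produce()
{
  for (int i = 0; i < 3; ++i)
  {
    std::unique_lock<std::mutex> lock{exchange_mutex};
    numbers.push_back(i * 10);
    cond.notify_all();
    // Block until the consumer has caught up. The predicate is
    // rechecked after every wakeup, including spurious ones.
    cond.wait(lock, [] { return consumed == numbers.size(); });
  }
}

void consume()
{
  for (int i = 0; i < 3; ++i)
  {
    std::unique_lock<std::mutex> lock{exchange_mutex};
    // Returns immediately if a value is already waiting.
    cond.wait(lock, [] { return numbers.size() > consumed; });
    seen.push_back(numbers.back());
    ++consumed;
    cond.notify_all();
  }
}

// Runs one producer and one consumer and returns the observed values.
std::vector<int> run_exchange()
{
  numbers.clear();
  seen.clear();
  consumed = 0;
  std::thread t1{produce};
  std::thread t2{consume};
  t1.join();
  t2.join();
  assert(seen.size() == 3);
  return seen;
}
```

run_exchange() returns the values 0, 10, and 20 in that order. Passing the lock instead of the mutex to wait() keeps the lock object's ownership state consistent with the mutex, and the predicate makes each thread continue only when its condition actually holds.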

Example 44.11 also works if the locks aren’t local in the for loop but instantiated in the outer scope. In fact this makes more sense because the locks don’t need to be destroyed and recreated in every iteration. Since the mutex is always released with wait(), you don’t need to destroy the locks at the end of an iteration.