Concurrency – C++0x

1a. basics 1

#include <iostream>
#include <thread>

using namespace std;

void do_anything()
{
  // sleep functions:
  std::this_thread::sleep_for(chrono::seconds(10));
  // or
  std::this_thread::sleep_until(chrono::system_clock::now() + chrono::seconds(10));
}

class X {
public: 
    void do_something();

    void operator()() const
    {
        do_something();
    }
}

void main()
{
    X x;
    // 1
    std::thread t1(x);
    t1.join();// of t1.detach();

    // 2
    std::thread t2(&X::do_something, &x);
    t2.join();// of t2.detach();

    // 3
    std::thread t3{X()}; // or t3((X()));
    t3.join();// of t3.detach();

    // 4
    std::thread t4([]{
        do_anything();
    });
    t4.join();// of t4.detach();
}

1b. basics, passing parameters:
by default parameters are copied into internal storage, where they can be accessed by the newly created thread of execution, even if the corresponding parameter in the function is expecting a reference.

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

void update_data(some_data& data);

void foo()
{
    some_data data;
    std::thread t(update_data, std::ref(data));
    t.join();
}


void update_data(std::unique_ptr data);

void foo()
{
    std::unique_prt p(new some_data);
    std::thread t(update_data, std::move(p));
    t.join();
}

1c. more realistic

#include <iostream>
#include <thread>
#include <future>
using namespace std;

void ThreadTask(promise <long long>& promise)
{
    for (unsigned int i = 0; i < 20; ++i)   {
        this_thread::sleep_for(chrono::seconds(2));
        cout << "Output from thread" << endl;
    }
    promise.set_value(1);
}

int main(int argc, char* argv[])
{
    const unsigned int numberOfProcessors{ thread::hardware_concurrency() }; 
    //to query the number of simultaneous threads that can be run on the computer executing the program
    cout << "This system can run " << numberOfProcessors << " concurrent tasks" << endl;

    promise <long long> promise;
    future taskFuture{ promise.get_future() };

    if (numberOfProcessors > 1)   {
        thread myThread{ ThreadTask, std::move(promise) };
        cout << "Output from main" << endl;

        while (taskFuture.wait_until(system_clock::now() + seconds(1)) != future_status::ready) {
            cout << "Still Waiting!" << endl;
        }
        cout << "Result was " << taskFuture.get() << endl;
        myThread.join();
    } else {
        cout << "CPU does not have multiple cores." << endl;
    }
    return 0;
}

1d. using std::bind


void func(int x, int y);

auto l = [](int x, int y) {};

class C {
public:
    void operator()(int x, int y) const;
    void memfunc(int x, int y) const;
};

int main()
{
    C c;
    std::shared_ptr sp(new C);

    std::bind(func, 77, 33)(); //calls func(77,33)
    std::bind(l, 77, 33)(); //calls l(77,33)
    std::bind(C(), 77, 33)(); //calls C::operator()(77,33)
    std::bind(&C::memfunc, c, 77, 33)(); //calls c.memfunc(77,33)
    std::bind(&C::memfunc, sp, 77, 33)(); //calls sp->memfunc(77,33)

    std::async(func, 42, 77);    //calls func(42,77)
    std::async(l, 42, 77);    //calls l(42,77)
    std::async(c, 42, 77);    //calls c.operator()(42,77)
    std::async(&C::memfunc, &c, 42, 77);    //calls c.memfunc()(42,77)
    std::async(&C::memfunc, sp, 42, 77);    //calls sp->memfunc(42,77)
}

2. to have a static variable per thread:

class MyManagedObject
{
private:
    static thread_local const unsigned int MAX_OBJECTS;
}
thread_local const unsigned int MyManagedObject::MAX_OBJECTS{ 8 };
...

3. some other method to start thread

#include <thread>
#include <future>
#include <iostream>
using namespace std;

long long Factorial(unsigned int value)
{
    cout << "ThreadTask thread: " << this_thread::get_id() << endl;
    this_thread::sleep_for(chrono::seconds(2));
    return value == 1 ? 1 : value * Factorial(value - 1);
}
int main(int argc, char* argv[])
{
    using namespace chrono;
    cout << "main thread: " << this_thread::get_id() << endl;

// using packaged_task:
    packaged_task task{ Factorial };
    future taskFuture{ task.get_future() };
    thread taskThread{ std::move(task), 3 };
    while (taskFuture.wait_until(system_clock::now() + seconds(1)) != future_status::ready) {
        cout << "Still Waiting!" << endl;
    }
    cout << "Factorial result was " << taskFuture.get() << endl;
    taskThread.join();

// using async
    auto taskFuture1 = async(Factorial, 3);
    cout << "Factorial result was " << taskFuture1.get() << endl;

    auto taskFuture2 = async(launch::async, Factorial, 3);
    cout << "Factorial result was .j" << taskFuture2.get() << endl;

    auto taskFuture3 = async(launch::deferred, Factorial, 3);
    cout << "Factorial result was " << taskFuture3.get() << endl;

    auto taskFuture4 = async(launch::async | launch::deferred, Factorial, 3);
    cout << "Factorial result was " << taskFuture4.get() << endl;

    return 0;
}

3b. swap examples:

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
  some_big_object ob;
  std::mutes m;

public:
  X(some_big_object const& o) : ob(o) {}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs == & rhs) return;
    std::lock(lhs.m, rhs.m);
    std::lock_guard lock_a(lhs.m, std::adopt_lock);
    std::lock_guard lock_b(rhs.m, std::adopt_lock);
    swap(lhs.ob, rhs.ob);
  }
};


class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
  some_big_object ob;
  std::mutes m;

public:
  X(some_big_object const& o) : ob(o) {}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs == & rhs) return;
    std::unique_lock lock_a(lhs.m, std::defer_lock);
    std::unique_lock lock_b(rhs.m, std::defer_lock);
    std::lock(lhs.m, rhs.m);
    swap(lhs.ob, rhs.ob);
  }
};

3c. thread safe initialization:

// call_once:
class X
{
private:
  std::once_flag connection_init_flag;

  void openj_connection() {}
public:
  void send_data()
  {
    std::call_once(connection_inin_flag, &X::open_connection, this);
    //send data
  }

  void receive_data()
  {
    std::call_once(connection_inin_flag, &X::open_connection, this);
    //receive data
  }
};

//2 static variable:
class my_class;

my_class& get_my_class_instance()
{
  static my_class instance;
  return instance;
}

3d. protecting data with boost::shared_mutex

#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>

class dns_entree;

class dns_cache
{
  std::map<std::string, dns_entree> entries;
  mutable boost::shared_mutex entee_mutex;

  dns_entree find_entree(std::string const& domain) const
  {
    boost::shared_lock<boost::shared_mutex> lk(entee_mutex);
    std::map<std::string, dns_entree>::const_iterator const it = entries.find(domain);
    return (it == entries.end()) ? dns_enrtee() : it->second;
  }

  void update_or_add_entree(std::string const& domain, dns_entree const& details)
  {
    std::lock_quard<boost::shared_mutex> lk(entrée_mutex);
    entries[domain] = details;
  }
};

3e. waiting for data with condition_variable:

std::mutex mtx;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;

void data_preparation_thread()
{
  while(...)  {
     data_chunk const data = getdata();
     std::lock_quard<std::mutex> lk(mtx);
     data_queue.push(data);
     data_cond.notify_one(); // .notify_all();
  }
}

void data_processing_thread()
{
  while(true)  {
    std::unique_lock<std::mutex> lk(mtx);
    data_cond.wait(lk, [] { return !data_queue.empty(); } );
    data_chunk data = data_queue.front();
    data_queue.pop();
    lk.unlock();
    process(data);
  }
}

4. copy file example:

#include <string>
#include <vector>
#include <fstream>
#include <iostream>
#include <future>
using namespace std; 
 
vector readFile(const string& inPath) 
{ 
    ifstream file(inPath, ios::binary | ios::ate); 
    size_t length = (size_t)file.tellg(); 
    vector<char> buffer(length); 
    file.seekg(0, std::ios::beg); 
    file.read(&buffer[0], length); 
    return buffer; 
} 
 
size_t writeFile(const vector<char>& buffer, const string& outPath) 
{ 
    ofstream file(outPath, ios::binary); 
    file.write(&buffer[0], buffer.size()); 
    return (size_t)file.tellp(); 
} 

// sync copy (no threads)
size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

// task based:
size_t future_copyFile(const string& inFile, const string& outFile) 
{ 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 
 
    std::promise<int> prom2; 
    std::future<int> fut2 = prom2.get_future(); 
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 
 
    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

// packaged task:
size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 
 
    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 
 
    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
}

// std::async:
size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 
 
    return fut2.get(); 
} 

// c++17
future future_then_copyFile(const string& inFile, const string& outFile)
{
    return async([inFile]() {
        return readFile(inFile);
    }).then([outFile](const vector<char>& buffer) {
        return writeFile(buffer, outFile);
    });
}

5. parallel quicksort using futures:

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
  if(input.empty()) return input;

  std::list<T> result;
  result.splice(result.begin(), input, input.begin());
  T const& pivot = *result.begin();
  auto divide_point = std::partition(input.begin(), input.end(), [&] (T const& t) { return t < pivot; } );
  
  std::list<T> lower_part;
  lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

  std::future<std::list<T>> new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part)));

  auto new_higher(parallel_quick_sort(std::move(input)));

  result.splice(result.end(), new_higher);
  result.splice(resulf.begin(), new_lower.get());

  return result;
}

6. be careful with async

auto fut = std::async(f); // as above
 if (fut.wait_for(0s) == // if task is
    std::future_status::deferred) // deferred…
{
                        // …use wait or get on fut
  … // to call f synchronously
} else { // task isn’t deferred
  while (fut.wait_for(100ms) != // infinite loop not
         std::future_status::ready) { // possible (assuming
                                           // f finishes)
     … // task is neither deferred nor ready,
                       // so do concurrent work until it’s ready
  }
   … // fut is ready
}

some useful tips (mostly for myself)