1a. basics 1
#include <iostream>
#include <thread>
using namespace std;
// Demonstrates the two blocking sleep helpers of std::this_thread.
// Both calls are executed, one after the other, so the calling thread is
// paused twice for ten seconds each.
void do_anything()
{
    const auto pause = std::chrono::seconds(10);

    // Relative form: block for the given duration.
    std::this_thread::sleep_for(pause);

    // Absolute form: block until the given time point has been reached.
    const auto wake_up_at = std::chrono::system_clock::now() + pause;
    std::this_thread::sleep_until(wake_up_at);
}
// Callable helper for the thread-construction examples: an instance can be
// handed to std::thread as a function object (through operator()) or through
// a pointer to the do_something member.
class X {
public:
    // Work performed on the thread. It is const so that the const
    // operator() below is allowed to call it; the empty body is a
    // placeholder for real work.
    void do_something() const {}

    // Function-call operator: simply forwards to do_something().
    void operator()() const
    {
        do_something();
    }
};
// Shows four ways of starting a std::thread. Each thread is joined before
// the next one is created, so the examples run one at a time.
int main()
{
    X x;

    // 1: pass a callable object; the thread invokes its operator().
    std::thread t1(x);
    t1.join(); // or t1.detach();

    // 2: pass a member-function pointer plus the object to call it on.
    std::thread t2(&X::do_something, &x);
    t2.join(); // or t2.detach();

    // 3: pass a temporary. The braces (or the extra parentheses) keep the
    //    statement from being parsed as a function declaration.
    std::thread t3{X()}; // or t3((X()));
    t3.join(); // or t3.detach();

    // 4: pass a lambda.
    std::thread t4([]{
        do_anything();
    });
    t4.join(); // or t4.detach();

    return 0;
}
1b. basics, passing parameters:
By default, arguments are copied into internal storage, where the newly created thread of execution can access them, even if the corresponding parameter of the function expects a reference. To pass a reference, wrap the argument in std::ref:
// Thread function taking an int by value and a std::string by const reference
// (declared only).
void f(int i, std::string const& s);
// Starts f(3, "hello") on a new thread. The literal is handed to the thread
// as a pointer to its characters; the std::string that f receives is created
// from that pointer when the thread invokes f.
std::thread t(f, 3, "hello");
void update_data(some_data& data);
void foo()
{
some_data data;
std::thread t(update_data, std::ref(data));
t.join();
}
void update_data(std::unique_ptr data);
void foo()
{
std::unique_prt p(new some_data);
std::thread t(update_data, std::move(p));
t.join();
}
1c. more realistic
#include <iostream>
#include <thread>
#include <future>
using namespace std;
// Prints a progress line every two seconds, twenty times in total, and then
// fulfils the given promise with the value 1.
void ThreadTask(std::promise<long long>& promise)
{
    const auto pause = std::chrono::seconds(2);

    unsigned int remaining = 20;
    while (remaining-- > 0) {
        std::this_thread::sleep_for(pause);
        std::cout << "Output from thread" << std::endl;
    }

    promise.set_value(1);
}
int main(int argc, char* argv[])
{
const unsigned int numberOfProcessors{ thread::hardware_concurrency() };
//to query the number of simultaneous threads that can be run on the computer executing the program
cout << "This system can run " << numberOfProcessors << " concurrent tasks" << endl;
promise <long long> promise;
future taskFuture{ promise.get_future() };
if (numberOfProcessors > 1) {
thread myThread{ ThreadTask, std::move(promise) };
cout << "Output from main" << endl;
while (taskFuture.wait_until(system_clock::now() + seconds(1)) != future_status::ready) {
cout << "Still Waiting!" << endl;
}
cout << "Result was " << taskFuture.get() << endl;
myThread.join();
} else {
cout << "CPU does not have multiple cores." << endl;
}
return 0;
}
1d. using std::bind
// Free function used as a call target by the bind/async examples in this
// section (declared only).
void func(int x, int y);
// Lambda with the same (int, int) parameter list; its body is empty.
auto l = [](int x, int y) {};
// Callable type for the std::bind / std::async examples: it can be invoked
// directly through operator() or through the named member memfunc. Both
// members are const, take two ints, and are only declared here.
struct C {
    auto operator()(int x, int y) const -> void;
    auto memfunc(int x, int y) const -> void;
};
// Walks through the kinds of callable that std::bind and std::async accept:
// a free function, a lambda, a function object, and a member function
// combined with an object, a pointer or a smart pointer.
int main()
{
    C c;
    // The element type must be spelled out: it cannot be deduced from a raw
    // pointer.
    std::shared_ptr<C> sp(new C);

    // std::bind returns a call wrapper; the trailing () invokes it at once.
    std::bind(func, 77, 33)(); //calls func(77,33)
    std::bind(l, 77, 33)(); //calls l(77,33)
    std::bind(C(), 77, 33)(); //calls C::operator()(77,33)
    std::bind(&C::memfunc, c, 77, 33)(); //calls memfunc(77,33) on a copy of c
    std::bind(&C::memfunc, sp, 77, 33)(); //calls sp->memfunc(77,33)

    // Keep the futures: with the default launch policy a task may be
    // deferred, and a deferred task only runs when its future is waited on.
    auto f1 = std::async(func, 42, 77); //calls func(42,77)
    auto f2 = std::async(l, 42, 77); //calls l(42,77)
    auto f3 = std::async(c, 42, 77); //calls operator()(42,77) on a copy of c
    auto f4 = std::async(&C::memfunc, &c, 42, 77); //calls c.memfunc(42,77)
    auto f5 = std::async(&C::memfunc, sp, 42, 77); //calls sp->memfunc(42,77)

    // Wait for every task before c goes out of scope (f4 refers to it).
    f1.get();
    f2.get();
    f3.get();
    f4.get();
    f5.get();
}
1e. Using RAII to wait for a thread to complete
// Scope guard for a std::thread that is owned elsewhere: when the guard is
// destroyed it joins the referenced thread if that thread is still joinable.
// The guard only stores a reference and can be neither copied nor assigned.
class ThreadGuard
{
public:
    ThreadGuard(ThreadGuard const&) = delete;
    ThreadGuard& operator=(ThreadGuard const&) = delete;

    explicit ThreadGuard(std::thread& t) : guarded(t) {}

    ~ThreadGuard()
    {
        // A thread that was already joined or detached must not be joined
        // again.
        if (!guarded.joinable()) {
            return;
        }
        guarded.join();
    }

private:
    std::thread& guarded;
};
// Starts some_function on a new thread and lets a ThreadGuard join it when
// the function is left, whether by a normal return or by an exception thrown
// from the work that follows.
void foo() {
    std::thread t(some_function);
    ThreadGuard guard(t);
    // ...
    // do some stuff...
}
2. to have a static variable per thread:
// Example of a static data member with thread storage duration.
class MyManagedObject
{
private:
    // Declared thread_local, so every thread gets its own instance.
    static thread_local const unsigned int MAX_OBJECTS;
};

// Out-of-class definition; it repeats thread_local and supplies the value.
thread_local const unsigned int MyManagedObject::MAX_OBJECTS{ 8 };
...
3. other ways to start a thread (packaged_task and async):
#include <thread>
#include <future>
#include <iostream>
using namespace std;
// Recursively computes value! and returns it as a long long.
// Every call (including each recursive one) prints the id of the executing
// thread and sleeps for two seconds, so the callers in this section have
// something to wait for.
long long Factorial(unsigned int value)
{
    std::cout << "ThreadTask thread: " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    // Stop at 0 as well as 1: with an unsigned argument, 0 - 1 wraps around
    // to the largest value and the recursion would not terminate.
    return value <= 1 ? 1 : value * Factorial(value - 1);
}
int main(int argc, char* argv[])
{
using namespace chrono;
cout << "main thread: " << this_thread::get_id() << endl;
// using packaged_task:
packaged_task task{ Factorial };
future taskFuture{ task.get_future() };
thread taskThread{ std::move(task), 3 };
while (taskFuture.wait_until(system_clock::now() + seconds(1)) != future_status::ready) {
cout << "Still Waiting!" << endl;
}
cout << "Factorial result was " << taskFuture.get() << endl;
taskThread.join();
// using async
auto taskFuture1 = async(Factorial, 3);
cout << "Factorial result was " << taskFuture1.get() << endl;
auto taskFuture2 = async(launch::async, Factorial, 3);
cout << "Factorial result was .j" << taskFuture2.get() << endl;
auto taskFuture3 = async(launch::deferred, Factorial, 3);
cout << "Factorial result was " << taskFuture3.get() << endl;
auto taskFuture4 = async(launch::async | launch::deferred, Factorial, 3);
cout << "Factorial result was " << taskFuture4.get() << endl;
return 0;
}
3b. swap examples:
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

// Pairs a some_big_object with the mutex that protects it and offers a swap
// that locks both operands. This variant uses std::lock followed by
// lock_guards that adopt the already-held locks.
class X
{
private:
    some_big_object ob;
    std::mutex m;
public:
    X(some_big_object const& o) : ob(o) {}

    // Exchanges the payloads of lhs and rhs while holding both mutexes.
    friend void swap(X& lhs, X& rhs)
    {
        // Swapping an object with itself would try to lock the same mutex
        // twice.
        if (&lhs == &rhs) return;
        // Acquire both mutexes as one deadlock-free operation.
        std::lock(lhs.m, rhs.m);
        // adopt_lock: the guards take over the locks that are already held
        // and release them when the function is left.
        std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
        swap(lhs.ob, rhs.ob);
    }
};
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

// Pairs a some_big_object with the mutex that protects it and offers a swap
// that locks both operands. This variant creates unlocked unique_locks first
// and then locks them together.
class X
{
private:
    some_big_object ob;
    std::mutex m;
public:
    X(some_big_object const& o) : ob(o) {}

    // Exchanges the payloads of lhs and rhs while holding both mutexes.
    friend void swap(X& lhs, X& rhs)
    {
        // Swapping an object with itself would try to lock the same mutex
        // twice.
        if (&lhs == &rhs) return;
        // defer_lock: associate each lock object with its mutex without
        // locking it yet.
        std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
        std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
        // Lock through the unique_lock objects, not the raw mutexes: only
        // then do the lock objects own the mutexes and unlock them when the
        // function is left.
        std::lock(lock_a, lock_b);
        swap(lhs.ob, rhs.ob);
    }
};
3c. thread safe initialization:
// call_once:
// Opens its connection lazily: whichever of send_data / receive_data is
// called first runs open_connection, and std::call_once makes sure it runs
// only once even when several threads arrive at the same time.
class X
{
private:
    // Records whether open_connection has already been executed.
    std::once_flag connection_init_flag;
    // Placeholder for the real connection set-up.
    void open_connection() {}
public:
    void send_data()
    {
        std::call_once(connection_init_flag, &X::open_connection, this);
        //send data
    }
    void receive_data()
    {
        std::call_once(connection_init_flag, &X::open_connection, this);
        //receive data
    }
};
// 2. function-local static variable:
class my_class;
// Returns a reference to one shared my_class object. The object is a
// function-local static; since C++11 the language guarantees that such a
// variable is initialized exactly once, even if several threads make the
// first call concurrently.
my_class& get_my_class_instance()
{
static my_class instance;
return instance;
}
3d. protecting data with boost::shared_mutex
#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>
class dns_entree;
class dns_cache
{
std::map<std::string, dns_entree> entries;
mutable boost::shared_mutex entee_mutex;
dns_entree find_entree(std::string const& domain) const
{
boost::shared_lock<boost::shared_mutex> lk(entee_mutex);
std::map<std::string, dns_entree>::const_iterator const it = entries.find(domain);
return (it == entries.end()) ? dns_enrtee() : it->second;
}
void update_or_add_entree(std::string const& domain, dns_entree const& details)
{
std::lock_quard<boost::shared_mutex> lk(entrée_mutex);
entries[domain] = details;
}
};
3e. waiting for data with condition_variable:
// Protects data_queue; it is also the mutex that data_cond waits on.
std::mutex mtx;
// Chunks passed from the preparing thread to the processing thread.
std::queue<data_chunk> data_queue;
// Notified by the preparing thread each time a chunk has been pushed.
std::condition_variable data_cond;
void data_preparation_thread()
{
while(...) {
data_chunk const data = getdata();
std::lock_quard<std::mutex> lk(mtx);
data_queue.push(data);
data_cond.notify_one(); // .notify_all();
}
}
void data_processing_thread()
{
while(true) {
std::unique_lock<std::mutex> lk(mtx);
data_cond.wait(lk, [] { return !data_queue.empty(); } );
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
}
}
4. copy file example:
#include <exception>
#include <fstream>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
using namespace std;
// Reads the complete file at inPath into memory as raw bytes.
// Returns the file contents (empty for an empty file).
// Throws std::ios_base::failure when the file cannot be opened or read.
std::vector<char> readFile(const std::string& inPath)
{
    // Open positioned at the end so that tellg() reports the file size.
    std::ifstream file(inPath, std::ios::binary | std::ios::ate);
    if (!file) {
        throw std::ios_base::failure("readFile: cannot open " + inPath);
    }

    const std::streamoff length = file.tellg();
    if (length < 0) {
        throw std::ios_base::failure("readFile: cannot determine size of " + inPath);
    }

    std::vector<char> buffer(static_cast<std::size_t>(length));
    if (!buffer.empty()) {
        file.seekg(0, std::ios::beg);
        file.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
        if (!file) {
            throw std::ios_base::failure("readFile: cannot read " + inPath);
        }
    }
    return buffer;
}
// Writes buffer to the file at outPath, replacing any previous contents.
// Returns the number of bytes written.
// Throws std::ios_base::failure when the file cannot be opened or written.
std::size_t writeFile(const std::vector<char>& buffer, const std::string& outPath)
{
    std::ofstream file(outPath, std::ios::binary);
    if (!file) {
        throw std::ios_base::failure("writeFile: cannot open " + outPath);
    }

    // Nothing to write for an empty buffer; the file is still created.
    if (!buffer.empty()) {
        file.write(buffer.data(), static_cast<std::streamsize>(buffer.size()));
        if (!file) {
            throw std::ios_base::failure("writeFile: cannot write " + outPath);
        }
    }

    const std::streamoff written = file.tellp();
    return static_cast<std::size_t>(written);
}
// sync copy (no threads)
// Copies inFile to outFile on the calling thread: the whole input is read
// first and then written out. Returns whatever writeFile reports.
size_t sync_copyFile(const std::string& inFile, const std::string& outFile)
{
    const auto contents = readFile(inFile);
    return writeFile(contents, outFile);
}
// task based:
// Copies inFile to outFile using two threads that hand their results over
// through promise/future pairs: the first thread reads the file, the second
// waits for that data and writes it. Returns the number of bytes written.
// An exception raised while reading or writing is rethrown to the caller.
size_t future_copyFile(const std::string& inFile, const std::string& outFile)
{
    std::promise<std::vector<char>> prom1;
    std::future<std::vector<char>> fut1 = prom1.get_future();
    std::thread th1([&prom1, inFile]() {
        // An exception must not escape the thread function (that would
        // terminate the program), so pass it on through the promise.
        try {
            prom1.set_value(readFile(inFile));
        } catch (...) {
            prom1.set_exception(std::current_exception());
        }
    });

    // The result type matches writeFile's size_t so that large sizes are not
    // narrowed.
    std::promise<size_t> prom2;
    std::future<size_t> fut2 = prom2.get_future();
    std::thread th2([&fut1, &prom2, outFile]() {
        try {
            // fut1.get() blocks until the reader is done and rethrows its
            // exception, if any.
            prom2.set_value(writeFile(fut1.get(), outFile));
        } catch (...) {
            prom2.set_exception(std::current_exception());
        }
    });

    // Join before get(): get() may throw, and no thread may still be
    // joinable when its std::thread object is destroyed.
    th1.join();
    th2.join();
    return fut2.get();
}
// packaged task:
size_t packagedtask_copyFile(const string& inFile, const string& outFile)
{
using Task_Type_Read = vector<char>(const string&);
packaged_task<Task_Type_Read> pt1(readFile);
future<vector<char>> fut1{ pt1.get_future() };
thread th1{ move(pt1), inFile };
using Task_Type_Write = size_t(const string&);
packaged_task<Task_Type_Write> pt2([&fut1](const string& path){
return writeFile(fut1.get(), path);
});
future<size_t> fut2{ pt2.get_future() };
thread th2{ move(pt2), outFile };
size_t result = fut2.get();
th1.join();
th2.join();
return result;
}
// std::async:
size_t async_copyFile(const string& inFile, const string& outFile)
{
auto fut1 = async(readFile, inFile);
auto fut2 = async([&fut1](const string& path){
return writeFile(fut1.get(), path);
},
outFile);
return fut2.get();
}
// c++17
future future_then_copyFile(const string& inFile, const string& outFile)
{
return async([inFile]() {
return readFile(inFile);
}).then([outFile](const vector<char>& buffer) {
return writeFile(buffer, outFile);
});
}
5. parallel quicksort using futures:
// Quicksort on a std::list in which the lower partition is sorted through
// std::async while the calling thread sorts the upper partition.
// Takes the list by value and returns the sorted list; T must support
// operator<.
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty()) return input;

    // Move the first element into the result; it serves as the pivot.
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    // Elements smaller than the pivot end up in front of divide_point.
    auto divide_point = std::partition(input.begin(), input.end(),
        [&pivot](T const& t) { return t < pivot; });

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    // Sort the lower part as a separate task and the rest right here.
    std::future<std::list<T>> new_lower(
        std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    auto new_higher(parallel_quick_sort(std::move(input)));

    // Assemble: lower part, pivot (already in result), higher part.
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}
6. be careful with async
// Sketch (the "…" lines are placeholders, not code): how to wait on a future
// from std::async without looping forever when the task was deferred.
// The 0s / 100ms literals require the std::chrono literals to be in scope
// (e.g. using namespace std::chrono_literals;).
auto fut = std::async(f); // no launch policy given, so f may be deferred
if (fut.wait_for(0s) == // zero timeout: only asks for the status; if task is
std::future_status::deferred) // deferred…
{
// …use wait or get on fut
… // to call f synchronously
} else { // task isn't deferred
while (fut.wait_for(100ms) != // infinite loop not
std::future_status::ready) { // possible (assuming
// f finishes)
… // task is neither deferred nor ready,
// so do concurrent work until it's ready
}
… // fut is ready
}