C++0x Синхронизация потоков

Модератор: Модераторы разделов

Аватара пользователя
AlphaGh0St
Сообщения: 41

C++0x Синхронизация потоков

Сообщение AlphaGh0St »

Всем привет, интересует организация синхронной работы с разделяемыми ресурсами нескольких потоков (в стандартной библиотеке потоков).
Например, есть следующий код:

Код: Выделить всё

    /*
            g++ -std=c++0x -pthread -o threads threads.cpp
    */

    #include <iostream>
    #include <thread>
    #include <mutex>

    #define MAX_THREADS 2
    std::mutex m;
    int i;

    void func(const int n){
            m.lock();
            for(i = 0; i < 8; i++){
                    std::cout << "thread number: " << n << std::endl;
                    std::cout << "            i: " << i << std::endl;
                    m.unlock();
            }
    }

    int main(int argc, char *argv[]){
            std::thread thr[MAX_THREADS];

            for(int i = 0; i < MAX_THREADS; i++){
                    thr[i] = std::thread(func, i);
            }

            for(int i = 0; i < MAX_THREADS; i++){
                    thr[i].join();
            }

            return 0;
    }


Как организовать работу потоков с помощью мьютекса таким образом, чтобы потоки входили в цикл по очереди?
Например:
поток=0 | i =0
поток=1 | i =1
поток=0 | i =2
поток=1 | i =3
поток=0 | i =4
поток=1 | i =5
поток=0 | i =6
поток=1 | i =7


А то получается так, что первый поток вошёл в цикл, прошёл все итерации, вышел. А за ним второй поток вошёл и тоже прошёл все итерации. Нужно сделать так, чтобы потоки не выполняли одну и ту же работу, а работали синхронно.

Подскажите, как правильно организовать такую работу?
Благодарю.
Спасибо сказали:
Аватара пользователя
TuxWare
Сообщения: 637
ОС: Windows 7

Re: C++0x Синхронизация потоков

Сообщение TuxWare »

1 раз лочится восемь раз унлочится. Следует переместить lock/unlock то ли в скобки то ли за скобки for.

void func(const int n){
m.lock();
for(i = 0; i < 8; ++i){
std::cout << "thread number: " << n << std::endl;
std::cout << " i: " << i << std::endl;
}
m.unlock();
}

Так нет никакого параллелизма вычислений.

void func(const int n){
for(i = 0; i < 8; ++i){
m.lock();
if (i < 8) { // Непропускать значения i большие чем 7.
std::cout << "thread number: " << n << std::endl;
std::cout << " i: " << i << std::endl;
}
m.unlock();
}
}

Понятно как i станет больше 7. Поток 0 i=7, поток 1 i=8, поток 2 i=9, поток 3 i=10. Есть еще шанс получить две инициализаии i. Мне кажется правильно i инициализировать при объявлении или перед циклом с thr[i].join() в main.

Если дать хоть какую то нагрузку на процессор и при MAX_THREADS=4 (у меня компьютер 8 ядер, 16 потоков)
Spoiler

void func(const int n){
for(i=0; i < 8; ++i) {
long d=0;
for (int j=0; j<1000000; ++j) d=j*j; // Грузим ядро вычислениями.
m.lock();
if (i < 8) { // Непропускать значения i большие чем 7.
std::cout << "thread number: " << n << std::endl;
std::cout << " i: " << i << std::endl;
}
m.unlock();
}
}

:~> ./threads
thread number: 2
i: 0
thread number: 0
i: 1
thread number: 1
i: 2
thread number: 3
i: 3
thread number: 0
i: 4
thread number: 1
i: 5
thread number: 2
i: 6
thread number: 0
i: 7

И зачем использовать во всех циклах i?

Избежать ряда случайных проблем также можно если вместо i создать класс со своим локером. Не с общим. Т.е. чтение доступно всем потокам, запись только одному. Или говоря иначе операции i=0 или ++i выполняются в один поток.
Спасибо сказали:
Аватара пользователя
AlphaGh0St
Сообщения: 41

Re: C++0x Синхронизация потоков

Сообщение AlphaGh0St »

Спасибо за помощь!

TuxWare писал(а):
18.07.2012 20:47
Если дать хоть какую то нагрузку на процессор и при MAX_THREADS=4 (у меня компьютер 8 ядер, 16 потоков)

Ух ты, подскажите, а что у Вас за процессор?

И зачем использовать во всех циклах i?
Избежать ряда случайных проблем также можно если вместо i создать класс со своим локером. Не с общим. Т.е. чтение доступно всем потокам, запись только одному. Или говоря иначе операции i=0 или ++i выполняются в один поток.

Как, например, здесь ?
Спасибо сказали:
Аватара пользователя
TuxWare
Сообщения: 637
ОС: Windows 7

Re: C++0x Синхронизация потоков

Сообщение TuxWare »

У меня два ксеона по 4 ядра с HT.

Решение изложенное выше определяет только суть, но не является рабочим. Но если совсем вкратце примерно так.
Скажем i=5. Первый поток выполняет инкремент.i=6 и это меньше 8 входим в тело цикла. Второй, поток выполняет инкремент.i=7. Пока первый поток начнет печатать значение само значение уже изменилось. И он напечатает 7. Второй поток проверил 7 меньше 8. Входит в тело цикла. Что он напечатает. Если первый поток не успеет сделать инкремент, второй поток напечатает еще раз 7. А если успеет сделать инкремент, то i станет 8 и первый поток завершится. Но второй поток уже готов к печати и напечатает 8, хотя это значение и не должно было появиться.
То есть решить первое надо избавится от обращений к i везде, а работать с локальной переменной скажем t. Если записать
int t;
for(t = i; t < 8; t=++i){...}
Два потока ворвавшихся параллельно присвоят своим локальным значениям t текущее значение i, скажем 0. И два потока войдут в тело цикла с одним и тем же значением t=0. И лочить здесь проблематично. Но скажем так будет легче
while ( 1 ) {
int t;
mutex.lock();
t = i++; // тогда можно оставить к i свободный доступ без RW-блокировок, но больше обращений к i в func() быть не должно
mutex.unlock();
if (t>=8) break;
...... // И далее работаем только с t.
}

Я везде использовал только mutex, как и в оригинале. А то есть и другие средства блокировок или не блокировок (атомарных операций).

По поводу примера, я не смотрел детально но в системной библиотеке gcc есть pthread_rwlock_rdlock и pthread_rwlock_wrlock, что проще.
И посмотрите http://en.cppreference.com/w/cpp/thread http://en.cppreference.com/w/cpp/atomic там и другие инструменты.
Спасибо сказали:
Аватара пользователя
TuxWare
Сообщения: 637
ОС: Windows 7

Re: C++0x Синхронизация потоков

Сообщение TuxWare »

Да еще
while (1) {
int t;
mutex.lock();
t = i++; // тогда можно оставить к i свободный доступ без RW-блокировок
mutex.unlock();
if (t>=8) break;
...... // И далее работаем только с t.
print_mutex.lock();
std::cout << "thread number: " << n << std::endl;
std::cout << " i: " << t << std::endl;
prinnt_mutex.unlock();
}

Печатаем в один поток дабы избежать чередования слов из разных потоков. Так как мы работаем с локальной переменной t, то мы имеем полное право в одном потоке печатать, а в другом выполнять инкремент i. Поэтому блокировки разные. Ну где то так.
Спасибо сказали:
Аватара пользователя
TuxWare
Сообщения: 637
ОС: Windows 7

Re: C++0x Синхронизация потоков

Сообщение TuxWare »

Код:

/* g++ -std=c++0x -pthread -o threads threads.cpp */ #include <iostream> #include <thread> #include <mutex> #define MAX_THREADS 4 std::mutex m1, m2; int i; void func(const int n){ while (1) { int t; m1.lock(); t = i++; m1.unlock(); if (t>=8) break; long d=0; for (int j=0; j<1000000; ++j) d=j*j; // Грузим ядро вычислениями. m2.lock(); std::cout << "thread number: " << n << std::endl; std::cout << " i: " << t << std::endl; m2.unlock(); } } int main(int argc, char *argv[]){ std::thread thr[MAX_THREADS]; for(int j = 0; j < MAX_THREADS; j++){ thr[j] = std::thread(func, j); } for(int j = 0; j < MAX_THREADS; j++){ thr[j].join(); } return 0; }


Код:

./threads thread number: 0 i: 0 thread number: 1 i: 2 thread number: 0 i: 3 thread number: 3 i: 1 thread number: 3 i: 7 thread number: 0 i: 6 thread number: 2 i: 4 thread number: 1 i: 5
Спасибо сказали:
Аватара пользователя
AlphaGh0St
Сообщения: 41

Re: C++0x Синхронизация потоков

Сообщение AlphaGh0St »

Возник ещё один вопрос, как корректно завершить все работающие дочерние потоки?
При использовании потоков POSIX, я написал такую функцию для этой цели:

Код: Выделить всё

void kill_threads(void){
    int i;
    for(i = 0; i < THREADS_MAX; i++){ // просматриваем все дочерние потоки
        if(threads[i] == pthread_self()) // если текущий поток - проверяющий, пропускаем
            continue;

        if(pthread_kill(threads[i], 0) != ESRCH)
            pthread_cancel(threads[i]); // завершаем i-й поток
    }

    pthread_cancel(pthread_self()); // завершаем проверяющий поток
}
Спасибо сказали:
Аватара пользователя
TuxWare
Сообщения: 637
ОС: Windows 7

Re: C++0x Синхронизация потоков

Сообщение TuxWare »

pthread_cancel(pthread_self()); Зачем нужны самоубийцы. Если закончится работа функции N закончится и N-ый поток.
Зачем проверять pthread_kill это же сделает и pthread_cancel без дополнительных проверок.
И самое главное любой поток может инициировать завершение или всегда один и тот же.
Спасибо сказали: