【发布时间】:2015-06-13 11:38:52
【问题描述】:
在我的程序中,多个线程(检查器)请求网页;如果这些页面包含某些数据,则由其他线程(消费者)处理这些数据。我只需要让预定数量的消费者(而不是全部)开始处理。我尝试使用 std::atomic 计数器和 fetch_add 来限制正在工作的消费者数量。然而,尽管计数器保持在界限之内,不同的消费者却会取得相同的计数器值,实际进行处理的消费者数量超过了限制。具体行为取决于处理的持续时间。下面的简化代码用 sleep_for 代替了获取页面和处理页面的函数。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
/// Worker functor intended to be run on its own thread.
///
/// Each instance polls a shared "fire" flag; once the flag is set it takes a
/// ticket from a shared counter and, if the ticket is below 5, simulates
/// processing by sleeping. Both atomics are held by reference, so they must
/// outlive every thread that runs a copy of this functor.
class cConsumer
{
public:
    /// @param aNumber  Identifier printed in this consumer's log line.
    /// @param aFire    Shared flag; the consumer proceeds only while it is true.
    /// @param aCounter Shared ticket counter incremented by every consumer
    ///                 that observes the flag.
    // In-class member declarations must not be qualified with the class name
    // (`cConsumer::cConsumer` here is rejected by GCC and Clang).
    cConsumer(
        const size_t aNumber,
        std::atomic<bool> &aFire,
        std::atomic<size_t> &aCounter) :
        mNumber(aNumber),
        mFire(aFire),
        mCounter(aCounter) {}

    /// Thread body. Loops forever; there is no exit condition.
    void operator ()()
    {
        while (true)
        {
            // Poll the flag once per mMillisecond until it is raised.
            while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            // fetch_add returns the value held *before* the increment, so
            // tickets start at 0.
            const size_t vCounter = mCounter.fetch_add(1);
            if (vCounter < 5)
            {
                std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
                std::this_thread::sleep_for(mWorkDuration); // stand-in for real processing
            }
            if (vCounter == 5)
            {
                // The consumer holding ticket 5 lowers the flag and rearms the
                // counter. These are two independent stores, not one atomic
                // step.
                // NOTE(review): a consumer that already left the polling loop
                // above but has not yet called fetch_add can run after this
                // reset and receive a ticket below 5 again.
                mFire.store(false);
                mCounter.store(0);
            }
        }
    }

private:
    static const std::chrono::milliseconds
        mMillisecond,   // polling interval while waiting for the flag
        mWorkDuration;  // simulated processing time
    const size_t mNumber;
    std::atomic<bool> &mFire;
    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds
    cConsumer::mMillisecond(1),
    cConsumer::mWorkDuration(1300);
/// Producer-side functor intended to be run on its own thread.
///
/// While the shared "fire" flag is clear it performs simulated checks and, on
/// every step whose number is 1 modulo 20, raises the flag. The flag is held
/// by reference and must outlive every thread running a copy of this functor.
class cChecker
{
public:
    /// @param aNumber Identifier printed in this checker's log lines.
    /// @param aFire   Shared flag set by the checker and polled by consumers.
    // Initializers are listed in member declaration order (mNumber, mStep,
    // mFire), which is the order in which they actually run.
    cChecker(
        const size_t aNumber,
        std::atomic<bool> &aFire) :
        mNumber(aNumber),
        mStep(1),
        mFire(aFire) { }

    /// Thread body. Loops forever; there is no exit condition.
    // Declared without the `cChecker::` qualifier, which is not permitted on
    // an in-class member declaration.
    void operator ()()
    {
        while (true)
        {
            // Stay idle while the flag is raised.
            while (mFire.load()) std::this_thread::sleep_for(mMillisecond);
            std::cout << "checker " << mNumber << " step " << mStep << "\n";
            std::this_thread::sleep_for(mCheckDuration); // stand-in for fetching a page
            // mStep is private to this checker, so every checker raises the
            // flag on its own steps 1, 21, 41, ... independently of the others.
            if (mStep % 20 == 1) mFire.store(true);
            mStep++;
        }
    }

private:
    static const std::chrono::milliseconds
        mMillisecond,    // polling interval while the flag is raised
        mCheckDuration;  // simulated duration of one check
    const size_t mNumber;
    size_t mStep;
    std::atomic<bool> &mFire;
};

const std::chrono::milliseconds
    cChecker::mMillisecond(1),
    cChecker::mCheckDuration(500);
/// Starts 16 consumer threads and 3 checker threads that share one flag and
/// one counter, then joins them.
///
/// The thread bodies loop forever, so the joins never return and the program
/// runs until it is killed.
// `main` must return int; `void main()` is not valid standard C++.
int main()
{
    std::atomic<bool> vFire(false);
    std::atomic<size_t> vCounter(0);

    std::thread vConsumerThreads[16];
    for (size_t i = 0; i < 16; i++)
    {
        // A temporary std::thread is already an rvalue, so it move-assigns
        // into the array slot directly.
        vConsumerThreads[i] = std::thread(cConsumer(i, vFire, vCounter));
    }

    // Checkers are started 239 ms apart so their steps are staggered.
    const std::chrono::milliseconds vNextCheckerDelay(239);
    std::thread vCheckerThreads[3];
    for (size_t i = 0; i < 3; i++)
    {
        vCheckerThreads[i] = std::thread(cChecker(i, vFire));
        std::this_thread::sleep_for(vNextCheckerDelay);
    }

    for (size_t i = 0; i < 16; i++) vConsumerThreads[i].join();
    for (size_t i = 0; i < 3; i++) vCheckerThreads[i].join();
    return 0;
}
输出示例(部分)
...
checker 1 step 19
checker 0 step 20
checker 2 step 19
checker 1 step 20
checker 0 step 21
checker 2 step 20
checker 1 step 21
FIRE! consumer 10, counter 0
FIRE! consumer 13, counter 4
FIRE! consumer 6, counter 1
FIRE! consumer 0, counter 2
FIRE! consumer 2, counter 3
checker 0 step 22
checker 2 step 21
FIRE! consumer 5, counter 3
FIRE! consumer 7, counter 4
FIRE! consumer 4, counter 1
FIRE! consumer 15, counter 2
FIRE! consumer 8, counter 0
checker 1 step 22
FIRE! consumer 9, counter 0
FIRE! consumer 11, counter 1
FIRE! consumer 3, counter 2
FIRE! consumer 14, counter 3
FIRE! consumer 1, counter 4
checker 0 step 23
checker 2 step 22
checker 1 step 23
checker 2 step 23
checker 0 step 24
checker 1 step 24
我找到了一种可行但并不优雅的解决方案:等待所有消费者都尝试过一次取号,并且都已得知 fire 标志被清除。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
/// Worker functor intended to be run on its own thread (timing-based
/// workaround variant).
///
/// Each instance polls a shared "fire" flag, takes a ticket from a shared
/// counter and simulates processing when the ticket is below 5. Consumers with
/// a ticket of 5 or more wait two work durations before lowering the flag.
/// Both atomics are held by reference and must outlive every thread that runs
/// a copy of this functor.
class cConsumer
{
public:
    /// @param aNumber        Identifier printed in this consumer's log line.
    /// @param aConsumerCount Ticket value at which the shared counter is
    ///                       reset to 0.
    /// @param aFire          Shared flag; the consumer proceeds only while it
    ///                       is true.
    /// @param aCounter       Shared ticket counter.
    // In-class member declarations must not be qualified with the class name
    // (`cConsumer::cConsumer` here is rejected by GCC and Clang).
    cConsumer(
        const size_t aNumber,
        const size_t aConsumerCount,
        std::atomic<bool> &aFire,
        std::atomic<size_t> &aCounter) :
        mNumber(aNumber),
        mConsumerCount(aConsumerCount),
        mFire(aFire),
        mCounter(aCounter) {}

    /// Thread body. Loops forever; there is no exit condition.
    void operator ()()
    {
        while (true)
        {
            // Poll the flag once per mMillisecond until it is raised.
            while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            // fetch_add returns the value held *before* the increment.
            const size_t vCounter = mCounter.fetch_add(1);
            if (vCounter < 5)
            {
                std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
                std::this_thread::sleep_for(mWorkDuration); // stub for the processing function
            }
            if (vCounter >= 5)
            {
                std::this_thread::sleep_for(mWorkDuration); // wait for the other threads to increase the counter
                std::this_thread::sleep_for(mWorkDuration); // wait again to cover long processing
                mFire.store(false);
            }
            if (vCounter == mConsumerCount)
            {
                // NOTE(review): presumably this ticket is reached by the first
                // worker that loops around while the flag is still raised --
                // confirm; the scheme relies on the sleep durations above.
                mCounter.store(0);
            }
        }
    }

private:
    static const std::chrono::milliseconds
        mMillisecond,   // polling interval while waiting for the flag
        mWorkDuration;  // simulated processing time
    const size_t
        mNumber,
        mConsumerCount;
    std::atomic<bool> &mFire;
    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds
    cConsumer::mMillisecond(1),
    cConsumer::mWorkDuration(1300);
/// Producer-side functor intended to be run on its own thread.
///
/// While the shared "fire" flag is clear it performs simulated checks and, on
/// every step whose number is 1 modulo 20, raises the flag. The flag is held
/// by reference and must outlive every thread running a copy of this functor.
class cChecker
{
public:
    /// @param aNumber Identifier printed in this checker's log lines.
    /// @param aFire   Shared flag set by the checker and polled by consumers.
    // Initializers are listed in member declaration order (mNumber, mStep,
    // mFire), which is the order in which they actually run.
    cChecker(
        const size_t aNumber,
        std::atomic<bool> &aFire) :
        mNumber(aNumber),
        mStep(1),
        mFire(aFire) { }

    /// Thread body. Loops forever; there is no exit condition.
    // Declared without the `cChecker::` qualifier, which is not permitted on
    // an in-class member declaration.
    void operator ()()
    {
        while (true)
        {
            // Stay idle while the flag is raised.
            while (mFire.load()) std::this_thread::sleep_for(mMillisecond);
            std::cout << "checker " << mNumber << " step " << mStep << "\n";
            std::this_thread::sleep_for(mCheckDuration); // stand-in for fetching a page
            // mStep is private to this checker, so every checker raises the
            // flag on its own steps 1, 21, 41, ... independently of the others.
            if (mStep % 20 == 1) mFire.store(true);
            mStep++;
        }
    }

private:
    static const std::chrono::milliseconds
        mMillisecond,    // polling interval while the flag is raised
        mCheckDuration;  // simulated duration of one check
    const size_t mNumber;
    size_t mStep;
    std::atomic<bool> &mFire;
};

const std::chrono::milliseconds
    cChecker::mMillisecond(1),
    cChecker::mCheckDuration(500);
/// Starts 16 consumer threads and 3 checker threads that share one flag and
/// one counter, then joins them.
///
/// The thread bodies loop forever, so the joins never return and the program
/// runs until it is killed.
// `main` must return int; `void main()` is not valid standard C++.
int main()
{
    // Named once so the array size and the count handed to each consumer
    // cannot drift apart.
    const size_t vConsumerCount = 16;
    const size_t vCheckerCount = 3;

    std::atomic<bool> vFire(false);
    std::atomic<size_t> vCounter(0);

    std::thread vConsumerThreads[vConsumerCount];
    for (size_t i = 0; i < vConsumerCount; i++)
    {
        // A temporary std::thread is already an rvalue; wrapping it in
        // std::move adds nothing.
        vConsumerThreads[i] = std::thread(cConsumer(i, vConsumerCount, vFire, vCounter));
    }

    // Checkers are started 239 ms apart so their steps are staggered.
    const std::chrono::milliseconds vNextCheckerDelay(239);
    std::thread vCheckerThreads[vCheckerCount];
    for (size_t i = 0; i < vCheckerCount; i++)
    {
        vCheckerThreads[i] = std::thread(cChecker(i, vFire));
        std::this_thread::sleep_for(vNextCheckerDelay);
    }

    for (size_t i = 0; i < vConsumerCount; i++) vConsumerThreads[i].join();
    for (size_t i = 0; i < vCheckerCount; i++) vCheckerThreads[i].join();
    return 0;
}
我认为存在更好的解决方案。
【问题讨论】:
-
输出中有问题的值在哪里?
-
我只需要出现 5 次“FIRE!”,但实际出现了 15 次。
-
每个 checker 都有私有的 step 计数器,当它的值为 20*n+1 时,消费者会被重新激活(即执行 mFire.store(true))。这就是为什么会出现 15 次“FIRE!”事件:3 个检查器(0、1 和 2)在到达 step 21 时各触发 5 次。
-
为什么你只期望 5 次?将 mCounter 设置为 5 的消费者会将其重置为 0,这也会导致其他消费者触发。
标签: c++ multithreading