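【Posted】: 2021-01-25 01:24:34
【Question】:
Are concurrent appends to a file supported?
I tested this with concurrent threads, each using its own fstream. The data is not corrupted, but some writes are lost: once all writes have finished, the file is smaller than expected. The writes do not overlap.
If I instead seek explicitly in each fstream, coordinating the offset that each thread writes to, no writes are lost.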
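The explicit-seek variant is described in the question but not shown. A minimal sketch of that idea follows, assuming each thread owns one contiguous region of the file; the helper name `write_with_offsets` and its parameters are hypothetical and do not come from the original post.

```cpp
#include <cassert>
#include <fstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: every thread opens its own stream, seeks to the start
// of its own disjoint region, and writes only there.
// Returns the final file size in bytes.
long long write_with_offsets(const std::string& filename, int num_threads,
                             int records_per_thread, int record_size) {
    assert(num_threads > 0 && records_per_thread > 0 && record_size > 0);
    // Create (or empty) the file before any writer opens it.
    { std::ofstream create(filename, std::ios::binary | std::ios::trunc); }

    const long long region = static_cast<long long>(records_per_thread) * record_size;
    auto writer = [&](int index) {
        // in | out without app, so the position set by seekp is honored.
        std::fstream f(filename, std::ios::in | std::ios::out | std::ios::binary);
        f.seekp(index * region);
        std::vector<char> data(record_size, static_cast<char>('B' + index));
        for (int i = 0; i < records_per_thread; ++i) {
            f.write(data.data(), record_size);
        }
        // The fstream destructor flushes and closes the handle here.
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) threads.emplace_back(writer, i);
    for (auto& t : threads) t.join();

    // Measure the size only after every writer has closed its stream.
    std::ifstream in(filename, std::ios::binary | std::ios::ate);
    return static_cast<long long>(in.tellg());
}
```

Calling `write_with_offsets("offsets.bin", 4, 16, 1024)` should report 4 * 16 * 1024 bytes, because no thread depends on another thread's idea of where the end of the file is.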
Here is the sample code that reproduces the lost writes:
#include <chrono>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "gtest/gtest.h"

using namespace std;
void append_concurrently(string filename, const int data_in_gb, const int num_threads, const char start_char,
                         bool stream_cache = true) {
    const int offset = 1024;  // size of one record in bytes
    const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));
    {
        auto write_file_fn = [&](int index) {
            // each thread has its own handle
            fstream file_handle(filename, fstream::app | fstream::binary);
            if (!stream_cache) {
                file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
            }
            // each thread writes records filled with its own character
            vector<char> data(offset, (char)(index + start_char));
            for (long long i = 0; i < num_records_each_thread; ++i) {
                file_handle.write(data.data(), offset);
                if (!file_handle) {
                    std::cout << "File write failed: "
                              << file_handle.fail() << " " << file_handle.bad() << " " << file_handle.eof() << std::endl;
                    break;
                }
            }
            // file_handle.flush();
        };
        auto start_time = chrono::high_resolution_clock::now();
        vector<thread> writer_threads;
        for (int i = 0; i < num_threads; ++i) {
            writer_threads.push_back(std::thread(write_file_fn, i));
        }
        for (int i = 0; i < num_threads; ++i) {
            writer_threads[i].join();
        }
        auto end_time = chrono::high_resolution_clock::now();
        std::cout << filename << " Data written : " << data_in_gb << " GB, " << num_threads << " threads "
                  << ", cache " << (stream_cache ? "true " : "false ") << ", size " << offset << " bytes ";
        // duration_cast keeps the unit correct regardless of the clock's tick period
        std::cout << "Time taken: "
                  << chrono::duration_cast<chrono::microseconds>(end_time - start_time).count()
                  << " micro-secs" << std::endl;
    }
    {
        ifstream file(filename, fstream::in | fstream::binary);
        file.seekg(0, ios_base::end);
        // This EXPECT_EQ FAILS as file size is smaller than EXPECTED
        EXPECT_EQ(num_records_each_thread * num_threads * offset, file.tellg());
        file.seekg(0, ios_base::beg);
        EXPECT_TRUE(file);
        char data[offset]{ 0 };
        for (long long i = 0; i < (num_records_each_thread * num_threads); ++i) {
            file.read(data, offset);
            EXPECT_TRUE(file || file.eof()); // should be able to read until eof
            char expected_char = data[0]; // should not have any interleaving of data.
            bool same = true;
            for (auto & c : data) {
                same = same && (c == expected_char) && (c != 0);
            }
            EXPECT_TRUE(same); // THIS PASSES
            if (!same) {
                std::cout << "corruption detected !!!" << std::endl;
                break;
            }
            if (file.eof()) { // THIS FAILS as file size is smaller
                EXPECT_EQ(num_records_each_thread * num_threads, i + 1);
                break;
            }
        }
    }
}
TEST(fstream, file_concurrent_appends) {
    string filename = "file6.log";
    const int data_in_gb = 1;
    {
        // trunc file before write threads start.
        {
            fstream file(filename, fstream::in | fstream::out | fstream::trunc | fstream::binary);
        }
        append_concurrently(filename, data_in_gb, 4, 'B', false);
    }
    std::remove(filename.c_str());
}
Edit:
I moved the fstream so that it is shared by all threads. Now, with a 512-byte buffer size, I see 8 writes lost, 4 KB in total.
const int offset = 512;
const long long num_records_each_thread = (data_in_gb * 1024 * ((1024 * 1024) / (num_threads * offset)));
// the handle is now created once, outside the lambda, and shared by all threads
fstream file_handle(filename, fstream::app | fstream::binary);
if (!stream_cache) {
    file_handle.rdbuf()->pubsetbuf(nullptr, 0); // no buffering in fstream
}
A 4 KB buffer size does not reproduce the problem.
Running main() from gtest_main.cc
Note: Google Test filter = *file_conc*_*append*
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from fstream
[ RUN ] fstream.file_concurrent_appends
file6.log Data written : 1 GB, 1 threads , cache true , size 512 bytes Time taken: 38069289 micro-secs
d:\projs\logpoc\tests\test.cpp(279): error: Expected: num_records_each_thread * num_threads * offset
Which is: 1073741824
To be equal to: file.tellg()
Which is: 1073737728
d:\projs\logpoc\tests\test.cpp(301): error: Expected: num_records_each_thread * num_threads
Which is: 2097152
To be equal to: i + 1
Which is: 2097145
Edit 2:
Closing file_handle after joining all threads, so that the data still held in its internal buffer is flushed, solved the problem above.
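The ordering behind Edit 2 can be sketched as follows: write, close the stream, and only then measure the file. This is a single-threaded illustration under stated assumptions, not the author's test; the helper names `size_on_disk` and `append_then_close` are hypothetical.

```cpp
#include <cassert>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical helper: file size in bytes as seen by a fresh reader,
// or -1 if the file cannot be opened.
long long size_on_disk(const std::string& filename) {
    std::ifstream in(filename, std::ios::binary | std::ios::ate);
    return in ? static_cast<long long>(in.tellg()) : -1;
}

// Appends `records` records of `record_size` bytes through one stream and
// returns the file size measured after the stream has been closed.
long long append_then_close(const std::string& filename, int records, int record_size) {
    assert(records > 0 && record_size > 0);
    // Start from an empty file, as the original test does.
    { std::ofstream create(filename, std::ios::binary | std::ios::trunc); }

    std::ofstream out(filename, std::ios::app | std::ios::binary);
    std::vector<char> data(record_size, 'B');
    for (int i = 0; i < records; ++i) {
        out.write(data.data(), record_size);
    }
    // The most recent bytes may still sit in the stream's internal buffer
    // here, so measuring the file at this point could come up short.
    out.close();  // flushes the buffer and releases the handle
    return size_on_disk(filename);
}
```

With several writer threads the same order applies: join every thread, close (or flush) the stream, then open the reader that checks the size.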
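【Comments】:
- @Andreas Wenzel Using the same fstream from multiple threads is not supported, because the stream keeps an offset position internally. I open the file in append mode, so I expect the OS to take care of the offset.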
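If one stream does have to be shared between threads, a common approach is to serialize access to it with a mutex. The sketch below is an illustration only and is not taken from the thread; the helper name `append_shared_locked` and its parameters are hypothetical.

```cpp
#include <cassert>
#include <fstream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: all threads append through one shared stream, and a
// mutex ensures only one thread touches the stream at a time.
// Returns the final file size in bytes.
long long append_shared_locked(const std::string& filename, int num_threads,
                               int records_per_thread, int record_size) {
    assert(num_threads > 0 && records_per_thread > 0 && record_size > 0);
    // Start from an empty file.
    { std::ofstream create(filename, std::ios::binary | std::ios::trunc); }

    std::ofstream out(filename, std::ios::app | std::ios::binary);
    std::mutex out_mutex;  // guards every use of `out`

    auto writer = [&](int index) {
        std::vector<char> data(record_size, static_cast<char>('B' + index));
        for (int i = 0; i < records_per_thread; ++i) {
            std::lock_guard<std::mutex> lock(out_mutex);
            out.write(data.data(), record_size);  // one whole record per lock
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) threads.emplace_back(writer, i);
    for (auto& t : threads) t.join();

    out.close();  // flush buffered data before measuring the file
    std::ifstream in(filename, std::ios::binary | std::ios::ate);
    return static_cast<long long>(in.tellg());
}
```

Because each record is written in full while the lock is held, records from different threads cannot interleave inside the stream's buffer. The trade-off is that the writers no longer run in parallel during the write itself.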