Merge "Adding struct to hold compresion parameters" into main

This commit is contained in:
Daniel Zheng 2023-08-08 23:08:03 +00:00 committed by Gerrit Code Review
commit 5061a1cef8
4 changed files with 16 additions and 11 deletions

View File

@ -161,6 +161,10 @@ enum CowCompressionAlgorithm : uint8_t {
kCowCompressLz4 = 3, kCowCompressLz4 = 3,
kCowCompressZstd = 4, kCowCompressZstd = 4,
}; };
struct CowCompression {
CowCompressionAlgorithm algorithm = kCowCompressNone;
uint32_t compression_level = 0;
};
static constexpr uint8_t kCowReadAheadNotStarted = 0; static constexpr uint8_t kCowReadAheadNotStarted = 0;
static constexpr uint8_t kCowReadAheadInProgress = 1; static constexpr uint8_t kCowReadAheadInProgress = 1;

View File

@ -472,9 +472,10 @@ TEST_P(CompressionTest, HorribleStream) {
if (strcmp(GetParam(), "none") == 0) { if (strcmp(GetParam(), "none") == 0) {
GTEST_SKIP(); GTEST_SKIP();
} }
CowCompression compression;
auto algorithm = CompressionAlgorithmFromString(GetParam()); auto algorithm = CompressionAlgorithmFromString(GetParam());
ASSERT_TRUE(algorithm.has_value()); ASSERT_TRUE(algorithm.has_value());
compression.algorithm = algorithm.value();
std::string expected = "The quick brown fox jumps over the lazy dog."; std::string expected = "The quick brown fox jumps over the lazy dog.";
expected.resize(4096, '\0'); expected.resize(4096, '\0');

View File

@ -124,7 +124,7 @@ bool CowWriterV2::ParseOptions() {
LOG(ERROR) << "unrecognized compression: " << options_.compression; LOG(ERROR) << "unrecognized compression: " << options_.compression;
return false; return false;
} }
compression_ = *algorithm; compression_.algorithm = *algorithm;
if (options_.cluster_ops == 1) { if (options_.cluster_ops == 1) {
LOG(ERROR) << "Clusters must contain at least two operations to function."; LOG(ERROR) << "Clusters must contain at least two operations to function.";
@ -165,7 +165,7 @@ void CowWriterV2::InitWorkers() {
return; return;
} }
for (int i = 0; i < num_compress_threads_; i++) { for (int i = 0; i < num_compress_threads_; i++) {
auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size); auto wt = std::make_unique<CompressWorker>(compression_.algorithm, header_.block_size);
threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
compress_threads_.push_back(std::move(wt)); compress_threads_.push_back(std::move(wt));
} }
@ -320,8 +320,8 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
compressed_buf_.clear(); compressed_buf_.clear();
if (num_threads <= 1) { if (num_threads <= 1) {
return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks, return CompressWorker::CompressBlocks(compression_.algorithm, options_.block_size, data,
&compressed_buf_); num_blocks, &compressed_buf_);
} }
// Submit the blocks per thread. The retrieval of // Submit the blocks per thread. The retrieval of
@ -366,7 +366,7 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
while (num_blocks) { while (num_blocks) {
size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks)); size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
if (compression_ && num_compress_threads_ > 1) { if (compression_.algorithm && num_compress_threads_ > 1) {
if (!CompressBlocks(pending_blocks, iter)) { if (!CompressBlocks(pending_blocks, iter)) {
return false; return false;
} }
@ -386,19 +386,19 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
op.source = next_data_pos_; op.source = next_data_pos_;
} }
if (compression_) { if (compression_.algorithm) {
auto data = [&, this]() { auto data = [&, this]() {
if (num_compress_threads_ > 1) { if (num_compress_threads_ > 1) {
auto data = std::move(*buf_iter_); auto data = std::move(*buf_iter_);
buf_iter_++; buf_iter_++;
return data; return data;
} else { } else {
auto data = auto data = CompressWorker::Compress(compression_.algorithm, iter,
CompressWorker::Compress(compression_, iter, header_.block_size); header_.block_size);
return data; return data;
} }
}(); }();
op.compression = compression_; op.compression = compression_.algorithm;
op.data_length = static_cast<uint16_t>(data.size()); op.data_length = static_cast<uint16_t>(data.size());
if (!WriteOperation(op, data.data(), data.size())) { if (!WriteOperation(op, data.data(), data.size())) {

View File

@ -63,7 +63,7 @@ class CowWriterV2 : public CowWriterBase {
private: private:
CowFooter footer_{}; CowFooter footer_{};
CowCompressionAlgorithm compression_ = kCowCompressNone; CowCompression compression_;
uint64_t current_op_pos_ = 0; uint64_t current_op_pos_ = 0;
uint64_t next_op_pos_ = 0; uint64_t next_op_pos_ = 0;
uint64_t next_data_pos_ = 0; uint64_t next_data_pos_ = 0;