diff --git a/logd/Android.bp b/logd/Android.bp index 1f6ab3467..e0a1168be 100644 --- a/logd/Android.bp +++ b/logd/Android.bp @@ -31,7 +31,11 @@ event_flag = [ cc_defaults { name: "logd_defaults", - shared_libs: ["libbase"], + shared_libs: [ + "libbase", + "libz", + ], + static_libs: ["libzstd"], cflags: [ "-Wextra", "-Wthread-safety", @@ -40,6 +44,7 @@ cc_defaults { lto: { thin: true, }, + cpp_std: "experimental", } cc_library_static { @@ -48,12 +53,16 @@ cc_library_static { host_supported: true, srcs: [ "ChattyLogBuffer.cpp", + "CompressionEngine.cpp", "LogReaderList.cpp", "LogReaderThread.cpp", "LogBufferElement.cpp", "LogStatistics.cpp", "LogWhiteBlackList.cpp", "LogTags.cpp", + "SerializedFlushToState.cpp", + "SerializedLogBuffer.cpp", + "SerializedLogChunk.cpp", "SimpleLogBuffer.cpp", ], logtags: ["event.logtags"], @@ -132,6 +141,8 @@ cc_defaults { "ChattyLogBufferTest.cpp", "logd_test.cpp", "LogBufferTest.cpp", + "SerializedLogChunkTest.cpp", + "SerializedFlushToStateTest.cpp", ], static_libs: [ @@ -140,6 +151,8 @@ cc_defaults { "liblog", "liblogd", "libselinux", + "libz", + "libzstd", ], } diff --git a/logd/CompressionEngine.cpp b/logd/CompressionEngine.cpp new file mode 100644 index 000000000..f37208b84 --- /dev/null +++ b/logd/CompressionEngine.cpp @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CompressionEngine.h" + +#include + +#include +#include +#include + +CompressionEngine& CompressionEngine::GetInstance() { + CompressionEngine* engine = new ZstdCompressionEngine(); + return *engine; +} + +bool ZlibCompressionEngine::Compress(std::span in, std::vector& out) { + z_stream strm; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + int ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); + if (ret != Z_OK) { + LOG(FATAL) << "deflateInit() failed"; + } + + CHECK_LE(in.size(), static_cast(std::numeric_limits::max())); + uint32_t out_size = deflateBound(&strm, in.size()); + + out.resize(out_size); + strm.avail_in = in.size(); + strm.next_in = const_cast(in.data()); + strm.avail_out = out_size; + strm.next_out = out.data(); + ret = deflate(&strm, Z_FINISH); + CHECK_EQ(ret, Z_STREAM_END); + + uint32_t compressed_data_size = strm.total_out; + deflateEnd(&strm); + out.resize(compressed_data_size); + + return true; +} + +bool ZlibCompressionEngine::Decompress(const std::vector& in, std::vector& out, + size_t out_size) { + out.resize(out_size); + + z_stream strm; + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = in.size(); + strm.next_in = const_cast(in.data()); + strm.avail_out = out.size(); + strm.next_out = out.data(); + + inflateInit(&strm); + int ret = inflate(&strm, Z_NO_FLUSH); + + CHECK_EQ(strm.avail_in, 0U); + CHECK_EQ(strm.avail_out, 0U); + CHECK_EQ(ret, Z_STREAM_END); + inflateEnd(&strm); + + return true; +} + +bool ZstdCompressionEngine::Compress(std::span in, std::vector& out) { + size_t out_size = ZSTD_compressBound(in.size()); + out.resize(out_size); + + out_size = ZSTD_compress(out.data(), out_size, in.data(), in.size(), 1); + if (ZSTD_isError(out_size)) { + LOG(FATAL) << "ZSTD_compress failed: " << ZSTD_getErrorName(out_size); + } + out.resize(out_size); + + return true; +} + +bool ZstdCompressionEngine::Decompress(const std::vector& in, std::vector& out, + size_t out_size) { + out.resize(out_size); + size_t result = ZSTD_decompress(out.data(), out.size(), in.data(), in.size()); + if (ZSTD_isError(result)) { + LOG(FATAL) << "ZSTD_decompress failed: " << ZSTD_getErrorName(result); + } + CHECK_EQ(result, out.size()); + return true; +} diff --git a/logd/CompressionEngine.h b/logd/CompressionEngine.h new file mode 100644 index 000000000..d760ceaa5 --- /dev/null +++ b/logd/CompressionEngine.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +class CompressionEngine { + public: + static CompressionEngine& GetInstance(); + + virtual ~CompressionEngine(){}; + + virtual bool Compress(std::span in, std::vector& out) = 0; + // Decompress the contents of `in` into `out`. `out_size` must be set to the decompressed size + // of the contents. + virtual bool Decompress(const std::vector& in, std::vector& out, + size_t out_size) = 0; +}; + +class ZlibCompressionEngine : public CompressionEngine { + public: + bool Compress(std::span in, std::vector& out) override; + bool Decompress(const std::vector& in, std::vector& out, + size_t out_size) override; +}; + +class ZstdCompressionEngine : public CompressionEngine { + public: + bool Compress(std::span in, std::vector& out) override; + bool Decompress(const std::vector& in, std::vector& out, + size_t out_size) override; +}; \ No newline at end of file diff --git a/logd/LogBufferTest.cpp b/logd/LogBufferTest.cpp index 412b6f1d7..47d2a2f92 100644 --- a/logd/LogBufferTest.cpp +++ b/logd/LogBufferTest.cpp @@ -455,4 +455,5 @@ TEST_P(LogBufferTest, clear_logs) { CompareLogMessages(after_clear_messages, read_log_messages_after_clear); } -INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest, testing::Values("chatty", "simple")); +INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest, + testing::Values("chatty", "serialized", "simple")); diff --git a/logd/LogBufferTest.h b/logd/LogBufferTest.h index f91a1b544..235f5ac2b 100644 --- a/logd/LogBufferTest.h +++ b/logd/LogBufferTest.h @@ -26,6 +26,7 @@ #include "LogStatistics.h" #include "LogTags.h" #include "LogWhiteBlackList.h" +#include "SerializedLogBuffer.h" #include "SimpleLogBuffer.h" struct LogMessage { @@ -67,6 +68,8 @@ class LogBufferTest : public testing::TestWithParam { void SetUp() override { if (GetParam() == "chatty") { log_buffer_.reset(new ChattyLogBuffer(&reader_list_, &tags_, &prune_, &stats_)); + } else if (GetParam() == "serialized") { + log_buffer_.reset(new SerializedLogBuffer(&reader_list_, &tags_, &stats_)); } else if (GetParam() == "simple") { log_buffer_.reset(new SimpleLogBuffer(&reader_list_, &tags_, &stats_)); } else { diff --git a/logd/LogReaderThread.h b/logd/LogReaderThread.h index bf70b944e..1855c0e26 100644 --- a/logd/LogReaderThread.h +++ b/logd/LogReaderThread.h @@ -60,6 +60,7 @@ class LogReaderThread { std::string name() const { return writer_->name(); } uint64_t start() const { return flush_to_state_->start(); } std::chrono::steady_clock::time_point deadline() const { return deadline_; } + FlushToState& flush_to_state() { return *flush_to_state_; } private: void ThreadFunction(); diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp new file mode 100644 index 000000000..17ecb6d08 --- /dev/null +++ b/logd/SerializedFlushToState.cpp @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SerializedFlushToState.h" + +#include + +SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask) + : FlushToState(start, log_mask) { + log_id_for_each(i) { + if (((1 << i) & log_mask) == 0) { + continue; + } + logs_needed_from_next_position_[i] = true; + } +} + +SerializedFlushToState::~SerializedFlushToState() { + log_id_for_each(i) { + if (log_positions_[i]) { + log_positions_[i]->buffer_it->DecReaderRefCount(true); + } + } +} + +void SerializedFlushToState::CreateLogPosition(log_id_t log_id) { + CHECK(!logs_[log_id].empty()); + LogPosition log_position; + auto it = logs_[log_id].begin(); + while (it != logs_[log_id].end() && start() > it->highest_sequence_number()) { + ++it; + } + if (it == logs_[log_id].end()) { + --it; + } + it->IncReaderRefCount(); + log_position.buffer_it = it; + + // Find the offset of the first log with sequence number >= start(). + int read_offset = 0; + while (read_offset < it->write_offset()) { + const auto* entry = it->log_entry(read_offset); + if (entry->sequence() >= start()) { + break; + } + read_offset += entry->total_len(); + } + log_position.read_offset = read_offset; + + log_positions_[log_id].emplace(std::move(log_position)); +} + +void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) { + auto& buffer_it = log_positions_[log_id]->buffer_it; + auto read_offset = log_positions_[log_id]->read_offset; + + // If there is another log to read in this buffer, add it to the min heap. + if (read_offset < buffer_it->write_offset()) { + auto* entry = buffer_it->log_entry(read_offset); + min_heap_.emplace(log_id, entry); + } else if (read_offset == buffer_it->write_offset()) { + // If there are no more logs to read in this buffer and it's the last buffer, then + // set logs_needed_from_next_position_ to wait until more logs get logged. + if (buffer_it == std::prev(logs_[log_id].end())) { + logs_needed_from_next_position_[log_id] = true; + } else { + // Otherwise, if there is another buffer piece, move to that and do the same check. + buffer_it->DecReaderRefCount(true); + ++buffer_it; + buffer_it->IncReaderRefCount(); + log_positions_[log_id]->read_offset = 0; + if (buffer_it->write_offset() == 0) { + logs_needed_from_next_position_[log_id] = true; + } else { + auto* entry = buffer_it->log_entry(0); + min_heap_.emplace(log_id, entry); + } + } + } else { + // read_offset > buffer_it->write_offset() should never happen. + CHECK(false); + } +} + +void SerializedFlushToState::CheckForNewLogs() { + log_id_for_each(i) { + if (!logs_needed_from_next_position_[i]) { + continue; + } + if (!log_positions_[i]) { + if (logs_[i].empty()) { + continue; + } + CreateLogPosition(i); + } + logs_needed_from_next_position_[i] = false; + // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true. + AddMinHeapEntry(i); + } +} + +MinHeapElement SerializedFlushToState::PopNextUnreadLog() { + auto top = min_heap_.top(); + min_heap_.pop(); + + auto* entry = top.entry; + auto log_id = top.log_id; + + log_positions_[log_id]->read_offset += entry->total_len(); + + AddMinHeapEntry(log_id); + + return top; +} + +void SerializedFlushToState::Prune(log_id_t log_id, + const std::list::iterator& buffer_it) { + // If we don't have a position for this log or if we're not referencing buffer_it, ignore. + if (!log_positions_[log_id].has_value() || log_positions_[log_id]->buffer_it != buffer_it) { + return; + } + + // // Decrease the ref count since we're deleting our reference. + buffer_it->DecReaderRefCount(false); + + // Delete in the reference. + log_positions_[log_id].reset(); + + // Remove the MinHeapElement referencing log_id, if it exists, but retain the others. + std::vector old_elements; + while (!min_heap_.empty()) { + auto& element = min_heap_.top(); + if (element.log_id != log_id) { + old_elements.emplace_back(element); + } + min_heap_.pop(); + } + for (auto&& element : old_elements) { + min_heap_.emplace(std::move(element)); + } + + // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the + // log_position_ object during the next read. + logs_needed_from_next_position_[log_id] = true; +} diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h new file mode 100644 index 000000000..74b3de55c --- /dev/null +++ b/logd/SerializedFlushToState.h @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "LogBuffer.h" +#include "SerializedLogChunk.h" +#include "SerializedLogEntry.h" + +struct LogPosition { + std::list::iterator buffer_it; + int read_offset; +}; + +struct MinHeapElement { + MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry) + : log_id(log_id), entry(entry) {} + log_id_t log_id; + const SerializedLogEntry* entry; + // The change of comparison operators is intentional, std::priority_queue uses operator<() to + // compare but creates a max heap. Since we want a min heap, we return the opposite result. + bool operator<(const MinHeapElement& rhs) const { + return entry->sequence() > rhs.entry->sequence(); + } +}; + +// This class tracks the specific point where a FlushTo client has read through the logs. It +// directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset +// into each log chunk where it has last read. All interactions with this class, except for its +// construction, must be done with SerializedLogBuffer::lock_ held. No log chunks that it +// references may be pruned, which is handled by ensuring prune does not touch any log chunk with +// highest sequence number greater or equal to start(). +class SerializedFlushToState : public FlushToState { + public: + // Initializes this state object. For each log buffer set in log_mask, this sets + // logs_needed_from_next_position_. + SerializedFlushToState(uint64_t start, LogMask log_mask); + + // Decrease the reference of all referenced logs. This happens when a reader is disconnected. + ~SerializedFlushToState() override; + + // We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here. + void InitializeLogs(std::list* logs) { + if (logs_ == nullptr) logs_ = logs; + } + + // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and + // calls AddMinHeapEntry() if so. + void CheckForNewLogs(); + + bool HasUnreadLogs() { return !min_heap_.empty(); } + + // Pops the next unread log from the min heap. Add the next log for that log_id to the min heap + // if one is available otherwise set logs_needed_from_next_position_ to indicate that we're + // waiting for more logs. + MinHeapElement PopNextUnreadLog(); + + // If the parent log buffer prunes logs, the reference that this class contains may become + // invalid, so this must be called first to drop the reference to buffer_it, if any. + void Prune(log_id_t log_id, const std::list::iterator& buffer_it); + + private: + // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the + // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're + // waiting for the next log. + void AddMinHeapEntry(log_id_t log_id); + + // Create a LogPosition object for the given log_id by searching through the log chunks for the + // first chunk and then first log entry within that chunk that is greater or equal to start(). + void CreateLogPosition(log_id_t log_id); + + std::list* logs_ = nullptr; + // An optional structure that contains an iterator to the serialized log buffer and offset into + // it that this logger should handle next. + std::optional log_positions_[LOG_ID_MAX]; + // A bit for each log that is set if a given log_id has no logs or if this client has read all + // of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) && + // next_log_position == logs_write_position_)`. These will be re-checked in each + // loop in case new logs came in. + std::bitset logs_needed_from_next_position_ = {}; + // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next + // element that this reader should read. + std::priority_queue min_heap_; +}; diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp new file mode 100644 index 000000000..a1d21ac13 --- /dev/null +++ b/logd/SerializedFlushToStateTest.cpp @@ -0,0 +1,254 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SerializedFlushToState.h" + +#include + +#include +#include +#include +#include + +using android::base::Join; +using android::base::StringPrintf; + +constexpr size_t kChunkSize = 3 * 4096; + +class SerializedFlushToStateTest : public testing::Test { + protected: + void SetUp() override { + // This test spams many unneeded INFO logs, so we suppress them. + old_log_severity_ = android::base::SetMinimumLogSeverity(android::base::WARNING); + } + void TearDown() override { android::base::SetMinimumLogSeverity(old_log_severity_); } + + std::string TestReport(const std::vector& expected, + const std::vector& read) { + auto sequence_to_log_id = [&](uint64_t sequence) -> int { + for (const auto& [log_id, sequences] : sequence_numbers_per_buffer_) { + if (std::find(sequences.begin(), sequences.end(), sequence) != sequences.end()) { + return log_id; + } + } + return -1; + }; + + std::map> missing_sequences; + std::vector missing_expected; + std::set_difference(expected.begin(), expected.end(), read.begin(), read.end(), + std::back_inserter(missing_expected)); + for (uint64_t sequence : missing_expected) { + int log_id = sequence_to_log_id(sequence); + missing_sequences[log_id].emplace_back(sequence); + } + + std::map> extra_sequences; + std::vector extra_read; + std::set_difference(read.begin(), read.end(), expected.begin(), expected.end(), + std::back_inserter(extra_read)); + for (uint64_t sequence : extra_read) { + int log_id = sequence_to_log_id(sequence); + extra_sequences[log_id].emplace_back(sequence); + } + + std::vector errors; + for (const auto& [log_id, sequences] : missing_sequences) { + errors.emplace_back( + StringPrintf("Log id %d missing %zu sequences", log_id, sequences.size())); + } + + for (const auto& [log_id, sequences] : extra_sequences) { + errors.emplace_back( + StringPrintf("Log id %d has extra %zu sequences", log_id, sequences.size())); + } + + return Join(errors, ", "); + } + + // Read sequence numbers in order from SerializedFlushToState for every mask combination and all + // sequence numbers from 0 through the highest logged sequence number + 1. + // This assumes that all of the logs have already been written. + void TestAllReading() { + uint64_t max_sequence = sequence_ + 1; + uint32_t max_mask = (1 << LOG_ID_MAX) - 1; + for (uint64_t sequence = 0; sequence < max_sequence; ++sequence) { + for (uint32_t mask = 0; mask < max_mask; ++mask) { + auto state = SerializedFlushToState{sequence, mask}; + state.InitializeLogs(log_chunks_); + state.CheckForNewLogs(); + TestReading(sequence, mask, state); + } + } + } + + // Similar to TestAllReading() except that it doesn't assume any logs are in the buffer, instead + // it calls write_logs() in a loop for sequence/mask combination. It clears log_chunks_ and + // sequence_numbers_per_buffer_ between calls, such that only the sequence numbers written in + // the previous call to write_logs() are expected. + void TestAllReadingWithFutureMessages(const std::function& write_logs) { + uint64_t max_sequence = sequence_ + 1; + uint32_t max_mask = (1 << LOG_ID_MAX) - 1; + for (uint64_t sequence = 1; sequence < max_sequence; ++sequence) { + for (uint32_t mask = 1; mask < max_mask; ++mask) { + log_id_for_each(i) { log_chunks_[i].clear(); } + auto state = SerializedFlushToState{sequence, mask}; + state.InitializeLogs(log_chunks_); + int loop_count = 0; + while (write_logs(loop_count++)) { + state.CheckForNewLogs(); + TestReading(sequence, mask, state); + sequence_numbers_per_buffer_.clear(); + } + } + } + } + + void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) { + std::vector expected_sequence; + log_id_for_each(i) { + if (((1 << i) & log_mask) == 0) { + continue; + } + for (const auto& sequence : sequence_numbers_per_buffer_[i]) { + if (sequence >= start) { + expected_sequence.emplace_back(sequence); + } + } + } + std::sort(expected_sequence.begin(), expected_sequence.end()); + + std::vector read_sequence; + + while (state.HasUnreadLogs()) { + auto top = state.PopNextUnreadLog(); + read_sequence.emplace_back(top.entry->sequence()); + } + + EXPECT_TRUE(std::is_sorted(read_sequence.begin(), read_sequence.end())); + + EXPECT_EQ(expected_sequence.size(), read_sequence.size()); + + EXPECT_EQ(expected_sequence, read_sequence) + << "start: " << start << " log_mask: " << log_mask << " " + << TestReport(expected_sequence, read_sequence); + } + + // Add a chunk with the given messages to the a given log buffer. Keep track of the sequence + // numbers for future validation. Optionally mark the block as having finished writing. + void AddChunkWithMessages(int buffer, bool finish_writing, + const std::vector& messages) { + auto chunk = SerializedLogChunk{kChunkSize}; + for (const auto& message : messages) { + auto sequence = sequence_++; + sequence_numbers_per_buffer_[buffer].emplace_back(sequence); + ASSERT_TRUE(chunk.CanLog(message.size() + 1)); + chunk.Log(sequence, log_time(), 0, 1, 1, message.c_str(), message.size() + 1); + } + if (finish_writing) { + chunk.FinishWriting(); + } + log_chunks_[buffer].emplace_back(std::move(chunk)); + } + + android::base::LogSeverity old_log_severity_; + std::map> sequence_numbers_per_buffer_; + std::list log_chunks_[LOG_ID_MAX]; + uint64_t sequence_ = 1; +}; + +// 0: multiple chunks, with variable number of entries, with/without finishing writing +// 1: 1 chunk with 1 log and finished writing +// 2: 1 chunk with 1 log and not finished writing +// 3: 1 chunk with 0 logs and not finished writing +// 4: 1 chunk with 0 logs and finished writing (impossible, but SerializedFlushToState handles it) +// 5-7: 0 chunks +TEST_F(SerializedFlushToStateTest, smoke) { + AddChunkWithMessages(true, 0, {"1st", "2nd"}); + AddChunkWithMessages(true, 1, {"3rd"}); + AddChunkWithMessages(false, 0, {"4th"}); + AddChunkWithMessages(true, 0, {"4th", "5th", "more", "even", "more", "go", "here"}); + AddChunkWithMessages(false, 2, {"6th"}); + AddChunkWithMessages(true, 0, {"7th"}); + AddChunkWithMessages(false, 3, {}); + AddChunkWithMessages(true, 4, {}); + + TestAllReading(); +} + +TEST_F(SerializedFlushToStateTest, random) { + srand(1); + for (int count = 0; count < 20; ++count) { + unsigned int num_messages = 1 + rand() % 15; + auto messages = std::vector{num_messages, "same message"}; + + bool compress = rand() % 2; + int buf = rand() % LOG_ID_MAX; + + AddChunkWithMessages(compress, buf, messages); + } + + TestAllReading(); +} + +// Same start as smoke, but we selectively write logs to the buffers and ensure they're read. +TEST_F(SerializedFlushToStateTest, future_writes) { + auto write_logs = [&](int loop_count) { + switch (loop_count) { + case 0: + // Initial writes. + AddChunkWithMessages(true, 0, {"1st", "2nd"}); + AddChunkWithMessages(true, 1, {"3rd"}); + AddChunkWithMessages(false, 0, {"4th"}); + AddChunkWithMessages(true, 0, {"4th", "5th", "more", "even", "more", "go", "here"}); + AddChunkWithMessages(false, 2, {"6th"}); + AddChunkWithMessages(true, 0, {"7th"}); + AddChunkWithMessages(false, 3, {}); + AddChunkWithMessages(true, 4, {}); + break; + case 1: + // Smoke test, add a simple chunk. + AddChunkWithMessages(true, 0, {"1st", "2nd"}); + break; + case 2: + // Add chunks to all but one of the logs. + AddChunkWithMessages(true, 0, {"1st", "2nd"}); + AddChunkWithMessages(true, 1, {"1st", "2nd"}); + AddChunkWithMessages(true, 2, {"1st", "2nd"}); + AddChunkWithMessages(true, 3, {"1st", "2nd"}); + AddChunkWithMessages(true, 4, {"1st", "2nd"}); + AddChunkWithMessages(true, 5, {"1st", "2nd"}); + AddChunkWithMessages(true, 6, {"1st", "2nd"}); + break; + case 3: + // Finally add chunks to all logs. + AddChunkWithMessages(true, 0, {"1st", "2nd"}); + AddChunkWithMessages(true, 1, {"1st", "2nd"}); + AddChunkWithMessages(true, 2, {"1st", "2nd"}); + AddChunkWithMessages(true, 3, {"1st", "2nd"}); + AddChunkWithMessages(true, 4, {"1st", "2nd"}); + AddChunkWithMessages(true, 5, {"1st", "2nd"}); + AddChunkWithMessages(true, 6, {"1st", "2nd"}); + AddChunkWithMessages(true, 7, {"1st", "2nd"}); + break; + default: + return false; + } + return true; + }; + + TestAllReadingWithFutureMessages(write_logs); +} diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp new file mode 100644 index 000000000..8bda7cf28 --- /dev/null +++ b/logd/SerializedLogBuffer.cpp @@ -0,0 +1,351 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SerializedLogBuffer.h" + +#include +#include + +#include +#include + +#include "LogStatistics.h" +#include "SerializedFlushToState.h" + +SerializedLogBuffer::SerializedLogBuffer(LogReaderList* reader_list, LogTags* tags, + LogStatistics* stats) + : reader_list_(reader_list), tags_(tags), stats_(stats) { + Init(); +} + +SerializedLogBuffer::~SerializedLogBuffer() {} + +void SerializedLogBuffer::Init() { + log_id_for_each(i) { + if (SetSize(i, __android_logger_get_buffer_size(i))) { + SetSize(i, LOG_BUFFER_MIN_SIZE); + } + } + + // Release any sleeping reader threads to dump their current content. + auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + for (const auto& reader_thread : reader_list_->reader_threads()) { + reader_thread->triggerReader_Locked(); + } +} + +bool SerializedLogBuffer::ShouldLog(log_id_t log_id, const char* msg, uint16_t len) { + if (log_id == LOG_ID_SECURITY) { + return true; + } + + int prio = ANDROID_LOG_INFO; + const char* tag = nullptr; + size_t tag_len = 0; + if (IsBinary(log_id)) { + int32_t tag_int = MsgToTag(msg, len); + tag = tags_->tagToName(tag_int); + if (tag) { + tag_len = strlen(tag); + } + } else { + prio = *msg; + tag = msg + 1; + tag_len = strnlen(tag, len - 1); + } + return __android_log_is_loggable_len(prio, tag, tag_len, ANDROID_LOG_VERBOSE); +} + +int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, + const char* msg, uint16_t len) { + if (log_id >= LOG_ID_MAX || len == 0) { + return -EINVAL; + } + + if (!ShouldLog(log_id, msg, len)) { + stats_->AddTotal(log_id, len); + return -EACCES; + } + + auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed); + + auto lock = std::lock_guard{lock_}; + + if (logs_[log_id].empty()) { + logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4)); + } + + auto total_len = sizeof(SerializedLogEntry) + len; + if (!logs_[log_id].back().CanLog(total_len)) { + logs_[log_id].back().FinishWriting(); + logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4)); + } + + auto entry = logs_[log_id].back().Log(sequence, realtime, uid, pid, tid, msg, len); + stats_->Add(entry->ToLogStatisticsElement(log_id)); + + MaybePrune(log_id); + + reader_list_->NotifyNewLog(1 << log_id); + return len; +} + +void SerializedLogBuffer::MaybePrune(log_id_t log_id) { + auto get_total_size = [](const auto& buffer) { + size_t total_size = 0; + for (const auto& chunk : buffer) { + total_size += chunk.PruneSize(); + } + return total_size; + }; + size_t total_size = get_total_size(logs_[log_id]); + if (total_size > max_size_[log_id]) { + Prune(log_id, total_size - max_size_[log_id], 0); + LOG(INFO) << "Pruned Logs from log_id: " << log_id << ", previous size: " << total_size + << " after size: " << get_total_size(logs_[log_id]); + } +} + +// Decompresses the chunks, call LogStatistics::Subtract() on each entry, then delete the chunks and +// the list. Note that the SerializedLogChunk objects have been removed from logs_ and their +// references have been deleted from any SerializedFlushToState objects, so this can be safely done +// without holding lock_. It is done in a separate thread to avoid delaying the writer thread. The +// lambda for the thread takes ownership of the 'chunks' list and thus when the thread ends and the +// lambda is deleted, the objects are deleted. +void SerializedLogBuffer::DeleteLogChunks(std::list&& chunks, log_id_t log_id) { + auto delete_thread = std::thread{[chunks = std::move(chunks), log_id, this]() mutable { + for (auto& chunk : chunks) { + chunk.IncReaderRefCount(); + int read_offset = 0; + while (read_offset < chunk.write_offset()) { + auto* entry = chunk.log_entry(read_offset); + stats_->Subtract(entry->ToLogStatisticsElement(log_id)); + read_offset += entry->total_len(); + } + chunk.DecReaderRefCount(false); + } + }}; + delete_thread.detach(); +} + +void SerializedLogBuffer::NotifyReadersOfPrune( + log_id_t log_id, const std::list::iterator& chunk) { + for (const auto& reader_thread : reader_list_->reader_threads()) { + auto& state = reinterpret_cast(reader_thread->flush_to_state()); + state.Prune(log_id, chunk); + } +} + +bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { + // Don't prune logs that are newer than the point at which any reader threads are reading from. + LogReaderThread* oldest = nullptr; + auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + for (const auto& reader_thread : reader_list_->reader_threads()) { + if (!reader_thread->IsWatching(log_id)) { + continue; + } + if (!oldest || oldest->start() > reader_thread->start() || + (oldest->start() == reader_thread->start() && + reader_thread->deadline().time_since_epoch().count() != 0)) { + oldest = reader_thread.get(); + } + } + + auto& log_buffer = logs_[log_id]; + + std::list chunks_to_prune; + auto prune_chunks = android::base::make_scope_guard([&chunks_to_prune, log_id, this] { + DeleteLogChunks(std::move(chunks_to_prune), log_id); + }); + + auto it = log_buffer.begin(); + while (it != log_buffer.end()) { + if (oldest != nullptr && it->highest_sequence_number() >= oldest->start()) { + break; + } + + // Increment ahead of time since we're going to splice this iterator from the list. + auto it_to_prune = it++; + + // The sequence number check ensures that all readers have read all logs in this chunk, but + // they may still hold a reference to the chunk to track their last read log_position. + // Notify them to delete the reference. + NotifyReadersOfPrune(log_id, it_to_prune); + + if (uid != 0) { + // Reorder the log buffer to remove logs from the given UID. If there are no logs left + // in the buffer after the removal, delete it. + if (it_to_prune->ClearUidLogs(uid, log_id, stats_)) { + log_buffer.erase(it_to_prune); + } + } else { + size_t buffer_size = it_to_prune->PruneSize(); + chunks_to_prune.splice(chunks_to_prune.end(), log_buffer, it_to_prune); + if (buffer_size >= bytes_to_free) { + return true; + } + bytes_to_free -= buffer_size; + } + } + + // If we've deleted all buffers without bytes_to_free hitting 0, then we're called by Clear() + // and should return true. + if (it == log_buffer.end()) { + return true; + } + + // Otherwise we are stuck due to a reader, so mitigate it. + KickReader(oldest, log_id, bytes_to_free); + return false; +} + +// If the selected reader is blocking our pruning progress, decide on +// what kind of mitigation is necessary to unblock the situation. +void SerializedLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) { + if (bytes_to_free >= max_size_[id]) { // +100% + // A misbehaving or slow reader is dropped if we hit too much memory pressure. + LOG(WARNING) << "Kicking blocked reader, " << reader->name() + << ", from LogBuffer::kickMe()"; + reader->release_Locked(); + } else if (reader->deadline().time_since_epoch().count() != 0) { + // Allow a blocked WRAP deadline reader to trigger and start reporting the log data. + reader->triggerReader_Locked(); + } else { + // Tell slow reader to skip entries to catch up. + unsigned long prune_rows = bytes_to_free / 300; + LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name() + << ", from LogBuffer::kickMe()"; + reader->triggerSkip_Locked(id, prune_rows); + } +} + +std::unique_ptr SerializedLogBuffer::CreateFlushToState(uint64_t start, + LogMask log_mask) { + return std::make_unique(start, log_mask); +} + +bool SerializedLogBuffer::FlushTo( + LogWriter* writer, FlushToState& abstract_state, + const std::function& filter) { + auto lock = std::unique_lock{lock_}; + + auto& state = reinterpret_cast(abstract_state); + state.InitializeLogs(logs_); + state.CheckForNewLogs(); + + while (state.HasUnreadLogs()) { + MinHeapElement top = state.PopNextUnreadLog(); + auto* entry = top.entry; + auto log_id = top.log_id; + + if (entry->sequence() < state.start()) { + continue; + } + state.set_start(entry->sequence()); + + if (!writer->privileged() && entry->uid() != writer->uid()) { + continue; + } + + if (filter) { + auto ret = filter(log_id, entry->pid(), entry->sequence(), entry->realtime()); + if (ret == FilterResult::kSkip) { + continue; + } + if (ret == FilterResult::kStop) { + break; + } + } + + lock.unlock(); + // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the + // `entry` pointer is safe here without the lock + if (!entry->Flush(writer, log_id)) { + return false; + } + lock.lock(); + + // Since we released the log above, buffers that aren't in our min heap may now have had + // logs added, so re-check them. + state.CheckForNewLogs(); + } + + state.set_start(state.start() + 1); + return true; +} + +bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) { + // Try three times to clear, then disconnect the readers and try one final time. + for (int retry = 0; retry < 3; ++retry) { + { + auto lock = std::lock_guard{lock_}; + bool prune_success = Prune(id, ULONG_MAX, uid); + if (prune_success) { + return true; + } + } + sleep(1); + } + // Check if it is still busy after the sleep, we try to prune one entry, not another clear run, + // so we are looking for the quick side effect of the return value to tell us if we have a + // _blocked_ reader. + bool busy = false; + { + auto lock = std::lock_guard{lock_}; + busy = !Prune(id, 1, uid); + } + // It is still busy, disconnect all readers. + if (busy) { + auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + for (const auto& reader_thread : reader_list_->reader_threads()) { + if (reader_thread->IsWatching(id)) { + LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name() + << ", from LogBuffer::clear()"; + reader_thread->release_Locked(); + } + } + } + auto lock = std::lock_guard{lock_}; + return Prune(id, ULONG_MAX, uid); +} + +unsigned long SerializedLogBuffer::GetSize(log_id_t id) { + auto lock = std::lock_guard{lock_}; + size_t retval = 2 * max_size_[id] / 3; // See the comment in SetSize(). + return retval; +} + +// New SerializedLogChunk objects will be allocated according to the new size, but older one are +// unchanged. MaybePrune() is called on the log buffer to reduce it to an appropriate size if the +// new size is lower. +// ChattyLogBuffer/SimpleLogBuffer don't consider the 'Overhead' of LogBufferElement or the +// std::list<> overhead as part of the log size. SerializedLogBuffer does by its very nature, so +// the 'size' metric is not compatible between them. Their actual memory usage is between 1.5x and +// 2x of what they claim to use, so we conservatively set our internal size as size + size / 2. +int SerializedLogBuffer::SetSize(log_id_t id, unsigned long size) { + // Reasonable limits ... + if (!__android_logger_valid_buffer_size(size)) { + return -1; + } + + auto lock = std::lock_guard{lock_}; + max_size_[id] = size + size / 2; + + MaybePrune(id); + + return 0; +} diff --git a/logd/SerializedLogBuffer.h b/logd/SerializedLogBuffer.h new file mode 100644 index 000000000..27334bac3 --- /dev/null +++ b/logd/SerializedLogBuffer.h @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include "LogBuffer.h" +#include "LogReaderList.h" +#include "LogStatistics.h" +#include "LogTags.h" +#include "SerializedLogChunk.h" +#include "SerializedLogEntry.h" +#include "rwlock.h" + +class SerializedLogBuffer : public LogBuffer { + public: + SerializedLogBuffer(LogReaderList* reader_list, LogTags* tags, LogStatistics* stats); + ~SerializedLogBuffer(); + void Init() override; + + int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, + uint16_t len) override; + std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) override; + bool FlushTo(LogWriter* writer, FlushToState& state, + const std::function& filter) override; + + bool Clear(log_id_t id, uid_t uid) override; + unsigned long GetSize(log_id_t id) override; + int SetSize(log_id_t id, unsigned long size) override; + + uint64_t sequence() const override { return sequence_.load(std::memory_order_relaxed); } + + private: + bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len); + void MaybePrune(log_id_t log_id) REQUIRES(lock_); + bool Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); + void KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) + REQUIRES_SHARED(lock_); + void NotifyReadersOfPrune(log_id_t log_id, const std::list::iterator& chunk) + REQUIRES(reader_list_->reader_threads_lock()); + void DeleteLogChunks(std::list&& chunks, log_id_t log_id); + + LogReaderList* reader_list_; + LogTags* tags_; + LogStatistics* stats_; + + unsigned long max_size_[LOG_ID_MAX] GUARDED_BY(lock_) = {}; + std::list logs_[LOG_ID_MAX] GUARDED_BY(lock_); + RwLock lock_; + + std::atomic sequence_ = 1; +}; diff --git a/logd/SerializedLogChunk.cpp b/logd/SerializedLogChunk.cpp new file mode 100644 index 000000000..2516003c1 --- /dev/null +++ b/logd/SerializedLogChunk.cpp @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SerializedLogChunk.h" + +#include + +#include "CompressionEngine.h" + +SerializedLogChunk::~SerializedLogChunk() { + CHECK_EQ(reader_ref_count_, 0U); +} + +void SerializedLogChunk::Compress() { + if (compressed_log_.empty()) { + CompressionEngine::GetInstance().Compress({contents_.data(), write_offset_}, + compressed_log_); + LOG(INFO) << "Compressed Log, buffer max size: " << contents_.size() + << " size used: " << write_offset_ + << " compressed size: " << compressed_log_.size(); + } + contents_.resize(0); +} + +// TODO: Develop a better reference counting strategy to guard against the case where the writer is +// much faster than the reader, and we needlessly compess / decompress the logs. +void SerializedLogChunk::IncReaderRefCount() { + if (++reader_ref_count_ != 1 || writer_active_) { + return; + } + CompressionEngine::GetInstance().Decompress(compressed_log_, contents_, write_offset_); +} + +void SerializedLogChunk::DecReaderRefCount(bool compress) { + CHECK_NE(reader_ref_count_, 0U); + if (--reader_ref_count_ != 0) { + return; + } + if (compress && !writer_active_) { + Compress(); + } +} + +bool SerializedLogChunk::ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats) { + CHECK_EQ(reader_ref_count_, 0U); + if (write_offset_ == 0) { + return true; + } + + IncReaderRefCount(); + + int read_offset = 0; + int new_write_offset = 0; + while (read_offset < write_offset_) { + const auto* entry = log_entry(read_offset); + if (entry->uid() == uid) { + read_offset += entry->total_len(); + if (stats != nullptr) { + stats->Subtract(entry->ToLogStatisticsElement(log_id)); + } + continue; + } + size_t entry_total_len = entry->total_len(); + if (read_offset != new_write_offset) { + memmove(contents_.data() + new_write_offset, contents_.data() + read_offset, + entry_total_len); + } + read_offset += entry_total_len; + new_write_offset += entry_total_len; + } + + if (new_write_offset == 0) { + DecReaderRefCount(false); + return true; + } + + // Clear the old compressed logs and set write_offset_ appropriately for DecReaderRefCount() + // to compress the new partially cleared log. + if (new_write_offset != write_offset_) { + compressed_log_.clear(); + write_offset_ = new_write_offset; + } + + DecReaderRefCount(true); + + return false; +} + +bool SerializedLogChunk::CanLog(size_t len) { + return write_offset_ + len <= contents_.size(); +} + +SerializedLogEntry* SerializedLogChunk::Log(uint64_t sequence, log_time realtime, uid_t uid, + pid_t pid, pid_t tid, const char* msg, uint16_t len) { + auto new_log_address = contents_.data() + write_offset_; + auto* entry = new (new_log_address) SerializedLogEntry(uid, pid, tid, sequence, realtime, len); + memcpy(entry->msg(), msg, len); + write_offset_ += entry->total_len(); + highest_sequence_number_ = sequence; + return entry; +} \ No newline at end of file diff --git a/logd/SerializedLogChunk.h b/logd/SerializedLogChunk.h new file mode 100644 index 000000000..75cb93620 --- /dev/null +++ b/logd/SerializedLogChunk.h @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "LogWriter.h" +#include "SerializedLogEntry.h" + +class SerializedLogChunk { + public: + SerializedLogChunk(size_t size) : contents_(size) {} + ~SerializedLogChunk(); + + void Compress(); + void IncReaderRefCount(); + // Decrease the reader ref count and compress the log if appropriate. `compress` should only be + // set to false in the case that the log buffer will be deleted afterwards. + void DecReaderRefCount(bool compress); + + // Must have no readers referencing this. Return true if there are no logs left in this chunk. + bool ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats); + + bool CanLog(size_t len); + SerializedLogEntry* Log(uint64_t sequence, log_time realtime, uid_t uid, pid_t pid, pid_t tid, + const char* msg, uint16_t len); + + // If this buffer has been compressed, we only consider its compressed size when accounting for + // memory consumption for pruning. This is since the uncompressed log is only by used by + // readers, and thus not a representation of how much these logs cost to keep in memory. + size_t PruneSize() const { return compressed_log_.size() ?: contents_.size(); } + + void FinishWriting() { + writer_active_ = false; + if (reader_ref_count_ == 0) { + Compress(); + } + } + + const SerializedLogEntry* log_entry(int offset) const { + return reinterpret_cast(data() + offset); + } + const uint8_t* data() const { return contents_.data(); } + int write_offset() const { return write_offset_; } + uint64_t highest_sequence_number() const { return highest_sequence_number_; } + + private: + // The decompressed contents of this log buffer. Deallocated when the ref_count reaches 0 and + // writer_active_ is false. + std::vector contents_; + int write_offset_ = 0; + uint32_t reader_ref_count_ = 0; + bool writer_active_ = true; + uint64_t highest_sequence_number_ = 1; + std::vector compressed_log_; +}; diff --git a/logd/SerializedLogChunkTest.cpp b/logd/SerializedLogChunkTest.cpp new file mode 100644 index 000000000..657250335 --- /dev/null +++ b/logd/SerializedLogChunkTest.cpp @@ -0,0 +1,284 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SerializedLogChunk.h" + +#include + +#include +#include +#include + +using android::base::StringPrintf; + +TEST(SerializedLogChunk, smoke) { + size_t chunk_size = 10 * 4096; + auto chunk = SerializedLogChunk{chunk_size}; + EXPECT_EQ(chunk_size, chunk.PruneSize()); + + static const char log_message[] = "log message"; + size_t expected_total_len = sizeof(SerializedLogEntry) + sizeof(log_message); + ASSERT_TRUE(chunk.CanLog(expected_total_len)); + EXPECT_TRUE(chunk.CanLog(chunk_size)); + EXPECT_FALSE(chunk.CanLog(chunk_size + 1)); + + log_time time(CLOCK_REALTIME); + auto* entry = chunk.Log(1234, time, 0, 1, 2, log_message, sizeof(log_message)); + ASSERT_NE(nullptr, entry); + + EXPECT_EQ(1234U, entry->sequence()); + EXPECT_EQ(time, entry->realtime()); + EXPECT_EQ(0U, entry->uid()); + EXPECT_EQ(1, entry->pid()); + EXPECT_EQ(2, entry->tid()); + EXPECT_EQ(sizeof(log_message), entry->msg_len()); + EXPECT_STREQ(log_message, entry->msg()); + EXPECT_EQ(expected_total_len, entry->total_len()); + + EXPECT_FALSE(chunk.CanLog(chunk_size)); + EXPECT_EQ(static_cast(expected_total_len), chunk.write_offset()); + EXPECT_EQ(1234U, chunk.highest_sequence_number()); +} + +TEST(SerializedLogChunk, fill_log_exactly) { + static const char log_message[] = "this is a log message"; + size_t individual_message_size = sizeof(SerializedLogEntry) + sizeof(log_message); + size_t chunk_size = individual_message_size * 3; + auto chunk = SerializedLogChunk{chunk_size}; + EXPECT_EQ(chunk_size, chunk.PruneSize()); + + ASSERT_TRUE(chunk.CanLog(individual_message_size)); + EXPECT_NE(nullptr, chunk.Log(1, log_time(), 1000, 1, 1, log_message, sizeof(log_message))); + + ASSERT_TRUE(chunk.CanLog(individual_message_size)); + EXPECT_NE(nullptr, chunk.Log(2, log_time(), 1000, 2, 1, log_message, sizeof(log_message))); + + ASSERT_TRUE(chunk.CanLog(individual_message_size)); + EXPECT_NE(nullptr, chunk.Log(3, log_time(), 1000, 3, 1, log_message, sizeof(log_message))); + + EXPECT_FALSE(chunk.CanLog(1)); +} + +TEST(SerializedLogChunk, three_logs) { + size_t chunk_size = 10 * 4096; + auto chunk = SerializedLogChunk{chunk_size}; + + chunk.Log(2, log_time(0x1234, 0x5678), 0x111, 0x222, 0x333, "initial message", + strlen("initial message")); + chunk.Log(3, log_time(0x2345, 0x6789), 0x444, 0x555, 0x666, "second message", + strlen("second message")); + auto uint64_t_max = std::numeric_limits::max(); + auto uint32_t_max = std::numeric_limits::max(); + chunk.Log(uint64_t_max, log_time(uint32_t_max, uint32_t_max), uint32_t_max, uint32_t_max, + uint32_t_max, "last message", strlen("last message")); + + static const char expected_buffer_data[] = + "\x11\x01\x00\x00\x22\x02\x00\x00\x33\x03\x00\x00" // UID PID TID + "\x02\x00\x00\x00\x00\x00\x00\x00" // Sequence + "\x34\x12\x00\x00\x78\x56\x00\x00" // Timestamp + "\x0F\x00initial message" // msg_len + message + "\x44\x04\x00\x00\x55\x05\x00\x00\x66\x06\x00\x00" // UID PID TID + "\x03\x00\x00\x00\x00\x00\x00\x00" // Sequence + "\x45\x23\x00\x00\x89\x67\x00\x00" // Timestamp + "\x0E\x00second message" // msg_len + message + "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" // UID PID TID + "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" // Sequence + "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" // Timestamp + "\x0C\x00last message"; // msg_len + message + + for (size_t i = 0; i < chunk_size; ++i) { + if (i < sizeof(expected_buffer_data)) { + EXPECT_EQ(static_cast(expected_buffer_data[i]), chunk.data()[i]) + << "position: " << i; + } else { + EXPECT_EQ(0, chunk.data()[i]) << "position: " << i; + } + } +} + +// Check that the CHECK() in DecReaderRefCount() if the ref count goes bad is caught. +TEST(SerializedLogChunk, catch_DecCompressedRef_CHECK) { + size_t chunk_size = 10 * 4096; + auto chunk = SerializedLogChunk{chunk_size}; + EXPECT_DEATH({ chunk.DecReaderRefCount(true); }, ""); + EXPECT_DEATH({ chunk.DecReaderRefCount(false); }, ""); +} + +// Check that the CHECK() in ClearUidLogs() if the ref count is greater than 0 is caught. +TEST(SerializedLogChunk, catch_ClearUidLogs_CHECK) { + size_t chunk_size = 10 * 4096; + auto chunk = SerializedLogChunk{chunk_size}; + chunk.IncReaderRefCount(); + EXPECT_DEATH({ chunk.ClearUidLogs(1000, LOG_ID_MAIN, nullptr); }, ""); + chunk.DecReaderRefCount(false); +} + +class UidClearTest : public testing::TestWithParam { + protected: + template + void Test(const Write& write, const Check& check, bool expected_result) { + write(chunk_); + + bool finish_writing = GetParam(); + if (finish_writing) { + chunk_.FinishWriting(); + } + EXPECT_EQ(expected_result, chunk_.ClearUidLogs(kUidToClear, LOG_ID_MAIN, nullptr)); + if (finish_writing) { + chunk_.IncReaderRefCount(); + } + + check(chunk_); + + if (finish_writing) { + chunk_.DecReaderRefCount(false); + } + } + + static constexpr size_t kChunkSize = 10 * 4096; + static constexpr uid_t kUidToClear = 1000; + static constexpr uid_t kOtherUid = 1234; + + SerializedLogChunk chunk_{kChunkSize}; +}; + +// Test that ClearUidLogs() is a no-op if there are no logs of that UID in the buffer. +TEST_P(UidClearTest, no_logs_in_chunk) { + auto write = [](SerializedLogChunk&) {}; + auto check = [](SerializedLogChunk&) {}; + + Test(write, check, true); +} + +// Test that ClearUidLogs() is a no-op if there are no logs of that UID in the buffer. +TEST_P(UidClearTest, no_logs_from_uid) { + static const char msg[] = "this is a log message"; + auto write = [](SerializedLogChunk& chunk) { + chunk.Log(1, log_time(), kOtherUid, 1, 2, msg, sizeof(msg)); + }; + + auto check = [](SerializedLogChunk& chunk) { + auto* entry = chunk.log_entry(0); + EXPECT_STREQ(msg, entry->msg()); + }; + + Test(write, check, false); +} + +// Test that ClearUidLogs() returns true if all logs in a given buffer correspond to the given UID. +TEST_P(UidClearTest, all_single) { + static const char msg[] = "this is a log message"; + auto write = [](SerializedLogChunk& chunk) { + chunk.Log(1, log_time(), kUidToClear, 1, 2, msg, sizeof(msg)); + }; + auto check = [](SerializedLogChunk&) {}; + + Test(write, check, true); +} + +// Test that ClearUidLogs() returns true if all logs in a given buffer correspond to the given UID. +TEST_P(UidClearTest, all_multiple) { + static const char msg[] = "this is a log message"; + auto write = [](SerializedLogChunk& chunk) { + chunk.Log(2, log_time(), kUidToClear, 1, 2, msg, sizeof(msg)); + chunk.Log(3, log_time(), kUidToClear, 1, 2, msg, sizeof(msg)); + chunk.Log(4, log_time(), kUidToClear, 1, 2, msg, sizeof(msg)); + }; + auto check = [](SerializedLogChunk&) {}; + + Test(write, check, true); +} + +static std::string MakePrintable(const uint8_t* in, size_t length) { + std::string result; + for (size_t i = 0; i < length; ++i) { + uint8_t c = in[i]; + if (isprint(c)) { + result.push_back(c); + } else { + result.append(StringPrintf("\\%02x", static_cast(c) & 0xFF)); + } + } + return result; +} + +// This test clears UID logs at the beginning and end of the buffer, as well as two back to back +// logs in the interior. +TEST_P(UidClearTest, clear_beginning_and_end) { + static const char msg1[] = "this is a log message"; + static const char msg2[] = "non-cleared message"; + static const char msg3[] = "back to back cleared messages"; + static const char msg4[] = "second in a row gone"; + static const char msg5[] = "but we save this one"; + static const char msg6[] = "and this 1!"; + static const char msg7[] = "the last one goes too"; + auto write = [](SerializedLogChunk& chunk) { + ASSERT_NE(nullptr, chunk.Log(1, log_time(), kUidToClear, 1, 2, msg1, sizeof(msg1))); + ASSERT_NE(nullptr, chunk.Log(2, log_time(), kOtherUid, 1, 2, msg2, sizeof(msg2))); + ASSERT_NE(nullptr, chunk.Log(3, log_time(), kUidToClear, 1, 2, msg3, sizeof(msg3))); + ASSERT_NE(nullptr, chunk.Log(4, log_time(), kUidToClear, 1, 2, msg4, sizeof(msg4))); + ASSERT_NE(nullptr, chunk.Log(5, log_time(), kOtherUid, 1, 2, msg5, sizeof(msg5))); + ASSERT_NE(nullptr, chunk.Log(6, log_time(), kOtherUid, 1, 2, msg6, sizeof(msg6))); + ASSERT_NE(nullptr, chunk.Log(7, log_time(), kUidToClear, 1, 2, msg7, sizeof(msg7))); + }; + + auto check = [](SerializedLogChunk& chunk) { + size_t read_offset = 0; + auto* entry = chunk.log_entry(read_offset); + EXPECT_STREQ(msg2, entry->msg()); + read_offset += entry->total_len(); + + entry = chunk.log_entry(read_offset); + EXPECT_STREQ(msg5, entry->msg()); + read_offset += entry->total_len(); + + entry = chunk.log_entry(read_offset); + EXPECT_STREQ(msg6, entry->msg()) << MakePrintable(chunk.data(), chunk.write_offset()); + read_offset += entry->total_len(); + + EXPECT_EQ(static_cast(read_offset), chunk.write_offset()); + }; + Test(write, check, false); +} + +// This tests the opposite case of beginning_and_end, in which we don't clear the beginning or end +// logs. There is a single log pruned in the middle instead of back to back logs. +TEST_P(UidClearTest, save_beginning_and_end) { + static const char msg1[] = "saved first message"; + static const char msg2[] = "cleared interior message"; + static const char msg3[] = "last message stays"; + auto write = [](SerializedLogChunk& chunk) { + ASSERT_NE(nullptr, chunk.Log(1, log_time(), kOtherUid, 1, 2, msg1, sizeof(msg1))); + ASSERT_NE(nullptr, chunk.Log(2, log_time(), kUidToClear, 1, 2, msg2, sizeof(msg2))); + ASSERT_NE(nullptr, chunk.Log(3, log_time(), kOtherUid, 1, 2, msg3, sizeof(msg3))); + }; + + auto check = [](SerializedLogChunk& chunk) { + size_t read_offset = 0; + auto* entry = chunk.log_entry(read_offset); + EXPECT_STREQ(msg1, entry->msg()); + read_offset += entry->total_len(); + + entry = chunk.log_entry(read_offset); + EXPECT_STREQ(msg3, entry->msg()); + read_offset += entry->total_len(); + + EXPECT_EQ(static_cast(read_offset), chunk.write_offset()); + }; + Test(write, check, false); +} + +INSTANTIATE_TEST_CASE_P(UidClearTests, UidClearTest, testing::Values(true, false)); diff --git a/logd/SerializedLogEntry.h b/logd/SerializedLogEntry.h new file mode 100644 index 000000000..f599abeaa --- /dev/null +++ b/logd/SerializedLogEntry.h @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +#include "LogStatistics.h" +#include "LogWriter.h" + +// These structs are packed into a single chunk of memory for each log type within a +// SerializedLogChunk object. Their message is contained immediately at the end of the struct. The +// address of the next log in the buffer is *this + sizeof(SerializedLogEntry) + msg_len_. If that +// value would overflow the chunk of memory associated with the SerializedLogChunk object, then a +// new SerializedLogChunk must be allocated to contain the next SerializedLogEntry. +class __attribute__((packed)) SerializedLogEntry { + public: + SerializedLogEntry(uid_t uid, pid_t pid, pid_t tid, uint64_t sequence, log_time realtime, + uint16_t len) + : uid_(uid), + pid_(pid), + tid_(tid), + sequence_(sequence), + realtime_(realtime), + msg_len_(len) {} + SerializedLogEntry(const SerializedLogEntry& elem) = delete; + SerializedLogEntry& operator=(const SerializedLogEntry& elem) = delete; + ~SerializedLogEntry() { + // Never place anything in this destructor. This class is in place constructed and never + // destructed. + } + + LogStatisticsElement ToLogStatisticsElement(log_id_t log_id) const { + return LogStatisticsElement{ + .uid = uid(), + .pid = pid(), + .tid = tid(), + .tag = IsBinary(log_id) ? MsgToTag(msg(), msg_len()) : 0, + .realtime = realtime(), + .msg = msg(), + .msg_len = msg_len(), + .dropped_count = 0, + .log_id = log_id, + }; + } + + bool Flush(LogWriter* writer, log_id_t log_id) const { + struct logger_entry entry = {}; + + entry.hdr_size = sizeof(struct logger_entry); + entry.lid = log_id; + entry.pid = pid(); + entry.tid = tid(); + entry.uid = uid(); + entry.sec = realtime().tv_sec; + entry.nsec = realtime().tv_nsec; + entry.len = msg_len(); + + return writer->Write(entry, msg()); + } + + uid_t uid() const { return uid_; } + pid_t pid() const { return pid_; } + pid_t tid() const { return tid_; } + uint16_t msg_len() const { return msg_len_; } + uint64_t sequence() const { return sequence_; } + log_time realtime() const { return realtime_; } + + char* msg() { return reinterpret_cast(this) + sizeof(*this); } + const char* msg() const { return reinterpret_cast(this) + sizeof(*this); } + uint16_t total_len() const { return sizeof(*this) + msg_len_; } + + private: + const uint32_t uid_; + const uint32_t pid_; + const uint32_t tid_; + const uint64_t sequence_; + const log_time realtime_; + const uint16_t msg_len_; +}; diff --git a/logd/fuzz/Android.bp b/logd/fuzz/Android.bp index f65fbdfe6..9834ff058 100644 --- a/logd/fuzz/Android.bp +++ b/logd/fuzz/Android.bp @@ -26,6 +26,8 @@ cc_fuzz { "liblogd", "libcutils", "libsysutils", + "libz", + "libzstd", ], cflags: ["-Werror"], } diff --git a/logd/main.cpp b/logd/main.cpp index 773ffb8e1..46b65679a 100644 --- a/logd/main.cpp +++ b/logd/main.cpp @@ -58,6 +58,7 @@ #include "LogStatistics.h" #include "LogTags.h" #include "LogUtils.h" +#include "SerializedLogBuffer.h" #include "SimpleLogBuffer.h" #define KMSG_PRIORITY(PRI) \