diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp index 926148201..7fcaac50e 100644 --- a/fs_mgr/libsnapshot/snapuserd/Android.bp +++ b/fs_mgr/libsnapshot/snapuserd/Android.bp @@ -62,6 +62,7 @@ cc_library_static { "dm-snapshot-merge/snapuserd_worker.cpp", "dm-snapshot-merge/snapuserd_readahead.cpp", "snapuserd_buffer.cpp", + "user-space-merge/handler_manager.cpp", "user-space-merge/snapuserd_core.cpp", "user-space-merge/snapuserd_dm_user.cpp", "user-space-merge/snapuserd_merge.cpp", diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp index bfe93ebc3..055721471 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp @@ -114,7 +114,7 @@ bool Daemon::StartServerForUserspaceSnapshots(int arg_start, int argc, char** ar return false; } auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]); - if (!handler || !user_server_.StartHandler(handler)) { + if (!handler || !user_server_.StartHandler(parts[0])) { return false; } } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp new file mode 100644 index 000000000..c5150c411 --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp @@ -0,0 +1,371 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "handler_manager.h" + +#include + +#include + +#include "snapuserd_core.h" + +namespace android { +namespace snapshot { + +static constexpr uint8_t kMaxMergeThreads = 2; + +void HandlerThread::FreeResources() { + // Each worker thread holds a reference to snapuserd. + // Clear them so that all the resources + // held by snapuserd is released + if (snapuserd_) { + snapuserd_->FreeResources(); + snapuserd_ = nullptr; + } +} + +SnapshotHandlerManager::SnapshotHandlerManager() { + monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); + if (monitor_merge_event_fd_ == -1) { + PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd"; + } +} + +std::shared_ptr SnapshotHandlerManager::AddHandler( + const std::string& misc_name, const std::string& cow_device_path, + const std::string& backing_device, const std::string& base_path_merge, + int num_worker_threads, bool use_iouring, bool perform_verification) { + auto snapuserd = std::make_shared(misc_name, cow_device_path, backing_device, + base_path_merge, num_worker_threads, + use_iouring, perform_verification); + if (!snapuserd->InitCowDevice()) { + LOG(ERROR) << "Failed to initialize Snapuserd"; + return nullptr; + } + + if (!snapuserd->InitializeWorkers()) { + LOG(ERROR) << "Failed to initialize workers"; + return nullptr; + } + + auto handler = std::make_shared(snapuserd); + { + std::lock_guard lock(lock_); + if (FindHandler(&lock, misc_name) != dm_users_.end()) { + LOG(ERROR) << "Handler already exists: " << misc_name; + return nullptr; + } + dm_users_.push_back(handler); + } + return handler; +} + +bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << misc_name; + return false; + } + if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) { + LOG(ERROR) << "Tried to re-attach control device: " << misc_name; + return false; + } + if (!StartHandler(*iter)) { + return false; + } + return true; +} + +bool SnapshotHandlerManager::StartHandler(const std::shared_ptr& handler) { + if (handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler already attached"; + return false; + } + + handler->snapuserd()->AttachControlDevice(); + + handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler)); + return true; +} + +bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) { + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + // After merge is completed, we swap dm-user table with + // the underlying dm-linear base device. Hence, worker + // threads would have terminted and was removed from + // the list. + LOG(DEBUG) << "Could not find handler: " << misc_name; + return true; + } + + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } + if (!RemoveAndJoinHandler(misc_name)) { + return false; + } + return true; +} + +void SnapshotHandlerManager::RunThread(std::shared_ptr handler) { + LOG(INFO) << "Entering thread for handler: " << handler->misc_name(); + + if (!handler->snapuserd()->Start()) { + LOG(ERROR) << " Failed to launch all worker threads"; + } + + handler->snapuserd()->CloseFds(); + bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus(); + handler->snapuserd()->UnmapBufferRegion(); + + auto misc_name = handler->misc_name(); + LOG(INFO) << "Handler thread about to exit: " << misc_name; + + { + std::lock_guard lock(lock_); + if (merge_completed) { + num_partitions_merge_complete_ += 1; + active_merge_threads_ -= 1; + WakeupMonitorMergeThread(); + } + handler->SetThreadTerminated(); + auto iter = FindHandler(&lock, handler->misc_name()); + if (iter == dm_users_.end()) { + // RemoveAndJoinHandler() already removed us from the list, and is + // now waiting on a join(), so just return. Additionally, release + // all the resources held by snapuserd object which are shared + // by worker threads. This should be done when the last reference + // of "handler" is released; but we will explicitly release here + // to make sure snapuserd object is freed as it is the biggest + // consumer of memory in the daemon. + handler->FreeResources(); + LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name; + return; + } + + LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name; + + if (handler->snapuserd()->IsAttached()) { + handler->thread().detach(); + } + + // Important: free resources within the lock. This ensures that if + // WaitForDelete() is called, the handler is either in the list, or + // it's not and its resources are guaranteed to be freed. + handler->FreeResources(); + dm_users_.erase(iter); + } +} + +bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << misc_name; + return false; + } + + return StartMerge(&lock, *iter); +} + +bool SnapshotHandlerManager::StartMerge(std::lock_guard* proof_of_lock, + const std::shared_ptr& handler) { + CHECK(proof_of_lock); + + if (!handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; + return false; + } + + handler->snapuserd()->MonitorMerge(); + + if (!is_merge_monitor_started_) { + std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach(); + is_merge_monitor_started_ = true; + } + + merge_handlers_.push(handler); + WakeupMonitorMergeThread(); + return true; +} + +void SnapshotHandlerManager::WakeupMonitorMergeThread() { + uint64_t notify = 1; + ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify))); + if (rc < 0) { + PLOG(FATAL) << "failed to notify monitor merge thread"; + } +} + +void SnapshotHandlerManager::MonitorMerge() { + while (!stop_monitor_merge_thread_) { + uint64_t testVal; + ssize_t ret = + TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal))); + if (ret == -1) { + PLOG(FATAL) << "Failed to read from eventfd"; + } else if (ret == 0) { + LOG(FATAL) << "Hit EOF on eventfd"; + } + + LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_; + { + std::lock_guard lock(lock_); + while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) { + auto handler = merge_handlers_.front(); + merge_handlers_.pop(); + + if (!handler->snapuserd()) { + LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name(); + continue; + } + + LOG(INFO) << "Starting merge for partition: " + << handler->snapuserd()->GetMiscName(); + handler->snapuserd()->InitiateMerge(); + active_merge_threads_ += 1; + } + } + } + + LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size(); +} + +std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << misc_name; + return {}; + } + + return (*iter)->snapuserd()->GetMergeStatus(); +} + +double SnapshotHandlerManager::GetMergePercentage() { + std::lock_guard lock(lock_); + + double percentage = 0.0; + int n = 0; + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + auto& th = (*iter)->thread(); + if (th.joinable()) { + // Merge percentage by individual partitions wherein merge is still + // in-progress + percentage += (*iter)->snapuserd()->GetMergePercentage(); + n += 1; + } + } + + // Calculate final merge including those partitions where merge was already + // completed - num_partitions_merge_complete_ will track them when each + // thread exists in RunThread. + int total_partitions = n + num_partitions_merge_complete_; + + if (total_partitions) { + percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions; + } + + LOG(DEBUG) << "Merge %: " << percentage + << " num_partitions_merge_complete_: " << num_partitions_merge_complete_ + << " total_partitions: " << total_partitions << " n: " << n; + return percentage; +} + +bool SnapshotHandlerManager::GetVerificationStatus() { + std::lock_guard lock(lock_); + + bool status = true; + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + auto& th = (*iter)->thread(); + if (th.joinable() && status) { + status = (*iter)->snapuserd()->CheckPartitionVerification() && status; + } else { + // return immediately if there is a failure + return false; + } + } + + return status; +} + +bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) { + std::shared_ptr handler; + { + std::lock_guard lock(lock_); + + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + // Client already deleted. + return true; + } + handler = std::move(*iter); + dm_users_.erase(iter); + } + + auto& th = handler->thread(); + if (th.joinable()) { + th.join(); + } + return true; +} + +void SnapshotHandlerManager::TerminateMergeThreads() { + std::lock_guard guard(lock_); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } +} + +void SnapshotHandlerManager::JoinAllThreads() { + // Acquire the thread list within the lock. + std::vector> dm_users; + { + std::lock_guard guard(lock_); + dm_users = std::move(dm_users_); + } + + for (auto& client : dm_users) { + auto& th = client->thread(); + + if (th.joinable()) th.join(); + } + + stop_monitor_merge_thread_ = true; + WakeupMonitorMergeThread(); +} + +auto SnapshotHandlerManager::FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name) -> HandlerList::iterator { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if ((*iter)->misc_name() == misc_name) { + return iter; + } + } + return dm_users_.end(); +} + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h new file mode 100644 index 000000000..b7ddac19f --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h @@ -0,0 +1,131 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace android { +namespace snapshot { + +class SnapshotHandler; + +class HandlerThread { + public: + explicit HandlerThread(std::shared_ptr snapuserd); + + void FreeResources(); + const std::shared_ptr& snapuserd() const { return snapuserd_; } + std::thread& thread() { return thread_; } + + const std::string& misc_name() const { return misc_name_; } + bool ThreadTerminated() { return thread_terminated_; } + void SetThreadTerminated() { thread_terminated_ = true; } + + private: + std::thread thread_; + std::shared_ptr snapuserd_; + std::string misc_name_; + bool thread_terminated_ = false; +}; + +class ISnapshotHandlerManager { + public: + virtual ~ISnapshotHandlerManager() {} + + // Add a new snapshot handler but do not start serving requests yet. + virtual std::shared_ptr AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge, + int num_worker_threads, bool use_iouring, + bool perform_verification) = 0; + + // Start serving requests on a snapshot handler. + virtual bool StartHandler(const std::string& misc_name) = 0; + + // Stop serving requests on a snapshot handler and remove it. + virtual bool DeleteHandler(const std::string& misc_name) = 0; + + // Begin merging blocks on the given snapshot handler. + virtual bool InitiateMerge(const std::string& misc_name) = 0; + + // Return a string containing a status code indicating the merge status + // on the handler. Returns empty on error. + virtual std::string GetMergeStatus(const std::string& misc_name) = 0; + + // Wait until all handlers have terminated. + virtual void JoinAllThreads() = 0; + + // Stop any in-progress merge threads. + virtual void TerminateMergeThreads() = 0; + + // Returns the merge progress across all merging snapshot handlers. + virtual double GetMergePercentage() = 0; + + // Returns whether all snapshots have verified. + virtual bool GetVerificationStatus() = 0; +}; + +class SnapshotHandlerManager final : public ISnapshotHandlerManager { + public: + SnapshotHandlerManager(); + std::shared_ptr AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge, + int num_worker_threads, bool use_iouring, + bool perform_verification) override; + bool StartHandler(const std::string& misc_name) override; + bool DeleteHandler(const std::string& misc_name) override; + bool InitiateMerge(const std::string& misc_name) override; + std::string GetMergeStatus(const std::string& misc_name) override; + void JoinAllThreads() override; + void TerminateMergeThreads() override; + double GetMergePercentage() override; + bool GetVerificationStatus() override; + + private: + bool StartHandler(const std::shared_ptr& handler); + void RunThread(std::shared_ptr handler); + bool StartMerge(std::lock_guard* proof_of_lock, + const std::shared_ptr& handler); + void MonitorMerge(); + void WakeupMonitorMergeThread(); + bool RemoveAndJoinHandler(const std::string& misc_name); + + // Find a HandlerThread within a lock. + using HandlerList = std::vector>; + HandlerList::iterator FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name); + + std::mutex lock_; + HandlerList dm_users_; + + bool is_merge_monitor_started_ = false; + bool stop_monitor_merge_thread_ = false; + int active_merge_threads_ = 0; + int num_partitions_merge_complete_ = 0; + std::queue> merge_handlers_; + android::base::unique_fd monitor_merge_event_fd_; +}; + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp index b7ce2109a..d87990aca 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -45,28 +45,9 @@ using namespace std::string_literals; using android::base::borrowed_fd; using android::base::unique_fd; -DaemonOps UserSnapshotServer::Resolveop(std::string& input) { - if (input == "init") return DaemonOps::INIT; - if (input == "start") return DaemonOps::START; - if (input == "stop") return DaemonOps::STOP; - if (input == "query") return DaemonOps::QUERY; - if (input == "delete") return DaemonOps::DELETE; - if (input == "detach") return DaemonOps::DETACH; - if (input == "supports") return DaemonOps::SUPPORTS; - if (input == "initiate_merge") return DaemonOps::INITIATE; - if (input == "merge_percent") return DaemonOps::PERCENTAGE; - if (input == "getstatus") return DaemonOps::GETSTATUS; - if (input == "update-verify") return DaemonOps::UPDATE_VERIFY; - - return DaemonOps::INVALID; -} - UserSnapshotServer::UserSnapshotServer() { - monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); - if (monitor_merge_event_fd_ == -1) { - PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd"; - } terminating_ = false; + handlers_ = std::make_unique(); } UserSnapshotServer::~UserSnapshotServer() { @@ -99,7 +80,7 @@ void UserSnapshotServer::Parsemsg(std::string const& msg, const char delim, void UserSnapshotServer::ShutdownThreads() { terminating_ = true; - JoinAllThreads(); + handlers_->JoinAllThreads(); } HandlerThread::HandlerThread(std::shared_ptr snapuserd) @@ -135,226 +116,118 @@ bool UserSnapshotServer::Receivemsg(android::base::borrowed_fd fd, const std::st std::vector out; Parsemsg(str, delim, out); - DaemonOps op = Resolveop(out[0]); - switch (op) { - case DaemonOps::INIT: { - // Message format: - // init,,,, - // - // Reads the metadata and send the number of sectors - if (out.size() != 5) { - LOG(ERROR) << "Malformed init message, " << out.size() << " parts"; - return Sendmsg(fd, "fail"); - } - - auto handler = AddHandler(out[1], out[2], out[3], out[4]); - if (!handler) { - return Sendmsg(fd, "fail"); - } - - auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors()); - return Sendmsg(fd, retval); - } - case DaemonOps::START: { - // Message format: - // start, - // - // Start the new thread which binds to dm-user misc device - if (out.size() != 2) { - LOG(ERROR) << "Malformed start message, " << out.size() << " parts"; - return Sendmsg(fd, "fail"); - } - - std::lock_guard lock(lock_); - auto iter = FindHandler(&lock, out[1]); - if (iter == dm_users_.end()) { - LOG(ERROR) << "Could not find handler: " << out[1]; - return Sendmsg(fd, "fail"); - } - if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) { - LOG(ERROR) << "Tried to re-attach control device: " << out[1]; - return Sendmsg(fd, "fail"); - } - if (!StartHandler(*iter)) { - return Sendmsg(fd, "fail"); - } - return Sendmsg(fd, "success"); - } - case DaemonOps::STOP: { - // Message format: stop - // - // Stop all the threads gracefully and then shutdown the - // main thread - SetTerminating(); - ShutdownThreads(); - return true; - } - case DaemonOps::QUERY: { - // Message format: query - // - // As part of transition, Second stage daemon will be - // created before terminating the first stage daemon. Hence, - // for a brief period client may have to distiguish between - // first stage daemon and second stage daemon. - // - // Second stage daemon is marked as active and hence will - // be ready to receive control message. - return Sendmsg(fd, GetDaemonStatus()); - } - case DaemonOps::DELETE: { - // Message format: - // delete, - if (out.size() != 2) { - LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; - return Sendmsg(fd, "fail"); - } - { - std::lock_guard lock(lock_); - auto iter = FindHandler(&lock, out[1]); - if (iter == dm_users_.end()) { - // After merge is completed, we swap dm-user table with - // the underlying dm-linear base device. Hence, worker - // threads would have terminted and was removed from - // the list. - LOG(DEBUG) << "Could not find handler: " << out[1]; - return Sendmsg(fd, "success"); - } - - if (!(*iter)->ThreadTerminated()) { - (*iter)->snapuserd()->NotifyIOTerminated(); - } - } - if (!RemoveAndJoinHandler(out[1])) { - return Sendmsg(fd, "fail"); - } - return Sendmsg(fd, "success"); - } - case DaemonOps::DETACH: { - std::lock_guard lock(lock_); - TerminateMergeThreads(&lock); - terminating_ = true; - return true; - } - case DaemonOps::SUPPORTS: { - if (out.size() != 2) { - LOG(ERROR) << "Malformed supports message, " << out.size() << " parts"; - return Sendmsg(fd, "fail"); - } - if (out[1] == "second_stage_socket_handoff") { - return Sendmsg(fd, "success"); - } + const auto& cmd = out[0]; + if (cmd == "init") { + // Message format: + // init,,,, + // + // Reads the metadata and send the number of sectors + if (out.size() != 5) { + LOG(ERROR) << "Malformed init message, " << out.size() << " parts"; return Sendmsg(fd, "fail"); } - case DaemonOps::INITIATE: { - if (out.size() != 2) { - LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts"; - return Sendmsg(fd, "fail"); - } - if (out[0] == "initiate_merge") { - std::lock_guard lock(lock_); - auto iter = FindHandler(&lock, out[1]); - if (iter == dm_users_.end()) { - LOG(ERROR) << "Could not find handler: " << out[1]; - return Sendmsg(fd, "fail"); - } - if (!StartMerge(&lock, *iter)) { - return Sendmsg(fd, "fail"); - } - - return Sendmsg(fd, "success"); - } + auto handler = AddHandler(out[1], out[2], out[3], out[4]); + if (!handler) { return Sendmsg(fd, "fail"); } - case DaemonOps::PERCENTAGE: { - std::lock_guard lock(lock_); - double percentage = GetMergePercentage(&lock); - return Sendmsg(fd, std::to_string(percentage)); + auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors()); + return Sendmsg(fd, retval); + } else if (cmd == "start") { + // Message format: + // start, + // + // Start the new thread which binds to dm-user misc device + if (out.size() != 2) { + LOG(ERROR) << "Malformed start message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); } - case DaemonOps::GETSTATUS: { - // Message format: - // getstatus, - if (out.size() != 2) { - LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; - return Sendmsg(fd, "snapshot-merge-failed"); - } - { - std::lock_guard lock(lock_); - auto iter = FindHandler(&lock, out[1]); - if (iter == dm_users_.end()) { - LOG(ERROR) << "Could not find handler: " << out[1]; - return Sendmsg(fd, "snapshot-merge-failed"); - } - std::string merge_status = GetMergeStatus(*iter); - return Sendmsg(fd, merge_status); - } + if (!handlers_->StartHandler(out[1])) { + return Sendmsg(fd, "fail"); } - case DaemonOps::UPDATE_VERIFY: { - std::lock_guard lock(lock_); - if (!UpdateVerification(&lock)) { - return Sendmsg(fd, "fail"); - } - + return Sendmsg(fd, "success"); + } else if (cmd == "stop") { + // Message format: stop + // + // Stop all the threads gracefully and then shutdown the + // main thread + SetTerminating(); + ShutdownThreads(); + return true; + } else if (cmd == "query") { + // Message format: query + // + // As part of transition, Second stage daemon will be + // created before terminating the first stage daemon. Hence, + // for a brief period client may have to distiguish between + // first stage daemon and second stage daemon. + // + // Second stage daemon is marked as active and hence will + // be ready to receive control message. + return Sendmsg(fd, GetDaemonStatus()); + } else if (cmd == "delete") { + // Message format: + // delete, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (!handlers_->DeleteHandler(out[1])) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } else if (cmd == "detach") { + handlers_->TerminateMergeThreads(); + terminating_ = true; + return true; + } else if (cmd == "supports") { + if (out.size() != 2) { + LOG(ERROR) << "Malformed supports message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[1] == "second_stage_socket_handoff") { return Sendmsg(fd, "success"); } - default: { - LOG(ERROR) << "Received unknown message type from client"; - Sendmsg(fd, "fail"); - return false; + return Sendmsg(fd, "fail"); + } else if (cmd == "initiate_merge") { + if (out.size() != 2) { + LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); } - } -} - -void UserSnapshotServer::RunThread(std::shared_ptr handler) { - LOG(INFO) << "Entering thread for handler: " << handler->misc_name(); - - if (!handler->snapuserd()->Start()) { - LOG(ERROR) << " Failed to launch all worker threads"; - } - - handler->snapuserd()->CloseFds(); - bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus(); - handler->snapuserd()->UnmapBufferRegion(); - - auto misc_name = handler->misc_name(); - LOG(INFO) << "Handler thread about to exit: " << misc_name; - - { - std::lock_guard lock(lock_); - if (merge_completed) { - num_partitions_merge_complete_ += 1; - active_merge_threads_ -= 1; - WakeupMonitorMergeThread(); + if (out[0] == "initiate_merge") { + if (!handlers_->InitiateMerge(out[1])) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); } - handler->SetThreadTerminated(); - auto iter = FindHandler(&lock, handler->misc_name()); - if (iter == dm_users_.end()) { - // RemoveAndJoinHandler() already removed us from the list, and is - // now waiting on a join(), so just return. Additionally, release - // all the resources held by snapuserd object which are shared - // by worker threads. This should be done when the last reference - // of "handler" is released; but we will explicitly release here - // to make sure snapuserd object is freed as it is the biggest - // consumer of memory in the daemon. - handler->FreeResources(); - LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name; - return; + return Sendmsg(fd, "fail"); + } else if (cmd == "merge_percent") { + double percentage = handlers_->GetMergePercentage(); + return Sendmsg(fd, std::to_string(percentage)); + } else if (cmd == "getstatus") { + // Message format: + // getstatus, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "snapshot-merge-failed"); } - - LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name; - - if (handler->snapuserd()->IsAttached()) { - handler->thread().detach(); + auto status = handlers_->GetMergeStatus(out[1]); + if (status.empty()) { + return Sendmsg(fd, "snapshot-merge-failed"); } - - // Important: free resources within the lock. This ensures that if - // WaitForDelete() is called, the handler is either in the list, or - // it's not and its resources are guaranteed to be freed. - handler->FreeResources(); - dm_users_.erase(iter); + return Sendmsg(fd, status); + } else if (cmd == "update-verify") { + if (!handlers_->GetVerificationStatus()) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } else { + LOG(ERROR) << "Received unknown message type from client"; + Sendmsg(fd, "fail"); + return false; } } @@ -423,28 +296,10 @@ bool UserSnapshotServer::Run() { } } - JoinAllThreads(); + handlers_->JoinAllThreads(); return true; } -void UserSnapshotServer::JoinAllThreads() { - // Acquire the thread list within the lock. - std::vector> dm_users; - { - std::lock_guard guard(lock_); - dm_users = std::move(dm_users_); - } - - for (auto& client : dm_users) { - auto& th = client->thread(); - - if (th.joinable()) th.join(); - } - - stop_monitor_merge_thread_ = true; - WakeupMonitorMergeThread(); -} - void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) { struct pollfd p = {}; p.fd = fd.get(); @@ -506,185 +361,13 @@ std::shared_ptr UserSnapshotServer::AddHandler(const std::string& perform_verification = false; } - auto snapuserd = std::make_shared(misc_name, cow_device_path, backing_device, - base_path_merge, num_worker_threads, - io_uring_enabled_, perform_verification); - if (!snapuserd->InitCowDevice()) { - LOG(ERROR) << "Failed to initialize Snapuserd"; - return nullptr; - } - - if (!snapuserd->InitializeWorkers()) { - LOG(ERROR) << "Failed to initialize workers"; - return nullptr; - } - - auto handler = std::make_shared(snapuserd); - { - std::lock_guard lock(lock_); - if (FindHandler(&lock, misc_name) != dm_users_.end()) { - LOG(ERROR) << "Handler already exists: " << misc_name; - return nullptr; - } - dm_users_.push_back(handler); - } - return handler; -} - -bool UserSnapshotServer::StartHandler(const std::shared_ptr& handler) { - if (handler->snapuserd()->IsAttached()) { - LOG(ERROR) << "Handler already attached"; - return false; - } - - handler->snapuserd()->AttachControlDevice(); - - handler->thread() = std::thread(std::bind(&UserSnapshotServer::RunThread, this, handler)); - return true; -} - -bool UserSnapshotServer::StartMerge(std::lock_guard* proof_of_lock, - const std::shared_ptr& handler) { - CHECK(proof_of_lock); - - if (!handler->snapuserd()->IsAttached()) { - LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; - return false; - } - - handler->snapuserd()->MonitorMerge(); - - if (!is_merge_monitor_started_.has_value()) { - std::thread(&UserSnapshotServer::MonitorMerge, this).detach(); - is_merge_monitor_started_ = true; - } - - merge_handlers_.push(handler); - WakeupMonitorMergeThread(); - return true; -} - -auto UserSnapshotServer::FindHandler(std::lock_guard* proof_of_lock, - const std::string& misc_name) -> HandlerList::iterator { - CHECK(proof_of_lock); - - for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { - if ((*iter)->misc_name() == misc_name) { - return iter; - } - } - return dm_users_.end(); -} - -void UserSnapshotServer::TerminateMergeThreads(std::lock_guard* proof_of_lock) { - CHECK(proof_of_lock); - - for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { - if (!(*iter)->ThreadTerminated()) { - (*iter)->snapuserd()->NotifyIOTerminated(); - } - } -} - -std::string UserSnapshotServer::GetMergeStatus(const std::shared_ptr& handler) { - return handler->snapuserd()->GetMergeStatus(); -} - -double UserSnapshotServer::GetMergePercentage(std::lock_guard* proof_of_lock) { - CHECK(proof_of_lock); - double percentage = 0.0; - int n = 0; - - for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { - auto& th = (*iter)->thread(); - if (th.joinable()) { - // Merge percentage by individual partitions wherein merge is still - // in-progress - percentage += (*iter)->snapuserd()->GetMergePercentage(); - n += 1; - } - } - - // Calculate final merge including those partitions where merge was already - // completed - num_partitions_merge_complete_ will track them when each - // thread exists in RunThread. - int total_partitions = n + num_partitions_merge_complete_; - - if (total_partitions) { - percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions; - } - - LOG(DEBUG) << "Merge %: " << percentage - << " num_partitions_merge_complete_: " << num_partitions_merge_complete_ - << " total_partitions: " << total_partitions << " n: " << n; - return percentage; -} - -bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) { - std::shared_ptr handler; - { - std::lock_guard lock(lock_); - - auto iter = FindHandler(&lock, misc_name); - if (iter == dm_users_.end()) { - // Client already deleted. - return true; - } - handler = std::move(*iter); - dm_users_.erase(iter); - } - - auto& th = handler->thread(); - if (th.joinable()) { - th.join(); - } - return true; -} - -void UserSnapshotServer::WakeupMonitorMergeThread() { - uint64_t notify = 1; - ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify))); - if (rc < 0) { - PLOG(FATAL) << "failed to notify monitor merge thread"; - } -} - -void UserSnapshotServer::MonitorMerge() { - while (!stop_monitor_merge_thread_) { - uint64_t testVal; - ssize_t ret = - TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal))); - if (ret == -1) { - PLOG(FATAL) << "Failed to read from eventfd"; - } else if (ret == 0) { - LOG(FATAL) << "Hit EOF on eventfd"; - } - - LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_; - { - std::lock_guard lock(lock_); - while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) { - auto handler = merge_handlers_.front(); - merge_handlers_.pop(); - - if (!handler->snapuserd()) { - LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name(); - continue; - } - - LOG(INFO) << "Starting merge for partition: " - << handler->snapuserd()->GetMiscName(); - handler->snapuserd()->InitiateMerge(); - active_merge_threads_ += 1; - } - } - } - - LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size(); + return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge, + num_worker_threads, io_uring_enabled_, perform_verification); } bool UserSnapshotServer::WaitForSocket() { - auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); }); + auto scope_guard = + android::base::make_scope_guard([this]() -> void { handlers_->JoinAllThreads(); }); auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy; @@ -781,21 +464,8 @@ bool UserSnapshotServer::RunForSocketHandoff() { return true; } -bool UserSnapshotServer::UpdateVerification(std::lock_guard* proof_of_lock) { - CHECK(proof_of_lock); - - bool status = true; - for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { - auto& th = (*iter)->thread(); - if (th.joinable() && status) { - status = (*iter)->snapuserd()->CheckPartitionVerification() && status; - } else { - // return immediately if there is a failure - return false; - } - } - - return status; +bool UserSnapshotServer::StartHandler(const std::string& misc_name) { + return handlers_->StartHandler(misc_name); } } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h index 12c3903c4..988c01a99 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -31,6 +31,7 @@ #include #include +#include "handler_manager.h" #include "snapuserd_core.h" namespace android { @@ -39,48 +40,6 @@ namespace snapshot { static constexpr uint32_t kMaxPacketSize = 512; static constexpr uint8_t kMaxMergeThreads = 2; -enum class DaemonOps { - INIT, - START, - QUERY, - STOP, - DELETE, - DETACH, - SUPPORTS, - INITIATE, - PERCENTAGE, - GETSTATUS, - UPDATE_VERIFY, - INVALID, -}; - -class HandlerThread { - public: - explicit HandlerThread(std::shared_ptr snapuserd); - - void FreeResources() { - // Each worker thread holds a reference to snapuserd. - // Clear them so that all the resources - // held by snapuserd is released - if (snapuserd_) { - snapuserd_->FreeResources(); - snapuserd_ = nullptr; - } - } - const std::shared_ptr& snapuserd() const { return snapuserd_; } - std::thread& thread() { return thread_; } - - const std::string& misc_name() const { return misc_name_; } - bool ThreadTerminated() { return thread_terminated_; } - void SetThreadTerminated() { thread_terminated_ = true; } - - private: - std::thread thread_; - std::shared_ptr snapuserd_; - std::string misc_name_; - bool thread_terminated_ = false; -}; - class UserSnapshotServer { private: android::base::unique_fd sockfd_; @@ -88,21 +47,12 @@ class UserSnapshotServer { volatile bool received_socket_signal_ = false; std::vector watched_fds_; bool is_socket_present_ = false; - int num_partitions_merge_complete_ = 0; - int active_merge_threads_ = 0; - bool stop_monitor_merge_thread_ = false; bool is_server_running_ = false; bool io_uring_enabled_ = false; - std::optional is_merge_monitor_started_; - - android::base::unique_fd monitor_merge_event_fd_; + std::unique_ptr handlers_; std::mutex lock_; - using HandlerList = std::vector>; - HandlerList dm_users_; - std::queue> merge_handlers_; - void AddWatchedFd(android::base::borrowed_fd fd, int events); void AcceptClient(); bool HandleClient(android::base::borrowed_fd fd, int revents); @@ -111,28 +61,14 @@ class UserSnapshotServer { bool Receivemsg(android::base::borrowed_fd fd, const std::string& str); void ShutdownThreads(); - bool RemoveAndJoinHandler(const std::string& control_device); - DaemonOps Resolveop(std::string& input); std::string GetDaemonStatus(); void Parsemsg(std::string const& msg, const char delim, std::vector& out); bool IsTerminating() { return terminating_; } - void RunThread(std::shared_ptr handler); - void MonitorMerge(); - void JoinAllThreads(); bool StartWithSocket(bool start_listening); - // Find a HandlerThread within a lock. - HandlerList::iterator FindHandler(std::lock_guard* proof_of_lock, - const std::string& misc_name); - - double GetMergePercentage(std::lock_guard* proof_of_lock); - void TerminateMergeThreads(std::lock_guard* proof_of_lock); - - bool UpdateVerification(std::lock_guard* proof_of_lock); - public: UserSnapshotServer(); ~UserSnapshotServer(); @@ -147,12 +83,8 @@ class UserSnapshotServer { const std::string& cow_device_path, const std::string& backing_device, const std::string& base_path_merge); - bool StartHandler(const std::shared_ptr& handler); - bool StartMerge(std::lock_guard* proof_of_lock, - const std::shared_ptr& handler); - std::string GetMergeStatus(const std::shared_ptr& handler); + bool StartHandler(const std::string& misc_name); - void WakeupMonitorMergeThread(); void SetTerminating() { terminating_ = true; } void ReceivedSocketSignal() { received_socket_signal_ = true; } void SetServerRunning() { is_server_running_ = true; }