From a0470e83e68e1e7a24fc0079cf986050d049db9f Mon Sep 17 00:00:00 2001
From: relesgoe <RElesgoe@users.noreply.github.com>
Date: Thu, 11 Jun 2020 16:27:54 -0700
Subject: [PATCH] Initial multi-threading support for SMTP

---
 src/bnetd/smtp.cpp | 159 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 140 insertions(+), 19 deletions(-)

diff --git a/src/bnetd/smtp.cpp b/src/bnetd/smtp.cpp
index faa3164..8d971c0 100644
--- a/src/bnetd/smtp.cpp
+++ b/src/bnetd/smtp.cpp
@@ -16,19 +16,26 @@
 #include "common/setup_before.h"
 #include "smtp.h"
 
+#include <array>
 #include <ctime>
+#include <mutex>
 #include <string>
+#include <thread>
+#include <tuple>
 
 #include <curl/curl.h>
 #include <fmt/core.h>
 #include <fmt/chrono.h>
 
 #include "common/eventlog.h"
+#include "common/xalloc.h"
 
 #include "server.h"
 
 #include "common/setup_after.h"
 
+#define SMTP_TIMEOUT_DEFAULT 1000
+
 namespace pvpgn
 {
 
@@ -36,6 +43,8 @@ namespace pvpgn
 	{
 
 		static bool is_curl_initialized = false;
+		static std::thread smtp_thread;
+		static std::array<std::tuple<CURLM*, std::mutex>, 2> handles_and_mutexes = {};
 		static std::string smtp_ca_cert_store;
 		static char smtp_server_url[512] = {};
 		static long smtp_port;
@@ -48,14 +57,78 @@ namespace pvpgn
 			std::size_t bytes_remaining;
 		} read_callback_message;
 
+		static void smtp_consumer()
+		{
+			while (is_curl_initialized == true)
+			{
+				for (auto& tuple : handles_and_mutexes)
+				{
+					CURLM* curl_multi_handle = std::get<0>(tuple);
+					std::mutex& curl_multi_handle_mutex = std::get<1>(tuple);
+
+					if (curl_multi_handle_mutex.try_lock() == false)
+					{
+						continue;
+					}
+
+					long timeout = -1;
+					curl_multi_timeout(curl_multi_handle, &timeout);
+					if (timeout == -1)
+					{
+						timeout = SMTP_TIMEOUT_DEFAULT;
+					}
+
+					curl_multi_poll(curl_multi_handle, nullptr, 0, timeout, nullptr);
+
+					int running_handles = 0; // unused
+					curl_multi_perform(curl_multi_handle, &running_handles);
+
+					// After calling curl_multi_perform(), free all curl easy handles that are done.
+					{
+						struct CURLMsg* curlmsg;
+						do
+						{
+							int msgq = 0;
+							curlmsg = curl_multi_info_read(curl_multi_handle, &msgq);
+							if (curlmsg && (curlmsg->msg == CURLMSG_DONE))
+							{
+								CURL* curl = curlmsg->easy_handle;
+
+								running_handles -= 1;
+	
+								curl_multi_remove_handle(curl_multi_handle, curl);
+
+								struct curl_slist* recipient = nullptr;
+								curl_easy_getinfo(curl, CURLINFO_PRIVATE, &recipient);
+								if (recipient != nullptr)
+								{
+									curl_slist_free_all(recipient);
+								}
+
+								curl_easy_cleanup(curl);
+							}
+						} while (curlmsg);
+					}
+
+					curl_multi_handle_mutex.unlock();
+				}
+			}
+		}
+
+		// This function will be called at least twice for every message, the last call must return 0.
 		static std::size_t read_callback(char* buffer, std::size_t size, std::size_t nitems, void* message)
 		{
-			read_callback_message* rcbmessage = reinterpret_cast<read_callback_message*>(message);
+			read_callback_message* rcbmessage = static_cast<read_callback_message*>(message);
 			std::size_t buffer_size = size * nitems;
+			// Copy at least (rcbmessage->bytes_remaining) bytes and at most (buffer_size) bytes
 			std::size_t copy_size = rcbmessage->bytes_remaining <= buffer_size ? rcbmessage->bytes_remaining : buffer_size;
 
-			std::memcpy(buffer, rcbmessage->message.c_str(), copy_size);
-			rcbmessage->bytes_remaining -= copy_size;
+			if (copy_size > 0)
+			{
+				std::memcpy(buffer, rcbmessage->message.c_str(), copy_size);
+				rcbmessage->bytes_remaining -= copy_size;
+			}
+
 			return copy_size;
 		}
 
@@ -73,16 +146,28 @@ namespace pvpgn
 				return false;
 			}
 
-			if (curl_global_init(CURL_GLOBAL_NOTHING) == 0)
+			for (auto& tuple : handles_and_mutexes)
 			{
-				is_curl_initialized = true;
-
-				return true;
+				CURLM** curl_multi_handle = &std::get<0>(tuple);
+				*curl_multi_handle = curl_multi_init();
+				if (*curl_multi_handle == nullptr)
+				{
+					eventlog(eventlog_level_error, __FUNCTION__, "Failed to initialize curl multi handle");
+					return false;
+				}
 			}
-			else
+
+			if (curl_global_init(CURL_GLOBAL_NOTHING) != 0)
 			{
+				eventlog(eventlog_level_error, __FUNCTION__, "Failed to initialize curl global context");
 				return false;
 			}
+
+			smtp_thread = std::thread(smtp_consumer);
+
+			is_curl_initialized = true;
+
+			return true;
 		}
 
 		/**
@@ -109,8 +194,20 @@ namespace pvpgn
 		{
 			if (is_curl_initialized)
 			{
-				curl_global_cleanup();
 				is_curl_initialized = false;
+
+				// wait for the smtp thread to finish executing
+				smtp_thread.join();
+
+				// free all the curl multi handles
+				for (auto& tuple : handles_and_mutexes)
+				{
+					CURLM* curl_multi_handle = std::get<0>(tuple);
+					curl_multi_cleanup(curl_multi_handle);
+				}
+
+				// free curl's global context
+				curl_global_cleanup();
 			}
 		}
 
@@ -151,25 +248,49 @@ namespace pvpgn
 			}
 			curl_easy_setopt(curl, CURLOPT_MAIL_RCPT, recipient);
 
+			// store pointer to recipient struct in the handle and retrieve it later to free the memory
+			curl_easy_setopt(curl, CURLOPT_PRIVATE, reinterpret_cast<void*>(recipient));
+
 			curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback);
 
 			message.insert(0, fmt::format("MIME-Version: 1.0\r\nContent-Type: text/plain; charset=\"UTF-8\"\r\nDate: {:%a, %d %b %Y %T %z}\r\nFrom: {} <{}>\r\nTo: <{}>\r\nSubject: {}\r\n\r\n", *std::localtime(&now), from_name, from_address, to_address, subject));
-			read_callback_message rcbmessage = {};
-			rcbmessage.message = message;
-			rcbmessage.bytes_remaining = message.length() + 1;
-			curl_easy_setopt(curl, CURLOPT_READDATA, reinterpret_cast<void *>(&rcbmessage));
+
+			read_callback_message* rcbmessage = static_cast<read_callback_message*>(xmalloc(sizeof(read_callback_message)));
+			rcbmessage->message = message;
+			rcbmessage->bytes_remaining = message.length() + 1;
+			curl_easy_setopt(curl, CURLOPT_READDATA, static_cast<void*>(rcbmessage));
 
 			curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
 
-			CURLcode result = curl_easy_perform(curl);
-			if (result != CURLE_OK)
+			// lock an available mutex and then add the curl easy handle to the associated curl multi handle
+			while (true)
 			{
-				eventlog(eventlog_level_error, __FUNCTION__, "Failed to send email ({})", curl_easy_strerror(result));
+				for (auto& tuple : handles_and_mutexes)
+				{
+					std::mutex& curl_multi_handle_mutex = std::get<1>(tuple);
+
+					if (curl_multi_handle_mutex.try_lock() == false)
+					{
+						continue;
+					}
+
+					CURLM* curl_multi_handle = std::get<0>(tuple);
+					CURLMcode code = curl_multi_add_handle(curl_multi_handle, curl);
+					if (code == CURLE_OK)
+					{
+						eventlog(eventlog_level_trace, __FUNCTION__, "Added handle to CURL multi handle ({})", curl_multi_handle);
+					}
+					else
+					{
+						eventlog(eventlog_level_error, __FUNCTION__, "Failed to add handle to CURL multi handle (CURLMcode: {})", code);
+					}
+
+					curl_multi_handle_mutex.unlock();
+
+					return;
+				}
 			}
 
-			curl_slist_free_all(recipient);
-
-			curl_easy_cleanup(curl);
 		}
 
 	}