Initial multi-threading support for SMTP

This commit is contained in:
relesgoe 2020-06-11 16:27:54 -07:00
parent c37269f5ed
commit a0470e83e6

View file

@ -16,19 +16,26 @@
#include "common/setup_before.h"
#include "smtp.h"
#include <array>
#include <ctime>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <curl/curl.h>
#include <fmt/core.h>
#include <fmt/chrono.h>
#include "common/eventlog.h"
#include "common/xalloc.h"
#include "server.h"
#include "common/setup_after.h"
#define SMTP_TIMEOUT_DEFAULT 1000
namespace pvpgn
{
@ -36,6 +43,8 @@ namespace pvpgn
{
static bool is_curl_initialized = false;
static std::thread smtp_thread;
static std::array<std::tuple<CURLM*, std::mutex>, 2> handles_and_mutexes = {};
static std::string smtp_ca_cert_store;
static char smtp_server_url[512] = {};
static long smtp_port;
@ -48,14 +57,78 @@ namespace pvpgn
std::size_t bytes_remaining;
} read_callback_message;
static void smtp_consumer()
{
while (is_curl_initialized == true)
{
for (auto& tuple : handles_and_mutexes)
{
CURLM* curl_multi_handle = std::get<0>(tuple);
std::mutex& curl_multi_handle_mutex = std::get<1>(tuple);
if (curl_multi_handle_mutex.try_lock() == false)
{
continue;
}
long timeout = -1;
curl_multi_timeout(curl_multi_handle, &timeout);
if (timeout == -1)
{
timeout = SMTP_TIMEOUT_DEFAULT;
}
curl_multi_poll(curl_multi_handle, nullptr, 0, timeout, nullptr);
int running_handles = 0; // unused
curl_multi_perform(curl_multi_handle, &running_handles);
// After calling curl_multi_perform(), free all curl easy handles that are done.
{
struct CURLMsg* curlmsg;
do
{
int msgq = 0;
curlmsg = curl_multi_info_read(curl_multi_handle, &msgq);
if (curlmsg && (curlmsg->msg == CURLMSG_DONE))
{
CURL* curl = curlmsg->easy_handle;
running_handles -= 1;
curl_multi_remove_handle(curl_multi_handle, curl);
struct curl_slist* recipient = nullptr;
curl_easy_getinfo(curl, CURLINFO_PRIVATE, &recipient);
if (recipient != nullptr)
{
curl_slist_free_all(recipient);
}
curl_easy_cleanup(curl);
}
} while (curlmsg);
}
curl_multi_handle_mutex.unlock();
}
}
}
// This function will be called at least twice for every message, the last call must return 0.
static std::size_t read_callback(char* buffer, std::size_t size, std::size_t nitems, void* message)
{
read_callback_message* rcbmessage = reinterpret_cast<read_callback_message*>(message);
read_callback_message* rcbmessage = static_cast<read_callback_message*>(message);
std::size_t buffer_size = size * nitems;
// Copy at least (rcbmessage->bytes_remaining) bytes and at most (buffer_size) bytes
std::size_t copy_size = rcbmessage->bytes_remaining <= buffer_size ? rcbmessage->bytes_remaining : buffer_size;
std::memcpy(buffer, rcbmessage->message.c_str(), copy_size);
rcbmessage->bytes_remaining -= copy_size;
if (copy_size > 0)
{
std::memcpy(buffer, rcbmessage->message.c_str(), copy_size);
rcbmessage->bytes_remaining -= copy_size;
}
return copy_size;
}
@ -73,16 +146,28 @@ namespace pvpgn
return false;
}
if (curl_global_init(CURL_GLOBAL_NOTHING) == 0)
for (auto& tuple : handles_and_mutexes)
{
is_curl_initialized = true;
return true;
CURLM** curl_multi_handle = &std::get<0>(tuple);
*curl_multi_handle = curl_multi_init();
if (*curl_multi_handle == nullptr)
{
eventlog(eventlog_level_error, __FUNCTION__, "Failed to initialize curl multi handle");
return false;
}
}
else
if (curl_global_init(CURL_GLOBAL_NOTHING) != 0)
{
eventlog(eventlog_level_error, __FUNCTION__, "Failed to initialize curl global context");
return false;
}
smtp_thread = std::thread(smtp_consumer);
is_curl_initialized = true;
return true;
}
/**
@ -109,8 +194,20 @@ namespace pvpgn
{
if (is_curl_initialized)
{
curl_global_cleanup();
is_curl_initialized = false;
// wait for the smtp thread to finish executing
smtp_thread.join();
// free all the curl multi handles
for (auto& tuple : handles_and_mutexes)
{
CURLM* curl_multi_handle = std::get<0>(tuple);
curl_multi_cleanup(curl_multi_handle);
}
// free curl's global context
curl_global_cleanup();
}
}
@ -151,25 +248,49 @@ namespace pvpgn
}
curl_easy_setopt(curl, CURLOPT_MAIL_RCPT, recipient);
// store pointer to recipient struct in the handle and retrieve it later to free the memory
curl_easy_setopt(curl, CURLOPT_PRIVATE, reinterpret_cast<void*>(recipient));
curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback);
message.insert(0, fmt::format("MIME-Version: 1.0\r\nContent-Type: text/plain; charset=\"UTF-8\"\r\nDate: {:%a, %d %b %Y %T %z}\r\nFrom: {} <{}>\r\nTo: <{}>\r\nSubject: {}\r\n\r\n", *std::localtime(&now), from_name, from_address, to_address, subject));
read_callback_message rcbmessage = {};
rcbmessage.message = message;
rcbmessage.bytes_remaining = message.length() + 1;
curl_easy_setopt(curl, CURLOPT_READDATA, reinterpret_cast<void *>(&rcbmessage));
read_callback_message* rcbmessage = static_cast<read_callback_message*>(xmalloc(sizeof(read_callback_message)));
rcbmessage->message = message;
rcbmessage->bytes_remaining = message.length() + 1;
curl_easy_setopt(curl, CURLOPT_READDATA, static_cast<void*>(rcbmessage));
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
CURLcode result = curl_easy_perform(curl);
if (result != CURLE_OK)
// lock an available mutex and then add the curl easy handle to the associated curl multi handle
while (true)
{
eventlog(eventlog_level_error, __FUNCTION__, "Failed to send email ({})", curl_easy_strerror(result));
for (auto& tuple : handles_and_mutexes)
{
std::mutex& curl_multi_handle_mutex = std::get<1>(tuple);
if (curl_multi_handle_mutex.try_lock() == false)
{
continue;
}
CURLM* curl_multi_handle = std::get<0>(tuple);
CURLMcode code = curl_multi_add_handle(curl_multi_handle, curl);
if (code == CURLE_OK)
{
eventlog(eventlog_level_trace, __FUNCTION__, "Added handle to CURL multi handle ({})", curl_multi_handle);
}
else
{
eventlog(eventlog_level_error, __FUNCTION__, "Failed to add handle to CURL multi handle (CURLMcode: {})", code);
}
curl_multi_handle_mutex.unlock();
return;
}
}
curl_slist_free_all(recipient);
curl_easy_cleanup(curl);
}
}