59 using namespace std::string_literals;
/* Name string identifying this application. */
63 auto constexpr app_name =
"doca_storage_target_rdma";
/* Block size assigned to the configuration before command-line arguments are applied. */
65 auto constexpr default_storage_block_size = 4096;
/* Block count assigned to the configuration before command-line arguments are applied. */
66 auto constexpr default_storage_block_count = 128;
/*
 * Compile-time guard: the build fails unless a pointer occupies exactly 8 bytes.
 * NOTE(review): the code that relies on this width is not visible here - confirm.
 */
68 static_assert(
sizeof(
void *) == 8,
"Expected a pointer to occupy 8 bytes");
/*
 * Configuration of the target RDMA application. It is populated by
 * parse_target_rdma_app_cli_args(), printed by print_config() and checked by
 * validate_target_rdma_app_configuration().
 */
76 struct target_rdma_app_configuration {
/* CPU cores given on the command line; the core argument callback appends one entry per invocation. */
77 std::vector<uint32_t> core_set = {};
/* Device identifier string taken from the command line (printed as "device"). */
78 std::string device_id = {};
/* Path to a binary .sbc file with the initial storage content; empty when no file was given. */
79 std::string storage_content_file_name = {};
/* Number of storage blocks; starts at default_storage_block_count and is replaced by the content file's value when a file is given. */
80 uint32_t block_count = {};
/* Size of one storage block; starts at default_storage_block_size and is replaced by the content file's value when a file is given. */
81 uint32_t block_size = {};
/* TCP listen port number. */
82 uint16_t listen_port = {};
/* Bytes moved out of the loaded content file; the parser compares its size against block_size * block_count. */
83 std::vector<uint8_t> content = {};
/* Per-worker statistics record, printed by target_rdma_app::display_stats(). */
89 struct target_rdma_worker_stats {
/* Core index reported for the worker (printed as "Core"). */
90 uint32_t core_idx = 0;
/* Copied from the worker's pe_hit_count, which is incremented when doca_pe_progress() returns non-zero. */
91 uint64_t pe_hit_count = 0;
/* Copied from the worker's pe_miss_count, which is incremented when doca_pe_progress() returns zero. */
92 uint64_t pe_miss_count = 0;
/* Populated from the worker's completed_transaction_count when the stats are collected (printed as "Operation count"). */
93 uint64_t operation_count = 0;
99 struct alignas(storage::cache_line_size / 2) transfer_context {
100 doca_rdma_task_write *write_task =
nullptr;
101 doca_rdma_task_read *read_task =
nullptr;
102 doca_buf *host_buf =
nullptr;
103 doca_buf *storage_buf =
nullptr;
109 class target_rdma_worker {
116 struct alignas(storage::cache_line_size) hot_data {
118 uint64_t pe_hit_count;
119 uint64_t pe_miss_count;
120 char *remote_memory_start_addr;
121 char *local_memory_start_addr;
122 uint64_t completed_transaction_count;
123 uint32_t in_flight_transaction_count;
125 std::atomic_bool run_flag;
136 hot_data(hot_data
const &) =
delete;
142 hot_data(hot_data &&other) noexcept;
147 hot_data &operator=(hot_data
const &) =
delete;
154 hot_data &operator=(hot_data &&other) noexcept;
157 "Expected target_rdma_worker::hot_data to occupy one cache line");
162 ~target_rdma_worker();
167 target_rdma_worker() =
delete;
175 target_rdma_worker(doca_dev *dev, uint32_t task_count, doca_mmap *remote_mmap, doca_mmap *local_mmap);
180 target_rdma_worker(target_rdma_worker
const &) =
delete;
186 [[maybe_unused]] target_rdma_worker(target_rdma_worker &&other) noexcept;
191 target_rdma_worker &operator=(target_rdma_worker
const &) =
delete;
198 [[maybe_unused]] target_rdma_worker &operator=(target_rdma_worker &&other) noexcept;
208 std::vector<uint8_t>
const &remote_conn_details);
219 [[nodiscard]]
doca_error_t get_rdma_connection_state() const noexcept;
224 void stop_processing(
void) noexcept;
229 void prepare_and_submit_tasks(
void);
236 void prepare_thread_proc(uint32_t core_id);
241 void start_thread_proc(
void);
248 [[nodiscard]] hot_data const &get_hot_data() const noexcept;
252 uint8_t *m_io_message_region;
253 doca_mmap *m_io_message_mmap;
254 doca_buf_inventory *m_buf_inv;
255 std::vector<doca_buf *> m_bufs;
256 storage::rdma_conn_pair m_rdma_ctrl_ctx;
257 storage::rdma_conn_pair m_rdma_data_ctx;
258 doca_mmap *m_local_mmap;
259 doca_mmap *m_remote_mmap;
260 uint32_t m_task_count;
261 uint32_t m_transfer_contexts_size;
262 transfer_context *m_transfer_contexts;
263 std::vector<doca_task *> m_ctrl_tasks;
264 std::vector<doca_task *> m_data_tasks;
265 std::thread m_thread;
272 void init(doca_dev *dev);
286 static
void doca_rdma_task_send_cb(doca_rdma_task_send *task,
297 static
void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
308 static
void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
319 static
void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
330 static
void on_transfer_complete(doca_task *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept;
339 static
void on_transfer_error(doca_task *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept;
350 class target_rdma_app {
353 target_rdma_app() =
delete;
354 explicit target_rdma_app(target_rdma_app_configuration
const &
cfg);
355 target_rdma_app(target_rdma_app
const &) =
delete;
356 target_rdma_app(target_rdma_app &&) noexcept = delete;
357 target_rdma_app &operator=(target_rdma_app const &) = delete;
358 target_rdma_app &operator=(target_rdma_app &&) noexcept = delete;
360 void abort(std::
string const &reason);
362 void wait_for_client_connection(
void);
363 void wait_for_and_process_query_storage(
void);
364 void wait_for_and_process_init_storage(
void);
365 void wait_for_and_process_create_rdma_connections(
void);
366 void wait_for_and_process_start_storage(
void);
367 void wait_for_and_process_stop_storage(
void);
368 void wait_for_and_process_shutdown(
void);
369 void display_stats(
void) const;
372 target_rdma_app_configuration const m_cfg;
374 std::unique_ptr<
storage::control::channel> m_control_channel;
375 std::vector<
storage::control::message> m_ctrl_messages;
376 uint8_t *m_local_io_region;
377 uint64_t m_local_io_region_size;
378 doca_mmap *m_local_io_mmap;
379 doca_mmap *m_remote_io_mmap;
380 target_rdma_worker *m_workers;
381 std::vector<target_rdma_worker_stats> m_stats;
382 uint32_t m_storage_block_count;
383 uint32_t m_storage_block_size;
384 uint32_t m_task_count;
385 uint32_t m_core_count;
403 storage::control::message wait_for_control_message();
411 storage::control::message process_query_storage(
storage::control::message const &client_request);
418 storage::control::message process_init_storage(
storage::control::message const &client_request);
426 storage::control::message process_create_rdma_connection(
storage::control::message const &client_request);
434 storage::control::message process_start_storage(
storage::control::message const &client_request);
442 storage::control::message process_stop_storage(
storage::control::message const &client_request);
450 storage::control::message process_shutdown(
storage::control::message const &client_request);
455 void prepare_workers();
460 void destroy_workers() noexcept;
465 void verify_connections_are_ready(
void);
/*
 * Forward declaration; the definition appears later in this file.
 * Builds a target_rdma_app_configuration from the program's command-line
 * arguments. main() passes the result directly to the target_rdma_app
 * constructor.
 *
 * argc: argument count as received by main()
 * argv: argument vector as received by main()
 */
477 target_rdma_app_configuration parse_target_rdma_app_cli_args(
int argc,
char **argv);
488 int main(
int argc,
char **argv)
495 target_rdma_app app{parse_target_rdma_app_cli_args(argc, argv)};
497 app.abort(
"User requested abort");
500 app.wait_for_client_connection();
501 app.wait_for_and_process_query_storage();
502 app.wait_for_and_process_init_storage();
503 app.wait_for_and_process_create_rdma_connections();
504 app.wait_for_and_process_start_storage();
505 app.wait_for_and_process_stop_storage();
506 app.wait_for_and_process_shutdown();
508 }
catch (std::exception
const &ex) {
509 fprintf(stderr,
"EXCEPTION: %s\n", ex.what());
527 void print_config(target_rdma_app_configuration
const &
cfg) noexcept
529 printf(
"target_rdma_app_configuration: {\n");
530 printf(
"\tcore_set : [");
532 for (
auto cpu :
cfg.core_set) {
540 printf(
"\tdevice : \"%s\",\n",
cfg.device_id.c_str());
541 printf(
"\tstorage_content_file_name : \"%s\",\n",
cfg.storage_content_file_name.c_str());
542 printf(
"\tlisten_port : %u\n",
cfg.listen_port);
543 printf(
"\tblock_count : %u\n",
cfg.block_count);
544 printf(
"\tblock_size : %u\n",
cfg.block_size);
553 void validate_target_rdma_app_configuration(target_rdma_app_configuration
const &
cfg)
555 std::vector<std::string> errors;
557 if (
cfg.storage_content_file_name.empty() && (
cfg.block_size == 0 ||
cfg.block_count == 0)) {
559 "Invalid target_rdma_app_configuration: block-size and block-count must be non zero when binary-content is not provided");
562 if (!errors.empty()) {
563 for (
auto const &err : errors) {
564 printf(
"%s\n", err.c_str());
567 "Invalid target_rdma_app_configuration detected"};
580 target_rdma_app_configuration parse_target_rdma_app_cli_args(
int argc,
char **argv)
582 target_rdma_app_configuration config{};
583 config.block_count = default_storage_block_count;
584 config.block_size = default_storage_block_size;
599 [](
void *
value,
void *
cfg) noexcept {
600 static_cast<target_rdma_app_configuration *
>(
cfg)->device_id =
601 static_cast<char const *
>(
value);
607 "CPU core to which the process affinity can be set",
610 [](
void *
value,
void *
cfg) noexcept {
611 static_cast<target_rdma_app_configuration *
>(
cfg)->core_set.push_back(
612 *
static_cast<int *
>(
value));
618 "TCP listen port number",
621 [](
void *
value,
void *
cfg) noexcept {
622 auto inv_val = *
static_cast<int *
>(
value);
623 auto short_val =
static_cast<uint16_t
>(inv_val);
625 if (inv_val != short_val)
628 static_cast<target_rdma_app_configuration *
>(
cfg)->listen_port =
636 "Path to binary .sbc file containing the initial content to be represented by this storage instance",
639 [](
void *
value,
void *
cfg) noexcept {
640 static_cast<target_rdma_app_configuration *
>(
cfg)->storage_content_file_name =
641 static_cast<char const *
>(
value);
648 "Number of available storage blocks. (Ignored when using content binary file) Default: 128",
651 [](
void *
value,
void *
cfg) noexcept {
652 static_cast<target_rdma_app_configuration *
>(
cfg)->block_count =
653 *
static_cast<uint32_t *
>(
value);
660 "Block size used by the storage. (Ignored when using content binary file) Default: 4096",
663 [](
void *
value,
void *
cfg) noexcept {
664 static_cast<target_rdma_app_configuration *
>(
cfg)->block_size = *
static_cast<uint32_t *
>(
value);
674 if (!config.storage_content_file_name.empty()) {
677 config.block_count = sbc.block_count;
678 config.block_size = sbc.block_size;
679 config.content = std::move(sbc.content);
682 auto const expected_content_size = uint64_t{config.block_size} * config.block_count;
683 if (config.content.size() != expected_content_size) {
686 "Selected input data file content : " + config.storage_content_file_name +
688 " bytes does not match the storage size of " +
694 print_config(config);
695 validate_target_rdma_app_configuration(config);
700 target_rdma_worker::hot_data::hot_data()
704 remote_memory_start_addr{nullptr},
705 local_memory_start_addr{nullptr},
706 completed_transaction_count{0},
707 in_flight_transaction_count{0},
714 target_rdma_worker::hot_data::hot_data(hot_data &&other) noexcept
716 pe_hit_count{other.pe_hit_count},
717 pe_miss_count{other.pe_miss_count},
718 remote_memory_start_addr{other.remote_memory_start_addr},
719 local_memory_start_addr{other.local_memory_start_addr},
720 completed_transaction_count{other.completed_transaction_count},
721 in_flight_transaction_count{other.in_flight_transaction_count},
722 core_idx{other.core_idx},
723 run_flag{other.run_flag.load()},
724 error_flag{other.error_flag}
729 target_rdma_worker::hot_data &target_rdma_worker::hot_data::operator=(hot_data &&other) noexcept
731 if (std::addressof(other) ==
this)
735 pe_hit_count = other.pe_hit_count;
736 pe_miss_count = other.pe_miss_count;
737 remote_memory_start_addr = other.remote_memory_start_addr;
738 local_memory_start_addr = other.local_memory_start_addr;
739 completed_transaction_count = other.completed_transaction_count;
740 in_flight_transaction_count = other.in_flight_transaction_count;
741 core_idx = other.core_idx;
742 run_flag = other.run_flag.load();
743 error_flag = other.error_flag;
750 target_rdma_worker::~target_rdma_worker()
752 if (m_thread.joinable()) {
753 m_hot_data.run_flag =
false;
754 m_hot_data.error_flag =
true;
761 target_rdma_worker::target_rdma_worker(doca_dev *dev, uint32_t task_count, doca_mmap *remote_mmap, doca_mmap *local_mmap)
763 m_io_message_region{nullptr},
764 m_io_message_mmap{nullptr},
769 m_local_mmap{local_mmap},
770 m_remote_mmap{remote_mmap},
771 m_task_count{task_count},
772 m_transfer_contexts_size{0},
773 m_transfer_contexts{nullptr},
786 target_rdma_worker::target_rdma_worker(target_rdma_worker &&other) noexcept
787 : m_hot_data{std::move(other.m_hot_data)},
788 m_io_message_region{other.m_io_message_region},
789 m_io_message_mmap{other.m_io_message_mmap},
790 m_buf_inv{other.m_buf_inv},
791 m_bufs{std::move(other.m_bufs)},
792 m_rdma_ctrl_ctx{other.m_rdma_ctrl_ctx},
793 m_rdma_data_ctx{other.m_rdma_data_ctx},
794 m_local_mmap{other.m_local_mmap},
795 m_remote_mmap{other.m_remote_mmap},
796 m_task_count{other.m_task_count},
797 m_transfer_contexts_size{other.m_transfer_contexts_size},
798 m_transfer_contexts{other.m_transfer_contexts},
799 m_ctrl_tasks{std::move(other.m_ctrl_tasks)},
800 m_data_tasks{std::move(other.m_data_tasks)},
801 m_thread{std::move(other.m_thread)}
803 other.m_io_message_region =
nullptr;
804 other.m_io_message_mmap =
nullptr;
805 other.m_buf_inv =
nullptr;
806 other.m_rdma_ctrl_ctx = {};
807 other.m_rdma_data_ctx = {};
808 other.m_transfer_contexts =
nullptr;
811 target_rdma_worker &target_rdma_worker::operator=(target_rdma_worker &&other) noexcept
813 if (std::addressof(other) ==
this)
816 m_hot_data = std::move(other.m_hot_data);
817 m_io_message_region = other.m_io_message_region;
818 m_io_message_mmap = other.m_io_message_mmap;
819 m_buf_inv = other.m_buf_inv;
820 m_bufs = std::move(other.m_bufs);
821 m_rdma_ctrl_ctx = other.m_rdma_ctrl_ctx;
822 m_rdma_data_ctx = other.m_rdma_data_ctx;
823 m_local_mmap = other.m_local_mmap;
824 m_remote_mmap = other.m_remote_mmap;
825 m_task_count = other.m_task_count;
826 m_transfer_contexts_size = other.m_transfer_contexts_size;
827 m_transfer_contexts = other.m_transfer_contexts;
828 m_ctrl_tasks = std::move(other.m_ctrl_tasks);
829 m_data_tasks = std::move(other.m_data_tasks);
830 m_thread = std::move(other.m_thread);
832 other.m_io_message_region =
nullptr;
833 other.m_io_message_mmap =
nullptr;
834 other.m_buf_inv =
nullptr;
835 other.m_rdma_ctrl_ctx = {};
836 other.m_rdma_data_ctx = {};
837 other.m_transfer_contexts =
nullptr;
843 std::vector<uint8_t>
const &remote_conn_details)
847 auto local_connection_details = [
this, &rdma_pair]() {
848 uint8_t
const *blob =
nullptr;
849 size_t blob_size = 0;
852 reinterpret_cast<void const **
>(&blob),
854 std::addressof(rdma_pair.conn));
859 return std::vector<uint8_t>{blob, blob + blob_size};
863 remote_conn_details.data(),
864 remote_conn_details.size(),
871 return local_connection_details;
874 doca_error_t target_rdma_worker::get_rdma_connection_state() const noexcept
878 bool ctrl_connected =
false;
879 bool data_connected =
false;
886 ctrl_connected =
true;
896 data_connected =
true;
904 void target_rdma_worker::stop_processing(
void) noexcept
906 m_hot_data.run_flag =
false;
907 if (m_thread.joinable()) {
912 void target_rdma_worker::prepare_and_submit_tasks(
void)
915 uint8_t *message_buffer_addr = m_io_message_region;
916 size_t local_memory_size = 0;
917 size_t remote_memory_size = 0;
919 reinterpret_cast<void **
>(&m_hot_data.local_memory_start_addr),
920 &local_memory_size));
922 reinterpret_cast<void **
>(&m_hot_data.remote_memory_start_addr),
923 &remote_memory_size));
925 if (remote_memory_size < local_memory_size) {
927 "Unable to start storage, remote memory region is to small(" +
929 " bytes) This storage instance requires it to be at least: " +
932 if (local_memory_size != remote_memory_size) {}
934 std::vector<doca_task *> request_tasks;
935 request_tasks.reserve(m_task_count);
936 m_ctrl_tasks.reserve(m_task_count * 2);
937 m_data_tasks.reserve(m_task_count * 2);
938 m_bufs.reserve(m_task_count * 3);
942 for (uint32_t ii = 0; ii != m_task_count; ++ii) {
943 doca_buf *message_buf;
954 m_bufs.push_back(message_buf);
959 m_hot_data.local_memory_start_addr,
961 std::addressof(m_transfer_contexts[ii].storage_buf));
966 m_bufs.push_back(m_transfer_contexts[ii].storage_buf);
970 m_hot_data.remote_memory_start_addr,
972 std::addressof(m_transfer_contexts[ii].host_buf));
977 m_bufs.push_back(m_transfer_contexts[ii].host_buf);
979 doca_rdma_task_receive *request_task =
nullptr;
982 doca_data{.ptr = std::addressof(m_transfer_contexts[ii])},
990 doca_rdma_task_send *response_task =
nullptr;
992 m_rdma_ctrl_ctx.conn,
1002 m_rdma_data_ctx.conn,
1003 m_transfer_contexts[ii].storage_buf,
1004 m_transfer_contexts[ii].host_buf,
1006 std::addressof(m_transfer_contexts[ii].write_task));
1013 m_rdma_data_ctx.conn,
1014 m_transfer_contexts[ii].host_buf,
1015 m_transfer_contexts[ii].storage_buf,
1017 std::addressof(m_transfer_contexts[ii].read_task));
1024 for (
auto *task : request_tasks) {
1032 void target_rdma_worker::prepare_thread_proc(uint32_t core_id)
1034 m_thread = std::thread{[
this]() {
1037 }
catch (std::exception
const &ex) {
1038 DOCA_LOG_ERR(
"Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
1039 m_hot_data.error_flag =
true;
1040 m_hot_data.run_flag =
false;
1043 m_hot_data.core_idx = core_id;
1047 void target_rdma_worker::start_thread_proc()
1049 m_hot_data.run_flag =
true;
1052 target_rdma_worker::hot_data
const &target_rdma_worker::get_hot_data(
void)
const noexcept
1057 void target_rdma_worker::init(doca_dev *dev)
1064 m_io_message_region =
static_cast<uint8_t *
>(
1066 if (m_io_message_region ==
nullptr) {
1071 reinterpret_cast<char *
>(m_io_message_region),
1072 raw_io_messages_size,
1091 doca_data{.ptr = std::addressof(m_hot_data)},
1095 doca_rdma_task_receive_cb,
1096 doca_rdma_task_receive_error_cb,
1103 doca_rdma_task_send_cb,
1104 doca_rdma_task_send_error_cb,
1117 doca_data{.ptr = std::addressof(m_hot_data)},
1127 "Failed to set doca_rdma_task_read task pool target_rdma_app_configuration"};
1131 m_rdma_data_ctx.rdma,
1138 "Failed to set doca_rdma_task_write task pool target_rdma_app_configuration"};
1151 if (m_rdma_ctrl_ctx.rdma !=
nullptr) {
1155 m_ctrl_tasks.clear();
1162 m_rdma_ctrl_ctx.rdma =
nullptr;
1168 if (m_rdma_data_ctx.rdma !=
nullptr) {
1172 m_data_tasks.clear();
1179 m_rdma_data_ctx.rdma =
nullptr;
1185 if (m_hot_data.pe !=
nullptr) {
1188 m_hot_data.pe =
nullptr;
1194 if (m_transfer_contexts !=
nullptr) {
1196 m_transfer_contexts =
nullptr;
1199 for (
auto *buf : m_bufs) {
1210 m_buf_inv =
nullptr;
1216 if (m_io_message_mmap) {
1223 m_io_message_mmap =
nullptr;
1229 if (m_io_message_region !=
nullptr) {
1234 void target_rdma_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1238 static_cast<void>(task);
1239 static_cast<void>(ctx_user_data);
1241 auto *
const request_task =
static_cast<doca_rdma_task_receive *
>(task_user_data.ptr);
1249 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1250 --(hot_data->in_flight_transaction_count);
1253 void target_rdma_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1257 static_cast<void>(task);
1258 static_cast<void>(task_user_data);
1260 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_send");
1262 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1263 --(hot_data->in_flight_transaction_count);
1264 hot_data->run_flag =
false;
1265 hot_data->error_flag =
true;
1268 void target_rdma_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1273 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1278 auto *
const transfer_ctx =
static_cast<transfer_context *
>(task_user_data.ptr);
1283 hot_data->remote_memory_start_addr;
1285 char *
const remote_addr = hot_data->remote_memory_start_addr + offset +
1287 char *
const local_addr = hot_data->local_memory_start_addr + offset;
1310 ++(hot_data->in_flight_transaction_count);
1312 "Start read(%p) of %u bytes from storage: %p to remote: %p (in_flight_transaction_count: %u)",
1313 transfer_ctx->write_task,
1317 hot_data->in_flight_transaction_count);
1321 hot_data->remote_memory_start_addr;
1323 char *
const remote_addr = hot_data->remote_memory_start_addr + offset +
1325 char *
const local_addr = hot_data->local_memory_start_addr + offset;
1348 ++(hot_data->in_flight_transaction_count);
1350 "Start write(%p) of %u bytes from remote: %p to storage: %p (in_flight_transaction_count: %u)",
1351 transfer_ctx->read_task,
1355 hot_data->in_flight_transaction_count);
1367 auto *
const response_task =
static_cast<doca_rdma_task_send *
>(
1379 void target_rdma_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1383 static_cast<void>(task);
1384 static_cast<void>(task_user_data);
1386 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1387 if (hot_data->run_flag) {
1388 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_receive");
1390 --(hot_data->in_flight_transaction_count);
1391 hot_data->run_flag =
false;
1392 hot_data->error_flag =
true;
1396 void target_rdma_worker::on_transfer_complete(doca_task *task,
1400 static_cast<void>(task);
1401 static_cast<void>(ctx_user_data);
1403 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1404 auto *
const response_task =
static_cast<doca_rdma_task_send *
>(task_user_data.ptr);
1405 auto *
const io_message =
1408 ++(hot_data->completed_transaction_count);
1419 void target_rdma_worker::on_transfer_error(doca_task *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
1421 static_cast<void>(task);
1423 auto *
const hot_data =
static_cast<target_rdma_worker::hot_data *
>(ctx_user_data.ptr);
1424 auto *
const response_task =
static_cast<doca_rdma_task_send *
>(task_user_data.ptr);
1425 auto *
const io_message =
1428 ++(hot_data->completed_transaction_count);
1429 hot_data->error_flag =
true;
1440 void target_rdma_worker::thread_proc()
1442 while (m_hot_data.run_flag ==
false) {
1443 std::this_thread::yield();
1444 if (m_hot_data.error_flag)
1450 while (m_hot_data.run_flag) {
1451 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1454 while (m_hot_data.error_flag ==
false && m_hot_data.in_flight_transaction_count != 0) {
1455 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1461 target_rdma_app::~target_rdma_app()
1466 target_rdma_app::target_rdma_app(target_rdma_app_configuration
const &
cfg)
1469 m_control_channel{},
1471 m_local_io_region{nullptr},
1472 m_local_io_region_size{0},
1473 m_local_io_mmap{nullptr},
1474 m_remote_io_mmap{nullptr},
1477 m_storage_block_count{},
1478 m_storage_block_size{},
1485 }
catch (std::exception
const &) {
1491 void target_rdma_app::abort(std::string
const &reason)
1497 m_abort_flag =
true;
1500 void target_rdma_app::wait_for_client_connection(
void)
1502 while (!m_control_channel->is_connected()) {
1503 std::this_thread::sleep_for(std::chrono::milliseconds{100});
1507 void target_rdma_app::wait_for_and_process_query_storage(
void)
1510 auto const client_request = wait_for_control_message();
1513 std::string err_msg;
1517 m_control_channel->send_message(process_query_storage(client_request));
1521 err_msg = ex.what();
1525 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1529 m_control_channel->send_message({
1531 client_request.message_id,
1532 client_request.correlation_id,
1533 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1538 void target_rdma_app::wait_for_and_process_init_storage(
void)
1541 auto const client_request = wait_for_control_message();
1544 std::string err_msg;
1548 m_control_channel->send_message(process_init_storage(client_request));
1552 err_msg = ex.what();
1556 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1560 m_control_channel->send_message({
1562 client_request.message_id,
1563 client_request.correlation_id,
1564 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1569 void target_rdma_app::wait_for_and_process_create_rdma_connections(
void)
1573 uint32_t remaining_connections = m_core_count * 2;
1574 while (remaining_connections != 0) {
1575 auto const client_request = wait_for_control_message();
1578 std::string err_msg;
1582 m_control_channel->send_message(process_create_rdma_connection(client_request));
1583 --remaining_connections;
1587 err_msg = ex.what();
1591 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1595 --remaining_connections;
1596 m_control_channel->send_message({
1598 client_request.message_id,
1599 client_request.correlation_id,
1600 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1606 "target_rdma_app aborted while preparing to start storage"};
1611 void target_rdma_app::wait_for_and_process_start_storage(
void)
1614 auto const client_request = wait_for_control_message();
1617 std::string err_msg;
1621 m_control_channel->send_message(process_start_storage(client_request));
1625 err_msg = ex.what();
1629 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1633 m_control_channel->send_message({
1635 client_request.message_id,
1636 client_request.correlation_id,
1637 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1642 void target_rdma_app::wait_for_and_process_stop_storage(
void)
1645 auto const client_request = wait_for_control_message();
1648 std::string err_msg;
1652 m_control_channel->send_message(process_stop_storage(client_request));
1656 err_msg = ex.what();
1660 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1664 m_control_channel->send_message({
1666 client_request.message_id,
1667 client_request.correlation_id,
1668 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1673 void target_rdma_app::wait_for_and_process_shutdown(
void)
1676 auto const client_request = wait_for_control_message();
1679 std::string err_msg;
1683 m_control_channel->send_message(process_shutdown(client_request));
1687 err_msg = ex.what();
1691 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1695 m_control_channel->send_message({
1697 client_request.message_id,
1698 client_request.correlation_id,
1699 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1704 void target_rdma_app::display_stats(
void)
const
1706 for (
auto const &
stats : m_stats) {
1707 auto const pe_hit_rate_pct =
1708 (
static_cast<double>(
stats.pe_hit_count) /
1709 (
static_cast<double>(
stats.pe_hit_count) +
static_cast<double>(
stats.pe_miss_count))) *
1712 printf(
"+================================================+\n");
1713 printf(
"| Core: %u\n",
stats.core_idx);
1714 printf(
"| Operation count: %lu\n",
stats.operation_count);
1715 printf(
"| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct,
stats.pe_hit_count,
stats.pe_miss_count);
1719 void target_rdma_app::init(
void)
1723 m_storage_block_count = m_cfg.block_count;
1724 m_storage_block_size = m_cfg.block_size;
1727 m_local_io_region_size = uint64_t{m_storage_block_count} * m_storage_block_size;
1730 if (!m_cfg.content.empty()) {
1731 std::copy(std::begin(m_cfg.content), std::end(m_cfg.content), m_local_io_region);
1739 m_control_channel.reset();
1741 if (m_dev !=
nullptr) {
1752 if (!m_ctrl_messages.empty()) {
1753 auto msg = std::move(m_ctrl_messages.front());
1754 m_ctrl_messages.erase(m_ctrl_messages.begin());
1759 auto *new_msg = m_control_channel->poll();
1761 m_ctrl_messages.push_back(std::move(*new_msg));
1767 "User aborted the target_rdma_app while waiting on a control message"};
1778 std::make_unique<storage::control::storage_details_payload>(uint64_t{m_storage_block_size} *
1779 m_storage_block_count,
1780 m_storage_block_size),
1786 auto const *details =
1789 if (details->core_count > m_cfg.core_set.size()) {
1791 "Requested storage to use " +
std::to_string(details->core_count) +
1796 m_core_count = details->core_count;
1797 m_task_count = details->task_count;
1800 reinterpret_cast<char *
>(m_local_io_region),
1801 m_local_io_region_size,
1804 storage::make_mmap(m_dev, details->mmap_export_blob.data(), details->mmap_export_blob.size());
1820 client_request.
payload.get());
1821 if (details->context_idx > m_core_count) {
1823 "Unable to create RDMA connection for invalid context idx"};
1827 m_workers[details->context_idx].create_rdma_connection(details->role, details->connection_details);
1833 std::make_unique<storage::control::rdma_connection_details_payload>(details->context_idx,
1841 verify_connections_are_ready();
1842 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1843 m_workers[ii].prepare_and_submit_tasks();
1844 m_workers[ii].prepare_thread_proc(m_cfg.core_set[ii]);
1845 m_workers[ii].start_thread_proc();
1858 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1859 m_workers[ii].stop_processing();
1872 m_stats.reserve(m_core_count);
1873 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1874 auto const &hot_data = m_workers[ii].get_hot_data();
1875 m_stats.push_back(target_rdma_worker_stats{
1877 hot_data.pe_hit_count,
1878 hot_data.pe_miss_count,
1879 hot_data.completed_transaction_count,
1893 void target_rdma_app::prepare_workers()
1895 if (m_core_count > m_cfg.core_set.size()) {
1897 "Unable to create " +
std::to_string(m_core_count) +
" threads as only " +
1908 void target_rdma_app::destroy_workers(
void) noexcept
1910 if (m_workers !=
nullptr) {
1912 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1913 m_workers[ii].~target_rdma_worker();
1916 m_workers =
nullptr;
1920 void target_rdma_app::verify_connections_are_ready(
void)
1922 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1927 "Aborted while establishing storage connections"};
1930 auto const ret = m_workers[ii].get_rdma_connection_state();
static void cleanup(struct cache_invalidate_sample_state *state)
static uint32_t get_io_size(char const *buf)
static void set_type(io_message_type type, char *buf)
static uint32_t get_remote_offset(char const *buf)
static io_message_type get_type(char const *buf)
static uint64_t get_io_address(char const *buf)
static void set_result(doca_error_t result, char *buf)
T * object_array(size_t object_count, Args &&...args) const
doca_error_t get_doca_error() const noexcept
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len arguments.
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE doca_error_t doca_buf_set_data(struct doca_buf *buf, void *data, size_t data_len)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
doca_ctx_states
This enum defines the states of a context.
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_INVALID_VALUE
@ DOCA_ERROR_INITIALIZATION
@ DOCA_ERROR_CONNECTION_RESET
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_TRC(format,...)
Generates a TRACE application log message.
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_get_memrange(const struct doca_mmap *mmap, void **addr, size_t *len)
Get the memory range of DOCA memory map.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE union doca_data doca_task_get_user_data(const struct doca_task *task)
Get user data from a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL const struct doca_buf * doca_rdma_task_send_get_src_buf(const struct doca_rdma_task_send *task)
This method gets the source buffer of a send task.
DOCA_EXPERIMENTAL void doca_rdma_task_write_set_dst_buf(struct doca_rdma_task_write *task, struct doca_buf *dst_buf)
This method sets the destination buffer of a write task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_read_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_read **task)
This method allocates and initializes a read task.
DOCA_EXPERIMENTAL void doca_rdma_task_write_set_src_buf(struct doca_rdma_task_write *task, const struct doca_buf *src_buf)
This method sets the source buffer of a write task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_write_as_task(struct doca_rdma_task_write *task)
This method converts a write task to a doca_task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_write_set_conf(struct doca_rdma *rdma, doca_rdma_task_write_completion_cb_t successful_task_completion_cb, doca_rdma_task_write_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the write tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL void doca_rdma_task_read_set_dst_buf(struct doca_rdma_task_read *task, struct doca_buf *dst_buf)
This method sets the destination buffer of a read task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_read_set_conf(struct doca_rdma *rdma, doca_rdma_task_read_completion_cb_t successful_task_completion_cb, doca_rdma_task_read_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the read tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object. The doca_rdma_conn_details are used in doca_rdma_connect().
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_read_as_task(struct doca_rdma_task_read *task)
This method converts a read task to a doca_task.
DOCA_EXPERIMENTAL void doca_rdma_task_read_set_src_buf(struct doca_rdma_task_read *task, const struct doca_buf *src_buf)
This method sets the source buffer of a read task.
void(* doca_rdma_task_read_completion_cb_t)(struct doca_rdma_task_read *task, union doca_data task_user_data, union doca_data ctx_user_data)
Function to execute on completion of a read task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_write_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_write **task)
This method allocates and initializes a write task.
void(* doca_rdma_task_write_completion_cb_t)(struct doca_rdma_task_write *task, union doca_data task_user_data, union doca_data ctx_user_data)
Function to execute on completion of a write task.
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_READ
@ DOCA_ACCESS_FLAG_RDMA_WRITE
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
const struct ip_frag_config * cfg
@ create_rdma_connection_request
@ create_rdma_connection_response
std::string to_string(storage::control::message_type type)
std::unique_ptr< storage::control::channel > make_tcp_server_control_channel(uint16_t listen_port)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
void uninstall_ctrl_c_handler()
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
static constexpr value_requirement optional_value
void create_doca_logger_backend(void) noexcept
void install_ctrl_c_handler(std::function< void(void)> callback)
bool file_has_binary_content_header(std::string const &file_name)
void aligned_free(void *memory)
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
static constexpr value_multiplicity multiple_values
std::vector< uint8_t > load_file_bytes(std::string const &file_name)
void * aligned_alloc(size_t alignment, size_t size)
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
static constexpr value_multiplicity single_value
constexpr size_t size_of_io_message
constexpr uint32_t cache_line_size
std::string io_message_to_string(char const *buf)
static constexpr value_requirement required_value
uint32_t get_system_page_size(void)
storage::binary_content load_binary_content_from_file(std::string const &file_name)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
doca_dev * open_device(std::string const &identifier)
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
int main(int argc, char **argv)
DOCA_LOG_REGISTER(TARGET_RDMA)
Convenience type for representing opaque data.