61 using namespace std::string_literals;
65 auto constexpr app_name =
"doca_storage_comch_to_rdma_zero_copy";
67 auto constexpr default_control_timeout_seconds = std::chrono::seconds{5};
68 auto constexpr default_command_channel_name =
"doca_storage_comch";
70 static_assert(
sizeof(
void *) == 8,
"Expected a pointer to occupy 8 bytes");
72 struct zero_copy_app_configuration {
73 std::vector<uint32_t> cpu_set = {};
74 std::string device_id = {};
75 std::string representor_id = {};
76 std::string command_channel_name = {};
77 std::chrono::seconds control_timeout = {};
82 uint32_t core_idx = 0;
83 uint64_t pe_hit_count = 0;
84 uint64_t pe_miss_count = 0;
85 uint64_t operation_count = 0;
88 class zero_copy_app_worker {
90 struct alignas(storage::cache_line_size) hot_data {
92 uint64_t pe_hit_count;
93 uint64_t pe_miss_count;
94 uint64_t completed_transaction_count;
95 uint32_t in_flight_transaction_count;
99 std::atomic_bool run_flag;
103 hot_data(hot_data
const &other) =
delete;
104 hot_data(hot_data &&other) noexcept;
105 hot_data &operator=(hot_data
const &other) =
delete;
106 hot_data &operator=(hot_data &&other) noexcept;
114 doca_error_t submit_comch_recv_task(doca_comch_consumer_task_post_recv *task);
117 "Expected thread_context::hot_data to occupy one cache line");
119 ~zero_copy_app_worker();
120 zero_copy_app_worker() =
delete;
121 zero_copy_app_worker(doca_dev *dev, doca_comch_connection *comch_conn, uint32_t task_count, uint32_t batch_size);
122 zero_copy_app_worker(zero_copy_app_worker
const &) =
delete;
123 [[maybe_unused]] zero_copy_app_worker(zero_copy_app_worker &&) noexcept;
124 zero_copy_app_worker &operator=(zero_copy_app_worker const &) = delete;
125 [[maybe_unused]] zero_copy_app_worker &operator=(zero_copy_app_worker &&) noexcept;
130 void stop_processing(
void) noexcept;
131 void destroy_comch_objects(
void) noexcept;
133 void create_tasks(uint32_t task_count, uint32_t batch_size, uint32_t remote_consumer_id);
139 void prepare_thread_proc(uint32_t core_id);
140 void start_thread_proc();
141 [[nodiscard]] hot_data const &get_hot_data() const noexcept;
145 uint8_t *m_io_message_region;
146 doca_mmap *m_io_message_mmap;
147 doca_buf_inventory *m_io_message_inv;
148 std::vector<doca_buf *> m_io_message_bufs;
149 doca_comch_consumer *m_consumer;
150 doca_comch_producer *m_producer;
151 storage::rdma_conn_pair m_rdma_ctrl_ctx;
152 storage::rdma_conn_pair m_rdma_data_ctx;
153 std::vector<doca_comch_consumer_task_post_recv *> m_host_request_tasks;
154 std::vector<doca_comch_producer_task_send *> m_host_response_tasks;
155 std::vector<doca_rdma_task_send *> m_storage_request_tasks;
156 std::vector<doca_rdma_task_receive *> m_storage_response_tasks;
157 std::thread m_thread;
159 void init(doca_dev *dev, doca_comch_connection *comch_conn, uint32_t task_count, uint32_t batch_size);
162 static
void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
165 static
void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
168 static
void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
171 static
void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
174 static
void doca_rdma_task_send_cb(doca_rdma_task_send *task,
177 static
void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
180 static
void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
183 static
void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
189 class zero_copy_app {
192 zero_copy_app() =
delete;
193 explicit zero_copy_app(zero_copy_app_configuration
const &
cfg);
194 zero_copy_app(zero_copy_app
const &) =
delete;
195 zero_copy_app(zero_copy_app &&) noexcept = delete;
196 zero_copy_app &operator=(zero_copy_app const &) = delete;
197 zero_copy_app &operator=(zero_copy_app &&) noexcept = delete;
199 void abort(std::
string const &reason);
201 void connect_to_storage(
void);
202 void wait_for_comch_client_connection(
void);
203 void wait_for_and_process_query_storage(
void);
204 void wait_for_and_process_init_storage(
void);
205 void wait_for_and_process_start_storage(
void);
206 void wait_for_and_process_stop_storage(
void);
207 void wait_for_and_process_shutdown(
void);
208 void display_stats(
void) const;
211 zero_copy_app_configuration const m_cfg;
213 doca_dev_rep *m_dev_rep;
214 doca_mmap *m_remote_io_mmap;
215 std::unique_ptr<
storage::control::comch_channel> m_client_control_channel;
216 std::unique_ptr<
storage::control::channel> m_storage_control_channel;
217 std::vector<
storage::control::channel *> m_ctrl_channels;
218 std::vector<
storage::control::message> m_ctrl_messages;
219 std::vector<uint32_t> m_remote_consumer_ids;
220 zero_copy_app_worker *m_workers;
221 std::vector<thread_stats> m_stats;
222 uint64_t m_storage_capacity;
223 uint32_t m_storage_block_size;
224 uint32_t m_message_id_counter;
225 uint32_t m_task_count;
226 uint32_t m_batch_size;
227 uint32_t m_core_count;
230 static
void new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
231 static
void expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
233 storage::control::message wait_for_control_message();
234 storage::control::message wait_for_control_message(
storage::control::message_id mid,
235 std::chrono::seconds timeout);
237 storage::control::message process_query_storage(
storage::control::message const &client_request);
238 storage::control::message process_init_storage(
storage::control::message const &client_request);
239 storage::control::message process_start_storage(
storage::control::message const &client_request);
240 storage::control::message process_stop_storage(
storage::control::message const &client_request);
241 storage::control::message process_shutdown(
storage::control::message const &client_requeste);
243 void prepare_thread_contexts(
storage::control::correlation_id cid);
245 void connect_rdma(uint32_t thread_idx,
247 storage::control::correlation_id cid);
249 void verify_connections_are_ready(
void);
251 void destroy_workers(
void) noexcept;
263 zero_copy_app_configuration parse_cli_args(
int argc,
char **argv);
274 int main(
int argc,
char **argv)
281 zero_copy_app app{parse_cli_args(argc, argv)};
283 app.abort(
"User requested abort");
286 app.connect_to_storage();
287 app.wait_for_comch_client_connection();
288 app.wait_for_and_process_query_storage();
289 app.wait_for_and_process_init_storage();
290 app.wait_for_and_process_start_storage();
291 app.wait_for_and_process_stop_storage();
292 app.wait_for_and_process_shutdown();
294 }
catch (std::exception
const &ex) {
295 fprintf(stderr,
"EXCEPTION: %s\n", ex.what());
313 void print_config(zero_copy_app_configuration
const &
cfg) noexcept
315 printf(
"zero_copy_app_configuration: {\n");
316 printf(
"\tcpu_set : [");
318 for (
auto cpu :
cfg.cpu_set) {
326 printf(
"\tdevice : \"%s\",\n",
cfg.device_id.c_str());
327 printf(
"\trepresentor : \"%s\",\n",
cfg.representor_id.c_str());
328 printf(
"\tcommand_channel_name : \"%s\",\n",
cfg.command_channel_name.c_str());
329 printf(
"\tcontrol_timeout : %u,\n",
static_cast<uint32_t
>(
cfg.control_timeout.count()));
330 printf(
"\tstorage_server : %s:%u\n",
331 cfg.storage_server_address.get_address().c_str(),
332 cfg.storage_server_address.get_port());
341 void validate_zero_copy_app_configuration(zero_copy_app_configuration
const &
cfg)
343 std::vector<std::string> errors;
345 if (
cfg.control_timeout.count() == 0) {
346 errors.emplace_back(
"Invalid zero_copy_app_configuration: control-timeout must not be zero");
349 if (!errors.empty()) {
350 for (
auto const &err : errors) {
351 printf(
"%s\n", err.c_str());
366 zero_copy_app_configuration parse_cli_args(
int argc,
char **argv)
368 zero_copy_app_configuration config{};
369 config.command_channel_name = default_command_channel_name;
370 config.control_timeout = default_control_timeout_seconds;
385 [](
void *
value,
void *
cfg) noexcept {
386 static_cast<zero_copy_app_configuration *
>(
cfg)->device_id =
387 static_cast<char const *
>(
value);
393 "Device host side representor identifier",
396 [](
void *
value,
void *
cfg) noexcept {
397 static_cast<zero_copy_app_configuration *
>(
cfg)->representor_id =
398 static_cast<char const *
>(
value);
404 "CPU core to which the process affinity can be set",
407 [](
void *
value,
void *
cfg) noexcept {
408 static_cast<zero_copy_app_configuration *
>(
cfg)->cpu_set.push_back(
409 *
static_cast<int *
>(
value));
416 "Storage server addresses in <ip_addr>:<port> format",
419 [](
void *
value,
void *
cfg) noexcept {
421 static_cast<zero_copy_app_configuration *
>(
cfg)->storage_server_address =
431 "command-channel-name",
432 "Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
435 [](
void *
value,
void *
cfg) noexcept {
436 static_cast<zero_copy_app_configuration *
>(
cfg)->command_channel_name =
437 static_cast<char const *
>(
value);
443 "Time (in seconds) to wait while performing control operations. Default: 5",
446 [](
void *
value,
void *
cfg) noexcept {
447 static_cast<zero_copy_app_configuration *
>(
cfg)->control_timeout =
448 std::chrono::seconds{*
static_cast<int *
>(
value)};
458 print_config(config);
459 validate_zero_copy_app_configuration(config);
464 zero_copy_app_worker::hot_data::hot_data()
468 completed_transaction_count{0},
469 in_flight_transaction_count{0},
478 zero_copy_app_worker::hot_data::hot_data(hot_data &&other) noexcept
480 pe_hit_count{other.pe_hit_count},
481 pe_miss_count{other.pe_miss_count},
482 completed_transaction_count{other.completed_transaction_count},
483 in_flight_transaction_count{other.in_flight_transaction_count},
484 core_idx{other.core_idx},
485 batch_count{other.batch_count},
486 batch_size{other.batch_size},
487 run_flag{other.run_flag.load()},
488 error_flag{other.error_flag}
493 zero_copy_app_worker::hot_data &zero_copy_app_worker::hot_data::operator=(hot_data &&other) noexcept
495 if (std::addressof(other) ==
this)
499 pe_hit_count = other.pe_hit_count;
500 pe_miss_count = other.pe_miss_count;
501 completed_transaction_count = other.completed_transaction_count;
502 in_flight_transaction_count = other.in_flight_transaction_count;
503 core_idx = other.core_idx;
504 batch_count = other.batch_count;
505 batch_size = other.batch_size;
506 run_flag = other.run_flag.load();
507 error_flag = other.error_flag;
514 doca_error_t zero_copy_app_worker::hot_data::submit_comch_recv_task(doca_comch_consumer_task_post_recv *task)
517 if (--batch_count == 0) {
519 batch_count = batch_size;
525 zero_copy_app_worker::~zero_copy_app_worker()
527 if (m_thread.joinable()) {
528 m_hot_data.run_flag =
false;
529 m_hot_data.error_flag =
true;
535 zero_copy_app_worker::zero_copy_app_worker(doca_dev *dev,
536 doca_comch_connection *comch_conn,
540 m_io_message_region{nullptr},
541 m_io_message_mmap{nullptr},
542 m_io_message_inv{nullptr},
548 m_host_request_tasks{},
549 m_host_response_tasks{},
550 m_storage_request_tasks{},
551 m_storage_response_tasks{},
555 init(dev, comch_conn, task_count, batch_size);
562 zero_copy_app_worker::zero_copy_app_worker(zero_copy_app_worker &&other) noexcept
563 : m_hot_data{std::move(other.m_hot_data)},
564 m_io_message_region{other.m_io_message_region},
565 m_io_message_mmap{other.m_io_message_mmap},
566 m_io_message_inv{other.m_io_message_inv},
567 m_io_message_bufs{std::move(other.m_io_message_bufs)},
568 m_consumer{other.m_consumer},
569 m_producer{other.m_producer},
570 m_rdma_ctrl_ctx{other.m_rdma_ctrl_ctx},
571 m_rdma_data_ctx{other.m_rdma_data_ctx},
572 m_host_request_tasks{std::move(other.m_host_request_tasks)},
573 m_host_response_tasks{std::move(other.m_host_response_tasks)},
574 m_storage_request_tasks{std::move(other.m_storage_request_tasks)},
575 m_storage_response_tasks{std::move(other.m_storage_response_tasks)},
576 m_thread{std::move(other.m_thread)}
578 other.m_io_message_region =
nullptr;
579 other.m_io_message_mmap =
nullptr;
580 other.m_io_message_inv =
nullptr;
581 other.m_consumer =
nullptr;
582 other.m_producer =
nullptr;
583 other.m_rdma_ctrl_ctx = {};
584 other.m_rdma_data_ctx = {};
587 zero_copy_app_worker &zero_copy_app_worker::operator=(zero_copy_app_worker &&other) noexcept
589 if (std::addressof(other) ==
this)
592 m_hot_data = std::move(other.m_hot_data);
593 m_io_message_region = other.m_io_message_region;
594 m_io_message_mmap = other.m_io_message_mmap;
595 m_io_message_inv = other.m_io_message_inv;
596 m_io_message_bufs = std::move(other.m_io_message_bufs);
597 m_consumer = other.m_consumer;
598 m_producer = other.m_producer;
599 m_rdma_ctrl_ctx = other.m_rdma_ctrl_ctx;
600 m_rdma_data_ctx = other.m_rdma_data_ctx;
601 m_host_request_tasks = std::move(other.m_host_request_tasks);
602 m_host_response_tasks = std::move(other.m_host_response_tasks);
603 m_storage_request_tasks = std::move(other.m_storage_request_tasks);
604 m_storage_response_tasks = std::move(other.m_storage_response_tasks);
605 m_thread = std::move(other.m_thread);
607 other.m_io_message_region =
nullptr;
608 other.m_io_message_mmap =
nullptr;
609 other.m_io_message_inv =
nullptr;
610 other.m_consumer =
nullptr;
611 other.m_producer =
nullptr;
612 other.m_rdma_ctrl_ctx = {};
613 other.m_rdma_data_ctx = {};
621 uint8_t
const *blob =
nullptr;
622 size_t blob_size = 0;
626 reinterpret_cast<void const **
>(&blob),
628 std::addressof(rdma_pair.conn));
634 return std::vector<uint8_t>{blob, blob + blob_size};
642 ret =
doca_rdma_connect(rdma_pair.rdma, blob.data(), blob.size(), rdma_pair.conn);
649 doca_error_t zero_copy_app_worker::get_connections_state() const noexcept
653 uint32_t pending_count = 0;
702 void zero_copy_app_worker::stop_processing(
void) noexcept
704 m_hot_data.run_flag =
false;
705 if (m_thread.joinable()) {
710 void zero_copy_app_worker::destroy_comch_objects(
void) noexcept
713 std::vector<doca_task *> tasks;
715 if (m_consumer !=
nullptr) {
716 tasks.reserve(m_host_request_tasks.size());
717 std::transform(std::begin(m_host_request_tasks),
718 std::end(m_host_request_tasks),
719 std::back_inserter(tasks),
726 m_host_request_tasks.clear();
732 m_consumer =
nullptr;
736 if (m_producer !=
nullptr) {
737 tasks.reserve(m_host_response_tasks.size());
738 std::transform(std::begin(m_host_response_tasks),
739 std::end(m_host_response_tasks),
740 std::back_inserter(tasks),
747 m_host_response_tasks.clear();
753 m_producer =
nullptr;
758 void zero_copy_app_worker::create_tasks(uint32_t task_count, uint32_t batch_size, uint32_t remote_consumer_id)
762 auto *buf_addr = m_io_message_region;
763 m_io_message_bufs.reserve((task_count * 2) + batch_size);
764 m_host_request_tasks.reserve(task_count + batch_size);
765 m_host_response_tasks.reserve(task_count);
766 m_storage_request_tasks.reserve(task_count + batch_size);
767 m_storage_request_tasks.reserve(task_count);
769 for (uint32_t ii = 0; ii != (task_count + batch_size); ++ii) {
770 doca_buf *storage_request_buff =
nullptr;
776 &storage_request_buff);
782 m_io_message_bufs.push_back(storage_request_buff);
784 doca_rdma_task_send *rdma_task_send =
nullptr;
786 m_rdma_ctrl_ctx.conn,
787 storage_request_buff,
793 m_storage_request_tasks.push_back(rdma_task_send);
795 doca_comch_consumer_task_post_recv *comch_consumer_task_post_recv =
nullptr;
797 storage_request_buff,
798 &comch_consumer_task_post_recv);
802 m_host_request_tasks.push_back(comch_consumer_task_post_recv);
812 for (uint32_t ii = 0; ii != task_count; ++ii) {
813 doca_buf *storage_recv_buf =
nullptr;
824 m_io_message_bufs.push_back(storage_recv_buf);
826 doca_rdma_task_receive *rdma_task_receive =
nullptr;
834 m_storage_response_tasks.push_back(rdma_task_receive);
836 doca_comch_producer_task_send *comch_producer_task_send;
842 &comch_producer_task_send);
846 m_host_response_tasks.push_back(comch_producer_task_send);
858 void zero_copy_app_worker::prepare_thread_proc(uint32_t core_id)
860 m_thread = std::thread{[
this]() {
863 }
catch (std::exception
const &ex) {
864 DOCA_LOG_ERR(
"Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
865 m_hot_data.error_flag =
true;
866 m_hot_data.run_flag =
false;
869 m_hot_data.core_idx = core_id;
873 void zero_copy_app_worker::start_thread_proc(
void)
877 for (
auto *task : m_host_request_tasks) {
880 DOCA_LOG_ERR(
"Failed to submit initial doca_comch_consumer_task_post_recv task: %s",
886 for (
auto *task : m_storage_response_tasks) {
889 DOCA_LOG_ERR(
"Failed to submit initial doca_rdma_task_receive task: %s",
895 m_hot_data.run_flag =
true;
898 zero_copy_app_worker::hot_data
const &zero_copy_app_worker::get_hot_data(
void)
const noexcept
903 void zero_copy_app_worker::init(doca_dev *dev,
904 doca_comch_connection *comch_conn,
911 m_hot_data.batch_size = batch_size;
914 DOCA_LOG_DBG(
"Allocate comch buffers memory (%zu bytes, aligned to %u byte pages)",
915 raw_io_messages_size,
917 m_io_message_region =
static_cast<uint8_t *
>(
919 if (m_io_message_region ==
nullptr) {
924 reinterpret_cast<char *
>(m_io_message_region),
925 raw_io_messages_size,
947 task_count + batch_size,
948 doca_data{.ptr = std::addressof(m_hot_data)},
949 doca_comch_consumer_task_post_recv_cb,
950 doca_comch_consumer_task_post_recv_error_cb);
955 doca_data{.ptr = std::addressof(m_hot_data)},
956 doca_comch_producer_task_send_cb,
957 doca_comch_producer_task_send_error_cb);
963 doca_data{.ptr = std::addressof(m_hot_data)},
967 doca_rdma_task_receive_cb,
968 doca_rdma_task_receive_error_cb,
975 doca_rdma_task_send_cb,
976 doca_rdma_task_send_error_cb,
977 task_count + batch_size);
989 doca_data{.ptr = std::addressof(m_hot_data)},
997 m_hot_data.run_flag =
false;
998 m_hot_data.error_flag =
false;
999 m_hot_data.pe_hit_count = 0;
1000 m_hot_data.pe_miss_count = 0;
1001 m_hot_data.completed_transaction_count = 0;
1002 m_hot_data.in_flight_transaction_count = 0;
1008 std::vector<doca_task *> tasks;
1010 if (m_rdma_ctrl_ctx.rdma !=
nullptr) {
1012 tasks.reserve(m_storage_request_tasks.size() + m_storage_response_tasks.size());
1013 std::transform(std::begin(m_storage_request_tasks),
1014 std::end(m_storage_request_tasks),
1015 std::back_inserter(tasks),
1017 std::transform(std::begin(m_storage_response_tasks),
1018 std::end(m_storage_response_tasks),
1019 std::back_inserter(tasks),
1034 if (m_rdma_data_ctx.rdma !=
nullptr) {
1047 destroy_comch_objects();
1049 if (m_hot_data.pe !=
nullptr) {
1056 for (
auto *buf : m_io_message_bufs) {
1060 if (m_io_message_inv) {
1071 if (m_io_message_mmap) {
1082 if (m_io_message_region !=
nullptr) {
1087 void zero_copy_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
1091 static_cast<void>(task);
1094 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1103 hot_data->error_flag =
true;
1104 hot_data->run_flag =
false;
1108 void zero_copy_app_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1112 static_cast<void>(task);
1113 static_cast<void>(task_user_data);
1115 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1117 if (hot_data->run_flag) {
1118 DOCA_LOG_ERR(
"Failed to complete doca_comch_consumer_task_post_recv");
1119 hot_data->run_flag =
false;
1120 hot_data->error_flag =
true;
1124 void zero_copy_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1128 static_cast<void>(task);
1130 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1132 auto *storage_response_task =
static_cast<doca_rdma_task_receive *
>(task_user_data.ptr);
1139 hot_data->error_flag =
true;
1140 hot_data->run_flag =
false;
1143 ++(hot_data->completed_transaction_count);
1146 void zero_copy_app_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1150 static_cast<void>(task);
1151 static_cast<void>(task_user_data);
1153 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1154 DOCA_LOG_ERR(
"Failed to complete doca_comch_producer_task_send");
1155 hot_data->run_flag =
false;
1156 hot_data->error_flag =
true;
1159 void zero_copy_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1163 static_cast<void>(task);
1165 auto *hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1167 auto *host_request_task =
static_cast<doca_comch_consumer_task_post_recv *
>(task_user_data.ptr);
1171 auto ret = hot_data->submit_comch_recv_task(host_request_task);
1174 hot_data->error_flag =
true;
1175 hot_data->run_flag =
false;
1179 void zero_copy_app_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1183 static_cast<void>(task);
1184 static_cast<void>(task_user_data);
1186 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1187 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_send");
1188 hot_data->run_flag =
false;
1189 hot_data->error_flag =
true;
1192 void zero_copy_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1197 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1209 hot_data->run_flag =
false;
1210 hot_data->error_flag =
true;
1214 void zero_copy_app_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1218 static_cast<void>(task);
1219 static_cast<void>(task_user_data);
1221 auto *
const hot_data =
static_cast<zero_copy_app_worker::hot_data *
>(ctx_user_data.ptr);
1222 if (hot_data->run_flag) {
1228 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_send");
1229 hot_data->run_flag =
false;
1230 hot_data->error_flag =
true;
1234 void zero_copy_app_worker::thread_proc()
1236 while (m_hot_data.run_flag ==
false) {
1237 std::this_thread::yield();
1238 if (m_hot_data.error_flag)
1244 while (m_hot_data.run_flag) {
1245 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1248 while (m_hot_data.error_flag ==
false && m_hot_data.in_flight_transaction_count != 0) {
1249 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1255 zero_copy_app::~zero_copy_app()
1258 m_storage_control_channel.reset();
1259 m_client_control_channel.reset();
1262 if (m_dev !=
nullptr) {
1268 zero_copy_app::zero_copy_app(zero_copy_app_configuration
const &
cfg)
1272 m_remote_io_mmap{nullptr},
1273 m_client_control_channel{},
1274 m_storage_control_channel{},
1277 m_remote_consumer_ids{},
1280 m_storage_capacity{},
1281 m_storage_block_size{},
1282 m_message_id_counter{},
1288 DOCA_LOG_INFO(
"Open doca_dev: %s", m_cfg.device_id.c_str());
1291 DOCA_LOG_INFO(
"Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
1294 m_client_control_channel =
1297 m_cfg.command_channel_name.c_str(),
1299 new_comch_consumer_callback,
1300 expired_comch_consumer_callback);
1303 m_ctrl_channels.reserve(2);
1304 m_ctrl_channels.push_back(m_client_control_channel.get());
1305 m_ctrl_channels.push_back(m_storage_control_channel.get());
1308 void zero_copy_app::abort(std::string
const &reason)
1314 m_abort_flag =
true;
1317 void zero_copy_app::connect_to_storage(
void)
1319 while (!m_storage_control_channel->is_connected()) {
1320 std::this_thread::sleep_for(std::chrono::milliseconds{100});
1323 "Aborted while connecting to storage"};
1328 void zero_copy_app::wait_for_comch_client_connection(
void)
1330 while (!m_client_control_channel->is_connected()) {
1331 std::this_thread::sleep_for(std::chrono::milliseconds{100});
1334 "Aborted while connecting to client"};
1339 void zero_copy_app::wait_for_and_process_query_storage(
void)
1342 auto const client_request = wait_for_control_message();
1345 std::string err_msg;
1349 m_client_control_channel->send_message(process_query_storage(client_request));
1353 err_msg = ex.what();
1357 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1361 m_client_control_channel->send_message({
1363 client_request.message_id,
1364 client_request.correlation_id,
1365 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1370 void zero_copy_app::wait_for_and_process_init_storage(
void)
1373 auto const client_request = wait_for_control_message();
1376 std::string err_msg;
1380 m_client_control_channel->send_message(process_init_storage(client_request));
1384 err_msg = ex.what();
1388 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1392 m_client_control_channel->send_message({
1394 client_request.message_id,
1395 client_request.correlation_id,
1396 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1401 void zero_copy_app::wait_for_and_process_start_storage(
void)
1404 auto const client_request = wait_for_control_message();
1407 std::string err_msg;
1411 m_client_control_channel->send_message(process_start_storage(client_request));
1415 err_msg = ex.what();
1419 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1423 m_client_control_channel->send_message({
1425 client_request.message_id,
1426 client_request.correlation_id,
1427 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1432 void zero_copy_app::wait_for_and_process_stop_storage(
void)
1435 auto const client_request = wait_for_control_message();
1438 std::string err_msg;
1442 m_client_control_channel->send_message(process_stop_storage(client_request));
1446 err_msg = ex.what();
1450 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1454 m_client_control_channel->send_message({
1456 client_request.message_id,
1457 client_request.correlation_id,
1458 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1463 void zero_copy_app::wait_for_and_process_shutdown(
void)
1466 auto const client_request = wait_for_control_message();
1469 std::string err_msg;
1473 m_client_control_channel->send_message(process_shutdown(client_request));
1477 err_msg = ex.what();
1481 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
1485 m_client_control_channel->send_message({
1487 client_request.message_id,
1488 client_request.correlation_id,
1489 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1494 void zero_copy_app::display_stats(
void)
const
1496 for (
auto const &
stats : m_stats) {
1497 auto const pe_hit_rate_pct =
1498 (
static_cast<double>(
stats.pe_hit_count) /
1499 (
static_cast<double>(
stats.pe_hit_count) +
static_cast<double>(
stats.pe_miss_count))) *
1502 printf(
"+================================================+\n");
1503 printf(
"| Core: %u\n",
stats.core_idx);
1504 printf(
"| Operation count: %lu\n",
stats.operation_count);
1505 printf(
"| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct,
stats.pe_hit_count,
stats.pe_miss_count);
1509 void zero_copy_app::new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
1511 auto *
self =
reinterpret_cast<zero_copy_app *
>(user_data);
1512 if (self->m_remote_consumer_ids.capacity() == 0) {
1513 DOCA_LOG_ERR(
"[BUG] no space for new remote consumer ids");
1517 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
1518 if (found == std::end(self->m_remote_consumer_ids)) {
1519 self->m_remote_consumer_ids.push_back(
id);
1520 DOCA_LOG_DBG(
"Connected to remote consumer with id: %u. Consumer count is now: %zu",
1522 self->m_remote_consumer_ids.size());
1524 DOCA_LOG_WARN(
"Ignoring duplicate remote consumer id: %u",
id);
1528 void zero_copy_app::expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
1530 auto *
self =
reinterpret_cast<zero_copy_app *
>(user_data);
1531 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
1532 if (found != std::end(self->m_remote_consumer_ids)) {
1533 self->m_remote_consumer_ids.erase(found);
1534 DOCA_LOG_DBG(
"Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
1536 self->m_remote_consumer_ids.size());
1538 DOCA_LOG_WARN(
"Ignoring disconnect of unexpected remote consumer id: %u",
id);
1545 if (!m_ctrl_messages.empty()) {
1546 auto msg = std::move(m_ctrl_messages.front());
1547 m_ctrl_messages.erase(m_ctrl_messages.begin());
1551 for (
auto *channel : m_ctrl_channels) {
1553 auto *msg = channel->poll();
1555 m_ctrl_messages.push_back(std::move(*msg));
1562 "User aborted the zero_copy_application while waiting on a control message"};
1568 std::chrono::seconds timeout)
1570 auto const expiry = std::chrono::steady_clock::now() + timeout;
1575 "User aborted the zero_copy_application while waiting on a control message"};
1578 for (
auto *channel : m_ctrl_channels) {
1580 auto *msg = channel->poll();
1582 m_ctrl_messages.push_back(std::move(*msg));
1587 std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [mid](
auto const &msg) {
1588 return msg.message_id.value == mid.
value;
1591 if (found != std::end(m_ctrl_messages)) {
1592 auto msg = std::move(*found);
1593 m_ctrl_messages.erase(found);
1597 }
while (expiry > std::chrono::steady_clock::now());
1611 m_storage_control_channel->send_message(storage_request);
1614 auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1617 auto const *
const storage_details =
1619 if (storage_details ==
nullptr) {
1621 "[BUG] Invalid query_storage_response received"};
1624 m_storage_capacity = storage_details->total_size;
1625 m_storage_block_size = storage_details->block_size;
1626 DOCA_LOG_INFO(
"Storage reports capacity of: %lu using a block size of: %u",
1628 m_storage_block_size);
1633 std::move(storage_response.payload),
1640 std::move(storage_response.payload),
1645 "Unexpected " +
to_string(storage_response.message_type) +
" while expecting a " +
1653 auto const *init_storage_details =
1656 if (init_storage_details->core_count > m_cfg.cpu_set.size()) {
1659 "Unable to create " +
std::to_string(m_core_count) +
" threads as only " +
1664 m_remote_consumer_ids.reserve(init_storage_details->core_count);
1666 m_task_count = init_storage_details->task_count;
1667 m_batch_size = init_storage_details->batch_size;
1668 m_core_count = init_storage_details->core_count;
1670 init_storage_details->mmap_export_blob.data(),
1671 init_storage_details->mmap_export_blob.size());
1672 std::vector<uint8_t> mmap_export_blob = [
this]() {
1673 uint8_t
const *reexport_blob =
nullptr;
1674 size_t reexport_blob_size = 0;
1677 reinterpret_cast<void const **
>(&reexport_blob),
1678 &reexport_blob_size);
1683 return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
1686 DOCA_LOG_INFO(
"Configured storage: %u cores, %u tasks, %u batch_size", m_core_count, m_task_count, m_batch_size);
1693 std::make_unique<storage::control::init_storage_payload>(init_storage_details->task_count,
1694 init_storage_details->batch_size,
1695 init_storage_details->core_count,
1696 std::move(mmap_export_blob)),
1698 m_storage_control_channel->send_message(storage_request);
1701 auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1710 std::move(storage_response.payload),
1717 std::move(storage_response.payload),
1722 "Unexpected " +
to_string(storage_response.message_type) +
" while expecting a " +
1737 m_storage_control_channel->send_message(storage_request);
1740 auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1743 verify_connections_are_ready();
1744 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1745 m_workers[ii].create_tasks(m_task_count, m_batch_size, m_remote_consumer_ids[ii]);
1746 m_workers[ii].start_thread_proc();
1752 std::move(storage_response.payload),
1759 std::move(storage_response.payload),
1764 "Unexpected " +
to_string(storage_response.message_type) +
" while expecting a " +
1779 m_storage_control_channel->send_message(storage_request);
1782 auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1786 m_stats.reserve(m_core_count);
1787 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1788 m_workers[ii].stop_processing();
1789 auto const &hot_data = m_workers[ii].get_hot_data();
1790 m_stats.push_back(thread_stats{
1792 hot_data.pe_hit_count,
1793 hot_data.pe_miss_count,
1794 hot_data.completed_transaction_count,
1796 m_workers[ii].destroy_comch_objects();
1802 std::move(storage_response.payload),
1809 std::move(storage_response.payload),
1814 "Unexpected " +
to_string(storage_response.message_type) +
" while expecting a " +
1823 while (!m_remote_consumer_ids.empty()) {
1824 auto *msg = m_client_control_channel->poll();
1825 DOCA_LOG_DBG(
"Ignoring unexpected %s while processing %s",
1837 m_storage_control_channel->send_message(storage_request);
1840 auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1848 std::move(storage_response.payload),
1855 std::move(storage_response.payload),
1860 "Unexpected " +
to_string(storage_response.message_type) +
" while expecting a " +
1871 m_client_control_channel->get_comch_connection(),
1875 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1878 m_workers[ii].prepare_thread_proc(m_cfg.cpu_set[ii]);
1882 void zero_copy_app::connect_rdma(uint32_t thread_idx,
1886 auto &tctx = m_workers[thread_idx];
1891 std::make_unique<storage::control::rdma_connection_details_payload>(
1894 tctx.get_local_rdma_connection_blob(role)),
1897 m_storage_control_channel->send_message(connect_rdma_request);
1900 auto connect_rdma_response =
1901 wait_for_control_message(connect_rdma_request.message_id, default_control_timeout_seconds);
1905 connect_rdma_response.payload.get());
1906 tctx.connect_rdma(role, remote_details->connection_details);
1909 connect_rdma_response.payload.get());
1914 "Unexpected " +
to_string(connect_rdma_response.message_type) +
" while expecting a " +
1920 void zero_copy_app::verify_connections_are_ready(
void)
1922 uint32_t not_ready_count;
1925 not_ready_count = 0;
1926 if (m_remote_consumer_ids.size() != m_core_count) {
1928 auto *msg = m_client_control_channel->poll();
1929 if (msg !=
nullptr) {
1932 "Unexpected " +
to_string(msg->message_type) +
" while processing " +
1938 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1939 auto const ret = m_workers[ii].get_connections_state();
1949 "Aborted while establishing storage connections"};
1951 }
while (not_ready_count != 0);
1954 void zero_copy_app::destroy_workers(
void) noexcept
1956 if (m_workers !=
nullptr) {
1958 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1959 m_workers[ii].~zero_copy_app_worker();
1962 m_workers =
nullptr;
static void cleanup(struct cache_invalidate_sample_state *state)
static void set_type(io_message_type type, char *buf)
static void set_result(doca_error_t result, char *buf)
T * object_array(size_t object_count, Args &&...args) const
doca_error_t get_doca_error() const noexcept
int main(int argc, char **argv)
DOCA_LOG_REGISTER(ZERO_COPY)
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_stop(struct doca_ctx *ctx)
Stops the context allowing reconfiguration.
doca_ctx_states
This enum defines the states of a context.
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_CONNECTION_ABORTED
@ DOCA_ERROR_INVALID_VALUE
@ DOCA_ERROR_INITIALIZATION
@ DOCA_ERROR_CONNECTION_RESET
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_mmap_export_rdma(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the...
doca_task_submit_flag
Flags used when submitting a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_task_submit_ex(struct doca_task *task, uint32_t flags)
Extended version of doca_task_submit.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
@ DOCA_TASK_SUBMIT_FLAG_NONE
@ DOCA_TASK_SUBMIT_FLAG_FLUSH
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object The doca_rdma_conn_details are used in doca_rdma_connect()...
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state...
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_READ
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_WRITE
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
const struct ip_frag_config * cfg
@ create_rdma_connection_request
@ create_rdma_connection_response
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
ip_address parse_ip_v4_address(char const *value)
void uninstall_ctrl_c_handler()
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
static constexpr value_requirement optional_value
void create_doca_logger_backend(void) noexcept
void install_ctrl_c_handler(std::function< void(void)> callback)
void aligned_free(void *memory)
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
static constexpr value_multiplicity multiple_values
void * aligned_alloc(size_t alignment, size_t size)
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
doca_dev_rep * open_representor(doca_dev *dev, std::string const &identifier)
static constexpr value_multiplicity single_value
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
constexpr size_t size_of_io_message
constexpr uint32_t cache_line_size
static constexpr value_requirement required_value
uint32_t get_system_page_size(void)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
doca_dev * open_device(std::string const &identifier)
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
Convenience type for representing opaque data.