61 using namespace std::string_literals;
// Name string of this application.
65 auto constexpr app_name =
"doca_storage_initiator_comch";
// Names of the supported run types. The two data-validity names are compared
// against the configured run_type string elsewhere in this file.
// NOTE(review): the throughput identifiers are spelled "throughout" while the
// string values are spelled "throughput".
67 auto constexpr run_type_read_throughout_test =
"read_throughput_test";
68 auto constexpr run_type_write_throughout_test =
"write_throughput_test";
69 auto constexpr run_type_read_write_data_validity_test =
"read_write_data_validity_test";
70 auto constexpr run_type_read_only_data_validity_test =
"read_only_data_validity_test";
// Default values; parse_cli_args() stores these in the configuration before
// the command-line callbacks get a chance to override them.
72 auto constexpr default_control_timeout_seconds = std::chrono::seconds{5};
73 auto constexpr default_task_count = 64;
74 auto constexpr default_command_channel_name =
"doca_storage_comch";
75 auto constexpr default_run_limit_operation_count = 1'000'000;
76 auto constexpr default_batch_size = 4;
// Compile-time layout checks: a pointer and a steady_clock time_point must
// each occupy exactly 8 bytes.
78 static_assert(
sizeof(
void *) == 8,
"Expected a pointer to occupy 8 bytes");
79 static_assert(
sizeof(std::chrono::steady_clock::time_point) == 8,
80 "Expected std::chrono::steady_clock::time_point to occupy 8 bytes");
// User-supplied configuration of the application, filled in by parse_cli_args().
85 struct initiator_comch_app_configuration {
// CPU cores given on the command line; one worker is created per entry.
86 std::vector<uint32_t> core_set = {};
// Identifier of the device to open.
87 std::string device_id = {};
// Name of the comch channel used to reach the storage service.
88 std::string command_channel_name = {};
// Path of a file holding the plain data that the storage represents (may be empty).
89 std::string storage_plain_content_file = {};
// Selected run type; expected to match one of the run_type_* strings.
90 std::string run_type = {};
// Time to wait while performing control operations.
91 std::chrono::seconds control_timeout = {};
// Number of concurrent tasks to use per thread.
92 uint32_t task_count = 0;
// Number of operations each thread runs before stopping.
93 uint32_t run_limit_operation_count = 0;
// Batch size; parse_cli_args() clamps it so it never exceeds task_count.
94 uint32_t batch_size = 0;
// Aggregated run statistics held by initiator_comch_app.
100 struct initiator_comch_app_stats {
// Start and end timestamps of the measured run.
101 std::chrono::steady_clock::time_point start_time = {};
102 std::chrono::steady_clock::time_point end_time = {};
// Progress-engine poll counters (polls that did / did not make progress).
103 uint64_t pe_hit_count = 0;
104 uint64_t pe_miss_count = 0;
// Number of operations performed.
105 uint64_t operation_count = 0;
// Latency summary. NOTE(review): presumably microseconds, matching what
// hot_data::on_transaction_complete() measures — confirm.
106 uint32_t latency_min = 0;
107 uint32_t latency_max = 0;
108 uint32_t latency_mean = 0;
// Per-transaction bookkeeping. One instance exists per in-flight task slot.
114 struct transaction_context {
// Producer send task that carries the request for this transaction.
116 doca_comch_producer_task_send *request =
nullptr;
// Time the transaction was started; used to compute its latency on completion.
118 std::chrono::steady_clock::time_point start_time{};
// Outstanding completions. start_transaction() sets this to 2; the producer
// send callback and the consumer receive callback each decrement it, and the
// one that reaches zero calls on_transaction_complete().
122 uint16_t refcount = 0;
// Lock the layout in: the context must stay at 24 bytes.
125 static_assert(
sizeof(transaction_context) == 24,
"Expected transaction_context to occupy 24 bytes");
// Per-thread worker. It owns the comch consumer/producer, the message buffers
// and tasks, and the std::thread that runs the selected thread procedure.
130 class initiator_comch_worker {
// Data touched on the data path, aligned to a cache line boundary.
137 struct alignas(storage::cache_line_size) hot_data {
// Expected storage content used by the read-only validity test (may be null).
138 uint8_t
const *storage_plain_content;
// Bounds of this worker's IO region: [io_region_begin, io_region_end).
139 uint8_t *io_region_begin;
140 uint8_t *io_region_end;
// Array of transaction contexts and its element count.
143 transaction_context *transactions;
144 uint32_t transactions_size;
// Size in bytes of a single IO block.
145 uint32_t io_block_size;
// Set by on_transaction_complete() once no transmit or receive operations remain.
146 std::chrono::steady_clock::time_point end_time;
// Incremented depending on whether doca_pe_progress() reported progress.
147 uint64_t pe_hit_count;
148 uint64_t pe_miss_count;
// Number of transactions whose refcount reached zero.
149 uint64_t completed_transaction_count;
// Operations still to be sent / still to be received.
150 uint64_t remaining_tx_ops;
151 uint64_t remaining_rx_ops;
// Latency statistics in microseconds (sum, minimum, maximum).
152 uint64_t latency_accumulator;
153 uint32_t latency_min;
154 uint32_t latency_max;
// True while the thread procedure should keep running.
157 std::atomic_bool run_flag;
// Not copyable.
168 hot_data(hot_data
const &) =
delete;
// Move construction transfers all values from other.
174 hot_data(hot_data &&other) noexcept;
179 hot_data &operator=(hot_data
const &) =
delete;
// Move assignment transfers all values from other.
186 hot_data &operator=(hot_data &&other) noexcept;
// Start (or restart) a transaction, stamping it with the start time `now`.
202 void start_transaction(transaction_context &transaction, std::chrono::steady_clock::time_point now);
// Record latency for a finished transaction and start the next one if any remain.
209 void on_transaction_complete(transaction_context &transaction);
212 "Expected initiator_comch_worker::hot_data to occupy two cache lines");
// Signature of the functions that can be run as the worker's thread procedure.
214 using thread_proc_fn_t = void (*)(initiator_comch_worker::hot_data &hot_data);
219 ~initiator_comch_worker();
224 initiator_comch_worker();
// Not copyable.
229 initiator_comch_worker(initiator_comch_worker
const &) =
delete;
// Movable: the raw handles of the moved-from worker are reset to nullptr.
235 [[maybe_unused]] initiator_comch_worker(initiator_comch_worker &&) noexcept;
240 initiator_comch_worker &operator=(initiator_comch_worker const &) = delete;
246 [[maybe_unused]] initiator_comch_worker &operator=(initiator_comch_worker &&) noexcept;
// Initialise the worker: record the IO region and block size and create the
// message memory, consumer and producer.
261 void init(doca_dev *dev,
262 doca_comch_connection *comch_conn,
263 uint8_t const *storage_plain_content,
266 uint8_t *io_region_begin,
267 uint64_t io_region_size,
268 uint32_t io_block_size);
// Reset the run state and create the thread that will execute fn.
276 void prepare_thread_proc(initiator_comch_worker::thread_proc_fn_t fn,
277 uint32_t run_limit_op_count,
// True when both the consumer and the producer are running.
293 [[nodiscard]]
bool are_contexts_ready(
void) const noexcept;
// Release the waiting thread procedure by setting run_flag.
298 void start_thread_proc(
void);
// Current value of run_flag.
305 [[nodiscard]]
bool is_thread_proc_running(
void) const noexcept;
// Join the worker thread if it is joinable.
310 void join_thread_proc(
void);
// Read-only access to the worker's hot data.
317 [[nodiscard]] hot_data const &get_hot_data(
void) const noexcept;
// Tear down the consumer, producer and progress engine.
324 void destroy_data_path_objects(
void);
// Backing memory, mmap, buffer inventory and buffers used for IO messages.
328 uint8_t *m_io_message_region;
329 doca_mmap *m_io_message_mmap;
330 doca_buf_inventory *m_io_message_inv;
331 std::vector<doca_buf *> m_io_message_bufs;
// Comch consumer (receives responses) and producer (sends requests).
332 doca_comch_consumer *m_consumer;
333 doca_comch_producer *m_producer;
// Tasks allocated for receiving responses and for sending requests.
334 std::vector<doca_task *> m_io_responses;
335 std::vector<doca_task *> m_io_requests;
// Thread that executes the thread procedure.
336 std::thread m_thread;
// Completion and error callbacks registered for the consumer receive tasks
// and the producer send tasks.
350 static
void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
361 static
void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
372 static
void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
383 static
void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
// Top-level application object. It holds the configuration, the control
// channel to the storage service, the IO memory and the worker objects.
391 class initiator_comch_app {
396 ~initiator_comch_app();
// A configuration is mandatory, so there is no default constructor.
401 initiator_comch_app() =
delete;
// Build the application from a parsed configuration.
408 explicit initiator_comch_app(initiator_comch_app_configuration
const &
cfg);
// Neither copyable nor movable.
413 initiator_comch_app(initiator_comch_app
const &) =
delete;
418 initiator_comch_app(initiator_comch_app &&) noexcept = delete;
423 initiator_comch_app &operator=(initiator_comch_app const &) = delete;
428 initiator_comch_app &operator=(initiator_comch_app &&) noexcept = delete;
// Request that the application stops; `reason` describes why.
435 void abort(std::
string const &reason);
// Wait, up to the configured control timeout, for the control channel to connect.
440 void connect_to_storage_service(
void);
// Ask the storage service for its capacity and block size.
445 void query_storage(
void);
// Send the init request to the storage service and initialise the workers.
450 void init_storage(
void);
455 void prepare_threads(
void);
460 void start_storage(
void);
470 void join_threads(
void);
475 void stop_storage(
void);
480 void display_stats(
void) const;
// Immutable copy of the configuration.
488 initiator_comch_app_configuration const m_cfg;
// Content loaded from the configured plain content file (may be empty).
489 std::vector<uint8_t> const m_storage_plain_content;
// IO memory region shared out between the workers, and its mmap.
491 uint8_t *m_io_region;
492 doca_mmap *m_io_mmap;
// Control channel to the storage service and its message storage.
493 std::unique_ptr<
storage::control::comch_channel> m_service_control_channel;
494 std::vector<
storage::control::message> m_ctrl_messages;
// Identifiers of remote consumers; init_storage() waits until one exists per core.
495 std::vector<uint32_t> m_remote_consumer_ids;
// Array of workers, one per entry in the configured core set.
496 initiator_comch_worker *m_workers;
// Capacity and block size reported by the storage service.
497 uint64_t m_storage_capacity;
498 uint32_t m_storage_block_size;
499 initiator_comch_app_stats m_stats;
// Counters used to generate message and correlation identifiers.
500 uint32_t m_message_id_counter;
501 uint32_t m_correlation_id_counter;
// Callbacks handed to the control channel when it is created; they report
// remote consumer identifiers appearing and expiring.
511 static
void new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
520 static
void expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
530 storage::control::message_id msg_id,
531 std::chrono::seconds timeout);
// Forward declaration: builds the application configuration from the
// command-line arguments. Defined later in this file.
543 initiator_comch_app_configuration parse_cli_args(
int argc,
char **argv);
// Program entry point: parse the command line, build the application, connect
// to the storage service, prepare the worker threads and run the selected test.
554 int main(
int argc,
char **argv)
560 int exit_value = EXIT_SUCCESS;
563 initiator_comch_app app{parse_cli_args(argc, argv)};
// NOTE(review): presumably issued from a user-interrupt handler — confirm.
565 app.abort(
"User requested abort");
567 app.connect_to_storage_service();
570 app.prepare_threads();
572 auto const run_success = app.run();
// Failure path: flag the exit code and print a failure banner on stderr.
578 exit_value = EXIT_FAILURE;
579 fprintf(stderr,
"+================================================+\n");
580 fprintf(stderr,
"| Test failed!!\n");
581 fprintf(stderr,
"+================================================+\n");
584 }
// Report any standard exception on stderr.
catch (std::exception
const &ex) {
585 fprintf(stderr,
"EXCEPTION: %s\n", ex.what());
// Print the given configuration to stdout in a structured, human-readable form.
605 void print_config(initiator_comch_app_configuration
const &
cfg) noexcept
607 printf(
"initiator_comch_app_configuration: {\n");
608 printf(
"\tcore_set : [");
// Emit every configured core.
610 for (
auto cpu :
cfg.core_set) {
// The run type is shown under the "execution_strategy" label, matching the
// name of the command-line option.
618 printf(
"\texecution_strategy : \"%s\",\n",
cfg.run_type.c_str());
619 printf(
"\tdevice : \"%s\",\n",
cfg.device_id.c_str());
620 printf(
"\tcommand_channel_name : \"%s\",\n",
cfg.command_channel_name.c_str());
621 printf(
"\tstorage_plain_content : \"%s\",\n",
cfg.storage_plain_content_file.c_str());
622 printf(
"\ttask_count : %u,\n",
cfg.task_count);
623 printf(
"\tbatch_size : %u,\n",
cfg.batch_size);
624 printf(
"\trun_limit_operation_count : %u,\n",
cfg.run_limit_operation_count);
// The timeout is printed as a whole number of seconds.
625 printf(
"\tcontrol_timeout : %u,\n",
static_cast<uint32_t
>(
cfg.control_timeout.count()));
// Check the configuration for invalid values. Every problem found is collected
// first so that all of them are printed, not only the first one.
// NOTE(review): neither an empty core_set nor a batch_size of zero is rejected
// by the checks below — confirm whether they are validated elsewhere.
634 void validate_initiator_comch_app_configuration(initiator_comch_app_configuration
const &
cfg)
636 std::vector<std::string> errors;
// At least one task is required.
638 if (
cfg.task_count == 0) {
639 errors.emplace_back(
"Invalid initiator_comch_app_configuration: task-count must not be zero");
// A zero control timeout is not allowed.
642 if (
cfg.control_timeout.count() == 0) {
643 errors.emplace_back(
"Invalid initiator_comch_app_configuration: control-timeout must not be zero");
// The read/write validity test requires exactly one core.
646 if (
cfg.run_type == run_type_read_write_data_validity_test &&
cfg.core_set.size() != 1) {
647 errors.push_back(
"Invalid initiator_comch_app_configuration: "s +
648 run_type_read_write_data_validity_test +
" Only supports one thread");
// The read-only validity test needs the plain content file to compare against.
651 if (
cfg.run_type == run_type_read_only_data_validity_test &&
cfg.storage_plain_content_file.empty()) {
652 errors.push_back(
"Invalid initiator_comch_app_configuration: "s +
653 run_type_read_only_data_validity_test +
" requires plain data file to be provided");
// Print every collected error to stdout before reporting the failure.
656 if (!errors.empty()) {
657 for (
auto const &err : errors) {
658 printf(
"%s\n", err.c_str());
661 "Invalid initiator_comch_app_configuration detected"};
// Build the application configuration from the command line. Defaults are
// applied first; each option callback below then overwrites one field.
// NOTE(review): the integer callbacks read the value as `int` and store it in
// an unsigned field, so a negative input would wrap — confirm this is rejected
// before the callbacks run.
674 initiator_comch_app_configuration parse_cli_args(
int argc,
char **argv)
676 initiator_comch_app_configuration config{};
677 config.task_count = default_task_count;
678 config.command_channel_name = default_command_channel_name;
679 config.control_timeout = default_control_timeout_seconds;
680 config.run_limit_operation_count = default_run_limit_operation_count;
681 config.batch_size = default_batch_size;
// Callback: store the device identifier string.
696 [](
void *
value,
void *
cfg) noexcept {
697 static_cast<initiator_comch_app_configuration *
>(
cfg)->device_id =
698 static_cast<char const *
>(
value);
705 "CPU core to which the process affinity can be set",
// Callback: append one core index to the core set.
708 [](
void *
value,
void *
cfg) noexcept {
709 static_cast<initiator_comch_app_configuration *
>(
cfg)->core_set.push_back(
710 *
static_cast<int *
>(
value));
716 "storage-plain-content",
717 "File containing the plain data that is represented by the storage",
// Callback: store the plain content file path.
720 [](
void *
value,
void *
cfg) noexcept {
721 static_cast<initiator_comch_app_configuration *
>(
cfg)->storage_plain_content_file =
722 static_cast<char const *
>(
value);
728 "execution-strategy",
729 "Define what to run. One of: read_throughput_test | write_throughput_test | read_write_data_validity_test | read_only_data_validity_test",
// Callback: store the run type string.
732 [](
void *
value,
void *
cfg) noexcept {
733 static_cast<initiator_comch_app_configuration *
>(
cfg)->run_type =
734 static_cast<char const *
>(
value);
742 "run-limit-operation-count",
743 "Run N operations (per thread) then stop. Default: 1000000",
// Callback: store the per-thread operation limit.
746 [](
void *
value,
void *
cfg) noexcept {
747 static_cast<initiator_comch_app_configuration *
>(
cfg)->run_limit_operation_count =
748 *
static_cast<int *
>(
value);
754 "Number of concurrent tasks (per thread) to use. Default: 64",
// Callback: store the per-thread task count.
757 [](
void *
value,
void *
cfg) noexcept {
758 static_cast<initiator_comch_app_configuration *
>(
cfg)->task_count =
759 *
static_cast<int *
>(
value);
765 "command-channel-name",
766 "Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
// Callback: store the command channel name.
769 [](
void *
value,
void *
cfg) noexcept {
770 static_cast<initiator_comch_app_configuration *
>(
cfg)->command_channel_name =
771 static_cast<char const *
>(
value);
777 "Time (in seconds) to wait while performing control operations. Default: 5",
// Callback: store the control timeout, interpreting the value as seconds.
780 [](
void *
value,
void *
cfg) noexcept {
781 static_cast<initiator_comch_app_configuration *
>(
cfg)->control_timeout =
782 std::chrono::seconds{*
static_cast<int *
>(
value)};
788 "Batch size: Default: 4",
// Callback: store the batch size.
791 [](
void *
value,
void *
cfg) noexcept {
792 static_cast<initiator_comch_app_configuration *
>(
cfg)->batch_size =
793 *
static_cast<int *
>(
value);
// A batch can never be larger than the number of tasks; clamp and warn.
803 if (config.batch_size > config.task_count) {
804 config.batch_size = config.task_count;
805 DOCA_LOG_WARN(
"Clamping batch size to maximum value: %u", config.batch_size);
// Show the effective configuration, then validate it.
808 print_config(config);
809 validate_initiator_comch_app_configuration(config);
// Callable handed to std::thread. It invokes a thread procedure on behalf of a
// worker and holds a pointer to that worker's hot data so that a failure can
// be flagged there.
814 class thread_proc_catch_wrapper {
816 ~thread_proc_catch_wrapper() =
default;
817 thread_proc_catch_wrapper() =
delete;
// Bind the wrapper to the hot data of the worker it runs for.
818 explicit thread_proc_catch_wrapper(initiator_comch_worker::hot_data *hot_data) noexcept : m_hot_data{hot_data}
// Move-only.
821 thread_proc_catch_wrapper(thread_proc_catch_wrapper
const &) =
delete;
822 thread_proc_catch_wrapper(thread_proc_catch_wrapper &&) noexcept = default;
823 thread_proc_catch_wrapper &operator=(thread_proc_catch_wrapper const &) = delete;
824 thread_proc_catch_wrapper &operator=(thread_proc_catch_wrapper &&) noexcept = default;
// Thread entry point, given the thread procedure fn.
826 void operator()(initiator_comch_worker::thread_proc_fn_t fn)
// Build a message tagged with the current thread id, then mark the worker as
// failed and stopped.
// NOTE(review): presumably this runs in an exception handler around fn — confirm.
831 std::stringstream ss;
832 ss <<
"[Thread: " << std::this_thread::get_id()
835 m_hot_data->error_flag =
true;
836 m_hot_data->run_flag =
false;
// Hot data of the worker this wrapper runs for (not owned).
841 initiator_comch_worker::hot_data *m_hot_data;
// Thread procedure for the throughput tests: keep the transaction slots busy
// until the operation limit is reached, counting progress-engine hits and misses.
844 void throughput_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
// Wait until the worker is started, yielding the CPU while checking for an
// error raised in the meantime.
847 while (hot_data.run_flag ==
false) {
848 std::this_thread::yield();
849 if (hot_data.error_flag)
// Start as many transactions as there are slots, but never more than the
// number of operations still to be sent.
854 auto const initial_task_count =
855 std::min(
static_cast<uint64_t
>(hot_data.transactions_size), hot_data.remaining_tx_ops);
856 for (uint32_t ii = 0; ii != initial_task_count; ++ii)
857 hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
// Main loop: poll the progress engine while the run flag is set.
860 while (hot_data.run_flag) {
861 doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
865 if (hot_data.error_flag) {
// Operations that were never sent will never be received, so take them off the
// receive count, then keep polling until nothing is outstanding.
870 hot_data.remaining_rx_ops = hot_data.remaining_rx_ops - hot_data.remaining_tx_ops;
871 while (hot_data.remaining_rx_ops != 0) {
872 doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
// Copy expected_memory_content into the worker's IO region and run one
// transaction per IO block over that region.
// NOTE(review): expected_memory_content must hold at least as many bytes as
// the IO region — verify against callers.
876 void write_storage_memory(initiator_comch_worker::hot_data &hot_data, uint8_t
const *expected_memory_content) noexcept
878 auto const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;
// One operation per block in the region.
881 hot_data.remaining_tx_ops = hot_data.remaining_rx_ops = io_region_size / hot_data.io_block_size;
// Restart addressing at the beginning of the region and stage the data.
884 hot_data.io_addr = hot_data.io_region_begin;
885 std::copy(expected_memory_content, expected_memory_content + io_region_size, hot_data.io_region_begin);
// Start the first transactions, limited by the slot count and the operations to send.
888 auto const initial_task_count =
889 std::min(
static_cast<uint64_t
>(hot_data.transactions_size), hot_data.remaining_tx_ops);
890 for (uint32_t ii = 0; ii != initial_task_count; ++ii) {
894 reinterpret_cast<void **
>(&io_request)));
897 hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
// Poll the progress engine until every operation has completed.
901 while (hot_data.remaining_rx_ops != 0) {
902 doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
906 if (hot_data.error_flag) {
// Run one transaction per IO block over the worker's IO region, then compare
// the region byte by byte with expected_memory_content. A mismatch is logged
// and sets error_flag.
911 void read_and_validate_storage_memory(initiator_comch_worker::hot_data &hot_data,
912 uint8_t
const *expected_memory_content) noexcept
914 size_t const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;
// One operation per block in the region.
917 hot_data.remaining_tx_ops = hot_data.remaining_rx_ops = io_region_size / hot_data.io_block_size;
// Start the first transactions, limited by the slot count and the operations to send.
920 auto const initial_task_count =
921 std::min(
static_cast<uint64_t
>(hot_data.transactions_size), hot_data.remaining_tx_ops);
922 for (uint32_t ii = 0; ii != initial_task_count; ++ii) {
926 reinterpret_cast<void **
>(&io_request)));
929 hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
// Poll the progress engine until every operation has completed.
933 while (hot_data.remaining_rx_ops != 0) {
934 doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
938 if (hot_data.error_flag) {
// Compare the IO region against the expected content.
943 for (
size_t offset = 0; offset != io_region_size; ++offset) {
944 if (hot_data.io_region_begin[offset] != expected_memory_content[offset]) {
945 DOCA_LOG_ERR(
"Data mismatch @ position %zu: %02x != %02x",
947 hot_data.io_region_begin[offset],
948 expected_memory_content[offset]);
949 hot_data.error_flag =
true;
// Thread procedure for the read-only validity test: once started, read the
// storage and compare it with the plain content supplied to the worker.
955 void read_only_data_validity_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
// Wait until the worker is started, checking for an error raised in the meantime.
958 while (hot_data.run_flag ==
false) {
959 std::this_thread::yield();
960 if (hot_data.error_flag)
964 read_and_validate_storage_memory(hot_data, hot_data.storage_plain_content);
// Thread procedure for the read/write validity test: write a generated pattern
// to the storage, then read it back and verify it.
967 void read_write_data_validity_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
// Wait until the worker is started, checking for an error raised in the meantime.
970 while (hot_data.run_flag ==
false) {
971 std::this_thread::yield();
972 if (hot_data.error_flag)
// Generate the pattern: each byte is its own offset truncated to 8 bits, so
// the sequence repeats every 256 bytes.
976 size_t const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;
977 std::vector<uint8_t> write_data;
978 write_data.resize(io_region_size);
979 for (
size_t ii = 0; ii != io_region_size; ++ii) {
980 write_data[ii] =
static_cast<uint8_t
>(ii);
// Write the pattern out, then read it back and compare.
983 write_storage_memory(hot_data, write_data.data());
984 read_and_validate_storage_memory(hot_data, write_data.data());
// Default constructor: pointers start as null and counters as zero.
987 initiator_comch_worker::hot_data::hot_data()
988 : storage_plain_content{nullptr},
989 io_region_begin{nullptr},
990 io_region_end{nullptr},
993 transactions{nullptr},
994 transactions_size{0},
999 completed_transaction_count{0},
1000 remaining_tx_ops{0},
1001 remaining_rx_ops{0},
1002 latency_accumulator{0},
// Start at the largest value so that the first measured latency becomes the minimum.
1003 latency_min{std::numeric_limits<uint32_t>::max()},
// Move constructor: copy every value across from other, then null the
// pointers in other so that it no longer refers to them.
1012 initiator_comch_worker::hot_data::hot_data(hot_data &&other) noexcept
1013 : storage_plain_content{other.storage_plain_content},
1014 io_region_begin{other.io_region_begin},
1015 io_region_end{other.io_region_end},
1016 io_addr{other.io_addr},
1018 transactions{other.transactions},
1019 transactions_size{other.transactions_size},
1020 io_block_size{other.io_block_size},
1021 end_time{other.end_time},
1022 pe_hit_count{other.pe_hit_count},
1023 pe_miss_count{other.pe_miss_count},
1024 completed_transaction_count{other.completed_transaction_count},
1025 remaining_tx_ops{other.remaining_tx_ops},
1026 remaining_rx_ops{other.remaining_rx_ops},
1027 latency_accumulator{other.latency_accumulator},
1028 latency_min{other.latency_min},
1029 latency_max{other.latency_max},
1030 batch_count{other.batch_count},
1031 batch_size{other.batch_size},
// std::atomic_bool cannot be copied, so its current value is loaded explicitly.
1032 run_flag{other.run_flag.load()},
1033 error_flag{other.error_flag}
1035 other.storage_plain_content =
nullptr;
1037 other.transactions =
nullptr;
// Move assignment: copy every value across from other, then null the pointers
// in other.
// NOTE(review): the previous value of `transactions` is overwritten in the
// lines below without being released here — confirm the target never owns one.
1040 initiator_comch_worker::hot_data &initiator_comch_worker::hot_data::operator=(hot_data &&other) noexcept
// Guard against self-assignment.
1042 if (std::addressof(other) ==
this)
1045 storage_plain_content = other.storage_plain_content;
1046 io_region_begin = other.io_region_begin;
1047 io_region_end = other.io_region_end;
1048 io_addr = other.io_addr;
1050 transactions = other.transactions;
1051 transactions_size = other.transactions_size;
1052 io_block_size = other.io_block_size;
1053 end_time = other.end_time;
1054 pe_hit_count = other.pe_hit_count;
1055 pe_miss_count = other.pe_miss_count;
1056 completed_transaction_count = other.completed_transaction_count;
1057 remaining_tx_ops = other.remaining_tx_ops;
1058 remaining_rx_ops = other.remaining_rx_ops;
1059 latency_accumulator = other.latency_accumulator;
1060 latency_min = other.latency_min;
1061 latency_max = other.latency_max;
1062 batch_count = other.batch_count;
1063 batch_size = other.batch_size;
// std::atomic_bool cannot be copy-assigned, so its current value is loaded explicitly.
1064 run_flag = other.run_flag.load();
1065 error_flag = other.error_flag;
1067 other.storage_plain_content =
nullptr;
1069 other.transactions =
nullptr;
// Submit a receive task. batch_count counts down on every call; when it hits
// zero it is reset to batch_size so that a new batch starts.
1074 doca_error_t initiator_comch_worker::hot_data::submit_recv_task(doca_task *task)
1077 if (--batch_count == 0) {
1079 batch_count = batch_size;
// Start a transaction: arm its reference count, stamp its start time and
// advance the IO address to the next block.
1085 void initiator_comch_worker::hot_data::start_transaction(transaction_context &transaction,
1086 std::chrono::steady_clock::time_point now)
// Two completions are expected per transaction: the producer send callback and
// the consumer receive callback each decrement the count once.
1091 transaction.refcount = 2;
1092 transaction.start_time = now;
1098 reinterpret_cast<void **
>(&io_request)));
// Move on to the next block, wrapping to the start of the region at its end.
1100 io_addr += io_block_size;
1101 if (io_addr == io_region_end) {
1102 io_addr = io_region_begin;
// Called once both completions of a transaction have arrived. Updates the
// latency statistics, then either reuses the slot for the next operation or,
// when everything has finished, records the end time.
1118 void initiator_comch_worker::hot_data::on_transaction_complete(transaction_context &transaction)
// Latency is measured in microseconds from the transaction's start time.
1120 auto const now = std::chrono::steady_clock::now();
1121 auto const usecs =
static_cast<uint32_t
>(
1122 std::chrono::duration_cast<std::chrono::microseconds>(now - transaction.start_time).count());
1123 latency_accumulator += usecs;
1124 latency_min = std::min(latency_min, usecs);
1125 latency_max = std::max(latency_max, usecs);
1127 ++completed_transaction_count;
// Operations still to be sent: reuse this slot straight away, taking `now` as
// the new start time.
1129 if (remaining_tx_ops) {
1130 start_transaction(transaction, now);
1131 }
// Nothing left to send or receive: the run is finished.
else if (remaining_rx_ops == 0) {
1133 end_time = std::chrono::steady_clock::now();
// Destructor of initiator_comch_worker.
1137 initiator_comch_worker::~initiator_comch_worker()
// Default constructor: every handle starts as null and the buffer list is
// empty. Resources are created later by init().
1142 initiator_comch_worker::initiator_comch_worker()
1144 m_io_message_region{nullptr},
1145 m_io_message_mmap{nullptr},
1146 m_io_message_inv{nullptr},
1147 m_io_message_bufs{},
1148 m_consumer{nullptr},
1149 m_producer{nullptr},
// Move constructor: take over every resource of other, then reset the raw
// handles in other to nullptr so that it no longer refers to them.
1156 initiator_comch_worker::initiator_comch_worker(initiator_comch_worker &&other) noexcept
1157 : m_hot_data{std::move(other.m_hot_data)},
1158 m_io_message_region{other.m_io_message_region},
1159 m_io_message_mmap{other.m_io_message_mmap},
1160 m_io_message_inv{other.m_io_message_inv},
1161 m_io_message_bufs{std::move(other.m_io_message_bufs)},
1162 m_consumer{other.m_consumer},
1163 m_producer{other.m_producer},
1164 m_io_responses{std::move(other.m_io_responses)},
1165 m_io_requests{std::move(other.m_io_requests)},
1166 m_thread{std::move(other.m_thread)}
1168 other.m_io_message_region =
nullptr;
1169 other.m_io_message_mmap =
nullptr;
1170 other.m_io_message_inv =
nullptr;
1171 other.m_consumer =
nullptr;
1172 other.m_producer =
nullptr;
// Move assignment: take over every resource of other, then reset the raw
// handles in other to nullptr.
// NOTE(review): move-assigning onto a std::thread that is still joinable
// terminates the program — confirm the target never holds a running thread.
1175 initiator_comch_worker &initiator_comch_worker::operator=(initiator_comch_worker &&other) noexcept
// Guard against self-assignment.
1177 if (std::addressof(other) ==
this)
1180 m_hot_data = std::move(other.m_hot_data);
1181 m_io_message_region = other.m_io_message_region;
1182 m_io_message_mmap = other.m_io_message_mmap;
1183 m_io_message_inv = other.m_io_message_inv;
1184 m_io_message_bufs = std::move(other.m_io_message_bufs);
1185 m_consumer = other.m_consumer;
1186 m_producer = other.m_producer;
1187 m_io_responses = std::move(other.m_io_responses);
1188 m_io_requests = std::move(other.m_io_requests);
1189 m_thread = std::move(other.m_thread);
1191 other.m_io_message_region =
nullptr;
1192 other.m_io_message_mmap =
nullptr;
1193 other.m_io_message_inv =
nullptr;
1194 other.m_consumer =
nullptr;
1195 other.m_producer =
nullptr;
// Initialise the worker: record the IO region description, allocate the
// message memory and the transaction contexts, and create the producer and
// consumer with their task callbacks.
1200 void initiator_comch_worker::init(doca_dev *dev,
1201 doca_comch_connection *comch_conn,
1202 uint8_t
const *storage_plain_content,
1203 uint32_t task_count,
1204 uint32_t batch_size,
1205 uint8_t *io_region_begin,
1206 uint64_t io_region_size,
1207 uint32_t io_block_size)
// Describe this worker's slice of the IO region; addressing starts at its beginning.
1212 m_hot_data.storage_plain_content = storage_plain_content;
1213 m_hot_data.io_addr = io_region_begin;
1214 m_hot_data.io_region_begin = io_region_begin;
1215 m_hot_data.io_region_end = io_region_begin + io_region_size;
1216 m_hot_data.io_block_size = io_block_size;
// There is one transaction context per task.
1225 m_hot_data.transactions_size = task_count;
1226 m_hot_data.batch_size = batch_size;
1229 DOCA_LOG_DBG(
"Allocate comch buffers memory (%zu bytes, aligned to %u byte pages)",
1230 raw_io_messages_size,
1232 m_io_message_region =
static_cast<uint8_t *
>(
1234 if (m_io_message_region ==
nullptr) {
1239 m_hot_data.transactions =
1241 }
catch (std::exception
const &ex) {
1243 "Failed to allocate transaction contexts memory: "s + ex.what()};
1247 reinterpret_cast<char *
>(m_io_message_region),
1248 raw_io_messages_size,
// Producer: the hot data is passed as user data so that the callbacks can reach it.
1270 doca_data{.ptr = std::addressof(m_hot_data)},
1271 doca_comch_producer_task_send_cb,
1272 doca_comch_producer_task_send_error_cb);
1273 m_io_requests.reserve(task_count);
// Consumer: sized for one extra batch of receive tasks on top of the task count.
1278 task_count + batch_size,
1279 doca_data{.ptr = std::addressof(m_hot_data)},
1280 doca_comch_consumer_task_post_recv_cb,
1281 doca_comch_consumer_task_post_recv_error_cb);
1282 m_io_responses.reserve(task_count + batch_size);
// Reset the run state and counters, then create the thread that will execute
// fn. The thread procedures shown in this file wait until run_flag is set by
// start_thread_proc().
1285 void initiator_comch_worker::prepare_thread_proc(initiator_comch_worker::thread_proc_fn_t fn,
1287 uint32_t run_limit_op_count,
1290 m_hot_data.run_flag =
false;
1291 m_hot_data.error_flag =
false;
1292 m_hot_data.pe_hit_count = 0;
1293 m_hot_data.pe_miss_count = 0;
1294 m_hot_data.completed_transaction_count = 0;
// The same limit applies to the operations to send and to receive.
1295 m_hot_data.remaining_tx_ops = run_limit_op_count;
1296 m_hot_data.remaining_rx_ops = run_limit_op_count;
// The wrapper receives the hot data so that it can flag a failure there.
1298 m_thread = std::thread{thread_proc_catch_wrapper{std::addressof(m_hot_data)}, fn};
// Cursors into the IO region and the IO message region used while the buffers
// and tasks are set up below.
1305 uint8_t *io_addr = m_hot_data.io_region_begin;
1306 uint8_t *msg_addr = m_io_message_region;
// Consumer side: one buffer per transaction plus one extra batch.
1312 for (uint32_t ii = 0; ii != m_hot_data.transactions_size + m_hot_data.batch_size; ++ii) {
1313 doca_buf *consumer_buf;
1314 doca_comch_consumer_task_post_recv *consumer_task;
1325 m_io_message_bufs.push_back(consumer_buf);
// Producer side: one buffer and one send task per transaction.
1336 for (uint32_t ii = 0; ii != m_hot_data.transactions_size; ++ii) {
1337 doca_buf *producer_buf;
1338 doca_comch_producer_task_send *producer_task;
1349 m_io_message_bufs.push_back(producer_buf);
1350 auto *
const io_message =
reinterpret_cast<char *
>(msg_addr);
// The transaction context is attached as user data; the send callback casts
// its task user data back to a transaction_context.
1365 doca_data{.
ptr = std::addressof(m_hot_data.transactions[ii])}));
// The transaction starts idle and is tied to its producer task.
1367 m_hot_data.transactions[ii].refcount = 0;
1368 m_hot_data.transactions[ii].request = producer_task;
1377 io_addr += m_hot_data.io_block_size;
1380 for (
auto *task : m_io_responses) {
// Report whether both the consumer and the producer are running.
1388 bool initiator_comch_worker::are_contexts_ready(
void)
const noexcept
1393 return consumer_running && producer_running;
// Set run_flag, which releases a thread procedure that is waiting on it.
1396 void initiator_comch_worker::start_thread_proc(
void)
1398 m_hot_data.run_flag =
true;
// Return the current value of run_flag.
1401 bool initiator_comch_worker::is_thread_proc_running(
void)
const noexcept
1403 return m_hot_data.run_flag;
// Join the worker thread; nothing is done unless the thread is joinable.
1406 void initiator_comch_worker::join_thread_proc(
void)
1408 if (m_thread.joinable())
// Give read-only access to the worker's hot data.
1412 initiator_comch_worker::hot_data
const &initiator_comch_worker::get_hot_data(
void)
const noexcept
// Tear down the consumer, the producer and the progress engine. Each handle is
// checked first and set to nullptr afterwards, so repeated calls are harmless.
1417 void initiator_comch_worker::destroy_data_path_objects(
void)
// Consumer and its response task list.
1420 if (m_consumer !=
nullptr) {
1423 m_io_responses.clear();
1429 m_consumer =
nullptr;
// Producer and its request task list.
1435 if (m_producer !=
nullptr) {
1438 m_io_requests.clear();
1444 m_producer =
nullptr;
// Progress engine.
1450 if (m_hot_data.pe !=
nullptr) {
1453 m_hot_data.pe =
nullptr;
// A thread that is still joinable is told to stop: run_flag is cleared and
// error_flag is set so that a waiting or running thread procedure exits.
1464 if (m_thread.joinable()) {
1465 m_hot_data.run_flag =
false;
1466 m_hot_data.error_flag =
true;
// Release resources in dependency order: data path objects first, then the
// buffers, the inventory, the mmap, the transaction contexts and finally the
// raw message memory.
1470 destroy_data_path_objects();
1472 for (
auto *buf : m_io_message_bufs) {
1476 if (m_io_message_inv) {
1483 m_io_message_inv =
nullptr;
1489 if (m_io_message_mmap) {
1496 m_io_message_mmap =
nullptr;
1502 if (m_hot_data.transactions !=
nullptr) {
1504 m_hot_data.transactions =
nullptr;
1507 if (m_io_message_region !=
nullptr) {
1509 m_io_message_region =
nullptr;
// Completion callback for a consumer receive task (a response from the storage
// service). The response's correlation id selects the transaction it belongs
// to; that transaction's reference count is decremented and, once it reaches
// zero, the transaction is completed. An invalid id or a failure further down
// stops the worker by clearing run_flag and setting error_flag.
1513 void initiator_comch_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
1517 static_cast<void>(task_user_data);
// The context user data is the worker's hot_data (registered in init()).
1519 auto *
const hot_data =
static_cast<initiator_comch_worker::hot_data *
>(ctx_user_data.ptr);
1522 static_cast<void>(
doca_buf_get_data(buf,
reinterpret_cast<void **
>(&io_message)));
// correlation_id indexes transactions[], which holds transactions_size entries,
// so the valid range is [0, transactions_size). The comparison must be >= :
// with > an id equal to transactions_size would index one past the end.
1524 if (correlation_id >= hot_data->transactions_size) {
1525 DOCA_LOG_ERR(
"Received storage response with invalid async id: %u", correlation_id);
1526 hot_data->run_flag =
false;
1527 hot_data->error_flag =
true;
// Whichever of the send completion and this response arrives last completes
// the transaction.
1531 if (--(hot_data->transactions[correlation_id].refcount) == 0)
1532 hot_data->on_transaction_complete(hot_data->transactions[correlation_id]);
// Failure path: stop the worker and flag the error.
1538 hot_data->run_flag =
false;
1539 hot_data->error_flag =
true;
// Error callback for a consumer receive task. The failure is only reported
// while the worker is running; errors seen once run_flag is already cleared
// are ignored.
1543 void initiator_comch_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1547 static_cast<void>(task);
1548 static_cast<void>(task_user_data);
1550 auto *
const hot_data =
static_cast<initiator_comch_worker::hot_data *
>(ctx_user_data.ptr);
1552 if (hot_data->run_flag) {
1553 DOCA_LOG_ERR(
"Failed to complete doca_comch_consumer_task_post_recv");
1554 hot_data->run_flag =
false;
1555 hot_data->error_flag =
true;
// Completion callback for a producer send task. The task user data is the
// transaction context; its reference count is decremented and, if this was the
// last outstanding completion, the transaction is completed.
1559 void initiator_comch_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1563 static_cast<void>(task);
1565 auto &transaction = *
static_cast<transaction_context *
>(task_user_data.ptr);
1566 if (--(transaction.refcount) == 0)
1567 static_cast<initiator_comch_worker::hot_data *
>(ctx_user_data.ptr)->on_transaction_complete(transaction);
// Error callback for a producer send task: log the failure, stop the worker
// and flag the error.
1570 void initiator_comch_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1574 static_cast<void>(task);
1575 static_cast<void>(task_user_data);
1577 auto *
const hot_data =
static_cast<initiator_comch_worker::hot_data *
>(ctx_user_data.ptr);
1578 DOCA_LOG_ERR(
"Failed to complete doca_comch_producer_task_send");
1579 hot_data->run_flag =
false;
1580 hot_data->error_flag =
true;
// Destructor: destroy the workers, release the IO mmap and IO region, drop the
// control channel and finally release the device.
1583 initiator_comch_app::~initiator_comch_app()
// The workers are destroyed by calling their destructors explicitly, one per
// configured core.
1587 if (m_workers !=
nullptr) {
1588 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1589 m_workers[ii].~initiator_comch_worker();
1594 if (m_io_mmap !=
nullptr) {
1606 if (m_io_region !=
nullptr) {
// The control channel is released before the device is handled below.
1610 m_service_control_channel.reset();
1612 if (m_dev !=
nullptr) {
// Load the content of file_name into a byte vector. empty_file_is_disallowed
// is consulted when loading throws.
// NOTE(review): presumably the exception is propagated when the flag is set
// and an empty result is returned otherwise — confirm.
1618 std::vector<uint8_t> try_load_file(std::string
const &file_name,
bool empty_file_is_disallowed)
1620 std::vector<uint8_t>
result;
1623 }
catch (std::exception
const &) {
1624 if (empty_file_is_disallowed)
// Construct the application from its configuration: load the plain content
// file, open the device and create the control channel to the storage service.
1631 initiator_comch_app::initiator_comch_app(initiator_comch_app_configuration
const &
cfg)
// try_load_file's second argument is true only for the read-only validity test.
1633 m_storage_plain_content{
1634 try_load_file(
cfg.storage_plain_content_file,
cfg.run_type == run_type_read_only_data_validity_test)},
1636 m_io_region{nullptr},
1638 m_service_control_channel{},
1640 m_remote_consumer_ids{},
1642 m_storage_capacity{0},
1643 m_storage_block_size{0},
1645 m_message_id_counter{},
1646 m_correlation_id_counter{0},
1649 DOCA_LOG_INFO(
"Open doca_dev: %s", m_cfg.device_id.c_str());
// The control channel is handed the two consumer event callbacks.
1651 m_service_control_channel =
1653 m_cfg.command_channel_name.c_str(),
1655 new_comch_consumer_callback,
1656 expired_comch_consumer_callback);
// Request that the application stops by raising the abort flag; `reason`
// describes why.
1659 void initiator_comch_app::abort(std::string
const &reason)
1665 m_abort_flag =
true;
// Wait for the control channel to become connected, sleeping 100 ms between
// checks and giving up once the configured control timeout has elapsed.
1668 void initiator_comch_app::connect_to_storage_service(
void)
1670 auto const expiry = std::chrono::steady_clock::now() + m_cfg.control_timeout;
1672 std::this_thread::sleep_for(std::chrono::milliseconds{100});
1674 if (m_service_control_channel->is_connected()) {
1678 if (std::chrono::steady_clock::now() > expiry) {
1681 "Timed out trying to connect to storage",
// Ask the storage service for its details and record the reported capacity and
// block size. For the read-only validity test the plain content must be
// exactly as large as the storage.
// NOTE(review): the wait below uses default_control_timeout_seconds rather
// than the configured m_cfg.control_timeout — confirm this is intentional.
1688 void initiator_comch_app::query_storage(
void)
1693 m_service_control_channel->send_message(
1698 default_control_timeout_seconds);
1699 auto const *storage_details =
1701 if (storage_details ==
nullptr) {
1705 m_storage_capacity = storage_details->total_size;
1706 m_storage_block_size = storage_details->block_size;
1707 DOCA_LOG_INFO(
"Storage reports capacity of: %lu using a block size of: %u",
1709 m_storage_block_size);
// The read-only test compares the storage byte by byte with the plain content,
// so the two sizes have to match.
1711 if (m_cfg.run_type == run_type_read_only_data_validity_test) {
1712 if (m_storage_capacity != m_storage_plain_content.size()) {
1715 "Read only validation test requires that the provided plain data is the same size as the storage capacity"};
// Initialise storage: split the storage blocks across the configured cores, send an init
// storage payload on the control channel, initialise each worker with its own slice of the
// IO region, then wait until the remote consumers and the worker contexts are ready.
// NOTE(review): many lines are missing from this listing (braces, some arguments and the
// statements that raise errors); the comments describe only the visible statements.
1720 void initiator_comch_app::init_storage(
void)
1723 auto const core_count =
static_cast<uint32_t
>(m_cfg.core_set.size());
// nullptr when no plain content was loaded, otherwise a pointer to the loaded bytes.
1724 uint8_t
const *plain_content = m_storage_plain_content.empty() ? nullptr : m_storage_plain_content.data();
// Number of whole blocks in the storage, the even share per core, and the leftover blocks.
// NOTE(review): m_storage_block_size and core_count are divisors with no zero check visible
// here - confirm they are validated before this point.
1725 auto const storage_block_count =
static_cast<uint32_t
>(m_storage_capacity / m_storage_block_size);
1726 auto const per_thread_block_count = storage_block_count / core_count;
1727 auto per_thread_block_count_remainder = storage_block_count % core_count;
// Reserve room for one remote consumer id per core (new_comch_consumer_callback treats a
// zero capacity as a bug).
1729 m_remote_consumer_ids.reserve(core_count);
1733 reinterpret_cast<char *
>(m_io_region),
// Lambda producing a byte vector copied from [data, data + len); where data and len come
// from is not visible in this listing.
1738 auto mmap_details = [
this]() {
1746 return std::vector<uint8_t>(
static_cast<uint8_t
const *
>(data),
1747 static_cast<uint8_t
const *
>(data) +
len);
// Task count placed in the init storage payload: capped at m_cfg.task_count, and based on
// the larger per-thread block count when the blocks do not divide evenly.
1753 auto const remote_per_thread_block_count =
1754 std::min(m_cfg.task_count,
1755 per_thread_block_count_remainder == 0 ? per_thread_block_count : per_thread_block_count + 1);
1756 m_service_control_channel->send_message({
1760 std::make_unique<storage::control::init_storage_payload>(remote_per_thread_block_count,
1763 std::move(mmap_details)),
1766 DOCA_LOG_INFO(
"Init storage to use using %u cores with %u tasks each",
1768 remote_per_thread_block_count);
1772 default_control_timeout_seconds));
// Hand each worker a consecutive slice of m_io_region.
1776 auto *this_thread_io_region_begin = m_io_region;
1777 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1778 auto this_thread_block_count = per_thread_block_count;
// While leftover blocks remain, this worker takes one extra block.
1779 if (per_thread_block_count_remainder != 0) {
1780 ++this_thread_block_count;
1781 --per_thread_block_count_remainder;
// A worker never gets more tasks than it has blocks.
1783 auto const this_thread_task_count = std::min(m_cfg.task_count, this_thread_block_count);
1784 auto const this_thread_storage_capacity = this_thread_block_count * m_storage_block_size;
1786 m_workers[ii].init(m_dev,
1787 m_service_control_channel->get_comch_connection(),
1789 this_thread_task_count,
1791 this_thread_io_region_begin,
1792 this_thread_storage_capacity,
1793 m_storage_block_size);
// The next worker's slice starts where this one ends.
1795 this_thread_io_region_begin += this_thread_storage_capacity;
1798 DOCA_LOG_INFO(
"Wait for remote objects to be finish async init...");
// Deadline for the readiness wait: now plus the configured control timeout.
1799 auto const expiry = std::chrono::steady_clock::now() + m_cfg.control_timeout;
// A control message polled at this point is unexpected; the text below reports its type.
// The leading space before "while" keeps the message type and the following word apart.
1801 auto const *msg = m_service_control_channel->poll();
1802 if (msg !=
nullptr) {
1804 "Received unexpected " +
to_string(msg->message_type) +
1805 " while initialising"};
1808 if (std::chrono::steady_clock::now() > expiry) {
1810 "Timed out waiting for remote objects to start"};
// Sum are_contexts_ready() across all workers.
1813 auto const ready_ctx_count = std::accumulate(m_workers,
1814 m_workers + m_cfg.core_set.size(),
1816 [](uint32_t total, initiator_comch_worker &worker) {
1817 return total + worker.are_contexts_ready();
// Ready once a remote consumer id has been recorded for every core and the summed worker
// readiness equals the core count.
1820 if (m_remote_consumer_ids.size() == m_cfg.core_set.size() && ready_ctx_count == m_cfg.core_set.size()) {
// Prepare every worker: choose the thread procedure that matches m_cfg.run_type, then
// prepare the worker's tasks against its remote consumer id.
// NOTE(review): the lines that set `initial_op_type` and any handling of an unrecognised
// run type are not visible in this listing.
1827 void initiator_comch_app::prepare_threads(
void)
1829 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1831 auto &tctx = m_workers[ii];
// Both throughput run types use throughput_thread_proc; each data validity run type has its
// own procedure. Every branch passes the operation limit and the core for this worker.
1833 if (m_cfg.run_type == run_type_read_throughout_test) {
1835 tctx.prepare_thread_proc(throughput_thread_proc,
1836 m_cfg.run_limit_operation_count,
1837 m_cfg.core_set[ii]);
1838 }
else if (m_cfg.run_type == run_type_write_throughout_test) {
1840 tctx.prepare_thread_proc(throughput_thread_proc,
1841 m_cfg.run_limit_operation_count,
1842 m_cfg.core_set[ii]);
1843 }
else if (m_cfg.run_type == run_type_read_only_data_validity_test) {
1845 tctx.prepare_thread_proc(read_only_data_validity_thread_proc,
1846 m_cfg.run_limit_operation_count,
1847 m_cfg.core_set[ii]);
1848 }
else if (m_cfg.run_type == run_type_read_write_data_validity_test) {
1850 tctx.prepare_thread_proc(read_write_data_validity_thread_proc,
1851 m_cfg.run_limit_operation_count,
1852 m_cfg.core_set[ii]);
// Worker ii is paired with the remote consumer id stored at the same index.
1857 tctx.prepare_tasks(initial_op_type, m_remote_consumer_ids[ii]);
// Send a message on the control channel as part of starting storage.
// NOTE(review): the message contents and the call that takes
// default_control_timeout_seconds as its final argument are not visible in this listing.
1861 void initiator_comch_app::start_storage(
void)
1866 m_service_control_channel->send_message(
1871 default_control_timeout_seconds));
// NOTE(review): the header of this function is not visible in this listing. The visible
// statements start every worker's thread procedure, check every 200ms how many workers are
// still running, then fold the per-worker results into m_stats. The return value is true
// when any_error is false.
// Record the start time before any worker thread is started.
1877 m_stats.start_time = std::chrono::steady_clock::now();
1878 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1879 auto &tctx = m_workers[ii];
1880 tctx.start_thread_proc();
// Sum is_thread_proc_running() across the workers after each 200ms sleep. The statement run
// when the sum is zero is not visible (presumably it leaves the polling loop - confirm).
1885 std::this_thread::sleep_for(std::chrono::milliseconds{200});
1886 auto const running_workers = std::accumulate(m_workers,
1887 m_workers + m_cfg.core_set.size(),
1889 [](uint32_t total,
auto const &tctx) {
1890 return total + tctx.is_thread_proc_running();
1893 if (running_workers == 0)
// Reset the aggregate values before folding in each worker's results. latency_min starts at
// the largest uint32_t so that any worker value can lower it.
// NOTE(review): pe_hit_count and pe_miss_count are accumulated below without a visible
// reset, unlike operation_count and the latency fields - confirm this runs only once.
1898 uint64_t latency_acc = 0;
1899 m_stats.end_time = m_stats.start_time;
1900 m_stats.operation_count = 0;
1901 m_stats.latency_max = 0;
1902 m_stats.latency_min = std::numeric_limits<uint32_t>::max();
1903 bool any_error =
false;
1904 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1905 auto const &hot_data = m_workers[ii].get_hot_data();
// The statement guarded by error_flag is not visible (presumably it sets any_error - confirm).
1906 if (hot_data.error_flag)
// The overall end time is the latest end time of any worker.
1909 m_stats.end_time = std::max(m_stats.end_time, hot_data.end_time);
1910 m_stats.operation_count += hot_data.completed_transaction_count;
1911 m_stats.latency_min = std::min(m_stats.latency_min, hot_data.latency_min);
1912 m_stats.latency_max = std::max(m_stats.latency_max, hot_data.latency_max);
1913 m_stats.pe_hit_count += hot_data.pe_hit_count;
1914 m_stats.pe_miss_count += hot_data.pe_miss_count;
1916 latency_acc += hot_data.latency_accumulator;
// Mean latency is the accumulated latency divided by the operation count; the division is
// only performed when at least one operation completed, otherwise the mean is zero.
1919 if (m_stats.operation_count != 0) {
1920 m_stats.latency_mean = latency_acc / m_stats.operation_count;
1922 m_stats.latency_mean = 0;
1925 return any_error ==
false;
// Call join_thread_proc() on every worker, one worker per configured core.
1928 void initiator_comch_app::join_threads(
void)
1930 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1931 m_workers[ii].join_thread_proc();
// Send a message on the control channel as part of stopping storage.
// NOTE(review): the message contents and the call that takes
// default_control_timeout_seconds as its final argument are not visible in this listing.
1935 void initiator_comch_app::stop_storage(
void)
1941 m_service_control_channel->send_message(
1946 default_control_timeout_seconds));
// Print a summary of m_stats to stdout using printf: duration, operation count, data rate,
// IO rate, progress engine hit rate and latency figures.
// NOTE(review): duration_secs_float and (pe_hit_count + pe_miss_count) are used as divisors
// with no zero check visible here, so a zero duration or zero PE counters would print
// inf/nan - confirm whether that is acceptable. The final factor of the hit rate expression
// is on a line that is not visible in this listing.
1950 void initiator_comch_app::display_stats(
void)
const
// Elapsed time between the recorded start and end times, in seconds as a double.
1952 auto duration_secs_float = std::chrono::duration<double>{m_stats.end_time - m_stats.start_time}.count();
// Total bytes: one storage block per counted operation.
1953 auto const bytes = uint64_t{m_stats.operation_count} * m_storage_block_size;
1954 auto const GiBs =
static_cast<double>(bytes) / (1024. * 1024. * 1024.);
// Millions of operations per second.
1955 auto const miops = (
static_cast<double>(m_stats.operation_count) / 1'000'000.) / duration_secs_float;
// Hits as a fraction of hits plus misses, multiplied by a factor that is not visible here.
1956 auto const pe_hit_rate_pct =
1957 (
static_cast<double>(m_stats.pe_hit_count) /
1958 (
static_cast<double>(m_stats.pe_hit_count) +
static_cast<double>(m_stats.pe_miss_count))) *
1961 printf(
"+================================================+\n");
1962 printf(
"| Stats\n");
1963 printf(
"+================================================+\n");
1964 printf(
"| Duration (seconds): %2.06lf\n", duration_secs_float);
1965 printf(
"| Operation count: %lu\n", m_stats.operation_count);
1966 printf(
"| Data rate: %.03lf GiB/s\n", GiBs / duration_secs_float);
1967 printf(
"| IO rate: %.03lf MIOP/s\n", miops);
1968 printf(
"| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct, m_stats.pe_hit_count, m_stats.pe_miss_count);
// Latency values are printed with a "us" suffix.
1969 printf(
"| Latency:\n");
1970 printf(
"| \tMin: %uus\n", m_stats.latency_min);
1971 printf(
"| \tMax: %uus\n", m_stats.latency_max);
1972 printf(
"| \tMean: %uus\n", m_stats.latency_mean);
1973 printf(
"+================================================+\n");
// Shut down: destroy each worker's data path objects, send a message on the control
// channel, then poll the control channel until m_remote_consumer_ids is empty.
// NOTE(review): the message contents and the handling of a polled message are not visible
// in this listing, and no timeout is visible on the final loop - confirm it cannot spin
// forever if the remote side never expires its consumers.
1976 void initiator_comch_app::shutdown(
void)
1981 for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1982 m_workers[ii].destroy_data_path_objects();
1988 m_service_control_channel->send_message(
1993 default_control_timeout_seconds));
// Entries are removed from m_remote_consumer_ids by expired_comch_consumer_callback.
1995 while (!m_remote_consumer_ids.empty()) {
1996 auto *msg = m_service_control_channel->poll();
1997 if (msg !=
nullptr) {
// Callback that records the id of a newly connected remote consumer.
// `user_data` is cast to the initiator_comch_app whose m_remote_consumer_ids is updated.
// NOTE(review): a few lines are missing from this listing (for example what follows the
// "[BUG]" log, presumably an early return - confirm).
2005 void initiator_comch_app::new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
2007 auto *
self =
reinterpret_cast<initiator_comch_app *
>(user_data);
// A capacity of zero means no space was reserved for ids; this is logged as a bug.
2008 if (self->m_remote_consumer_ids.capacity() == 0) {
2009 DOCA_LOG_ERR(
"[BUG] no space for new remote consumer ids");
// Only store ids that are not already present; a duplicate is logged as a warning.
2013 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
2014 if (found == std::end(self->m_remote_consumer_ids)) {
2015 self->m_remote_consumer_ids.push_back(
id);
2016 DOCA_LOG_DBG(
"Connected to remote consumer with id: %u. Consumer count is now: %zu",
2018 self->m_remote_consumer_ids.size());
2020 DOCA_LOG_WARN(
"Ignoring duplicate remote consumer id: %u",
id);
// Callback that forgets the id of a remote consumer that has expired.
// `user_data` is cast to the initiator_comch_app whose m_remote_consumer_ids is updated.
2024 void initiator_comch_app::expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
2026 auto *
self =
reinterpret_cast<initiator_comch_app *
>(user_data);
// A known id is erased and the new count logged; an unknown id is logged as a warning.
2027 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
2028 if (found != std::end(self->m_remote_consumer_ids)) {
2029 self->m_remote_consumer_ids.erase(found);
2030 DOCA_LOG_DBG(
"Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
2032 self->m_remote_consumer_ids.size());
2034 DOCA_LOG_WARN(
"Ignoring disconnect of unexpected remote consumer id: %u",
id);
// NOTE(review): the start of this function's signature is not visible in this listing, and
// several body lines (guards, throw statements, the return) are missing as well.
// Visible behaviour: repeat until `timeout` has elapsed - poll the control channel, store a
// polled message in m_ctrl_messages, and search the stored messages for one whose
// message_id equals msg_id. A match is moved out of m_ctrl_messages and erased, then it is
// inspected for error details and for a message_type different from `type`.
2040 std::chrono::seconds timeout)
2042 auto expiry_time = std::chrono::steady_clock::now() + timeout;
// The guard around this push_back is not visible (presumably a nullptr check - confirm).
2045 auto *msg = m_service_control_channel->poll();
2047 m_ctrl_messages.push_back(std::move(*msg));
2051 auto iter = std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [msg_id](
auto &msg) {
2052 return msg.message_id == msg_id;
// No stored message matches yet; the statement that follows is not visible in this listing.
2055 if (iter == std::end(m_ctrl_messages))
// Take ownership of the matching message and remove its slot from the stored messages.
2059 auto response = std::move(*iter);
2060 m_ctrl_messages.erase(iter);
2064 auto const *
const error_details =
2066 response.payload.get());
2069 " failed!: " + error_details->message};
2073 if (response.message_type !=
type) {
2075 "Received unexpected " +
to_string(response.message_type) +
2083 }
while (std::chrono::steady_clock::now() < expiry_time);
static void cleanup(struct cache_invalidate_sample_state *state)
static doca_error_t run(struct cache_invalidate_sample_state *state)
static uint32_t get_correlation_id(char const *buf)
static void set_user_data(doca_data user_data, char *buf)
static void set_correlation_id(uint32_t correlation_id, char *buf)
static void set_type(io_message_type type, char *buf)
static void set_io_address(uint64_t io_address, char *buf)
static void set_io_size(uint32_t io_size, char *buf)
static void set_remote_offset(uint32_t remote_offset, char *buf)
T * object_array(size_t object_count, Args &&...args) const
doca_error_t get_doca_error() const noexcept
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len arguments.
static doca_error_t doca_buf_inventory_buf_get_by_data(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *data, size_t data_len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by data & data_len arguments.
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_get_data(const struct doca_buf *buf, void **data)
Get the buffer's data.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE const struct doca_buf * doca_comch_producer_task_send_get_buf(const struct doca_comch_producer_task_send *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_INVALID_VALUE
@ DOCA_ERROR_NOT_SUPPORTED
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_export_pci(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the devices previously added to the memory map.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
doca_task_submit_flag
Flags used when submitting a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_task_submit_ex(struct doca_task *task, uint32_t flags)
Extended version of doca_task_submit.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
@ DOCA_TASK_SUBMIT_FLAG_NONE
@ DOCA_TASK_SUBMIT_FLAG_FLUSH
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_READ
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_WRITE
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
int main(int argc, char **argv)
DOCA_LOG_REGISTER(INITIATOR_COMCH)
const struct ip_frag_config * cfg
std::unique_ptr< storage::control::comch_channel > make_comch_client_control_channel(doca_dev *dev, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
void uninstall_ctrl_c_handler()
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
static constexpr value_requirement optional_value
void create_doca_logger_backend(void) noexcept
void install_ctrl_c_handler(std::function< void(void)> callback)
void aligned_free(void *memory)
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
static constexpr value_multiplicity multiple_values
std::vector< uint8_t > load_file_bytes(std::string const &file_name)
void * aligned_alloc(size_t alignment, size_t size)
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
size_t aligned_size(size_t alignment, size_t size)
static constexpr value_multiplicity single_value
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
constexpr size_t size_of_io_message
constexpr uint32_t cache_line_size
bool is_ctx_running(doca_ctx *ctx) noexcept
static constexpr value_requirement required_value
uint32_t get_system_page_size(void)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
doca_dev * open_device(std::string const &identifier)
Convenience type for representing opaque data.