using namespace std::string_literals;

/* Name of this application. */
constexpr char const *app_name = "doca_storage_comch_to_rdma_gga_offload";

/* Default for the control-timeout option: how long control operations may wait. */
constexpr std::chrono::seconds default_control_timeout_seconds{5};

/* Default for the command-channel-name option. */
constexpr char const *default_command_channel_name = "doca_storage_comch";

/*
 * Addresses are round-tripped through uint64_t fields elsewhere in this file (reinterpret_cast to/from
 * uint64_t), presumably relying on 64-bit pointers; fail the build otherwise.
 */
static_assert(sizeof(void *) == 8, "Expected a pointer to occupy 8 bytes");
73 enum class connection_role : uint8_t {
80 template <
typename T,
size_t N>
81 class per_connection_t {
83 T &operator[](connection_role role)
86 if (
static_cast<uint8_t
>(role) > N) {
87 throw std::range_error{
std::to_string(
static_cast<uint8_t
>(role)) +
" exceeds range " +
91 return m_items[
static_cast<uint8_t
>(role)];
94 T
const &operator[](connection_role role)
const
97 if (
static_cast<uint8_t
>(role) > N) {
98 throw std::range_error{
std::to_string(
static_cast<uint8_t
>(role)) +
" exceeds range " +
102 return m_items[
static_cast<uint8_t
>(role)];
107 return m_items.data();
112 return m_items.data() + m_items.size();
115 T
const *begin() const noexcept
117 return m_items.data();
120 T
const *end() const noexcept
122 return m_items.data() + m_items.size();
126 std::array<T, N> m_items;
129 template <
typename T>
130 using per_ctrl_connection = per_connection_t<T, 4>;
132 template <
typename T>
133 using per_storage_connection = per_connection_t<T, 3>;
135 struct gga_offload_app_configuration {
136 std::vector<uint32_t> cpu_set = {};
137 std::string device_id = {};
138 std::string representor_id = {};
139 std::string command_channel_name = {};
140 std::chrono::seconds control_timeout = {};
141 per_storage_connection<storage::ip_address> storage_server_address = {};
142 std::string ec_matrix_type = {};
143 uint32_t recover_freq = {};
/*
 * Per-worker statistics record. Every counter starts at zero.
 */
struct thread_stats {
	/* Index of the CPU core associated with the worker. */
	uint32_t core_idx{0};
	/* Progress engine hit counter (presumably polls that completed work - confirm against the worker). */
	uint64_t pe_hit_count{0};
	/* Progress engine miss counter (presumably polls that found nothing - confirm against the worker). */
	uint64_t pe_miss_count{0};
	/* Number of operations counted for the worker. */
	uint64_t operation_count{0};
	/* Number of recovery flows counted for the worker. */
	uint64_t recovery_count{0};
};
154 enum class transaction_mode : uint8_t {
174 class gga_offload_app_worker {
176 struct alignas(storage::cache_line_size) transaction_context {
178 uint32_t remaining_op_count;
179 per_storage_connection<char *> io_message;
180 per_storage_connection<doca_rdma_task_send *> requests;
181 per_storage_connection<doca_rdma_task_receive *> responses;
182 doca_ec_task_recover *ec_recover_task;
183 doca_compress_task_decompress_lz4_stream *decompress_task;
184 doca_comch_producer_task_send *host_response_task;
185 doca_comch_consumer_task_post_recv *host_request_task;
188 transaction_mode mode;
192 "Expected thread_context::transaction_context to occupy two cache lines");
194 struct alignas(storage::cache_line_size) hot_data {
196 uint64_t remote_memory_start_addr;
197 uint64_t local_memory_start_addr;
198 uint64_t storage_capacity;
199 uint64_t pe_hit_count;
200 uint64_t pe_miss_count;
201 uint64_t recovery_flow_count;
202 uint64_t completed_transaction_count;
203 transaction_context *transactions;
204 uint32_t in_flight_transaction_count;
206 uint32_t half_block_size;
209 uint16_t recover_drop_count;
210 uint16_t recover_drop_freq;
211 std::atomic_bool run_flag;
216 hot_data(hot_data
const &other) =
delete;
218 hot_data(hot_data &&other) noexcept;
220 hot_data &operator=(hot_data
const &other) =
delete;
222 hot_data &operator=(hot_data &&other) noexcept;
224 doca_error_t start_transaction(doca_comch_consumer_task_post_recv *task,
char const *io_message);
226 void process_result(gga_offload_app_worker::transaction_context &transaction);
228 void start_decompress(gga_offload_app_worker::transaction_context &transaction);
230 void start_recover(gga_offload_app_worker::transaction_context &transaction);
234 "Expected thread_context::hot_data to occupy two cache lines");
236 ~gga_offload_app_worker();
238 gga_offload_app_worker() =
delete;
240 gga_offload_app_worker(doca_dev *dev,
241 doca_comch_connection *comch_conn,
244 std::string
const &ec_matrix_type,
245 uint32_t recover_drop_freq);
247 gga_offload_app_worker(gga_offload_app_worker
const &) =
delete;
249 [[maybe_unused]] gga_offload_app_worker(gga_offload_app_worker &&) noexcept;
251 gga_offload_app_worker &operator=(gga_offload_app_worker const &) = delete;
253 [[maybe_unused]] gga_offload_app_worker &operator=(gga_offload_app_worker &&) noexcept;
255 std::vector<uint8_t> get_local_rdma_connection_blob(connection_role conn_role,
258 void connect_rdma(connection_role conn_role,
260 std::vector<uint8_t> const &blob);
264 void stop_processing(
void) noexcept;
266 void destroy_comch_objects(
void) noexcept;
268 void create_tasks(uint32_t task_count,
271 uint32_t remote_consumer_id,
272 doca_mmap *local_io_mmap,
273 doca_mmap *remote_io_mmap);
279 void prepare_thread_proc(uint32_t core_id);
281 void start_thread_proc();
283 [[nodiscard]]
bool is_thread_proc_running() const noexcept;
285 [[nodiscard]] hot_data const &get_hot_data() const noexcept;
288 struct rdma_context {
291 std::vector<doca_rdma_task_send *> storage_request_tasks;
292 std::vector<doca_rdma_task_receive *> storage_response_tasks;
296 uint8_t *m_io_message_region;
297 doca_mmap *m_io_message_mmap;
298 doca_buf_inventory *m_buf_inv;
299 std::vector<doca_buf *> m_io_message_bufs;
300 doca_comch_consumer *m_consumer;
301 doca_comch_producer *m_producer;
303 doca_ec_matrix *m_ec_matrix;
304 doca_compress *m_compress;
305 per_storage_connection<rdma_context> m_rdma;
306 std::vector<doca_comch_consumer_task_post_recv *> m_host_request_tasks;
307 std::vector<doca_comch_producer_task_send *> m_host_response_tasks;
308 std::thread m_thread;
310 void init(doca_dev *dev,
311 doca_comch_connection *comch_conn,
314 std::string
const &ec_matrix_type,
315 uint32_t recover_drop_freq);
319 void create_gga_tasks(uint32_t block_size, doca_mmap *local_io_mmap, doca_mmap *remote_io_mmap);
321 void prepare_transaction_part(uint32_t idx, uint8_t *io_message_addr, connection_role role);
323 static void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
327 static void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
331 static void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
335 static void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
339 static void doca_rdma_task_send_cb(doca_rdma_task_send *task,
343 static void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
347 static void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
351 static void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
355 static void doca_ec_task_recover_cb(doca_ec_task_recover *task,
359 static void doca_ec_task_recover_error_cb(doca_ec_task_recover *task,
363 static void doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
367 static void doca_compress_task_decompress_lz4_stream_error_cb(doca_compress_task_decompress_lz4_stream *task,
374 class gga_offload_app {
378 gga_offload_app() =
delete;
380 explicit gga_offload_app(gga_offload_app_configuration
const &
cfg);
382 gga_offload_app(gga_offload_app
const &) =
delete;
384 gga_offload_app(gga_offload_app &&) noexcept = delete;
386 gga_offload_app &operator=(gga_offload_app const &) = delete;
388 gga_offload_app &operator=(gga_offload_app &&) noexcept = delete;
390 void abort(std::
string const &reason);
392 void connect_to_storage(
void);
394 void wait_for_comch_client_connection(
void);
396 void wait_for_and_process_query_storage(
void);
398 void wait_for_and_process_init_storage(
void);
400 void wait_for_and_process_start_storage(
void);
402 void wait_for_and_process_stop_storage(
void);
404 void wait_for_and_process_shutdown(
void);
406 void display_stats(
void) const;
409 gga_offload_app_configuration const m_cfg;
411 doca_dev_rep *m_dev_rep;
412 doca_mmap *m_remote_io_mmap;
413 uint8_t *m_local_io_region;
414 doca_mmap *m_local_io_mmap;
415 per_ctrl_connection<std::unique_ptr<
storage::control::channel>> m_all_ctrl_channels;
416 per_storage_connection<
storage::control::channel *> m_storage_ctrl_channels;
417 std::vector<
storage::control::message> m_ctrl_messages;
418 std::vector<uint32_t> m_remote_consumer_ids;
419 gga_offload_app_worker *m_workers;
420 std::vector<thread_stats> m_stats;
421 uint64_t m_storage_capacity;
422 uint32_t m_storage_block_size;
423 uint32_t m_message_id_counter;
424 uint32_t m_task_count;
425 uint32_t m_batch_size;
426 uint32_t m_core_count;
429 static
void new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
431 static
void expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept;
433 storage::control::message wait_for_control_message();
435 void wait_for_responses(std::vector<
storage::control::message_id> const &mids, std::chrono::seconds timeout);
437 storage::control::message get_response(
storage::control::message_id mids);
439 void discard_responses(std::vector<
storage::control::message_id> const &mids);
441 storage::control::message process_query_storage(
storage::control::message const &client_request);
443 storage::control::message process_init_storage(
storage::control::message const &client_request);
445 storage::control::message process_start_storage(
storage::control::message const &client_request);
447 storage::control::message process_stop_storage(
storage::control::message const &client_request);
449 storage::control::message process_shutdown(
storage::control::message const &client_requeste);
451 void prepare_thread_contexts(
storage::control::correlation_id cid);
453 void connect_rdma(uint32_t thread_idx,
455 storage::control::correlation_id cid);
457 void verify_connections_are_ready(
void);
459 void destroy_workers(
void) noexcept;
471 gga_offload_app_configuration parse_cli_args(
int argc,
char **argv);
481 int main(
int argc,
char **argv)
488 gga_offload_app app{parse_cli_args(argc, argv)};
490 app.abort(
"User requested abort");
493 app.connect_to_storage();
494 app.wait_for_comch_client_connection();
495 app.wait_for_and_process_query_storage();
496 app.wait_for_and_process_init_storage();
497 app.wait_for_and_process_start_storage();
498 app.wait_for_and_process_stop_storage();
499 app.wait_for_and_process_shutdown();
501 }
catch (std::exception
const &ex) {
502 fprintf(stderr,
"EXCEPTION: %s\n", ex.what());
519 void print_config(gga_offload_app_configuration
const &
cfg) noexcept
521 printf(
"gga_offload_app_configuration: {\n");
522 printf(
"\tcpu_set : [");
524 for (
auto cpu :
cfg.cpu_set) {
532 printf(
"\tdevice : \"%s\",\n",
cfg.device_id.c_str());
533 printf(
"\trepresentor : \"%s\",\n",
cfg.representor_id.c_str());
534 printf(
"\tcommand_channel_name : \"%s\",\n",
cfg.command_channel_name.c_str());
535 printf(
"\tcontrol_timeout : %u,\n",
static_cast<uint32_t
>(
cfg.control_timeout.count()));
536 printf(
"\tstorage_server[data_1] : %s:%u\n",
537 cfg.storage_server_address[connection_role::data_1].get_address().c_str(),
538 cfg.storage_server_address[connection_role::data_1].get_port());
539 printf(
"\tdata_2_storage_server : %s:%u\n",
540 cfg.storage_server_address[connection_role::data_2].get_address().c_str(),
541 cfg.storage_server_address[connection_role::data_2].get_port());
542 printf(
"\tdata_p_storage_server : %s:%u\n",
543 cfg.storage_server_address[connection_role::data_p].get_address().c_str(),
544 cfg.storage_server_address[connection_role::data_p].get_port());
545 printf(
"\trecover_freq : %u\n",
cfg.recover_freq);
554 void validate_gga_offload_app_configuration(gga_offload_app_configuration
const &
cfg)
556 std::vector<std::string> errors;
558 if (
cfg.control_timeout.count() == 0) {
559 errors.emplace_back(
"Invalid gga_offload_app_configuration: control-timeout must not be zero");
562 if (!errors.empty()) {
563 for (
auto const &err : errors) {
564 printf(
"%s\n", err.c_str());
567 "Invalid gga_offload_app_configuration detected"};
580 gga_offload_app_configuration parse_cli_args(
int argc,
char **argv)
582 gga_offload_app_configuration config{};
583 config.command_channel_name = default_command_channel_name;
584 config.control_timeout = default_control_timeout_seconds;
585 config.ec_matrix_type =
"vandermonde";
600 [](
void *
value,
void *
cfg) noexcept {
601 static_cast<gga_offload_app_configuration *
>(
cfg)->device_id =
602 static_cast<char const *
>(
value);
608 "Device host side representor identifier",
611 [](
void *
value,
void *
cfg) noexcept {
612 static_cast<gga_offload_app_configuration *
>(
cfg)->representor_id =
613 static_cast<char const *
>(
value);
619 "CPU core to which the process affinity can be set",
622 [](
void *
value,
void *
cfg) noexcept {
623 static_cast<gga_offload_app_configuration *
>(
cfg)->cpu_set.push_back(
624 *
static_cast<int *
>(
value));
630 "Storage server addresses in <ip_addr>:<port> format",
633 [](
void *
value,
void *
cfg) noexcept {
635 static_cast<gga_offload_app_configuration *
>(
cfg)
636 ->storage_server_address[connection_role::data_1] =
638 static_cast<char const *
>(
value));
647 "Storage server addresses in <ip_addr>:<port> format",
650 [](
void *
value,
void *
cfg) noexcept {
652 static_cast<gga_offload_app_configuration *
>(
cfg)
653 ->storage_server_address[connection_role::data_2] =
655 static_cast<char const *
>(
value));
664 "Storage server addresses in <ip_addr>:<port> format",
667 [](
void *
value,
void *
cfg) noexcept {
669 static_cast<gga_offload_app_configuration *
>(
cfg)
670 ->storage_server_address[connection_role::data_p] =
672 static_cast<char const *
>(
value));
682 "Type of matrix to use. One of: cauchy, vandermonde Default: vandermonde",
685 [](
void *
value,
void *
cfg) noexcept {
686 static_cast<gga_offload_app_configuration *
>(
cfg)->ec_matrix_type =
687 static_cast<char const *
>(
value);
693 "command-channel-name",
694 "Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
697 [](
void *
value,
void *
cfg) noexcept {
698 static_cast<gga_offload_app_configuration *
>(
cfg)->command_channel_name =
699 static_cast<char const *
>(
value);
705 "Time (in seconds) to wait while performing control operations. Default: 5",
708 [](
void *
value,
void *
cfg) noexcept {
709 static_cast<gga_offload_app_configuration *
>(
cfg)->control_timeout =
710 std::chrono::seconds{*
static_cast<int *
>(
value)};
715 "trigger-recovery-read-every-n",
716 "Trigger a recovery read flow every N th request. Default: 0 (disabled)",
719 [](
void *
value,
void *
cfg) noexcept {
720 static_cast<gga_offload_app_configuration *
>(
cfg)->recover_freq =
721 *
static_cast<int *
>(
value);
731 print_config(config);
732 validate_gga_offload_app_configuration(config);
745 auto *
const err_details =
747 if (err_details ==
nullptr) {
751 err_code = err_details->error_code;
752 err_msg = std::move(err_details->message);
763 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
767 char *io_message_from_doca_buf(doca_buf
const *buf)
771 return static_cast<char *
>(data);
774 gga_offload_app_worker::hot_data::hot_data()
776 remote_memory_start_addr{0},
777 local_memory_start_addr{0},
781 recovery_flow_count{0},
782 completed_transaction_count{0},
783 transactions{nullptr},
784 in_flight_transaction_count{0},
789 recover_drop_count{0},
790 recover_drop_freq{0},
796 gga_offload_app_worker::hot_data::hot_data(hot_data &&other) noexcept
798 remote_memory_start_addr{other.remote_memory_start_addr},
799 local_memory_start_addr{other.local_memory_start_addr},
800 storage_capacity{other.storage_capacity},
801 pe_hit_count{other.pe_hit_count},
802 pe_miss_count{other.pe_miss_count},
803 recovery_flow_count{other.recovery_flow_count},
804 completed_transaction_count{other.completed_transaction_count},
805 transactions{other.transactions},
806 in_flight_transaction_count{other.in_flight_transaction_count},
807 block_size{other.block_size},
808 half_block_size{other.half_block_size},
809 task_count{other.task_count},
810 core_idx{other.core_idx},
811 recover_drop_count{other.recover_drop_count},
812 recover_drop_freq{other.recover_drop_freq},
813 run_flag{other.run_flag.load()},
814 error_flag{other.error_flag}
817 other.transactions =
nullptr;
820 gga_offload_app_worker::hot_data &gga_offload_app_worker::hot_data::operator=(hot_data &&other) noexcept
822 if (std::addressof(other) ==
this)
826 remote_memory_start_addr = other.remote_memory_start_addr;
827 local_memory_start_addr = other.local_memory_start_addr;
828 storage_capacity = other.storage_capacity;
829 pe_hit_count = other.pe_hit_count;
830 pe_miss_count = other.pe_miss_count;
831 recovery_flow_count = other.recovery_flow_count;
832 completed_transaction_count = other.completed_transaction_count;
833 transactions = other.transactions;
834 in_flight_transaction_count = other.in_flight_transaction_count;
835 block_size = other.block_size;
836 half_block_size = other.half_block_size;
837 task_count = other.task_count;
838 core_idx = other.core_idx;
839 recover_drop_count = other.recover_drop_count;
840 recover_drop_freq = other.recover_drop_freq;
841 run_flag = other.run_flag.load();
842 error_flag = other.error_flag;
845 other.transactions =
nullptr;
850 doca_error_t gga_offload_app_worker::hot_data::start_transaction(doca_comch_consumer_task_post_recv *task,
851 char const *io_message)
862 auto &transaction = transactions[cid];
864 if (transaction.remaining_op_count != 0) {
869 transaction.host_request_task = task;
870 transaction.remaining_op_count = 4;
872 connection_role part_a_conn = connection_role::data_1;
873 connection_role part_b_conn = connection_role::data_2;
875 transaction.mode = transaction_mode::read;
878 auto const half_block_size = transaction.io_size / 2;
881 auto const io_offset = host_io_addr - remote_memory_start_addr;
882 transaction.block_idx = io_offset / block_size;
884 uint32_t remote_offset_a = 0;
885 uint32_t remote_offset_b = 0;
887 if (recover_drop_freq != 0 && (--recover_drop_count) == 0) {
888 recover_drop_count = recover_drop_freq;
889 ++recovery_flow_count;
891 if (recovery_flow_count % 2 == 0) {
892 transaction.mode = transaction_mode::recover_a;
893 part_a_conn = connection_role::data_p;
894 part_b_conn = connection_role::data_2;
895 remote_offset_a = storage_capacity - (transaction.block_idx * half_block_size);
897 transaction.mode = transaction_mode::recover_b;
898 part_a_conn = connection_role::data_1;
899 part_b_conn = connection_role::data_p;
901 storage_capacity - (half_block_size + (transaction.block_idx * half_block_size));
905 auto *part_a_io_message = transaction.io_message[part_a_conn];
906 auto *part_b_io_message = transaction.io_message[part_b_conn];
907 auto *response_io_message =
910 auto const local_io_addr = local_memory_start_addr + io_offset;
956 ++in_flight_transaction_count;
961 void gga_offload_app_worker::hot_data::process_result(gga_offload_app_worker::transaction_context &transaction)
963 if (transaction.mode == transaction_mode::read) {
964 transaction.remaining_op_count = 1;
965 start_decompress(transaction);
967 transaction.remaining_op_count = 2;
968 start_recover(transaction);
972 void gga_offload_app_worker::hot_data::start_decompress(gga_offload_app_worker::transaction_context &transaction)
974 auto const io_offset = transaction.block_idx * block_size;
975 auto *
const local_block_start =
reinterpret_cast<char *
>(local_memory_start_addr) + io_offset;
982 be32toh(hdr->compressed_size)));
986 doca_buf_set_data(dst_buff,
reinterpret_cast<char *
>(remote_memory_start_addr) + io_offset, 0));
998 void gga_offload_app_worker::hot_data::start_recover(gga_offload_app_worker::transaction_context &transaction)
1006 auto *
const d1_addr =
reinterpret_cast<char *
>(local_memory_start_addr) + (transaction.block_idx * block_size);
1007 auto *
const dp_addr =
reinterpret_cast<char *
>(local_memory_start_addr) + storage_capacity +
1008 (transaction.block_idx * half_block_size);
1010 if (transaction.mode == transaction_mode::recover_a) {
1015 d2_len = half_block_size;
1020 d1_len = half_block_size;
1025 static_cast<void>(
doca_buf_set_data(d2_buf, d1_addr + half_block_size, d2_len));
1037 gga_offload_app_worker::~gga_offload_app_worker()
1039 if (m_thread.joinable()) {
1040 m_hot_data.run_flag =
false;
1041 m_hot_data.error_flag =
true;
1047 gga_offload_app_worker::gga_offload_app_worker(doca_dev *dev,
1048 doca_comch_connection *comch_conn,
1049 uint32_t task_count,
1050 uint32_t batch_size,
1051 std::string
const &ec_matrix_type,
1052 uint32_t recover_drop_freq)
1054 m_io_message_region{nullptr},
1055 m_io_message_mmap{nullptr},
1057 m_io_message_bufs{},
1058 m_consumer{nullptr},
1059 m_producer{nullptr},
1061 m_ec_matrix{nullptr},
1062 m_compress{nullptr},
1064 m_host_request_tasks{},
1065 m_host_response_tasks{},
1069 init(dev, comch_conn, task_count, batch_size, ec_matrix_type, recover_drop_freq);
1076 gga_offload_app_worker::gga_offload_app_worker(gga_offload_app_worker &&other) noexcept
1077 : m_hot_data{std::move(other.m_hot_data)},
1078 m_io_message_region{other.m_io_message_region},
1079 m_io_message_mmap{other.m_io_message_mmap},
1080 m_buf_inv{other.m_buf_inv},
1081 m_io_message_bufs{std::move(other.m_io_message_bufs)},
1082 m_consumer{other.m_consumer},
1083 m_producer{other.m_producer},
1085 m_ec_matrix{other.m_ec_matrix},
1086 m_compress{other.m_compress},
1087 m_rdma{std::move(other.m_rdma)},
1088 m_host_request_tasks{std::move(other.m_host_request_tasks)},
1089 m_host_response_tasks{std::move(other.m_host_response_tasks)},
1090 m_thread{std::move(other.m_thread)}
1092 other.m_io_message_region =
nullptr;
1093 other.m_io_message_mmap =
nullptr;
1094 other.m_buf_inv =
nullptr;
1095 other.m_consumer =
nullptr;
1096 other.m_producer =
nullptr;
1097 other.m_ec =
nullptr;
1098 other.m_ec_matrix =
nullptr;
1099 other.m_compress =
nullptr;
1102 gga_offload_app_worker &gga_offload_app_worker::operator=(gga_offload_app_worker &&other) noexcept
1104 if (std::addressof(other) ==
this)
1109 m_hot_data = std::move(other.m_hot_data);
1110 m_io_message_region = other.m_io_message_region;
1111 m_io_message_mmap = other.m_io_message_mmap;
1112 m_buf_inv = other.m_buf_inv;
1113 m_io_message_bufs = std::move(other.m_io_message_bufs);
1114 m_consumer = other.m_consumer;
1115 m_producer = other.m_producer;
1117 m_ec_matrix = other.m_ec_matrix;
1118 m_compress = other.m_compress;
1119 m_rdma = std::move(other.m_rdma);
1120 m_host_request_tasks = std::move(other.m_host_request_tasks);
1121 m_host_response_tasks = std::move(other.m_host_response_tasks);
1122 m_thread = std::move(other.m_thread);
1124 other.m_io_message_region =
nullptr;
1125 other.m_io_message_mmap =
nullptr;
1126 other.m_buf_inv =
nullptr;
1127 other.m_consumer =
nullptr;
1128 other.m_producer =
nullptr;
1129 other.m_ec =
nullptr;
1130 other.m_ec_matrix =
nullptr;
1131 other.m_compress =
nullptr;
1136 std::vector<uint8_t> gga_offload_app_worker::get_local_rdma_connection_blob(
1137 connection_role conn_role,
1141 uint8_t
const *blob =
nullptr;
1142 size_t blob_size = 0;
1144 auto &rdma_ctx = m_rdma[conn_role];
1147 reinterpret_cast<void const **
>(&blob),
1149 std::addressof(rdma_pair.conn));
1155 return std::vector<uint8_t>{blob, blob + blob_size};
1158 void gga_offload_app_worker::connect_rdma(connection_role conn_role,
1160 std::vector<uint8_t>
const &blob)
1162 auto &rdma_ctx = m_rdma[conn_role];
1166 ret =
doca_rdma_connect(rdma_pair.rdma, blob.data(), blob.size(), rdma_pair.conn);
1173 doca_error_t gga_offload_app_worker::get_connections_state() const noexcept
1177 uint32_t pending_count = 0;
1201 for (
auto &
ctx : m_rdma) {
1228 void gga_offload_app_worker::stop_processing(
void) noexcept
1230 m_hot_data.run_flag =
false;
1231 if (m_thread.joinable()) {
1236 void gga_offload_app_worker::destroy_comch_objects(
void) noexcept
1239 std::vector<doca_task *> tasks;
1241 if (m_consumer !=
nullptr) {
1242 tasks.reserve(m_host_request_tasks.size());
1243 std::transform(std::begin(m_host_request_tasks),
1244 std::end(m_host_request_tasks),
1245 std::back_inserter(tasks),
1252 m_host_request_tasks.clear();
1258 m_consumer =
nullptr;
1262 if (m_producer !=
nullptr) {
1263 tasks.reserve(m_host_response_tasks.size());
1264 std::transform(std::begin(m_host_response_tasks),
1265 std::end(m_host_response_tasks),
1266 std::back_inserter(tasks),
1273 m_host_response_tasks.clear();
1279 m_producer =
nullptr;
1284 void gga_offload_app_worker::create_tasks(uint32_t task_count,
1285 uint32_t batch_size,
1286 uint32_t block_size,
1287 uint32_t remote_consumer_id,
1288 doca_mmap *local_io_mmap,
1289 doca_mmap *remote_io_mmap)
1295 auto *io_message_addr = m_io_message_region;
1299 m_io_message_bufs.reserve((task_count * 8) + batch_size);
1301 m_host_request_tasks.reserve(task_count);
1302 m_host_response_tasks.reserve(task_count);
1303 for (
auto &
ctx : m_rdma) {
1304 ctx.storage_request_tasks.reserve(task_count);
1305 ctx.storage_response_tasks.reserve(task_count);
1308 for (uint32_t ii = 0; ii != (task_count + batch_size); ++ii) {
1309 doca_buf *buff =
nullptr;
1321 m_io_message_bufs.push_back(buff);
1323 doca_comch_consumer_task_post_recv *comch_consumer_task_post_recv =
nullptr;
1330 m_host_request_tasks.push_back(comch_consumer_task_post_recv);
1333 for (uint32_t ii = 0; ii != task_count; ++ii) {
1334 prepare_transaction_part(ii, io_message_addr, connection_role::data_1);
1337 prepare_transaction_part(ii, io_message_addr, connection_role::data_2);
1340 prepare_transaction_part(ii, io_message_addr, connection_role::data_p);
1345 create_gga_tasks(block_size, local_io_mmap, remote_io_mmap);
1347 for (uint32_t ii = 0; ii != task_count; ++ii) {
1348 doca_buf *buff =
nullptr;
1360 m_io_message_bufs.push_back(buff);
1362 doca_comch_producer_task_send *comch_producer_task_send;
1368 &comch_producer_task_send);
1374 m_hot_data.transactions[ii].host_response_task = comch_producer_task_send;
1375 m_host_response_tasks.push_back(comch_producer_task_send);
1379 void gga_offload_app_worker::prepare_thread_proc(uint32_t core_id)
1381 m_thread = std::thread{[
this]() {
1384 }
catch (std::exception
const &ex) {
1385 DOCA_LOG_ERR(
"Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
1386 m_hot_data.error_flag =
true;
1387 m_hot_data.run_flag =
false;
1390 m_hot_data.core_idx = core_id;
1394 void gga_offload_app_worker::start_thread_proc(
void)
1398 for (
auto *task : m_host_request_tasks) {
1401 DOCA_LOG_ERR(
"Failed to submit initial doca_comch_consumer_task_post_recv task: %s",
1407 for (
auto &
ctx : m_rdma) {
1408 for (
auto *task :
ctx.storage_response_tasks) {
1411 DOCA_LOG_ERR(
"Failed to submit initial doca_rdma_task_receive task: %s",
1418 m_hot_data.run_flag =
true;
1421 gga_offload_app_worker::hot_data
const &gga_offload_app_worker::get_hot_data(
void)
const noexcept
1426 void gga_offload_app_worker::init(doca_dev *dev,
1427 doca_comch_connection *comch_conn,
1428 uint32_t task_count,
1429 uint32_t batch_size,
1430 std::string
const &ec_matrix_type,
1431 uint32_t recover_drop_freq)
1439 auto const io_message_count = (task_count * 8) + batch_size;
1442 DOCA_LOG_DBG(
"Allocate io messages memory (%zu bytes, aligned to %u byte pages)",
1443 raw_io_messages_size,
1445 m_io_message_region =
static_cast<uint8_t *
>(
1447 if (m_io_message_region ==
nullptr) {
1452 reinterpret_cast<char *
>(m_io_message_region),
1453 raw_io_messages_size,
1456 auto const gga_buffer_count = task_count * 5;
1476 task_count + batch_size,
1477 doca_data{.ptr = std::addressof(m_hot_data)},
1478 doca_comch_consumer_task_post_recv_cb,
1479 doca_comch_consumer_task_post_recv_error_cb);
1484 doca_data{.ptr = std::addressof(m_hot_data)},
1485 doca_comch_producer_task_send_cb,
1486 doca_comch_producer_task_send_error_cb);
1536 doca_compress_task_decompress_lz4_stream_cb,
1537 doca_compress_task_decompress_lz4_stream_error_cb,
1541 "Failed to create doca_compress_task_decompress_lz4_stream task pool"};
1552 for (
auto &
ctx : m_rdma) {
1555 doca_data{.ptr = std::addressof(m_hot_data)},
1559 doca_rdma_task_receive_cb,
1560 doca_rdma_task_receive_error_cb,
1567 doca_rdma_task_send_cb,
1568 doca_rdma_task_send_error_cb,
1569 task_count + batch_size);
1581 doca_data{.ptr = std::addressof(m_hot_data)},
1590 m_hot_data.run_flag =
false;
1591 m_hot_data.error_flag =
false;
1592 m_hot_data.task_count = task_count;
1593 m_hot_data.pe_hit_count = 0;
1594 m_hot_data.pe_miss_count = 0;
1595 m_hot_data.completed_transaction_count = 0;
1596 m_hot_data.in_flight_transaction_count = 0;
1597 m_hot_data.recover_drop_count = recover_drop_freq;
1598 m_hot_data.recover_drop_freq = recover_drop_freq;
1604 std::vector<doca_task *> tasks;
1606 for (
auto &
ctx : m_rdma) {
1607 if (
ctx.ctrl.rdma !=
nullptr) {
1609 tasks.reserve(
ctx.storage_request_tasks.size() +
ctx.storage_response_tasks.size());
1610 std::transform(std::begin(
ctx.storage_request_tasks),
1611 std::end(
ctx.storage_request_tasks),
1612 std::back_inserter(tasks),
1614 std::transform(std::begin(
ctx.storage_response_tasks),
1615 std::end(
ctx.storage_response_tasks),
1616 std::back_inserter(tasks),
1631 if (
ctx.data.rdma !=
nullptr) {
1646 destroy_comch_objects();
1648 if (m_hot_data.pe !=
nullptr) {
1655 for (
auto *buf : m_io_message_bufs) {
1670 if (m_io_message_mmap) {
1681 if (m_io_message_region !=
nullptr) {
1686 void gga_offload_app_worker::create_gga_tasks(uint32_t block_size, doca_mmap *local_io_mmap, doca_mmap *remote_io_mmap)
1688 char *io_local_region_begin =
nullptr;
1689 char *io_remote_region_begin =
nullptr;
1690 size_t io_local_region_size = 0;
1691 size_t io_remote_region_size = 0;
1695 reinterpret_cast<void **
>(&io_local_region_begin),
1696 &io_local_region_size);
1702 reinterpret_cast<void **
>(&io_remote_region_begin),
1703 &io_remote_region_size);
1711 if ((io_remote_region_size + (io_remote_region_size / 2)) != io_local_region_size) {
1715 m_hot_data.local_memory_start_addr =
reinterpret_cast<uint64_t
>(io_local_region_begin);
1716 m_hot_data.remote_memory_start_addr =
reinterpret_cast<uint64_t
>(io_remote_region_begin);
1717 m_hot_data.storage_capacity = io_remote_region_size;
1718 m_hot_data.block_size = block_size;
1719 m_hot_data.half_block_size = block_size / 2;
1721 for (uint32_t ii = 0; ii != m_hot_data.task_count; ++ii) {
1722 doca_buf *in_buf =
nullptr;
1723 doca_buf *out_buf =
nullptr;
1727 io_local_region_begin,
1728 io_local_region_size,
1733 m_io_message_bufs.push_back(in_buf);
1737 io_remote_region_begin,
1738 io_remote_region_size,
1743 m_io_message_bufs.push_back(out_buf);
1745 auto constexpr has_block_checksum =
false;
1746 auto constexpr are_blocks_independent =
true;
1751 are_blocks_independent,
1755 &(m_hot_data.transactions[ii].decompress_task));
1761 for (uint32_t ii = 0; ii != m_hot_data.task_count; ++ii) {
1762 doca_buf *in_buf_1 =
nullptr;
1763 doca_buf *in_buf_2 =
nullptr;
1764 doca_buf *out_buf =
nullptr;
1768 io_local_region_begin,
1769 io_local_region_size,
1774 m_io_message_bufs.push_back(in_buf_1);
1778 io_local_region_begin,
1779 io_local_region_size,
1788 io_local_region_begin,
1789 io_local_region_size,
1794 m_io_message_bufs.push_back(out_buf);
1801 &(m_hot_data.transactions[ii].ec_recover_task));
1808 void gga_offload_app_worker::prepare_transaction_part(uint32_t idx, uint8_t *io_message_addr, connection_role role)
1811 doca_buf *req_buff =
nullptr;
1812 doca_buf *res_buff =
nullptr;
1823 m_hot_data.transactions[idx].io_message[role] =
reinterpret_cast<char *
>(io_message_addr);
1824 m_io_message_bufs.push_back(req_buff);
1837 m_io_message_bufs.push_back(res_buff);
1839 auto &transaction = m_hot_data.transactions[idx];
1840 transaction.array_idx = idx;
1841 transaction.remaining_op_count = 0;
1843 m_rdma[role].ctrl.conn,
1846 std::addressof(transaction.requests[role]));
1850 m_rdma[role].storage_request_tasks.push_back(transaction.requests[role]);
1855 std::addressof(transaction.responses[role]));
1859 m_rdma[role].storage_response_tasks.push_back(transaction.responses[role]);
1862 void gga_offload_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
1866 static_cast<void>(task_user_data);
1870 reinterpret_cast<void **
>(&io_message)));
1873 static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr)->start_transaction(task, io_message);
1879 void gga_offload_app_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1883 static_cast<void>(task);
1884 static_cast<void>(task_user_data);
1886 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1888 if (hot_data->run_flag) {
1889 DOCA_LOG_ERR(
"Failed to complete doca_comch_consumer_task_post_recv");
1890 hot_data->run_flag =
false;
1891 hot_data->error_flag =
true;
1895 void gga_offload_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1899 static_cast<void>(task);
1900 static_cast<void>(task_user_data);
1901 static_cast<void>(ctx_user_data);
1904 void gga_offload_app_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1908 static_cast<void>(task);
1909 static_cast<void>(task_user_data);
1911 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1912 DOCA_LOG_ERR(
"Failed to complete doca_comch_producer_task_send");
1913 hot_data->run_flag =
false;
1914 hot_data->error_flag =
true;
1917 void gga_offload_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1921 static_cast<void>(task);
1923 auto *hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1924 auto &transaction = hot_data->transactions[task_user_data.u64];
1926 --(transaction.remaining_op_count);
1927 if (transaction.remaining_op_count == 0) {
1928 hot_data->process_result(transaction);
1932 void gga_offload_app_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1936 static_cast<void>(task);
1937 static_cast<void>(task_user_data);
1939 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1940 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_send");
1941 hot_data->run_flag =
false;
1942 hot_data->error_flag =
true;
1945 void gga_offload_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1949 static_cast<void>(task_user_data);
1951 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1956 auto *host_io_message = io_message_from_doca_buf(
1967 auto &transaction = hot_data->transactions[cid];
1969 --(transaction.remaining_op_count);
1970 if (transaction.remaining_op_count == 0) {
1971 hot_data->process_result(transaction);
1974 if (hot_data->run_flag) {
1978 DOCA_LOG_ERR(
"Failed to resubmit doca_rdma_task_receive");
1979 hot_data->run_flag =
false;
1980 hot_data->error_flag =
true;
1985 void gga_offload_app_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1989 static_cast<void>(task);
1990 static_cast<void>(task_user_data);
1992 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
1993 if (hot_data->run_flag) {
1999 DOCA_LOG_ERR(
"Failed to complete doca_rdma_task_send");
2000 hot_data->run_flag =
false;
2001 hot_data->error_flag =
true;
2005 void gga_offload_app_worker::doca_ec_task_recover_cb(doca_ec_task_recover *task,
2009 static_cast<void>(task);
2011 auto *hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
2012 auto const cid = task_user_data.u64;
2013 auto &transaction = hot_data->transactions[cid];
2015 --(transaction.remaining_op_count);
2016 hot_data->start_decompress(transaction);
2019 void gga_offload_app_worker::doca_ec_task_recover_error_cb(doca_ec_task_recover *task,
2023 static_cast<void>(task);
2024 static_cast<void>(task_user_data);
2026 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
2027 DOCA_LOG_ERR(
"Failed to complete doca_ec_task_recover");
2028 hot_data->run_flag =
false;
2029 hot_data->error_flag =
true;
2032 void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
2036 static_cast<void>(task);
2038 auto *hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
2039 auto &transaction = hot_data->transactions[task_user_data.u64];
2040 --(transaction.remaining_op_count);
2042 --(hot_data->in_flight_transaction_count);
2043 ++(hot_data->completed_transaction_count);
2052 hot_data->run_flag =
false;
2053 hot_data->error_flag =
true;
2061 hot_data->error_flag =
true;
2062 hot_data->run_flag =
false;
2066 void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_error_cb(
2067 doca_compress_task_decompress_lz4_stream *task,
2071 static_cast<void>(task);
2072 static_cast<void>(task_user_data);
2074 auto *
const hot_data =
static_cast<gga_offload_app_worker::hot_data *
>(ctx_user_data.ptr);
2075 DOCA_LOG_ERR(
"Failed to complete doca_compress_task_decompress_lz4_stream");
2076 hot_data->run_flag =
false;
2077 hot_data->error_flag =
true;
2080 void gga_offload_app_worker::thread_proc()
2082 while (m_hot_data.run_flag ==
false) {
2083 std::this_thread::yield();
2084 if (m_hot_data.error_flag)
2090 while (m_hot_data.run_flag) {
2091 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
2094 while (m_hot_data.error_flag ==
false && m_hot_data.in_flight_transaction_count != 0) {
2095 doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
2101 gga_offload_app::~gga_offload_app()
2104 for (
auto &channel : m_all_ctrl_channels) {
2109 if (m_dev !=
nullptr) {
2115 gga_offload_app::gga_offload_app(gga_offload_app_configuration
const &
cfg)
2119 m_remote_io_mmap{nullptr},
2120 m_local_io_region{nullptr},
2121 m_local_io_mmap{nullptr},
2122 m_all_ctrl_channels{},
2123 m_storage_ctrl_channels{},
2125 m_remote_consumer_ids{},
2128 m_storage_capacity{},
2129 m_storage_block_size{},
2130 m_message_id_counter{},
2136 DOCA_LOG_INFO(
"Open doca_dev: %s", m_cfg.device_id.c_str());
2139 DOCA_LOG_INFO(
"Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
2143 m_cfg.storage_server_address[connection_role::data_1]);
2144 m_storage_ctrl_channels[connection_role::data_1] = m_all_ctrl_channels[connection_role::data_1].get();
2147 m_cfg.storage_server_address[connection_role::data_2]);
2148 m_storage_ctrl_channels[connection_role::data_2] = m_all_ctrl_channels[connection_role::data_2].get();
2151 m_cfg.storage_server_address[connection_role::data_p]);
2152 m_storage_ctrl_channels[connection_role::data_p] = m_all_ctrl_channels[connection_role::data_p].get();
2154 m_all_ctrl_channels[connection_role::client] =
2157 m_cfg.command_channel_name.c_str(),
2159 new_comch_consumer_callback,
2160 expired_comch_consumer_callback);
2163 void gga_offload_app::abort(std::string
const &reason)
2169 m_abort_flag =
true;
2172 void gga_offload_app::connect_to_storage(
void)
2174 for (
auto *storage_channel : m_storage_ctrl_channels) {
2179 "Aborted while connecting to storage"};
2182 if (storage_channel->is_connected())
2188 void gga_offload_app::wait_for_comch_client_connection(
void)
2190 while (!m_all_ctrl_channels[connection_role::client]->is_connected()) {
2191 std::this_thread::sleep_for(std::chrono::milliseconds{100});
2194 "Aborted while connecting to client"};
2199 void gga_offload_app::wait_for_and_process_query_storage(
void)
2202 auto const client_request = wait_for_control_message();
2205 std::string err_msg;
2209 m_all_ctrl_channels[connection_role::client]->send_message(
2210 process_query_storage(client_request));
2214 err_msg = ex.what();
2218 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
2222 m_all_ctrl_channels[connection_role::client]->send_message({
2224 client_request.message_id,
2225 client_request.correlation_id,
2226 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2231 void gga_offload_app::wait_for_and_process_init_storage(
void)
2234 auto const client_request = wait_for_control_message();
2237 std::string err_msg;
2241 m_all_ctrl_channels[connection_role::client]->send_message(
2242 process_init_storage(client_request));
2246 err_msg = ex.what();
2250 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
2254 m_all_ctrl_channels[connection_role::client]->send_message({
2256 client_request.message_id,
2257 client_request.correlation_id,
2258 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2263 void gga_offload_app::wait_for_and_process_start_storage(
void)
2266 auto const client_request = wait_for_control_message();
2269 std::string err_msg;
2273 m_all_ctrl_channels[connection_role::client]->send_message(
2274 process_start_storage(client_request));
2278 err_msg = ex.what();
2282 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
2286 m_all_ctrl_channels[connection_role::client]->send_message({
2288 client_request.message_id,
2289 client_request.correlation_id,
2290 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2295 void gga_offload_app::wait_for_and_process_stop_storage(
void)
2298 auto const client_request = wait_for_control_message();
2301 std::string err_msg;
2305 m_all_ctrl_channels[connection_role::client]->send_message(
2306 process_stop_storage(client_request));
2310 err_msg = ex.what();
2314 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
2318 m_all_ctrl_channels[connection_role::client]->send_message({
2320 client_request.message_id,
2321 client_request.correlation_id,
2322 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2327 void gga_offload_app::wait_for_and_process_shutdown(
void)
2330 auto const client_request = wait_for_control_message();
2333 std::string err_msg;
2337 m_all_ctrl_channels[connection_role::client]->send_message(process_shutdown(client_request));
2341 err_msg = ex.what();
2345 err_msg =
"Unexpected " +
to_string(client_request.message_type) +
" while expecting a " +
2349 m_all_ctrl_channels[connection_role::client]->send_message({
2351 client_request.message_id,
2352 client_request.correlation_id,
2353 std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2358 void gga_offload_app::display_stats(
void)
const
2360 for (
auto const &
stats : m_stats) {
2361 auto const pe_hit_rate_pct =
2362 (
static_cast<double>(
stats.pe_hit_count) /
2363 (
static_cast<double>(
stats.pe_hit_count) +
static_cast<double>(
stats.pe_miss_count))) *
2366 printf(
"+================================================+\n");
2367 printf(
"| Core: %u\n",
stats.core_idx);
2368 printf(
"| Operation count: %lu\n",
stats.operation_count);
2369 printf(
"| Recovery count: %lu\n",
stats.recovery_count);
2370 printf(
"| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct,
stats.pe_hit_count,
stats.pe_miss_count);
2374 void gga_offload_app::new_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
2376 auto *
self =
reinterpret_cast<gga_offload_app *
>(user_data);
2377 if (self->m_remote_consumer_ids.capacity() == 0) {
2378 DOCA_LOG_ERR(
"[BUG] no space for new remote consumer ids");
2382 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
2383 if (found == std::end(self->m_remote_consumer_ids)) {
2384 self->m_remote_consumer_ids.push_back(
id);
2385 DOCA_LOG_DBG(
"Connected to remote consumer with id: %u. Consumer count is now: %zu",
2387 self->m_remote_consumer_ids.size());
2389 DOCA_LOG_WARN(
"Ignoring duplicate remote consumer id: %u",
id);
2393 void gga_offload_app::expired_comch_consumer_callback(
void *user_data, uint32_t
id) noexcept
2395 auto *
self =
reinterpret_cast<gga_offload_app *
>(user_data);
2396 auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids),
id);
2397 if (found != std::end(self->m_remote_consumer_ids)) {
2398 self->m_remote_consumer_ids.erase(found);
2399 DOCA_LOG_DBG(
"Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
2401 self->m_remote_consumer_ids.size());
2403 DOCA_LOG_WARN(
"Ignoring disconnect of unexpected remote consumer id: %u",
id);
2410 if (!m_ctrl_messages.empty()) {
2411 auto msg = std::move(m_ctrl_messages.front());
2412 m_ctrl_messages.erase(m_ctrl_messages.begin());
2416 for (
auto &channel : m_all_ctrl_channels) {
2418 auto *msg = channel->poll();
2420 m_ctrl_messages.push_back(std::move(*msg));
2427 "User aborted the gga_offload_application while waiting on a control message"};
2432 void gga_offload_app::wait_for_responses(std::vector<storage::control::message_id>
const &mids,
2433 std::chrono::seconds timeout)
2435 auto const expiry = std::chrono::steady_clock::now() + timeout;
2436 uint32_t match_count = 0;
2441 "User aborted the gga_offload_application while waiting on a control message"};
2444 for (
auto &channel : m_all_ctrl_channels) {
2446 auto *msg = channel->poll();
2448 m_ctrl_messages.push_back(std::move(*msg));
2453 for (
auto mid : mids) {
2454 for (
auto const &msg : m_ctrl_messages) {
2455 if (msg.message_id.value == mid.value) {
2462 if (expiry < std::chrono::steady_clock::now()) {
2463 std::stringstream ss;
2464 ss <<
"Timed out while waiting on a control messages[";
2465 for (
auto &
id : mids) {
2466 ss <<
id.value <<
" ";
2468 ss <<
"] had available messages:[";
2469 for (
auto &msg : m_ctrl_messages) {
2470 ss << msg.message_id.value <<
" ";
2479 }
while (match_count != mids.size());
2484 auto found = std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [mid](
auto const &msg) {
2485 return msg.message_id.value == mid.
value;
2488 if (found != std::end(m_ctrl_messages)) {
2489 auto msg = std::move(*found);
2490 m_ctrl_messages.erase(found);
2497 void gga_offload_app::discard_responses(std::vector<storage::control::message_id>
const &mids)
2499 m_ctrl_messages.erase(std::remove_if(std::begin(m_ctrl_messages),
2500 std::end(m_ctrl_messages),
2501 [&mids](
auto const &msg) {
2502 return std::find(std::begin(mids),
2504 msg.message_id) != std::end(mids);
2506 std::end(m_ctrl_messages));
2512 std::vector<storage::control::message_id> msg_ids;
2514 for (
auto *storage_ctrl : m_storage_ctrl_channels) {
2522 msg_ids.push_back(storage_request.message_id);
2523 storage_ctrl->send_message(storage_request);
2526 wait_for_responses(msg_ids, default_control_timeout_seconds);
2527 for (
auto &
id : msg_ids) {
2528 auto response = get_response(
id);
2531 discard_responses(msg_ids);
2532 return make_error_response(client_request.
message_id,
2534 std::move(response),
2538 auto const *
const storage_details =
2540 if (storage_details ==
nullptr) {
2544 DOCA_LOG_INFO(
"Storage reports capacity of: %lu using a block size of: %u",
2545 storage_details->total_size,
2546 storage_details->block_size);
2547 if (m_storage_capacity == 0) {
2548 m_storage_capacity = storage_details->total_size;
2549 m_storage_block_size = storage_details->block_size;
2551 if (m_storage_capacity != storage_details->total_size) {
2556 std::make_unique<storage::control::error_response_payload>(
2558 "Mismatch in storage capacity: " +
std::to_string(m_storage_capacity) +
2561 }
else if (m_storage_block_size != storage_details->block_size) {
2566 std::make_unique<storage::control::error_response_payload>(
2568 "Mismatch in block_size: " +
std::to_string(m_storage_block_size) +
2576 auto const local_storage_size = m_storage_capacity + (m_storage_capacity / 2);
2579 if (m_local_io_region ==
nullptr) {
2584 reinterpret_cast<char *
>(m_local_io_region),
2593 std::make_unique<storage::control::storage_details_payload>(m_storage_capacity, m_storage_block_size),
2599 auto const *init_storage_details =
2602 if (init_storage_details->core_count > m_cfg.cpu_set.size()) {
2605 "Unable to create " +
std::to_string(m_core_count) +
" threads as only " +
2610 m_remote_consumer_ids.reserve(init_storage_details->core_count);
2612 m_task_count = init_storage_details->task_count;
2613 m_batch_size = init_storage_details->batch_size;
2614 m_core_count = init_storage_details->core_count;
2616 init_storage_details->mmap_export_blob.data(),
2617 init_storage_details->mmap_export_blob.size());
2618 std::vector<uint8_t> mmap_export_blob = [
this]() {
2619 uint8_t
const *reexport_blob =
nullptr;
2620 size_t reexport_blob_size = 0;
2623 reinterpret_cast<void const **
>(&reexport_blob),
2624 &reexport_blob_size);
2629 return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
2632 DOCA_LOG_INFO(
"Configured storage: %u cores, %u tasks, %u batch_size", m_core_count, m_task_count, m_batch_size);
2635 std::vector<storage::control::message_id> msg_ids;
2637 for (
auto *storage_ctrl : m_storage_ctrl_channels) {
2642 std::make_unique<storage::control::init_storage_payload>(init_storage_details->task_count,
2643 init_storage_details->batch_size,
2644 init_storage_details->core_count,
2648 msg_ids.push_back(storage_request.message_id);
2649 storage_ctrl->send_message(storage_request);
2652 wait_for_responses(msg_ids, default_control_timeout_seconds);
2653 for (
auto &
id : msg_ids) {
2654 auto response = get_response(
id);
2657 discard_responses(msg_ids);
2658 return make_error_response(client_request.
message_id,
2660 std::move(response),
2679 std::vector<storage::control::message_id> msg_ids;
2681 for (
auto *storage_ctrl : m_storage_ctrl_channels) {
2689 msg_ids.push_back(storage_request.message_id);
2690 storage_ctrl->send_message(storage_request);
2693 wait_for_responses(msg_ids, default_control_timeout_seconds);
2694 for (
auto &
id : msg_ids) {
2695 auto response = get_response(
id);
2698 discard_responses(msg_ids);
2699 return make_error_response(client_request.
message_id,
2701 std::move(response),
2706 verify_connections_are_ready();
2707 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2708 m_workers[ii].create_tasks(m_task_count,
2710 m_storage_block_size,
2711 m_remote_consumer_ids[ii],
2714 m_workers[ii].start_thread_proc();
2728 std::vector<storage::control::message_id> msg_ids;
2730 for (
auto *storage_ctrl : m_storage_ctrl_channels) {
2738 msg_ids.push_back(storage_request.message_id);
2739 storage_ctrl->send_message(storage_request);
2742 wait_for_responses(msg_ids, default_control_timeout_seconds);
2743 for (
auto &
id : msg_ids) {
2744 auto response = get_response(
id);
2747 discard_responses(msg_ids);
2748 return make_error_response(client_request.
message_id,
2750 std::move(response),
2756 m_stats.reserve(m_core_count);
2757 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2758 m_workers[ii].stop_processing();
2759 auto const &hot_data = m_workers[ii].get_hot_data();
2760 m_stats.push_back(thread_stats{
2762 hot_data.pe_hit_count,
2763 hot_data.pe_miss_count,
2764 hot_data.completed_transaction_count,
2765 hot_data.recovery_flow_count,
2767 m_workers[ii].destroy_comch_objects();
2781 while (!m_remote_consumer_ids.empty()) {
2782 auto *msg = m_all_ctrl_channels[connection_role::client]->poll();
2783 DOCA_LOG_DBG(
"Ignoring unexpected %s while processing %s",
2789 std::vector<storage::control::message_id> msg_ids;
2791 for (
auto *storage_ctrl : m_storage_ctrl_channels) {
2799 msg_ids.push_back(storage_request.message_id);
2800 storage_ctrl->send_message(storage_request);
2803 wait_for_responses(msg_ids, default_control_timeout_seconds);
2804 for (
auto &
id : msg_ids) {
2805 auto response = get_response(
id);
2808 discard_responses(msg_ids);
2809 return make_error_response(client_request.
message_id,
2811 std::move(response),
2827 auto const *comch_channel =
2829 if (comch_channel ==
nullptr) {
2835 comch_channel->get_comch_connection(),
2838 m_cfg.ec_matrix_type,
2839 m_cfg.recover_freq);
2841 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2844 m_workers[ii].prepare_thread_proc(m_cfg.cpu_set[ii]);
2848 void gga_offload_app::connect_rdma(uint32_t thread_idx,
2852 std::vector<storage::control::message_id> msg_ids;
2853 auto &tctx = m_workers[thread_idx];
2859 std::make_unique<storage::control::rdma_connection_details_payload>(
2862 tctx.get_local_rdma_connection_blob(connection_role::data_1, role)),
2865 msg_ids.push_back(storage_request.message_id);
2866 m_storage_ctrl_channels[connection_role::data_1]->send_message(storage_request);
2873 std::make_unique<storage::control::rdma_connection_details_payload>(
2876 tctx.get_local_rdma_connection_blob(connection_role::data_2, role)),
2879 msg_ids.push_back(storage_request.message_id);
2880 m_storage_ctrl_channels[connection_role::data_2]->send_message(storage_request);
2887 std::make_unique<storage::control::rdma_connection_details_payload>(
2890 tctx.get_local_rdma_connection_blob(connection_role::data_p, role)),
2893 msg_ids.push_back(storage_request.message_id);
2894 m_storage_ctrl_channels[connection_role::data_p]->send_message(storage_request);
2897 wait_for_responses(msg_ids, default_control_timeout_seconds);
2898 auto response_role = connection_role::data_1;
2900 for (
auto &
id : msg_ids) {
2901 auto response = get_response(
id);
2904 discard_responses(msg_ids);
2906 auto *error_details =
2922 tctx.connect_rdma(response_role, role, remote_details->connection_details);
2923 response_role =
static_cast<connection_role
>(
static_cast<uint8_t
>(response_role) + 1);
2927 void gga_offload_app::verify_connections_are_ready(
void)
2929 uint32_t not_ready_count;
2932 not_ready_count = 0;
2933 if (m_remote_consumer_ids.size() != m_core_count) {
2935 auto *msg = m_all_ctrl_channels[connection_role::client]->poll();
2936 if (msg !=
nullptr) {
2939 "Unexpected " +
to_string(msg->message_type) +
" while processing " +
2945 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2946 auto const ret = m_workers[ii].get_connections_state();
2956 "Aborted while establishing storage connections"};
2958 }
while (not_ready_count != 0);
2961 void gga_offload_app::destroy_workers(
void) noexcept
2963 if (m_workers !=
nullptr) {
2965 for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2966 m_workers[ii].~gga_offload_app_worker();
2969 m_workers =
nullptr;
static void cleanup(struct cache_invalidate_sample_state *state)
static uint32_t get_correlation_id(char const *buf)
static uint32_t get_io_size(char const *buf)
static void set_user_data(doca_data user_data, char *buf)
static doca_data get_user_data(char const *buf)
static void set_correlation_id(uint32_t correlation_id, char *buf)
static void set_type(io_message_type type, char *buf)
static void set_io_address(uint64_t io_address, char *buf)
static io_message_type get_type(char const *buf)
static uint64_t get_io_address(char const *buf)
static void set_io_size(uint32_t io_size, char *buf)
static void set_result(doca_error_t result, char *buf)
static doca_error_t get_result(char const *buf)
static void set_remote_offset(uint32_t remote_offset, char *buf)
T * object_array(size_t object_count, Args &&...args) const
doca_error_t get_doca_error() const noexcept
int main(int argc, char **argv)
DOCA_LOG_REGISTER(gga_offload)
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
static doca_error_t doca_buf_inventory_buf_get_by_data(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *data, size_t data_len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by data & data_len a...
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_chain_list(struct doca_buf *list1, struct doca_buf *list2)
Append list2 to list1.
DOCA_STABLE doca_error_t doca_buf_get_next_in_list(struct doca_buf *buf, struct doca_buf **next_buf)
Get next DOCA Buf in linked list.
DOCA_STABLE doca_error_t doca_buf_get_data(const struct doca_buf *buf, void **data)
Get the buffer's data.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE doca_error_t doca_buf_set_data(struct doca_buf *buf, void *data, size_t data_len)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE const struct doca_buf * doca_comch_producer_task_send_get_buf(const struct doca_comch_producer_task_send *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_EXPERIMENTAL struct doca_buf const * doca_compress_task_decompress_lz4_stream_get_src(const struct doca_compress_task_decompress_lz4_stream *task)
get decompress LZ4 stream task source
DOCA_EXPERIMENTAL doca_error_t doca_compress_task_decompress_lz4_stream_alloc_init(struct doca_compress *compress, uint8_t has_block_checksum, uint8_t are_blocks_independent, struct doca_buf const *src_buff, struct doca_buf *dst_buff, union doca_data user_data, struct doca_compress_task_decompress_lz4_stream **task)
Allocate decompress LZ4 stream task.
DOCA_EXPERIMENTAL doca_error_t doca_compress_create(struct doca_dev *dev, struct doca_compress **compress)
DOCA_EXPERIMENTAL struct doca_buf * doca_compress_task_decompress_lz4_stream_get_dst(const struct doca_compress_task_decompress_lz4_stream *task)
get decompress LZ4 stream task destination
DOCA_EXPERIMENTAL struct doca_ctx * doca_compress_as_ctx(struct doca_compress *compress)
DOCA_EXPERIMENTAL doca_error_t doca_compress_task_decompress_lz4_stream_set_conf(struct doca_compress *compress, doca_compress_task_decompress_lz4_stream_completion_cb_t task_completion_cb, doca_compress_task_decompress_lz4_stream_completion_cb_t task_error_cb, uint32_t num_tasks)
This method sets the decompress LZ4 stream task configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_compress_task_decompress_lz4_stream_as_task(struct doca_compress_task_decompress_lz4_stream *task)
convert decompress LZ4 stream task to doca_task
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_set_user_data(struct doca_ctx *ctx, union doca_data user_data)
set user data to context
DOCA_STABLE doca_error_t doca_ctx_stop(struct doca_ctx *ctx)
Stops the context allowing reconfiguration.
doca_ctx_states
This enum defines the states of a context.
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
DOCA_EXPERIMENTAL const struct doca_buf * doca_ec_task_recover_get_available_blocks(const struct doca_ec_task_recover *task)
This method gets the available_blocks buffer of a recover task. The available_blocks buffer is a sour...
DOCA_EXPERIMENTAL struct doca_buf * doca_ec_task_recover_get_recovered_data(const struct doca_ec_task_recover *task)
This method gets the recovered_data buffer of a recover task. The recovered_data buffer is a destinat...
DOCA_EXPERIMENTAL doca_error_t doca_ec_create(struct doca_dev *dev, struct doca_ec **ec)
Create a DOCA EC instance.
DOCA_EXPERIMENTAL doca_error_t doca_ec_task_recover_allocate_init(struct doca_ec *ec, const struct doca_ec_matrix *recover_matrix, const struct doca_buf *available_blocks, struct doca_buf *recovered_data_blocks, union doca_data user_data, struct doca_ec_task_recover **task)
This method allocates and initializes a recover task.
DOCA_EXPERIMENTAL struct doca_task * doca_ec_task_recover_as_task(struct doca_ec_task_recover *task)
This method converts an EC recover task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_ec_task_recover_set_conf(struct doca_ec *ec, doca_ec_task_recover_completion_cb_t successful_task_completion_cb, doca_ec_task_recover_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the recover tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_ec_matrix_create(struct doca_ec *ec, enum doca_ec_matrix_type type, size_t data_block_count, size_t rdnc_block_count, struct doca_ec_matrix **matrix)
Generate coding matrix for Erasure Code encode i.e. most basic encode matrix. This is necessary for e...
DOCA_EXPERIMENTAL struct doca_ctx * doca_ec_as_ctx(struct doca_ec *ec)
Convert EC instance into context.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_CONNECTION_ABORTED
@ DOCA_ERROR_INVALID_VALUE
@ DOCA_ERROR_INITIALIZATION
@ DOCA_ERROR_NOT_SUPPORTED
@ DOCA_ERROR_CONNECTION_RESET
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_get_memrange(const struct doca_mmap *mmap, void **addr, size_t *len)
Get the memory range of DOCA memory map.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_mmap_export_rdma(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the...
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_pe_connect_ctx(struct doca_pe *pe, struct doca_ctx *ctx)
This method connects a context to a progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object The doca_rdma_conn_details are used in doca_rdma_connect()...
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state...
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_READ
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
@ DOCA_ACCESS_FLAG_RDMA_WRITE
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
const struct ip_frag_config * cfg
@ create_rdma_connection_request
@ create_rdma_connection_response
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
ip_address parse_ip_v4_address(char const *value)
void uninstall_ctrl_c_handler()
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
static constexpr value_requirement optional_value
void create_doca_logger_backend(void) noexcept
void install_ctrl_c_handler(std::function< void(void)> callback)
void aligned_free(void *memory)
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
static constexpr value_multiplicity multiple_values
void * aligned_alloc(size_t alignment, size_t size)
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
doca_ec_matrix_type matrix_type_from_string(std::string const &matrix_type)
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
doca_dev_rep * open_representor(doca_dev *dev, std::string const &identifier)
static constexpr value_multiplicity single_value
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
constexpr size_t size_of_io_message
constexpr uint32_t cache_line_size
static constexpr value_requirement required_value
uint32_t get_system_page_size(void)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
doca_dev * open_device(std::string const &identifier)
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
storage::control::message_type message_type
Convenience type for representing opaque data.
struct upf_accel_ctx * ctx