NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
comch_to_rdma_gga_offload.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2025 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <atomic>
27 #include <chrono>
28 #include <cstdio>
29 #include <memory>
30 #include <numeric>
31 #include <stdexcept>
32 #include <sstream>
33 #include <thread>
34 
35 #include <doca_argp.h>
36 #include <doca_buf.h>
37 #include <doca_buf_inventory.h>
38 #include <doca_comch.h>
39 #include <doca_comch_consumer.h>
40 #include <doca_comch_producer.h>
41 #include <doca_compress.h>
42 #include <doca_ctx.h>
43 #include <doca_dev.h>
44 #include <doca_erasure_coding.h>
45 #include <doca_error.h>
46 #include <doca_log.h>
47 #include <doca_mmap.h>
48 #include <doca_pe.h>
49 #include <doca_version.h>
50 
60 
/* Register this translation unit's log source */
DOCA_LOG_REGISTER(gga_offload);

using namespace std::string_literals;

namespace {
/* Application name used for doca_argp registration and the startup banner */
auto constexpr app_name = "doca_storage_comch_to_rdma_gga_offload";

/* Defaults applied when the corresponding CLI flag is omitted */
auto constexpr default_control_timeout_seconds = std::chrono::seconds{5};
auto constexpr default_command_channel_name = "doca_storage_comch";

/* Pointer values are exchanged through fixed 64-bit wire fields; refuse to build elsewhere */
static_assert(sizeof(void *) == 8, "Expected a pointer to occupy 8 bytes");
72 
/*
 * Identifies the peer at the other end of a connection. Values are dense from
 * zero because they double as indices into per_connection_t.
 */
enum class connection_role : uint8_t {
	data_1 = 0, /* first data storage server */
	data_2 = 1, /* second data storage server */
	data_p = 2, /* parity storage server */
	client = 3, /* host side comch client */
};
79 
80 template <typename T, size_t N>
81 class per_connection_t {
82 public:
83  T &operator[](connection_role role)
84  {
85 #ifdef DOCA_DEBUG
86  if (static_cast<uint8_t>(role) > N) {
87  throw std::range_error{std::to_string(static_cast<uint8_t>(role)) + " exceeds range " +
88  std::to_string(N)};
89  }
90 #endif
91  return m_items[static_cast<uint8_t>(role)];
92  }
93 
94  T const &operator[](connection_role role) const
95  {
96 #ifdef DOCA_DEBUG
97  if (static_cast<uint8_t>(role) > N) {
98  throw std::range_error{std::to_string(static_cast<uint8_t>(role)) + " exceeds range " +
99  std::to_string(N)};
100  }
101 #endif
102  return m_items[static_cast<uint8_t>(role)];
103  }
104 
105  T *begin() noexcept
106  {
107  return m_items.data();
108  }
109 
110  T *end() noexcept
111  {
112  return m_items.data() + m_items.size();
113  }
114 
115  T const *begin() const noexcept
116  {
117  return m_items.data();
118  }
119 
120  T const *end() const noexcept
121  {
122  return m_items.data() + m_items.size();
123  }
124 
125 private:
126  std::array<T, N> m_items;
127 };
128 
/* All four control connections: the three storage servers plus the host client */
template <typename T>
using per_ctrl_connection = per_connection_t<T, 4>;

/* Data-path connections only: data_1, data_2 and data_p storage servers */
template <typename T>
using per_storage_connection = per_connection_t<T, 3>;
134 
/* Runtime configuration assembled from the parsed CLI arguments */
struct gga_offload_app_configuration {
	std::vector<uint32_t> cpu_set = {};	   /* cores the worker threads run on */
	std::string device_id = {};		   /* doca_dev identifier */
	std::string representor_id = {};	   /* host side representor identifier */
	std::string command_channel_name = {};	   /* comch channel name shared with the host client */
	std::chrono::seconds control_timeout = {}; /* timeout applied to control operations */
	per_storage_connection<storage::ip_address> storage_server_address = {}; /* data_1 / data_2 / data_p endpoints */
	std::string ec_matrix_type = {};	   /* erasure coding matrix type: cauchy or vandermonde */
	uint32_t recover_freq = {};		   /* force a recovery read every N requests; 0 disables */
};
145 
/* Per-worker statistics snapshot, collected for display at shutdown */
struct thread_stats {
	uint32_t core_idx = 0;	      /* core the worker was bound to */
	uint64_t pe_hit_count = 0;    /* progress engine polls that made progress */
	uint64_t pe_miss_count = 0;   /* progress engine polls that found nothing to do */
	uint64_t operation_count = 0; /* completed operations */
	uint64_t recovery_count = 0;  /* operations serviced via the EC recovery path */
};
153 
/* How a transaction is serviced; the recover_* modes read parity instead of one data half */
enum class transaction_mode : uint8_t {
	read,	   /* normal read: both halves fetched from the data servers */
	recover_a, /* first half replaced by parity data (fetched from data_p) */
	recover_b, /* second half replaced by parity data (fetched from data_p) */
};
159 
160 /*
161  * IO memory layout:
162  *
163  * Host and each of the 3 storage servers have a full block size * block count amount of storage.
164  *
 * A host read will be split into two parts, where each part comes from two of the 3 storage servers. In the case of a
 * "normal" read, the top half of the host request is filled by data_1 and the bottom half by data_2. These two halves
 * are then treated as one region surrounded by a header and trailer to know how much compressed data is in the middle
168  * part. That middle part is then decompressed as a single buffer and the output 2 * block_size is returned to the host.
169  *
170  * In a recovery read data_1 or data_2 fills its part as per usual, the parity data is read
171  * from data_p. This data_p is provided with the other data chunk to doca_ec to restore the missing part
172  *
173  */
/*
 * Per-core worker driving the GGA offload data path: it receives host IO
 * requests over doca_comch, fans each request out to storage servers over
 * RDMA, then runs LZ4 decompression (normal read) or doca_ec recovery
 * (degraded read) before producing the host response.
 */
class gga_offload_app_worker {
public:
	/* All objects and bookkeeping for one in-flight host IO request */
	struct alignas(storage::cache_line_size) transaction_context {
		uint32_t array_idx;	     /* index of this context; presumably within hot_data::transactions */
		uint32_t remaining_op_count; /* outstanding sub-operations before completion */
		per_storage_connection<char *> io_message;		    /* per-server request message buffers */
		per_storage_connection<doca_rdma_task_send *> requests;	    /* per-server RDMA send tasks */
		per_storage_connection<doca_rdma_task_receive *> responses; /* per-server RDMA receive tasks */
		doca_ec_task_recover *ec_recover_task;			    /* EC task used on recovery reads */
		doca_compress_task_decompress_lz4_stream *decompress_task;  /* LZ4 task used after data arrival */
		doca_comch_producer_task_send *host_response_task;	    /* completion message back to the host */
		doca_comch_consumer_task_post_recv *host_request_task;	    /* host request being serviced */
		uint32_t block_idx;	/* block index derived from the host IO address */
		uint32_t io_size;	/* size of the host IO request */
		transaction_mode mode;	/* read vs recover_a / recover_b */
	};

	static_assert(sizeof(gga_offload_app_worker::transaction_context) == (storage::cache_line_size * 2),
		      "Expected thread_context::transaction_context to occupy two cache lines");

	/* State touched on every poll iteration; deliberately packed into exactly two cache lines */
	struct alignas(storage::cache_line_size) hot_data {
		doca_pe *pe;			   /* progress engine polled by the worker thread */
		uint64_t remote_memory_start_addr; /* base address of the host (remote) IO region */
		uint64_t local_memory_start_addr;  /* base address of the local IO staging region */
		uint64_t storage_capacity;	   /* capacity of one storage region; parity lives past it */
		uint64_t pe_hit_count;		   /* polls that made progress */
		uint64_t pe_miss_count;		   /* polls that found nothing to do */
		uint64_t recovery_flow_count;	   /* recovery (degraded) reads triggered so far */
		uint64_t completed_transaction_count;
		transaction_context *transactions; /* transaction slot array, indexed by correlation id */
		uint32_t in_flight_transaction_count;
		uint32_t block_size;
		uint32_t half_block_size;
		uint16_t task_count;
		uint16_t core_idx;
		uint16_t recover_drop_count; /* countdown until the next forced recovery read */
		uint16_t recover_drop_freq;  /* force a recovery read every N requests; 0 disables */
		std::atomic_bool run_flag;   /* cleared to stop the worker loop */
		bool error_flag;	     /* latched when any task fails */

		hot_data();

		hot_data(hot_data const &other) = delete;

		hot_data(hot_data &&other) noexcept;

		hot_data &operator=(hot_data const &other) = delete;

		hot_data &operator=(hot_data &&other) noexcept;

		/* Begin servicing a host IO request; fails if the transaction slot is still busy */
		doca_error_t start_transaction(doca_comch_consumer_task_post_recv *task, char const *io_message);

		/* Storage halves arrived: dispatch to decompression or EC recovery based on mode */
		void process_result(gga_offload_app_worker::transaction_context &transaction);

		/* Submit the LZ4 stream decompress for a transaction whose data is in place */
		void start_decompress(gga_offload_app_worker::transaction_context &transaction);

		/* Submit the doca_ec recover task for a degraded read */
		void start_recover(gga_offload_app_worker::transaction_context &transaction);
	};

	static_assert(sizeof(gga_offload_app_worker::hot_data) == (storage::cache_line_size * 2),
		      "Expected thread_context::hot_data to occupy two cache lines");

	~gga_offload_app_worker();

	gga_offload_app_worker() = delete;

	gga_offload_app_worker(doca_dev *dev,
			       doca_comch_connection *comch_conn,
			       uint32_t task_count,
			       uint32_t batch_size,
			       std::string const &ec_matrix_type,
			       uint32_t recover_drop_freq);

	gga_offload_app_worker(gga_offload_app_worker const &) = delete;

	[[maybe_unused]] gga_offload_app_worker(gga_offload_app_worker &&) noexcept;

	gga_offload_app_worker &operator=(gga_offload_app_worker const &) = delete;

	[[maybe_unused]] gga_offload_app_worker &operator=(gga_offload_app_worker &&) noexcept;

	/* Export local RDMA connection details to be shared with the given peer */
	std::vector<uint8_t> get_local_rdma_connection_blob(connection_role conn_role,
							    storage::control::rdma_connection_role rdma_role);

	/* Complete an RDMA connection using the peer's exported connection details */
	void connect_rdma(connection_role conn_role,
			  storage::control::rdma_connection_role rdma_role,
			  std::vector<uint8_t> const &blob);

	doca_error_t get_connections_state() const noexcept;

	void stop_processing(void) noexcept;

	void destroy_comch_objects(void) noexcept;

	/* Allocate and wire together all per-transaction tasks and buffers */
	void create_tasks(uint32_t task_count,
			  uint32_t batch_size,
			  uint32_t block_size,
			  uint32_t remote_consumer_id,
			  doca_mmap *local_io_mmap,
			  doca_mmap *remote_io_mmap);

	/*
	 * Prepare thread proc
	 * @core_id [in]: Core to run on
	 */
	void prepare_thread_proc(uint32_t core_id);

	void start_thread_proc();

	[[nodiscard]] bool is_thread_proc_running() const noexcept;

	[[nodiscard]] hot_data const &get_hot_data() const noexcept;

private:
	/* Per storage-connection RDMA state.
	 * NOTE(review): some members appear elided from this view of the file
	 * (likely the rdma object handles themselves) — confirm against the SDK source. */
	struct rdma_context {
		std::vector<doca_rdma_task_send *> storage_request_tasks;
		std::vector<doca_rdma_task_receive *> storage_response_tasks;
	};

	hot_data m_hot_data;
	uint8_t *m_io_message_region; /* raw backing memory for all io messages */
	doca_mmap *m_io_message_mmap;
	doca_buf_inventory *m_buf_inv;
	std::vector<doca_buf *> m_io_message_bufs;
	doca_comch_consumer *m_consumer; /* receives host requests */
	doca_comch_producer *m_producer; /* sends host responses */
	doca_ec *m_ec;
	doca_ec_matrix *m_ec_matrix;
	doca_compress *m_compress;
	per_storage_connection<rdma_context> m_rdma;
	std::vector<doca_comch_consumer_task_post_recv *> m_host_request_tasks;
	std::vector<doca_comch_producer_task_send *> m_host_response_tasks;
	std::thread m_thread;

	void init(doca_dev *dev,
		  doca_comch_connection *comch_conn,
		  uint32_t task_count,
		  uint32_t batch_size,
		  std::string const &ec_matrix_type,
		  uint32_t recover_drop_freq);

	void cleanup(void) noexcept;

	void create_gga_tasks(uint32_t block_size, doca_mmap *local_io_mmap, doca_mmap *remote_io_mmap);

	void prepare_transaction_part(uint32_t idx, uint8_t *io_message_addr, connection_role role);

	/* Task completion callbacks: one success and one error variant per task type */
	static void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
							  doca_data task_user_data,
							  doca_data ctx_user_data) noexcept;

	static void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
								doca_data task_user_data,
								doca_data ctx_user_data) noexcept;

	static void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
						     doca_data task_user_data,
						     doca_data ctx_user_data) noexcept;

	static void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
							   doca_data task_user_data,
							   doca_data ctx_user_data) noexcept;

	static void doca_rdma_task_send_cb(doca_rdma_task_send *task,
					   doca_data task_user_data,
					   doca_data ctx_user_data) noexcept;

	static void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
						 doca_data task_user_data,
						 doca_data ctx_user_data) noexcept;

	static void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
					      doca_data task_user_data,
					      doca_data ctx_user_data) noexcept;

	static void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
						    doca_data task_user_data,
						    doca_data ctx_user_data) noexcept;

	static void doca_ec_task_recover_cb(doca_ec_task_recover *task,
					    doca_data task_user_data,
					    doca_data ctx_user_data) noexcept;

	static void doca_ec_task_recover_error_cb(doca_ec_task_recover *task,
						  doca_data task_user_data,
						  doca_data ctx_user_data) noexcept;

	static void doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
								doca_data task_user_data,
								doca_data ctx_user_data) noexcept;

	static void doca_compress_task_decompress_lz4_stream_error_cb(doca_compress_task_decompress_lz4_stream *task,
								      doca_data task_user_data,
								      doca_data ctx_user_data) noexcept;

	/* Worker loop body executed on the bound core */
	void thread_proc();
};
373 
/*
 * Control-plane application object: owns the device, the comch server towards
 * the host and the control channels towards the three storage servers, and
 * sequences the query/init/start/stop/shutdown control flow.
 */
class gga_offload_app {
public:
	~gga_offload_app();

	gga_offload_app() = delete;

	explicit gga_offload_app(gga_offload_app_configuration const &cfg);

	gga_offload_app(gga_offload_app const &) = delete;

	gga_offload_app(gga_offload_app &&) noexcept = delete;

	gga_offload_app &operator=(gga_offload_app const &) = delete;

	gga_offload_app &operator=(gga_offload_app &&) noexcept = delete;

	/* Record an abort request; reason is surfaced to the user */
	void abort(std::string const &reason);

	/* Establish control channel connections to all three storage servers */
	void connect_to_storage(void);

	/* Block until the host side comch client connects */
	void wait_for_comch_client_connection(void);

	/* Each wait_for_and_process_X call blocks for the matching host control
	 * request, services it (forwarding to storage where needed) and replies */
	void wait_for_and_process_query_storage(void);

	void wait_for_and_process_init_storage(void);

	void wait_for_and_process_start_storage(void);

	void wait_for_and_process_stop_storage(void);

	void wait_for_and_process_shutdown(void);

	/* Print the per-thread statistics gathered during the run */
	void display_stats(void) const;

private:
	gga_offload_app_configuration const m_cfg;
	doca_dev *m_dev;
	doca_dev_rep *m_dev_rep;
	doca_mmap *m_remote_io_mmap; /* host IO memory */
	uint8_t *m_local_io_region;  /* local staging memory backing m_local_io_mmap */
	doca_mmap *m_local_io_mmap;
	per_ctrl_connection<std::unique_ptr<storage::control::channel>> m_all_ctrl_channels;
	per_storage_connection<storage::control::channel *> m_storage_ctrl_channels;
	std::vector<storage::control::message> m_ctrl_messages; /* received control messages pending processing */
	std::vector<uint32_t> m_remote_consumer_ids;
	gga_offload_app_worker *m_workers; /* one worker per configured core */
	std::vector<thread_stats> m_stats;
	uint64_t m_storage_capacity;
	uint32_t m_storage_block_size;
	uint32_t m_message_id_counter; /* source of unique control message ids */
	uint32_t m_task_count;
	uint32_t m_batch_size;
	uint32_t m_core_count;
	bool m_abort_flag;

	static void new_comch_consumer_callback(void *user_data, uint32_t id) noexcept;

	static void expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept;

	storage::control::message wait_for_control_message();

	void wait_for_responses(std::vector<storage::control::message_id> const &mids, std::chrono::seconds timeout);

	storage::control::message get_response(storage::control::message_id mids);

	void discard_responses(std::vector<storage::control::message_id> const &mids);

	storage::control::message process_query_storage(storage::control::message const &client_request);

	storage::control::message process_init_storage(storage::control::message const &client_request);

	storage::control::message process_start_storage(storage::control::message const &client_request);

	storage::control::message process_stop_storage(storage::control::message const &client_request);

	storage::control::message process_shutdown(storage::control::message const &client_requeste);

	void prepare_thread_contexts(storage::control::correlation_id cid);

	void connect_rdma(uint32_t thread_idx,
			  storage::control::rdma_connection_role role,
			  storage::control::correlation_id cid);

	void verify_connections_are_ready(void);

	void destroy_workers(void) noexcept;
};
461 
462 /*
463  * Parse command line arguments
464  *
465  * @argc [in]: Number of arguments
466  * @argv [in]: Array of argument values
467  * @return: Parsed gga_offload_app_configuration
468  *
 * @throws: storage::runtime_error If the gga_offload_app_configuration cannot be parsed or contains invalid values
470  */
471 gga_offload_app_configuration parse_cli_args(int argc, char **argv);
472 } // namespace
473 
/*
 * Main
 *
 * @argc [in]: Number of arguments
 * @argv [in]: Array of argument values
 * @return: EXIT_SUCCESS on success and EXIT_FAILURE otherwise
 */
int main(int argc, char **argv)
{
	/* NOTE(review): a line is elided here in this view (likely logger backend setup) */

	printf("%s: v%s\n", app_name, doca_version());

	try {
		gga_offload_app app{parse_cli_args(argc, argv)};
		/* NOTE(review): the registration of an abort (e.g. ctrl-c) handler is elided in
		 * this view; the dangling `});` below closes that handler's callback */
		app.abort("User requested abort");
		});

		/* Drive the full control flow in order; each step blocks until done */
		app.connect_to_storage();
		app.wait_for_comch_client_connection();
		app.wait_for_and_process_query_storage();
		app.wait_for_and_process_init_storage();
		app.wait_for_and_process_start_storage();
		app.wait_for_and_process_stop_storage();
		app.wait_for_and_process_shutdown();
		app.display_stats();
	} catch (std::exception const &ex) {
		/* All failures funnel here; report and flush before exiting non-zero */
		fprintf(stderr, "EXCEPTION: %s\n", ex.what());
		fflush(stdout);
		fflush(stderr);
		return EXIT_FAILURE;
	}

	/* NOTE(review): a line is elided here in this view */

	return EXIT_SUCCESS;
}
512 
513 namespace {
514 /*
515  * Print the parsed gga_offload_app_configuration
516  *
517  * @cfg [in]: gga_offload_app_configuration to display
518  */
519 void print_config(gga_offload_app_configuration const &cfg) noexcept
520 {
521  printf("gga_offload_app_configuration: {\n");
522  printf("\tcpu_set : [");
523  bool first = true;
524  for (auto cpu : cfg.cpu_set) {
525  if (first)
526  first = false;
527  else
528  printf(", ");
529  printf("%u", cpu);
530  }
531  printf("]\n");
532  printf("\tdevice : \"%s\",\n", cfg.device_id.c_str());
533  printf("\trepresentor : \"%s\",\n", cfg.representor_id.c_str());
534  printf("\tcommand_channel_name : \"%s\",\n", cfg.command_channel_name.c_str());
535  printf("\tcontrol_timeout : %u,\n", static_cast<uint32_t>(cfg.control_timeout.count()));
536  printf("\tstorage_server[data_1] : %s:%u\n",
537  cfg.storage_server_address[connection_role::data_1].get_address().c_str(),
538  cfg.storage_server_address[connection_role::data_1].get_port());
539  printf("\tdata_2_storage_server : %s:%u\n",
540  cfg.storage_server_address[connection_role::data_2].get_address().c_str(),
541  cfg.storage_server_address[connection_role::data_2].get_port());
542  printf("\tdata_p_storage_server : %s:%u\n",
543  cfg.storage_server_address[connection_role::data_p].get_address().c_str(),
544  cfg.storage_server_address[connection_role::data_p].get_port());
545  printf("\trecover_freq : %u\n", cfg.recover_freq);
546  printf("}\n");
547 }
548 
/*
 * Validate gga_offload_app_configuration
 *
 * @cfg [in]: gga_offload_app_configuration
 */
void validate_gga_offload_app_configuration(gga_offload_app_configuration const &cfg)
{
	/* Collect all problems so the user sees every error at once */
	std::vector<std::string> errors;

	/* A zero control timeout would make every control wait fail immediately */
	if (cfg.control_timeout.count() == 0) {
		errors.emplace_back("Invalid gga_offload_app_configuration: control-timeout must not be zero");
	}

	if (!errors.empty()) {
		for (auto const &err : errors) {
			printf("%s\n", err.c_str());
		}
		/* NOTE(review): the start of the throw expression is elided in this view
		 * (likely `throw storage::runtime_error{<code>,`) */
		"Invalid gga_offload_app_configuration detected"};
	}
}
570 
571 /*
572  * Parse command line arguments
573  *
574  * @argc [in]: Number of arguments
575  * @argv [in]: Array of argument values
576  * @return: Parsed gga_offload_app_configuration
577  *
 * @throws: storage::runtime_error If the gga_offload_app_configuration cannot be parsed or contains invalid values
579  */
gga_offload_app_configuration parse_cli_args(int argc, char **argv)
{
	/* Seed defaults before parsing so omitted flags keep sane values */
	gga_offload_app_configuration config{};
	config.command_channel_name = default_command_channel_name;
	config.control_timeout = default_control_timeout_seconds;
	config.ec_matrix_type = "vandermonde";

	doca_error_t ret;

	ret = doca_argp_init(app_name, &config);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to parse CLI args"};
	}

	/* NOTE(review): throughout this function the call head of each CLI parameter
	 * registration helper (and some of its arguments) is elided from this view;
	 * only the short/long flag names, description and value callback survive. */

	/* -d / --device */
		"d",
		"device",
		"Device identifier",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->device_id =
				static_cast<char const *>(value);
			return DOCA_SUCCESS;
		});
	/* -r / --representor */
		"r",
		"representor",
		"Device host side representor identifier",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->representor_id =
				static_cast<char const *>(value);
			return DOCA_SUCCESS;
		});
	/* --cpu (repeatable; each occurrence appends a core) */
		nullptr,
		"cpu",
		"CPU core to which the process affinity can be set",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->cpu_set.push_back(
				*static_cast<int *>(value));
			return DOCA_SUCCESS;
		});
	/* --data-1-storage */
		nullptr,
		"data-1-storage",
		"Storage server addresses in <ip_addr>:<port> format",
		[](void *value, void *cfg) noexcept {
			try {
				static_cast<gga_offload_app_configuration *>(cfg)
					->storage_server_address[connection_role::data_1] =
					/* NOTE(review): address parse call head elided */
					static_cast<char const *>(value));
				return DOCA_SUCCESS;
			} catch (storage::runtime_error const &ex) {
				/* NOTE(review): error-return line elided */
			}
		});
	/* --data-2-storage */
		nullptr,
		"data-2-storage",
		"Storage server addresses in <ip_addr>:<port> format",
		[](void *value, void *cfg) noexcept {
			try {
				static_cast<gga_offload_app_configuration *>(cfg)
					->storage_server_address[connection_role::data_2] =
					static_cast<char const *>(value));
				return DOCA_SUCCESS;
			} catch (storage::runtime_error const &ex) {
			}
		});
	/* --data-p-storage */
		nullptr,
		"data-p-storage",
		"Storage server addresses in <ip_addr>:<port> format",
		[](void *value, void *cfg) noexcept {
			try {
				static_cast<gga_offload_app_configuration *>(cfg)
					->storage_server_address[connection_role::data_p] =
					static_cast<char const *>(value));
				return DOCA_SUCCESS;
			} catch (storage::runtime_error const &ex) {
			}
		});

	/* --matrix-type */
		nullptr,
		"matrix-type",
		"Type of matrix to use. One of: cauchy, vandermonde Default: vandermonde",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->ec_matrix_type =
				static_cast<char const *>(value);
			return DOCA_SUCCESS;
		});
	/* --command-channel-name */
		nullptr,
		"command-channel-name",
		"Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->command_channel_name =
				static_cast<char const *>(value);
			return DOCA_SUCCESS;
		});
	/* --control-timeout */
		nullptr,
		"control-timeout",
		"Time (in seconds) to wait while performing control operations. Default: 5",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->control_timeout =
				std::chrono::seconds{*static_cast<int *>(value)};
			return DOCA_SUCCESS;
		});
	/* --trigger-recovery-read-every-n */
		nullptr,
		"trigger-recovery-read-every-n",
		"Trigger a recovery read flow every N th request. Default: 0 (disabled)",
		[](void *value, void *cfg) noexcept {
			static_cast<gga_offload_app_configuration *>(cfg)->recover_freq =
				*static_cast<int *>(value);
			return DOCA_SUCCESS;
		});
	ret = doca_argp_start(argc, argv);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to parse CLI args"};
	}

	/* argp is no longer needed once the config struct is populated */
	static_cast<void>(doca_argp_destroy());

	print_config(config);
	validate_gga_offload_app_configuration(config);

	return config;
}
736 
/*
 * Build an error_response control message from a failed or unexpected response.
 * NOTE(review): parts of this function are elided from this view: a parameter
 * (request_cid is used below but not visible in the signature), the head of the
 * branch selecting the error source, and the head of the return statement.
 */
storage::control::message make_error_response(storage::control::message_id const &request_mid,
					      storage::control::message response,
					      storage::control::message_type expected_response_type)
{
	doca_error_t err_code;
	std::string err_msg;
	/* Elided: condition — presumably "response is already an error_response" */
	auto *const err_details =
		dynamic_cast<storage::control::error_response_payload *>(response.payload.get());
	if (err_details == nullptr) {
		throw storage::runtime_error{DOCA_ERROR_UNEXPECTED, "[BUG] invalid error_response"};
	}

	/* Propagate the error carried by the response itself */
	err_code = err_details->error_code;
	err_msg = std::move(err_details->message);
	} else {
		/* Response had an unexpected type: synthesize an error describing the mismatch */
		err_code = DOCA_ERROR_UNEXPECTED;
		err_msg = "Unexpected " + to_string(response.message_type) + " while expecting a " +
			  to_string(expected_response_type);
	}

	/* Elided: return statement head constructing the error_response message */
	request_mid,
	request_cid,
	std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
	};
}
766 
767 char *io_message_from_doca_buf(doca_buf const *buf)
768 {
769  void *data;
770  static_cast<void>(doca_buf_get_data(buf, &data));
771  return static_cast<char *>(data);
772 }
773 
/* Default-construct hot_data with every pointer, counter and flag zeroed */
gga_offload_app_worker::hot_data::hot_data()
	: pe{nullptr},
	  remote_memory_start_addr{0},
	  local_memory_start_addr{0},
	  storage_capacity{0},
	  pe_hit_count{0},
	  pe_miss_count{0},
	  recovery_flow_count{0},
	  completed_transaction_count{0},
	  transactions{nullptr},
	  in_flight_transaction_count{0},
	  block_size{0},
	  half_block_size{0},
	  task_count{0},
	  core_idx{0},
	  recover_drop_count{0},
	  recover_drop_freq{0},
	  run_flag{false}, /* explicit init: a default-constructed std::atomic_bool is uninitialized pre-C++20 */
	  error_flag{false}
{
}
795 
/* Move-construct: copy every field, then null the source's pe and transactions
 * pointers so only one instance refers to those resources afterwards */
gga_offload_app_worker::hot_data::hot_data(hot_data &&other) noexcept
	: pe{other.pe},
	  remote_memory_start_addr{other.remote_memory_start_addr},
	  local_memory_start_addr{other.local_memory_start_addr},
	  storage_capacity{other.storage_capacity},
	  pe_hit_count{other.pe_hit_count},
	  pe_miss_count{other.pe_miss_count},
	  recovery_flow_count{other.recovery_flow_count},
	  completed_transaction_count{other.completed_transaction_count},
	  transactions{other.transactions},
	  in_flight_transaction_count{other.in_flight_transaction_count},
	  block_size{other.block_size},
	  half_block_size{other.half_block_size},
	  task_count{other.task_count},
	  core_idx{other.core_idx},
	  recover_drop_count{other.recover_drop_count},
	  recover_drop_freq{other.recover_drop_freq},
	  run_flag{other.run_flag.load()}, /* atomics are not movable; copy the current value */
	  error_flag{other.error_flag}
{
	other.pe = nullptr;
	other.transactions = nullptr;
}
819 
/* Move-assign: field-wise copy guarded against self-assignment; the source's
 * pe and transactions pointers are nulled so resources have a single referent */
gga_offload_app_worker::hot_data &gga_offload_app_worker::hot_data::operator=(hot_data &&other) noexcept
{
	/* Self-assignment guard: moving onto oneself must be a no-op */
	if (std::addressof(other) == this)
		return *this;

	pe = other.pe;
	remote_memory_start_addr = other.remote_memory_start_addr;
	local_memory_start_addr = other.local_memory_start_addr;
	storage_capacity = other.storage_capacity;
	pe_hit_count = other.pe_hit_count;
	pe_miss_count = other.pe_miss_count;
	recovery_flow_count = other.recovery_flow_count;
	completed_transaction_count = other.completed_transaction_count;
	transactions = other.transactions;
	in_flight_transaction_count = other.in_flight_transaction_count;
	block_size = other.block_size;
	half_block_size = other.half_block_size;
	task_count = other.task_count;
	core_idx = other.core_idx;
	recover_drop_count = other.recover_drop_count;
	recover_drop_freq = other.recover_drop_freq;
	run_flag = other.run_flag.load(); /* atomics are not movable; copy the current value */
	error_flag = other.error_flag;

	other.pe = nullptr;
	other.transactions = nullptr;

	return *this;
}
849 
/*
 * Begin servicing a host IO request: the request is split into two half-block
 * reads that are sent to the selected storage servers over RDMA. Periodically
 * (per recover_drop_freq) a request is deliberately serviced as a recovery
 * read, substituting the parity server for one data server.
 *
 * @task [in]: Host request task that delivered this io_message
 * @io_message [in]: Host IO request message
 * @return: DOCA_SUCCESS on success, an error code otherwise
 */
doca_error_t gga_offload_app_worker::hot_data::start_transaction(doca_comch_consumer_task_post_recv *task,
								 char const *io_message)
{
	auto const type = storage::io_message_view::get_type(io_message);

	/* NOTE(review): the condition and early return are elided in this view —
	 * presumably validation rejecting unexpected message types */
		error_flag = true;
	}

	auto const cid = storage::io_message_view::get_correlation_id(io_message);

	/* The correlation id doubles as the index of the transaction slot */
	auto &transaction = transactions[cid];

	/* A non-zero op count means the slot is still servicing a previous request */
	if (transaction.remaining_op_count != 0) {
		error_flag = true;
		return DOCA_ERROR_BAD_STATE;
	}

	transaction.host_request_task = task;
	transaction.remaining_op_count = 4; // 2 * rdma send + 2 * rdma recv

	connection_role part_a_conn = connection_role::data_1;
	connection_role part_b_conn = connection_role::data_2;

	transaction.mode = transaction_mode::read;

	transaction.io_size = storage::io_message_view::get_io_size(io_message);
	auto const half_block_size = transaction.io_size / 2;

	auto const host_io_addr = storage::io_message_view::get_io_address(io_message);
	auto const io_offset = host_io_addr - remote_memory_start_addr;
	transaction.block_idx = io_offset / block_size;

	uint32_t remote_offset_a = 0;
	uint32_t remote_offset_b = 0;

	/* Periodically force a recovery read to exercise the EC path, alternating
	 * between replacing the first (recover_a) and second (recover_b) half with
	 * parity data read from beyond the regular storage capacity */
	if (recover_drop_freq != 0 && (--recover_drop_count) == 0) {
		recover_drop_count = recover_drop_freq;
		++recovery_flow_count;

		if (recovery_flow_count % 2 == 0) {
			transaction.mode = transaction_mode::recover_a;
			part_a_conn = connection_role::data_p;
			part_b_conn = connection_role::data_2;
			remote_offset_a = storage_capacity - (transaction.block_idx * half_block_size);
		} else {
			transaction.mode = transaction_mode::recover_b;
			part_a_conn = connection_role::data_1;
			part_b_conn = connection_role::data_p;
			remote_offset_b =
				storage_capacity - (half_block_size + (transaction.block_idx * half_block_size));
		}
	}

	auto *part_a_io_message = transaction.io_message[part_a_conn];
	auto *part_b_io_message = transaction.io_message[part_b_conn];
	auto *response_io_message =
		io_message_from_doca_buf(doca_comch_producer_task_send_get_buf(transaction.host_response_task));

	auto const local_io_addr = local_memory_start_addr + io_offset;
	auto const user_data = storage::io_message_view::get_user_data(io_message);

	/* Populate both storage requests and the eventual host response message */
	storage::io_message_view::set_correlation_id(cid, part_a_io_message);
	storage::io_message_view::set_correlation_id(cid, part_b_io_message);
	storage::io_message_view::set_correlation_id(cid, response_io_message);

	storage::io_message_view::set_type(type, part_a_io_message);
	storage::io_message_view::set_type(type, part_b_io_message);
	/* NOTE(review): a line is elided here in this view (likely set_type on response_io_message) */

	storage::io_message_view::set_user_data(user_data, part_a_io_message);
	storage::io_message_view::set_user_data(user_data, part_b_io_message);
	storage::io_message_view::set_user_data(user_data, response_io_message);

	/* Part A lands at the start of the local block, part B at its second half */
	storage::io_message_view::set_io_address(local_io_addr, part_a_io_message);
	storage::io_message_view::set_io_address(local_io_addr + half_block_size, part_b_io_message);
	storage::io_message_view::set_io_address(host_io_addr, response_io_message);

	storage::io_message_view::set_io_size(half_block_size, part_a_io_message);
	storage::io_message_view::set_io_size(half_block_size, part_b_io_message);
	storage::io_message_view::set_io_size(transaction.io_size, response_io_message);

	storage::io_message_view::set_remote_offset(remote_offset_a, part_a_io_message);
	storage::io_message_view::set_remote_offset(remote_offset_b, part_b_io_message);

	/* NOTE(review): a line is elided here in this view */

	doca_error_t ret;

	/*
	 * NOTE: if either send task fails intentionally leave things as they are (remaining_op_count for example) as
	 * any parts that were sent should NOT trigger an action upon their eventual completion
	 */
	ret = doca_task_submit(doca_rdma_task_send_as_task(transaction.requests[part_a_conn]));
	if (ret != DOCA_SUCCESS) {
		error_flag = true;
		return ret;
	}

	ret = doca_task_submit(doca_rdma_task_send_as_task(transaction.requests[part_b_conn]));
	if (ret != DOCA_SUCCESS) {
		error_flag = true;
		return ret;
	}

	++in_flight_transaction_count;

	return DOCA_SUCCESS;
}
960 
961 void gga_offload_app_worker::hot_data::process_result(gga_offload_app_worker::transaction_context &transaction)
962 {
963  if (transaction.mode == transaction_mode::read) {
964  transaction.remaining_op_count = 1;
965  start_decompress(transaction);
966  } else {
967  transaction.remaining_op_count = 2;
968  start_recover(transaction);
969  }
970 }
971 
972 void gga_offload_app_worker::hot_data::start_decompress(gga_offload_app_worker::transaction_context &transaction)
973 {
974  auto const io_offset = transaction.block_idx * block_size;
975  auto *const local_block_start = reinterpret_cast<char *>(local_memory_start_addr) + io_offset;
976  auto const *hdr = reinterpret_cast<storage::compressed_block_header const *>(local_block_start);
977 
978  auto *src_buff =
979  const_cast<doca_buf *>(doca_compress_task_decompress_lz4_stream_get_src(transaction.decompress_task));
980  static_cast<void>(doca_buf_set_data(src_buff,
981  local_block_start + sizeof(storage::compressed_block_header),
982  be32toh(hdr->compressed_size)));
983 
984  auto *dst_buff = doca_compress_task_decompress_lz4_stream_get_dst(transaction.decompress_task);
985  static_cast<void>(
986  doca_buf_set_data(dst_buff, reinterpret_cast<char *>(remote_memory_start_addr) + io_offset, 0));
987 
988  // do decompress
989  auto const ret =
991  if (ret != DOCA_SUCCESS) {
992  DOCA_LOG_ERR("Failed to submit decompress task");
993  error_flag = true;
994  run_flag = false;
995  }
996 }
997 
998 void gga_offload_app_worker::hot_data::start_recover(gga_offload_app_worker::transaction_context &transaction)
999 {
1000  doca_buf *d1_buf;
1001  doca_buf *d2_buf;
1002  doca_buf *dp_buf;
1003  uint32_t d1_len;
1004  uint32_t d2_len;
1005 
1006  auto *const d1_addr = reinterpret_cast<char *>(local_memory_start_addr) + (transaction.block_idx * block_size);
1007  auto *const dp_addr = reinterpret_cast<char *>(local_memory_start_addr) + storage_capacity +
1008  (transaction.block_idx * half_block_size);
1009 
1010  if (transaction.mode == transaction_mode::recover_a) {
1011  dp_buf = const_cast<doca_buf *>(doca_ec_task_recover_get_available_blocks(transaction.ec_recover_task));
1012  static_cast<void>(doca_buf_get_next_in_list(dp_buf, &d2_buf));
1013  d1_buf = doca_ec_task_recover_get_recovered_data(transaction.ec_recover_task);
1014  d1_len = 0; /* d1 is output so clear its size */
1015  d2_len = half_block_size;
1016  } else {
1017  d1_buf = const_cast<doca_buf *>(doca_ec_task_recover_get_available_blocks(transaction.ec_recover_task));
1018  static_cast<void>(doca_buf_get_next_in_list(d1_buf, &dp_buf));
1019  d2_buf = doca_ec_task_recover_get_recovered_data(transaction.ec_recover_task);
1020  d1_len = half_block_size;
1021  d2_len = 0; /* d2 is output so clear its size */
1022  }
1023 
1024  static_cast<void>(doca_buf_set_data(d1_buf, d1_addr, d1_len));
1025  static_cast<void>(doca_buf_set_data(d2_buf, d1_addr + half_block_size, d2_len));
1026  static_cast<void>(doca_buf_set_data(dp_buf, dp_addr, half_block_size));
1027 
1028  // do recover
1029  auto const ret = doca_task_submit(doca_ec_task_recover_as_task(transaction.ec_recover_task));
1030  if (ret != DOCA_SUCCESS) {
1031  DOCA_LOG_ERR("Failed to submit decompress task");
1032  error_flag = true;
1033  run_flag = false;
1034  }
1035 }
1036 
1037 gga_offload_app_worker::~gga_offload_app_worker()
1038 {
1039  if (m_thread.joinable()) {
1040  m_hot_data.run_flag = false;
1041  m_hot_data.error_flag = true;
1042  m_thread.join();
1043  }
1044  cleanup();
1045 }
1046 
/*
 * Construct a worker: all members are first set to empty/null so that cleanup() is safe to
 * call at any point, then init() acquires the actual resources.
 */
gga_offload_app_worker::gga_offload_app_worker(doca_dev *dev,
					       doca_comch_connection *comch_conn,
					       uint32_t task_count,
					       uint32_t batch_size,
					       std::string const &ec_matrix_type,
					       uint32_t recover_drop_freq)
	: m_hot_data{},
	  m_io_message_region{nullptr},
	  m_io_message_mmap{nullptr},
	  m_buf_inv{nullptr},
	  m_io_message_bufs{},
	  m_consumer{nullptr},
	  m_producer{nullptr},
	  m_ec{nullptr},
	  m_ec_matrix{nullptr},
	  m_compress{nullptr},
	  m_rdma{},
	  m_host_request_tasks{},
	  m_host_response_tasks{},
	  m_thread{}
{
	try {
		init(dev, comch_conn, task_count, batch_size, ec_matrix_type, recover_drop_freq);
	} catch (storage::runtime_error const &) {
		/* init() can throw part-way through resource acquisition; release whatever was
		 * created so a failed construction does not leak, then re-throw to the caller. */
		cleanup();
		throw;
	}
}
1075 
/*
 * Move constructor: steals all resources from `other` and nulls other's raw handles so that
 * other's destructor/cleanup() becomes a no-op for the transferred objects.
 */
gga_offload_app_worker::gga_offload_app_worker(gga_offload_app_worker &&other) noexcept
	: m_hot_data{std::move(other.m_hot_data)},
	  m_io_message_region{other.m_io_message_region},
	  m_io_message_mmap{other.m_io_message_mmap},
	  m_buf_inv{other.m_buf_inv},
	  m_io_message_bufs{std::move(other.m_io_message_bufs)},
	  m_consumer{other.m_consumer},
	  m_producer{other.m_producer},
	  m_ec{other.m_ec},
	  m_ec_matrix{other.m_ec_matrix},
	  m_compress{other.m_compress},
	  m_rdma{std::move(other.m_rdma)},
	  m_host_request_tasks{std::move(other.m_host_request_tasks)},
	  m_host_response_tasks{std::move(other.m_host_response_tasks)},
	  m_thread{std::move(other.m_thread)}
{
	/* Clear raw (non-RAII) handles in the source so they are not released twice. */
	other.m_io_message_region = nullptr;
	other.m_io_message_mmap = nullptr;
	other.m_buf_inv = nullptr;
	other.m_consumer = nullptr;
	other.m_producer = nullptr;
	other.m_ec = nullptr;
	other.m_ec_matrix = nullptr;
	other.m_compress = nullptr;
}
1101 
1102 gga_offload_app_worker &gga_offload_app_worker::operator=(gga_offload_app_worker &&other) noexcept
1103 {
1104  if (std::addressof(other) == this)
1105  return *this;
1106 
1107  cleanup();
1108 
1109  m_hot_data = std::move(other.m_hot_data);
1110  m_io_message_region = other.m_io_message_region;
1111  m_io_message_mmap = other.m_io_message_mmap;
1112  m_buf_inv = other.m_buf_inv;
1113  m_io_message_bufs = std::move(other.m_io_message_bufs);
1114  m_consumer = other.m_consumer;
1115  m_producer = other.m_producer;
1116  m_ec = other.m_ec;
1117  m_ec_matrix = other.m_ec_matrix;
1118  m_compress = other.m_compress;
1119  m_rdma = std::move(other.m_rdma);
1120  m_host_request_tasks = std::move(other.m_host_request_tasks);
1121  m_host_response_tasks = std::move(other.m_host_response_tasks);
1122  m_thread = std::move(other.m_thread);
1123 
1124  other.m_io_message_region = nullptr;
1125  other.m_io_message_mmap = nullptr;
1126  other.m_buf_inv = nullptr;
1127  other.m_consumer = nullptr;
1128  other.m_producer = nullptr;
1129  other.m_ec = nullptr;
1130  other.m_ec_matrix = nullptr;
1131  other.m_compress = nullptr;
1132 
1133  return *this;
1134 }
1135 
1136 std::vector<uint8_t> gga_offload_app_worker::get_local_rdma_connection_blob(
1137  connection_role conn_role,
1139 {
1140  doca_error_t ret;
1141  uint8_t const *blob = nullptr;
1142  size_t blob_size = 0;
1143 
1144  auto &rdma_ctx = m_rdma[conn_role];
1145  auto &rdma_pair = rdma_role == storage::control::rdma_connection_role::io_data ? rdma_ctx.data : rdma_ctx.ctrl;
1146  ret = doca_rdma_export(rdma_pair.rdma,
1147  reinterpret_cast<void const **>(&blob),
1148  &blob_size,
1149  std::addressof(rdma_pair.conn));
1150  if (ret != DOCA_SUCCESS) {
1151  DOCA_LOG_ERR("Core: %u RDMA export failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
1152  throw storage::runtime_error{ret, "Failed to export rdma connection"};
1153  }
1154 
1155  return std::vector<uint8_t>{blob, blob + blob_size};
1156 }
1157 
1158 void gga_offload_app_worker::connect_rdma(connection_role conn_role,
1160  std::vector<uint8_t> const &blob)
1161 {
1162  auto &rdma_ctx = m_rdma[conn_role];
1163  auto &rdma_pair = rdma_role == storage::control::rdma_connection_role::io_data ? rdma_ctx.data : rdma_ctx.ctrl;
1164 
1165  doca_error_t ret;
1166  ret = doca_rdma_connect(rdma_pair.rdma, blob.data(), blob.size(), rdma_pair.conn);
1167  if (ret != DOCA_SUCCESS) {
1168  DOCA_LOG_ERR("Core: %u RDMA connect failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
1169  throw storage::runtime_error{ret, "Failed to connect to rdma"};
1170  }
1171 }
1172 
1173 doca_error_t gga_offload_app_worker::get_connections_state() const noexcept
1174 {
1175  doca_error_t ret;
1176  doca_ctx_states ctx_state;
1177  uint32_t pending_count = 0;
1178 
1179  ret = doca_ctx_get_state(doca_comch_producer_as_ctx(m_producer), &ctx_state);
1180  if (ret != DOCA_SUCCESS) {
1181  DOCA_LOG_ERR("Failed to query comch producer state: %s", doca_error_get_name(ret));
1182  return ret;
1183  }
1184 
1185  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
1186  ++pending_count;
1187  static_cast<void>(doca_pe_progress(m_hot_data.pe));
1188  }
1189 
1190  ret = doca_ctx_get_state(doca_comch_consumer_as_ctx(m_consumer), &ctx_state);
1191  if (ret != DOCA_SUCCESS) {
1192  DOCA_LOG_ERR("Failed to query comch consumer state: %s", doca_error_get_name(ret));
1193  return ret;
1194  }
1195 
1196  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
1197  ++pending_count;
1198  static_cast<void>(doca_pe_progress(m_hot_data.pe));
1199  }
1200 
1201  for (auto &ctx : m_rdma) {
1202  ret = doca_ctx_get_state(doca_rdma_as_ctx(ctx.data.rdma), &ctx_state);
1203  if (ret != DOCA_SUCCESS) {
1204  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
1205  return ret;
1206  }
1207 
1208  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
1209  ++pending_count;
1210  static_cast<void>(doca_pe_progress(m_hot_data.pe));
1211  }
1212 
1213  ret = doca_ctx_get_state(doca_rdma_as_ctx(ctx.ctrl.rdma), &ctx_state);
1214  if (ret != DOCA_SUCCESS) {
1215  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
1216  return ret;
1217  }
1218 
1219  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
1220  ++pending_count;
1221  static_cast<void>(doca_pe_progress(m_hot_data.pe));
1222  }
1223  }
1224 
1225  return (pending_count == 0) ? DOCA_SUCCESS : DOCA_ERROR_IN_PROGRESS;
1226 }
1227 
1228 void gga_offload_app_worker::stop_processing(void) noexcept
1229 {
1230  m_hot_data.run_flag = false;
1231  if (m_thread.joinable()) {
1232  m_thread.join();
1233  }
1234 }
1235 
1236 void gga_offload_app_worker::destroy_comch_objects(void) noexcept
1237 {
1238  doca_error_t ret;
1239  std::vector<doca_task *> tasks;
1240 
1241  if (m_consumer != nullptr) {
1242  tasks.reserve(m_host_request_tasks.size());
1243  std::transform(std::begin(m_host_request_tasks),
1244  std::end(m_host_request_tasks),
1245  std::back_inserter(tasks),
1247  ret = storage::stop_context(doca_comch_consumer_as_ctx(m_consumer), m_hot_data.pe, tasks);
1248  tasks.clear();
1249  if (ret != DOCA_SUCCESS) {
1250  DOCA_LOG_ERR("Failed to stop consumer context");
1251  } else {
1252  m_host_request_tasks.clear();
1253  }
1254  ret = doca_comch_consumer_destroy(m_consumer);
1255  if (ret != DOCA_SUCCESS) {
1256  DOCA_LOG_ERR("Failed to destroy consumer context");
1257  } else {
1258  m_consumer = nullptr;
1259  }
1260  }
1261 
1262  if (m_producer != nullptr) {
1263  tasks.reserve(m_host_response_tasks.size());
1264  std::transform(std::begin(m_host_response_tasks),
1265  std::end(m_host_response_tasks),
1266  std::back_inserter(tasks),
1268  ret = storage::stop_context(doca_comch_producer_as_ctx(m_producer), m_hot_data.pe, tasks);
1269  tasks.clear();
1270  if (ret != DOCA_SUCCESS) {
1271  DOCA_LOG_ERR("Failed to stop producer context");
1272  } else {
1273  m_host_response_tasks.clear();
1274  }
1275  ret = doca_comch_producer_destroy(m_producer);
1276  if (ret != DOCA_SUCCESS) {
1277  DOCA_LOG_ERR("Failed to destroy producer context");
1278  } else {
1279  m_producer = nullptr;
1280  }
1281  }
1282 }
1283 
1284 void gga_offload_app_worker::create_tasks(uint32_t task_count,
1285  uint32_t batch_size,
1286  uint32_t block_size,
1287  uint32_t remote_consumer_id,
1288  doca_mmap *local_io_mmap,
1289  doca_mmap *remote_io_mmap)
1290 {
1291  doca_error_t ret;
1292 
1293  m_hot_data.transactions = storage::make_aligned<transaction_context>{}.object_array(task_count);
1294 
1295  auto *io_message_addr = m_io_message_region;
1296  // 6 * task_count: transactions io_buffers
1297  // 1 * task_count : comch_recv from host
1298  // 1 * task_count : comch_send_tasks
1299  m_io_message_bufs.reserve((task_count * 8) + batch_size);
1300 
1301  m_host_request_tasks.reserve(task_count);
1302  m_host_response_tasks.reserve(task_count);
1303  for (auto &ctx : m_rdma) {
1304  ctx.storage_request_tasks.reserve(task_count);
1305  ctx.storage_response_tasks.reserve(task_count);
1306  }
1307 
1308  for (uint32_t ii = 0; ii != (task_count + batch_size); ++ii) {
1309  doca_buf *buff = nullptr;
1310 
1311  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1312  m_io_message_mmap,
1313  io_message_addr,
1315  &buff);
1316  if (ret != DOCA_SUCCESS) {
1317  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
1318  }
1319 
1320  io_message_addr += storage::size_of_io_message;
1321  m_io_message_bufs.push_back(buff);
1322 
1323  doca_comch_consumer_task_post_recv *comch_consumer_task_post_recv = nullptr;
1324  ret = doca_comch_consumer_task_post_recv_alloc_init(m_consumer, buff, &comch_consumer_task_post_recv);
1325  if (ret != DOCA_SUCCESS) {
1326  throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
1327  }
1329  doca_data{.ptr = std::addressof(m_hot_data)});
1330  m_host_request_tasks.push_back(comch_consumer_task_post_recv);
1331  }
1332 
1333  for (uint32_t ii = 0; ii != task_count; ++ii) {
1334  prepare_transaction_part(ii, io_message_addr, connection_role::data_1);
1335  io_message_addr += storage::size_of_io_message;
1336  io_message_addr += storage::size_of_io_message;
1337  prepare_transaction_part(ii, io_message_addr, connection_role::data_2);
1338  io_message_addr += storage::size_of_io_message;
1339  io_message_addr += storage::size_of_io_message;
1340  prepare_transaction_part(ii, io_message_addr, connection_role::data_p);
1341  io_message_addr += storage::size_of_io_message;
1342  io_message_addr += storage::size_of_io_message;
1343  }
1344 
1345  create_gga_tasks(block_size, local_io_mmap, remote_io_mmap);
1346 
1347  for (uint32_t ii = 0; ii != task_count; ++ii) {
1348  doca_buf *buff = nullptr;
1349 
1350  ret = doca_buf_inventory_buf_get_by_data(m_buf_inv,
1351  m_io_message_mmap,
1352  io_message_addr,
1354  &buff);
1355  if (ret != DOCA_SUCCESS) {
1356  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
1357  }
1358 
1359  io_message_addr += storage::size_of_io_message;
1360  m_io_message_bufs.push_back(buff);
1361 
1362  doca_comch_producer_task_send *comch_producer_task_send;
1364  buff,
1365  nullptr,
1366  0,
1367  remote_consumer_id,
1368  &comch_producer_task_send);
1369  if (ret != DOCA_SUCCESS) {
1370  throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
1371  }
1373  doca_data{.u64 = ii});
1374  m_hot_data.transactions[ii].host_response_task = comch_producer_task_send;
1375  m_host_response_tasks.push_back(comch_producer_task_send);
1376  }
1377 }
1378 
1379 void gga_offload_app_worker::prepare_thread_proc(uint32_t core_id)
1380 {
1381  m_thread = std::thread{[this]() {
1382  try {
1383  thread_proc();
1384  } catch (std::exception const &ex) {
1385  DOCA_LOG_ERR("Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
1386  m_hot_data.error_flag = true;
1387  m_hot_data.run_flag = false;
1388  }
1389  }};
1390  m_hot_data.core_idx = core_id;
1391  storage::set_thread_affinity(m_thread, m_hot_data.core_idx);
1392 }
1393 
1394 void gga_offload_app_worker::start_thread_proc(void)
1395 {
1396  // Submit initial tasks
1397  doca_error_t ret;
1398  for (auto *task : m_host_request_tasks) {
1400  if (ret != DOCA_SUCCESS) {
1401  DOCA_LOG_ERR("Failed to submit initial doca_comch_consumer_task_post_recv task: %s",
1402  doca_error_get_name(ret));
1403  throw storage::runtime_error{ret, "Failed to submit initial task"};
1404  }
1405  }
1406 
1407  for (auto &ctx : m_rdma) {
1408  for (auto *task : ctx.storage_response_tasks) {
1410  if (ret != DOCA_SUCCESS) {
1411  DOCA_LOG_ERR("Failed to submit initial doca_rdma_task_receive task: %s",
1412  doca_error_get_name(ret));
1413  throw storage::runtime_error{ret, "Failed to submit initial task"};
1414  }
1415  }
1416  }
1417 
1418  m_hot_data.run_flag = true;
1419 }
1420 
/* Read-only access to the worker's hot-path state (flags, counters, progress engine). */
gga_offload_app_worker::hot_data const &gga_offload_app_worker::get_hot_data(void) const noexcept
{
	return m_hot_data;
}
1425 
1426 void gga_offload_app_worker::init(doca_dev *dev,
1427  doca_comch_connection *comch_conn,
1428  uint32_t task_count,
1429  uint32_t batch_size,
1430  std::string const &ec_matrix_type,
1431  uint32_t recover_drop_freq)
1432 {
1433  doca_error_t ret;
1434  auto const page_size = storage::get_system_page_size();
1435 
1436  // 6 * task_count: transactions io_buffers
1437  // 1 * task_count : comch_recv from host
1438  // 1 * task_count : comch_send_tasks
1439  auto const io_message_count = (task_count * 8) + batch_size;
1440  auto const raw_io_messages_size = io_message_count * storage::size_of_io_message;
1441 
1442  DOCA_LOG_DBG("Allocate io messages memory (%zu bytes, aligned to %u byte pages)",
1443  raw_io_messages_size,
1444  page_size);
1445  m_io_message_region = static_cast<uint8_t *>(
1446  storage::aligned_alloc(page_size, storage::aligned_size(page_size, raw_io_messages_size)));
1447  if (m_io_message_region == nullptr) {
1448  throw storage::runtime_error{DOCA_ERROR_NO_MEMORY, "Failed to allocate io messages"};
1449  }
1450 
1451  m_io_message_mmap = storage::make_mmap(dev,
1452  reinterpret_cast<char *>(m_io_message_region),
1453  raw_io_messages_size,
1455 
1456  auto const gga_buffer_count = task_count * 5;
1457  ret = doca_buf_inventory_create(io_message_count + gga_buffer_count, &m_buf_inv);
1458  if (ret != DOCA_SUCCESS) {
1459  throw storage::runtime_error{ret, "Failed to create doca_buf_inventory"};
1460  }
1461 
1462  ret = doca_buf_inventory_start(m_buf_inv);
1463  if (ret != DOCA_SUCCESS) {
1464  throw storage::runtime_error{ret, "Failed to start doca_buf_inventory"};
1465  }
1466 
1467  DOCA_LOG_DBG("Create hot path progress engine");
1468  ret = doca_pe_create(std::addressof(m_hot_data.pe));
1469  if (ret != DOCA_SUCCESS) {
1470  throw storage::runtime_error{ret, "Failed to create doca_pe"};
1471  }
1472 
1473  m_consumer = storage::make_comch_consumer(comch_conn,
1474  m_io_message_mmap,
1475  m_hot_data.pe,
1476  task_count + batch_size,
1477  doca_data{.ptr = std::addressof(m_hot_data)},
1478  doca_comch_consumer_task_post_recv_cb,
1479  doca_comch_consumer_task_post_recv_error_cb);
1480 
1481  m_producer = storage::make_comch_producer(comch_conn,
1482  m_hot_data.pe,
1483  task_count,
1484  doca_data{.ptr = std::addressof(m_hot_data)},
1485  doca_comch_producer_task_send_cb,
1486  doca_comch_producer_task_send_error_cb);
1487 
1488  ret = doca_ec_create(dev, &m_ec);
1489  if (ret != DOCA_SUCCESS) {
1490  throw storage::runtime_error{ret, "Failed to create doca_ec"};
1491  }
1492 
1493  ret = doca_ctx_set_user_data(doca_ec_as_ctx(m_ec), doca_data{.ptr = std::addressof(m_hot_data)});
1494  if (ret != DOCA_SUCCESS) {
1495  throw storage::runtime_error{ret, "Failed to set doca_ec user data: "s + doca_error_get_name(ret)};
1496  }
1497 
1498  ret = doca_pe_connect_ctx(m_hot_data.pe, doca_ec_as_ctx(m_ec));
1499  if (ret != DOCA_SUCCESS) {
1500  throw storage::runtime_error{ret, "Failed to connect doca_ec to progress engine"};
1501  }
1502 
1503  ret = doca_ec_task_recover_set_conf(m_ec, doca_ec_task_recover_cb, doca_ec_task_recover_error_cb, task_count);
1504  if (ret != DOCA_SUCCESS) {
1505  throw storage::runtime_error{ret, "Failed to create doca_ec_task_recover task pool"};
1506  }
1507 
1508  ret = doca_ctx_start(doca_ec_as_ctx(m_ec));
1509  if (ret != DOCA_SUCCESS) {
1510  throw storage::runtime_error{ret, "Failed to start doca_ec"};
1511  }
1512 
1513  // Create a matrix that creates one redundancy block per 2 data blocks
1514  ret = doca_ec_matrix_create(m_ec, storage::matrix_type_from_string(ec_matrix_type), 2, 1, &m_ec_matrix);
1515  if (ret != DOCA_SUCCESS) {
1516  throw storage::runtime_error{ret, "Failed to create doca_ec matrix"};
1517  }
1518 
1519  ret = doca_compress_create(dev, &m_compress);
1520  if (ret != DOCA_SUCCESS) {
1521  throw storage::runtime_error{ret, "Failed to create doca_compress"};
1522  }
1523 
1524  ret = doca_ctx_set_user_data(doca_compress_as_ctx(m_compress), doca_data{.ptr = std::addressof(m_hot_data)});
1525  if (ret != DOCA_SUCCESS) {
1526  throw storage::runtime_error{ret,
1527  "Failed to set doca_compress user data: "s + doca_error_get_name(ret)};
1528  }
1529 
1530  ret = doca_pe_connect_ctx(m_hot_data.pe, doca_compress_as_ctx(m_compress));
1531  if (ret != DOCA_SUCCESS) {
1532  throw storage::runtime_error{ret, "Failed to conncompresst doca_compress to progress engine"};
1533  }
1534 
1536  doca_compress_task_decompress_lz4_stream_cb,
1537  doca_compress_task_decompress_lz4_stream_error_cb,
1538  task_count);
1539  if (ret != DOCA_SUCCESS) {
1540  throw storage::runtime_error{ret,
1541  "Failed to create doca_compress_task_decompress_lz4_stream task pool"};
1542  }
1543 
1544  ret = doca_ctx_start(doca_compress_as_ctx(m_compress));
1545  if (ret != DOCA_SUCCESS) {
1546  throw storage::runtime_error{ret, "Failed to start doca_compress"};
1547  }
1548 
1549  auto constexpr rdma_permissions = DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_RDMA_READ |
1551 
1552  for (auto &ctx : m_rdma) {
1553  ctx.ctrl.rdma = storage::make_rdma_context(dev,
1554  m_hot_data.pe,
1555  doca_data{.ptr = std::addressof(m_hot_data)},
1556  rdma_permissions);
1557 
1558  ret = doca_rdma_task_receive_set_conf(ctx.ctrl.rdma,
1559  doca_rdma_task_receive_cb,
1560  doca_rdma_task_receive_error_cb,
1561  task_count);
1562  if (ret != DOCA_SUCCESS) {
1563  throw storage::runtime_error{ret, "Failed to configure rdma receive task pool"};
1564  }
1565 
1566  ret = doca_rdma_task_send_set_conf(ctx.ctrl.rdma,
1567  doca_rdma_task_send_cb,
1568  doca_rdma_task_send_error_cb,
1569  task_count + batch_size);
1570  if (ret != DOCA_SUCCESS) {
1571  throw storage::runtime_error{ret, "Failed to configure rdma send task pool"};
1572  }
1573 
1574  ret = doca_ctx_start(doca_rdma_as_ctx(ctx.ctrl.rdma));
1575  if (ret != DOCA_SUCCESS) {
1576  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
1577  }
1578 
1579  ctx.data.rdma = storage::make_rdma_context(dev,
1580  m_hot_data.pe,
1581  doca_data{.ptr = std::addressof(m_hot_data)},
1582  rdma_permissions);
1583 
1584  ret = doca_ctx_start(doca_rdma_as_ctx(ctx.data.rdma));
1585  if (ret != DOCA_SUCCESS) {
1586  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
1587  }
1588  }
1589 
1590  m_hot_data.run_flag = false;
1591  m_hot_data.error_flag = false;
1592  m_hot_data.task_count = task_count;
1593  m_hot_data.pe_hit_count = 0;
1594  m_hot_data.pe_miss_count = 0;
1595  m_hot_data.completed_transaction_count = 0;
1596  m_hot_data.in_flight_transaction_count = 0;
1597  m_hot_data.recover_drop_count = recover_drop_freq;
1598  m_hot_data.recover_drop_freq = recover_drop_freq;
1599 }
1600 
1601 void gga_offload_app_worker::cleanup(void) noexcept
1602 {
1603  doca_error_t ret;
1604  std::vector<doca_task *> tasks;
1605 
1606  for (auto &ctx : m_rdma) {
1607  if (ctx.ctrl.rdma != nullptr) {
1608  tasks.clear();
1609  tasks.reserve(ctx.storage_request_tasks.size() + ctx.storage_response_tasks.size());
1610  std::transform(std::begin(ctx.storage_request_tasks),
1611  std::end(ctx.storage_request_tasks),
1612  std::back_inserter(tasks),
1614  std::transform(std::begin(ctx.storage_response_tasks),
1615  std::end(ctx.storage_response_tasks),
1616  std::back_inserter(tasks),
1618 
1619  /* stop context with tasks list (tasks must be destroyed to finish stopping process) */
1620  ret = storage::stop_context(doca_rdma_as_ctx(ctx.ctrl.rdma), m_hot_data.pe, tasks);
1621  if (ret != DOCA_SUCCESS) {
1622  DOCA_LOG_ERR("Failed to stop rdma control context: %s", doca_error_get_name(ret));
1623  }
1624 
1625  ret = doca_rdma_destroy(ctx.ctrl.rdma);
1626  if (ret != DOCA_SUCCESS) {
1627  DOCA_LOG_ERR("Failed to destroy rdma control context: %s", doca_error_get_name(ret));
1628  }
1629  }
1630 
1631  if (ctx.data.rdma != nullptr) {
1632  // No tasks allocated on this side for the data context, all tasks are executed from the storage
1633  // side
1634  ret = doca_ctx_stop(doca_rdma_as_ctx(ctx.data.rdma));
1635  if (ret != DOCA_SUCCESS) {
1636  DOCA_LOG_ERR("Failed to stop rdma data context: %s", doca_error_get_name(ret));
1637  }
1638 
1639  ret = doca_rdma_destroy(ctx.data.rdma);
1640  if (ret != DOCA_SUCCESS) {
1641  DOCA_LOG_ERR("Failed to destroy rdma data context: %s", doca_error_get_name(ret));
1642  }
1643  }
1644  }
1645 
1646  destroy_comch_objects();
1647 
1648  if (m_hot_data.pe != nullptr) {
1649  ret = doca_pe_destroy(m_hot_data.pe);
1650  if (ret != DOCA_SUCCESS) {
1651  DOCA_LOG_ERR("Failed to destroy progress engine");
1652  }
1653  }
1654 
1655  for (auto *buf : m_io_message_bufs) {
1656  static_cast<void>(doca_buf_dec_refcount(buf, nullptr));
1657  }
1658 
1659  if (m_buf_inv) {
1660  ret = doca_buf_inventory_stop(m_buf_inv);
1661  if (ret != DOCA_SUCCESS) {
1662  DOCA_LOG_ERR("Failed to stop buffer inventory");
1663  }
1664  ret = doca_buf_inventory_destroy(m_buf_inv);
1665  if (ret != DOCA_SUCCESS) {
1666  DOCA_LOG_ERR("Failed to destroy buffer inventory");
1667  }
1668  }
1669 
1670  if (m_io_message_mmap) {
1671  ret = doca_mmap_stop(m_io_message_mmap);
1672  if (ret != DOCA_SUCCESS) {
1673  DOCA_LOG_ERR("Failed to stop mmap");
1674  }
1675  ret = doca_mmap_destroy(m_io_message_mmap);
1676  if (ret != DOCA_SUCCESS) {
1677  DOCA_LOG_ERR("Failed to destroy mmap");
1678  }
1679  }
1680 
1681  if (m_io_message_region != nullptr) {
1682  storage::aligned_free(m_io_message_region);
1683  }
1684 }
1685 
1686 void gga_offload_app_worker::create_gga_tasks(uint32_t block_size, doca_mmap *local_io_mmap, doca_mmap *remote_io_mmap)
1687 {
1688  char *io_local_region_begin = nullptr;
1689  char *io_remote_region_begin = nullptr;
1690  size_t io_local_region_size = 0;
1691  size_t io_remote_region_size = 0;
1692  doca_error_t ret;
1693 
1694  ret = doca_mmap_get_memrange(local_io_mmap,
1695  reinterpret_cast<void **>(&io_local_region_begin),
1696  &io_local_region_size);
1697  if (ret != DOCA_SUCCESS) {
1698  throw storage::runtime_error{ret, "Failed to query memrange for local mmap"};
1699  }
1700 
1701  ret = doca_mmap_get_memrange(remote_io_mmap,
1702  reinterpret_cast<void **>(&io_remote_region_begin),
1703  &io_remote_region_size);
1704  if (ret != DOCA_SUCCESS) {
1705  throw storage::runtime_error{ret, "Failed to query memrange for remote mmap"};
1706  }
1707 
1708  /*
1709  * Expect that the local region is 50% larger than the remote (50% extra space for temporary parity data)
1710  */
1711  if ((io_remote_region_size + (io_remote_region_size / 2)) != io_local_region_size) {
1712  throw storage::runtime_error{DOCA_ERROR_BAD_STATE, "Remote and local memranges differ in size"};
1713  }
1714 
1715  m_hot_data.local_memory_start_addr = reinterpret_cast<uint64_t>(io_local_region_begin);
1716  m_hot_data.remote_memory_start_addr = reinterpret_cast<uint64_t>(io_remote_region_begin);
1717  m_hot_data.storage_capacity = io_remote_region_size;
1718  m_hot_data.block_size = block_size;
1719  m_hot_data.half_block_size = block_size / 2;
1720 
1721  for (uint32_t ii = 0; ii != m_hot_data.task_count; ++ii) {
1722  doca_buf *in_buf = nullptr;
1723  doca_buf *out_buf = nullptr;
1724 
1725  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1726  local_io_mmap,
1727  io_local_region_begin,
1728  io_local_region_size,
1729  &in_buf);
1730  if (ret != DOCA_SUCCESS) {
1731  throw storage::runtime_error{ret, "Failed to get local io buf"};
1732  }
1733  m_io_message_bufs.push_back(in_buf);
1734 
1735  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1736  remote_io_mmap,
1737  io_remote_region_begin,
1738  io_remote_region_size,
1739  &out_buf);
1740  if (ret != DOCA_SUCCESS) {
1741  throw storage::runtime_error{ret, "Failed to get remote io buf"};
1742  }
1743  m_io_message_bufs.push_back(out_buf);
1744 
1745  auto constexpr has_block_checksum = false;
1746  auto constexpr are_blocks_independent = true;
1747 
1749  m_compress,
1750  has_block_checksum,
1751  are_blocks_independent,
1752  in_buf,
1753  out_buf,
1754  doca_data{.u64 = ii},
1755  &(m_hot_data.transactions[ii].decompress_task));
1756  if (ret != DOCA_SUCCESS) {
1757  throw storage::runtime_error{ret, "Failed to allocate decompress task"};
1758  }
1759  }
1760 
1761  for (uint32_t ii = 0; ii != m_hot_data.task_count; ++ii) {
1762  doca_buf *in_buf_1 = nullptr;
1763  doca_buf *in_buf_2 = nullptr;
1764  doca_buf *out_buf = nullptr;
1765 
1766  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1767  local_io_mmap,
1768  io_local_region_begin,
1769  io_local_region_size,
1770  &in_buf_1);
1771  if (ret != DOCA_SUCCESS) {
1772  throw storage::runtime_error{ret, "Failed to get local io buf"};
1773  }
1774  m_io_message_bufs.push_back(in_buf_1);
1775 
1776  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1777  local_io_mmap,
1778  io_local_region_begin,
1779  io_local_region_size,
1780  &in_buf_2);
1781  if (ret != DOCA_SUCCESS) {
1782  throw storage::runtime_error{ret, "Failed to get local io buf"};
1783  }
1784  static_cast<void>(doca_buf_chain_list(in_buf_1, in_buf_2));
1785 
1786  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1787  local_io_mmap,
1788  io_local_region_begin,
1789  io_local_region_size,
1790  &out_buf);
1791  if (ret != DOCA_SUCCESS) {
1792  throw storage::runtime_error{ret, "Failed to get parity io buf"};
1793  }
1794  m_io_message_bufs.push_back(out_buf);
1795 
1797  m_ec_matrix,
1798  in_buf_1,
1799  out_buf,
1800  doca_data{.u64 = ii},
1801  &(m_hot_data.transactions[ii].ec_recover_task));
1802  if (ret != DOCA_SUCCESS) {
1803  throw storage::runtime_error{ret, "Failed to allocate ec recover task"};
1804  }
1805  }
1806 }
1807 
1808 void gga_offload_app_worker::prepare_transaction_part(uint32_t idx, uint8_t *io_message_addr, connection_role role)
1809 {
1810  doca_error_t ret;
1811  doca_buf *req_buff = nullptr;
1812  doca_buf *res_buff = nullptr;
1813 
1814  ret = doca_buf_inventory_buf_get_by_data(m_buf_inv,
1815  m_io_message_mmap,
1816  io_message_addr,
1818  &req_buff);
1819  if (ret != DOCA_SUCCESS) {
1820  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
1821  }
1822 
1823  m_hot_data.transactions[idx].io_message[role] = reinterpret_cast<char *>(io_message_addr);
1824  m_io_message_bufs.push_back(req_buff);
1825 
1826  io_message_addr += storage::size_of_io_message;
1827 
1828  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
1829  m_io_message_mmap,
1830  io_message_addr,
1832  &res_buff);
1833  if (ret != DOCA_SUCCESS) {
1834  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
1835  }
1836 
1837  m_io_message_bufs.push_back(res_buff);
1838 
1839  auto &transaction = m_hot_data.transactions[idx];
1840  transaction.array_idx = idx;
1841  transaction.remaining_op_count = 0;
1842  ret = doca_rdma_task_send_allocate_init(m_rdma[role].ctrl.rdma,
1843  m_rdma[role].ctrl.conn,
1844  req_buff,
1845  doca_data{.u64 = idx},
1846  std::addressof(transaction.requests[role]));
1847  if (ret != DOCA_SUCCESS) {
1848  throw storage::runtime_error{ret, "Failed to allocate rdma doca_rdma_task_send"};
1849  }
1850  m_rdma[role].storage_request_tasks.push_back(transaction.requests[role]);
1851 
1852  ret = doca_rdma_task_receive_allocate_init(m_rdma[role].ctrl.rdma,
1853  res_buff,
1854  doca_data{.ptr = nullptr},
1855  std::addressof(transaction.responses[role]));
1856  if (ret != DOCA_SUCCESS) {
1857  throw storage::runtime_error{ret, "Failed to allocate rdma doca_rdma_task_receive"};
1858  }
1859  m_rdma[role].storage_response_tasks.push_back(transaction.responses[role]);
1860 }
1861 
/*
 * Completion callback for a comch consumer post-recv task: extract the received host io
 * message and hand it to the hot-path state to start a new storage transaction.
 *
 * NOTE(review): the doc extraction dropped original line 1869 here — presumably the
 * doca_buf_get_data() call that fills io_message from the task's post-recv buffer; verify
 * against the original source.
 */
1862 void gga_offload_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
1863  doca_data task_user_data,
1864  doca_data ctx_user_data) noexcept
1865 {
1866  static_cast<void>(task_user_data);
1867 
1868  char *io_message;
1870  reinterpret_cast<void **>(&io_message)));
1871 
     /* ctx_user_data carries the per-worker hot_data; failures are logged, not thrown (noexcept) */
1872  auto const ret =
1873  static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr)->start_transaction(task, io_message);
1874  if (ret != DOCA_SUCCESS) {
1875  DOCA_LOG_ERR("Failed to start transaction: %s", doca_error_get_name(ret));
1876  }
1877 }
1878 
1879 void gga_offload_app_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1880  doca_data task_user_data,
1881  doca_data ctx_user_data) noexcept
1882 {
1883  static_cast<void>(task);
1884  static_cast<void>(task_user_data);
1885 
1886  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1887 
1888  if (hot_data->run_flag) {
1889  DOCA_LOG_ERR("Failed to complete doca_comch_consumer_task_post_recv");
1890  hot_data->run_flag = false;
1891  hot_data->error_flag = true;
1892  }
1893 }
1894 
1895 void gga_offload_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1896  doca_data task_user_data,
1897  doca_data ctx_user_data) noexcept
1898 {
1899  static_cast<void>(task);
1900  static_cast<void>(task_user_data);
1901  static_cast<void>(ctx_user_data);
1902 }
1903 
1904 void gga_offload_app_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1905  doca_data task_user_data,
1906  doca_data ctx_user_data) noexcept
1907 {
1908  static_cast<void>(task);
1909  static_cast<void>(task_user_data);
1910 
1911  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1912  DOCA_LOG_ERR("Failed to complete doca_comch_producer_task_send");
1913  hot_data->run_flag = false;
1914  hot_data->error_flag = true;
1915 }
1916 
1917 void gga_offload_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1918  doca_data task_user_data,
1919  doca_data ctx_user_data) noexcept
1920 {
1921  static_cast<void>(task);
1922 
1923  auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1924  auto &transaction = hot_data->transactions[task_user_data.u64];
1925 
1926  --(transaction.remaining_op_count);
1927  if (transaction.remaining_op_count == 0) {
1928  hot_data->process_result(transaction);
1929  }
1930 }
1931 
1932 void gga_offload_app_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1933  doca_data task_user_data,
1934  doca_data ctx_user_data) noexcept
1935 {
1936  static_cast<void>(task);
1937  static_cast<void>(task_user_data);
1938 
1939  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1940  DOCA_LOG_ERR("Failed to complete doca_rdma_task_send");
1941  hot_data->run_flag = false;
1942  hot_data->error_flag = true;
1943 }
1944 
/*
 * Completion callback for an RDMA receive task: a storage-side response arrived. Looks up
 * the owning transaction via the correlation id embedded in the received io message,
 * records any storage error into the host-facing response message, completes the
 * transaction when all its operations are done, and resubmits the receive task while the
 * worker is still running.
 *
 * NOTE(review): the doc extraction dropped original lines 1961 and 1965 — the call that
 * copies the error into host_io_message is truncated, and a statement after the if-block
 * is missing entirely; verify against the original source.
 */
1945 void gga_offload_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1946  doca_data task_user_data,
1947  doca_data ctx_user_data) noexcept
1948 {
1949  static_cast<void>(task_user_data);
1950 
1951  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1952 
1953  auto *const rdma_io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
1954  auto const cid = storage::io_message_view::get_correlation_id(rdma_io_message);
1955 
1956  auto *host_io_message = io_message_from_doca_buf(
1957  doca_comch_producer_task_send_get_buf(hot_data->transactions[cid].host_response_task));
1958 
1959  if (storage::io_message_view::get_result(rdma_io_message) != DOCA_SUCCESS) {
1960  // store error
1962  host_io_message);
1963  }
1964 
1966 
1967  auto &transaction = hot_data->transactions[cid];
1968 
1969  --(transaction.remaining_op_count);
1970  if (transaction.remaining_op_count == 0) {
1971  hot_data->process_result(transaction);
1972  }
1973 
     /* Re-arm the receive so further storage responses can land; failure to resubmit is fatal */
1974  if (hot_data->run_flag) {
1976  auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(task));
1977  if (ret != DOCA_SUCCESS) {
1978  DOCA_LOG_ERR("Failed to resubmit doca_rdma_task_receive");
1979  hot_data->run_flag = false;
1980  hot_data->error_flag = true;
1981  }
1982  }
1983 }
1984 
1985 void gga_offload_app_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1986  doca_data task_user_data,
1987  doca_data ctx_user_data) noexcept
1988 {
1989  static_cast<void>(task);
1990  static_cast<void>(task_user_data);
1991 
1992  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
1993  if (hot_data->run_flag) {
1994  /*
1995  * Only consider it a failure when this callback triggers while running. This callback will be triggered
1996  * as part of teardown as the submitted receive tasks that were never filled by requests from the host
1997  * get flushed out.
1998  */
1999  DOCA_LOG_ERR("Failed to complete doca_rdma_task_send");
2000  hot_data->run_flag = false;
2001  hot_data->error_flag = true;
2002  }
2003 }
2004 
2005 void gga_offload_app_worker::doca_ec_task_recover_cb(doca_ec_task_recover *task,
2006  doca_data task_user_data,
2007  doca_data ctx_user_data) noexcept
2008 {
2009  static_cast<void>(task);
2010 
2011  auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
2012  auto const cid = task_user_data.u64;
2013  auto &transaction = hot_data->transactions[cid];
2014 
2015  --(transaction.remaining_op_count);
2016  hot_data->start_decompress(transaction);
2017 }
2018 
2019 void gga_offload_app_worker::doca_ec_task_recover_error_cb(doca_ec_task_recover *task,
2020  doca_data task_user_data,
2021  doca_data ctx_user_data) noexcept
2022 {
2023  static_cast<void>(task);
2024  static_cast<void>(task_user_data);
2025 
2026  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
2027  DOCA_LOG_ERR("Failed to complete doca_ec_task_recover");
2028  hot_data->run_flag = false;
2029  hot_data->error_flag = true;
2030 }
2031 
/*
 * Completion callback for the LZ4 stream decompress task — the final stage of a
 * transaction. Updates the in-flight/completed counters, sends the response back to the
 * host (retrying while the producer reports DOCA_ERROR_AGAIN), then re-posts the host
 * request receive task so the slot can accept new work.
 *
 * NOTE(review): the doc extraction dropped original line 2056 — the truncated
 * `static_cast<void>(` below presumably discarded the result of an intermediate call;
 * verify against the original source.
 */
2032 void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_cb(doca_compress_task_decompress_lz4_stream *task,
2033  doca_data task_user_data,
2034  doca_data ctx_user_data) noexcept
2035 {
2036  static_cast<void>(task);
2037 
2038  auto *hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
2039  auto &transaction = hot_data->transactions[task_user_data.u64];
2040  --(transaction.remaining_op_count);
2041 
2042  --(hot_data->in_flight_transaction_count);
2043  ++(hot_data->completed_transaction_count);
2044 
     /* Busy-retry: the producer queue may momentarily be full */
2045  doca_error_t ret;
2046  do {
2047  ret = doca_task_submit(doca_comch_producer_task_send_as_task(transaction.host_response_task));
2048  } while (ret == DOCA_ERROR_AGAIN);
2049 
2050  if (ret != DOCA_SUCCESS) {
2051  DOCA_LOG_ERR("Failed to submit doca_comch_producer_task_send: %s", doca_error_get_name(ret));
2052  hot_data->run_flag = false;
2053  hot_data->error_flag = true;
2054  }
2055  static_cast<void>(
2057 
     /* Re-arm the host request receive so this transaction slot can be reused */
2058  ret = doca_task_submit(doca_comch_consumer_task_post_recv_as_task(transaction.host_request_task));
2059  if (ret != DOCA_SUCCESS) {
2060  DOCA_LOG_ERR("Failed to submit doca_comch_consumer_task_post_recv: %s", doca_error_get_name(ret));
2061  hot_data->error_flag = true;
2062  hot_data->run_flag = false;
2063  }
2064 }
2065 
2066 void gga_offload_app_worker::doca_compress_task_decompress_lz4_stream_error_cb(
2067  doca_compress_task_decompress_lz4_stream *task,
2068  doca_data task_user_data,
2069  doca_data ctx_user_data) noexcept
2070 {
2071  static_cast<void>(task);
2072  static_cast<void>(task_user_data);
2073 
2074  auto *const hot_data = static_cast<gga_offload_app_worker::hot_data *>(ctx_user_data.ptr);
2075  DOCA_LOG_ERR("Failed to complete doca_compress_task_decompress_lz4_stream");
2076  hot_data->run_flag = false;
2077  hot_data->error_flag = true;
2078 }
2079 
2080 void gga_offload_app_worker::thread_proc()
2081 {
2082  while (m_hot_data.run_flag == false) {
2083  std::this_thread::yield();
2084  if (m_hot_data.error_flag)
2085  return;
2086  }
2087 
2088  DOCA_LOG_INFO("Core: %u running", m_hot_data.core_idx);
2089 
2090  while (m_hot_data.run_flag) {
2091  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
2092  }
2093 
2094  while (m_hot_data.error_flag == false && m_hot_data.in_flight_transaction_count != 0) {
2095  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
2096  }
2097 
2098  DOCA_LOG_INFO("Core: %u complete", m_hot_data.core_idx);
2099 }
2100 
2101 gga_offload_app::~gga_offload_app()
2102 {
2103  destroy_workers();
2104  for (auto &channel : m_all_ctrl_channels) {
2105  channel.reset();
2106  }
2107 
2108  doca_error_t ret;
2109  if (m_dev != nullptr) {
2110  ret = doca_dev_close(m_dev);
2111  if (ret != DOCA_SUCCESS) {}
2112  }
2113 }
2114 
/*
 * Constructor: opens the device and its representor, creates one TCP control channel per
 * storage backend (data_1, data_2, data_p) plus a comch control channel toward the host
 * client, registering consumer connect/disconnect callbacks.
 *
 * NOTE(review): the doc extraction dropped original line 2155 — the factory call that
 * creates the client comch control channel (its arguments follow below); verify against
 * the original source.
 */
2115 gga_offload_app::gga_offload_app(gga_offload_app_configuration const &cfg)
2116  : m_cfg{cfg},
2117  m_dev{nullptr},
2118  m_dev_rep{nullptr},
2119  m_remote_io_mmap{nullptr},
2120  m_local_io_region{nullptr},
2121  m_local_io_mmap{nullptr},
2122  m_all_ctrl_channels{},
2123  m_storage_ctrl_channels{},
2124  m_ctrl_messages{},
2125  m_remote_consumer_ids{},
2126  m_workers{nullptr},
2127  m_stats{},
2128  m_storage_capacity{},
2129  m_storage_block_size{},
2130  m_message_id_counter{},
2131  m_task_count{0},
2132  m_batch_size{0},
2133  m_core_count{0},
2134  m_abort_flag{false}
2135 {
2136  DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
2137  m_dev = storage::open_device(m_cfg.device_id);
2138 
2139  DOCA_LOG_INFO("Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
2140  m_dev_rep = storage::open_representor(m_dev, m_cfg.representor_id);
2141 
     /* m_storage_ctrl_channels holds non-owning views of the three storage-facing channels */
2142  m_all_ctrl_channels[connection_role::data_1] = storage::control::make_tcp_client_control_channel(
2143  m_cfg.storage_server_address[connection_role::data_1]);
2144  m_storage_ctrl_channels[connection_role::data_1] = m_all_ctrl_channels[connection_role::data_1].get();
2145 
2146  m_all_ctrl_channels[connection_role::data_2] = storage::control::make_tcp_client_control_channel(
2147  m_cfg.storage_server_address[connection_role::data_2]);
2148  m_storage_ctrl_channels[connection_role::data_2] = m_all_ctrl_channels[connection_role::data_2].get();
2149 
2150  m_all_ctrl_channels[connection_role::data_p] = storage::control::make_tcp_client_control_channel(
2151  m_cfg.storage_server_address[connection_role::data_p]);
2152  m_storage_ctrl_channels[connection_role::data_p] = m_all_ctrl_channels[connection_role::data_p].get();
2153 
2154  m_all_ctrl_channels[connection_role::client] =
2156  m_dev_rep,
2157  m_cfg.command_channel_name.c_str(),
2158  this,
2159  new_comch_consumer_callback,
2160  expired_comch_consumer_callback);
2161 }
2162 
2163 void gga_offload_app::abort(std::string const &reason)
2164 {
2165  if (m_abort_flag)
2166  return;
2167 
2168  DOCA_LOG_ERR("Aborted: %s", reason.c_str());
2169  m_abort_flag = true;
2170 }
2171 
/*
 * Busy-wait until every storage-facing control channel reports connected, throwing if the
 * user aborts first.
 *
 * NOTE(review): the doc extraction dropped original line 2178 — the start of the throw
 * statement (its message argument remains below); verify against the original source.
 */
2172 void gga_offload_app::connect_to_storage(void)
2173 {
2174  for (auto *storage_channel : m_storage_ctrl_channels) {
2175  DOCA_LOG_DBG("Connect control channel...");
2176  for (;;) {
2177  if (m_abort_flag) {
2179  "Aborted while connecting to storage"};
2180  }
2181 
2182  if (storage_channel->is_connected())
2183  break;
2184  }
2185  }
2186 }
2187 
/*
 * Poll (100 ms interval) until the host client connects over comch, throwing if the user
 * aborts first.
 *
 * NOTE(review): the doc extraction dropped original line 2193 — the start of the throw
 * statement (its message argument remains below); verify against the original source.
 */
2188 void gga_offload_app::wait_for_comch_client_connection(void)
2189 {
2190  while (!m_all_ctrl_channels[connection_role::client]->is_connected()) {
2191  std::this_thread::sleep_for(std::chrono::milliseconds{100});
2192  if (m_abort_flag) {
2194  "Aborted while connecting to client"};
2195  }
2196  }
2197 }
2198 
/*
 * Wait for the client's query_storage request, process it, and send the result back.
 * Any processing failure or an unexpected message type is answered with an error
 * response instead of throwing out of this function.
 *
 * NOTE(review): the doc extraction dropped original lines 2219 and 2223 — the tail of the
 * err_msg expression and the first member of the response message initializer; verify
 * against the original source.
 */
2199 void gga_offload_app::wait_for_and_process_query_storage(void)
2200 {
2201  DOCA_LOG_INFO("Wait for query storage...");
2202  auto const client_request = wait_for_control_message();
2203 
2204  doca_error_t err_code;
2205  std::string err_msg;
2206 
2207  if (client_request.message_type == storage::control::message_type::query_storage_request) {
2208  try {
2209  m_all_ctrl_channels[connection_role::client]->send_message(
2210  process_query_storage(client_request));
2211  return;
2212  } catch (storage::runtime_error const &ex) {
2213  err_code = ex.get_doca_error();
2214  err_msg = ex.what();
2215  }
2216  } else {
2217  err_code = DOCA_ERROR_UNEXPECTED;
2218  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
2220  }
2221 
     /* Fall through: report the failure to the client */
2222  m_all_ctrl_channels[connection_role::client]->send_message({
2224  client_request.message_id,
2225  client_request.correlation_id,
2226  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2227 
2228  });
2229 }
2230 
/*
 * Wait for the client's init_storage request, process it, and send the result back.
 * Any processing failure or an unexpected message type is answered with an error
 * response instead of throwing out of this function.
 *
 * NOTE(review): the doc extraction dropped original lines 2251 and 2255 — the tail of the
 * err_msg expression and the first member of the response message initializer; verify
 * against the original source.
 */
2231 void gga_offload_app::wait_for_and_process_init_storage(void)
2232 {
2233  DOCA_LOG_INFO("Wait for init storage...");
2234  auto const client_request = wait_for_control_message();
2235 
2236  doca_error_t err_code;
2237  std::string err_msg;
2238 
2239  if (client_request.message_type == storage::control::message_type::init_storage_request) {
2240  try {
2241  m_all_ctrl_channels[connection_role::client]->send_message(
2242  process_init_storage(client_request));
2243  return;
2244  } catch (storage::runtime_error const &ex) {
2245  err_code = ex.get_doca_error();
2246  err_msg = ex.what();
2247  }
2248  } else {
2249  err_code = DOCA_ERROR_UNEXPECTED;
2250  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
2252  }
2253 
     /* Fall through: report the failure to the client */
2254  m_all_ctrl_channels[connection_role::client]->send_message({
2256  client_request.message_id,
2257  client_request.correlation_id,
2258  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2259 
2260  });
2261 }
2262 
/*
 * Wait for the client's start_storage request, process it, and send the result back.
 * Any processing failure or an unexpected message type is answered with an error
 * response instead of throwing out of this function.
 *
 * NOTE(review): the doc extraction dropped original lines 2283 and 2287 — the tail of the
 * err_msg expression and the first member of the response message initializer; verify
 * against the original source.
 */
2263 void gga_offload_app::wait_for_and_process_start_storage(void)
2264 {
2265  DOCA_LOG_INFO("Wait for start storage...");
2266  auto const client_request = wait_for_control_message();
2267 
2268  doca_error_t err_code;
2269  std::string err_msg;
2270 
2271  if (client_request.message_type == storage::control::message_type::start_storage_request) {
2272  try {
2273  m_all_ctrl_channels[connection_role::client]->send_message(
2274  process_start_storage(client_request));
2275  return;
2276  } catch (storage::runtime_error const &ex) {
2277  err_code = ex.get_doca_error();
2278  err_msg = ex.what();
2279  }
2280  } else {
2281  err_code = DOCA_ERROR_UNEXPECTED;
2282  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
2284  }
2285 
     /* Fall through: report the failure to the client */
2286  m_all_ctrl_channels[connection_role::client]->send_message({
2288  client_request.message_id,
2289  client_request.correlation_id,
2290  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2291 
2292  });
2293 }
2294 
/*
 * Wait for the client's stop_storage request, process it, and send the result back.
 * Any processing failure or an unexpected message type is answered with an error
 * response instead of throwing out of this function.
 *
 * NOTE(review): the doc extraction dropped original lines 2315 and 2319 — the tail of the
 * err_msg expression and the first member of the response message initializer; verify
 * against the original source.
 */
2295 void gga_offload_app::wait_for_and_process_stop_storage(void)
2296 {
2297  DOCA_LOG_INFO("Wait for stop storage...");
2298  auto const client_request = wait_for_control_message();
2299 
2300  doca_error_t err_code;
2301  std::string err_msg;
2302 
2303  if (client_request.message_type == storage::control::message_type::stop_storage_request) {
2304  try {
2305  m_all_ctrl_channels[connection_role::client]->send_message(
2306  process_stop_storage(client_request));
2307  return;
2308  } catch (storage::runtime_error const &ex) {
2309  err_code = ex.get_doca_error();
2310  err_msg = ex.what();
2311  }
2312  } else {
2313  err_code = DOCA_ERROR_UNEXPECTED;
2314  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
2316  }
2317 
     /* Fall through: report the failure to the client */
2318  m_all_ctrl_channels[connection_role::client]->send_message({
2320  client_request.message_id,
2321  client_request.correlation_id,
2322  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2323 
2324  });
2325 }
2326 
/*
 * Wait for the client's shutdown request, process it, and send the result back. Any
 * processing failure or an unexpected message type is answered with an error response
 * instead of throwing out of this function.
 *
 * NOTE(review): the doc extraction dropped original lines 2346 and 2350 — the tail of the
 * err_msg expression and the first member of the response message initializer; verify
 * against the original source.
 */
2327 void gga_offload_app::wait_for_and_process_shutdown(void)
2328 {
2329  DOCA_LOG_INFO("Wait for shutdown storage...");
2330  auto const client_request = wait_for_control_message();
2331 
2332  doca_error_t err_code;
2333  std::string err_msg;
2334 
2335  if (client_request.message_type == storage::control::message_type::shutdown_request) {
2336  try {
2337  m_all_ctrl_channels[connection_role::client]->send_message(process_shutdown(client_request));
2338  return;
2339  } catch (storage::runtime_error const &ex) {
2340  err_code = ex.get_doca_error();
2341  err_msg = ex.what();
2342  }
2343  } else {
2344  err_code = DOCA_ERROR_UNEXPECTED;
2345  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
2347  }
2348 
     /* Fall through: report the failure to the client */
2349  m_all_ctrl_channels[connection_role::client]->send_message({
2351  client_request.message_id,
2352  client_request.correlation_id,
2353  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
2354 
2355  });
2356 }
2357 
2358 void gga_offload_app::display_stats(void) const
2359 {
2360  for (auto const &stats : m_stats) {
2361  auto const pe_hit_rate_pct =
2362  (static_cast<double>(stats.pe_hit_count) /
2363  (static_cast<double>(stats.pe_hit_count) + static_cast<double>(stats.pe_miss_count))) *
2364  100.;
2365 
2366  printf("+================================================+\n");
2367  printf("| Core: %u\n", stats.core_idx);
2368  printf("| Operation count: %lu\n", stats.operation_count);
2369  printf("| Recovery count: %lu\n", stats.recovery_count);
2370  printf("| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct, stats.pe_hit_count, stats.pe_miss_count);
2371  }
2372 }
2373 
2374 void gga_offload_app::new_comch_consumer_callback(void *user_data, uint32_t id) noexcept
2375 {
2376  auto *self = reinterpret_cast<gga_offload_app *>(user_data);
2377  if (self->m_remote_consumer_ids.capacity() == 0) {
2378  DOCA_LOG_ERR("[BUG] no space for new remote consumer ids");
2379  return;
2380  }
2381 
2382  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
2383  if (found == std::end(self->m_remote_consumer_ids)) {
2384  self->m_remote_consumer_ids.push_back(id);
2385  DOCA_LOG_DBG("Connected to remote consumer with id: %u. Consumer count is now: %zu",
2386  id,
2387  self->m_remote_consumer_ids.size());
2388  } else {
2389  DOCA_LOG_WARN("Ignoring duplicate remote consumer id: %u", id);
2390  }
2391 }
2392 
2393 void gga_offload_app::expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept
2394 {
2395  auto *self = reinterpret_cast<gga_offload_app *>(user_data);
2396  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
2397  if (found != std::end(self->m_remote_consumer_ids)) {
2398  self->m_remote_consumer_ids.erase(found);
2399  DOCA_LOG_DBG("Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
2400  id,
2401  self->m_remote_consumer_ids.size());
2402  } else {
2403  DOCA_LOG_WARN("Ignoring disconnect of unexpected remote consumer id: %u", id);
2404  }
2405 }
2406 
/*
 * Block until any control message is available: drain the local queue first, otherwise
 * poll every control channel, buffering whatever arrives. Throws if the user aborts while
 * waiting.
 *
 * NOTE(review): the doc extraction dropped original line 2426 — the error-code argument
 * of the throw (the message argument remains below); verify against the original source.
 */
2407 storage::control::message gga_offload_app::wait_for_control_message()
2408 {
2409  for (;;) {
2410  if (!m_ctrl_messages.empty()) {
2411  auto msg = std::move(m_ctrl_messages.front());
2412  m_ctrl_messages.erase(m_ctrl_messages.begin());
2413  return msg;
2414  }
2415 
2416  for (auto &channel : m_all_ctrl_channels) {
2417  // Poll for new messages
2418  auto *msg = channel->poll();
2419  if (msg) {
2420  m_ctrl_messages.push_back(std::move(*msg));
2421  }
2422  }
2423 
2424  if (m_abort_flag) {
2425  throw storage::runtime_error{
2427  "User aborted the gga_offload_application while waiting on a control message"};
2428  }
2429  }
2430 }
2431 
/*
 * Poll all control channels until a response for every id in mids has been buffered,
 * throwing on user abort or when the timeout expires (the timeout error lists both the
 * awaited and the available message ids).
 *
 * NOTE(review): the doc extraction dropped original lines 2440 and 2475 — the error-code
 * arguments of the two throw statements; verify against the original source.
 */
2432 void gga_offload_app::wait_for_responses(std::vector<storage::control::message_id> const &mids,
2433  std::chrono::seconds timeout)
2434 {
2435  auto const expiry = std::chrono::steady_clock::now() + timeout;
2436  uint32_t match_count = 0;
2437  do {
2438  if (m_abort_flag) {
2439  throw storage::runtime_error{
2441  "User aborted the gga_offload_application while waiting on a control message"};
2442  }
2443 
2444  for (auto &channel : m_all_ctrl_channels) {
2445  // Poll for new messages
2446  auto *msg = channel->poll();
2447  if (msg) {
2448  m_ctrl_messages.push_back(std::move(*msg));
2449  }
2450  }
2451 
     /* Recount matches from scratch on every pass */
2452  match_count = 0;
2453  for (auto mid : mids) {
2454  for (auto const &msg : m_ctrl_messages) {
2455  if (msg.message_id.value == mid.value) {
2456  ++match_count;
2457  break;
2458  }
2459  }
2460  }
2461 
2462  if (expiry < std::chrono::steady_clock::now()) {
2463  std::stringstream ss;
2464  ss << "Timed out while waiting on a control messages[";
2465  for (auto &id : mids) {
2466  ss << id.value << " ";
2467  }
2468  ss << "] had available messages:[";
2469  for (auto &msg : m_ctrl_messages) {
2470  ss << msg.message_id.value << " ";
2471  }
2472  ss << "]";
2473 
2474  throw storage::runtime_error{
2476  ss.str(),
2477  };
2478  }
2479  } while (match_count != mids.size());
2480 }
2481 
2482 storage::control::message gga_offload_app::get_response(storage::control::message_id mid)
2483 {
2484  auto found = std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [mid](auto const &msg) {
2485  return msg.message_id.value == mid.value;
2486  });
2487 
2488  if (found != std::end(m_ctrl_messages)) {
2489  auto msg = std::move(*found);
2490  m_ctrl_messages.erase(found);
2491  return msg;
2492  }
2493 
2494  throw storage::runtime_error{DOCA_ERROR_BAD_STATE, "[BUG] Failed to get response from store"};
2495 }
2496 
2497 void gga_offload_app::discard_responses(std::vector<storage::control::message_id> const &mids)
2498 {
2499  m_ctrl_messages.erase(std::remove_if(std::begin(m_ctrl_messages),
2500  std::end(m_ctrl_messages),
2501  [&mids](auto const &msg) {
2502  return std::find(std::begin(mids),
2503  std::end(mids),
2504  msg.message_id) != std::end(mids);
2505  }),
2506  std::end(m_ctrl_messages));
2507 }
2508 
/*
 * Forward the client's query_storage request to all three storage backends, validate that
 * they agree on capacity and block size, allocate the local (1.5x over-provisioned, for
 * EC parity chunks) IO region plus its mmap, and return the agreed storage details to the
 * client. Backend errors are converted into an error response.
 *
 * NOTE(review): the doc extraction dropped several original lines (2516, 2530, 2535,
 * 2552-2553, 2557, 2562-2563, 2567, 2586-2590) — message-type tags, response-status
 * checks, error codes, the tail of make_mmap's arguments, and the start of the final
 * return statement; verify against the original source.
 */
2509 storage::control::message gga_offload_app::process_query_storage(storage::control::message const &client_request)
2510 {
2511  DOCA_LOG_DBG("Forward request to storage...");
2512  std::vector<storage::control::message_id> msg_ids;
2513 
2514  for (auto *storage_ctrl : m_storage_ctrl_channels) {
2515  auto storage_request = storage::control::message{
2517  storage::control::message_id{m_message_id_counter++},
2518  client_request.correlation_id,
2519  {},
2520  };
2521 
2522  msg_ids.push_back(storage_request.message_id);
2523  storage_ctrl->send_message(storage_request);
2524  }
2525 
2526  wait_for_responses(msg_ids, default_control_timeout_seconds);
2527  for (auto &id : msg_ids) {
2528  auto response = get_response(id);
2529 
2531  discard_responses(msg_ids);
2532  return make_error_response(client_request.message_id,
2533  client_request.correlation_id,
2534  std::move(response),
2536  }
2537 
2538  auto const *const storage_details =
2539  dynamic_cast<storage::control::storage_details_payload const *>(response.payload.get());
2540  if (storage_details == nullptr) {
2541  throw storage::runtime_error{DOCA_ERROR_UNEXPECTED, "[BUG] invalid query_storage_response"};
2542  }
2543 
2544  DOCA_LOG_INFO("Storage reports capacity of: %lu using a block size of: %u",
2545  storage_details->total_size,
2546  storage_details->block_size);
     /* First backend response seeds the expected values; the rest must match */
2547  if (m_storage_capacity == 0) {
2548  m_storage_capacity = storage_details->total_size;
2549  m_storage_block_size = storage_details->block_size;
2550  } else {
2551  if (m_storage_capacity != storage_details->total_size) {
2554  client_request.message_id,
2555  client_request.correlation_id,
2556  std::make_unique<storage::control::error_response_payload>(
2558  "Mismatch in storage capacity: " + std::to_string(m_storage_capacity) +
2559  " vs " + std::to_string(storage_details->total_size)),
2560  };
2561  } else if (m_storage_block_size != storage_details->block_size) {
2564  client_request.message_id,
2565  client_request.correlation_id,
2566  std::make_unique<storage::control::error_response_payload>(
2568  "Mismatch in block_size: " + std::to_string(m_storage_block_size) +
2569  " vs " + std::to_string(storage_details->block_size)),
2570  };
2571  }
2572  }
2573  }
2574 
2575  /* over allocate DPU storage to make space for EC data chunks */
2576  auto const local_storage_size = m_storage_capacity + (m_storage_capacity / 2);
2577  m_local_io_region =
2578  static_cast<uint8_t *>(storage::aligned_alloc(storage::get_system_page_size(), local_storage_size));
2579  if (m_local_io_region == nullptr) {
2580  throw storage::runtime_error{DOCA_ERROR_NO_MEMORY, "Failed to allocate local memory region"};
2581  }
2582 
2583  m_local_io_mmap = storage::make_mmap(m_dev,
2584  reinterpret_cast<char *>(m_local_io_region),
2585  local_storage_size,
2588 
2591  client_request.message_id,
2592  client_request.correlation_id,
2593  std::make_unique<storage::control::storage_details_payload>(m_storage_capacity, m_storage_block_size),
2594  };
2595 }
2596 
/*
 * Handle the client's init_storage request: validate the requested core count against the
 * configured cpu set, capture task/batch/core parameters, import the host's mmap,
 * re-export the local mmap for RDMA, forward the init request (with the re-exported blob)
 * to every storage backend, and finally prepare the per-core worker contexts.
 *
 * NOTE(review): the doc extraction dropped several original lines (2604, 2639, 2656,
 * 2661, 2668-2669) — an error code, message-type tags, a response-status check, and the
 * start of the final return statement; verify against the original source.
 */
2597 storage::control::message gga_offload_app::process_init_storage(storage::control::message const &client_request)
2598 {
2599  auto const *init_storage_details =
2600  reinterpret_cast<storage::control::init_storage_payload const *>(client_request.payload.get());
2601 
2602  if (init_storage_details->core_count > m_cfg.cpu_set.size()) {
2603  throw storage::runtime_error{
2605  "Unable to create " + std::to_string(m_core_count) + " threads as only " +
2606  std::to_string(m_cfg.cpu_set.size()) + " were defined",
2607  };
2608  }
2609 
     /* Reserve up-front so the consumer callbacks never reallocate */
2610  m_remote_consumer_ids.reserve(init_storage_details->core_count);
2611 
2612  m_task_count = init_storage_details->task_count;
2613  m_batch_size = init_storage_details->batch_size;
2614  m_core_count = init_storage_details->core_count;
2615  m_remote_io_mmap = storage::make_mmap(m_dev,
2616  init_storage_details->mmap_export_blob.data(),
2617  init_storage_details->mmap_export_blob.size());
     /* Re-export the local mmap so the storage backends can target it over RDMA */
2618  std::vector<uint8_t> mmap_export_blob = [this]() {
2619  uint8_t const *reexport_blob = nullptr;
2620  size_t reexport_blob_size = 0;
2621  auto const ret = doca_mmap_export_rdma(m_local_io_mmap,
2622  m_dev,
2623  reinterpret_cast<void const **>(&reexport_blob),
2624  &reexport_blob_size);
2625  if (ret != DOCA_SUCCESS) {
2626  throw storage::runtime_error{ret, "Failed to re-export host mmap for rdma"};
2627  }
2628 
2629  return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
2630  }();
2631 
2632  DOCA_LOG_INFO("Configured storage: %u cores, %u tasks, %u batch_size", m_core_count, m_task_count, m_batch_size);
2633 
2634  DOCA_LOG_DBG("Forward request to storage...");
2635  std::vector<storage::control::message_id> msg_ids;
2636 
2637  for (auto *storage_ctrl : m_storage_ctrl_channels) {
2638  auto storage_request = storage::control::message{
2640  storage::control::message_id{m_message_id_counter++},
2641  client_request.correlation_id,
2642  std::make_unique<storage::control::init_storage_payload>(init_storage_details->task_count,
2643  init_storage_details->batch_size,
2644  init_storage_details->core_count,
2645  mmap_export_blob),
2646  };
2647 
2648  msg_ids.push_back(storage_request.message_id);
2649  storage_ctrl->send_message(storage_request);
2650  }
2651 
2652  wait_for_responses(msg_ids, default_control_timeout_seconds);
2653  for (auto &id : msg_ids) {
2654  auto response = get_response(id);
2655 
2657  discard_responses(msg_ids);
2658  return make_error_response(client_request.message_id,
2659  client_request.correlation_id,
2660  std::move(response),
2662  }
2663  }
2664 
2665  DOCA_LOG_DBG("prepare thread contexts...");
2666  prepare_thread_contexts(client_request.correlation_id);
2667 
2670  client_request.message_id,
2671  client_request.correlation_id,
2672  {},
2673  };
2674 }
2675 
/*
 * Handle the client's start_storage request: forward it to every storage backend, verify
 * all RDMA/comch connections are ready, then create each worker's tasks and launch its
 * thread. Returns a success (or error) response for the client.
 *
 * NOTE(review): the doc extraction dropped several original lines (2683, 2697, 2702,
 * 2717-2718) — message-type tags, a response-status check, an error code, and the start
 * of the final return statement; verify against the original source.
 */
2676 storage::control::message gga_offload_app::process_start_storage(storage::control::message const &client_request)
2677 {
2678  DOCA_LOG_DBG("Forward request to storage...");
2679  std::vector<storage::control::message_id> msg_ids;
2680 
2681  for (auto *storage_ctrl : m_storage_ctrl_channels) {
2682  auto storage_request = storage::control::message{
2684  storage::control::message_id{m_message_id_counter++},
2685  client_request.correlation_id,
2686  {},
2687  };
2688 
2689  msg_ids.push_back(storage_request.message_id);
2690  storage_ctrl->send_message(storage_request);
2691  }
2692 
2693  wait_for_responses(msg_ids, default_control_timeout_seconds);
2694  for (auto &id : msg_ids) {
2695  auto response = get_response(id);
2696 
2698  discard_responses(msg_ids);
2699  return make_error_response(client_request.message_id,
2700  client_request.correlation_id,
2701  std::move(response),
2703  }
2704  }
2705 
2706  verify_connections_are_ready();
2707  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2708  m_workers[ii].create_tasks(m_task_count,
2709  m_batch_size,
2710  m_storage_block_size,
2711  m_remote_consumer_ids[ii],
2712  m_local_io_mmap,
2713  m_remote_io_mmap);
2714  m_workers[ii].start_thread_proc();
2715  }
2716 
2719  client_request.message_id,
2720  client_request.correlation_id,
2721  {},
2722  };
2723 }
2724 
/*
 * Handle the client's stop_storage request: forward it to every storage backend, then
 * stop each worker, harvest its statistics into m_stats, and destroy its comch objects.
 * Returns a success (or error) response for the client.
 *
 * NOTE(review): the doc extraction dropped several original lines (2732, 2746, 2751,
 * 2770-2771) — message-type tags, a response-status check, an error code, and the start
 * of the final return statement; verify against the original source.
 */
2725 storage::control::message gga_offload_app::process_stop_storage(storage::control::message const &client_request)
2726 {
2727  DOCA_LOG_DBG("Forward request to storage...");
2728  std::vector<storage::control::message_id> msg_ids;
2729 
2730  for (auto *storage_ctrl : m_storage_ctrl_channels) {
2731  auto storage_request = storage::control::message{
2733  storage::control::message_id{m_message_id_counter++},
2734  client_request.correlation_id,
2735  {},
2736  };
2737 
2738  msg_ids.push_back(storage_request.message_id);
2739  storage_ctrl->send_message(storage_request);
2740  }
2741 
2742  wait_for_responses(msg_ids, default_control_timeout_seconds);
2743  for (auto &id : msg_ids) {
2744  auto response = get_response(id);
2745 
2747  discard_responses(msg_ids);
2748  return make_error_response(client_request.message_id,
2749  client_request.correlation_id,
2750  std::move(response),
2752  }
2753  }
2754 
2755  /* Stop all processing */
2756  m_stats.reserve(m_core_count);
2757  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2758  m_workers[ii].stop_processing();
2759  auto const &hot_data = m_workers[ii].get_hot_data();
2760  m_stats.push_back(thread_stats{
2761  m_cfg.cpu_set[ii],
2762  hot_data.pe_hit_count,
2763  hot_data.pe_miss_count,
2764  hot_data.completed_transaction_count,
2765  hot_data.recovery_flow_count,
2766  });
2767  m_workers[ii].destroy_comch_objects();
2768  }
2769 
2772  client_request.message_id,
2773  client_request.correlation_id,
2774  {},
2775  };
2776 }
2777 
/*
 * Handle the client's shutdown request: wait until all remote comch consumers have been
 * destroyed (polling the client channel), forward the shutdown to every storage backend,
 * then destroy the workers. Returns a success (or error) response for the client.
 *
 * NOTE(review): the doc extraction dropped several original lines (2785, 2793, 2807,
 * 2812, 2817-2818) — the tail of the DOCA_LOG_DBG call, message-type tags, a
 * response-status check, an error code, and the start of the final return statement.
 * Also NOTE(review): poll() may return nullptr here and msg->message_type would then
 * dereference null — confirm against the original source whether a null check exists on
 * the dropped line.
 */
2778 storage::control::message gga_offload_app::process_shutdown(storage::control::message const &client_request)
2779 {
2780  /* Wait for all remote comch objects to be destroyed and notified */
2781  while (!m_remote_consumer_ids.empty()) {
2782  auto *msg = m_all_ctrl_channels[connection_role::client]->poll();
2783  DOCA_LOG_DBG("Ignoring unexpected %s while processing %s",
2784  to_string(msg->message_type).c_str(),
2786  }
2787 
2788  DOCA_LOG_DBG("Forward request to storage...");
2789  std::vector<storage::control::message_id> msg_ids;
2790 
2791  for (auto *storage_ctrl : m_storage_ctrl_channels) {
2792  auto storage_request = storage::control::message{
2794  storage::control::message_id{m_message_id_counter++},
2795  client_request.correlation_id,
2796  {},
2797  };
2798 
2799  msg_ids.push_back(storage_request.message_id);
2800  storage_ctrl->send_message(storage_request);
2801  }
2802 
2803  wait_for_responses(msg_ids, default_control_timeout_seconds);
2804  for (auto &id : msg_ids) {
2805  auto response = get_response(id);
2806 
2808  discard_responses(msg_ids);
2809  return make_error_response(client_request.message_id,
2810  client_request.correlation_id,
2811  std::move(response),
2813  }
2814  }
2815 
2816  destroy_workers();
2819  client_request.message_id,
2820  client_request.correlation_id,
2821  {},
2822  };
2823 }
2824 
/*
 * Create the per-core worker contexts and wire up their RDMA connections. The client
 * control channel must be a comch channel (its comch connection is handed to the worker
 * factory). For each core, the io_data RDMA connection is established and the worker's
 * thread is prepared on its configured cpu.
 *
 * NOTE(review): the doc extraction dropped original lines 2833 and 2843 — the start of
 * the worker-creation call whose arguments follow, and presumably a second connect_rdma
 * call for another rdma_connection_role; verify against the original source.
 */
2825 void gga_offload_app::prepare_thread_contexts(storage::control::correlation_id cid)
2826 {
2827  auto const *comch_channel =
2828  dynamic_cast<storage::control::comch_channel *>(m_all_ctrl_channels[connection_role::client].get());
2829  if (comch_channel == nullptr) {
2830  throw storage::runtime_error{DOCA_ERROR_UNEXPECTED, "[BUG] invalid control channel"};
2831  }
2832 
2834  m_dev,
2835  comch_channel->get_comch_connection(),
2836  m_task_count,
2837  m_batch_size,
2838  m_cfg.ec_matrix_type,
2839  m_cfg.recover_freq);
2840 
2841  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2842  connect_rdma(ii, storage::control::rdma_connection_role::io_data, cid);
2844  m_workers[ii].prepare_thread_proc(m_cfg.cpu_set[ii]);
2845  }
2846 }
2847 
2848 void gga_offload_app::connect_rdma(uint32_t thread_idx,
2851 {
2852  std::vector<storage::control::message_id> msg_ids;
2853  auto &tctx = m_workers[thread_idx];
2854  {
2855  auto storage_request = storage::control::message{
2857  storage::control::message_id{m_message_id_counter++},
2858  cid,
2859  std::make_unique<storage::control::rdma_connection_details_payload>(
2860  thread_idx,
2861  role,
2862  tctx.get_local_rdma_connection_blob(connection_role::data_1, role)),
2863  };
2864 
2865  msg_ids.push_back(storage_request.message_id);
2866  m_storage_ctrl_channels[connection_role::data_1]->send_message(storage_request);
2867  }
2868  {
2869  auto storage_request = storage::control::message{
2871  storage::control::message_id{m_message_id_counter++},
2872  cid,
2873  std::make_unique<storage::control::rdma_connection_details_payload>(
2874  thread_idx,
2875  role,
2876  tctx.get_local_rdma_connection_blob(connection_role::data_2, role)),
2877  };
2878 
2879  msg_ids.push_back(storage_request.message_id);
2880  m_storage_ctrl_channels[connection_role::data_2]->send_message(storage_request);
2881  }
2882  {
2883  auto storage_request = storage::control::message{
2885  storage::control::message_id{m_message_id_counter++},
2886  cid,
2887  std::make_unique<storage::control::rdma_connection_details_payload>(
2888  thread_idx,
2889  role,
2890  tctx.get_local_rdma_connection_blob(connection_role::data_p, role)),
2891  };
2892 
2893  msg_ids.push_back(storage_request.message_id);
2894  m_storage_ctrl_channels[connection_role::data_p]->send_message(storage_request);
2895  }
2896 
2897  wait_for_responses(msg_ids, default_control_timeout_seconds);
2898  auto response_role = connection_role::data_1;
2899 
2900  for (auto &id : msg_ids) {
2901  auto response = get_response(id);
2902 
2904  discard_responses(msg_ids);
2906  auto *error_details =
2907  reinterpret_cast<storage::control::error_response_payload const *>(
2908  response.payload.get());
2909  throw storage::runtime_error{error_details->error_code, error_details->message};
2910  } else {
2911  throw storage::runtime_error{
2913  "Unexpected " + to_string(response.message_type) + " while expecting a " +
2914  to_string(
2916  };
2917  }
2918  }
2919 
2920  auto *remote_details = reinterpret_cast<storage::control::rdma_connection_details_payload const *>(
2921  response.payload.get());
2922  tctx.connect_rdma(response_role, role, remote_details->connection_details);
2923  response_role = static_cast<connection_role>(static_cast<uint8_t>(response_role) + 1);
2924  }
2925 }
2926 
2927 void gga_offload_app::verify_connections_are_ready(void)
2928 {
2929  uint32_t not_ready_count;
2930 
2931  do {
2932  not_ready_count = 0;
2933  if (m_remote_consumer_ids.size() != m_core_count) {
2934  ++not_ready_count;
2935  auto *msg = m_all_ctrl_channels[connection_role::client]->poll();
2936  if (msg != nullptr) {
2937  throw storage::runtime_error{
2939  "Unexpected " + to_string(msg->message_type) + " while processing " +
2941  };
2942  }
2943  }
2944 
2945  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2946  auto const ret = m_workers[ii].get_connections_state();
2947  if (ret == DOCA_ERROR_IN_PROGRESS) {
2948  ++not_ready_count;
2949  } else if (ret != DOCA_SUCCESS) {
2950  throw storage::runtime_error{ret, "Failure while establishing RDMA connections"};
2951  }
2952  }
2953 
2954  if (m_abort_flag) {
2956  "Aborted while establishing storage connections"};
2957  }
2958  } while (not_ready_count != 0);
2959 }
2960 
2961 void gga_offload_app::destroy_workers(void) noexcept
2962 {
2963  if (m_workers != nullptr) {
2964  // Destroy all thread resources
2965  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
2966  m_workers[ii].~gga_offload_app_worker();
2967  }
2968  storage::aligned_free(m_workers);
2969  m_workers = nullptr;
2970  }
2971 }
2972 } /* namespace */
static void cleanup(struct cache_invalidate_sample_state *state)
static uint32_t get_correlation_id(char const *buf)
Definition: io_message.hpp:114
static uint32_t get_io_size(char const *buf)
Definition: io_message.hpp:187
static void set_user_data(doca_data user_data, char *buf)
Definition: io_message.hpp:103
static doca_data get_user_data(char const *buf)
Definition: io_message.hpp:90
static void set_correlation_id(uint32_t correlation_id, char *buf)
Definition: io_message.hpp:127
static void set_type(io_message_type type, char *buf)
Definition: io_message.hpp:79
static void set_io_address(uint64_t io_address, char *buf)
Definition: io_message.hpp:176
static io_message_type get_type(char const *buf)
Definition: io_message.hpp:66
static uint64_t get_io_address(char const *buf)
Definition: io_message.hpp:163
static void set_io_size(uint32_t io_size, char *buf)
Definition: io_message.hpp:200
static void set_result(doca_error_t result, char *buf)
Definition: io_message.hpp:152
static doca_error_t get_result(char const *buf)
Definition: io_message.hpp:138
static void set_remote_offset(uint32_t remote_offset, char *buf)
Definition: io_message.hpp:224
T * object_array(size_t object_count, Args &&...args) const
Definition: aligned_new.hpp:88
doca_error_t get_doca_error() const noexcept
Definition: definitions.hpp:81
int main(int argc, char **argv)
DOCA_LOG_REGISTER(gga_offload)
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
@ DOCA_ARGP_TYPE_STRING
Definition: doca_argp.h:56
@ DOCA_ARGP_TYPE_INT
Definition: doca_argp.h:57
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
static doca_error_t doca_buf_inventory_buf_get_by_data(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *data, size_t data_len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by data & data_len a...
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_chain_list(struct doca_buf *list1, struct doca_buf *list2)
Append list2 to list1.
DOCA_STABLE doca_error_t doca_buf_get_next_in_list(struct doca_buf *buf, struct doca_buf **next_buf)
Get next DOCA Buf in linked list.
DOCA_STABLE doca_error_t doca_buf_get_data(const struct doca_buf *buf, void **data)
Get the buffer's data.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE doca_error_t doca_buf_set_data(struct doca_buf *buf, void *data, size_t data_len)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE const struct doca_buf * doca_comch_producer_task_send_get_buf(const struct doca_comch_producer_task_send *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_EXPERIMENTAL struct doca_buf const * doca_compress_task_decompress_lz4_stream_get_src(const struct doca_compress_task_decompress_lz4_stream *task)
get decompress LZ4 stream task source
DOCA_EXPERIMENTAL doca_error_t doca_compress_task_decompress_lz4_stream_alloc_init(struct doca_compress *compress, uint8_t has_block_checksum, uint8_t are_blocks_independent, struct doca_buf const *src_buff, struct doca_buf *dst_buff, union doca_data user_data, struct doca_compress_task_decompress_lz4_stream **task)
Allocate decompress LZ4 stream task.
DOCA_EXPERIMENTAL doca_error_t doca_compress_create(struct doca_dev *dev, struct doca_compress **compress)
DOCA_EXPERIMENTAL struct doca_buf * doca_compress_task_decompress_lz4_stream_get_dst(const struct doca_compress_task_decompress_lz4_stream *task)
get decompress LZ4 stream task destination
DOCA_EXPERIMENTAL struct doca_ctx * doca_compress_as_ctx(struct doca_compress *compress)
DOCA_EXPERIMENTAL doca_error_t doca_compress_task_decompress_lz4_stream_set_conf(struct doca_compress *compress, doca_compress_task_decompress_lz4_stream_completion_cb_t task_completion_cb, doca_compress_task_decompress_lz4_stream_completion_cb_t task_error_cb, uint32_t num_tasks)
This method sets the decompress LZ4 stream task configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_compress_task_decompress_lz4_stream_as_task(struct doca_compress_task_decompress_lz4_stream *task)
convert decompress LZ4 stream task to doca_task
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_set_user_data(struct doca_ctx *ctx, union doca_data user_data)
set user data to context
DOCA_STABLE doca_error_t doca_ctx_stop(struct doca_ctx *ctx)
Stops the context allowing reconfiguration.
doca_ctx_states
This enum defines the states of a context.
Definition: doca_ctx.h:83
@ DOCA_CTX_STATE_RUNNING
Definition: doca_ctx.h:98
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
DOCA_EXPERIMENTAL const struct doca_buf * doca_ec_task_recover_get_available_blocks(const struct doca_ec_task_recover *task)
This method gets the available_blocks buffer of a recover task. The available_blocks buffer is a sour...
DOCA_EXPERIMENTAL struct doca_buf * doca_ec_task_recover_get_recovered_data(const struct doca_ec_task_recover *task)
This method gets the recovered_data buffer of a recover task. The recovered_data buffer is a destinat...
DOCA_EXPERIMENTAL doca_error_t doca_ec_create(struct doca_dev *dev, struct doca_ec **ec)
Create a DOCA EC instance.
DOCA_EXPERIMENTAL doca_error_t doca_ec_task_recover_allocate_init(struct doca_ec *ec, const struct doca_ec_matrix *recover_matrix, const struct doca_buf *available_blocks, struct doca_buf *recovered_data_blocks, union doca_data user_data, struct doca_ec_task_recover **task)
This method allocates and initializes a recover task.
DOCA_EXPERIMENTAL struct doca_task * doca_ec_task_recover_as_task(struct doca_ec_task_recover *task)
This method converts an EC recover task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_ec_task_recover_set_conf(struct doca_ec *ec, doca_ec_task_recover_completion_cb_t successful_task_completion_cb, doca_ec_task_recover_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the recover tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_ec_matrix_create(struct doca_ec *ec, enum doca_ec_matrix_type type, size_t data_block_count, size_t rdnc_block_count, struct doca_ec_matrix **matrix)
Generate coding matrix for Erasure Code encode i.e. most basic encode matrix. This is necessary for e...
DOCA_EXPERIMENTAL struct doca_ctx * doca_ec_as_ctx(struct doca_ec *ec)
Convert EC instance into context.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_TIME_OUT
Definition: doca_error.h:47
@ DOCA_ERROR_CONNECTION_ABORTED
Definition: doca_error.h:50
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_INITIALIZATION
Definition: doca_error.h:46
@ DOCA_ERROR_UNEXPECTED
Definition: doca_error.h:60
@ DOCA_ERROR_BAD_STATE
Definition: doca_error.h:56
@ DOCA_ERROR_NOT_SUPPORTED
Definition: doca_error.h:42
@ DOCA_ERROR_AGAIN
Definition: doca_error.h:43
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
@ DOCA_ERROR_IN_PROGRESS
Definition: doca_error.h:64
@ DOCA_ERROR_CONNECTION_RESET
Definition: doca_error.h:49
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
Definition: doca_log.h:476
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_get_memrange(const struct doca_mmap *mmap, void **addr, size_t *len)
Get the memory range of DOCA memory map.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_mmap_export_rdma(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the...
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_pe_connect_ctx(struct doca_pe *pe, struct doca_ctx *ctx)
This method connects a context to a progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object The doca_rdma_conn_details are used in doca_rdma_connect()...
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state...
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
Definition: doca_types.h:83
@ DOCA_ACCESS_FLAG_RDMA_READ
Definition: doca_types.h:84
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
Definition: doca_types.h:91
@ DOCA_ACCESS_FLAG_RDMA_WRITE
Definition: doca_types.h:85
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
Definition: doca_version.h:90
const struct ip_frag_config * cfg
Definition: ip_frag_dp.c:0
type value
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
Definition: os_utils.cpp:53
ip_address parse_ip_v4_address(char const *value)
Definition: ip_address.cpp:50
void uninstall_ctrl_c_handler()
Definition: os_utils.cpp:121
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
Definition: doca_utils.cpp:146
static constexpr value_requirement optional_value
Definition: doca_utils.hpp:228
void create_doca_logger_backend(void) noexcept
Definition: doca_utils.cpp:471
void install_ctrl_c_handler(std::function< void(void)> callback)
Definition: os_utils.cpp:106
void aligned_free(void *memory)
Definition: os_utils.cpp:131
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
Definition: doca_utils.cpp:279
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
Definition: doca_utils.cpp:331
static constexpr value_multiplicity multiple_values
Definition: doca_utils.hpp:231
void * aligned_alloc(size_t alignment, size_t size)
Definition: os_utils.cpp:126
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
Definition: doca_utils.cpp:413
doca_ec_matrix_type matrix_type_from_string(std::string const &matrix_type)
Definition: doca_utils.cpp:503
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
Definition: doca_utils.hpp:260
doca_dev_rep * open_representor(doca_dev *dev, std::string const &identifier)
Definition: doca_utils.cpp:104
static constexpr value_multiplicity single_value
Definition: doca_utils.hpp:230
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
Definition: doca_utils.cpp:224
constexpr size_t size_of_io_message
Definition: io_message.hpp:53
constexpr uint32_t cache_line_size
Definition: definitions.hpp:40
static constexpr value_requirement required_value
Definition: doca_utils.hpp:227
uint32_t get_system_page_size(void)
Definition: os_utils.cpp:95
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
Definition: doca_utils.cpp:369
doca_dev * open_device(std::string const &identifier)
Definition: doca_utils.cpp:43
uint8_t type
Definition: packets.h:0
#define false
Definition: stdbool.h:22
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
storage::control::message_type message_type
Convenience type for representing opaque data.
Definition: doca_types.h:56
uint64_t u64
Definition: doca_types.h:58
void * ptr
Definition: doca_types.h:57
struct upf_accel_ctx * ctx