NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
target_rdma.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2025 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TOR (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <atomic>
27 #include <chrono>
28 #include <cstdio>
29 #include <memory>
30 #include <numeric>
31 #include <stdexcept>
32 #include <sstream>
33 #include <thread>
34 
35 #include <doca_argp.h>
36 #include <doca_buf.h>
37 #include <doca_buf_inventory.h>
38 #include <doca_ctx.h>
39 #include <doca_dev.h>
40 #include <doca_error.h>
41 #include <doca_log.h>
42 #include <doca_mmap.h>
43 #include <doca_pe.h>
44 #include <doca_version.h>
45 
56 
57 DOCA_LOG_REGISTER(TARGET_RDMA);
58 
59 using namespace std::string_literals;
60 
61 namespace {
62 
63 auto constexpr app_name = "doca_storage_target_rdma";
64 
65 auto constexpr default_storage_block_size = 4096;
66 auto constexpr default_storage_block_count = 128;
67 
68 static_assert(sizeof(void *) == 8, "Expected a pointer to occupy 8 bytes");
69 
70 auto constexpr rdma_permissions = DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_RDMA_READ |
72 
73 /*
74  * User configurable parameters for the target_rdma_app
75  */
76 struct target_rdma_app_configuration {
77  std::vector<uint32_t> core_set = {};
78  std::string device_id = {};
79  std::string storage_content_file_name = {};
80  uint32_t block_count = {};
81  uint32_t block_size = {};
82  uint16_t listen_port = {};
83  std::vector<uint8_t> content = {};
84 };
85 
86 /*
87  * Statistics emitted by each thread
88  */
89 struct target_rdma_worker_stats {
90  uint32_t core_idx = 0;
91  uint64_t pe_hit_count = 0;
92  uint64_t pe_miss_count = 0;
93  uint64_t operation_count = 0;
94 };
95 
96 /*
97  * Context for a transaction (io request => data transfer => io response)
98  */
99 struct alignas(storage::cache_line_size / 2) transfer_context {
100  doca_rdma_task_write *write_task = nullptr;
101  doca_rdma_task_read *read_task = nullptr;
102  doca_buf *host_buf = nullptr;
103  doca_buf *storage_buf = nullptr;
104 };
105 
106 /*
107  * Data required for a thread worker
108  */
109 class target_rdma_worker {
110 public:
111  /*
112  * A set of data that can be used in the data path, NO OTHER MEMORY SHOULD BE ACCESSED in the main loop or task
113  * callbacks. This is done to keep the maximum amount of useful data resident in the cache while avoiding as
114  * many cache evictions as possible.
115  */
116  struct alignas(storage::cache_line_size) hot_data {
117  doca_pe *pe;
118  uint64_t pe_hit_count;
119  uint64_t pe_miss_count;
120  char *remote_memory_start_addr;
121  char *local_memory_start_addr;
122  uint64_t completed_transaction_count;
123  uint32_t in_flight_transaction_count;
124  uint32_t core_idx;
125  std::atomic_bool run_flag;
126  bool error_flag;
127 
128  /*
129  * Default constructor
130  */
131  hot_data();
132 
133  /*
134  * Deleted copy constructor
135  */
136  hot_data(hot_data const &) = delete;
137 
138  /*
139  * Move constructor
140  * @other [in]: Object to move from
141  */
142  hot_data(hot_data &&other) noexcept;
143 
144  /*
145  * Deleted copy assignment operator
146  */
147  hot_data &operator=(hot_data const &) = delete;
148 
149  /*
150  * Move assignment operator
151  * @other [in]: Object to move from
152  * @return: reference to moved assigned object
153  */
154  hot_data &operator=(hot_data &&other) noexcept;
155  };
156  static_assert(sizeof(target_rdma_worker::hot_data) == storage::cache_line_size,
157  "Expected target_rdma_worker::hot_data to occupy one cache line");
158 
159  /*
160  * Destructor
161  */
162  ~target_rdma_worker();
163 
164  /*
165  * Deleted default constructor
166  */
167  target_rdma_worker() = delete;
168  /*
169  * Constructor
170  * @dev [in]: Device to use
171  * @task_count [in]: Number of tasks to use
172  * @remote_mmap [in]: Reference to remote (client) mmap
173  * @local_mmap [in]: Reference to local (storage) mmap
174  */
175  target_rdma_worker(doca_dev *dev, uint32_t task_count, doca_mmap *remote_mmap, doca_mmap *local_mmap);
176 
177  /*
178  * Deleted copy constructor
179  */
180  target_rdma_worker(target_rdma_worker const &) = delete;
181 
182  /*
183  * Move constructor
184  * @other [in]: Object to move from
185  */
186  [[maybe_unused]] target_rdma_worker(target_rdma_worker &&other) noexcept;
187 
188  /*
189  * Deleted copy assignment operator
190  */
191  target_rdma_worker &operator=(target_rdma_worker const &) = delete;
192 
193  /*
194  * Move assignment operator
195  * @other [in]: Object to move from
196  * @return: reference to moved assigned object
197  */
198  [[maybe_unused]] target_rdma_worker &operator=(target_rdma_worker &&other) noexcept;
199 
200  /*
201  * Create a RDMA connection
202  *
203  * @role [in]: Role of the connection
204  * @remote_conn_details [in]: Remote connection details
205  * @return: Local connection details for use by the remote to connect to
206  */
207  std::vector<uint8_t> create_rdma_connection(storage::control::rdma_connection_role role,
208  std::vector<uint8_t> const &remote_conn_details);
209 
210  /*
211  * Get the current state of this objects RDMA connections
212  *
213  * @return doca_error_t:
214  * DOCA_SUCCESS - all connections are ready
215  * DOCA_ERROR_IN_PROGRESS - One or more connections are still pending
216  * All other status codes represent a specific error that has occurred, the connections will not recover
217  * from such an error
218  */
219  [[nodiscard]] doca_error_t get_rdma_connection_state() const noexcept;
220 
221  /*
222  * Stop and join this thread
223  */
224  void stop_processing(void) noexcept;
225 
226  /*
227  * Create all tasks and submit initial tasks
228  */
229  void prepare_and_submit_tasks(void);
230 
231  /*
232  * Prepare the worker thread
233  *
234  * @core_id [in]: Core to bind to
235  */
236  void prepare_thread_proc(uint32_t core_id);
237 
238  /*
239  * Start the thread
240  */
241  void start_thread_proc(void);
242 
243  /*
244  * Get a view of this objects hot data
245  *
246  * @return hot data
247  */
248  [[nodiscard]] hot_data const &get_hot_data() const noexcept;
249 
250 private:
251  hot_data m_hot_data;
252  uint8_t *m_io_message_region;
253  doca_mmap *m_io_message_mmap;
254  doca_buf_inventory *m_buf_inv;
255  std::vector<doca_buf *> m_bufs;
256  storage::rdma_conn_pair m_rdma_ctrl_ctx;
257  storage::rdma_conn_pair m_rdma_data_ctx;
258  doca_mmap *m_local_mmap;
259  doca_mmap *m_remote_mmap;
260  uint32_t m_task_count;
261  uint32_t m_transfer_contexts_size;
262  transfer_context *m_transfer_contexts;
263  std::vector<doca_task *> m_ctrl_tasks;
264  std::vector<doca_task *> m_data_tasks;
265  std::thread m_thread;
266 
267  /*
268  * Allocate and prepare resources for this object
269  *
270  * @dev [in]: Device to use
271  */
272  void init(doca_dev *dev);
273 
274  /*
275  * Release all resources held by this object
276  */
277  void cleanup() noexcept;
278 
279  /*
280  * RDMA task send callback
281  *
282  * @task [in]: Completed task
283  * @task_user_data [in]: Data associated with the task
284  * @ctx_user_data [in]: Data associated with the context
285  */
286  static void doca_rdma_task_send_cb(doca_rdma_task_send *task,
287  doca_data task_user_data,
288  doca_data ctx_user_data) noexcept;
289 
290  /*
291  * RDMA task send error callback
292  *
293  * @task [in]: Failed task
294  * @task_user_data [in]: Data associated with the task
295  * @ctx_user_data [in]: Data associated with the context
296  */
297  static void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
298  doca_data task_user_data,
299  doca_data ctx_user_data) noexcept;
300 
301  /*
302  * RDMA task receive callback
303  *
304  * @task [in]: Completed task
305  * @task_user_data [in]: Data associated with the task
306  * @ctx_user_data [in]: Data associated with the context
307  */
308  static void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
309  doca_data task_user_data,
310  doca_data ctx_user_data) noexcept;
311 
312  /*
313  * RDMA task receive error callback
314  *
315  * @task [in]: Failed task
316  * @task_user_data [in]: Data associated with the task
317  * @ctx_user_data [in]: Data associated with the context
318  */
319  static void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
320  doca_data task_user_data,
321  doca_data ctx_user_data) noexcept;
322 
323  /*
324  * Shared RDMA read/write callback
325  *
326  * @task [in]: Completed task
327  * @task_user_data [in]: Data associated with the task
328  * @ctx_user_data [in]: Data associated with the context
329  */
330  static void on_transfer_complete(doca_task *task, doca_data task_user_data, doca_data ctx_user_data) noexcept;
331 
332  /*
333  * Shared RDMA read/write error callback
334  *
335  * @task [in]: Failed task
336  * @task_user_data [in]: Data associated with the task
337  * @ctx_user_data [in]: Data associated with the context
338  */
339  static void on_transfer_error(doca_task *task, doca_data task_user_data, doca_data ctx_user_data) noexcept;
340 
341  /*
342  * Thread process function to be exeuted on the hot path
343  */
344  void thread_proc();
345 };
346 
347 /*
348  * top level target_rdma_app structure
349  */
350 class target_rdma_app {
351 public:
352  ~target_rdma_app();
353  target_rdma_app() = delete;
354  explicit target_rdma_app(target_rdma_app_configuration const &cfg);
355  target_rdma_app(target_rdma_app const &) = delete;
356  target_rdma_app(target_rdma_app &&) noexcept = delete;
357  target_rdma_app &operator=(target_rdma_app const &) = delete;
358  target_rdma_app &operator=(target_rdma_app &&) noexcept = delete;
359 
360  void abort(std::string const &reason);
361 
362  void wait_for_client_connection(void);
363  void wait_for_and_process_query_storage(void);
364  void wait_for_and_process_init_storage(void);
365  void wait_for_and_process_create_rdma_connections(void);
366  void wait_for_and_process_start_storage(void);
367  void wait_for_and_process_stop_storage(void);
368  void wait_for_and_process_shutdown(void);
369  void display_stats(void) const;
370 
371 private:
372  target_rdma_app_configuration const m_cfg;
373  doca_dev *m_dev;
374  std::unique_ptr<storage::control::channel> m_control_channel;
375  std::vector<storage::control::message> m_ctrl_messages;
376  uint8_t *m_local_io_region;
377  uint64_t m_local_io_region_size;
378  doca_mmap *m_local_io_mmap;
379  doca_mmap *m_remote_io_mmap;
380  target_rdma_worker *m_workers;
381  std::vector<target_rdma_worker_stats> m_stats;
382  uint32_t m_storage_block_count;
383  uint32_t m_storage_block_size;
384  uint32_t m_task_count;
385  uint32_t m_core_count;
386  bool m_abort_flag;
387 
388  /*
389  * Allocate and prepare resources
390  */
391  void init(void);
392 
393  /*
394  * Release resources
395  */
396  void cleanup(void) noexcept;
397 
398  /*
399  * Wait for a control message
400  *
401  * @return: control message
402  */
403  storage::control::message wait_for_control_message();
404 
405  /*
406  * Process a query storage request
407  *
408  * @client_request [in]: Request
409  * @return: Response
410  */
411  storage::control::message process_query_storage(storage::control::message const &client_request);
412  /*
413  * Process a init storage request
414  *
415  * @client_request [in]: Request
416  * @return:Response
417  */
418  storage::control::message process_init_storage(storage::control::message const &client_request);
419 
420  /*
421  * Process a create RDMA connection request
422  *
423  * @client_request [in]: Request
424  * @return:Response
425  */
426  storage::control::message process_create_rdma_connection(storage::control::message const &client_request);
427 
428  /*
429  * Process a start storage request
430  *
431  * @client_request [in]: Request
432  * @return:Response
433  */
434  storage::control::message process_start_storage(storage::control::message const &client_request);
435 
436  /*
437  * Process a stop storage request
438  *
439  * @client_request [in]: Request
440  * @return:Response
441  */
442  storage::control::message process_stop_storage(storage::control::message const &client_request);
443 
444  /*
445  * Process a shutdown request
446  *
447  * @client_request [in]: Request
448  * @return:Response
449  */
450  storage::control::message process_shutdown(storage::control::message const &client_request);
451 
452  /*
453  * Prepare worker objects
454  */
455  void prepare_workers();
456 
457  /*
458  * Destroy workers
459  */
460  void destroy_workers() noexcept;
461 
462  /*
463  * Verify that RDMA connections are ready
464  */
465  void verify_connections_are_ready(void);
466 };
467 
468 /*
469  * Parse command line arguments
470  *
471  * @argc [in]: Number of arguments
472  * @argv [in]: Array of argument values
473  * @return: Parsed target_rdma_app_configuration
474  *
475  * @throws: storage::runtime_error If the target_rdma_app_configuration cannot pe parsed or contains invalid values
476  */
477 target_rdma_app_configuration parse_target_rdma_app_cli_args(int argc, char **argv);
478 
479 } // namespace
480 
481 /*
482  * Main
483  *
484  * @argc [in]: Number of arguments
485  * @argv [in]: Array of argument values
486  * @return: EXIT_SUCCESS on success and EXIT_FAILURE otherwise
487  */
488 int main(int argc, char **argv)
489 {
491 
492  printf("%s: v%s\n", app_name, doca_version());
493 
494  try {
495  target_rdma_app app{parse_target_rdma_app_cli_args(argc, argv)};
497  app.abort("User requested abort");
498  });
499 
500  app.wait_for_client_connection();
501  app.wait_for_and_process_query_storage();
502  app.wait_for_and_process_init_storage();
503  app.wait_for_and_process_create_rdma_connections();
504  app.wait_for_and_process_start_storage();
505  app.wait_for_and_process_stop_storage();
506  app.wait_for_and_process_shutdown();
507  app.display_stats();
508  } catch (std::exception const &ex) {
509  fprintf(stderr, "EXCEPTION: %s\n", ex.what());
510  fflush(stdout);
511  fflush(stderr);
512  return EXIT_FAILURE;
513  }
514 
516 
517  return EXIT_SUCCESS;
518 }
519 
520 namespace {
521 
522 /*
523  * Print the parsed target_rdma_app_configuration
524  *
525  * @cfg [in]: target_rdma_app_configuration to display
526  */
527 void print_config(target_rdma_app_configuration const &cfg) noexcept
528 {
529  printf("target_rdma_app_configuration: {\n");
530  printf("\tcore_set : [");
531  bool first = true;
532  for (auto cpu : cfg.core_set) {
533  if (first)
534  first = false;
535  else
536  printf(", ");
537  printf("%u", cpu);
538  }
539  printf("]\n");
540  printf("\tdevice : \"%s\",\n", cfg.device_id.c_str());
541  printf("\tstorage_content_file_name : \"%s\",\n", cfg.storage_content_file_name.c_str());
542  printf("\tlisten_port : %u\n", cfg.listen_port);
543  printf("\tblock_count : %u\n", cfg.block_count);
544  printf("\tblock_size : %u\n", cfg.block_size);
545  printf("}\n");
546 }
547 
548 /*
549  * Validate target_rdma_app_configuration
550  *
551  * @cfg [in]: target_rdma_app_configuration
552  */
553 void validate_target_rdma_app_configuration(target_rdma_app_configuration const &cfg)
554 {
555  std::vector<std::string> errors;
556 
557  if (cfg.storage_content_file_name.empty() && (cfg.block_size == 0 || cfg.block_count == 0)) {
558  errors.emplace_back(
559  "Invalid target_rdma_app_configuration: block-size and block-count must be non zero when binary-content is not provided");
560  }
561 
562  if (!errors.empty()) {
563  for (auto const &err : errors) {
564  printf("%s\n", err.c_str());
565  }
567  "Invalid target_rdma_app_configuration detected"};
568  }
569 }
570 
571 /*
572  * Parse command line arguments
573  *
574  * @argc [in]: Number of arguments
575  * @argv [in]: Array of argument values
576  * @return: Parsed target_rdma_app_configuration
577  *
578  * @throws: storage::runtime_error If the target_rdma_app_configuration cannot pe parsed or contains invalid values
579  */
580 target_rdma_app_configuration parse_target_rdma_app_cli_args(int argc, char **argv)
581 {
582  target_rdma_app_configuration config{};
583  config.block_count = default_storage_block_count;
584  config.block_size = default_storage_block_size;
585 
586  doca_error_t ret;
587 
588  ret = doca_argp_init(app_name, &config);
589  if (ret != DOCA_SUCCESS) {
590  throw storage::runtime_error{ret, "Failed to parse CLI args"};
591  }
592 
594  "d",
595  "device",
596  "Device identifier",
599  [](void *value, void *cfg) noexcept {
600  static_cast<target_rdma_app_configuration *>(cfg)->device_id =
601  static_cast<char const *>(value);
602  return DOCA_SUCCESS;
603  });
605  nullptr,
606  "cpu",
607  "CPU core to which the process affinity can be set",
610  [](void *value, void *cfg) noexcept {
611  static_cast<target_rdma_app_configuration *>(cfg)->core_set.push_back(
612  *static_cast<int *>(value));
613  return DOCA_SUCCESS;
614  });
616  nullptr,
617  "listen-port",
618  "TCP listen port number",
621  [](void *value, void *cfg) noexcept {
622  auto inv_val = *static_cast<int *>(value);
623  auto short_val = static_cast<uint16_t>(inv_val);
624 
625  if (inv_val != short_val)
627 
628  static_cast<target_rdma_app_configuration *>(cfg)->listen_port =
629  short_val;
630  return DOCA_SUCCESS;
631  });
634  nullptr,
635  "binary-content",
636  "Path to binary .sbc file containing the initial content to be represented by this storage instance",
639  [](void *value, void *cfg) noexcept {
640  static_cast<target_rdma_app_configuration *>(cfg)->storage_content_file_name =
641  static_cast<char const *>(value);
642  return DOCA_SUCCESS;
643  });
646  nullptr,
647  "block-count",
648  "Number of available storage blocks. (Ignored when using content binary file) Default: 128",
651  [](void *value, void *cfg) noexcept {
652  static_cast<target_rdma_app_configuration *>(cfg)->block_count =
653  *static_cast<uint32_t *>(value);
654  return DOCA_SUCCESS;
655  });
658  nullptr,
659  "block-size",
660  "Block size used by the storage. (Ignored when using content binary file) Default: 4096",
663  [](void *value, void *cfg) noexcept {
664  static_cast<target_rdma_app_configuration *>(cfg)->block_size = *static_cast<uint32_t *>(value);
665  return DOCA_SUCCESS;
666  });
667  ret = doca_argp_start(argc, argv);
668  if (ret != DOCA_SUCCESS) {
669  throw storage::runtime_error{ret, "Failed to parse CLI args"};
670  }
671 
672  static_cast<void>(doca_argp_destroy());
673 
674  if (!config.storage_content_file_name.empty()) {
675  if (storage::file_has_binary_content_header(config.storage_content_file_name)) {
676  auto sbc = storage::load_binary_content_from_file(config.storage_content_file_name);
677  config.block_count = sbc.block_count;
678  config.block_size = sbc.block_size;
679  config.content = std::move(sbc.content);
680  } else {
681  config.content = storage::load_file_bytes(config.storage_content_file_name);
682  auto const expected_content_size = uint64_t{config.block_size} * config.block_count;
683  if (config.content.size() != expected_content_size) {
686  "Selected input data file content : " + config.storage_content_file_name +
687  " : " + std::to_string(config.content.size()) +
688  " bytes does not match the storage size of " +
689  std::to_string(expected_content_size) + " bytes"};
690  }
691  }
692  }
693 
694  print_config(config);
695  validate_target_rdma_app_configuration(config);
696 
697  return config;
698 }
699 
700 target_rdma_worker::hot_data::hot_data()
701  : pe{nullptr},
702  pe_hit_count{0},
703  pe_miss_count{0},
704  remote_memory_start_addr{nullptr},
705  local_memory_start_addr{nullptr},
706  completed_transaction_count{0},
707  in_flight_transaction_count{0},
708  core_idx{0},
709  run_flag{false},
710  error_flag{false}
711 {
712 }
713 
714 target_rdma_worker::hot_data::hot_data(hot_data &&other) noexcept
715  : pe{other.pe},
716  pe_hit_count{other.pe_hit_count},
717  pe_miss_count{other.pe_miss_count},
718  remote_memory_start_addr{other.remote_memory_start_addr},
719  local_memory_start_addr{other.local_memory_start_addr},
720  completed_transaction_count{other.completed_transaction_count},
721  in_flight_transaction_count{other.in_flight_transaction_count},
722  core_idx{other.core_idx},
723  run_flag{other.run_flag.load()},
724  error_flag{other.error_flag}
725 {
726  other.pe = nullptr;
727 }
728 
729 target_rdma_worker::hot_data &target_rdma_worker::hot_data::operator=(hot_data &&other) noexcept
730 {
731  if (std::addressof(other) == this)
732  return *this;
733 
734  pe = other.pe;
735  pe_hit_count = other.pe_hit_count;
736  pe_miss_count = other.pe_miss_count;
737  remote_memory_start_addr = other.remote_memory_start_addr;
738  local_memory_start_addr = other.local_memory_start_addr;
739  completed_transaction_count = other.completed_transaction_count;
740  in_flight_transaction_count = other.in_flight_transaction_count;
741  core_idx = other.core_idx;
742  run_flag = other.run_flag.load();
743  error_flag = other.error_flag;
744 
745  other.pe = nullptr;
746 
747  return *this;
748 }
749 
750 target_rdma_worker::~target_rdma_worker()
751 {
752  if (m_thread.joinable()) {
753  m_hot_data.run_flag = false;
754  m_hot_data.error_flag = true;
755  m_thread.join();
756  }
757 
758  cleanup();
759 }
760 
761 target_rdma_worker::target_rdma_worker(doca_dev *dev, uint32_t task_count, doca_mmap *remote_mmap, doca_mmap *local_mmap)
762  : m_hot_data{},
763  m_io_message_region{nullptr},
764  m_io_message_mmap{nullptr},
765  m_buf_inv{nullptr},
766  m_bufs{},
767  m_rdma_ctrl_ctx{},
768  m_rdma_data_ctx{},
769  m_local_mmap{local_mmap},
770  m_remote_mmap{remote_mmap},
771  m_task_count{task_count},
772  m_transfer_contexts_size{0},
773  m_transfer_contexts{nullptr},
774  m_ctrl_tasks{},
775  m_data_tasks{},
776  m_thread{}
777 {
778  try {
779  init(dev);
780  } catch (storage::runtime_error const &) {
781  cleanup();
782  throw;
783  }
784 }
785 
786 target_rdma_worker::target_rdma_worker(target_rdma_worker &&other) noexcept
787  : m_hot_data{std::move(other.m_hot_data)},
788  m_io_message_region{other.m_io_message_region},
789  m_io_message_mmap{other.m_io_message_mmap},
790  m_buf_inv{other.m_buf_inv},
791  m_bufs{std::move(other.m_bufs)},
792  m_rdma_ctrl_ctx{other.m_rdma_ctrl_ctx},
793  m_rdma_data_ctx{other.m_rdma_data_ctx},
794  m_local_mmap{other.m_local_mmap},
795  m_remote_mmap{other.m_remote_mmap},
796  m_task_count{other.m_task_count},
797  m_transfer_contexts_size{other.m_transfer_contexts_size},
798  m_transfer_contexts{other.m_transfer_contexts},
799  m_ctrl_tasks{std::move(other.m_ctrl_tasks)},
800  m_data_tasks{std::move(other.m_data_tasks)},
801  m_thread{std::move(other.m_thread)}
802 {
803  other.m_io_message_region = nullptr;
804  other.m_io_message_mmap = nullptr;
805  other.m_buf_inv = nullptr;
806  other.m_rdma_ctrl_ctx = {};
807  other.m_rdma_data_ctx = {};
808  other.m_transfer_contexts = nullptr;
809 }
810 
811 target_rdma_worker &target_rdma_worker::operator=(target_rdma_worker &&other) noexcept
812 {
813  if (std::addressof(other) == this)
814  return *this;
815 
816  m_hot_data = std::move(other.m_hot_data);
817  m_io_message_region = other.m_io_message_region;
818  m_io_message_mmap = other.m_io_message_mmap;
819  m_buf_inv = other.m_buf_inv;
820  m_bufs = std::move(other.m_bufs);
821  m_rdma_ctrl_ctx = other.m_rdma_ctrl_ctx;
822  m_rdma_data_ctx = other.m_rdma_data_ctx;
823  m_local_mmap = other.m_local_mmap;
824  m_remote_mmap = other.m_remote_mmap;
825  m_task_count = other.m_task_count;
826  m_transfer_contexts_size = other.m_transfer_contexts_size;
827  m_transfer_contexts = other.m_transfer_contexts;
828  m_ctrl_tasks = std::move(other.m_ctrl_tasks);
829  m_data_tasks = std::move(other.m_data_tasks);
830  m_thread = std::move(other.m_thread);
831 
832  other.m_io_message_region = nullptr;
833  other.m_io_message_mmap = nullptr;
834  other.m_buf_inv = nullptr;
835  other.m_rdma_ctrl_ctx = {};
836  other.m_rdma_data_ctx = {};
837  other.m_transfer_contexts = nullptr;
838 
839  return *this;
840 }
841 
842 std::vector<uint8_t> target_rdma_worker::create_rdma_connection(storage::control::rdma_connection_role role,
843  std::vector<uint8_t> const &remote_conn_details)
844 {
845  auto &rdma_pair = role == storage::control::rdma_connection_role::io_data ? m_rdma_data_ctx : m_rdma_ctrl_ctx;
846 
847  auto local_connection_details = [this, &rdma_pair]() {
848  uint8_t const *blob = nullptr;
849  size_t blob_size = 0;
850 
851  auto const ret = doca_rdma_export(rdma_pair.rdma,
852  reinterpret_cast<void const **>(&blob),
853  &blob_size,
854  std::addressof(rdma_pair.conn));
855  if (ret != DOCA_SUCCESS) {
856  DOCA_LOG_ERR("Core: %u RDMA export failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
857  throw storage::runtime_error{ret, "Failed to export rdma connection"};
858  }
859  return std::vector<uint8_t>{blob, blob + blob_size};
860  }();
861 
862  auto const ret = doca_rdma_connect(rdma_pair.rdma,
863  remote_conn_details.data(),
864  remote_conn_details.size(),
865  rdma_pair.conn);
866  if (ret != DOCA_SUCCESS) {
867  DOCA_LOG_ERR("Core: %u RDMA connect failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
868  throw storage::runtime_error{ret, "Failed to connect to rdma"};
869  }
870 
871  return local_connection_details;
872 }
873 
874 doca_error_t target_rdma_worker::get_rdma_connection_state() const noexcept
875 {
876  doca_error_t ret;
877  doca_ctx_states rdma_state;
878  bool ctrl_connected = false;
879  bool data_connected = false;
880 
881  ret = doca_ctx_get_state(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma), &rdma_state);
882  if (ret != DOCA_SUCCESS) {
883  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
884  return ret;
885  } else if (rdma_state == DOCA_CTX_STATE_RUNNING) {
886  ctrl_connected = true;
887  } else {
888  static_cast<void>(doca_pe_progress(m_hot_data.pe));
889  }
890 
891  ret = doca_ctx_get_state(doca_rdma_as_ctx(m_rdma_data_ctx.rdma), &rdma_state);
892  if (ret != DOCA_SUCCESS) {
893  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
894  return ret;
895  } else if (rdma_state == DOCA_CTX_STATE_RUNNING) {
896  data_connected = true;
897  } else {
898  static_cast<void>(doca_pe_progress(m_hot_data.pe));
899  }
900 
901  return ctrl_connected && data_connected ? DOCA_SUCCESS : DOCA_ERROR_IN_PROGRESS;
902 }
903 
904 void target_rdma_worker::stop_processing(void) noexcept
905 {
906  m_hot_data.run_flag = false;
907  if (m_thread.joinable()) {
908  m_thread.join();
909  }
910 }
911 
912 void target_rdma_worker::prepare_and_submit_tasks(void)
913 {
914  doca_error_t ret;
915  uint8_t *message_buffer_addr = m_io_message_region;
916  size_t local_memory_size = 0;
917  size_t remote_memory_size = 0;
918  static_cast<void>(doca_mmap_get_memrange(m_local_mmap,
919  reinterpret_cast<void **>(&m_hot_data.local_memory_start_addr),
920  &local_memory_size));
921  static_cast<void>(doca_mmap_get_memrange(m_remote_mmap,
922  reinterpret_cast<void **>(&m_hot_data.remote_memory_start_addr),
923  &remote_memory_size));
924 
925  if (remote_memory_size < local_memory_size) {
927  "Unable to start storage, remote memory region is to small(" +
928  std::to_string(remote_memory_size) +
929  " bytes) This storage instance requires it to be at least: " +
930  std::to_string(local_memory_size) + " bytes"};
931  }
932  if (local_memory_size != remote_memory_size) {}
933 
934  std::vector<doca_task *> request_tasks;
935  request_tasks.reserve(m_task_count);
936  m_ctrl_tasks.reserve(m_task_count * 2);
937  m_data_tasks.reserve(m_task_count * 2);
938  m_bufs.reserve(m_task_count * 3);
939 
940  m_transfer_contexts = storage::make_aligned<transfer_context>{}.object_array(m_task_count);
941 
942  for (uint32_t ii = 0; ii != m_task_count; ++ii) {
943  doca_buf *message_buf;
944 
945  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
946  m_io_message_mmap,
947  message_buffer_addr,
949  &message_buf);
950  if (ret != DOCA_SUCCESS) {
951  throw storage::runtime_error{ret, "Failed to allocate io_message doca_buf"};
952  }
953 
954  m_bufs.push_back(message_buf);
955  message_buffer_addr += storage::size_of_io_message;
956 
957  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
958  m_local_mmap,
959  m_hot_data.local_memory_start_addr,
960  local_memory_size,
961  std::addressof(m_transfer_contexts[ii].storage_buf));
962  if (ret != DOCA_SUCCESS) {
963  throw storage::runtime_error{ret, "Failed to allocate local storage doca_buf"};
964  }
965 
966  m_bufs.push_back(m_transfer_contexts[ii].storage_buf);
967 
968  ret = doca_buf_inventory_buf_get_by_addr(m_buf_inv,
969  m_remote_mmap,
970  m_hot_data.remote_memory_start_addr,
971  remote_memory_size,
972  std::addressof(m_transfer_contexts[ii].host_buf));
973  if (ret != DOCA_SUCCESS) {
974  throw storage::runtime_error{ret, "Failed to allocate remote storage doca_buf"};
975  }
976 
977  m_bufs.push_back(m_transfer_contexts[ii].host_buf);
978 
979  doca_rdma_task_receive *request_task = nullptr;
980  ret = doca_rdma_task_receive_allocate_init(m_rdma_ctrl_ctx.rdma,
981  message_buf,
982  doca_data{.ptr = std::addressof(m_transfer_contexts[ii])},
983  &request_task);
984  if (ret != DOCA_SUCCESS) {
985  throw storage::runtime_error{ret, "Failed to allocate doca_rdma_task_receive"};
986  }
987  m_ctrl_tasks.push_back(doca_rdma_task_receive_as_task(request_task));
988  request_tasks.push_back(doca_rdma_task_receive_as_task(request_task));
989 
990  doca_rdma_task_send *response_task = nullptr;
991  ret = doca_rdma_task_send_allocate_init(m_rdma_ctrl_ctx.rdma,
992  m_rdma_ctrl_ctx.conn,
993  message_buf,
994  doca_data{.ptr = request_task},
995  &response_task);
996  if (ret != DOCA_SUCCESS) {
997  throw storage::runtime_error{ret, "Failed to allocate doca_rdma_task_send"};
998  }
999  m_ctrl_tasks.push_back(doca_rdma_task_send_as_task(response_task));
1000 
1001  ret = doca_rdma_task_write_allocate_init(m_rdma_data_ctx.rdma,
1002  m_rdma_data_ctx.conn,
1003  m_transfer_contexts[ii].storage_buf,
1004  m_transfer_contexts[ii].host_buf,
1005  doca_data{.ptr = response_task},
1006  std::addressof(m_transfer_contexts[ii].write_task));
1007  if (ret != DOCA_SUCCESS) {
1008  throw storage::runtime_error{ret, "Failed to allocate doca_rdma_task_write"};
1009  }
1010  m_data_tasks.push_back(doca_rdma_task_write_as_task(m_transfer_contexts[ii].write_task));
1011 
1012  ret = doca_rdma_task_read_allocate_init(m_rdma_data_ctx.rdma,
1013  m_rdma_data_ctx.conn,
1014  m_transfer_contexts[ii].host_buf,
1015  m_transfer_contexts[ii].storage_buf,
1016  doca_data{.ptr = response_task},
1017  std::addressof(m_transfer_contexts[ii].read_task));
1018  if (ret != DOCA_SUCCESS) {
1019  throw storage::runtime_error{ret, "Failed to allocate doca_rdma_task_read"};
1020  }
1021  m_data_tasks.push_back(doca_rdma_task_read_as_task(m_transfer_contexts[ii].read_task));
1022  }
1023 
1024  for (auto *task : request_tasks) {
1025  ret = doca_task_submit(task);
1026  if (ret != DOCA_SUCCESS) {
1027  throw storage::runtime_error{ret, "Failed to allocate doca_rdma_task_read"};
1028  }
1029  }
1030 }
1031 
1032 void target_rdma_worker::prepare_thread_proc(uint32_t core_id)
1033 {
1034  m_thread = std::thread{[this]() {
1035  try {
1036  thread_proc();
1037  } catch (std::exception const &ex) {
1038  DOCA_LOG_ERR("Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
1039  m_hot_data.error_flag = true;
1040  m_hot_data.run_flag = false;
1041  }
1042  }};
1043  m_hot_data.core_idx = core_id;
1044  storage::set_thread_affinity(m_thread, m_hot_data.core_idx);
1045 }
1046 
1047 void target_rdma_worker::start_thread_proc()
1048 {
1049  m_hot_data.run_flag = true;
1050 }
1051 
1052 target_rdma_worker::hot_data const &target_rdma_worker::get_hot_data(void) const noexcept
1053 {
1054  return m_hot_data;
1055 }
1056 
1057 void target_rdma_worker::init(doca_dev *dev)
1058 {
1059  doca_error_t ret;
1060  auto const page_size = storage::get_system_page_size();
1061 
1062  auto const raw_io_messages_size = m_task_count * storage::size_of_io_message * 2;
1063 
1064  m_io_message_region = static_cast<uint8_t *>(
1065  storage::aligned_alloc(page_size, storage::aligned_size(page_size, raw_io_messages_size)));
1066  if (m_io_message_region == nullptr) {
1067  throw storage::runtime_error{DOCA_ERROR_NO_MEMORY, "Failed to allocate comch fast path buffers memory"};
1068  }
1069 
1070  m_io_message_mmap = storage::make_mmap(dev,
1071  reinterpret_cast<char *>(m_io_message_region),
1072  raw_io_messages_size,
1073  rdma_permissions);
1074 
1075  ret = doca_buf_inventory_create(m_task_count * 3, &m_buf_inv);
1076  if (ret != DOCA_SUCCESS) {
1077  throw storage::runtime_error{ret, "Failed to create comch fast path doca_buf_inventory"};
1078  }
1079  ret = doca_buf_inventory_start(m_buf_inv);
1080  if (ret != DOCA_SUCCESS) {
1081  throw storage::runtime_error{ret, "Failed to start comch fast path doca_buf_inventory"};
1082  }
1083 
1084  ret = doca_pe_create(std::addressof(m_hot_data.pe));
1085  if (ret != DOCA_SUCCESS) {
1086  throw storage::runtime_error{ret, "Failed to create doca_pe"};
1087  }
1088 
1089  m_rdma_ctrl_ctx.rdma = storage::make_rdma_context(dev,
1090  m_hot_data.pe,
1091  doca_data{.ptr = std::addressof(m_hot_data)},
1092  rdma_permissions);
1093 
1094  ret = doca_rdma_task_receive_set_conf(m_rdma_ctrl_ctx.rdma,
1095  doca_rdma_task_receive_cb,
1096  doca_rdma_task_receive_error_cb,
1097  m_task_count);
1098  if (ret != DOCA_SUCCESS) {
1099  throw storage::runtime_error{ret, "Failed to configure rdma receive task pool"};
1100  }
1101 
1102  ret = doca_rdma_task_send_set_conf(m_rdma_ctrl_ctx.rdma,
1103  doca_rdma_task_send_cb,
1104  doca_rdma_task_send_error_cb,
1105  m_task_count);
1106  if (ret != DOCA_SUCCESS) {
1107  throw storage::runtime_error{ret, "Failed to configure rdma send task pool"};
1108  }
1109 
1110  ret = doca_ctx_start(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma));
1111  if (ret != DOCA_SUCCESS) {
1112  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
1113  }
1114 
1115  m_rdma_data_ctx.rdma = storage::make_rdma_context(dev,
1116  m_hot_data.pe,
1117  doca_data{.ptr = std::addressof(m_hot_data)},
1118  rdma_permissions);
1119 
1120  ret = doca_rdma_task_read_set_conf(m_rdma_data_ctx.rdma,
1121  reinterpret_cast<doca_rdma_task_read_completion_cb_t>(on_transfer_complete),
1122  reinterpret_cast<doca_rdma_task_read_completion_cb_t>(on_transfer_error),
1123  m_task_count);
1124  if (ret != DOCA_SUCCESS) {
1125  throw storage::runtime_error{
1126  ret,
1127  "Failed to set doca_rdma_task_read task pool target_rdma_app_configuration"};
1128  }
1129 
1131  m_rdma_data_ctx.rdma,
1132  reinterpret_cast<doca_rdma_task_write_completion_cb_t>(on_transfer_complete),
1133  reinterpret_cast<doca_rdma_task_write_completion_cb_t>(on_transfer_error),
1134  m_task_count);
1135  if (ret != DOCA_SUCCESS) {
1136  throw storage::runtime_error{
1137  ret,
1138  "Failed to set doca_rdma_task_write task pool target_rdma_app_configuration"};
1139  }
1140 
1141  ret = doca_ctx_start(doca_rdma_as_ctx(m_rdma_data_ctx.rdma));
1142  if (ret != DOCA_SUCCESS) {
1143  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
1144  }
1145 }
1146 
1147 void target_rdma_worker::cleanup() noexcept
1148 {
1149  doca_error_t ret;
1150 
1151  if (m_rdma_ctrl_ctx.rdma != nullptr) {
1152  /* stop context with tasks list (tasks must be destroyed to finish stopping process) */
1153  ret = storage::stop_context(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma), m_hot_data.pe, m_ctrl_tasks);
1154  if (ret == DOCA_SUCCESS) {
1155  m_ctrl_tasks.clear();
1156  } else {
1157  DOCA_LOG_ERR("Failed to stop rdma control context: %s", doca_error_get_name(ret));
1158  }
1159 
1160  ret = doca_rdma_destroy(m_rdma_ctrl_ctx.rdma);
1161  if (ret == DOCA_SUCCESS) {
1162  m_rdma_ctrl_ctx.rdma = nullptr;
1163  } else {
1164  DOCA_LOG_ERR("Failed to destroy rdma control context: %s", doca_error_get_name(ret));
1165  }
1166  }
1167 
1168  if (m_rdma_data_ctx.rdma != nullptr) {
1169  /* stop context with tasks list (tasks must be destroyed to finish stopping process) */
1170  ret = storage::stop_context(doca_rdma_as_ctx(m_rdma_data_ctx.rdma), m_hot_data.pe, m_data_tasks);
1171  if (ret != DOCA_SUCCESS) {
1172  m_data_tasks.clear();
1173  } else {
1174  DOCA_LOG_ERR("Failed to stop rdma data context: %s", doca_error_get_name(ret));
1175  }
1176 
1177  ret = doca_rdma_destroy(m_rdma_data_ctx.rdma);
1178  if (ret == DOCA_SUCCESS) {
1179  m_rdma_data_ctx.rdma = nullptr;
1180  } else {
1181  DOCA_LOG_ERR("Failed to destroy rdma data context: %s", doca_error_get_name(ret));
1182  }
1183  }
1184 
1185  if (m_hot_data.pe != nullptr) {
1186  ret = doca_pe_destroy(m_hot_data.pe);
1187  if (ret == DOCA_SUCCESS) {
1188  m_hot_data.pe = nullptr;
1189  } else {
1190  DOCA_LOG_ERR("Failed to destroy progress engine");
1191  }
1192  }
1193 
1194  if (m_transfer_contexts != nullptr) {
1195  storage::aligned_free(m_transfer_contexts);
1196  m_transfer_contexts = nullptr;
1197  }
1198 
1199  for (auto *buf : m_bufs) {
1200  static_cast<void>(doca_buf_dec_refcount(buf, nullptr));
1201  }
1202 
1203  if (m_buf_inv) {
1204  ret = doca_buf_inventory_stop(m_buf_inv);
1205  if (ret != DOCA_SUCCESS) {
1206  DOCA_LOG_ERR("Failed to stop buffer inventory");
1207  }
1208  ret = doca_buf_inventory_destroy(m_buf_inv);
1209  if (ret == DOCA_SUCCESS) {
1210  m_buf_inv = nullptr;
1211  } else {
1212  DOCA_LOG_ERR("Failed to destroy buffer inventory");
1213  }
1214  }
1215 
1216  if (m_io_message_mmap) {
1217  ret = doca_mmap_stop(m_io_message_mmap);
1218  if (ret != DOCA_SUCCESS) {
1219  DOCA_LOG_ERR("Failed to stop mmap");
1220  }
1221  ret = doca_mmap_destroy(m_io_message_mmap);
1222  if (ret == DOCA_SUCCESS) {
1223  m_io_message_mmap = nullptr;
1224  } else {
1225  DOCA_LOG_ERR("Failed to destroy mmap");
1226  }
1227  }
1228 
1229  if (m_io_message_region != nullptr) {
1230  storage::aligned_free(m_io_message_region);
1231  }
1232 }
1233 
1234 void target_rdma_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1235  doca_data task_user_data,
1236  doca_data ctx_user_data) noexcept
1237 {
1238  static_cast<void>(task);
1239  static_cast<void>(ctx_user_data);
1240 
1241  auto *const request_task = static_cast<doca_rdma_task_receive *>(task_user_data.ptr);
1242 
1244  auto const ret = doca_task_submit(doca_rdma_task_receive_as_task(request_task));
1245  if (ret != DOCA_SUCCESS) {
1246  DOCA_LOG_ERR("Failed re-submit request task: %s", doca_error_get_name(ret));
1247  }
1248 
1249  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1250  --(hot_data->in_flight_transaction_count);
1251 }
1252 
1253 void target_rdma_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1254  doca_data task_user_data,
1255  doca_data ctx_user_data) noexcept
1256 {
1257  static_cast<void>(task);
1258  static_cast<void>(task_user_data);
1259 
1260  DOCA_LOG_ERR("Failed to complete doca_rdma_task_send");
1261 
1262  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1263  --(hot_data->in_flight_transaction_count);
1264  hot_data->run_flag = false;
1265  hot_data->error_flag = true;
1266 }
1267 
1268 void target_rdma_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1269  doca_data task_user_data,
1270  doca_data ctx_user_data) noexcept
1271 {
1272  doca_error_t ret;
1273  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1274 
1275  auto *const io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
1276  auto const message_type = storage::io_message_view::get_type(io_message);
1277 
1278  auto *const transfer_ctx = static_cast<transfer_context *>(task_user_data.ptr);
1279 
1280  switch (message_type) {
1282  size_t const offset = reinterpret_cast<char *>(storage::io_message_view::get_io_address(io_message)) -
1283  hot_data->remote_memory_start_addr;
1284 
1285  char *const remote_addr = hot_data->remote_memory_start_addr + offset +
1287  char *const local_addr = hot_data->local_memory_start_addr + offset;
1288  uint32_t const transfer_size = storage::io_message_view::get_io_size(io_message);
1289 
1290  ret = doca_buf_set_data(transfer_ctx->host_buf, remote_addr, 0);
1291  if (ret != DOCA_SUCCESS) {
1292  DOCA_LOG_ERR("Failed to set transfer host memory range: %s", doca_error_get_name(ret));
1293  break;
1294  }
1295 
1296  ret = doca_buf_set_data(transfer_ctx->storage_buf, local_addr, transfer_size);
1297  if (ret != DOCA_SUCCESS) {
1298  DOCA_LOG_ERR("Failed to set transfer storage memory range: %s", doca_error_get_name(ret));
1299  break;
1300  }
1301  doca_rdma_task_write_set_dst_buf(transfer_ctx->write_task, transfer_ctx->host_buf);
1302  doca_rdma_task_write_set_src_buf(transfer_ctx->write_task, transfer_ctx->storage_buf);
1303 
1304  ret = doca_task_submit(doca_rdma_task_write_as_task(transfer_ctx->write_task));
1305  if (ret != DOCA_SUCCESS) {
1306  DOCA_LOG_ERR("Failed to submit doca_rdma_task_write: %s", doca_error_get_name(ret));
1307  break;
1308  }
1309 
1310  ++(hot_data->in_flight_transaction_count);
1311  DOCA_LOG_TRC(
1312  "Start read(%p) of %u bytes from storage: %p to remote: %p (in_flight_transaction_count: %u)",
1313  transfer_ctx->write_task,
1314  transfer_size,
1315  local_addr,
1316  remote_addr,
1317  hot_data->in_flight_transaction_count);
1318  } break;
1320  size_t const offset = reinterpret_cast<char *>(storage::io_message_view::get_io_address(io_message)) -
1321  hot_data->remote_memory_start_addr;
1322 
1323  char *const remote_addr = hot_data->remote_memory_start_addr + offset +
1325  char *const local_addr = hot_data->local_memory_start_addr + offset;
1326  uint32_t const transfer_size = storage::io_message_view::get_io_size(io_message);
1327 
1328  ret = doca_buf_set_data(transfer_ctx->host_buf, remote_addr, transfer_size);
1329  if (ret != DOCA_SUCCESS) {
1330  DOCA_LOG_ERR("Failed to set transfer host memory range: %s", doca_error_get_name(ret));
1331  break;
1332  }
1333 
1334  ret = doca_buf_set_data(transfer_ctx->storage_buf, local_addr, 0);
1335  if (ret != DOCA_SUCCESS) {
1336  DOCA_LOG_ERR("Failed to set transfer storage memory range: %s", doca_error_get_name(ret));
1337  break;
1338  }
1339  doca_rdma_task_read_set_dst_buf(transfer_ctx->read_task, transfer_ctx->storage_buf);
1340  doca_rdma_task_read_set_src_buf(transfer_ctx->read_task, transfer_ctx->host_buf);
1341 
1342  ret = doca_task_submit(doca_rdma_task_read_as_task(transfer_ctx->read_task));
1343  if (ret != DOCA_SUCCESS) {
1344  DOCA_LOG_ERR("Failed to submit doca_rdma_task_read: %s", doca_error_get_name(ret));
1345  break;
1346  }
1347 
1348  ++(hot_data->in_flight_transaction_count);
1349  DOCA_LOG_TRC(
1350  "Start write(%p) of %u bytes from remote: %p to storage: %p (in_flight_transaction_count: %u)",
1351  transfer_ctx->read_task,
1352  transfer_size,
1353  remote_addr,
1354  local_addr,
1355  hot_data->in_flight_transaction_count);
1356  } break;
1358  default:
1359  DOCA_LOG_ERR("Received message of unexpected type: %u", static_cast<uint32_t>(message_type));
1361  }
1362 
1363  if (ret == DOCA_SUCCESS)
1364  return;
1365 
1366  DOCA_LOG_ERR("Command error response: %s", storage::io_message_to_string(io_message).c_str());
1367  auto *const response_task = static_cast<doca_rdma_task_send *>(
1368  doca_task_get_user_data(doca_rdma_task_write_as_task(transfer_ctx->write_task)).ptr);
1369 
1371  storage::io_message_view::set_result(ret, io_message);
1372 
1373  ret = doca_task_submit(doca_rdma_task_send_as_task(response_task));
1374  if (ret != DOCA_SUCCESS) {
1375  DOCA_LOG_ERR("Failed submit response task: %s", doca_error_get_name(ret));
1376  }
1377 }
1378 
1379 void target_rdma_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1380  doca_data task_user_data,
1381  doca_data ctx_user_data) noexcept
1382 {
1383  static_cast<void>(task);
1384  static_cast<void>(task_user_data);
1385 
1386  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1387  if (hot_data->run_flag) {
1388  DOCA_LOG_ERR("Failed to complete doca_rdma_task_receive");
1389 
1390  --(hot_data->in_flight_transaction_count);
1391  hot_data->run_flag = false;
1392  hot_data->error_flag = true;
1393  }
1394 }
1395 
1396 void target_rdma_worker::on_transfer_complete(doca_task *task,
1397  doca_data task_user_data,
1398  doca_data ctx_user_data) noexcept
1399 {
1400  static_cast<void>(task);
1401  static_cast<void>(ctx_user_data);
1402 
1403  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1404  auto *const response_task = static_cast<doca_rdma_task_send *>(task_user_data.ptr);
1405  auto *const io_message =
1406  storage::get_buffer_bytes(const_cast<doca_buf *>(doca_rdma_task_send_get_src_buf(response_task)));
1407 
1408  ++(hot_data->completed_transaction_count);
1409 
1412 
1413  auto const ret = doca_task_submit(doca_rdma_task_send_as_task(response_task));
1414  if (ret != DOCA_SUCCESS) {
1415  DOCA_LOG_ERR("Failed submit response task: %s", doca_error_get_name(ret));
1416  }
1417 }
1418 
1419 void target_rdma_worker::on_transfer_error(doca_task *task, doca_data task_user_data, doca_data ctx_user_data) noexcept
1420 {
1421  static_cast<void>(task);
1422 
1423  auto *const hot_data = static_cast<target_rdma_worker::hot_data *>(ctx_user_data.ptr);
1424  auto *const response_task = static_cast<doca_rdma_task_send *>(task_user_data.ptr);
1425  auto *const io_message =
1426  storage::get_buffer_bytes(const_cast<doca_buf *>(doca_rdma_task_send_get_src_buf(response_task)));
1427 
1428  ++(hot_data->completed_transaction_count);
1429  hot_data->error_flag = true;
1430 
1433 
1434  auto const ret = doca_task_submit(doca_rdma_task_send_as_task(response_task));
1435  if (ret != DOCA_SUCCESS) {
1436  DOCA_LOG_ERR("Failed submit response task: %s", doca_error_get_name(ret));
1437  }
1438 }
1439 
1440 void target_rdma_worker::thread_proc()
1441 {
1442  while (m_hot_data.run_flag == false) {
1443  std::this_thread::yield();
1444  if (m_hot_data.error_flag)
1445  return;
1446  }
1447 
1448  DOCA_LOG_INFO("Core: %u running", m_hot_data.core_idx);
1449 
1450  while (m_hot_data.run_flag) {
1451  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1452  }
1453 
1454  while (m_hot_data.error_flag == false && m_hot_data.in_flight_transaction_count != 0) {
1455  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1456  }
1457 
1458  DOCA_LOG_INFO("Core: %u complete", m_hot_data.core_idx);
1459 }
1460 
1461 target_rdma_app::~target_rdma_app()
1462 {
1463  cleanup();
1464 }
1465 
1466 target_rdma_app::target_rdma_app(target_rdma_app_configuration const &cfg)
1467  : m_cfg{cfg},
1468  m_dev{nullptr},
1469  m_control_channel{},
1470  m_ctrl_messages{},
1471  m_local_io_region{nullptr},
1472  m_local_io_region_size{0},
1473  m_local_io_mmap{nullptr},
1474  m_remote_io_mmap{nullptr},
1475  m_workers{},
1476  m_stats{},
1477  m_storage_block_count{},
1478  m_storage_block_size{},
1479  m_task_count{0},
1480  m_core_count{0},
1481  m_abort_flag{false}
1482 {
1483  try {
1484  init();
1485  } catch (std::exception const &) {
1486  cleanup();
1487  throw;
1488  }
1489 }
1490 
1491 void target_rdma_app::abort(std::string const &reason)
1492 {
1493  if (m_abort_flag)
1494  return;
1495 
1496  DOCA_LOG_ERR("Aborted: %s", reason.c_str());
1497  m_abort_flag = true;
1498 }
1499 
1500 void target_rdma_app::wait_for_client_connection(void)
1501 {
1502  while (!m_control_channel->is_connected()) {
1503  std::this_thread::sleep_for(std::chrono::milliseconds{100});
1504  }
1505 }
1506 
1507 void target_rdma_app::wait_for_and_process_query_storage(void)
1508 {
1509  DOCA_LOG_INFO("Wait for query storage...");
1510  auto const client_request = wait_for_control_message();
1511 
1512  doca_error_t err_code;
1513  std::string err_msg;
1514 
1515  if (client_request.message_type == storage::control::message_type::query_storage_request) {
1516  try {
1517  m_control_channel->send_message(process_query_storage(client_request));
1518  return;
1519  } catch (storage::runtime_error const &ex) {
1520  err_code = ex.get_doca_error();
1521  err_msg = ex.what();
1522  }
1523  } else {
1524  err_code = DOCA_ERROR_UNEXPECTED;
1525  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1527  }
1528 
1529  m_control_channel->send_message({
1531  client_request.message_id,
1532  client_request.correlation_id,
1533  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1534 
1535  });
1536 }
1537 
1538 void target_rdma_app::wait_for_and_process_init_storage(void)
1539 {
1540  DOCA_LOG_INFO("Wait for init storage...");
1541  auto const client_request = wait_for_control_message();
1542 
1543  doca_error_t err_code;
1544  std::string err_msg;
1545 
1546  if (client_request.message_type == storage::control::message_type::init_storage_request) {
1547  try {
1548  m_control_channel->send_message(process_init_storage(client_request));
1549  return;
1550  } catch (storage::runtime_error const &ex) {
1551  err_code = ex.get_doca_error();
1552  err_msg = ex.what();
1553  }
1554  } else {
1555  err_code = DOCA_ERROR_UNEXPECTED;
1556  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1558  }
1559 
1560  m_control_channel->send_message({
1562  client_request.message_id,
1563  client_request.correlation_id,
1564  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1565 
1566  });
1567 }
1568 
1569 void target_rdma_app::wait_for_and_process_create_rdma_connections(void)
1570 {
1571  DOCA_LOG_INFO("Wait for RDMA connections...");
1572 
1573  uint32_t remaining_connections = m_core_count * 2;
1574  while (remaining_connections != 0) {
1575  auto const client_request = wait_for_control_message();
1576 
1578  std::string err_msg;
1579 
1580  if (client_request.message_type == storage::control::message_type::create_rdma_connection_request) {
1581  try {
1582  m_control_channel->send_message(process_create_rdma_connection(client_request));
1583  --remaining_connections;
1584  continue;
1585  } catch (storage::runtime_error const &ex) {
1586  err_code = ex.get_doca_error();
1587  err_msg = ex.what();
1588  }
1589  } else {
1590  err_code = DOCA_ERROR_UNEXPECTED;
1591  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1593  }
1594 
1595  --remaining_connections;
1596  m_control_channel->send_message({
1598  client_request.message_id,
1599  client_request.correlation_id,
1600  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1601 
1602  });
1603 
1604  if (m_abort_flag) {
1606  "target_rdma_app aborted while preparing to start storage"};
1607  }
1608  }
1609 }
1610 
1611 void target_rdma_app::wait_for_and_process_start_storage(void)
1612 {
1613  DOCA_LOG_INFO("Wait for start storage...");
1614  auto const client_request = wait_for_control_message();
1615 
1616  doca_error_t err_code;
1617  std::string err_msg;
1618 
1619  if (client_request.message_type == storage::control::message_type::start_storage_request) {
1620  try {
1621  m_control_channel->send_message(process_start_storage(client_request));
1622  return;
1623  } catch (storage::runtime_error const &ex) {
1624  err_code = ex.get_doca_error();
1625  err_msg = ex.what();
1626  }
1627  } else {
1628  err_code = DOCA_ERROR_UNEXPECTED;
1629  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1631  }
1632 
1633  m_control_channel->send_message({
1635  client_request.message_id,
1636  client_request.correlation_id,
1637  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1638 
1639  });
1640 }
1641 
1642 void target_rdma_app::wait_for_and_process_stop_storage(void)
1643 {
1644  DOCA_LOG_INFO("Wait for stop storage...");
1645  auto const client_request = wait_for_control_message();
1646 
1647  doca_error_t err_code;
1648  std::string err_msg;
1649 
1650  if (client_request.message_type == storage::control::message_type::stop_storage_request) {
1651  try {
1652  m_control_channel->send_message(process_stop_storage(client_request));
1653  return;
1654  } catch (storage::runtime_error const &ex) {
1655  err_code = ex.get_doca_error();
1656  err_msg = ex.what();
1657  }
1658  } else {
1659  err_code = DOCA_ERROR_UNEXPECTED;
1660  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1662  }
1663 
1664  m_control_channel->send_message({
1666  client_request.message_id,
1667  client_request.correlation_id,
1668  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1669 
1670  });
1671 }
1672 
1673 void target_rdma_app::wait_for_and_process_shutdown(void)
1674 {
1675  DOCA_LOG_INFO("Wait for shutdown storage...");
1676  auto const client_request = wait_for_control_message();
1677 
1678  doca_error_t err_code;
1679  std::string err_msg;
1680 
1681  if (client_request.message_type == storage::control::message_type::shutdown_request) {
1682  try {
1683  m_control_channel->send_message(process_shutdown(client_request));
1684  return;
1685  } catch (storage::runtime_error const &ex) {
1686  err_code = ex.get_doca_error();
1687  err_msg = ex.what();
1688  }
1689  } else {
1690  err_code = DOCA_ERROR_UNEXPECTED;
1691  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1693  }
1694 
1695  m_control_channel->send_message({
1697  client_request.message_id,
1698  client_request.correlation_id,
1699  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1700 
1701  });
1702 }
1703 
1704 void target_rdma_app::display_stats(void) const
1705 {
1706  for (auto const &stats : m_stats) {
1707  auto const pe_hit_rate_pct =
1708  (static_cast<double>(stats.pe_hit_count) /
1709  (static_cast<double>(stats.pe_hit_count) + static_cast<double>(stats.pe_miss_count))) *
1710  100.;
1711 
1712  printf("+================================================+\n");
1713  printf("| Core: %u\n", stats.core_idx);
1714  printf("| Operation count: %lu\n", stats.operation_count);
1715  printf("| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct, stats.pe_hit_count, stats.pe_miss_count);
1716  }
1717 }
1718 
1719 void target_rdma_app::init(void)
1720 {
1721  m_dev = storage::open_device(m_cfg.device_id);
1722 
1723  m_storage_block_count = m_cfg.block_count;
1724  m_storage_block_size = m_cfg.block_size;
1725 
1726  auto const page_size = storage::get_system_page_size();
1727  m_local_io_region_size = uint64_t{m_storage_block_count} * m_storage_block_size;
1728  m_local_io_region = static_cast<uint8_t *>(storage::aligned_alloc(page_size, m_local_io_region_size));
1729 
1730  if (!m_cfg.content.empty()) {
1731  std::copy(std::begin(m_cfg.content), std::end(m_cfg.content), m_local_io_region);
1732  }
1733 
1734  m_control_channel = storage::control::make_tcp_server_control_channel(m_cfg.listen_port);
1735 }
1736 
1737 void target_rdma_app::cleanup(void) noexcept
1738 {
1739  m_control_channel.reset();
1740  destroy_workers();
1741  if (m_dev != nullptr) {
1742  auto const ret = doca_dev_close(m_dev);
1743  if (ret != DOCA_SUCCESS) {
1744  DOCA_LOG_ERR("Failed to close doca_dev(%p): %s", m_dev, doca_error_get_name(ret));
1745  }
1746  }
1747 }
1748 
1749 storage::control::message target_rdma_app::wait_for_control_message()
1750 {
1751  for (;;) {
1752  if (!m_ctrl_messages.empty()) {
1753  auto msg = std::move(m_ctrl_messages.front());
1754  m_ctrl_messages.erase(m_ctrl_messages.begin());
1755  return msg;
1756  }
1757 
1758  // Poll for new messages
1759  auto *new_msg = m_control_channel->poll();
1760  if (new_msg) {
1761  m_ctrl_messages.push_back(std::move(*new_msg));
1762  }
1763 
1764  if (m_abort_flag) {
1765  throw storage::runtime_error{
1767  "User aborted the target_rdma_app while waiting on a control message"};
1768  }
1769  }
1770 }
1771 
1772 storage::control::message target_rdma_app::process_query_storage(storage::control::message const &client_request)
1773 {
1774  return {
1776  client_request.message_id,
1777  client_request.correlation_id,
1778  std::make_unique<storage::control::storage_details_payload>(uint64_t{m_storage_block_size} *
1779  m_storage_block_count,
1780  m_storage_block_size),
1781  };
1782 }
1783 
1784 storage::control::message target_rdma_app::process_init_storage(storage::control::message const &client_request)
1785 {
1786  auto const *details =
1787  reinterpret_cast<storage::control::init_storage_payload const *>(client_request.payload.get());
1788 
1789  if (details->core_count > m_cfg.core_set.size()) {
1791  "Requested storage to use " + std::to_string(details->core_count) +
1792  " cores but only " + std::to_string(m_cfg.core_set.size()) +
1793  " are configured"};
1794  }
1795 
1796  m_core_count = details->core_count;
1797  m_task_count = details->task_count;
1798 
1799  m_local_io_mmap = storage::make_mmap(m_dev,
1800  reinterpret_cast<char *>(m_local_io_region),
1801  m_local_io_region_size,
1802  rdma_permissions);
1803  m_remote_io_mmap =
1804  storage::make_mmap(m_dev, details->mmap_export_blob.data(), details->mmap_export_blob.size());
1805 
1806  prepare_workers();
1807 
1808  return {
1810  client_request.message_id,
1811  client_request.correlation_id,
1812  {},
1813  };
1814 }
1815 
1816 storage::control::message target_rdma_app::process_create_rdma_connection(
1817  storage::control::message const &client_request)
1818 {
1819  auto const *details = reinterpret_cast<storage::control::rdma_connection_details_payload const *>(
1820  client_request.payload.get());
1821  if (details->context_idx > m_core_count) {
1823  "Unable to create RDMA connection for invalid context idx"};
1824  }
1825 
1826  auto export_blob =
1827  m_workers[details->context_idx].create_rdma_connection(details->role, details->connection_details);
1828 
1829  return {
1831  client_request.message_id,
1832  client_request.correlation_id,
1833  std::make_unique<storage::control::rdma_connection_details_payload>(details->context_idx,
1834  details->role,
1835  std::move(export_blob)),
1836  };
1837 }
1838 
1839 storage::control::message target_rdma_app::process_start_storage(storage::control::message const &client_request)
1840 {
1841  verify_connections_are_ready();
1842  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1843  m_workers[ii].prepare_and_submit_tasks();
1844  m_workers[ii].prepare_thread_proc(m_cfg.core_set[ii]);
1845  m_workers[ii].start_thread_proc();
1846  }
1847 
1848  return {
1850  client_request.message_id,
1851  client_request.correlation_id,
1852  {},
1853  };
1854 }
1855 
1856 storage::control::message target_rdma_app::process_stop_storage(storage::control::message const &client_request)
1857 {
1858  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1859  m_workers[ii].stop_processing();
1860  }
1861 
1862  return {
1864  client_request.message_id,
1865  client_request.correlation_id,
1866  {},
1867  };
1868 }
1869 
1870 storage::control::message target_rdma_app::process_shutdown(storage::control::message const &client_request)
1871 {
1872  m_stats.reserve(m_core_count);
1873  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1874  auto const &hot_data = m_workers[ii].get_hot_data();
1875  m_stats.push_back(target_rdma_worker_stats{
1876  m_cfg.core_set[ii],
1877  hot_data.pe_hit_count,
1878  hot_data.pe_miss_count,
1879  hot_data.completed_transaction_count,
1880  });
1881  }
1882 
1883  destroy_workers();
1884 
1885  return {
1887  client_request.message_id,
1888  client_request.correlation_id,
1889  {},
1890  };
1891 }
1892 
1893 void target_rdma_app::prepare_workers()
1894 {
1895  if (m_core_count > m_cfg.core_set.size()) {
1897  "Unable to create " + std::to_string(m_core_count) + " threads as only " +
1898  std::to_string(m_cfg.core_set.size()) + " cores were defined"};
1899  }
1900 
1901  m_workers = storage::make_aligned<target_rdma_worker>{}.object_array(m_core_count,
1902  m_dev,
1903  m_task_count,
1904  m_remote_io_mmap,
1905  m_local_io_mmap);
1906 }
1907 
1908 void target_rdma_app::destroy_workers(void) noexcept
1909 {
1910  if (m_workers != nullptr) {
1911  // Destroy all thread resources
1912  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1913  m_workers[ii].~target_rdma_worker();
1914  }
1915  storage::aligned_free(m_workers);
1916  m_workers = nullptr;
1917  }
1918 }
1919 
1920 void target_rdma_app::verify_connections_are_ready(void)
1921 {
1922  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1923  bool ready = false;
1924  do {
1925  if (m_abort_flag) {
1927  "Aborted while establishing storage connections"};
1928  }
1929 
1930  auto const ret = m_workers[ii].get_rdma_connection_state();
1931  if (ret == DOCA_SUCCESS)
1932  ready = true;
1933  else if (ret != DOCA_ERROR_IN_PROGRESS) {
1934  throw storage::runtime_error{ret, "Failure while establishing RDMA connections"};
1935  }
1936  } while (!ready);
1937  }
1938 }
1939 
1940 } /* namespace */
static void cleanup(struct cache_invalidate_sample_state *state)
static uint32_t get_io_size(char const *buf)
Definition: io_message.hpp:187
static void set_type(io_message_type type, char *buf)
Definition: io_message.hpp:79
static uint32_t get_remote_offset(char const *buf)
Definition: io_message.hpp:211
static io_message_type get_type(char const *buf)
Definition: io_message.hpp:66
static uint64_t get_io_address(char const *buf)
Definition: io_message.hpp:163
static void set_result(doca_error_t result, char *buf)
Definition: io_message.hpp:152
T * object_array(size_t object_count, Args &&...args) const
Definition: aligned_new.hpp:88
doca_error_t get_doca_error() const noexcept
Definition: definitions.hpp:81
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
@ DOCA_ARGP_TYPE_STRING
Definition: doca_argp.h:56
@ DOCA_ARGP_TYPE_INT
Definition: doca_argp.h:57
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE doca_error_t doca_buf_set_data(struct doca_buf *buf, void *data, size_t data_len)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
doca_ctx_states
This enum defines the states of a context.
Definition: doca_ctx.h:83
@ DOCA_CTX_STATE_RUNNING
Definition: doca_ctx.h:98
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_INITIALIZATION
Definition: doca_error.h:46
@ DOCA_ERROR_UNEXPECTED
Definition: doca_error.h:60
@ DOCA_ERROR_IO_FAILED
Definition: doca_error.h:55
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
@ DOCA_ERROR_IN_PROGRESS
Definition: doca_error.h:64
@ DOCA_ERROR_CONNECTION_RESET
Definition: doca_error.h:49
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_TRC(format,...)
Generates a TRACE application log message.
Definition: doca_log.h:513
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_get_memrange(const struct doca_mmap *mmap, void **addr, size_t *len)
Get the memory range of DOCA memory map.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE union doca_data doca_task_get_user_data(const struct doca_task *task)
Get user data from a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL const struct doca_buf * doca_rdma_task_send_get_src_buf(const struct doca_rdma_task_send *task)
This method gets the source buffer of a send task.
DOCA_EXPERIMENTAL void doca_rdma_task_write_set_dst_buf(struct doca_rdma_task_write *task, struct doca_buf *dst_buf)
This method sets the destination buffer of a write task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_read_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_read **task)
This method allocates and initializes a read task.
DOCA_EXPERIMENTAL void doca_rdma_task_write_set_src_buf(struct doca_rdma_task_write *task, const struct doca_buf *src_buf)
This method sets the source buffer of a write task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_write_as_task(struct doca_rdma_task_write *task)
This method converts a write task to a doca_task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_write_set_conf(struct doca_rdma *rdma, doca_rdma_task_write_completion_cb_t successful_task_completion_cb, doca_rdma_task_write_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the write tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL void doca_rdma_task_read_set_dst_buf(struct doca_rdma_task_read *task, struct doca_buf *dst_buf)
This method sets the destination buffer of a read task.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_read_set_conf(struct doca_rdma *rdma, doca_rdma_task_read_completion_cb_t successful_task_completion_cb, doca_rdma_task_read_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the read tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object The doca_rdma_conn_details are used in doca_rdma_connect()...
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_read_as_task(struct doca_rdma_task_read *task)
This method converts a read task to a doca_task.
DOCA_EXPERIMENTAL void doca_rdma_task_read_set_src_buf(struct doca_rdma_task_read *task, const struct doca_buf *src_buf)
This method sets the source buffer of a read task.
void(* doca_rdma_task_read_completion_cb_t)(struct doca_rdma_task_read *task, union doca_data task_user_data, union doca_data ctx_user_data)
Function to execute on completion of a read task.
Definition: doca_rdma.h:1987
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state...
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_write_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_write **task)
This method allocates and initializes a write task.
void(* doca_rdma_task_write_completion_cb_t)(struct doca_rdma_task_write *task, union doca_data task_user_data, union doca_data ctx_user_data)
Function to execute on completion of a write task.
Definition: doca_rdma.h:2203
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
Definition: doca_types.h:83
@ DOCA_ACCESS_FLAG_RDMA_READ
Definition: doca_types.h:84
@ DOCA_ACCESS_FLAG_RDMA_WRITE
Definition: doca_types.h:85
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
Definition: doca_version.h:90
const struct ip_frag_config * cfg
Definition: ip_frag_dp.c:0
type value
std::string to_string(storage::control::message_type type)
std::unique_ptr< storage::control::channel > make_tcp_server_control_channel(uint16_t listen_port)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
Definition: os_utils.cpp:53
void uninstall_ctrl_c_handler()
Definition: os_utils.cpp:121
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
Definition: doca_utils.cpp:146
static constexpr value_requirement optional_value
Definition: doca_utils.hpp:228
void create_doca_logger_backend(void) noexcept
Definition: doca_utils.cpp:471
void install_ctrl_c_handler(std::function< void(void)> callback)
Definition: os_utils.cpp:106
bool file_has_binary_content_header(std::string const &file_name)
void aligned_free(void *memory)
Definition: os_utils.cpp:131
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
Definition: doca_utils.cpp:331
static constexpr value_multiplicity multiple_values
Definition: doca_utils.hpp:231
std::vector< uint8_t > load_file_bytes(std::string const &file_name)
Definition: file_utils.cpp:58
void * aligned_alloc(size_t alignment, size_t size)
Definition: os_utils.cpp:126
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
Definition: doca_utils.cpp:413
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
Definition: doca_utils.hpp:260
static constexpr value_multiplicity single_value
Definition: doca_utils.hpp:230
constexpr size_t size_of_io_message
Definition: io_message.hpp:53
constexpr uint32_t cache_line_size
Definition: definitions.hpp:40
std::string io_message_to_string(char const *buf)
Definition: io_message.cpp:50
static constexpr value_requirement required_value
Definition: doca_utils.hpp:227
uint32_t get_system_page_size(void)
Definition: os_utils.cpp:95
storage::binary_content load_binary_content_from_file(std::string const &file_name)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
Definition: doca_utils.cpp:369
doca_dev * open_device(std::string const &identifier)
Definition: doca_utils.cpp:43
#define false
Definition: stdbool.h:22
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
int main(int argc, char **argv)
DOCA_LOG_REGISTER(TARGET_RDMA)
Convenience type for representing opaque data.
Definition: doca_types.h:56
void * ptr
Definition: doca_types.h:57