NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
comch_to_rdma_zero_copy.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2025 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <atomic>
27 #include <chrono>
28 #include <cstdio>
29 #include <memory>
30 #include <numeric>
31 #include <stdexcept>
32 #include <sstream>
33 #include <thread>
34 
35 #include <doca_argp.h>
36 #include <doca_buf.h>
37 #include <doca_buf_inventory.h>
38 #include <doca_comch.h>
39 #include <doca_comch_consumer.h>
40 #include <doca_comch_producer.h>
41 #include <doca_ctx.h>
42 #include <doca_dev.h>
43 #include <doca_error.h>
44 #include <doca_log.h>
45 #include <doca_mmap.h>
46 #include <doca_pe.h>
47 #include <doca_version.h>
48 
58 
59 DOCA_LOG_REGISTER(ZERO_COPY);
60 
61 using namespace std::string_literals;
62 
63 namespace {
64 
/* Application identity string used for logging and doca_argp registration */
65 auto constexpr app_name = "doca_storage_comch_to_rdma_zero_copy";
66 
/* Default control-path timeout; overridable via the --control-timeout CLI option */
67 auto constexpr default_control_timeout_seconds = std::chrono::seconds{5};
/* Default comch channel name; overridable via the --command-channel-name CLI option */
68 auto constexpr default_command_channel_name = "doca_storage_comch";
69 
/* NOTE(review): presumably the fast-path io message format carries raw pointers in fixed
 * 8-byte fields, hence the 64-bit build requirement — confirm against storage::size_of_io_message */
70 static_assert(sizeof(void *) == 8, "Expected a pointer to occupy 8 bytes");
71 
/* Runtime configuration gathered from the command line; see parse_cli_args() */
72 struct zero_copy_app_configuration {
 /* CPU cores to pin worker threads to (one worker per entry) */
 73  std::vector<uint32_t> cpu_set = {};
 /* DOCA device identifier (--device) */
 74  std::string device_id = {};
 /* Host-side representor device identifier (--representor) */
 75  std::string representor_id = {};
 /* Name of the doca_comch channel shared with the host client */
 76  std::string command_channel_name = {};
 /* Timeout applied to control-path operations */
 77  std::chrono::seconds control_timeout = {};
 /* Address of the backend storage server (<ip>:<port>) */
 78  storage::ip_address storage_server_address = {};
 79 };
80 
/* Per-worker statistics snapshot collected into zero_copy_app::m_stats */
81 struct thread_stats {
 /* Core the worker thread was pinned to */
 82  uint32_t core_idx = 0;
 /* Progress-engine polls that produced at least one completion */
 83  uint64_t pe_hit_count = 0;
 /* Progress-engine polls that produced no completion */
 84  uint64_t pe_miss_count = 0;
 /* Total operations completed by the worker */
 85  uint64_t operation_count = 0;
 86 };
87 
/*
 * Per-core data path worker. Owns a comch consumer/producer pair facing the host,
 * two RDMA connections facing the storage server (control + data), the io message
 * buffers/tasks linking the two sides, and the thread that drives them.
 */
88 class zero_copy_app_worker {
89 public:
 /* State touched on every hot path iteration; padded/aligned to exactly one cache
  * line (see static_assert below) to avoid false sharing with neighbouring workers. */
90  struct alignas(storage::cache_line_size) hot_data {
91  doca_pe *pe;
92  uint64_t pe_hit_count;
93  uint64_t pe_miss_count;
94  uint64_t completed_transaction_count;
95  uint32_t in_flight_transaction_count;
96  uint32_t core_idx;
 /* Count-down until the next flush submit; reset to batch_size on flush */
97  uint8_t batch_count;
98  uint8_t batch_size;
 /* Hot loop keep-running flag; written by the control thread, read by the worker */
99  std::atomic_bool run_flag;
100  bool error_flag;
101 
 /* Copyable: no. Movable: yes (atomics are transferred via load/store). */
102  hot_data();
103  hot_data(hot_data const &other) = delete;
104  hot_data(hot_data &&other) noexcept;
105  hot_data &operator=(hot_data const &other) = delete;
106  hot_data &operator=(hot_data &&other) noexcept;
107 
108  /*
109  * ComCh post recv helper to control batch submission flags
110  *
111  * @task [in]: Task to submit
112  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
113  */
114  doca_error_t submit_comch_recv_task(doca_comch_consumer_task_post_recv *task);
115  };
116  static_assert(sizeof(zero_copy_app_worker::hot_data) == storage::cache_line_size,
117  "Expected thread_context::hot_data to occupy one cache line");
118 
 /* Non-copyable, movable; the parameterised constructor acquires all resources */
119  ~zero_copy_app_worker();
120  zero_copy_app_worker() = delete;
121  zero_copy_app_worker(doca_dev *dev, doca_comch_connection *comch_conn, uint32_t task_count, uint32_t batch_size);
122  zero_copy_app_worker(zero_copy_app_worker const &) = delete;
123  [[maybe_unused]] zero_copy_app_worker(zero_copy_app_worker &&) noexcept;
124  zero_copy_app_worker &operator=(zero_copy_app_worker const &) = delete;
125  [[maybe_unused]] zero_copy_app_worker &operator=(zero_copy_app_worker &&) noexcept;
126 
 /* RDMA connection establishment: export local blob, then connect with the remote blob */
127  std::vector<uint8_t> get_local_rdma_connection_blob(storage::control::rdma_connection_role role);
128  void connect_rdma(storage::control::rdma_connection_role role, std::vector<uint8_t> const &blob);
 /* DOCA_SUCCESS once all four contexts are RUNNING, DOCA_ERROR_IN_PROGRESS otherwise */
129  doca_error_t get_connections_state() const noexcept;
130  void stop_processing(void) noexcept;
131  void destroy_comch_objects(void) noexcept;
132 
133  void create_tasks(uint32_t task_count, uint32_t batch_size, uint32_t remote_consumer_id);
134 
135  /*
136  * Prepare thread proc
137  * @core_id [in]: Core to run on
138  */
139  void prepare_thread_proc(uint32_t core_id);
140  void start_thread_proc();
141  [[nodiscard]] hot_data const &get_hot_data() const noexcept;
142 
143 private:
144  hot_data m_hot_data;
 /* Backing storage for the io messages; m_io_message_mmap/m_io_message_inv expose it to DOCA */
145  uint8_t *m_io_message_region;
146  doca_mmap *m_io_message_mmap;
147  doca_buf_inventory *m_io_message_inv;
148  std::vector<doca_buf *> m_io_message_bufs;
149  doca_comch_consumer *m_consumer;
150  doca_comch_producer *m_producer;
 /* Control-path and data-path RDMA connection pairs to the storage server */
151  storage::rdma_conn_pair m_rdma_ctrl_ctx;
152  storage::rdma_conn_pair m_rdma_data_ctx;
 /* Pre-allocated task pools; request tasks are paired with response tasks in create_tasks() */
153  std::vector<doca_comch_consumer_task_post_recv *> m_host_request_tasks;
154  std::vector<doca_comch_producer_task_send *> m_host_response_tasks;
155  std::vector<doca_rdma_task_send *> m_storage_request_tasks;
156  std::vector<doca_rdma_task_receive *> m_storage_response_tasks;
157  std::thread m_thread;
158 
159  void init(doca_dev *dev, doca_comch_connection *comch_conn, uint32_t task_count, uint32_t batch_size);
160  void cleanup(void) noexcept;
161 
 /* Completion / error callbacks invoked by the progress engine for each task type */
162  static void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
163  doca_data task_user_data,
164  doca_data ctx_user_data) noexcept;
165  static void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
166  doca_data task_user_data,
167  doca_data ctx_user_data) noexcept;
168  static void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
169  doca_data task_user_data,
170  doca_data ctx_user_data) noexcept;
171  static void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
172  doca_data task_user_data,
173  doca_data ctx_user_data) noexcept;
174  static void doca_rdma_task_send_cb(doca_rdma_task_send *task,
175  doca_data task_user_data,
176  doca_data ctx_user_data) noexcept;
177  static void doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
178  doca_data task_user_data,
179  doca_data ctx_user_data) noexcept;
180  static void doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
181  doca_data task_user_data,
182  doca_data ctx_user_data) noexcept;
183  static void doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
184  doca_data task_user_data,
185  doca_data ctx_user_data) noexcept;
186  void thread_proc();
187 };
188 
/*
 * Application controller: owns the device, the control channels to the host client and to
 * the storage server, and one zero_copy_app_worker per configured core. The public
 * wait_for_* methods are invoked in sequence from main() to drive the control protocol.
 */
189 class zero_copy_app {
190 public:
 /* Non-copyable, non-movable; constructed once from the parsed CLI configuration */
191  ~zero_copy_app();
192  zero_copy_app() = delete;
193  explicit zero_copy_app(zero_copy_app_configuration const &cfg);
194  zero_copy_app(zero_copy_app const &) = delete;
195  zero_copy_app(zero_copy_app &&) noexcept = delete;
196  zero_copy_app &operator=(zero_copy_app const &) = delete;
197  zero_copy_app &operator=(zero_copy_app &&) noexcept = delete;
198 
199  void abort(std::string const &reason);
200 
 /* Control-protocol steps, executed in this order by main() */
201  void connect_to_storage(void);
202  void wait_for_comch_client_connection(void);
203  void wait_for_and_process_query_storage(void);
204  void wait_for_and_process_init_storage(void);
205  void wait_for_and_process_start_storage(void);
206  void wait_for_and_process_stop_storage(void);
207  void wait_for_and_process_shutdown(void);
208  void display_stats(void) const;
209 
210 private:
211  zero_copy_app_configuration const m_cfg;
212  doca_dev *m_dev;
213  doca_dev_rep *m_dev_rep;
 /* Host memory region exported by the client, mapped for zero-copy access */
214  doca_mmap *m_remote_io_mmap;
215  std::unique_ptr<storage::control::comch_channel> m_client_control_channel;
216  std::unique_ptr<storage::control::channel> m_storage_control_channel;
217  std::vector<storage::control::channel *> m_ctrl_channels;
218  std::vector<storage::control::message> m_ctrl_messages;
 /* Consumer ids announced by the host client; one per worker */
219  std::vector<uint32_t> m_remote_consumer_ids;
220  zero_copy_app_worker *m_workers;
221  std::vector<thread_stats> m_stats;
222  uint64_t m_storage_capacity;
223  uint32_t m_storage_block_size;
224  uint32_t m_message_id_counter;
225  uint32_t m_task_count;
226  uint32_t m_batch_size;
227  uint32_t m_core_count;
228  bool m_abort_flag;
229 
 /* comch notifications: remote consumer appeared / disappeared */
230  static void new_comch_consumer_callback(void *user_data, uint32_t id) noexcept;
231  static void expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept;
232 
233  storage::control::message wait_for_control_message();
234  storage::control::message wait_for_control_message(storage::control::message_id mid,
235  std::chrono::seconds timeout);
236 
 /* One handler per control message type; each returns the response to send back */
237  storage::control::message process_query_storage(storage::control::message const &client_request);
238  storage::control::message process_init_storage(storage::control::message const &client_request);
239  storage::control::message process_start_storage(storage::control::message const &client_request);
240  storage::control::message process_stop_storage(storage::control::message const &client_request);
241  storage::control::message process_shutdown(storage::control::message const &client_requeste);
242 
243  void prepare_thread_contexts(storage::control::correlation_id cid);
244 
245  void connect_rdma(uint32_t thread_idx,
246  storage::control::rdma_connection_role role,
247  storage::control::correlation_id cid);
248 
249  void verify_connections_are_ready(void);
250 
251  void destroy_workers(void) noexcept;
252 };
253 
254 /*
255  * Parse command line arguments
256  *
257  * @argc [in]: Number of arguments
258  * @argv [in]: Array of argument values
259  * @return: Parsed zero_copy_app_configuration
260  *
261  * @throws: storage::runtime_error If the zero_copy_app_configuration cannot be parsed or contains invalid values
262  */
263 zero_copy_app_configuration parse_cli_args(int argc, char **argv);
264 
265 } // namespace
266 
267 /*
268  * Main
269  *
270  * @argc [in]: Number of arguments
271  * @argv [in]: Array of argument values
272  * @return: EXIT_SUCCESS on success and EXIT_FAILURE otherwise
273  */
274 int main(int argc, char **argv)
275 {
 /* NOTE(review): original line 276 was dropped by the doc-scrape — presumably logger
  * backend setup (e.g. storage::create_doca_logger_backend()); confirm against the
  * original sample source. */
277 
278  printf("%s: v%s\n", app_name, doca_version());
279 
280  try {
281  zero_copy_app app{parse_cli_args(argc, argv)};
 /* NOTE(review): original line 282 was dropped by the doc-scrape — presumably the start of
  * a Ctrl-C handler installation (e.g. storage::install_ctrl_c_handler([&app]() {) whose
  * lambda body and closing are the two lines below; confirm against the original sample. */
283  app.abort("User requested abort");
284  });
285 
 /* Control protocol: connect to storage, then serve the client's query/init/start/
  * stop/shutdown requests in order, finally reporting statistics */
286  app.connect_to_storage();
287  app.wait_for_comch_client_connection();
288  app.wait_for_and_process_query_storage();
289  app.wait_for_and_process_init_storage();
290  app.wait_for_and_process_start_storage();
291  app.wait_for_and_process_stop_storage();
292  app.wait_for_and_process_shutdown();
293  app.display_stats();
294  } catch (std::exception const &ex) {
295  fprintf(stderr, "EXCEPTION: %s\n", ex.what());
296  fflush(stdout);
297  fflush(stderr);
298  return EXIT_FAILURE;
299  }
300 
 /* NOTE(review): original line 301 was dropped by the doc-scrape — likely the matching
  * Ctrl-C handler teardown; confirm against the original sample source. */
302 
303  return EXIT_SUCCESS;
304 }
305 
306 namespace {
307 
308 /*
309  * Print the parsed zero_copy_app_configuration
310  *
311  * @cfg [in]: zero_copy_app_configuration to display
312  */
313 void print_config(zero_copy_app_configuration const &cfg) noexcept
314 {
315  printf("zero_copy_app_configuration: {\n");
316  printf("\tcpu_set : [");
317  bool first = true;
318  for (auto cpu : cfg.cpu_set) {
319  if (first)
320  first = false;
321  else
322  printf(", ");
323  printf("%u", cpu);
324  }
325  printf("]\n");
326  printf("\tdevice : \"%s\",\n", cfg.device_id.c_str());
327  printf("\trepresentor : \"%s\",\n", cfg.representor_id.c_str());
328  printf("\tcommand_channel_name : \"%s\",\n", cfg.command_channel_name.c_str());
329  printf("\tcontrol_timeout : %u,\n", static_cast<uint32_t>(cfg.control_timeout.count()));
330  printf("\tstorage_server : %s:%u\n",
331  cfg.storage_server_address.get_address().c_str(),
332  cfg.storage_server_address.get_port());
333  printf("}\n");
334 }
335 
336 /*
337  * Validate zero_copy_app_configuration
338  *
339  * @cfg [in]: zero_copy_app_configuration
340  */
341 void validate_zero_copy_app_configuration(zero_copy_app_configuration const &cfg)
342 {
343  std::vector<std::string> errors;
344 
345  if (cfg.control_timeout.count() == 0) {
346  errors.emplace_back("Invalid zero_copy_app_configuration: control-timeout must not be zero");
347  }
348 
349  if (!errors.empty()) {
350  for (auto const &err : errors) {
351  printf("%s\n", err.c_str());
352  }
353  throw storage::runtime_error{DOCA_ERROR_INVALID_VALUE, "Invalid zero_copy_app_configuration detected"};
354  }
355 }
356 
357 /*
358  * Parse command line arguments
359  *
360  * @argc [in]: Number of arguments
361  * @argv [in]: Array of argument values
362  * @return: Parsed zero_copy_app_configuration
363  *
364  * @throws: storage::runtime_error If the zero_copy_app_configuration cannot be parsed or contains invalid values
365  */
366 zero_copy_app_configuration parse_cli_args(int argc, char **argv)
367 {
 /* Start from defaults; argp callbacks below overwrite fields as options are seen */
368  zero_copy_app_configuration config{};
369  config.command_channel_name = default_command_channel_name;
370  config.control_timeout = default_control_timeout_seconds;
371 
372  doca_error_t ret;
373 
374  ret = doca_argp_init(app_name, &config);
375  if (ret != DOCA_SUCCESS) {
376  throw storage::runtime_error{ret, "Failed to parse CLI args"};
377  }
378 
 /* NOTE(review): the head of each parameter-registration call below (original lines 379,
  * 383-384, 390, 394-395, 401, 405-406, 412-413, 418, 425, 428-429, 433-434, 440 and
  * 444-445) was dropped by the doc-scrape; only the short/long names, descriptions and
  * value callbacks survived — confirm against the original sample source. */
380  "d",
381  "device",
382  "Device identifier",
385  [](void *value, void *cfg) noexcept {
386  static_cast<zero_copy_app_configuration *>(cfg)->device_id =
387  static_cast<char const *>(value);
388  return DOCA_SUCCESS;
389  });
391  "r",
392  "representor",
393  "Device host side representor identifier",
396  [](void *value, void *cfg) noexcept {
397  static_cast<zero_copy_app_configuration *>(cfg)->representor_id =
398  static_cast<char const *>(value);
399  return DOCA_SUCCESS;
400  });
402  nullptr,
403  "cpu",
404  "CPU core to which the process affinity can be set",
407  [](void *value, void *cfg) noexcept {
408  static_cast<zero_copy_app_configuration *>(cfg)->cpu_set.push_back(
409  *static_cast<int *>(value));
410  return DOCA_SUCCESS;
411  });
414  nullptr,
415  "storage-server",
416  "Storage server addresses in <ip_addr>:<port> format",
419  [](void *value, void *cfg) noexcept {
420  try {
421  static_cast<zero_copy_app_configuration *>(cfg)->storage_server_address =
422  storage::parse_ip_v4_address(static_cast<char const *>(value));
423  return DOCA_SUCCESS;
 /* NOTE(review): original line 425 (the error return inside this catch) is missing */
424  } catch (storage::runtime_error const &ex) {
426  }
427  });
430  nullptr,
431  "command-channel-name",
432  "Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
435  [](void *value, void *cfg) noexcept {
436  static_cast<zero_copy_app_configuration *>(cfg)->command_channel_name =
437  static_cast<char const *>(value);
438  return DOCA_SUCCESS;
439  });
441  nullptr,
442  "control-timeout",
443  "Time (in seconds) to wait while performing control operations. Default: 5",
446  [](void *value, void *cfg) noexcept {
447  static_cast<zero_copy_app_configuration *>(cfg)->control_timeout =
448  std::chrono::seconds{*static_cast<int *>(value)};
449  return DOCA_SUCCESS;
450  });
451  ret = doca_argp_start(argc, argv);
452  if (ret != DOCA_SUCCESS) {
453  throw storage::runtime_error{ret, "Failed to parse CLI args"};
454  }
455 
456  static_cast<void>(doca_argp_destroy());
457 
 /* Echo and sanity-check the final configuration before returning it */
458  print_config(config);
459  validate_zero_copy_app_configuration(config);
460 
461  return config;
462 }
463 
/* Default-construct the hot path state: all counters zeroed, no progress engine yet,
 * batching effectively disabled (batch_size 1), worker not running. */
464 zero_copy_app_worker::hot_data::hot_data()
465  : pe{nullptr},
466  pe_hit_count{0},
467  pe_miss_count{0},
468  completed_transaction_count{0},
469  in_flight_transaction_count{0},
470  core_idx{0},
471  batch_count{0},
472  batch_size{1},
473  run_flag{false},
474  error_flag{false}
475 {
476 }
477 
/* Move-construct: copy all counters (std::atomic_bool is not movable, so run_flag is
 * transferred via load) and take over the progress engine pointer from the donor. */
478 zero_copy_app_worker::hot_data::hot_data(hot_data &&other) noexcept
479  : pe{other.pe},
480  pe_hit_count{other.pe_hit_count},
481  pe_miss_count{other.pe_miss_count},
482  completed_transaction_count{other.completed_transaction_count},
483  in_flight_transaction_count{other.in_flight_transaction_count},
484  core_idx{other.core_idx},
485  batch_count{other.batch_count},
486  batch_size{other.batch_size},
487  run_flag{other.run_flag.load()},
488  error_flag{other.error_flag}
489 {
 /* Donor no longer references the progress engine */
490  other.pe = nullptr;
491 }
492 
493 zero_copy_app_worker::hot_data &zero_copy_app_worker::hot_data::operator=(hot_data &&other) noexcept
494 {
495  if (std::addressof(other) == this)
496  return *this;
497 
498  pe = other.pe;
499  pe_hit_count = other.pe_hit_count;
500  pe_miss_count = other.pe_miss_count;
501  completed_transaction_count = other.completed_transaction_count;
502  in_flight_transaction_count = other.in_flight_transaction_count;
503  core_idx = other.core_idx;
504  batch_count = other.batch_count;
505  batch_size = other.batch_size;
506  run_flag = other.run_flag.load();
507  error_flag = other.error_flag;
508 
509  other.pe = nullptr;
510 
511  return *this;
512 }
513 
514 doca_error_t zero_copy_app_worker::hot_data::submit_comch_recv_task(doca_comch_consumer_task_post_recv *task)
515 {
517  if (--batch_count == 0) {
518  submit_flag = DOCA_TASK_SUBMIT_FLAG_FLUSH;
519  batch_count = batch_size;
520  }
521 
523 }
524 
/* Destructor: if the worker thread is still alive, signal it to stop (run_flag is the
 * atomic the hot loop polls; error_flag marks the teardown as abnormal) and join it
 * before releasing resources via cleanup(). */
525 zero_copy_app_worker::~zero_copy_app_worker()
526 {
527  if (m_thread.joinable()) {
528  m_hot_data.run_flag = false;
529  m_hot_data.error_flag = true;
530  m_thread.join();
531  }
532  cleanup();
533 }
534 
/*
 * Construct a worker: zero-initialise every member, then acquire all DOCA resources in
 * init(). On failure the partially-acquired state is released via cleanup() before the
 * error is re-thrown, so a failed construction leaks nothing.
 *
 * @dev [in]: Device to use
 * @comch_conn [in]: Established comch connection to the host client
 * @task_count [in]: Number of in-flight transactions to provision for
 * @batch_size [in]: Submission batch size for the hot path
 */
535 zero_copy_app_worker::zero_copy_app_worker(doca_dev *dev,
536  doca_comch_connection *comch_conn,
537  uint32_t task_count,
538  uint32_t batch_size)
539  : m_hot_data{},
540  m_io_message_region{nullptr},
541  m_io_message_mmap{nullptr},
542  m_io_message_inv{nullptr},
543  m_io_message_bufs{},
544  m_consumer{nullptr},
545  m_producer{nullptr},
546  m_rdma_ctrl_ctx{},
547  m_rdma_data_ctx{},
548  m_host_request_tasks{},
549  m_host_response_tasks{},
550  m_storage_request_tasks{},
551  m_storage_response_tasks{},
552  m_thread{}
553 {
554  try {
555  init(dev, comch_conn, task_count, batch_size);
556  } catch (storage::runtime_error const &) {
557  cleanup();
558  throw;
559  }
560 }
561 
/* Move-construct: steal every owned resource, then null out the donor's raw handles so
 * its destructor / cleanup() becomes a no-op for them. */
562 zero_copy_app_worker::zero_copy_app_worker(zero_copy_app_worker &&other) noexcept
563  : m_hot_data{std::move(other.m_hot_data)},
564  m_io_message_region{other.m_io_message_region},
565  m_io_message_mmap{other.m_io_message_mmap},
566  m_io_message_inv{other.m_io_message_inv},
567  m_io_message_bufs{std::move(other.m_io_message_bufs)},
568  m_consumer{other.m_consumer},
569  m_producer{other.m_producer},
570  m_rdma_ctrl_ctx{other.m_rdma_ctrl_ctx},
571  m_rdma_data_ctx{other.m_rdma_data_ctx},
572  m_host_request_tasks{std::move(other.m_host_request_tasks)},
573  m_host_response_tasks{std::move(other.m_host_response_tasks)},
574  m_storage_request_tasks{std::move(other.m_storage_request_tasks)},
575  m_storage_response_tasks{std::move(other.m_storage_response_tasks)},
576  m_thread{std::move(other.m_thread)}
577 {
578  other.m_io_message_region = nullptr;
579  other.m_io_message_mmap = nullptr;
580  other.m_io_message_inv = nullptr;
581  other.m_consumer = nullptr;
582  other.m_producer = nullptr;
583  other.m_rdma_ctrl_ctx = {};
584  other.m_rdma_data_ctx = {};
585 }
586 
587 zero_copy_app_worker &zero_copy_app_worker::operator=(zero_copy_app_worker &&other) noexcept
588 {
589  if (std::addressof(other) == this)
590  return *this;
591 
592  m_hot_data = std::move(other.m_hot_data);
593  m_io_message_region = other.m_io_message_region;
594  m_io_message_mmap = other.m_io_message_mmap;
595  m_io_message_inv = other.m_io_message_inv;
596  m_io_message_bufs = std::move(other.m_io_message_bufs);
597  m_consumer = other.m_consumer;
598  m_producer = other.m_producer;
599  m_rdma_ctrl_ctx = other.m_rdma_ctrl_ctx;
600  m_rdma_data_ctx = other.m_rdma_data_ctx;
601  m_host_request_tasks = std::move(other.m_host_request_tasks);
602  m_host_response_tasks = std::move(other.m_host_response_tasks);
603  m_storage_request_tasks = std::move(other.m_storage_request_tasks);
604  m_storage_response_tasks = std::move(other.m_storage_response_tasks);
605  m_thread = std::move(other.m_thread);
606 
607  other.m_io_message_region = nullptr;
608  other.m_io_message_mmap = nullptr;
609  other.m_io_message_inv = nullptr;
610  other.m_consumer = nullptr;
611  other.m_producer = nullptr;
612  other.m_rdma_ctrl_ctx = {};
613  other.m_rdma_data_ctx = {};
614 
615  return *this;
616 }
617 
618 std::vector<uint8_t> zero_copy_app_worker::get_local_rdma_connection_blob(storage::control::rdma_connection_role role)
619 {
620  doca_error_t ret;
621  uint8_t const *blob = nullptr;
622  size_t blob_size = 0;
623 
624  auto &rdma_pair = role == storage::control::rdma_connection_role::io_data ? m_rdma_data_ctx : m_rdma_ctrl_ctx;
625  ret = doca_rdma_export(rdma_pair.rdma,
626  reinterpret_cast<void const **>(&blob),
627  &blob_size,
628  std::addressof(rdma_pair.conn));
629  if (ret != DOCA_SUCCESS) {
630  DOCA_LOG_ERR("Core: %u RDMA export failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
631  throw storage::runtime_error{ret, "Failed to export rdma connection"};
632  }
633 
634  return std::vector<uint8_t>{blob, blob + blob_size};
635 }
636 
637 void zero_copy_app_worker::connect_rdma(storage::control::rdma_connection_role role, std::vector<uint8_t> const &blob)
638 {
639  auto &rdma_pair = role == storage::control::rdma_connection_role::io_data ? m_rdma_data_ctx : m_rdma_ctrl_ctx;
640 
641  doca_error_t ret;
642  ret = doca_rdma_connect(rdma_pair.rdma, blob.data(), blob.size(), rdma_pair.conn);
643  if (ret != DOCA_SUCCESS) {
644  DOCA_LOG_ERR("Core: %u RDMA connect failed: %s", m_hot_data.core_idx, doca_error_get_name(ret));
645  throw storage::runtime_error{ret, "Failed to connect to rdma"};
646  }
647 }
648 
649 doca_error_t zero_copy_app_worker::get_connections_state() const noexcept
650 {
651  doca_error_t ret;
652  doca_ctx_states ctx_state;
653  uint32_t pending_count = 0;
654 
655  ret = doca_ctx_get_state(doca_comch_producer_as_ctx(m_producer), &ctx_state);
656  if (ret != DOCA_SUCCESS) {
657  DOCA_LOG_ERR("Failed to query comch producer state: %s", doca_error_get_name(ret));
658  return ret;
659  }
660 
661  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
662  ++pending_count;
663  static_cast<void>(doca_pe_progress(m_hot_data.pe));
664  }
665 
666  ret = doca_ctx_get_state(doca_comch_consumer_as_ctx(m_consumer), &ctx_state);
667  if (ret != DOCA_SUCCESS) {
668  DOCA_LOG_ERR("Failed to query comch consumer state: %s", doca_error_get_name(ret));
669  return ret;
670  }
671 
672  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
673  ++pending_count;
674  static_cast<void>(doca_pe_progress(m_hot_data.pe));
675  }
676 
677  ret = doca_ctx_get_state(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma), &ctx_state);
678  if (ret != DOCA_SUCCESS) {
679  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
680  return ret;
681  }
682 
683  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
684  ++pending_count;
685  static_cast<void>(doca_pe_progress(m_hot_data.pe));
686  }
687 
688  ret = doca_ctx_get_state(doca_rdma_as_ctx(m_rdma_data_ctx.rdma), &ctx_state);
689  if (ret != DOCA_SUCCESS) {
690  DOCA_LOG_ERR("Failed to query rdma context state: %s", doca_error_get_name(ret));
691  return ret;
692  }
693 
694  if (ctx_state != DOCA_CTX_STATE_RUNNING) {
695  ++pending_count;
696  static_cast<void>(doca_pe_progress(m_hot_data.pe));
697  }
698 
699  return (pending_count == 0) ? DOCA_SUCCESS : DOCA_ERROR_IN_PROGRESS;
700 }
701 
702 void zero_copy_app_worker::stop_processing(void) noexcept
703 {
704  m_hot_data.run_flag = false;
705  if (m_thread.joinable()) {
706  m_thread.join();
707  }
708 }
709 
710 void zero_copy_app_worker::destroy_comch_objects(void) noexcept
711 {
712  doca_error_t ret;
713  std::vector<doca_task *> tasks;
714 
715  if (m_consumer != nullptr) {
716  tasks.reserve(m_host_request_tasks.size());
717  std::transform(std::begin(m_host_request_tasks),
718  std::end(m_host_request_tasks),
719  std::back_inserter(tasks),
721  ret = storage::stop_context(doca_comch_consumer_as_ctx(m_consumer), m_hot_data.pe, tasks);
722  tasks.clear();
723  if (ret != DOCA_SUCCESS) {
724  DOCA_LOG_ERR("Failed to stop consumer context");
725  } else {
726  m_host_request_tasks.clear();
727  }
728  ret = doca_comch_consumer_destroy(m_consumer);
729  if (ret != DOCA_SUCCESS) {
730  DOCA_LOG_ERR("Failed to destroy consumer context");
731  } else {
732  m_consumer = nullptr;
733  }
734  }
735 
736  if (m_producer != nullptr) {
737  tasks.reserve(m_host_response_tasks.size());
738  std::transform(std::begin(m_host_response_tasks),
739  std::end(m_host_response_tasks),
740  std::back_inserter(tasks),
742  ret = storage::stop_context(doca_comch_producer_as_ctx(m_producer), m_hot_data.pe, tasks);
743  tasks.clear();
744  if (ret != DOCA_SUCCESS) {
745  DOCA_LOG_ERR("Failed to stop producer context");
746  } else {
747  m_host_response_tasks.clear();
748  }
749  ret = doca_comch_producer_destroy(m_producer);
750  if (ret != DOCA_SUCCESS) {
751  DOCA_LOG_ERR("Failed to destroy producer context");
752  } else {
753  m_producer = nullptr;
754  }
755  }
756 }
757 
758 void zero_copy_app_worker::create_tasks(uint32_t task_count, uint32_t batch_size, uint32_t remote_consumer_id)
759 {
760  doca_error_t ret;
761 
762  auto *buf_addr = m_io_message_region;
763  m_io_message_bufs.reserve((task_count * 2) + batch_size);
764  m_host_request_tasks.reserve(task_count + batch_size);
765  m_host_response_tasks.reserve(task_count);
766  m_storage_request_tasks.reserve(task_count + batch_size);
767  m_storage_request_tasks.reserve(task_count);
768 
769  for (uint32_t ii = 0; ii != (task_count + batch_size); ++ii) {
770  doca_buf *storage_request_buff = nullptr;
771 
772  ret = doca_buf_inventory_buf_get_by_addr(m_io_message_inv,
773  m_io_message_mmap,
774  buf_addr,
776  &storage_request_buff);
777  if (ret != DOCA_SUCCESS) {
778  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
779  }
780 
781  buf_addr += storage::size_of_io_message;
782  m_io_message_bufs.push_back(storage_request_buff);
783 
784  doca_rdma_task_send *rdma_task_send = nullptr;
785  ret = doca_rdma_task_send_allocate_init(m_rdma_ctrl_ctx.rdma,
786  m_rdma_ctrl_ctx.conn,
787  storage_request_buff,
788  doca_data{.ptr = nullptr},
789  &rdma_task_send);
790  if (ret != DOCA_SUCCESS) {
791  throw storage::runtime_error{ret, "Failed to allocate rdma doca_rdma_task_send"};
792  }
793  m_storage_request_tasks.push_back(rdma_task_send);
794 
795  doca_comch_consumer_task_post_recv *comch_consumer_task_post_recv = nullptr;
797  storage_request_buff,
798  &comch_consumer_task_post_recv);
799  if (ret != DOCA_SUCCESS) {
800  throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
801  }
802  m_host_request_tasks.push_back(comch_consumer_task_post_recv);
803 
804  /* link task pair - comch recv <-> rdma send */
805  static_cast<void>(doca_task_set_user_data(
806  doca_comch_consumer_task_post_recv_as_task(comch_consumer_task_post_recv),
807  doca_data{.ptr = doca_rdma_task_send_as_task(rdma_task_send)}));
808  static_cast<void>(doca_task_set_user_data(doca_rdma_task_send_as_task(rdma_task_send),
809  doca_data{.ptr = comch_consumer_task_post_recv}));
810  }
811 
812  for (uint32_t ii = 0; ii != task_count; ++ii) {
813  doca_buf *storage_recv_buf = nullptr;
814 
815  ret = doca_buf_inventory_buf_get_by_addr(m_io_message_inv,
816  m_io_message_mmap,
817  buf_addr,
819  &storage_recv_buf);
820  if (ret != DOCA_SUCCESS) {
821  throw storage::runtime_error{ret, "Unable to get io message doca_buf"};
822  }
823  buf_addr += storage::size_of_io_message;
824  m_io_message_bufs.push_back(storage_recv_buf);
825 
826  doca_rdma_task_receive *rdma_task_receive = nullptr;
827  ret = doca_rdma_task_receive_allocate_init(m_rdma_ctrl_ctx.rdma,
828  storage_recv_buf,
829  doca_data{.ptr = nullptr},
830  &rdma_task_receive);
831  if (ret != DOCA_SUCCESS) {
832  throw storage::runtime_error{ret, "Failed to allocate rdma doca_rdma_task_receive"};
833  }
834  m_storage_response_tasks.push_back(rdma_task_receive);
835 
836  doca_comch_producer_task_send *comch_producer_task_send;
838  storage_recv_buf,
839  nullptr,
840  0,
841  remote_consumer_id,
842  &comch_producer_task_send);
843  if (ret != DOCA_SUCCESS) {
844  throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
845  }
846  m_host_response_tasks.push_back(comch_producer_task_send);
847 
848  /* link task pair - rdma recv <-> comch send */
849  static_cast<void>(
851  doca_data{.ptr = rdma_task_receive}));
852  static_cast<void>(doca_task_set_user_data(
853  doca_rdma_task_receive_as_task(rdma_task_receive),
854  doca_data{.ptr = doca_comch_producer_task_send_as_task(comch_producer_task_send)}));
855  }
856 }
857 
858 void zero_copy_app_worker::prepare_thread_proc(uint32_t core_id)
859 {
860  m_thread = std::thread{[this]() {
861  try {
862  thread_proc();
863  } catch (std::exception const &ex) {
864  DOCA_LOG_ERR("Core: %u Exception: %s", m_hot_data.core_idx, ex.what());
865  m_hot_data.error_flag = true;
866  m_hot_data.run_flag = false;
867  }
868  }};
869  m_hot_data.core_idx = core_id;
870  storage::set_thread_affinity(m_thread, m_hot_data.core_idx);
871 }
872 
873 void zero_copy_app_worker::start_thread_proc(void)
874 {
875  // Submit initial tasks
876  doca_error_t ret;
877  for (auto *task : m_host_request_tasks) {
879  if (ret != DOCA_SUCCESS) {
880  DOCA_LOG_ERR("Failed to submit initial doca_comch_consumer_task_post_recv task: %s",
881  doca_error_get_name(ret));
882  throw storage::runtime_error{ret, "Failed to submit initial task"};
883  }
884  }
885 
886  for (auto *task : m_storage_response_tasks) {
888  if (ret != DOCA_SUCCESS) {
889  DOCA_LOG_ERR("Failed to submit initial doca_rdma_task_receive task: %s",
890  doca_error_get_name(ret));
891  throw storage::runtime_error{ret, "Failed to submit initial task"};
892  }
893  }
894 
895  m_hot_data.run_flag = true;
896 }
897 
898 zero_copy_app_worker::hot_data const &zero_copy_app_worker::get_hot_data(void) const noexcept
899 {
900  return m_hot_data;
901 }
902 
// One-time worker setup: allocate page-aligned io-message memory, wrap it in a
// doca_mmap + doca_buf_inventory, create the hot-path progress engine, the
// comch consumer/producer pair and two RDMA contexts (control + data), and
// zero the hot-path counters/flags.
// NOTE(review): doc-extraction artifact — original lines 926 (trailing
// argument of make_mmap, presumably access permissions) and 959 (second half
// of the rdma_permissions expression) are missing from this capture.
903 void zero_copy_app_worker::init(doca_dev *dev,
904  doca_comch_connection *comch_conn,
905  uint32_t task_count,
906  uint32_t batch_size)
907 {
908  doca_error_t ret;
909  auto const page_size = storage::get_system_page_size();
910 
911  m_hot_data.batch_size = batch_size;
912  auto const raw_io_messages_size = (task_count + batch_size) * storage::size_of_io_message * 2;
913 
914  DOCA_LOG_DBG("Allocate comch buffers memory (%zu bytes, aligned to %u byte pages)",
915  raw_io_messages_size,
916  page_size);
917  m_io_message_region = static_cast<uint8_t *>(
918  storage::aligned_alloc(page_size, storage::aligned_size(page_size, raw_io_messages_size)));
919  if (m_io_message_region == nullptr) {
920  throw storage::runtime_error{DOCA_ERROR_NO_MEMORY, "Failed to allocate comch fast path buffers memory"};
921  }
922 
923  m_io_message_mmap = storage::make_mmap(dev,
924  reinterpret_cast<char *>(m_io_message_region),
925  raw_io_messages_size,
927 
928  ret = doca_buf_inventory_create((task_count * 2) + batch_size, &m_io_message_inv);
929  if (ret != DOCA_SUCCESS) {
930  throw storage::runtime_error{ret, "Failed to create comch fast path doca_buf_inventory"};
931  }
932 
933  ret = doca_buf_inventory_start(m_io_message_inv);
934  if (ret != DOCA_SUCCESS) {
935  throw storage::runtime_error{ret, "Failed to start comch fast path doca_buf_inventory"};
936  }
937 
938  DOCA_LOG_DBG("Create hot path progress engine");
939  ret = doca_pe_create(std::addressof(m_hot_data.pe));
940  if (ret != DOCA_SUCCESS) {
941  throw storage::runtime_error{ret, "Failed to create doca_pe"};
942  }
943 
944  m_consumer = storage::make_comch_consumer(comch_conn,
945  m_io_message_mmap,
946  m_hot_data.pe,
947  task_count + batch_size,
948  doca_data{.ptr = std::addressof(m_hot_data)},
949  doca_comch_consumer_task_post_recv_cb,
950  doca_comch_consumer_task_post_recv_error_cb);
951 
952  m_producer = storage::make_comch_producer(comch_conn,
953  m_hot_data.pe,
954  task_count,
955  doca_data{.ptr = std::addressof(m_hot_data)},
956  doca_comch_producer_task_send_cb,
957  doca_comch_producer_task_send_error_cb);
958  auto const rdma_permissions = DOCA_ACCESS_FLAG_LOCAL_READ_WRITE | DOCA_ACCESS_FLAG_RDMA_READ |
960 
961  m_rdma_ctrl_ctx.rdma = storage::make_rdma_context(dev,
962  m_hot_data.pe,
963  doca_data{.ptr = std::addressof(m_hot_data)},
964  rdma_permissions);
965 
966  ret = doca_rdma_task_receive_set_conf(m_rdma_ctrl_ctx.rdma,
967  doca_rdma_task_receive_cb,
968  doca_rdma_task_receive_error_cb,
969  task_count);
970  if (ret != DOCA_SUCCESS) {
971  throw storage::runtime_error{ret, "Failed to configure rdma receive task pool"};
972  }
973 
974  ret = doca_rdma_task_send_set_conf(m_rdma_ctrl_ctx.rdma,
975  doca_rdma_task_send_cb,
976  doca_rdma_task_send_error_cb,
977  task_count + batch_size);
978  if (ret != DOCA_SUCCESS) {
979  throw storage::runtime_error{ret, "Failed to configure rdma send task pool"};
980  }
981 
982  ret = doca_ctx_start(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma));
983  if (ret != DOCA_SUCCESS) {
984  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
985  }
986 
987  // Data-path RDMA context: no task pools configured here — all data tasks
988  // are executed from the storage side (see cleanup()).
987  m_rdma_data_ctx.rdma = storage::make_rdma_context(dev,
988  m_hot_data.pe,
989  doca_data{.ptr = std::addressof(m_hot_data)},
990  rdma_permissions);
991 
992  ret = doca_ctx_start(doca_rdma_as_ctx(m_rdma_data_ctx.rdma));
993  if (ret != DOCA_SUCCESS) {
994  throw storage::runtime_error{ret, "Failed to start doca_rdma context"};
995  }
996 
997  m_hot_data.run_flag = false;
998  m_hot_data.error_flag = false;
999  m_hot_data.pe_hit_count = 0;
1000  m_hot_data.pe_miss_count = 0;
1001  m_hot_data.completed_transaction_count = 0;
1002  m_hot_data.in_flight_transaction_count = 0;
1003 }
1004 
// Best-effort teardown (noexcept): stop the control RDMA context together with
// its outstanding tasks, stop/destroy the data RDMA context, then release the
// comch objects, progress engine, doca_bufs, inventory, mmap and the raw
// io-message allocation — in reverse order of creation in init().
// NOTE(review): doc-extraction artifact — original lines 1016 and 1020 (the
// std::transform projection lambdas converting tasks to doca_task*) are
// missing from this capture.
1005 void zero_copy_app_worker::cleanup(void) noexcept
1006 {
1007  doca_error_t ret;
1008  std::vector<doca_task *> tasks;
1009 
1010  if (m_rdma_ctrl_ctx.rdma != nullptr) {
1011  tasks.clear();
1012  tasks.reserve(m_storage_request_tasks.size() + m_storage_response_tasks.size());
1013  std::transform(std::begin(m_storage_request_tasks),
1014  std::end(m_storage_request_tasks),
1015  std::back_inserter(tasks),
1017  std::transform(std::begin(m_storage_response_tasks),
1018  std::end(m_storage_response_tasks),
1019  std::back_inserter(tasks),
1021 
1022  /* stop context with tasks list (tasks must be destroyed to finish stopping process) */
1023  ret = storage::stop_context(doca_rdma_as_ctx(m_rdma_ctrl_ctx.rdma), m_hot_data.pe, tasks);
1024  if (ret != DOCA_SUCCESS) {
1025  DOCA_LOG_ERR("Failed to stop rdma control context: %s", doca_error_get_name(ret));
1026  }
1027 
1028  ret = doca_rdma_destroy(m_rdma_ctrl_ctx.rdma);
1029  if (ret != DOCA_SUCCESS) {
1030  DOCA_LOG_ERR("Failed to destroy rdma control context: %s", doca_error_get_name(ret));
1031  }
1032  }
1033 
1034  if (m_rdma_data_ctx.rdma != nullptr) {
1035  // No tasks allocated on this side for the data context, all tasks are executed from the storage side
1036  ret = doca_ctx_stop(doca_rdma_as_ctx(m_rdma_data_ctx.rdma));
1037  if (ret != DOCA_SUCCESS) {
1038  DOCA_LOG_ERR("Failed to stop rdma data context: %s", doca_error_get_name(ret));
1039  }
1040 
1041  ret = doca_rdma_destroy(m_rdma_data_ctx.rdma);
1042  if (ret != DOCA_SUCCESS) {
1043  DOCA_LOG_ERR("Failed to destroy rdma data context: %s", doca_error_get_name(ret));
1044  }
1045  }
1046 
1047  destroy_comch_objects();
1048 
1049  if (m_hot_data.pe != nullptr) {
1050  ret = doca_pe_destroy(m_hot_data.pe);
1051  if (ret != DOCA_SUCCESS) {
1052  DOCA_LOG_ERR("Failed to destroy progress engine");
1053  }
1054  }
1055 
1056  // Drop refcount to return each buffer to the inventory before stopping it
1056  for (auto *buf : m_io_message_bufs) {
1057  static_cast<void>(doca_buf_dec_refcount(buf, nullptr));
1058  }
1059 
1060  if (m_io_message_inv) {
1061  ret = doca_buf_inventory_stop(m_io_message_inv);
1062  if (ret != DOCA_SUCCESS) {
1063  DOCA_LOG_ERR("Failed to stop buffer inventory");
1064  }
1065  ret = doca_buf_inventory_destroy(m_io_message_inv);
1066  if (ret != DOCA_SUCCESS) {
1067  DOCA_LOG_ERR("Failed to destroy buffer inventory");
1068  }
1069  }
1070 
1071  if (m_io_message_mmap) {
1072  ret = doca_mmap_stop(m_io_message_mmap);
1073  if (ret != DOCA_SUCCESS) {
1074  DOCA_LOG_ERR("Failed to stop mmap");
1075  }
1076  ret = doca_mmap_destroy(m_io_message_mmap);
1077  if (ret != DOCA_SUCCESS) {
1078  DOCA_LOG_ERR("Failed to destroy mmap");
1079  }
1080  }
1081 
1082  if (m_io_message_region != nullptr) {
1083  storage::aligned_free(m_io_message_region);
1084  }
1085 }
1086 
1087 void zero_copy_app_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
1088  doca_data task_user_data,
1089  doca_data ctx_user_data) noexcept
1090 {
1091  static_cast<void>(task);
1092  doca_error_t ret;
1093 
1094  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1095 
1096  /*
1097  * Submit send of the data to the storage backend. Note: both tasks share the same doca buf so the data message
1098  * is forwarded verbatim without any action on the users part.
1099  */
1100  ret = doca_task_submit(static_cast<doca_task *>(task_user_data.ptr));
1101  if (ret != DOCA_SUCCESS) {
1102  DOCA_LOG_ERR("Failed to submit doca_rdma_task_send: %s", doca_error_get_name(ret));
1103  hot_data->error_flag = true;
1104  hot_data->run_flag = false;
1105  }
1106 }
1107 
1108 void zero_copy_app_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1109  doca_data task_user_data,
1110  doca_data ctx_user_data) noexcept
1111 {
1112  static_cast<void>(task);
1113  static_cast<void>(task_user_data);
1114 
1115  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1116 
1117  if (hot_data->run_flag) {
1118  DOCA_LOG_ERR("Failed to complete doca_comch_consumer_task_post_recv");
1119  hot_data->run_flag = false;
1120  hot_data->error_flag = true;
1121  }
1122 }
1123 
1124 void zero_copy_app_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1125  doca_data task_user_data,
1126  doca_data ctx_user_data) noexcept
1127 {
1128  static_cast<void>(task);
1129 
1130  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1131 
1132  auto *storage_response_task = static_cast<doca_rdma_task_receive *>(task_user_data.ptr);
1133 
1134  static_cast<void>(doca_buf_reset_data_len(doca_rdma_task_receive_get_dst_buf(storage_response_task)));
1135 
1136  auto ret = doca_task_submit(doca_rdma_task_receive_as_task(storage_response_task));
1137  if (ret != DOCA_SUCCESS) {
1138  DOCA_LOG_ERR("Failed to submit doca_rdma_task_receive: %s", doca_error_get_name(ret));
1139  hot_data->error_flag = true;
1140  hot_data->run_flag = false;
1141  }
1142 
1143  ++(hot_data->completed_transaction_count);
1144 }
1145 
1146 void zero_copy_app_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1147  doca_data task_user_data,
1148  doca_data ctx_user_data) noexcept
1149 {
1150  static_cast<void>(task);
1151  static_cast<void>(task_user_data);
1152 
1153  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1154  DOCA_LOG_ERR("Failed to complete doca_comch_producer_task_send");
1155  hot_data->run_flag = false;
1156  hot_data->error_flag = true;
1157 }
1158 
1159 void zero_copy_app_worker::doca_rdma_task_send_cb(doca_rdma_task_send *task,
1160  doca_data task_user_data,
1161  doca_data ctx_user_data) noexcept
1162 {
1163  static_cast<void>(task);
1164 
1165  auto *hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1166 
1167  auto *host_request_task = static_cast<doca_comch_consumer_task_post_recv *>(task_user_data.ptr);
1168 
1169  static_cast<void>(doca_buf_reset_data_len(doca_comch_consumer_task_post_recv_get_buf(host_request_task)));
1170 
1171  auto ret = hot_data->submit_comch_recv_task(host_request_task);
1172  if (ret != DOCA_SUCCESS) {
1173  DOCA_LOG_ERR("Failed to submit doca_comch_consumer_task_post_recv: %s", doca_error_get_name(ret));
1174  hot_data->error_flag = true;
1175  hot_data->run_flag = false;
1176  }
1177 }
1178 
1179 void zero_copy_app_worker::doca_rdma_task_send_error_cb(doca_rdma_task_send *task,
1180  doca_data task_user_data,
1181  doca_data ctx_user_data) noexcept
1182 {
1183  static_cast<void>(task);
1184  static_cast<void>(task_user_data);
1185 
1186  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1187  DOCA_LOG_ERR("Failed to complete doca_rdma_task_send");
1188  hot_data->run_flag = false;
1189  hot_data->error_flag = true;
1190 }
1191 
// Completion callback for an rdma receive: a storage response has arrived.
// The paired comch producer send task (stored in the task user data) is
// submitted — retrying while the producer reports DOCA_ERROR_AGAIN — to
// deliver the response to the host.
// NOTE(review): doc-extraction artifact — original lines 1201-1202 are missing
// from this capture; `io_message` is unused in the visible code, so those
// lines presumably inspect/act on the received message — verify against the
// full source.
1192 void zero_copy_app_worker::doca_rdma_task_receive_cb(doca_rdma_task_receive *task,
1193  doca_data task_user_data,
1194  doca_data ctx_user_data) noexcept
1195 {
1196  doca_error_t ret;
1197  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1198 
1199  auto *const io_message = storage::get_buffer_bytes(doca_rdma_task_receive_get_dst_buf(task));
1200 
1203 
1204  do {
1205  ret = doca_task_submit(static_cast<doca_task *>(task_user_data.ptr));
1206  } while (ret == DOCA_ERROR_AGAIN);
1207  if (ret != DOCA_SUCCESS) {
1208  DOCA_LOG_ERR("Failed to submit doca_comch_producer_task_send: %s", doca_error_get_name(ret));
1209  hot_data->run_flag = false;
1210  hot_data->error_flag = true;
1211  }
1212 }
1213 
1214 void zero_copy_app_worker::doca_rdma_task_receive_error_cb(doca_rdma_task_receive *task,
1215  doca_data task_user_data,
1216  doca_data ctx_user_data) noexcept
1217 {
1218  static_cast<void>(task);
1219  static_cast<void>(task_user_data);
1220 
1221  auto *const hot_data = static_cast<zero_copy_app_worker::hot_data *>(ctx_user_data.ptr);
1222  if (hot_data->run_flag) {
1223  /*
1224  * Only consider it a failure when this callback triggers while running. This callback will be triggered
1225  * as part of teardown as the submitted receive tasks that were never filled by requests from the host
1226  * get flushed out.
1227  */
1228  DOCA_LOG_ERR("Failed to complete doca_rdma_task_send");
1229  hot_data->run_flag = false;
1230  hot_data->error_flag = true;
1231  }
1232 }
1233 
1234 void zero_copy_app_worker::thread_proc()
1235 {
1236  while (m_hot_data.run_flag == false) {
1237  std::this_thread::yield();
1238  if (m_hot_data.error_flag)
1239  return;
1240  }
1241 
1242  DOCA_LOG_INFO("Core: %u running", m_hot_data.core_idx);
1243 
1244  while (m_hot_data.run_flag) {
1245  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1246  }
1247 
1248  while (m_hot_data.error_flag == false && m_hot_data.in_flight_transaction_count != 0) {
1249  doca_pe_progress(m_hot_data.pe) ? ++(m_hot_data.pe_hit_count) : ++(m_hot_data.pe_miss_count);
1250  }
1251 
1252  DOCA_LOG_INFO("Core: %u complete", m_hot_data.core_idx);
1253 }
1254 
1255 zero_copy_app::~zero_copy_app()
1256 {
1257  destroy_workers();
1258  m_storage_control_channel.reset();
1259  m_client_control_channel.reset();
1260 
1261  doca_error_t ret;
1262  if (m_dev != nullptr) {
1263  ret = doca_dev_close(m_dev);
1264  if (ret != DOCA_SUCCESS) {}
1265  }
1266 }
1267 
// Constructor: zero-initialise all members, open the doca device and its
// representor, create the client-facing comch control channel and the
// TCP control channel to the storage server, and register both for polling.
// NOTE(review): doc-extraction artifact — original line 1295 (the factory call
// that creates m_client_control_channel, presumably a comch server control
// channel taking m_dev) is missing from this capture.
1268 zero_copy_app::zero_copy_app(zero_copy_app_configuration const &cfg)
1269  : m_cfg{cfg},
1270  m_dev{nullptr},
1271  m_dev_rep{nullptr},
1272  m_remote_io_mmap{nullptr},
1273  m_client_control_channel{},
1274  m_storage_control_channel{},
1275  m_ctrl_channels{},
1276  m_ctrl_messages{},
1277  m_remote_consumer_ids{},
1278  m_workers{nullptr},
1279  m_stats{},
1280  m_storage_capacity{},
1281  m_storage_block_size{},
1282  m_message_id_counter{},
1283  m_task_count{0},
1284  m_batch_size{0},
1285  m_core_count{0},
1286  m_abort_flag{false}
1287 {
1288  DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
1289  m_dev = storage::open_device(m_cfg.device_id);
1290 
1291  DOCA_LOG_INFO("Open doca_dev_rep: %s", m_cfg.representor_id.c_str());
1292  m_dev_rep = storage::open_representor(m_dev, m_cfg.representor_id);
1293 
1294  m_client_control_channel =
1296  m_dev_rep,
1297  m_cfg.command_channel_name.c_str(),
1298  this,
1299  new_comch_consumer_callback,
1300  expired_comch_consumer_callback);
1301 
1302  m_storage_control_channel = storage::control::make_tcp_client_control_channel(m_cfg.storage_server_address);
1303  m_ctrl_channels.reserve(2);
1304  m_ctrl_channels.push_back(m_client_control_channel.get());
1305  m_ctrl_channels.push_back(m_storage_control_channel.get());
1306 }
1307 
1308 void zero_copy_app::abort(std::string const &reason)
1309 {
1310  if (m_abort_flag)
1311  return;
1312 
1313  DOCA_LOG_ERR("Aborted: %s", reason.c_str());
1314  m_abort_flag = true;
1315 }
1316 
// Block (polling every 100ms) until the TCP control channel to the storage
// server reports connected, throwing if the user aborts meanwhile.
// NOTE(review): doc-extraction artifact — original line 1322 (the start of the
// throw statement raised on abort) is missing from this capture.
1317 void zero_copy_app::connect_to_storage(void)
1318 {
1319  while (!m_storage_control_channel->is_connected()) {
1320  std::this_thread::sleep_for(std::chrono::milliseconds{100});
1321  if (m_abort_flag) {
1323  "Aborted while connecting to storage"};
1324  }
1325  }
1326 }
1327 
// Block (polling every 100ms) until the comch client control channel reports
// connected, throwing if the user aborts meanwhile.
// NOTE(review): doc-extraction artifact — original line 1333 (the start of the
// throw statement raised on abort) is missing from this capture.
1328 void zero_copy_app::wait_for_comch_client_connection(void)
1329 {
1330  while (!m_client_control_channel->is_connected()) {
1331  std::this_thread::sleep_for(std::chrono::milliseconds{100});
1332  if (m_abort_flag) {
1334  "Aborted while connecting to client"};
1335  }
1336  }
1337 }
1338 
// Wait for the next control message from the client; if it is a
// query_storage_request, delegate to process_query_storage() and forward the
// response. Otherwise (or if processing throws), reply with an error_response
// carrying the failure details.
// NOTE(review): doc-extraction artifact — original lines 1358 and 1362
// (expected-message-type text and the error_response message_type enum value)
// are missing from this capture.
1339 void zero_copy_app::wait_for_and_process_query_storage(void)
1340 {
1341  DOCA_LOG_INFO("Wait for query storage...");
1342  auto const client_request = wait_for_control_message();
1343 
1344  doca_error_t err_code;
1345  std::string err_msg;
1346 
1347  if (client_request.message_type == storage::control::message_type::query_storage_request) {
1348  try {
1349  m_client_control_channel->send_message(process_query_storage(client_request));
1350  return;
1351  } catch (storage::runtime_error const &ex) {
1352  err_code = ex.get_doca_error();
1353  err_msg = ex.what();
1354  }
1355  } else {
1356  err_code = DOCA_ERROR_UNEXPECTED;
1357  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1359  }
1360 
1361  m_client_control_channel->send_message({
1363  client_request.message_id,
1364  client_request.correlation_id,
1365  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1366 
1367  });
1368 }
1369 
// Wait for the next control message from the client; if it is an
// init_storage_request, delegate to process_init_storage() and forward the
// response. Otherwise (or if processing throws), reply with an error_response.
// NOTE(review): doc-extraction artifact — original lines 1389 and 1393 are
// missing from this capture (same shape as the other wait_for_and_process_*
// handlers).
1370 void zero_copy_app::wait_for_and_process_init_storage(void)
1371 {
1372  DOCA_LOG_INFO("Wait for init storage...");
1373  auto const client_request = wait_for_control_message();
1374 
1375  doca_error_t err_code;
1376  std::string err_msg;
1377 
1378  if (client_request.message_type == storage::control::message_type::init_storage_request) {
1379  try {
1380  m_client_control_channel->send_message(process_init_storage(client_request));
1381  return;
1382  } catch (storage::runtime_error const &ex) {
1383  err_code = ex.get_doca_error();
1384  err_msg = ex.what();
1385  }
1386  } else {
1387  err_code = DOCA_ERROR_UNEXPECTED;
1388  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1390  }
1391 
1392  m_client_control_channel->send_message({
1394  client_request.message_id,
1395  client_request.correlation_id,
1396  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1397 
1398  });
1399 }
1400 
// Wait for the next control message from the client; if it is a
// start_storage_request, delegate to process_start_storage() and forward the
// response. Otherwise (or if processing throws), reply with an error_response.
// NOTE(review): doc-extraction artifact — original lines 1420 and 1424 are
// missing from this capture.
1401 void zero_copy_app::wait_for_and_process_start_storage(void)
1402 {
1403  DOCA_LOG_INFO("Wait for start storage...");
1404  auto const client_request = wait_for_control_message();
1405 
1406  doca_error_t err_code;
1407  std::string err_msg;
1408 
1409  if (client_request.message_type == storage::control::message_type::start_storage_request) {
1410  try {
1411  m_client_control_channel->send_message(process_start_storage(client_request));
1412  return;
1413  } catch (storage::runtime_error const &ex) {
1414  err_code = ex.get_doca_error();
1415  err_msg = ex.what();
1416  }
1417  } else {
1418  err_code = DOCA_ERROR_UNEXPECTED;
1419  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1421  }
1422 
1423  m_client_control_channel->send_message({
1425  client_request.message_id,
1426  client_request.correlation_id,
1427  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1428 
1429  });
1430 }
1431 
// Wait for the next control message from the client; if it is a
// stop_storage_request, delegate to process_stop_storage() and forward the
// response. Otherwise (or if processing throws), reply with an error_response.
// NOTE(review): doc-extraction artifact — original lines 1451 and 1455 are
// missing from this capture.
1432 void zero_copy_app::wait_for_and_process_stop_storage(void)
1433 {
1434  DOCA_LOG_INFO("Wait for stop storage...");
1435  auto const client_request = wait_for_control_message();
1436 
1437  doca_error_t err_code;
1438  std::string err_msg;
1439 
1440  if (client_request.message_type == storage::control::message_type::stop_storage_request) {
1441  try {
1442  m_client_control_channel->send_message(process_stop_storage(client_request));
1443  return;
1444  } catch (storage::runtime_error const &ex) {
1445  err_code = ex.get_doca_error();
1446  err_msg = ex.what();
1447  }
1448  } else {
1449  err_code = DOCA_ERROR_UNEXPECTED;
1450  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1452  }
1453 
1454  m_client_control_channel->send_message({
1456  client_request.message_id,
1457  client_request.correlation_id,
1458  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1459 
1460  });
1461 }
1462 
// Wait for the next control message from the client; if it is a
// shutdown_request, delegate to process_shutdown() and forward the response.
// Otherwise (or if processing throws), reply with an error_response.
// NOTE(review): doc-extraction artifact — original lines 1482 and 1486 are
// missing from this capture.
1463 void zero_copy_app::wait_for_and_process_shutdown(void)
1464 {
1465  DOCA_LOG_INFO("Wait for shutdown storage...");
1466  auto const client_request = wait_for_control_message();
1467 
1468  doca_error_t err_code;
1469  std::string err_msg;
1470 
1471  if (client_request.message_type == storage::control::message_type::shutdown_request) {
1472  try {
1473  m_client_control_channel->send_message(process_shutdown(client_request));
1474  return;
1475  } catch (storage::runtime_error const &ex) {
1476  err_code = ex.get_doca_error();
1477  err_msg = ex.what();
1478  }
1479  } else {
1480  err_code = DOCA_ERROR_UNEXPECTED;
1481  err_msg = "Unexpected " + to_string(client_request.message_type) + " while expecting a " +
1483  }
1484 
1485  m_client_control_channel->send_message({
1487  client_request.message_id,
1488  client_request.correlation_id,
1489  std::make_unique<storage::control::error_response_payload>(err_code, std::move(err_msg)),
1490 
1491  });
1492 }
1493 
1494 void zero_copy_app::display_stats(void) const
1495 {
1496  for (auto const &stats : m_stats) {
1497  auto const pe_hit_rate_pct =
1498  (static_cast<double>(stats.pe_hit_count) /
1499  (static_cast<double>(stats.pe_hit_count) + static_cast<double>(stats.pe_miss_count))) *
1500  100.;
1501 
1502  printf("+================================================+\n");
1503  printf("| Core: %u\n", stats.core_idx);
1504  printf("| Operation count: %lu\n", stats.operation_count);
1505  printf("| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct, stats.pe_hit_count, stats.pe_miss_count);
1506  }
1507 }
1508 
1509 void zero_copy_app::new_comch_consumer_callback(void *user_data, uint32_t id) noexcept
1510 {
1511  auto *self = reinterpret_cast<zero_copy_app *>(user_data);
1512  if (self->m_remote_consumer_ids.capacity() == 0) {
1513  DOCA_LOG_ERR("[BUG] no space for new remote consumer ids");
1514  return;
1515  }
1516 
1517  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
1518  if (found == std::end(self->m_remote_consumer_ids)) {
1519  self->m_remote_consumer_ids.push_back(id);
1520  DOCA_LOG_DBG("Connected to remote consumer with id: %u. Consumer count is now: %zu",
1521  id,
1522  self->m_remote_consumer_ids.size());
1523  } else {
1524  DOCA_LOG_WARN("Ignoring duplicate remote consumer id: %u", id);
1525  }
1526 }
1527 
1528 void zero_copy_app::expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept
1529 {
1530  auto *self = reinterpret_cast<zero_copy_app *>(user_data);
1531  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
1532  if (found != std::end(self->m_remote_consumer_ids)) {
1533  self->m_remote_consumer_ids.erase(found);
1534  DOCA_LOG_DBG("Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
1535  id,
1536  self->m_remote_consumer_ids.size());
1537  } else {
1538  DOCA_LOG_WARN("Ignoring disconnect of unexpected remote consumer id: %u", id);
1539  }
1540 }
1541 
// Block until any control message is available: drain the queued-message list
// first, then poll every registered control channel, looping until a message
// arrives or the user aborts (which throws).
// NOTE(review): doc-extraction artifact — original line 1561 (the doca_error_t
// code passed to the abort runtime_error) is missing from this capture.
1542 storage::control::message zero_copy_app::wait_for_control_message()
1543 {
1544  for (;;) {
1545  if (!m_ctrl_messages.empty()) {
1546  auto msg = std::move(m_ctrl_messages.front());
1547  m_ctrl_messages.erase(m_ctrl_messages.begin());
1548  return msg;
1549  }
1550 
1551  for (auto *channel : m_ctrl_channels) {
1552  // Poll for new messages
1553  auto *msg = channel->poll();
1554  if (msg) {
1555  m_ctrl_messages.push_back(std::move(*msg));
1556  }
1557  }
1558 
1559  if (m_abort_flag) {
1560  throw storage::runtime_error{
1562  "User aborted the zero_copy_application while waiting on a control message"};
1563  }
1564  }
1565 }
1566 
// Block until the control message with the given id arrives, polling all
// channels. Throws on user abort, and throws DOCA_ERROR_TIME_OUT once the
// timeout expires without the message appearing.
// NOTE(review): doc-extraction artifact — original line 1574 (the doca_error_t
// code passed to the abort runtime_error) is missing from this capture.
1567 storage::control::message zero_copy_app::wait_for_control_message(storage::control::message_id mid,
1568  std::chrono::seconds timeout)
1569 {
1570  auto const expiry = std::chrono::steady_clock::now() + timeout;
1571  do {
1572  if (m_abort_flag) {
1573  throw storage::runtime_error{
1575  "User aborted the zero_copy_application while waiting on a control message"};
1576  }
1577 
1578  for (auto *channel : m_ctrl_channels) {
1579  // Poll for new messages
1580  auto *msg = channel->poll();
1581  if (msg) {
1582  m_ctrl_messages.push_back(std::move(*msg));
1583  }
1584  }
1585 
1586  auto found =
1587  std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [mid](auto const &msg) {
1588  return msg.message_id.value == mid.value;
1589  });
1590 
1591  if (found != std::end(m_ctrl_messages)) {
1592  auto msg = std::move(*found);
1593  m_ctrl_messages.erase(found);
1594  return msg;
1595  }
1596 
1597  } while (expiry > std::chrono::steady_clock::now());
1598 
1599  throw storage::runtime_error{DOCA_ERROR_TIME_OUT, "Timed out while waiting on a control message"};
1600 }
1601 
// Forward a query_storage request to the storage server, wait (with timeout)
// for its reply, cache the reported capacity/block size, and return the reply
// re-addressed to the client. Error responses are forwarded; anything else
// throws.
// NOTE(review): doc-extraction artifact — several original lines (1606, 1620,
// 1629-1630, 1636-1637, 1644, 1646: message_type enum values and error-code
// arguments) are missing from this capture.
1602 storage::control::message zero_copy_app::process_query_storage(storage::control::message const &client_request)
1603 {
1604  DOCA_LOG_DBG("Forward request to storage...");
1605  auto storage_request = storage::control::message{
1607  storage::control::message_id{m_message_id_counter++},
1608  client_request.correlation_id,
1609  {},
1610  };
1611  m_storage_control_channel->send_message(storage_request);
1612 
1613  // Wait for storage response
1614  auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1615 
1616  if (storage_response.message_type == storage::control::message_type::query_storage_response) {
1617  auto const *const storage_details =
1618  dynamic_cast<storage::control::storage_details_payload const *>(storage_response.payload.get());
1619  if (storage_details == nullptr) {
1621  "[BUG] Invalid query_storage_response received"};
1622  }
1623 
1624  m_storage_capacity = storage_details->total_size;
1625  m_storage_block_size = storage_details->block_size;
1626  DOCA_LOG_INFO("Storage reports capacity of: %lu using a block size of: %u",
1627  m_storage_capacity,
1628  m_storage_block_size);
1631  client_request.message_id,
1632  client_request.correlation_id,
1633  std::move(storage_response.payload),
1634  };
1635  } else if (storage_response.message_type == storage::control::message_type::error_response) {
1638  client_request.message_id,
1639  client_request.correlation_id,
1640  std::move(storage_response.payload),
1641  };
1642  } else {
1643  throw storage::runtime_error{
1645  "Unexpected " + to_string(storage_response.message_type) + " while expecting a " +
1647  };
1648  }
1649 }
1650 
// Handle an init_storage request: validate the requested core count against
// the configured cpu set, cache task/batch/core parameters, import the host's
// io mmap and re-export it for RDMA, forward an init_storage request (with the
// re-exported blob) to storage, then create the per-core worker contexts on
// success. Error responses are forwarded; anything else throws.
// NOTE(review): doc-extraction artifact — several original lines (1658, 1690,
// 1706-1707, 1713-1714, 1721, 1723) are missing from this capture.
1651 storage::control::message zero_copy_app::process_init_storage(storage::control::message const &client_request)
1652 {
1653  auto const *init_storage_details =
1654  reinterpret_cast<storage::control::init_storage_payload const *>(client_request.payload.get());
1655 
1656  if (init_storage_details->core_count > m_cfg.cpu_set.size()) {
1657  throw storage::runtime_error{
1659  "Unable to create " + std::to_string(m_core_count) + " threads as only " +
1660  std::to_string(m_cfg.cpu_set.size()) + " were defined",
1661  };
1662  }
1663 
1664  m_remote_consumer_ids.reserve(init_storage_details->core_count);
1665 
1666  m_task_count = init_storage_details->task_count;
1667  m_batch_size = init_storage_details->batch_size;
1668  m_core_count = init_storage_details->core_count;
1669  m_remote_io_mmap = storage::make_mmap(m_dev,
1670  init_storage_details->mmap_export_blob.data(),
1671  init_storage_details->mmap_export_blob.size());
1672  // Re-export the imported host mmap so the storage side can target it via RDMA
1672  std::vector<uint8_t> mmap_export_blob = [this]() {
1673  uint8_t const *reexport_blob = nullptr;
1674  size_t reexport_blob_size = 0;
1675  auto const ret = doca_mmap_export_rdma(m_remote_io_mmap,
1676  m_dev,
1677  reinterpret_cast<void const **>(&reexport_blob),
1678  &reexport_blob_size);
1679  if (ret != DOCA_SUCCESS) {
1680  throw storage::runtime_error{ret, "Failed to re-export host mmap for rdma"};
1681  }
1682 
1683  return std::vector<uint8_t>{reexport_blob, reexport_blob + reexport_blob_size};
1684  }();
1685 
1686  DOCA_LOG_INFO("Configured storage: %u cores, %u tasks, %u batch_size", m_core_count, m_task_count, m_batch_size);
1687 
1688  DOCA_LOG_DBG("Forward request to storage...");
1689  auto storage_request = storage::control::message{
1691  storage::control::message_id{m_message_id_counter++},
1692  client_request.correlation_id,
1693  std::make_unique<storage::control::init_storage_payload>(init_storage_details->task_count,
1694  init_storage_details->batch_size,
1695  init_storage_details->core_count,
1696  std::move(mmap_export_blob)),
1697  };
1698  m_storage_control_channel->send_message(storage_request);
1699 
1700  // Wait for storage response
1701  auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1702 
1703  if (storage_response.message_type == storage::control::message_type::init_storage_response) {
1704  DOCA_LOG_DBG("prepare thread contexts...");
1705  prepare_thread_contexts(client_request.correlation_id);
1708  client_request.message_id,
1709  client_request.correlation_id,
1710  std::move(storage_response.payload),
1711  };
1712  } else if (storage_response.message_type == storage::control::message_type::error_response) {
1715  client_request.message_id,
1716  client_request.correlation_id,
1717  std::move(storage_response.payload),
1718  };
1719  } else {
1720  throw storage::runtime_error{
1722  "Unexpected " + to_string(storage_response.message_type) + " while expecting a " +
1724  };
1725  }
1726 }
1727 
// Forward a start_storage request to the storage server; on success verify all
// comch/RDMA connections are ready, create each worker's tasks and start its
// thread, then return the response re-addressed to the client. Error responses
// are forwarded; anything else throws.
// NOTE(review): doc-extraction artifact — several original lines (1732,
// 1748-1749, 1755-1756, 1763, 1765) are missing from this capture.
1728 storage::control::message zero_copy_app::process_start_storage(storage::control::message const &client_request)
1729 {
1730  DOCA_LOG_DBG("Forward request to storage...");
1731  auto storage_request = storage::control::message{
1733  storage::control::message_id{m_message_id_counter++},
1734  client_request.correlation_id,
1735  {},
1736  };
1737  m_storage_control_channel->send_message(storage_request);
1738 
1739  // Wait for storage response
1740  auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1741 
1742  if (storage_response.message_type == storage::control::message_type::start_storage_response) {
1743  verify_connections_are_ready();
1744  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1745  m_workers[ii].create_tasks(m_task_count, m_batch_size, m_remote_consumer_ids[ii]);
1746  m_workers[ii].start_thread_proc();
1747  }
1750  client_request.message_id,
1751  client_request.correlation_id,
1752  std::move(storage_response.payload),
1753  };
1754  } else if (storage_response.message_type == storage::control::message_type::error_response) {
1757  client_request.message_id,
1758  client_request.correlation_id,
1759  std::move(storage_response.payload),
1760  };
1761  } else {
1762  throw storage::runtime_error{
1764  "Unexpected " + to_string(storage_response.message_type) + " while expecting a " +
1766  };
1767  }
1768 }
1769 
// Forward a stop_storage request to the storage server; on success stop each
// worker, capture its hot-path statistics into m_stats, release its comch
// objects, and return the response re-addressed to the client. Error responses
// are forwarded; anything else throws.
// NOTE(review): doc-extraction artifact — several original lines (1774,
// 1798-1799, 1805-1806, 1813, 1815) are missing from this capture.
1770 storage::control::message zero_copy_app::process_stop_storage(storage::control::message const &client_request)
1771 {
1772  DOCA_LOG_DBG("Forward request to storage...");
1773  auto storage_request = storage::control::message{
1775  storage::control::message_id{m_message_id_counter++},
1776  client_request.correlation_id,
1777  {},
1778  };
1779  m_storage_control_channel->send_message(storage_request);
1780 
1781  // Wait for storage response
1782  auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1783 
1784  if (storage_response.message_type == storage::control::message_type::stop_storage_response) {
1785  /* Stop all processing */
1786  m_stats.reserve(m_core_count);
1787  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1788  m_workers[ii].stop_processing();
1789  auto const &hot_data = m_workers[ii].get_hot_data();
1790  m_stats.push_back(thread_stats{
1791  m_cfg.cpu_set[ii],
1792  hot_data.pe_hit_count,
1793  hot_data.pe_miss_count,
1794  hot_data.completed_transaction_count,
1795  });
1796  m_workers[ii].destroy_comch_objects();
1797  }
1800  client_request.message_id,
1801  client_request.correlation_id,
1802  std::move(storage_response.payload),
1803  };
1804  } else if (storage_response.message_type == storage::control::message_type::error_response) {
1807  client_request.message_id,
1808  client_request.correlation_id,
1809  std::move(storage_response.payload),
1810  };
1811  } else {
1812  throw storage::runtime_error{
1814  "Unexpected " + to_string(storage_response.message_type) + " while expecting a " +
1816  };
1817  }
1818 }
1819 
// Handle a shutdown request: wait until every remote comch consumer has been
// torn down (draining stray client messages), forward a shutdown request to
// storage, destroy the workers on success, and return the response
// re-addressed to the client. Error responses are forwarded; anything else
// throws.
// NOTE(review): doc-extraction artifact — several original lines (1827, 1832,
// 1844-1845, 1851-1852, 1859, 1861) are missing from this capture.
// NOTE(review): in the drain loop below `msg` comes straight from poll() and
// looks like it could be nullptr when no message is pending, which would make
// `msg->message_type` a null dereference — verify against the full source
// (the elided line 1827 may guard this).
1820 storage::control::message zero_copy_app::process_shutdown(storage::control::message const &client_request)
1821 {
1822  /* Wait for all remote comch objects to be destroyed and notified */
1823  while (!m_remote_consumer_ids.empty()) {
1824  auto *msg = m_client_control_channel->poll();
1825  DOCA_LOG_DBG("Ignoring unexpected %s while processing %s",
1826  to_string(msg->message_type).c_str(),
1828  }
1829 
1830  DOCA_LOG_DBG("Forward request to storage...");
1831  auto storage_request = storage::control::message{
1833  storage::control::message_id{m_message_id_counter++},
1834  client_request.correlation_id,
1835  {},
1836  };
1837  m_storage_control_channel->send_message(storage_request);
1838 
1839  // Wait for storage response
1840  auto storage_response = wait_for_control_message(storage_request.message_id, default_control_timeout_seconds);
1841 
1842  if (storage_response.message_type == storage::control::message_type::shutdown_response) {
1843  destroy_workers();
1846  client_request.message_id,
1847  client_request.correlation_id,
1848  std::move(storage_response.payload),
1849  };
1850  } else if (storage_response.message_type == storage::control::message_type::error_response) {
1853  client_request.message_id,
1854  client_request.correlation_id,
1855  std::move(storage_response.payload),
1856  };
1857  } else {
1858  throw storage::runtime_error{
1860  "Unexpected " + to_string(storage_response.message_type) + " while expecting a " +
1862  };
1863  }
1864 }
1865 
// Allocate the per-core worker array, then for each core establish its RDMA
// connections with the storage server (io_data role shown; the elided line
// presumably connects the second role) and prepare its pinned worker thread.
// NOTE(review): doc-extraction artifact — original lines 1868 (the worker
// array allocation call that receives m_core_count, m_dev, ...) and 1877
// (a second connect_rdma call) are missing from this capture.
1866 void zero_copy_app::prepare_thread_contexts(storage::control::correlation_id cid)
1867 {
1869  m_core_count,
1870  m_dev,
1871  m_client_control_channel->get_comch_connection(),
1872  m_task_count,
1873  m_batch_size);
1874 
1875  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1876  connect_rdma(ii, storage::control::rdma_connection_role::io_data, cid);
1878  m_workers[ii].prepare_thread_proc(m_cfg.cpu_set[ii]);
1879  }
1880 }
1881 
// Exchange RDMA connection details for one worker/role with the storage
// server: send this side's connection blob, wait (with timeout) for the
// response, and complete the connection with the remote details. Error
// responses and unexpected message types throw.
// NOTE(review): doc-extraction artifact — original lines 1883-1884 (the
// remaining parameters, presumably `rdma_connection_role role` and
// `correlation_id cid`), 1888, 1913 and 1915 are missing from this capture.
1882 void zero_copy_app::connect_rdma(uint32_t thread_idx,
1885 {
1886  auto &tctx = m_workers[thread_idx];
1887  storage::control::message connect_rdma_request{
1889  storage::control::message_id{m_message_id_counter++},
1890  cid,
1891  std::make_unique<storage::control::rdma_connection_details_payload>(
1892  thread_idx,
1893  role,
1894  tctx.get_local_rdma_connection_blob(role)),
1895  };
1896 
1897  m_storage_control_channel->send_message(connect_rdma_request);
1898 
1899  // Wait for storage response
1900  auto connect_rdma_response =
1901  wait_for_control_message(connect_rdma_request.message_id, default_control_timeout_seconds);
1902 
1903  if (connect_rdma_response.message_type == storage::control::message_type::create_rdma_connection_response) {
1904  auto *remote_details = reinterpret_cast<storage::control::rdma_connection_details_payload const *>(
1905  connect_rdma_response.payload.get());
1906  tctx.connect_rdma(role, remote_details->connection_details);
1907  } else if (connect_rdma_response.message_type == storage::control::message_type::error_response) {
1908  auto *error_details = reinterpret_cast<storage::control::error_response_payload const *>(
1909  connect_rdma_response.payload.get());
1910  throw storage::runtime_error{error_details->error_code, error_details->message};
1911  } else {
1912  throw storage::runtime_error{
1914  "Unexpected " + to_string(connect_rdma_response.message_type) + " while expecting a " +
1916  };
1917  }
1918 }
1919 
// Spin until every remote consumer has connected and every worker's RDMA
// connections report DOCA_SUCCESS. Throws on an unexpected client message, on
// a connection failure, or on user abort.
// NOTE(review): doc-extraction artifact — original lines 1931, 1933 (text of
// the unexpected-message error) and 1948 (error code for the abort throw) are
// missing from this capture.
1920 void zero_copy_app::verify_connections_are_ready(void)
1921 {
1922  uint32_t not_ready_count;
1923 
1924  do {
1925  not_ready_count = 0;
1926  if (m_remote_consumer_ids.size() != m_core_count) {
1927  ++not_ready_count;
1928  auto *msg = m_client_control_channel->poll();
1929  if (msg != nullptr) {
1930  throw storage::runtime_error{
1932  "Unexpected " + to_string(msg->message_type) + " while processing " +
1934  };
1935  }
1936  }
1937 
1938  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1939  auto const ret = m_workers[ii].get_connections_state();
1940  if (ret == DOCA_ERROR_IN_PROGRESS) {
1941  ++not_ready_count;
1942  } else if (ret != DOCA_SUCCESS) {
1943  throw storage::runtime_error{ret, "Failure while establishing RDMA connections"};
1944  }
1945  }
1946 
1947  if (m_abort_flag) {
1949  "Aborted while establishing storage connections"};
1950  }
1951  } while (not_ready_count != 0);
1952 }
1953 
1954 void zero_copy_app::destroy_workers(void) noexcept
1955 {
1956  if (m_workers != nullptr) {
1957  // Destroy all thread resources
1958  for (uint32_t ii = 0; ii != m_core_count; ++ii) {
1959  m_workers[ii].~zero_copy_app_worker();
1960  }
1961  storage::aligned_free(m_workers);
1962  m_workers = nullptr;
1963  }
1964 }
1965 
1966 } /* namespace */
static void cleanup(struct cache_invalidate_sample_state *state)
static void set_type(io_message_type type, char *buf)
Definition: io_message.hpp:79
static void set_result(doca_error_t result, char *buf)
Definition: io_message.hpp:152
T * object_array(size_t object_count, Args &&...args) const
Definition: aligned_new.hpp:88
doca_error_t get_doca_error() const noexcept
Definition: definitions.hpp:81
int main(int argc, char **argv)
DOCA_LOG_REGISTER(ZERO_COPY)
static struct eth_l2_fwd_stats stats
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
@ DOCA_ARGP_TYPE_STRING
Definition: doca_argp.h:56
@ DOCA_ARGP_TYPE_INT
Definition: doca_argp.h:57
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by the addr & len arguments.
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_stop(struct doca_ctx *ctx)
Stops the context allowing reconfiguration.
doca_ctx_states
This enum defines the states of a context.
Definition: doca_ctx.h:83
@ DOCA_CTX_STATE_RUNNING
Definition: doca_ctx.h:98
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_TIME_OUT
Definition: doca_error.h:47
@ DOCA_ERROR_CONNECTION_ABORTED
Definition: doca_error.h:50
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_INITIALIZATION
Definition: doca_error.h:46
@ DOCA_ERROR_UNEXPECTED
Definition: doca_error.h:60
@ DOCA_ERROR_AGAIN
Definition: doca_error.h:43
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
@ DOCA_ERROR_IN_PROGRESS
Definition: doca_error.h:64
@ DOCA_ERROR_CONNECTION_RESET
Definition: doca_error.h:49
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
Definition: doca_log.h:476
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
DOCA_STABLE doca_error_t doca_mmap_export_rdma(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the...
doca_task_submit_flag
Flags used when submitting a doca_task.
Definition: doca_pe.h:74
DOCA_EXPERIMENTAL doca_error_t doca_task_submit_ex(struct doca_task *task, uint32_t flags)
Extended version of doca_task_submit.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
@ DOCA_TASK_SUBMIT_FLAG_NONE
Definition: doca_pe.h:75
@ DOCA_TASK_SUBMIT_FLAG_FLUSH
Definition: doca_pe.h:76
DOCA_EXPERIMENTAL struct doca_buf * doca_rdma_task_receive_get_dst_buf(const struct doca_rdma_task_receive *task)
This method gets the destination buffer of a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_set_conf(struct doca_rdma *rdma, doca_rdma_task_send_completion_cb_t successful_task_completion_cb, doca_rdma_task_send_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the send tasks configuration.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_destroy(struct doca_rdma *rdma)
Destroy a DOCA RDMA instance.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_receive_as_task(struct doca_rdma_task_receive *task)
This method converts a receive task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_set_conf(struct doca_rdma *rdma, doca_rdma_task_receive_completion_cb_t successful_task_completion_cb, doca_rdma_task_receive_completion_cb_t error_task_completion_cb, uint32_t num_tasks)
This method sets the receive tasks configuration.
DOCA_EXPERIMENTAL struct doca_task * doca_rdma_task_send_as_task(struct doca_rdma_task_send *task)
This method converts a send task to a doca_task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_receive_allocate_init(struct doca_rdma *rdma, struct doca_buf *dst_buf, union doca_data user_data, struct doca_rdma_task_receive **task)
This method allocates and initializes a receive task.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_export(struct doca_rdma *rdma, const void **local_rdma_conn_details, size_t *local_rdma_conn_details_size, struct doca_rdma_connection **rdma_connection)
Export doca_rdma connection details object The doca_rdma_conn_details are used in doca_rdma_connect()...
DOCA_EXPERIMENTAL doca_error_t doca_rdma_task_send_allocate_init(struct doca_rdma *rdma, struct doca_rdma_connection *rdma_connection, const struct doca_buf *src_buf, union doca_data user_data, struct doca_rdma_task_send **task)
This method allocates and initializes a send task.
DOCA_EXPERIMENTAL struct doca_ctx * doca_rdma_as_ctx(struct doca_rdma *rdma)
Convert doca_rdma instance into a generalized context for use with doca core objects.
DOCA_EXPERIMENTAL doca_error_t doca_rdma_connect(struct doca_rdma *rdma, const void *remote_rdma_conn_details, size_t remote_rdma_conn_details_size, struct doca_rdma_connection *rdma_connection)
Connect to remote doca_rdma peer. Can only be called when the ctx is in DOCA_CTX_STATE_STARTING state...
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
Definition: doca_types.h:83
@ DOCA_ACCESS_FLAG_RDMA_READ
Definition: doca_types.h:84
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
Definition: doca_types.h:91
@ DOCA_ACCESS_FLAG_RDMA_WRITE
Definition: doca_types.h:85
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
Definition: doca_version.h:90
const struct ip_frag_config * cfg
Definition: ip_frag_dp.c:0
type value
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
Definition: os_utils.cpp:53
ip_address parse_ip_v4_address(char const *value)
Definition: ip_address.cpp:50
void uninstall_ctrl_c_handler()
Definition: os_utils.cpp:121
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
Definition: doca_utils.cpp:146
static constexpr value_requirement optional_value
Definition: doca_utils.hpp:228
void create_doca_logger_backend(void) noexcept
Definition: doca_utils.cpp:471
void install_ctrl_c_handler(std::function< void(void)> callback)
Definition: os_utils.cpp:106
void aligned_free(void *memory)
Definition: os_utils.cpp:131
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
Definition: doca_utils.cpp:279
doca_rdma * make_rdma_context(doca_dev *dev, doca_pe *pe, doca_data ctx_user_data, uint32_t permissions)
Definition: doca_utils.cpp:331
static constexpr value_multiplicity multiple_values
Definition: doca_utils.hpp:231
void * aligned_alloc(size_t alignment, size_t size)
Definition: os_utils.cpp:126
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
Definition: doca_utils.cpp:413
size_t aligned_size(size_t alignment, size_t size)
char * get_buffer_bytes(doca_buf *buf) noexcept
Definition: doca_utils.hpp:260
doca_dev_rep * open_representor(doca_dev *dev, std::string const &identifier)
Definition: doca_utils.cpp:104
static constexpr value_multiplicity single_value
Definition: doca_utils.hpp:230
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
Definition: doca_utils.cpp:224
constexpr size_t size_of_io_message
Definition: io_message.hpp:53
constexpr uint32_t cache_line_size
Definition: definitions.hpp:40
static constexpr value_requirement required_value
Definition: doca_utils.hpp:227
uint32_t get_system_page_size(void)
Definition: os_utils.cpp:95
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
Definition: doca_utils.cpp:369
doca_dev * open_device(std::string const &identifier)
Definition: doca_utils.cpp:43
#define false
Definition: stdbool.h:22
storage::control::correlation_id correlation_id
storage::control::message_id message_id
std::unique_ptr< storage::control::message::payload > payload
Convenience type for representing opaque data.
Definition: doca_types.h:56
void * ptr
Definition: doca_types.h:57