NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
initiator_comch.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2025 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <atomic>
27 #include <chrono>
28 #include <cstdio>
29 #include <memory>
30 #include <numeric>
31 #include <stdexcept>
32 #include <sstream>
33 #include <thread>
34 
35 #include <doca_argp.h>
36 #include <doca_buf.h>
37 #include <doca_buf_inventory.h>
38 #include <doca_comch.h>
39 #include <doca_comch_consumer.h>
40 #include <doca_comch_producer.h>
41 #include <doca_ctx.h>
42 #include <doca_dev.h>
43 #include <doca_error.h>
44 #include <doca_log.h>
45 #include <doca_mmap.h>
46 #include <doca_pe.h>
47 #include <doca_version.h>
48 
58 
59 DOCA_LOG_REGISTER(INITIATOR_COMCH);
60 
61 using namespace std::string_literals;
62 
63 namespace {
64 
/* Application name used for doca_argp and the version banner */
auto constexpr app_name = "doca_storage_initiator_comch";

/* Recognised values for the --execution-strategy CLI argument.
 * NOTE(review): "throughout" in the first two identifiers looks like a typo for "throughput";
 * the string values themselves are spelled correctly, so any rename would be cosmetic only.
 */
auto constexpr run_type_read_throughout_test = "read_throughput_test";
auto constexpr run_type_write_throughout_test = "write_throughput_test";
auto constexpr run_type_read_write_data_validity_test = "read_write_data_validity_test";
auto constexpr run_type_read_only_data_validity_test = "read_only_data_validity_test";

/* Default values applied by parse_cli_args before any CLI overrides */
auto constexpr default_control_timeout_seconds = std::chrono::seconds{5};
auto constexpr default_task_count = 64;
auto constexpr default_command_channel_name = "doca_storage_comch";
auto constexpr default_run_limit_operation_count = 1'000'000;
auto constexpr default_batch_size = 4;

/* The transaction_context size assertion below depends on these layout assumptions */
static_assert(sizeof(void *) == 8, "Expected a pointer to occupy 8 bytes");
static_assert(sizeof(std::chrono::steady_clock::time_point) == 8,
	      "Expected std::chrono::steady_clock::time_point to occupy 8 bytes");
81 
/*
 * User configurable parameters for the initiator_comch_app
 */
struct initiator_comch_app_configuration {
	/* CPU cores to run worker threads on */
	std::vector<uint32_t> core_set = {};
	/* Identifier of the doca_dev to open */
	std::string device_id = {};
	/* Name of the comch channel used to reach the storage service */
	std::string command_channel_name = {};
	/* Optional file holding the expected plain storage content (used by validity tests) */
	std::string storage_plain_content_file = {};
	/* Which test to execute; one of the run_type_* constants above */
	std::string run_type = {};
	/* Timeout applied while performing control operations */
	std::chrono::seconds control_timeout = {};
	/* Number of concurrent tasks (per thread) */
	uint32_t task_count = 0;
	/* Number of operations (per thread) to run before stopping */
	uint32_t run_limit_operation_count = 0;
	/* Number of tasks submitted together before flushing */
	uint32_t batch_size = 0;
};
96 
/*
 * Statistics emitted by the application
 */
struct initiator_comch_app_stats {
	/* Wall-clock bounds of the test run */
	std::chrono::steady_clock::time_point start_time = {};
	std::chrono::steady_clock::time_point end_time = {};
	/* Progress engine polls that did / did not make progress */
	uint64_t pe_hit_count = 0;
	uint64_t pe_miss_count = 0;
	/* Total number of completed operations */
	uint64_t operation_count = 0;
	/* Per-operation latency summary */
	uint32_t latency_min = 0;
	uint32_t latency_max = 0;
	uint32_t latency_mean = 0;
};
110 
/*
 * Data that needs to be tracked per transaction
 */
struct transaction_context {
	/* The initial task that started the transaction */
	doca_comch_producer_task_send *request = nullptr;
	/* The start time of the transaction */
	std::chrono::steady_clock::time_point start_time{};
	/* A reference count, a transaction requires both the send and receive task to have their respective completion
	 * callbacks triggered before the transaction can be re-used
	 */
	uint16_t refcount = 0;
};

/* 8 (pointer) + 8 (time_point) + 2 (refcount) padded up to 24 — kept small so many
 * transactions stay cache-resident
 */
static_assert(sizeof(transaction_context) == 24, "Expected transaction_context to occupy 24 bytes");
126 
/*
 * Data required for a thread worker
 */
class initiator_comch_worker {
public:
	/*
	 * A set of data that can be used in the data path, NO OTHER MEMORY SHOULD BE ACCESSED in the main loop or task
	 * callbacks. This is done to keep the maximum amount of useful data resident in the cache while avoiding as
	 * many cache evictions as possible.
	 */
	struct alignas(storage::cache_line_size) hot_data {
		/* Expected plain storage content, used as reference data by the validity tests */
		uint8_t const *storage_plain_content;
		/* Bounds of the local IO memory region: [io_region_begin, io_region_end) */
		uint8_t *io_region_begin;
		uint8_t *io_region_end;
		/* Current IO address within the region */
		uint8_t *io_addr;
		/* Progress engine polled by the worker thread */
		doca_pe *pe;
		/* Array of transactions_size transaction contexts */
		transaction_context *transactions;
		uint32_t transactions_size;
		/* Bytes transferred per IO operation */
		uint32_t io_block_size;
		std::chrono::steady_clock::time_point end_time;
		/* Progress engine polling statistics (hit == progress was made) */
		uint64_t pe_hit_count;
		uint64_t pe_miss_count;
		uint64_t completed_transaction_count;
		/* Remaining operations to submit / to receive responses for */
		uint64_t remaining_tx_ops;
		uint64_t remaining_rx_ops;
		/* Latency statistics accumulated per completed transaction */
		uint64_t latency_accumulator;
		uint32_t latency_min;
		uint32_t latency_max;
		/* Batched submission state (see submit_recv_task) */
		uint8_t batch_count;
		uint8_t batch_size;
		/* Start/stop signal; atomic because it is written from another thread */
		std::atomic_bool run_flag;
		bool error_flag;

		/*
		 * Default constructor
		 */
		hot_data();

		/*
		 * Deleted copy constructor
		 */
		hot_data(hot_data const &) = delete;

		/*
		 * Move constructor
		 * @other [in]: Object to move from
		 */
		hot_data(hot_data &&other) noexcept;

		/*
		 * Deleted copy assignment operator
		 */
		hot_data &operator=(hot_data const &) = delete;

		/*
		 * Move assignment operator
		 * @other [in]: Object to move from
		 * @return: reference to moved assigned object
		 */
		hot_data &operator=(hot_data &&other) noexcept;

		/*
		 * ComCh post recv helper to control batch submission flags
		 *
		 * @task [in]: Task to submit
		 * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
		 */
		doca_error_t submit_recv_task(doca_task *task);

		/*
		 * Start a transaction
		 *
		 * @transaction [in]: Transaction to start
		 * @now [in]: The current time (to measure duration)
		 */
		void start_transaction(transaction_context &transaction, std::chrono::steady_clock::time_point now);

		/*
		 * Process transaction completion
		 *
		 * @transaction [in]: The completed transaction
		 */
		void on_transaction_complete(transaction_context &transaction);
	};
	static_assert(sizeof(initiator_comch_worker::hot_data) == (2 * storage::cache_line_size),
		      "Expected initiator_comch_worker::hot_data to occupy two cache lines");

	/* Signature of a worker thread procedure operating on its hot data */
	using thread_proc_fn_t = void (*)(initiator_comch_worker::hot_data &hot_data);

	/*
	 * Destructor
	 */
	~initiator_comch_worker();

	/*
	 * Default constructor
	 */
	initiator_comch_worker();

	/*
	 * Deleted copy constructor
	 */
	initiator_comch_worker(initiator_comch_worker const &) = delete;

	/*
	 * Move constructor
	 * @other [in]: Object to move from
	 */
	[[maybe_unused]] initiator_comch_worker(initiator_comch_worker &&) noexcept;

	/*
	 * Deleted copy assignment operator
	 */
	initiator_comch_worker &operator=(initiator_comch_worker const &) = delete;
	/*
	 * Move assignment operator
	 * @other [in]: Object to move from
	 * @return: reference to moved assigned object
	 */
	[[maybe_unused]] initiator_comch_worker &operator=(initiator_comch_worker &&) noexcept;

	/*
	 * Allocate and prepare resources for this object
	 *
	 * @dev [in]: Device to use
	 * @comch_conn [in]: Comch control channel to use
	 * @storage_plain_content [in]: Plain (original) input data, Used to compare against the data retrieved from
	 * storage
	 * @task_count [in]: Number of tasks to use (per worker)
	 * @batch_size [in]: Number of tasks to submit together
	 * @io_region_begin [in]: Start address of local storage memory
	 * @io_region_size [in]: Size of local storage memory
	 * @io_block_size [in]: Block size to use
	 */
	void init(doca_dev *dev,
		  doca_comch_connection *comch_conn,
		  uint8_t const *storage_plain_content,
		  uint32_t task_count,
		  uint32_t batch_size,
		  uint8_t *io_region_begin,
		  uint64_t io_region_size,
		  uint32_t io_block_size);

	/*
	 * Prepare thread proc
	 * @fn [in]: Thread function
	 * @run_limit_op_count [in]: Number of tasks to execute
	 * @core_id [in]: Core to run on
	 */
	void prepare_thread_proc(initiator_comch_worker::thread_proc_fn_t fn,
				 uint32_t run_limit_op_count,
				 uint32_t core_id);

	/*
	 * Prepare tasks required for the data path
	 *
	 * @op_type [in]: Initial operation type (read or write)
	 * @remote_consumer_id [in]: ID of remote consumer
	 */
	void prepare_tasks(storage::io_message_type op_type, uint32_t remote_consumer_id);

	/*
	 * Check that all contexts are ready to run
	 *
	 * @return: true if all contexts are ready to run
	 */
	[[nodiscard]] bool are_contexts_ready(void) const noexcept;

	/*
	 * Start the worker thread
	 */
	void start_thread_proc(void);

	/*
	 * Query if the worker thread is still running
	 *
	 * @return: true the worker is still running
	 */
	[[nodiscard]] bool is_thread_proc_running(void) const noexcept;

	/*
	 * Join the work thread
	 */
	void join_thread_proc(void);

	/*
	 * Get a reference to the workers hot data
	 *
	 * @return: A reference to the workers hot data
	 */
	[[nodiscard]] hot_data const &get_hot_data(void) const noexcept;

	/*
	 * Destroy the data path objects owned by this worker
	 */
	void destroy_data_path_objects(void);

private:
	hot_data m_hot_data;
	/* Memory region / mmap / inventory backing the IO request-response messages */
	uint8_t *m_io_message_region;
	doca_mmap *m_io_message_mmap;
	doca_buf_inventory *m_io_message_inv;
	std::vector<doca_buf *> m_io_message_bufs;
	doca_comch_consumer *m_consumer;
	doca_comch_producer *m_producer;
	std::vector<doca_task *> m_io_responses;
	std::vector<doca_task *> m_io_requests;
	std::thread m_thread;

	/*
	 * Release all resources held by this object
	 */
	void cleanup(void) noexcept;

	/*
	 * ComCh consumer task callback
	 *
	 * @task [in]: Completed task
	 * @task_user_data [in]: Data associated with the task
	 * @ctx_user_data [in]: Data associated with the context
	 */
	static void doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
							  doca_data task_user_data,
							  doca_data ctx_user_data) noexcept;

	/*
	 * ComCh consumer task error callback
	 *
	 * @task [in]: Failed task
	 * @task_user_data [in]: Data associated with the task
	 * @ctx_user_data [in]: Data associated with the context
	 */
	static void doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
								doca_data task_user_data,
								doca_data ctx_user_data) noexcept;

	/*
	 * ComCh producer task callback
	 *
	 * @task [in]: Completed task
	 * @task_user_data [in]: Data associated with the task
	 * @ctx_user_data [in]: Data associated with the context
	 */
	static void doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
						     doca_data task_user_data,
						     doca_data ctx_user_data) noexcept;

	/*
	 * ComCh producer task error callback
	 *
	 * @task [in]: Failed task
	 * @task_user_data [in]: Data associated with the task
	 * @ctx_user_data [in]: Data associated with the context
	 */
	static void doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
							   doca_data task_user_data,
							   doca_data ctx_user_data) noexcept;
};
387 
/*
 * Application
 */
class initiator_comch_app {
public:
	/*
	 * Destructor
	 */
	~initiator_comch_app();

	/*
	 * Deleted default constructor
	 */
	initiator_comch_app() = delete;

	/*
	 * Constructor
	 *
	 * @cfg [in]: Configuration
	 */
	explicit initiator_comch_app(initiator_comch_app_configuration const &cfg);

	/*
	 * Deleted copy constructor
	 */
	initiator_comch_app(initiator_comch_app const &) = delete;

	/*
	 * Deleted move constructor
	 */
	initiator_comch_app(initiator_comch_app &&) noexcept = delete;

	/*
	 * Deleted copy assignment operator
	 */
	initiator_comch_app &operator=(initiator_comch_app const &) = delete;

	/*
	 * Deleted move assignment operator
	 */
	initiator_comch_app &operator=(initiator_comch_app &&) noexcept = delete;

	/*
	 * Abort the application
	 *
	 * @reason [in]: Abort reason
	 */
	void abort(std::string const &reason);

	/*
	 * Connect to the storage service
	 */
	void connect_to_storage_service(void);

	/*
	 * Query storage details
	 */
	void query_storage(void);

	/*
	 * Initialise storage session
	 */
	void init_storage(void);

	/*
	 * Prepare worker threads
	 */
	void prepare_threads(void);

	/*
	 * Start storage session
	 */
	void start_storage(void);

	/*
	 * Run storage test
	 *
	 * @return: true when the run completed successfully
	 */
	bool run(void);

	/*
	 * Join worker threads
	 */
	void join_threads(void);

	/*
	 * Stop storage session
	 */
	void stop_storage(void);

	/*
	 * display statistics
	 */
	void display_stats(void) const;

	/*
	 * Shutdown storage resources
	 */
	void shutdown(void);

private:
	initiator_comch_app_configuration const m_cfg;
	/* Plain reference data for the validity tests (presumably loaded from
	 * cfg.storage_plain_content_file — confirm against the constructor definition)
	 */
	std::vector<uint8_t> const m_storage_plain_content;
	doca_dev *m_dev;
	/* Local IO memory region exposed to the storage service via m_io_mmap */
	uint8_t *m_io_region;
	doca_mmap *m_io_mmap;
	std::unique_ptr<storage::control::comch_channel> m_service_control_channel;
	std::vector<storage::control::message> m_ctrl_messages;
	std::vector<uint32_t> m_remote_consumer_ids;
	initiator_comch_worker *m_workers;
	uint64_t m_storage_capacity;
	uint32_t m_storage_block_size;
	initiator_comch_app_stats m_stats;
	/* Counters used to generate unique control message / correlation IDs */
	uint32_t m_message_id_counter;
	uint32_t m_correlation_id_counter;
	bool m_abort_flag;

	/*
	 * Handle a new remote consumer becoming available
	 *
	 * @user_data [in]: User data supplied when the callback was registered (presumably the
	 * initiator_comch_app instance — confirm at the registration site)
	 * @id [in]: ID of the new consumer
	 */
	static void new_comch_consumer_callback(void *user_data, uint32_t id) noexcept;

	/*
	 * Handle a remote consumer becoming unavailable
	 *
	 * @user_data [in]: User data supplied when the callback was registered
	 * @id [in]: ID of the expired consumer
	 */
	static void expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept;

	/*
	 * Wait for a response to a control message
	 *
	 * @type [in]: Type of message to expect
	 * @msg_id [in]: ID of the request to match its response to
	 * @timeout [in]: Timeout to use
	 * @return: The matching control response message
	 */
	storage::control::message wait_for_control_response(storage::control::message_type type,
							    storage::control::message_id msg_id,
							    std::chrono::seconds timeout);
};
533 
534 /*
535  * Parse command line arguments
536  *
537  * @argc [in]: Number of arguments
538  * @argv [in]: Array of argument values
539  * @return: Parsed initiator_comch_app_configuration
540  *
541  * @throws: storage::runtime_error If the initiator_comch_app_configuration cannot be parsed or contains invalid values
542  */
543 initiator_comch_app_configuration parse_cli_args(int argc, char **argv);
544 
545 } // namespace
546 
/*
 * Main
 *
 * @argc [in]: Number of arguments
 * @argv [in]: Array of argument values
 * @return: EXIT_SUCCESS on success and EXIT_FAILURE otherwise
 */
int main(int argc, char **argv)
{
	/* NOTE(review): this extract appears to be missing a line here (doc-scrape gap);
	 * presumably logger/SDK setup — confirm against the full source.
	 */

	printf("%s: v%s\n", app_name, doca_version());

	int exit_value = EXIT_SUCCESS;

	try {
		initiator_comch_app app{parse_cli_args(argc, argv)};
		/* NOTE(review): the call that registers the abort callback (presumably a
		 * Ctrl-C / signal handler install taking the lambda whose body follows) is
		 * missing from this extract; the dangling "});" below closes it. Confirm
		 * against the full source before editing this section.
		 */
		app.abort("User requested abort");
		});
		/* Drive the full storage session life-cycle: connect, configure, run the
		 * selected test, then tear down.
		 */
		app.connect_to_storage_service();
		app.query_storage();
		app.init_storage();
		app.prepare_threads();
		app.start_storage();
		auto const run_success = app.run();
		app.join_threads();
		app.stop_storage();
		if (run_success) {
			app.display_stats();
		} else {
			exit_value = EXIT_FAILURE;
			fprintf(stderr, "+================================================+\n");
			fprintf(stderr, "| Test failed!!\n");
			fprintf(stderr, "+================================================+\n");
		}
		app.shutdown();
	} catch (std::exception const &ex) {
		fprintf(stderr, "EXCEPTION: %s\n", ex.what());
		fflush(stdout);
		fflush(stderr);
		return EXIT_FAILURE;
	}

	fflush(stdout);
	fflush(stderr);
	return exit_value;
}
597 
598 namespace {
599 
600 /*
601  * Print the parsed initiator_comch_app_configuration
602  *
603  * @cfg [in]: initiator_comch_app_configuration to display
604  */
605 void print_config(initiator_comch_app_configuration const &cfg) noexcept
606 {
607  printf("initiator_comch_app_configuration: {\n");
608  printf("\tcore_set : [");
609  bool first = true;
610  for (auto cpu : cfg.core_set) {
611  if (first)
612  first = false;
613  else
614  printf(", ");
615  printf("%u", cpu);
616  }
617  printf("]\n");
618  printf("\texecution_strategy : \"%s\",\n", cfg.run_type.c_str());
619  printf("\tdevice : \"%s\",\n", cfg.device_id.c_str());
620  printf("\tcommand_channel_name : \"%s\",\n", cfg.command_channel_name.c_str());
621  printf("\tstorage_plain_content : \"%s\",\n", cfg.storage_plain_content_file.c_str());
622  printf("\ttask_count : %u,\n", cfg.task_count);
623  printf("\tbatch_size : %u,\n", cfg.batch_size);
624  printf("\trun_limit_operation_count : %u,\n", cfg.run_limit_operation_count);
625  printf("\tcontrol_timeout : %u,\n", static_cast<uint32_t>(cfg.control_timeout.count()));
626  printf("}\n");
627 }
628 
/*
 * Validate initiator_comch_app_configuration
 *
 * @cfg [in]: initiator_comch_app_configuration
 */
void validate_initiator_comch_app_configuration(initiator_comch_app_configuration const &cfg)
{
	/* Collect every problem first so the user sees all of them at once */
	std::vector<std::string> errors;

	if (cfg.task_count == 0) {
		errors.emplace_back("Invalid initiator_comch_app_configuration: task-count must not be zero");
	}

	if (cfg.control_timeout.count() == 0) {
		errors.emplace_back("Invalid initiator_comch_app_configuration: control-timeout must not be zero");
	}

	if (cfg.run_type == run_type_read_write_data_validity_test && cfg.core_set.size() != 1) {
		errors.push_back("Invalid initiator_comch_app_configuration: "s +
				 run_type_read_write_data_validity_test + " Only supports one thread");
	}

	/* The read-only validity test compares against reference data, so the file is mandatory */
	if (cfg.run_type == run_type_read_only_data_validity_test && cfg.storage_plain_content_file.empty()) {
		errors.push_back("Invalid initiator_comch_app_configuration: "s +
				 run_type_read_only_data_validity_test + " requires plain data file to be provided");
	}

	if (!errors.empty()) {
		for (auto const &err : errors) {
			printf("%s\n", err.c_str());
		}
		/* NOTE(review): the opening of the throw statement (presumably
		 * "throw storage::runtime_error{<error code>,") is missing from this extract;
		 * confirm against the full source.
		 */
		"Invalid initiator_comch_app_configuration detected"};
	}
}
664 
/*
 * Parse command line arguments
 *
 * @argc [in]: Number of arguments
 * @argv [in]: Array of argument values
 * @return: Parsed initiator_comch_app_configuration
 *
 * @throws: storage::runtime_error If the initiator_comch_app_configuration cannot be parsed or contains invalid values
 */
initiator_comch_app_configuration parse_cli_args(int argc, char **argv)
{
	initiator_comch_app_configuration config{};
	/* Apply the defaults; any CLI argument registered below overrides them */
	config.task_count = default_task_count;
	config.command_channel_name = default_command_channel_name;
	config.control_timeout = default_control_timeout_seconds;
	config.run_limit_operation_count = default_run_limit_operation_count;
	config.batch_size = default_batch_size;

	doca_error_t ret;

	ret = doca_argp_init(app_name, &config);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to parse CLI args"};
	}

	/* NOTE(review): in this extract the helper call that registers each CLI parameter (and
	 * its type/flag boilerplate lines) is missing before every fragment below; only the
	 * short name, long name, description and value callback survive. Confirm against the
	 * full source before modifying this section.
	 */
	 "d",
	 "device",
	 "Device identifier",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->device_id =
			 static_cast<char const *>(value);
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "cpu",
	 "CPU core to which the process affinity can be set",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->core_set.push_back(
			 *static_cast<int *>(value));
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "storage-plain-content",
	 "File containing the plain data that is represented by the storage",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->storage_plain_content_file =
			 static_cast<char const *>(value);
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "execution-strategy",
	 "Define what to run. One of: read_throughput_test | write_throughput_test | read_write_data_validity_test | read_only_data_validity_test",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->run_type =
			 static_cast<char const *>(value);

		 return DOCA_SUCCESS;
	 });

	 nullptr,
	 "run-limit-operation-count",
	 "Run N operations (per thread) then stop. Default: 1000000",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->run_limit_operation_count =
			 *static_cast<int *>(value);
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "task-count",
	 "Number of concurrent tasks (per thread) to use. Default: 64",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->task_count =
			 *static_cast<int *>(value);
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "command-channel-name",
	 "Name of the channel used by the doca_comch_client. Default: \"doca_storage_comch\"",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->command_channel_name =
			 static_cast<char const *>(value);
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "control-timeout",
	 "Time (in seconds) to wait while performing control operations. Default: 5",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->control_timeout =
			 std::chrono::seconds{*static_cast<int *>(value)};
		 return DOCA_SUCCESS;
	 });
	 nullptr,
	 "batch-size",
	 "Batch size: Default: 4",
	 [](void *value, void *cfg) noexcept {
		 static_cast<initiator_comch_app_configuration *>(cfg)->batch_size =
			 *static_cast<int *>(value);
		 return DOCA_SUCCESS;
	 });
	ret = doca_argp_start(argc, argv);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to parse CLI args"};
	}

	static_cast<void>(doca_argp_destroy());

	/* A batch larger than the task count could never be flushed; clamp it down */
	if (config.batch_size > config.task_count) {
		config.batch_size = config.task_count;
		DOCA_LOG_WARN("Clamping batch size to maximum value: %u", config.batch_size);
	}

	print_config(config);
	validate_initiator_comch_app_configuration(config);

	return config;
}
813 
814 class thread_proc_catch_wrapper {
815 public:
816  ~thread_proc_catch_wrapper() = default;
817  thread_proc_catch_wrapper() = delete;
818  explicit thread_proc_catch_wrapper(initiator_comch_worker::hot_data *hot_data) noexcept : m_hot_data{hot_data}
819  {
820  }
821  thread_proc_catch_wrapper(thread_proc_catch_wrapper const &) = delete;
822  thread_proc_catch_wrapper(thread_proc_catch_wrapper &&) noexcept = default;
823  thread_proc_catch_wrapper &operator=(thread_proc_catch_wrapper const &) = delete;
824  thread_proc_catch_wrapper &operator=(thread_proc_catch_wrapper &&) noexcept = default;
825 
826  void operator()(initiator_comch_worker::thread_proc_fn_t fn)
827  {
828  try {
829  fn(*m_hot_data);
830  } catch (storage::runtime_error const &ex) {
831  std::stringstream ss;
832  ss << "[Thread: " << std::this_thread::get_id()
833  << "] Exception: " << doca_error_get_name(ex.get_doca_error()) << ":" << ex.what();
834  DOCA_LOG_ERR("%s", ss.str().c_str());
835  m_hot_data->error_flag = true;
836  m_hot_data->run_flag = false;
837  }
838  }
839 
840 private:
841  initiator_comch_worker::hot_data *m_hot_data;
842 };
843 
844 void throughput_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
845 {
846  /* wait to start */
847  while (hot_data.run_flag == false) {
848  std::this_thread::yield();
849  if (hot_data.error_flag)
850  return;
851  }
852 
853  /* submit initial tasks */
854  auto const initial_task_count =
855  std::min(static_cast<uint64_t>(hot_data.transactions_size), hot_data.remaining_tx_ops);
856  for (uint32_t ii = 0; ii != initial_task_count; ++ii)
857  hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
858 
859  /* run until the test completes */
860  while (hot_data.run_flag) {
861  doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
862  }
863 
864  /* exit if anything went wrong */
865  if (hot_data.error_flag) {
866  return;
867  }
868 
869  /* wait for any completions that are out-standing in the case of a user abort (control+C) */
870  hot_data.remaining_rx_ops = hot_data.remaining_rx_ops - hot_data.remaining_tx_ops;
871  while (hot_data.remaining_rx_ops != 0) {
872  doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
873  }
874 }
875 
/*
 * Perform one full write pass over the storage: copy the expected content into the local IO
 * region and submit write transactions covering every block, then wait for all responses.
 *
 * @hot_data [in]: Worker hot data to operate on
 * @expected_memory_content [in]: Data to place into the local IO region and write to storage
 */
void write_storage_memory(initiator_comch_worker::hot_data &hot_data, uint8_t const *expected_memory_content) noexcept
{
	auto const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;

	/* For validation tests just process the remote storage once */
	hot_data.remaining_tx_ops = hot_data.remaining_rx_ops = io_region_size / hot_data.io_block_size;

	/* prepare data to be written */
	hot_data.io_addr = hot_data.io_region_begin;
	std::copy(expected_memory_content, expected_memory_content + io_region_size, hot_data.io_region_begin);

	/* submit initial tasks */
	auto const initial_task_count =
		std::min(static_cast<uint64_t>(hot_data.transactions_size), hot_data.remaining_tx_ops);
	for (uint32_t ii = 0; ii != initial_task_count; ++ii) {
		char *io_request;
		static_cast<void>(
			doca_buf_get_data(doca_comch_producer_task_send_get_buf(hot_data.transactions[ii].request),
					  reinterpret_cast<void **>(&io_request)));
		/* NOTE(review): the line that populates io_request (construction of the write
		 * request message) is missing from this extract; confirm against the full
		 * source.
		 */

		hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
	}

	/* run until the test completes */
	while (hot_data.remaining_rx_ops != 0) {
		doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
	}

	/* exit if anything went wrong */
	if (hot_data.error_flag) {
		return;
	}
}
910 
/*
 * Perform one full read pass over the storage into the local IO region, then verify the bytes
 * read back match the expected content, flagging an error on the first mismatch.
 *
 * @hot_data [in]: Worker hot data to operate on
 * @expected_memory_content [in]: Reference data the storage content is compared against
 */
void read_and_validate_storage_memory(initiator_comch_worker::hot_data &hot_data,
				      uint8_t const *expected_memory_content) noexcept
{
	size_t const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;

	/* For validation tests just process the remote storage once */
	hot_data.remaining_tx_ops = hot_data.remaining_rx_ops = io_region_size / hot_data.io_block_size;

	/* submit initial tasks */
	auto const initial_task_count =
		std::min(static_cast<uint64_t>(hot_data.transactions_size), hot_data.remaining_tx_ops);
	for (uint32_t ii = 0; ii != initial_task_count; ++ii) {
		char *io_request;
		static_cast<void>(
			doca_buf_get_data(doca_comch_producer_task_send_get_buf(hot_data.transactions[ii].request),
					  reinterpret_cast<void **>(&io_request)));
		/* NOTE(review): the line that populates io_request (construction of the read
		 * request message) is missing from this extract; confirm against the full
		 * source.
		 */

		hot_data.start_transaction(hot_data.transactions[ii], std::chrono::steady_clock::now());
	}

	/* run until the test completes */
	while (hot_data.remaining_rx_ops != 0) {
		doca_pe_progress(hot_data.pe) ? ++(hot_data.pe_hit_count) : ++(hot_data.pe_miss_count);
	}

	/* exit if anything went wrong */
	if (hot_data.error_flag) {
		return;
	}

	/* Check the memory contents were as expected */
	for (size_t offset = 0; offset != io_region_size; ++offset) {
		if (hot_data.io_region_begin[offset] != expected_memory_content[offset]) {
			DOCA_LOG_ERR("Data mismatch @ position %zu: %02x != %02x",
				     offset,
				     hot_data.io_region_begin[offset],
				     expected_memory_content[offset]);
			hot_data.error_flag = true;
			break;
		}
	}
}
954 
955 void read_only_data_validity_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
956 {
957  /* wait to start */
958  while (hot_data.run_flag == false) {
959  std::this_thread::yield();
960  if (hot_data.error_flag)
961  return;
962  }
963 
964  read_and_validate_storage_memory(hot_data, hot_data.storage_plain_content);
965 }
966 
967 void read_write_data_validity_thread_proc(initiator_comch_worker::hot_data &hot_data) noexcept
968 {
969  /* wait to start */
970  while (hot_data.run_flag == false) {
971  std::this_thread::yield();
972  if (hot_data.error_flag)
973  return;
974  }
975 
976  size_t const io_region_size = hot_data.io_region_end - hot_data.io_region_begin;
977  std::vector<uint8_t> write_data;
978  write_data.resize(io_region_size);
979  for (size_t ii = 0; ii != io_region_size; ++ii) {
980  write_data[ii] = static_cast<uint8_t>(ii);
981  }
982 
983  write_storage_memory(hot_data, write_data.data());
984  read_and_validate_storage_memory(hot_data, write_data.data());
985 }
986 
/*
 * Default constructor: null all pointers and zero all counters so the object is inert until
 * init() populates it.
 */
initiator_comch_worker::hot_data::hot_data()
	: storage_plain_content{nullptr},
	  io_region_begin{nullptr},
	  io_region_end{nullptr},
	  io_addr{nullptr},
	  pe{nullptr},
	  transactions{nullptr},
	  transactions_size{0},
	  io_block_size{0},
	  end_time{},
	  pe_hit_count{0},
	  pe_miss_count{0},
	  completed_transaction_count{0},
	  remaining_tx_ops{0},
	  remaining_rx_ops{0},
	  latency_accumulator{0},
	  /* Start at the maximum so the first recorded sample always becomes the minimum */
	  latency_min{std::numeric_limits<uint32_t>::max()},
	  latency_max{0},
	  batch_count{0},
	  batch_size{1},
	  run_flag{false},
	  error_flag{false}
{
}
1011 
/*
 * Move constructor: copy all scalar state from other, then null other's pointer members so
 * the moved-from object no longer references the transferred resources.
 *
 * @other [in]: Object to move from
 */
initiator_comch_worker::hot_data::hot_data(hot_data &&other) noexcept
	: storage_plain_content{other.storage_plain_content},
	  io_region_begin{other.io_region_begin},
	  io_region_end{other.io_region_end},
	  io_addr{other.io_addr},
	  pe{other.pe},
	  transactions{other.transactions},
	  transactions_size{other.transactions_size},
	  io_block_size{other.io_block_size},
	  end_time{other.end_time},
	  pe_hit_count{other.pe_hit_count},
	  pe_miss_count{other.pe_miss_count},
	  completed_transaction_count{other.completed_transaction_count},
	  remaining_tx_ops{other.remaining_tx_ops},
	  remaining_rx_ops{other.remaining_rx_ops},
	  latency_accumulator{other.latency_accumulator},
	  latency_min{other.latency_min},
	  latency_max{other.latency_max},
	  batch_count{other.batch_count},
	  batch_size{other.batch_size},
	  /* std::atomic_bool is not movable; transfer its current value instead */
	  run_flag{other.run_flag.load()},
	  error_flag{other.error_flag}
{
	other.storage_plain_content = nullptr;
	other.pe = nullptr;
	other.transactions = nullptr;
}
1039 
1040 initiator_comch_worker::hot_data &initiator_comch_worker::hot_data::operator=(hot_data &&other) noexcept
1041 {
1042  if (std::addressof(other) == this)
1043  return *this;
1044 
1045  storage_plain_content = other.storage_plain_content;
1046  io_region_begin = other.io_region_begin;
1047  io_region_end = other.io_region_end;
1048  io_addr = other.io_addr;
1049  pe = other.pe;
1050  transactions = other.transactions;
1051  transactions_size = other.transactions_size;
1052  io_block_size = other.io_block_size;
1053  end_time = other.end_time;
1054  pe_hit_count = other.pe_hit_count;
1055  pe_miss_count = other.pe_miss_count;
1056  completed_transaction_count = other.completed_transaction_count;
1057  remaining_tx_ops = other.remaining_tx_ops;
1058  remaining_rx_ops = other.remaining_rx_ops;
1059  latency_accumulator = other.latency_accumulator;
1060  latency_min = other.latency_min;
1061  latency_max = other.latency_max;
1062  batch_count = other.batch_count;
1063  batch_size = other.batch_size;
1064  run_flag = other.run_flag.load();
1065  error_flag = other.error_flag;
1066 
1067  other.storage_plain_content = nullptr;
1068  other.pe = nullptr;
1069  other.transactions = nullptr;
1070 
1071  return *this;
1072 }
1073 
/*
 * Submit a consumer receive task, flushing only once per batch to amortise the
 * cost of task submission.
 *
 * @param task [in] Receive task to submit
 * @return result of doca_task_submit_ex
 *
 * NOTE(review): the declaration/initialisation of submit_flag is not visible in
 * this extract (presumably a non-flush default before this point) - confirm
 * against the full source.
 */
doca_error_t initiator_comch_worker::hot_data::submit_recv_task(doca_task *task)
{
	/* Every batch_size-th submission carries the flush flag and re-arms the counter */
	if (--batch_count == 0) {
		submit_flag = DOCA_TASK_SUBMIT_FLAG_FLUSH;
		batch_count = batch_size;
	}

	return doca_task_submit_ex(task, submit_flag);
}
1084 
/*
 * (Re)start a transaction: aim it at the next IO block and submit its producer
 * send task. On unrecoverable submit failure the run is stopped with the error
 * flag set.
 *
 * @param transaction [in] Transaction slot to start
 * @param now [in] Timestamp recorded as the transaction start time
 */
void initiator_comch_worker::hot_data::start_transaction(transaction_context &transaction,
							 std::chrono::steady_clock::time_point now)
{
	/* set the transaction refcount to 2 as the order of completions between the receive callback and the send
	 * callback are not guaranteed to be ordered. The task cannot be re-used until both callbacks have completed.
	 */
	transaction.refcount = 2;
	transaction.start_time = now;
	doca_error_t ret;

	// Set the io target to the next block until all the storage memory has been accessed, then go back to the start
	char *io_request;
	static_cast<void>(doca_buf_get_data(doca_comch_producer_task_send_get_buf(transaction.request),
					    reinterpret_cast<void **>(&io_request)));
	storage::io_message_view::set_io_address(reinterpret_cast<uint64_t>(io_addr), io_request);
	io_addr += io_block_size;
	/* Wrap-around relies on the region size being an exact multiple of io_block_size */
	if (io_addr == io_region_end) {
		io_addr = io_region_begin;
	}

	/* DOCA_ERROR_AGAIN means the send queue is momentarily full; retry until accepted */
	do {
		ret = doca_task_submit(doca_comch_producer_task_send_as_task(transaction.request));
	} while (ret == DOCA_ERROR_AGAIN);

	if (ret != DOCA_SUCCESS) {
		DOCA_LOG_ERR("Failed to submit comch producer send task: %s", doca_error_get_name(ret));
		run_flag = false;
		error_flag = true;
	}

	/* One fewer request left to transmit in this run */
	--remaining_tx_ops;
}
1117 
/*
 * Account for a fully completed transaction (both send and receive callbacks
 * done): record its latency, then either re-use the slot for the next request or,
 * once all responses are in, end the run and capture the end time.
 *
 * @param transaction [in] The transaction that completed
 */
void initiator_comch_worker::hot_data::on_transaction_complete(transaction_context &transaction)
{
	auto const now = std::chrono::steady_clock::now();
	auto const usecs = static_cast<uint32_t>(
		std::chrono::duration_cast<std::chrono::microseconds>(now - transaction.start_time).count());
	latency_accumulator += usecs;
	latency_min = std::min(latency_min, usecs);
	latency_max = std::max(latency_max, usecs);

	++completed_transaction_count;
	--remaining_rx_ops;
	if (remaining_tx_ops) {
		/* More requests to send: reuse this slot immediately */
		start_transaction(transaction, now);
	} else if (remaining_rx_ops == 0) {
		/* Everything sent and every response received: the run is finished */
		run_flag = false;
		end_time = std::chrono::steady_clock::now();
	}
}
1136 
/*
 * Destructor: stop the worker thread (if any) and release all owned resources.
 */
initiator_comch_worker::~initiator_comch_worker()
{
	cleanup();
}
1141 
/*
 * Default constructor: no DOCA objects, no buffers, no thread. Real setup is
 * performed later by init().
 */
initiator_comch_worker::initiator_comch_worker()
	: m_hot_data{},
	  m_io_message_region{nullptr},
	  m_io_message_mmap{nullptr},
	  m_io_message_inv{nullptr},
	  m_io_message_bufs{},
	  m_consumer{nullptr},
	  m_producer{nullptr},
	  m_io_responses{},
	  m_io_requests{},
	  m_thread{}
{
}
1155 
/*
 * Move constructor: take over the other worker's resources. Raw handles in the
 * source are nulled so its destructor / cleanup() will not double-free them;
 * movable members (vectors, thread, hot data) transfer ownership on their own.
 */
initiator_comch_worker::initiator_comch_worker(initiator_comch_worker &&other) noexcept
	: m_hot_data{std::move(other.m_hot_data)},
	  m_io_message_region{other.m_io_message_region},
	  m_io_message_mmap{other.m_io_message_mmap},
	  m_io_message_inv{other.m_io_message_inv},
	  m_io_message_bufs{std::move(other.m_io_message_bufs)},
	  m_consumer{other.m_consumer},
	  m_producer{other.m_producer},
	  m_io_responses{std::move(other.m_io_responses)},
	  m_io_requests{std::move(other.m_io_requests)},
	  m_thread{std::move(other.m_thread)}
{
	other.m_io_message_region = nullptr;
	other.m_io_message_mmap = nullptr;
	other.m_io_message_inv = nullptr;
	other.m_consumer = nullptr;
	other.m_producer = nullptr;
}
1174 
1175 initiator_comch_worker &initiator_comch_worker::operator=(initiator_comch_worker &&other) noexcept
1176 {
1177  if (std::addressof(other) == this)
1178  return *this;
1179 
1180  m_hot_data = std::move(other.m_hot_data);
1181  m_io_message_region = other.m_io_message_region;
1182  m_io_message_mmap = other.m_io_message_mmap;
1183  m_io_message_inv = other.m_io_message_inv;
1184  m_io_message_bufs = std::move(other.m_io_message_bufs);
1185  m_consumer = other.m_consumer;
1186  m_producer = other.m_producer;
1187  m_io_responses = std::move(other.m_io_responses);
1188  m_io_requests = std::move(other.m_io_requests);
1189  m_thread = std::move(other.m_thread);
1190 
1191  other.m_io_message_region = nullptr;
1192  other.m_io_message_mmap = nullptr;
1193  other.m_io_message_inv = nullptr;
1194  other.m_consumer = nullptr;
1195  other.m_producer = nullptr;
1196 
1197  return *this;
1198 }
1199 
/*
 * Prepare this worker's data-path resources: IO message memory and mmap, buffer
 * inventory, progress engine, comch producer and comch consumer.
 *
 * @param dev [in] Device to use
 * @param comch_conn [in] Established comch connection to the storage service
 * @param storage_plain_content [in] Expected plain storage content for read validation (may be nullptr)
 * @param task_count [in] Number of concurrent transactions this worker runs
 * @param batch_size [in] Receive-task submission batch size
 * @param io_region_begin [in] Start of this worker's slice of the IO region
 * @param io_region_size [in] Size (bytes) of this worker's IO slice
 * @param io_block_size [in] Size (bytes) of a single IO operation
 *
 * @throws storage::runtime_error on any allocation or DOCA object creation failure
 */
void initiator_comch_worker::init(doca_dev *dev,
				  doca_comch_connection *comch_conn,
				  uint8_t const *storage_plain_content,
				  uint32_t task_count,
				  uint32_t batch_size,
				  uint8_t *io_region_begin,
				  uint64_t io_region_size,
				  uint32_t io_block_size)
{
	doca_error_t ret;
	auto const page_size = storage::get_system_page_size();

	m_hot_data.storage_plain_content = storage_plain_content;
	m_hot_data.io_addr = io_region_begin;
	m_hot_data.io_region_begin = io_region_begin;
	m_hot_data.io_region_end = io_region_begin + io_region_size;
	m_hot_data.io_block_size = io_block_size;
	/*
	 * Allocate enough memory for at N tasks + cfg.batch_size where N is the smaller value of:
	 * cfg.buffer_count or cfg.run_limit_operation_count. cfg.batch_size tasks are over-allocated due to how
	 * batched receive tasks work. The requirement is to always be able to have N tasks in flight. While
	 * also lowering the cost of task submission by batching. So because upto cfg.batch_size -1 tasks could
	 * be submitted but not yet flushed means that a surplus of cfg.batch_size tasks are required to
	 * maintain always having N active tasks.
	 */
	m_hot_data.transactions_size = task_count;
	m_hot_data.batch_size = batch_size;
	/* x2: one message buffer per receive task plus one per send task */
	auto const raw_io_messages_size = (task_count + batch_size) * storage::size_of_io_message * 2;

	DOCA_LOG_DBG("Allocate comch buffers memory (%zu bytes, aligned to %u byte pages)",
		     raw_io_messages_size,
		     page_size);
	m_io_message_region = static_cast<uint8_t *>(
		storage::aligned_alloc(page_size, storage::aligned_size(page_size, raw_io_messages_size)));
	if (m_io_message_region == nullptr) {
		throw storage::runtime_error{DOCA_ERROR_NO_MEMORY, "Failed to allocate comch fast path buffers memory"};
	}

	try {
		m_hot_data.transactions =
			storage::make_aligned<transaction_context>{}.object_array(m_hot_data.transactions_size);
	} catch (std::exception const &ex) {
		/* NOTE(review): a line appears missing from this extract (the
		 * `throw storage::runtime_error{<code>,` opener) - confirm against the full source. */
			"Failed to allocate transaction contexts memory: "s + ex.what()};
	}

	/* NOTE(review): the trailing arguments of make_mmap (e.g. access permission flags)
	 * appear missing from this extract - confirm against the full source. */
	m_io_message_mmap = storage::make_mmap(dev,
					       reinterpret_cast<char *>(m_io_message_region),
					       raw_io_messages_size,

	ret = doca_buf_inventory_create((task_count * 2) + batch_size, &m_io_message_inv);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to create comch fast path doca_buf_inventory"};
	}

	ret = doca_buf_inventory_start(m_io_message_inv);
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to start comch fast path doca_buf_inventory"};
	}

	DOCA_LOG_DBG("Create hot path progress engine");
	ret = doca_pe_create(std::addressof(m_hot_data.pe));
	if (ret != DOCA_SUCCESS) {
		throw storage::runtime_error{ret, "Failed to create doca_pe"};
	}

	/* Producer sends IO requests; its callbacks receive the hot data pointer as context */
	m_producer = storage::make_comch_producer(comch_conn,
						  m_hot_data.pe,
						  task_count,
						  doca_data{.ptr = std::addressof(m_hot_data)},
						  doca_comch_producer_task_send_cb,
						  doca_comch_producer_task_send_error_cb);
	m_io_requests.reserve(task_count);

	/* Consumer receives IO responses; over-provisioned by batch_size (see comment above) */
	m_consumer = storage::make_comch_consumer(comch_conn,
						  m_io_message_mmap,
						  m_hot_data.pe,
						  task_count + batch_size,
						  doca_data{.ptr = std::addressof(m_hot_data)},
						  doca_comch_consumer_task_post_recv_cb,
						  doca_comch_consumer_task_post_recv_error_cb);
	m_io_responses.reserve(task_count + batch_size);
}
1284 
/*
 * Re-arm the hot data run state and launch the worker thread pinned to the given
 * CPU. The thread spins until start_thread_proc() raises run_flag.
 *
 * @param fn [in] Thread procedure to execute
 * @param run_limit_op_count [in] Number of operations to perform in the run
 * @param cpu_idx [in] CPU index to pin the worker thread to
 */
void initiator_comch_worker::prepare_thread_proc(initiator_comch_worker::thread_proc_fn_t fn,

						 uint32_t run_limit_op_count,
						 uint32_t cpu_idx)
{
	m_hot_data.run_flag = false;
	m_hot_data.error_flag = false;
	m_hot_data.pe_hit_count = 0;
	m_hot_data.pe_miss_count = 0;
	m_hot_data.completed_transaction_count = 0;
	m_hot_data.remaining_tx_ops = run_limit_op_count;
	m_hot_data.remaining_rx_ops = run_limit_op_count;

	/* The wrapper catches exceptions escaping fn and converts them to the error flag */
	m_thread = std::thread{thread_proc_catch_wrapper{std::addressof(m_hot_data)}, fn};
	storage::set_thread_affinity(m_thread, cpu_idx);
}
1301 
/*
 * Allocate and prepare all consumer (receive) and producer (send) tasks plus
 * their doca_bufs, populate the initial IO request messages, then submit every
 * receive task up-front so responses can arrive as soon as the run starts.
 *
 * @param op_type [in] Initial IO operation type written into each request message
 * @param remote_consumer_id [in] Remote consumer targeted by produced messages
 *
 * @throws storage::runtime_error on any buffer / task allocation or submission failure
 *
 * NOTE(review): several lines appear missing from this extract (buffer size
 * arguments, the producer task alloc/init call, the task user-data setter) -
 * confirm against the full source.
 */
void initiator_comch_worker::prepare_tasks(storage::io_message_type op_type, uint32_t remote_consumer_id)
{
	doca_error_t ret;
	uint8_t *io_addr = m_hot_data.io_region_begin;
	uint8_t *msg_addr = m_io_message_region;

	/*
	 * Over allocate receive tasks so all while waiting for a full batch of tasks to submit there is always
	 * hot_context.transactions_size active tasks
	 */
	for (uint32_t ii = 0; ii != m_hot_data.transactions_size + m_hot_data.batch_size; ++ii) {
		doca_buf *consumer_buf;
		doca_comch_consumer_task_post_recv *consumer_task;

		ret = doca_buf_inventory_buf_get_by_addr(m_io_message_inv,
							 m_io_message_mmap,
							 msg_addr,
							 &consumer_buf);
		if (ret != DOCA_SUCCESS) {
			throw storage::runtime_error{ret, "Unable to get doca_buf for consumer task"};
		}

		m_io_message_bufs.push_back(consumer_buf);
		msg_addr += storage::size_of_io_message;

		ret = doca_comch_consumer_task_post_recv_alloc_init(m_consumer, consumer_buf, &consumer_task);
		if (ret != DOCA_SUCCESS) {
			throw storage::runtime_error{ret, "Unable to allocate consumer task"};
		}

		m_io_responses.push_back(doca_comch_consumer_task_post_recv_as_task(consumer_task));
	}

	/* One producer (request) task per transaction slot */
	for (uint32_t ii = 0; ii != m_hot_data.transactions_size; ++ii) {
		doca_buf *producer_buf;
		doca_comch_producer_task_send *producer_task;

		ret = doca_buf_inventory_buf_get_by_data(m_io_message_inv,
							 m_io_message_mmap,
							 msg_addr,
							 &producer_buf);
		if (ret != DOCA_SUCCESS) {
			throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
		}

		m_io_message_bufs.push_back(producer_buf);
		auto *const io_message = reinterpret_cast<char *>(msg_addr);
		msg_addr += storage::size_of_io_message;
		;

		/* NOTE(review): the producer task alloc/init call line is missing from this extract */
			producer_buf,
			nullptr,
			0,
			remote_consumer_id,
			&producer_task);
		if (ret != DOCA_SUCCESS) {
			throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
		}
		/* Task user data points at the owning transaction so callbacks can find it */
		static_cast<void>(
			doca_data{.ptr = std::addressof(m_hot_data.transactions[ii])}));
		m_io_requests.push_back(doca_comch_producer_task_send_as_task(producer_task));
		m_hot_data.transactions[ii].refcount = 0;
		m_hot_data.transactions[ii].request = producer_task;

		/* Populate the initial IO request message content */
		storage::io_message_view::set_type(op_type, io_message);
		storage::io_message_view::set_user_data(doca_data{.u64 = remote_consumer_id}, io_message);
		storage::io_message_view::set_io_address(reinterpret_cast<uint64_t>(io_addr), io_message);
		storage::io_message_view::set_io_size(m_hot_data.io_block_size, io_message);

		io_addr += m_hot_data.io_block_size;
	}

	/* Arm every receive task before the run begins */
	for (auto *task : m_io_responses) {
		ret = doca_task_submit(task);
		if (ret != DOCA_SUCCESS) {
			/* NOTE(review): this error text looks copy-pasted from the buf-get path
			 * above; the failing operation here is a receive task submission */
			throw storage::runtime_error{ret, "Unable to get doca_buf for producer task"};
		}
	}
}
1387 
1388 bool initiator_comch_worker::are_contexts_ready(void) const noexcept
1389 {
1390  static_cast<void>(doca_pe_progress(m_hot_data.pe));
1391  auto const consumer_running = storage::is_ctx_running(doca_comch_consumer_as_ctx(m_consumer));
1392  auto const producer_running = storage::is_ctx_running(doca_comch_producer_as_ctx(m_producer));
1393  return consumer_running && producer_running;
1394 }
1395 
1396 void initiator_comch_worker::start_thread_proc(void)
1397 {
1398  m_hot_data.run_flag = true;
1399 }
1400 
1401 bool initiator_comch_worker::is_thread_proc_running(void) const noexcept
1402 {
1403  return m_hot_data.run_flag;
1404 }
1405 
1406 void initiator_comch_worker::join_thread_proc(void)
1407 {
1408  if (m_thread.joinable())
1409  m_thread.join();
1410 }
1411 
/*
 * Read-only accessor for the worker's hot data (statistics, flags, counters).
 */
initiator_comch_worker::hot_data const &initiator_comch_worker::get_hot_data(void) const noexcept
{
	return m_hot_data;
}
1416 
/*
 * Tear down the data-path DOCA objects in dependency order: stop and destroy the
 * consumer, then the producer, then the progress engine that drives both.
 * Failures are logged; successfully destroyed handles are nulled so cleanup()
 * will not touch them again.
 */
void initiator_comch_worker::destroy_data_path_objects(void)
{
	doca_error_t ret;
	if (m_consumer != nullptr) {
		/* stop_context flushes/releases the in-flight tasks passed to it */
		ret = storage::stop_context(doca_comch_consumer_as_ctx(m_consumer), m_hot_data.pe, m_io_responses);
		if (ret == DOCA_SUCCESS) {
			m_io_responses.clear();
		} else {
			DOCA_LOG_ERR("Failed to stop consumer context");
		}
		ret = doca_comch_consumer_destroy(m_consumer);
		if (ret == DOCA_SUCCESS) {
			m_consumer = nullptr;
		} else {
			DOCA_LOG_ERR("Failed to destroy consumer context");
		}
	}

	if (m_producer != nullptr) {
		ret = storage::stop_context(doca_comch_producer_as_ctx(m_producer), m_hot_data.pe, m_io_requests);
		if (ret == DOCA_SUCCESS) {
			m_io_requests.clear();
		} else {
			DOCA_LOG_ERR("Failed to stop producer context");
		}
		ret = doca_comch_producer_destroy(m_producer);
		if (ret == DOCA_SUCCESS) {
			m_producer = nullptr;
		} else {
			DOCA_LOG_ERR("Failed to destroy producer context");
		}
	}

	/* The PE must outlive the contexts registered with it; destroy it last */
	if (m_hot_data.pe != nullptr) {
		ret = doca_pe_destroy(m_hot_data.pe);
		if (ret == DOCA_SUCCESS) {
			m_hot_data.pe = nullptr;
		} else {
			DOCA_LOG_ERR("Failed to destroy progress engine");
		}
	}
}
1459 
/*
 * Full teardown: stop the worker thread (signalling it via the flags), destroy
 * the data-path objects, release buffers, inventory, mmap and raw memory. Order
 * matters: the thread must be stopped before its objects are destroyed, and the
 * memory regions are freed last.
 */
void initiator_comch_worker::cleanup(void) noexcept
{
	doca_error_t ret;

	/* Setting error_flag lets a thread still spinning in its start loop exit */
	if (m_thread.joinable()) {
		m_hot_data.run_flag = false;
		m_hot_data.error_flag = true;
		m_thread.join();
	}

	destroy_data_path_objects();

	/* Release the doca_bufs before stopping the inventory that owns them */
	for (auto *buf : m_io_message_bufs) {
		static_cast<void>(doca_buf_dec_refcount(buf, nullptr));
	}

	if (m_io_message_inv) {
		ret = doca_buf_inventory_stop(m_io_message_inv);
		if (ret != DOCA_SUCCESS) {
			DOCA_LOG_ERR("Failed to stop buffer inventory");
		}
		ret = doca_buf_inventory_destroy(m_io_message_inv);
		if (ret == DOCA_SUCCESS) {
			m_io_message_inv = nullptr;
		} else {
			DOCA_LOG_ERR("Failed to destroy buffer inventory");
		}
	}

	if (m_io_message_mmap) {
		ret = doca_mmap_stop(m_io_message_mmap);
		if (ret != DOCA_SUCCESS) {
			DOCA_LOG_ERR("Failed to stop mmap");
		}
		ret = doca_mmap_destroy(m_io_message_mmap);
		if (ret == DOCA_SUCCESS) {
			m_io_message_mmap = nullptr;
		} else {
			DOCA_LOG_ERR("Failed to destroy mmap");
		}
	}

	if (m_hot_data.transactions != nullptr) {
		storage::aligned_free(m_hot_data.transactions);
		m_hot_data.transactions = nullptr;
	}

	if (m_io_message_region != nullptr) {
		storage::aligned_free(m_io_message_region);
		m_io_message_region = nullptr;
	}
}
1512 
/*
 * Receive completion callback: look up the transaction named by the response's
 * correlation id, release one reference on it (completing it if the send
 * callback already ran), then re-arm the receive task.
 *
 * @param task [in] The completed receive task (re-submitted below)
 * @param task_user_data [in] Unused
 * @param ctx_user_data [in] Pointer to the owning worker's hot_data
 *
 * NOTE(review): the line obtaining `buf` from the task is missing from this
 * extract - confirm against the full source.
 */
void initiator_comch_worker::doca_comch_consumer_task_post_recv_cb(doca_comch_consumer_task_post_recv *task,
								   doca_data task_user_data,
								   doca_data ctx_user_data) noexcept
{
	static_cast<void>(task_user_data);

	auto *const hot_data = static_cast<initiator_comch_worker::hot_data *>(ctx_user_data.ptr);
	char *io_message;
	static_cast<void>(doca_buf_get_data(buf, reinterpret_cast<void **>(&io_message)));
	auto const correlation_id = storage::io_message_view::get_correlation_id(io_message);
	/* NOTE(review): valid indices are [0, transactions_size); an id equal to
	 * transactions_size passes this check and indexes out of bounds - the
	 * comparison likely should be >= */
	if (correlation_id > hot_data->transactions_size) {
		DOCA_LOG_ERR("Received storage response with invalid async id: %u", correlation_id);
		hot_data->run_flag = false;
		hot_data->error_flag = true;
		return;
	}

	/* Transaction completes only after BOTH send and receive callbacks have run */
	if (--(hot_data->transactions[correlation_id].refcount) == 0)
		hot_data->on_transaction_complete(hot_data->transactions[correlation_id]);

	/* Re-arm this receive task (batched submit; see submit_recv_task) */
	auto const ret = hot_data->submit_recv_task(doca_comch_consumer_task_post_recv_as_task(task));
	if (ret != DOCA_SUCCESS) {
		DOCA_LOG_ERR("Failed to resubmit doca_comch_consumer_task_post_recv: %s", doca_error_get_name(ret));
		hot_data->run_flag = false;
		hot_data->error_flag = true;
	}
}
1542 
1543 void initiator_comch_worker::doca_comch_consumer_task_post_recv_error_cb(doca_comch_consumer_task_post_recv *task,
1544  doca_data task_user_data,
1545  doca_data ctx_user_data) noexcept
1546 {
1547  static_cast<void>(task);
1548  static_cast<void>(task_user_data);
1549 
1550  auto *const hot_data = static_cast<initiator_comch_worker::hot_data *>(ctx_user_data.ptr);
1551 
1552  if (hot_data->run_flag) {
1553  DOCA_LOG_ERR("Failed to complete doca_comch_consumer_task_post_recv");
1554  hot_data->run_flag = false;
1555  hot_data->error_flag = true;
1556  }
1557 }
1558 
1559 void initiator_comch_worker::doca_comch_producer_task_send_cb(doca_comch_producer_task_send *task,
1560  doca_data task_user_data,
1561  doca_data ctx_user_data) noexcept
1562 {
1563  static_cast<void>(task);
1564 
1565  auto &transaction = *static_cast<transaction_context *>(task_user_data.ptr);
1566  if (--(transaction.refcount) == 0)
1567  static_cast<initiator_comch_worker::hot_data *>(ctx_user_data.ptr)->on_transaction_complete(transaction);
1568 }
1569 
1570 void initiator_comch_worker::doca_comch_producer_task_send_error_cb(doca_comch_producer_task_send *task,
1571  doca_data task_user_data,
1572  doca_data ctx_user_data) noexcept
1573 {
1574  static_cast<void>(task);
1575  static_cast<void>(task_user_data);
1576 
1577  auto *const hot_data = static_cast<initiator_comch_worker::hot_data *>(ctx_user_data.ptr);
1578  DOCA_LOG_ERR("Failed to complete doca_comch_producer_task_send");
1579  hot_data->run_flag = false;
1580  hot_data->error_flag = true;
1581 }
1582 
1583 initiator_comch_app::~initiator_comch_app()
1584 {
1585  doca_error_t ret;
1586 
1587  if (m_workers != nullptr) {
1588  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1589  m_workers[ii].~initiator_comch_worker();
1590  }
1591  storage::aligned_free(m_workers);
1592  }
1593 
1594  if (m_io_mmap != nullptr) {
1595  ret = doca_mmap_stop(m_io_mmap);
1596  if (ret != DOCA_SUCCESS) {
1597  DOCA_LOG_ERR("Failed to stop doca_mmap(%p): %s", m_io_mmap, doca_error_get_name(ret));
1598  }
1599 
1600  ret = doca_mmap_destroy(m_io_mmap);
1601  if (ret != DOCA_SUCCESS) {
1602  DOCA_LOG_ERR("Failed to destroy doca_mmap: %s", doca_error_get_name(ret));
1603  }
1604  }
1605 
1606  if (m_io_region != nullptr) {
1607  storage::aligned_free(m_io_region);
1608  }
1609 
1610  m_service_control_channel.reset();
1611 
1612  if (m_dev != nullptr) {
1613  ret = doca_dev_close(m_dev);
1614  if (ret != DOCA_SUCCESS) {}
1615  }
1616 }
1617 
1618 std::vector<uint8_t> try_load_file(std::string const &file_name, bool empty_file_is_disallowed)
1619 {
1620  std::vector<uint8_t> result;
1621  try {
1622  result = storage::load_file_bytes(file_name);
1623  } catch (std::exception const &) {
1624  if (empty_file_is_disallowed)
1625  throw;
1626  }
1627 
1628  return result;
1629 }
1630 
/*
 * Construct the application: load the expected plain content (mandatory only for
 * the read-only data validation run type), open the device, and create the
 * control channel client.
 *
 * @param cfg [in] Application configuration
 *
 * NOTE(review): the control channel factory call line is missing from this
 * extract (only its arguments are visible) - confirm against the full source.
 */
initiator_comch_app::initiator_comch_app(initiator_comch_app_configuration const &cfg)
	: m_cfg{cfg},
	  m_storage_plain_content{
		  try_load_file(cfg.storage_plain_content_file, cfg.run_type == run_type_read_only_data_validity_test)},
	  m_dev{nullptr},
	  m_io_region{nullptr},
	  m_io_mmap{nullptr},
	  m_service_control_channel{},
	  m_ctrl_messages{},
	  m_remote_consumer_ids{},
	  m_workers{nullptr},
	  m_storage_capacity{0},
	  m_storage_block_size{0},
	  m_stats{},
	  m_message_id_counter{},
	  m_correlation_id_counter{0},
	  m_abort_flag{false}
{
	DOCA_LOG_INFO("Open doca_dev: %s", m_cfg.device_id.c_str());
	m_dev = storage::open_device(m_cfg.device_id);
	m_service_control_channel =
		m_cfg.command_channel_name.c_str(),
		this,
		new_comch_consumer_callback,
		expired_comch_consumer_callback);
}
1658 
1659 void initiator_comch_app::abort(std::string const &reason)
1660 {
1661  if (m_abort_flag)
1662  return;
1663 
1664  DOCA_LOG_ERR("Aborted: %s", reason.c_str());
1665  m_abort_flag = true;
1666 }
1667 
/*
 * Poll (every 100ms) until the control channel reports a connection to the
 * storage service, or the configured control timeout elapses.
 *
 * @throws storage::runtime_error on timeout
 *
 * NOTE(review): the error-code argument of the runtime_error below appears
 * missing from this extract - confirm against the full source.
 */
void initiator_comch_app::connect_to_storage_service(void)
{
	auto const expiry = std::chrono::steady_clock::now() + m_cfg.control_timeout;
	for (;;) {
		std::this_thread::sleep_for(std::chrono::milliseconds{100});

		if (m_service_control_channel->is_connected()) {
			break;
		}

		if (std::chrono::steady_clock::now() > expiry) {
			throw storage::runtime_error{
				"Timed out trying to connect to storage",

			};
		}
	}
}
1687 
/*
 * Query the storage service for its capacity and block size, storing the result
 * in m_storage_capacity / m_storage_block_size. For the read-only validation run
 * type, additionally require the provided plain content to match the capacity.
 *
 * @throws storage::runtime_error on an invalid response or mismatched content size
 *
 * NOTE(review): the throw statement opener (with its error code) near the end
 * appears missing from this extract - confirm against the full source.
 */
void initiator_comch_app::query_storage(void)
{
	DOCA_LOG_INFO("Query storage...");
	auto const correlation_id = storage::control::correlation_id{m_correlation_id_counter++};
	auto const message_id = storage::control::message_id{m_message_id_counter++};
	m_service_control_channel->send_message(
		{storage::control::message_type::query_storage_request, message_id, correlation_id, {}});

	auto const response = wait_for_control_response(storage::control::message_type::query_storage_response,
							message_id,
							default_control_timeout_seconds);
	/* dynamic_cast yields nullptr when the payload is not the expected type */
	auto const *storage_details =
		dynamic_cast<storage::control::storage_details_payload const *>(response.payload.get());
	if (storage_details == nullptr) {
		throw storage::runtime_error{DOCA_ERROR_UNEXPECTED, "[BUG] Invalid query_storage_response received"};
	}

	m_storage_capacity = storage_details->total_size;
	m_storage_block_size = storage_details->block_size;
	DOCA_LOG_INFO("Storage reports capacity of: %lu using a block size of: %u",
		      m_storage_capacity,
		      m_storage_block_size);

	if (m_cfg.run_type == run_type_read_only_data_validity_test) {
		if (m_storage_capacity != m_storage_plain_content.size()) {
			throw storage::runtime_error{
				"Read only validation test requires that the provided plain data is the same size as the storage capacity"};
		}
	}
}
1719 
/*
 * Configure local IO memory and workers, then drive the storage service's
 * initialisation: allocate and export the IO mmap, send init_storage, divide the
 * storage blocks across the configured cores, init each worker, and finally wait
 * until all remote consumers exist and all local contexts are running.
 *
 * @throws storage::runtime_error on allocation / export / control failure or timeout
 *
 * NOTE(review): a few lines appear missing from this extract (mmap permission
 * arguments, the request's message-type enumerator, and two throw statement
 * openers) - confirm against the full source.
 */
void initiator_comch_app::init_storage(void)
{
	DOCA_LOG_INFO("Init storage...");
	auto const core_count = static_cast<uint32_t>(m_cfg.core_set.size());
	uint8_t const *plain_content = m_storage_plain_content.empty() ? nullptr : m_storage_plain_content.data();
	auto const storage_block_count = static_cast<uint32_t>(m_storage_capacity / m_storage_block_size);
	auto const per_thread_block_count = storage_block_count / core_count;
	auto per_thread_block_count_remainder = storage_block_count % core_count;

	m_remote_consumer_ids.reserve(core_count);
	/* Page aligned IO region shared (via exported mmap) with the storage service */
	m_io_region =
		static_cast<uint8_t *>(storage::aligned_alloc(storage::get_system_page_size(), m_storage_capacity));
	m_io_mmap = storage::make_mmap(m_dev,
				       reinterpret_cast<char *>(m_io_region),
				       m_storage_capacity,

	/* Export the mmap descriptor so the service can access the IO region over PCI */
	auto mmap_details = [this]() {
		void const *data;
		size_t len;
		auto const ret = doca_mmap_export_pci(m_io_mmap, m_dev, &data, &len);
		if (ret != DOCA_SUCCESS) {
			throw storage::runtime_error{ret, "Failed to export mmap"};
		}

		return std::vector<uint8_t>(static_cast<uint8_t const *>(data),
					    static_cast<uint8_t const *>(data) + len);
	}();

	auto const correlation_id = storage::control::correlation_id{m_correlation_id_counter++};
	auto const message_id = storage::control::message_id{m_message_id_counter++};

	/* Each remote thread needs at most task_count tasks, capped by its share of blocks */
	auto const remote_per_thread_block_count =
		std::min(m_cfg.task_count,
			 per_thread_block_count_remainder == 0 ? per_thread_block_count : per_thread_block_count + 1);
	m_service_control_channel->send_message({
		message_id,
		correlation_id,
		std::make_unique<storage::control::init_storage_payload>(remote_per_thread_block_count,
									 m_cfg.batch_size,
									 core_count,
									 std::move(mmap_details)),
	});

	DOCA_LOG_INFO("Init storage to use using %u cores with %u tasks each",
		      core_count,
		      remote_per_thread_block_count);

	static_cast<void>(wait_for_control_response(storage::control::message_type::init_storage_response,
						    message_id,
						    default_control_timeout_seconds));

	m_workers = storage::make_aligned<initiator_comch_worker>{}.object_array(m_cfg.core_set.size());

	/* Distribute blocks: the remainder gives the first workers one extra block each */
	auto *this_thread_io_region_begin = m_io_region;
	for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
		auto this_thread_block_count = per_thread_block_count;
		if (per_thread_block_count_remainder != 0) {
			++this_thread_block_count;
			--per_thread_block_count_remainder;
		}
		auto const this_thread_task_count = std::min(m_cfg.task_count, this_thread_block_count);
		auto const this_thread_storage_capacity = this_thread_block_count * m_storage_block_size;

		m_workers[ii].init(m_dev,
				   m_service_control_channel->get_comch_connection(),
				   plain_content,
				   this_thread_task_count,
				   m_cfg.batch_size,
				   this_thread_io_region_begin,
				   this_thread_storage_capacity,
				   m_storage_block_size);

		this_thread_io_region_begin += this_thread_storage_capacity;
	}

	DOCA_LOG_INFO("Wait for remote objects to be finish async init...");
	auto const expiry = std::chrono::steady_clock::now() + m_cfg.control_timeout;
	for (;;) {
		/* Any unsolicited control message during init is treated as fatal */
		auto const *msg = m_service_control_channel->poll();
		if (msg != nullptr) {
				"Received unexpected " + to_string(msg->message_type) +
				"while initialising"};
		}

		if (std::chrono::steady_clock::now() > expiry) {
				"Timed out waiting for remote objects to start"};
		}

		/* are_contexts_ready() also drives each worker's progress engine */
		auto const ready_ctx_count = std::accumulate(m_workers,
							     m_workers + m_cfg.core_set.size(),
							     uint32_t{0},
							     [](uint32_t total, initiator_comch_worker &worker) {
								     return total + worker.are_contexts_ready();
							     });

		if (m_remote_consumer_ids.size() == m_cfg.core_set.size() && ready_ctx_count == m_cfg.core_set.size()) {
			DOCA_LOG_INFO("Async init complete");
			return;
		}
	}
}
1826 
1827 void initiator_comch_app::prepare_threads(void)
1828 {
1829  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1830  storage::io_message_type initial_op_type;
1831  auto &tctx = m_workers[ii];
1832 
1833  if (m_cfg.run_type == run_type_read_throughout_test) {
1834  initial_op_type = storage::io_message_type::read;
1835  tctx.prepare_thread_proc(throughput_thread_proc,
1836  m_cfg.run_limit_operation_count,
1837  m_cfg.core_set[ii]);
1838  } else if (m_cfg.run_type == run_type_write_throughout_test) {
1839  initial_op_type = storage::io_message_type::write;
1840  tctx.prepare_thread_proc(throughput_thread_proc,
1841  m_cfg.run_limit_operation_count,
1842  m_cfg.core_set[ii]);
1843  } else if (m_cfg.run_type == run_type_read_only_data_validity_test) {
1844  initial_op_type = storage::io_message_type::read;
1845  tctx.prepare_thread_proc(read_only_data_validity_thread_proc,
1846  m_cfg.run_limit_operation_count,
1847  m_cfg.core_set[ii]);
1848  } else if (m_cfg.run_type == run_type_read_write_data_validity_test) {
1849  initial_op_type = storage::io_message_type::write;
1850  tctx.prepare_thread_proc(read_write_data_validity_thread_proc,
1851  m_cfg.run_limit_operation_count,
1852  m_cfg.core_set[ii]);
1853  } else {
1854  throw storage::runtime_error{DOCA_ERROR_NOT_SUPPORTED, "Unhandled run mode: " + m_cfg.run_type};
1855  }
1856 
1857  tctx.prepare_tasks(initial_op_type, m_remote_consumer_ids[ii]);
1858  }
1859 }
1860 
1861 void initiator_comch_app::start_storage(void)
1862 {
1863  DOCA_LOG_INFO("Start storage...");
1864  auto const correlation_id = storage::control::correlation_id{m_correlation_id_counter++};
1865  auto const message_id = storage::control::message_id{m_message_id_counter++};
1866  m_service_control_channel->send_message(
1867  {storage::control::message_type::start_storage_request, message_id, correlation_id, {}});
1868 
1869  static_cast<void>(wait_for_control_response(storage::control::message_type::start_storage_response,
1870  message_id,
1871  default_control_timeout_seconds));
1872 }
1873 
1874 bool initiator_comch_app::run(void)
1875 {
1876  // Start threads
1877  m_stats.start_time = std::chrono::steady_clock::now();
1878  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1879  auto &tctx = m_workers[ii];
1880  tctx.start_thread_proc();
1881  }
1882 
1883  // Run to completion or user abort
1884  for (;;) {
1885  std::this_thread::sleep_for(std::chrono::milliseconds{200});
1886  auto const running_workers = std::accumulate(m_workers,
1887  m_workers + m_cfg.core_set.size(),
1888  uint32_t{0},
1889  [](uint32_t total, auto const &tctx) {
1890  return total + tctx.is_thread_proc_running();
1891  });
1892 
1893  if (running_workers == 0)
1894  break;
1895  }
1896 
1897  // Tally stats
1898  uint64_t latency_acc = 0;
1899  m_stats.end_time = m_stats.start_time;
1900  m_stats.operation_count = 0;
1901  m_stats.latency_max = 0;
1902  m_stats.latency_min = std::numeric_limits<uint32_t>::max();
1903  bool any_error = false;
1904  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1905  auto const &hot_data = m_workers[ii].get_hot_data();
1906  if (hot_data.error_flag)
1907  any_error = true;
1908 
1909  m_stats.end_time = std::max(m_stats.end_time, hot_data.end_time);
1910  m_stats.operation_count += hot_data.completed_transaction_count;
1911  m_stats.latency_min = std::min(m_stats.latency_min, hot_data.latency_min);
1912  m_stats.latency_max = std::max(m_stats.latency_max, hot_data.latency_max);
1913  m_stats.pe_hit_count += hot_data.pe_hit_count;
1914  m_stats.pe_miss_count += hot_data.pe_miss_count;
1915 
1916  latency_acc += hot_data.latency_accumulator;
1917  }
1918 
1919  if (m_stats.operation_count != 0) {
1920  m_stats.latency_mean = latency_acc / m_stats.operation_count;
1921  } else {
1922  m_stats.latency_mean = 0;
1923  }
1924 
1925  return any_error == false;
1926 }
1927 
1928 void initiator_comch_app::join_threads(void)
1929 {
1930  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1931  m_workers[ii].join_thread_proc();
1932  }
1933 }
1934 
1935 void initiator_comch_app::stop_storage(void)
1936 {
1937  DOCA_LOG_INFO("Stop storage...");
1938 
1939  auto const correlation_id = storage::control::correlation_id{m_correlation_id_counter++};
1940  auto const message_id = storage::control::message_id{m_message_id_counter++};
1941  m_service_control_channel->send_message(
1942  {storage::control::message_type::stop_storage_request, message_id, correlation_id, {}});
1943 
1944  static_cast<void>(wait_for_control_response(storage::control::message_type::stop_storage_response,
1945  message_id,
1946  default_control_timeout_seconds));
1947  DOCA_LOG_INFO("Storage stopped");
1948 }
1949 
1950 void initiator_comch_app::display_stats(void) const
1951 {
1952  auto duration_secs_float = std::chrono::duration<double>{m_stats.end_time - m_stats.start_time}.count();
1953  auto const bytes = uint64_t{m_stats.operation_count} * m_storage_block_size;
1954  auto const GiBs = static_cast<double>(bytes) / (1024. * 1024. * 1024.);
1955  auto const miops = (static_cast<double>(m_stats.operation_count) / 1'000'000.) / duration_secs_float;
1956  auto const pe_hit_rate_pct =
1957  (static_cast<double>(m_stats.pe_hit_count) /
1958  (static_cast<double>(m_stats.pe_hit_count) + static_cast<double>(m_stats.pe_miss_count))) *
1959  100.;
1960 
1961  printf("+================================================+\n");
1962  printf("| Stats\n");
1963  printf("+================================================+\n");
1964  printf("| Duration (seconds): %2.06lf\n", duration_secs_float);
1965  printf("| Operation count: %lu\n", m_stats.operation_count);
1966  printf("| Data rate: %.03lf GiB/s\n", GiBs / duration_secs_float);
1967  printf("| IO rate: %.03lf MIOP/s\n", miops);
1968  printf("| PE hit rate: %2.03lf%% (%lu:%lu)\n", pe_hit_rate_pct, m_stats.pe_hit_count, m_stats.pe_miss_count);
1969  printf("| Latency:\n");
1970  printf("| \tMin: %uus\n", m_stats.latency_min);
1971  printf("| \tMax: %uus\n", m_stats.latency_max);
1972  printf("| \tMean: %uus\n", m_stats.latency_mean);
1973  printf("+================================================+\n");
1974 }
1975 
1976 void initiator_comch_app::shutdown(void)
1977 {
1978  DOCA_LOG_INFO("Shutdown storage...");
1979 
1980  // destroy local comch objects
1981  for (uint32_t ii = 0; ii != m_cfg.core_set.size(); ++ii) {
1982  m_workers[ii].destroy_data_path_objects();
1983  }
1984 
1985  // Destroy remote comch objects
1986  auto const correlation_id = storage::control::correlation_id{m_correlation_id_counter++};
1987  auto const message_id = storage::control::message_id{m_message_id_counter++};
1988  m_service_control_channel->send_message(
1989  {storage::control::message_type::shutdown_request, message_id, correlation_id, {}});
1990 
1991  static_cast<void>(wait_for_control_response(storage::control::message_type::shutdown_response,
1992  message_id,
1993  default_control_timeout_seconds));
1994 
1995  while (!m_remote_consumer_ids.empty()) {
1996  auto *msg = m_service_control_channel->poll();
1997  if (msg != nullptr) {
1998  DOCA_LOG_INFO("Ignoring unexpected message: %s", storage::control::to_string(*msg).c_str());
1999  }
2000  }
2001 
2002  DOCA_LOG_INFO("Storage shutdown");
2003 }
2004 
2005 void initiator_comch_app::new_comch_consumer_callback(void *user_data, uint32_t id) noexcept
2006 {
2007  auto *self = reinterpret_cast<initiator_comch_app *>(user_data);
2008  if (self->m_remote_consumer_ids.capacity() == 0) {
2009  DOCA_LOG_ERR("[BUG] no space for new remote consumer ids");
2010  return;
2011  }
2012 
2013  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
2014  if (found == std::end(self->m_remote_consumer_ids)) {
2015  self->m_remote_consumer_ids.push_back(id);
2016  DOCA_LOG_DBG("Connected to remote consumer with id: %u. Consumer count is now: %zu",
2017  id,
2018  self->m_remote_consumer_ids.size());
2019  } else {
2020  DOCA_LOG_WARN("Ignoring duplicate remote consumer id: %u", id);
2021  }
2022 }
2023 
2024 void initiator_comch_app::expired_comch_consumer_callback(void *user_data, uint32_t id) noexcept
2025 {
2026  auto *self = reinterpret_cast<initiator_comch_app *>(user_data);
2027  auto found = std::find(std::begin(self->m_remote_consumer_ids), std::end(self->m_remote_consumer_ids), id);
2028  if (found != std::end(self->m_remote_consumer_ids)) {
2029  self->m_remote_consumer_ids.erase(found);
2030  DOCA_LOG_DBG("Disconnected from remote consumer with id: %u. Consumer count is now: %zu",
2031  id,
2032  self->m_remote_consumer_ids.size());
2033  } else {
2034  DOCA_LOG_WARN("Ignoring disconnect of unexpected remote consumer id: %u", id);
2035  }
2036 }
2037 
2038 storage::control::message initiator_comch_app::wait_for_control_response(storage::control::message_type type,
2040  std::chrono::seconds timeout)
2041 {
2042  auto expiry_time = std::chrono::steady_clock::now() + timeout;
2043  do {
2044  // Poll for new messages
2045  auto *msg = m_service_control_channel->poll();
2046  if (msg) {
2047  m_ctrl_messages.push_back(std::move(*msg));
2048  }
2049 
2050  // Check for matching message
2051  auto iter = std::find_if(std::begin(m_ctrl_messages), std::end(m_ctrl_messages), [msg_id](auto &msg) {
2052  return msg.message_id == msg_id;
2053  });
2054 
2055  if (iter == std::end(m_ctrl_messages))
2056  continue;
2057 
2058  // Remove response from the queue
2059  auto response = std::move(*iter);
2060  m_ctrl_messages.erase(iter);
2061 
2062  // Handle remote failure
2063  if (response.message_type == storage::control::message_type::error_response) {
2064  auto const *const error_details =
2065  reinterpret_cast<storage::control::error_response_payload const *>(
2066  response.payload.get());
2067  throw storage::runtime_error{error_details->error_code,
2068  to_string(type) + " id: " + std::to_string(msg_id.value) +
2069  " failed!: " + error_details->message};
2070  }
2071 
2072  // Handle unexpected response
2073  if (response.message_type != type) {
2075  "Received unexpected " + to_string(response.message_type) +
2076  " While waiting for " + to_string(type) +
2077  " id: " + std::to_string(msg_id.value)};
2078  }
2079 
2080  // Good response, let caller handle it
2081  return response;
2082 
2083  } while (std::chrono::steady_clock::now() < expiry_time);
2084 
2086  "Timed out waiting on: " + to_string(type) +
2087  " id: " + std::to_string(msg_id.value)};
2088 }
2089 
2090 } /* namespace */
int32_t result
static void cleanup(struct cache_invalidate_sample_state *state)
static doca_error_t run(struct cache_invalidate_sample_state *state)
static uint32_t get_correlation_id(char const *buf)
Definition: io_message.hpp:114
static void set_user_data(doca_data user_data, char *buf)
Definition: io_message.hpp:103
static void set_correlation_id(uint32_t correlation_id, char *buf)
Definition: io_message.hpp:127
static void set_type(io_message_type type, char *buf)
Definition: io_message.hpp:79
static void set_io_address(uint64_t io_address, char *buf)
Definition: io_message.hpp:176
static void set_io_size(uint32_t io_size, char *buf)
Definition: io_message.hpp:200
static void set_remote_offset(uint32_t remote_offset, char *buf)
Definition: io_message.hpp:224
T * object_array(size_t object_count, Args &&...args) const
Definition: aligned_new.hpp:88
doca_error_t get_doca_error() const noexcept
Definition: definitions.hpp:81
uint64_t len
static struct doca_pe * pe
DOCA_EXPERIMENTAL doca_error_t doca_argp_start(int argc, char **argv)
Parse incoming arguments (cmd line/json).
DOCA_EXPERIMENTAL doca_error_t doca_argp_init(const char *program_name, void *program_config)
Initialize the parser interface.
DOCA_EXPERIMENTAL doca_error_t doca_argp_destroy(void)
ARG Parser destroy.
@ DOCA_ARGP_TYPE_STRING
Definition: doca_argp.h:56
@ DOCA_ARGP_TYPE_INT
Definition: doca_argp.h:57
DOCA_STABLE doca_error_t doca_buf_inventory_destroy(struct doca_buf_inventory *inventory)
Destroy buffer inventory structure.
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
static doca_error_t doca_buf_inventory_buf_get_by_data(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *data, size_t data_len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by data & data_len a...
DOCA_STABLE doca_error_t doca_buf_inventory_start(struct doca_buf_inventory *inventory)
Start element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_inventory_create(size_t num_elements, struct doca_buf_inventory **inventory)
Allocates buffer inventory with default/unset attributes.
DOCA_STABLE doca_error_t doca_buf_inventory_stop(struct doca_buf_inventory *inventory)
Stop element retrieval from inventory.
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_get_data(const struct doca_buf *buf, void **data)
Get the buffer's data.
DOCA_STABLE doca_error_t doca_buf_reset_data_len(struct doca_buf *buf)
DOCA_STABLE struct doca_buf * doca_comch_consumer_task_post_recv_get_buf(const struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE doca_error_t doca_comch_consumer_destroy(struct doca_comch_consumer *consumer)
DOCA_STABLE doca_error_t doca_comch_consumer_task_post_recv_alloc_init(struct doca_comch_consumer *consumer, struct doca_buf *buf, struct doca_comch_consumer_task_post_recv **task)
Allocate and initialize a doca_consumer post receive task.
DOCA_STABLE struct doca_ctx * doca_comch_consumer_as_ctx(struct doca_comch_consumer *consumer)
DOCA_STABLE struct doca_task * doca_comch_consumer_task_post_recv_as_task(struct doca_comch_consumer_task_post_recv *task)
DOCA_STABLE const struct doca_buf * doca_comch_producer_task_send_get_buf(const struct doca_comch_producer_task_send *task)
DOCA_STABLE struct doca_task * doca_comch_producer_task_send_as_task(struct doca_comch_producer_task_send *task)
DOCA_STABLE doca_error_t doca_comch_producer_destroy(struct doca_comch_producer *producer)
DOCA_STABLE struct doca_ctx * doca_comch_producer_as_ctx(struct doca_comch_producer *producer)
DOCA_STABLE doca_error_t doca_comch_producer_task_send_alloc_init(struct doca_comch_producer *producer, const struct doca_buf *buf, uint8_t *imm_data, uint32_t imm_data_len, uint32_t consumer_id, struct doca_comch_producer_task_send **task)
DOCA_STABLE doca_error_t doca_dev_close(struct doca_dev *dev)
Destroy allocated local device instance.
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_TIME_OUT
Definition: doca_error.h:47
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_UNEXPECTED
Definition: doca_error.h:60
@ DOCA_ERROR_BAD_STATE
Definition: doca_error.h:56
@ DOCA_ERROR_NOT_SUPPORTED
Definition: doca_error.h:42
@ DOCA_ERROR_AGAIN
Definition: doca_error.h:43
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
Definition: doca_log.h:476
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
DOCA_STABLE doca_error_t doca_mmap_destroy(struct doca_mmap *mmap)
Destroy DOCA Memory Map structure.
DOCA_STABLE doca_error_t doca_mmap_export_pci(struct doca_mmap *mmap, const struct doca_dev *dev, const void **export_desc, size_t *export_desc_len)
Compose memory map representation for later import with doca_mmap_create_from_export() for one of the...
DOCA_STABLE doca_error_t doca_mmap_stop(struct doca_mmap *mmap)
Stop DOCA Memory Map.
doca_task_submit_flag
Flags used when submitting a doca_task.
Definition: doca_pe.h:74
DOCA_EXPERIMENTAL doca_error_t doca_task_submit_ex(struct doca_task *task, uint32_t flags)
Extended version of doca_task_submit.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE void doca_task_set_user_data(struct doca_task *task, union doca_data user_data)
Set user data to a task.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
@ DOCA_TASK_SUBMIT_FLAG_NONE
Definition: doca_pe.h:75
@ DOCA_TASK_SUBMIT_FLAG_FLUSH
Definition: doca_pe.h:76
@ DOCA_ACCESS_FLAG_LOCAL_READ_WRITE
Definition: doca_types.h:83
@ DOCA_ACCESS_FLAG_RDMA_READ
Definition: doca_types.h:84
@ DOCA_ACCESS_FLAG_PCI_READ_WRITE
Definition: doca_types.h:91
@ DOCA_ACCESS_FLAG_RDMA_WRITE
Definition: doca_types.h:85
static const char * doca_version(void)
Function returning DOCA's (SDK) exact version string.
Definition: doca_version.h:90
int main(int argc, char **argv)
DOCA_LOG_REGISTER(INITIATOR_COMCH)
const struct ip_frag_config * cfg
Definition: ip_frag_dp.c:0
type value
std::unique_ptr< storage::control::comch_channel > make_comch_client_control_channel(doca_dev *dev, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::string to_string(storage::control::message_type type)
void set_thread_affinity(std::thread &thread, uint32_t cpu_core_idx)
Definition: os_utils.cpp:53
void uninstall_ctrl_c_handler()
Definition: os_utils.cpp:121
doca_mmap * make_mmap(doca_dev *dev, char *memory_region, size_t memory_region_size, uint32_t permissions)
Definition: doca_utils.cpp:146
static constexpr value_requirement optional_value
Definition: doca_utils.hpp:228
void create_doca_logger_backend(void) noexcept
Definition: doca_utils.cpp:471
void install_ctrl_c_handler(std::function< void(void)> callback)
Definition: os_utils.cpp:106
void aligned_free(void *memory)
Definition: os_utils.cpp:131
doca_comch_producer * make_comch_producer(doca_comch_connection *conn, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_producer_task_send_completion_cb_t task_cb, doca_comch_producer_task_send_completion_cb_t error_cb)
Definition: doca_utils.cpp:279
static constexpr value_multiplicity multiple_values
Definition: doca_utils.hpp:231
std::vector< uint8_t > load_file_bytes(std::string const &file_name)
Definition: file_utils.cpp:58
void * aligned_alloc(size_t alignment, size_t size)
Definition: os_utils.cpp:126
void register_cli_argument(doca_argp_type type, char const *short_name, char const *long_name, char const *description, value_requirement requirement, value_multiplicity multiplicity, doca_argp_param_cb_t callback)
Definition: doca_utils.cpp:413
size_t aligned_size(size_t alignment, size_t size)
static constexpr value_multiplicity single_value
Definition: doca_utils.hpp:230
doca_comch_consumer * make_comch_consumer(doca_comch_connection *conn, doca_mmap *mmap, doca_pe *pe, uint32_t task_pool_size, doca_data callback_user_data, doca_comch_consumer_task_post_recv_completion_cb_t task_cb, doca_comch_consumer_task_post_recv_completion_cb_t error_cb)
Definition: doca_utils.cpp:224
constexpr size_t size_of_io_message
Definition: io_message.hpp:53
constexpr uint32_t cache_line_size
Definition: definitions.hpp:40
bool is_ctx_running(doca_ctx *ctx) noexcept
Definition: doca_utils.cpp:362
static constexpr value_requirement required_value
Definition: doca_utils.hpp:227
uint32_t get_system_page_size(void)
Definition: os_utils.cpp:95
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
Definition: doca_utils.cpp:369
doca_dev * open_device(std::string const &identifier)
Definition: doca_utils.cpp:43
uint8_t type
Definition: packets.h:0
#define false
Definition: stdbool.h:22
Convenience type for representing opaque data.
Definition: doca_types.h:56
uint64_t u64
Definition: doca_types.h:58
void * ptr
Definition: doca_types.h:57