NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
control_channel.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2025 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
27 
28 #include <chrono>
29 #include <thread>
30 
31 #include <doca_comch.h>
32 #include <doca_log.h>
33 #include <doca_pe.h>
34 
39 
40 DOCA_LOG_REGISTER(CONTROL_CHANNEL);
41 
42 using namespace std::string_literals;
43 
44 namespace storage::control {
45 
46 namespace {
47 
/*
 * Common state and framing helpers shared by every control channel implementation.
 *
 * Derives from InterfaceT and holds a transmit buffer, a receive buffer, the most recently decoded
 * message and an error string. Outgoing messages are framed as an encoded header followed by the
 * encoded message; incoming bytes are accumulated until one whole frame can be decoded.
 * The type is neither copyable nor movable.
 */
48 template <typename InterfaceT>
49 class basic_control_channel : public InterfaceT {
50 public:
    /* Pre-reserves 2048 bytes of receive buffer capacity */
51  basic_control_channel() : m_rx_buffer{}, m_rx_message{}
52  {
53  m_rx_buffer.reserve(1024 * 2);
54  }
55  basic_control_channel(basic_control_channel const &) = delete;
56  basic_control_channel(basic_control_channel &&) noexcept = delete;
57  basic_control_channel &operator=(basic_control_channel const &) = delete;
58  basic_control_channel &operator=(basic_control_channel &&) noexcept = delete;
59 
    /*
     * Record an error description (replaces any previously stored one)
     *
     * @msg [in]: Error text to store in m_error_message
     */
60  void set_error(std::string msg)
61  {
62  m_error_message = std::move(msg);
63  }
64 
    /*
     * Serialise a message (header first, then body) into m_tx_buffer
     *
     * The buffer is resized to exactly wire_size(header) + header.wire_size bytes before encoding.
     *
     * NOTE(review): `header` is not declared in the lines visible here (the listing skips from 66
     * to 68) - presumably it is built from `message`; confirm against the original file.
     *
     * @message [in]: Message to encode
     */
65  void encode_message_to_tx_buffer(control::message const &message)
66  {
68 
69  m_tx_buffer.resize(wire_size(header) + header.wire_size);
70 
71  static_cast<void>(encode(encode(m_tx_buffer.data(), header), message));
72  }
73 
    /*
     * Append received bytes to the end of m_rx_buffer
     *
     * @bytes [in]: First byte to copy
     * @byte_count [in]: Number of bytes to copy
     */
74  void append_rx_bytes(char const *bytes, size_t byte_count)
75  {
76  std::copy(bytes, bytes + byte_count, std::back_inserter(m_rx_buffer));
77  }
78 
    /*
     * Try to decode one complete message from the front of m_rx_buffer into m_rx_message
     *
     * On success the consumed bytes (header plus header.wire_size) are erased from the buffer.
     * Nothing is consumed when the buffer does not yet hold a full header or a full frame.
     *
     * NOTE(review): sizes here use sizeof(control::message_header) while the encoder uses
     * wire_size(header) - confirm the two are always equal.
     *
     * @return: true when m_rx_message was populated, false when more bytes are needed
     */
79  bool extract_message_from_rx_buffer()
80  {
81  if (m_rx_buffer.size() < sizeof(control::message_header))
82  return false;
83 
84  control::message_header header{};
85 
86  auto *read_ptr = decode(m_rx_buffer.data(), header);
87 
    /* Header is known; wait until the whole body has arrived as well */
88  if (m_rx_buffer.size() < (sizeof(control::message_header) + header.wire_size))
89  return false;
90 
91  static_cast<void>(decode(read_ptr, m_rx_message));
92 
93  m_rx_buffer.erase(m_rx_buffer.begin(),
94  m_rx_buffer.begin() + sizeof(control::message_header) + header.wire_size);
95  return true;
96  }
97 
98 protected:
99  std::vector<char> m_tx_buffer; /* Encoded bytes of the message currently being sent */
100  std::vector<char> m_rx_buffer; /* Received bytes not yet decoded into a message */
101  message m_rx_message; /* Destination of the most recent successful decode */
102  std::string m_error_message; /* Last value passed to set_error(); empty when none was recorded */
103 };
104 
105 class tcp_control_channel : public basic_control_channel<storage::control::channel> {
106 public:
107 protected:
108  void write_to_tcp_socket(storage::tcp_socket &socket, control::message const &msg)
109  {
110  encode_message_to_tx_buffer(msg);
111  if (socket.write(m_tx_buffer.data(), m_tx_buffer.size()) != m_tx_buffer.size()) {
112  throw storage::runtime_error{DOCA_ERROR_IO_FAILED, "Failed to send control message"};
113  }
114  }
115 
116  storage::control::message *poll_tcp_socket(storage::tcp_socket &socket)
117  {
118  std::array<char, 256> tmp_buff{};
119 
120  uint32_t const nb_read = socket.read(tmp_buff.data(), tmp_buff.size());
121  if (nb_read != 0) {
122  append_rx_bytes(tmp_buff.data(), nb_read);
123  }
124 
125  if (extract_message_from_rx_buffer()) {
126  return std::addressof(m_rx_message);
127  }
128 
129  return nullptr;
130  }
131 };
132 
/*
 * Shared base for the ComCh client and server control channels.
 *
 * Owns the progress engine handle (m_pe), remembers the active connection, stores the user supplied
 * consumer callbacks and provides the static callbacks that are registered with doca_comch.
 */
133 class comch_control_channel : public basic_control_channel<storage::control::comch_channel> {
134 public:
    /* Destroys the progress engine if one was created; a failure is logged, not thrown */
135  ~comch_control_channel() override
136  {
137  if (m_pe != nullptr) {
138  auto const ret = doca_pe_destroy(m_pe);
139  if (ret != DOCA_SUCCESS) {
140  DOCA_LOG_ERR("Failed to destroy doca_pe: %s", doca_error_get_name(ret));
141  }
142  }
143  }
144 
    /* Starts with no progress engine, no connection and empty consumer callbacks */
145  comch_control_channel()
146  : basic_control_channel<storage::control::comch_channel>{},
147  m_pe{nullptr},
148  m_connection{nullptr},
149  m_consumer_cb_data{.ptr = nullptr},
150  m_connection_cb{},
151  m_expiry_cb{}
152  {
153  }
154  comch_control_channel(comch_control_channel const &) = delete;
155  comch_control_channel(comch_control_channel &&) noexcept = delete;
156  comch_control_channel &operator=(comch_control_channel const &) = delete;
157  comch_control_channel &operator=(comch_control_channel &&) noexcept = delete;
158 
    /*
     * Progress the engine once, then try to decode one buffered message
     *
     * @return: Pointer to the decoded message (owned by this object) or nullptr if none is complete
     */
159  storage::control::message *poll() override
160  {
161  static_cast<void>(doca_pe_progress(m_pe));
162  if (extract_message_from_rx_buffer()) {
163  DOCA_LOG_DBG("%s", to_string(m_rx_message).c_str());
164  return std::addressof(m_rx_message);
165  }
166 
167  return nullptr;
168  }
169 
    /*
     * @return: The stored connection pointer (nullptr until a derived class sets it)
     */
170  doca_comch_connection *get_comch_connection() const noexcept override
171  {
172  return m_connection;
173  }
174 
175 protected:
176  doca_pe *m_pe; /* Progress engine; created by create_pe(), destroyed in the destructor */
177  doca_comch_connection *m_connection; /* Active connection, nullptr when not connected */
    /*
     * NOTE(review): m_consumer_cb_data is initialised and used in this class but its declaration is
     * not visible here (the listing skips line 178); the page's symbol index lists it as doca_data.
     */
179  comch_channel::consumer_event_callback m_connection_cb; /* Invoked for new consumer events */
180  comch_channel::consumer_event_callback m_expiry_cb; /* Invoked for expired consumer events */
181 
    /*
     * @return: this, typed as comch_control_channel *
     */
182  comch_control_channel *get_self()
183  {
184  return this;
185  }
186 
    /*
     * Create the progress engine and store it in m_pe
     *
     * @throws storage::runtime_error: If doca_pe_create fails
     */
187  void create_pe()
188  {
189  auto const ret = doca_pe_create(&m_pe);
190  if (ret != DOCA_SUCCESS) {
191  throw storage::runtime_error{ret, "Failed to create doca_pe"};
192  }
193  }
194 
    /*
     * Store the consumer event callbacks and the opaque value handed back to them
     *
     * @callback_user_data [in]: Value passed as first argument to both callbacks
     * @new_consumer_event_cb [in]: Callback for new consumer events
     * @expired_consumer_event_cb [in]: Callback for expired consumer events
     */
195  void set_consumer_callbacks(void *callback_user_data,
196  comch_channel::consumer_event_callback new_consumer_event_cb,
197  comch_channel::consumer_event_callback expired_consumer_event_cb)
198  {
199  m_consumer_cb_data.ptr = callback_user_data;
200  m_connection_cb = std::move(new_consumer_event_cb);
201  m_expiry_cb = std::move(expired_consumer_event_cb);
202  }
203 
    /*
     * ComCh new consumer event callback
     *
     * Recovers the owning channel from the connection user data and forwards the consumer id to
     * m_connection_cb.
     *
     * NOTE(review): the stored callback is invoked without checking that it is non-empty; this
     * function is noexcept, so an empty std::function here would terminate the program.
     *
     * @event [in]: Event (unused)
     * @conn [in]: Connection whose user data points at the owning channel
     * @id [in]: Consumer id forwarded to the callback
     */
204  static void new_consumer_event_cb(doca_comch_event_consumer *event,
205  doca_comch_connection *conn,
206  uint32_t id) noexcept
207  {
208  static_cast<void>(event);
209 
210  auto *self = static_cast<comch_control_channel *>(doca_comch_connection_get_user_data(conn).ptr);
211  self->m_connection_cb(self->m_consumer_cb_data.ptr, id);
212  }
213 
    /*
     * ComCh expired consumer event callback
     *
     * Same as new_consumer_event_cb but forwards to m_expiry_cb.
     *
     * @event [in]: Event (unused)
     * @conn [in]: Connection whose user data points at the owning channel
     * @id [in]: Consumer id forwarded to the callback
     */
214  static void expired_consumer_event_cb(doca_comch_event_consumer *event,
215  doca_comch_connection *conn,
216  uint32_t id) noexcept
217  {
218  static_cast<void>(event);
219 
220  auto *self = static_cast<comch_control_channel *>(doca_comch_connection_get_user_data(conn).ptr);
221  self->m_expiry_cb(self->m_consumer_cb_data.ptr, id);
222  }
223 
224  /*
225  * ComCh control task send callback
226  *
227  * @task [in]: Completed task
228  * @task_user_data [in]: Data associated with the task
229  * @ctx_user_data [in]: Data associated with the context
230  */
231  static void task_send_cb(doca_comch_task_send *task, doca_data task_user_data, doca_data ctx_user_data) noexcept
232  {
233  static_cast<void>(task_user_data);
234  static_cast<void>(ctx_user_data);
235 
    /* NOTE(review): the listing skips line 236 here; presumably the completed task is released - confirm */
237  }
238 
239  /*
240  * ComCh control task send error callback
241  *
242  * @task [in]: Failed task
243  * @task_user_data [in]: Data associated with the task
244  * @ctx_user_data [in]: Data associated with the context
245  */
246  static void task_send_error_cb(doca_comch_task_send *task,
247  doca_data task_user_data,
248  doca_data ctx_user_data) noexcept
249  {
250  static_cast<void>(task_user_data);
    /* The context user data is expected to point at the owning channel; record the failure on it */
251  static_cast<comch_control_channel *>(ctx_user_data.ptr)
252  ->set_error("Failed to complete doca_comch_task_send");
253 
    /* NOTE(review): the listing skips line 254 here; presumably the failed task is released - confirm */
255  }
256 
257  /*
258  * ComCh control message received callback
259  *
260  * @event [in]: Receive event (unused)
261  * @recv_buffer [in]: Received bytes; appended to the owning channel's receive buffer
262  * @msg_len [in]: Number of bytes available in recv_buffer
     * @comch_connection [in]: Connection whose user data points at the owning channel
263  */
264  static void event_msg_recv_cb(doca_comch_event_msg_recv *event,
265  uint8_t *recv_buffer,
266  uint32_t msg_len,
267  doca_comch_connection *comch_connection) noexcept
268  {
269  static_cast<void>(event);
270  auto *const self =
271  static_cast<comch_control_channel *>(doca_comch_connection_get_user_data(comch_connection).ptr);
272 
273  self->append_rx_bytes(reinterpret_cast<char const *>(recv_buffer), msg_len);
274  }
275 };
276 
/*
 * ComCh control channel, client side.
 *
 * Creates a doca_comch_client, attaches it to the progress engine of the base class and starts it.
 * The connection becomes usable once is_connected() has observed the context in the running state.
 */
277 class comch_client_control_channel : public comch_control_channel {
278 public:
    /* Stops and destroys the comch client (see cleanup()) */
279  ~comch_client_control_channel() override
280  {
281  cleanup();
282  }
283  comch_client_control_channel() = delete;
    /*
     * Create the progress engine, store the consumer callbacks and start the comch client
     *
     * If any step throws storage::runtime_error, cleanup() is run and the exception is rethrown.
     *
     * @dev [in]: Device passed to doca_comch_client_create
     * @channel_name [in]: Channel name passed to doca_comch_client_create
     * @callback_user_data [in]: Value handed back to the consumer callbacks
     * @new_consumer_event_cb [in]: Callback for new consumer events
     * @expired_consumer_event_cb [in]: Callback for expired consumer events
     */
284  comch_client_control_channel(doca_dev *dev,
285  char const *channel_name,
286  void *callback_user_data,
287  comch_channel::consumer_event_callback new_consumer_event_cb,
288  comch_channel::consumer_event_callback expired_consumer_event_cb)
289  : comch_control_channel{},
290  m_comch_client{nullptr}
291  {
292  try {
293  create_pe();
294  set_consumer_callbacks(callback_user_data,
295  std::move(new_consumer_event_cb),
296  std::move(expired_consumer_event_cb));
297  init(dev, channel_name);
298  } catch (storage::runtime_error const &ex) {
299  cleanup();
300  throw;
301  }
302  }
303  comch_client_control_channel(comch_client_control_channel const &) = delete;
304  comch_client_control_channel(comch_client_control_channel &&) noexcept = delete;
305  comch_client_control_channel &operator=(comch_client_control_channel const &) = delete;
306  comch_client_control_channel &operator=(comch_client_control_channel &&) noexcept = delete;
307 
    /*
     * Progress the engine and report whether the client context is running
     *
     * When running, the connection is fetched into m_connection before returning true.
     *
     * NOTE(review): the listing skips lines 313 and 327, so the statement that opens the error
     * branch and the call wrapped by static_cast<void>( are not visible here.
     * NOTE(review): cur_state is uninitialised and the result of doca_ctx_get_state is discarded.
     *
     * @return: true once connected, false while the connection is still being established
     *
     * @throws std::runtime_error: If the connection cannot be fetched from a running client
     */
308  bool is_connected() override
309  {
310  static_cast<void>(doca_pe_progress(m_pe));
311 
312  if (!m_error_message.empty()) {
314  "Failed to connect to doca_comch client"};
315  }
316 
317  doca_ctx_states cur_state;
318  static_cast<void>(doca_ctx_get_state(doca_comch_client_as_ctx(m_comch_client), &cur_state));
319  if (cur_state == DOCA_CTX_STATE_RUNNING) {
320  auto const ret = doca_comch_client_get_connection(m_comch_client, &m_connection);
321  if (ret != DOCA_SUCCESS) {
322  throw std::runtime_error{"Failed to get comch client connection: "s +
323  doca_error_get_name(ret)};
324  }
325 
326  static_cast<void>(
328 
329  DOCA_LOG_DBG("Connected to comch server");
330  return true;
331  }
332 
333  return false;
334  }
335 
    /*
     * Encode a message into the transmit buffer and hand it to a comch send task
     *
     * NOTE(review): the listing skips lines 353 and 355 (between task allocation and the final
     * error check), so the statement that assigns `ret` before the second check is not visible.
     *
     * @msg [in]: Message to send
     *
     * @throws storage::runtime_error: If the task cannot be allocated or the send step fails
     */
336  void send_message(message const &msg) override
337  {
338  doca_error_t ret;
339  doca_comch_task_send *task;
340 
341  DOCA_LOG_DBG("%s", to_string(msg).c_str());
342  encode_message_to_tx_buffer(msg);
343 
344  ret = doca_comch_client_task_send_alloc_init(m_comch_client,
345  m_connection,
346  m_tx_buffer.data(),
347  m_tx_buffer.size(),
348  &task);
349  if (ret != DOCA_SUCCESS) {
350  throw storage::runtime_error{ret, "Failed to allocate comch task"};
351  }
352 
354  if (ret != DOCA_SUCCESS) {
356  throw storage::runtime_error{ret, "Failed to send control message"};
357  }
358  }
359 
360 private:
361  doca_comch_client *m_comch_client; /* nullptr until init() has created the client */
362 
    /*
     * Create, configure and start the comch client
     *
     * Order: create client, configure send tasks, register the receive callback, register the
     * consumer callbacks, store this object as context user data, connect the context to the
     * progress engine, start the context. DOCA_ERROR_IN_PROGRESS from doca_ctx_start is accepted.
     *
     * NOTE(review): the listing skips lines 375 and 391 (the last argument of the send task
     * configuration call and the opening of one throw statement).
     *
     * @dev [in]: Device passed to doca_comch_client_create
     * @channel_name [in]: Channel name passed to doca_comch_client_create
     *
     * @throws storage::runtime_error: If any DOCA call reports failure
     */
363  void init(doca_dev *dev, char const *channel_name)
364  {
365  doca_error_t ret;
366 
367  ret = doca_comch_client_create(dev, channel_name, &m_comch_client);
368  if (ret != DOCA_SUCCESS) {
369  throw storage::runtime_error{ret, "Failed to create doca_comch_client"};
370  }
371 
372  ret = doca_comch_client_task_send_set_conf(m_comch_client,
373  comch_control_channel::task_send_cb,
374  comch_control_channel::task_send_error_cb,
376  if (ret != DOCA_SUCCESS) {
377  throw storage::runtime_error{ret, "Failed to configure doca_comch_client send task pool"};
378  }
379 
380  ret = doca_comch_client_event_msg_recv_register(m_comch_client,
381  comch_control_channel::event_msg_recv_cb);
382  if (ret != DOCA_SUCCESS) {
383  throw storage::runtime_error{ret,
384  "Failed to configure doca_comch_client receive task callback"};
385  }
386 
387  ret = doca_comch_client_event_consumer_register(m_comch_client,
388  comch_control_channel::new_consumer_event_cb,
389  comch_control_channel::expired_consumer_event_cb);
390  if (ret != DOCA_SUCCESS) {
392  ret,
393  "Failed to register for doca_comch_client consumer registration events"};
394  }
395 
    /* The send error callback reads this pointer back from the context user data */
396  ret = doca_ctx_set_user_data(doca_comch_client_as_ctx(m_comch_client), doca_data{.ptr = get_self()});
397  if (ret != DOCA_SUCCESS) {
398  throw storage::runtime_error{ret, "Failed to set doca_comch_client user data"};
399  }
400 
401  auto *comch_ctx = doca_comch_client_as_ctx(m_comch_client);
402 
403  ret = doca_pe_connect_ctx(m_pe, comch_ctx);
404  if (ret != DOCA_SUCCESS) {
405  throw storage::runtime_error{ret, "Failed to connect doca_comch_client with doca_pe"};
406  }
407 
408  ret = doca_ctx_start(doca_comch_client_as_ctx(m_comch_client));
409  if (ret != DOCA_ERROR_IN_PROGRESS && ret != DOCA_SUCCESS) {
410  throw storage::runtime_error{ret, "Failed to start doca_comch_client"};
411  }
412  }
413 
    /*
     * Stop and destroy the comch client if it exists; failures are logged, never thrown
     *
     * m_comch_client is not reset afterwards.
     */
414  void cleanup() noexcept
415  {
416  doca_error_t ret;
417 
418  if (m_comch_client) {
419  ret = storage::stop_context(doca_comch_client_as_ctx(m_comch_client), m_pe);
420  if (ret != DOCA_SUCCESS) {
421  DOCA_LOG_ERR("Failed to stop doca_comch_client: %s", doca_error_get_name(ret));
422  }
423 
424  DOCA_LOG_DBG("Destroy doca_comch_client(%p)", m_comch_client);
425  ret = doca_comch_client_destroy(m_comch_client);
426  if (ret != DOCA_SUCCESS) {
427  DOCA_LOG_ERR("Failed to destroy doca_comch_client: %s", doca_error_get_name(ret));
428  }
429  }
430  }
431 };
432 
/*
 * ComCh control channel, server side.
 *
 * Creates a doca_comch_server, attaches it to the progress engine of the base class and starts it.
 * The connection callbacks keep m_connection pointing at the most recently accepted client.
 */
433 class comch_server_control_channel : public comch_control_channel {
434 public:
    /* Stops and destroys the comch server (see cleanup()) */
435  ~comch_server_control_channel() override
436  {
437  cleanup();
438  }
439  comch_server_control_channel() = delete;
    /*
     * Create the progress engine, store the consumer callbacks and start the comch server
     *
     * If any step throws storage::runtime_error, cleanup() is run and the exception is rethrown.
     *
     * NOTE(review): create_pe() is called here and again at the top of init(); the second call
     * overwrites m_pe, so the first progress engine handle is never destroyed.
     *
     * @dev [in]: Device passed to doca_comch_server_create
     * @dev_rep [in]: Device representor passed to doca_comch_server_create
     * @channel_name [in]: Channel name passed to doca_comch_server_create
     * @callback_user_data [in]: Value handed back to the consumer callbacks
     * @new_consumer_event_cb [in]: Callback for new consumer events
     * @expired_consumer_event_cb [in]: Callback for expired consumer events
     */
440  comch_server_control_channel(doca_dev *dev,
441  doca_dev_rep *dev_rep,
442  char const *channel_name,
443  void *callback_user_data,
444  comch_channel::consumer_event_callback new_consumer_event_cb,
445  comch_channel::consumer_event_callback expired_consumer_event_cb)
446  : comch_control_channel{},
447  m_comch_server{nullptr}
448  {
449  try {
450  create_pe();
451  set_consumer_callbacks(callback_user_data,
452  std::move(new_consumer_event_cb),
453  std::move(expired_consumer_event_cb));
454  init(dev, dev_rep, channel_name);
455  } catch (storage::runtime_error const &ex) {
456  cleanup();
457  throw;
458  }
459  }
460 
461  comch_server_control_channel(comch_server_control_channel const &) = delete;
462  comch_server_control_channel(comch_server_control_channel &&) noexcept = delete;
463  comch_server_control_channel &operator=(comch_server_control_channel const &) = delete;
464  comch_server_control_channel &operator=(comch_server_control_channel &&) noexcept = delete;
465 
    /*
     * Progress the engine and report whether a client connection is currently stored
     *
     * NOTE(review): the listing skips line 471, so the statement that opens the error branch is
     * not visible here.
     *
     * @return: true when m_connection is not nullptr
     */
466  bool is_connected() override
467  {
468  static_cast<void>(doca_pe_progress(m_pe));
469 
470  if (!m_error_message.empty()) {
472  "Failed to connect to doca_comch client"};
473  }
474 
475  return m_connection != nullptr;
476  }
477 
    /*
     * Encode a message into the transmit buffer and hand it to a comch send task
     *
     * NOTE(review): the listing skips lines 495 and 497 (between task allocation and the final
     * error check), so the statement that assigns `ret` before the second check is not visible.
     *
     * @msg [in]: Message to send
     *
     * @throws storage::runtime_error: If the task cannot be allocated or the send step fails
     */
478  void send_message(message const &msg) override
479  {
480  doca_error_t ret;
481  doca_comch_task_send *task;
482 
483  DOCA_LOG_DBG("%s", to_string(msg).c_str());
484  encode_message_to_tx_buffer(msg);
485 
486  ret = doca_comch_server_task_send_alloc_init(m_comch_server,
487  m_connection,
488  m_tx_buffer.data(),
489  m_tx_buffer.size(),
490  &task);
491  if (ret != DOCA_SUCCESS) {
492  throw storage::runtime_error{ret, "Failed to allocate comch task"};
493  }
494 
496  if (ret != DOCA_SUCCESS) {
498  throw storage::runtime_error{ret, "Failed to send control message"};
499  }
500  }
501 
502 private:
503  doca_comch_server *m_comch_server; /* nullptr until init() has created the server */
504 
    /*
     * Create, configure and start the comch server
     *
     * Order: create progress engine, create server, connect the context to the progress engine,
     * configure send tasks, register the receive callback, register the connection callbacks,
     * register the consumer callbacks, store this object as context user data, start the
     * context. DOCA_ERROR_IN_PROGRESS from doca_ctx_start is accepted.
     *
     * NOTE(review): the listing skips lines 526, 538 and 549 (the last argument of the send task
     * configuration call, the start of the connection callback registration call and the
     * opening of one throw statement).
     *
     * @dev [in]: Device passed to doca_comch_server_create
     * @dev_rep [in]: Device representor passed to doca_comch_server_create
     * @channel_name [in]: Channel name passed to doca_comch_server_create
     *
     * @throws storage::runtime_error: If any DOCA call reports failure
     */
505  void init(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name)
506  {
507  doca_error_t ret;
508 
    /* NOTE(review): the constructor has already called create_pe(); this replaces that handle */
509  create_pe();
510 
511  ret = doca_comch_server_create(dev, dev_rep, channel_name, &m_comch_server);
512  if (ret != DOCA_SUCCESS) {
513  throw storage::runtime_error{ret, "Failed to create doca_comch_server"};
514  }
515 
516  auto *comch_ctx = doca_comch_server_as_ctx(m_comch_server);
517 
518  ret = doca_pe_connect_ctx(m_pe, comch_ctx);
519  if (ret != DOCA_SUCCESS) {
520  throw storage::runtime_error{ret, "Failed to connect doca_comch_server with doca_pe"};
521  }
522 
523  ret = doca_comch_server_task_send_set_conf(m_comch_server,
524  comch_control_channel::task_send_cb,
525  comch_control_channel::task_send_error_cb,
527  if (ret != DOCA_SUCCESS) {
528  throw storage::runtime_error{ret, "Failed to configure doca_comch_server send task pool"};
529  }
530 
531  ret = doca_comch_server_event_msg_recv_register(m_comch_server,
532  comch_control_channel::event_msg_recv_cb);
533  if (ret != DOCA_SUCCESS) {
534  throw storage::runtime_error{ret,
535  "Failed to configure doca_comch_server receive task callback"};
536  }
537 
539  event_connection_connected_cb,
540  event_connection_disconnected_cb);
541  if (ret != DOCA_SUCCESS) {
542  throw storage::runtime_error{ret, "Failed to configure doca_comch_server connection callbacks"};
543  }
544 
545  ret = doca_comch_server_event_consumer_register(m_comch_server,
546  comch_control_channel::new_consumer_event_cb,
547  comch_control_channel::expired_consumer_event_cb);
548  if (ret != DOCA_SUCCESS) {
550  ret,
551  "Failed to register for doca_comch_server consumer registration events"};
552  }
553 
    /* The connection callbacks read this pointer back from the context user data */
554  ret = doca_ctx_set_user_data(comch_ctx, doca_data{.ptr = get_self()});
555  if (ret != DOCA_SUCCESS) {
556  throw storage::runtime_error{ret, "Failed to set doca_comch_server user data"};
557  }
558 
559  ret = doca_ctx_start(comch_ctx);
560  if (ret != DOCA_ERROR_IN_PROGRESS && ret != DOCA_SUCCESS) {
561  throw storage::runtime_error{ret,
562  "[application::application] Failed to start doca_comch_server"};
563  }
564  }
565 
    /*
     * Stop and destroy the comch server if it exists; failures are logged, never thrown
     *
     * m_comch_server is not reset afterwards.
     */
566  void cleanup() noexcept
567  {
568  doca_error_t ret;
569 
570  if (m_comch_server != nullptr) {
571  ret = storage::stop_context(doca_comch_server_as_ctx(m_comch_server), m_pe);
572  if (ret != DOCA_SUCCESS) {
573  DOCA_LOG_ERR("Failed to stop doca_comch_server: %s", doca_error_get_name(ret));
574  }
575 
576  ret = doca_comch_server_destroy(m_comch_server);
577  if (ret != DOCA_SUCCESS) {
578  DOCA_LOG_ERR("Failed to destroy doca_comch_server: %s", doca_error_get_name(ret));
579  }
580  }
581  }
582 
    /*
     * ComCh connection established callback
     *
     * A refused connection is logged and ignored. Otherwise the owning channel is recovered from
     * user data, stored as the connection's user data and the connection becomes m_connection,
     * replacing any previously stored connection.
     *
     * NOTE(review): the listing skips line 597, so the call that fills user_data is not visible.
     * NOTE(review): the stored pointer was produced by get_self() (a comch_control_channel *) and
     * is cast here to the derived type - confirm both refer to the same address.
     *
     * @event [in]: Event (unused)
     * @conn [in]: Connection that changed state
     * @change_successful [in]: Zero when the connection was refused
     */
583  static void event_connection_connected_cb(doca_comch_event_connection_status_changed *event,
584  doca_comch_connection *conn,
585  uint8_t change_successful) noexcept
586  {
587  static_cast<void>(event);
588  DOCA_LOG_DBG("Connection %p %s", conn, (change_successful ? "connected" : "refused"));
589 
590  if (change_successful == 0) {
591  DOCA_LOG_ERR("Failed to accept new client connection");
592  return;
593  }
594 
595  doca_data user_data{.ptr = nullptr};
596  auto const ret =
598  &user_data);
599  if (ret != DOCA_SUCCESS || user_data.ptr == nullptr) {
600  DOCA_LOG_ERR("[BUG] unable to extract user data");
601  return;
602  }
603 
604  auto *self = static_cast<comch_server_control_channel *>(user_data.ptr);
    /* Lets the receive and consumer callbacks find the channel from the connection */
605  static_cast<void>(doca_comch_connection_set_user_data(conn, doca_data{.ptr = self}));
606  self->m_connection = conn;
607  }
608 
    /*
     * ComCh connection closed callback
     *
     * Clears m_connection when the closed connection is the stored one; any other connection is
     * ignored with a warning.
     *
     * NOTE(review): the listing skips line 618, so the call that fills user_data is not visible.
     *
     * @event [in]: Event (unused)
     * @conn [in]: Connection that changed state
     * @change_successful [in]: Unused
     */
609  static void event_connection_disconnected_cb(doca_comch_event_connection_status_changed *event,
610  doca_comch_connection *conn,
611  uint8_t change_successful) noexcept
612  {
613  static_cast<void>(event);
614  static_cast<void>(change_successful);
615 
616  doca_data user_data{.ptr = nullptr};
617  auto const ret =
619  &user_data);
620  if (ret != DOCA_SUCCESS || user_data.ptr == nullptr) {
621  DOCA_LOG_ERR("[BUG] unable to extract user data");
622  return;
623  }
624 
625  auto *self = static_cast<comch_server_control_channel *>(user_data.ptr);
626  if (self->m_connection != conn) {
627  DOCA_LOG_WARN("Ignoring disconnect of non-connected connection");
628  return;
629  }
630 
631  self->m_connection = nullptr;
632  }
633 };
634 
/*
 * TCP control channel, client side: connects to a server address and exchanges control messages
 * over that socket.
 */
635 class tcp_client_control_channel : public tcp_control_channel {
636 public:
    /* Closes the socket; a storage::runtime_error from close() is logged instead of propagated */
637  ~tcp_client_control_channel() override
638  {
639  try {
640  m_socket.close();
641  } catch (storage::runtime_error const &ex) {
642  DOCA_LOG_ERR("Failed to close socket: %s", ex.what());
643  }
644  }
645 
646  tcp_client_control_channel() = delete;
    /*
     * Store the server address and issue the first connect on the socket
     *
     * @server_address [in]: Address of the server to connect to (copied)
     */
647  explicit tcp_client_control_channel(storage::ip_address const &server_address)
648  : tcp_control_channel{},
649  m_server_address{server_address},
650  m_socket{}
651  {
652  m_socket.connect(m_server_address);
653  }
654  tcp_client_control_channel(tcp_client_control_channel const &) = delete;
655  tcp_client_control_channel(tcp_client_control_channel &&) noexcept = delete;
656  tcp_client_control_channel &operator=(tcp_client_control_channel const &) = delete;
657  tcp_client_control_channel &operator=(tcp_client_control_channel &&) noexcept = delete;
658 
    /*
     * Poll the socket's connection status
     *
     * One branch logs and returns true, one branch replaces the socket with a fresh one and
     * connects again, and one branch reports "Unable to connect to <address>:<port>".
     *
     * NOTE(review): the case labels of this switch are not visible here (the listing skips lines
     * 665, 669, 671 and 675-676), so which status selects which branch cannot be told from here.
     *
     * @return: true when connected, false otherwise
     */
659  bool is_connected() override
660  {
661  std::string const remote_display_string =
662  m_server_address.get_address() + ":" + std::to_string(m_server_address.get_port());
663 
664  switch (m_socket.poll_is_connected()) {
666  DOCA_LOG_INFO("Connected to %s", remote_display_string.c_str());
667  return true;
668  }
670  } break;
672  m_socket = storage::tcp_socket{}; /* reset the socket */
673  m_socket.connect(m_server_address);
674  } break;
677  "Unable to connect to " + remote_display_string};
678  }
679  }
680 
681  return false;
682  }
683 
    /*
     * Log the message at debug level and write it to the socket
     *
     * @msg [in]: Message to send
     */
684  void send_message(message const &msg) override
685  {
686  DOCA_LOG_DBG("%s", to_string(msg).c_str());
687  write_to_tcp_socket(m_socket, msg);
688  }
689 
    /*
     * Read from the socket and try to decode one message; a decoded message is logged at debug level
     *
     * @return: Pointer to the decoded message or nullptr when none is complete
     */
690  storage::control::message *poll() override
691  {
692  auto *msg = poll_tcp_socket(m_socket);
693  if (msg) {
694  DOCA_LOG_DBG("%s", to_string(*msg).c_str());
695  }
696 
697  return msg;
698  }
699 
700 private:
701  storage::ip_address m_server_address; /* Server to connect (and reconnect) to */
702  storage::tcp_socket m_socket; /* Socket used for the connection */
703 };
704 
705 class tcp_server_control_channel : public tcp_control_channel {
706 public:
707  ~tcp_server_control_channel() override
708  {
709  try {
710  m_client_socket.close();
711  } catch (storage::runtime_error const &ex) {
712  DOCA_LOG_ERR("Failed to close socket: %s", ex.what());
713  }
714  try {
715  m_listen_socket.close();
716  } catch (storage::runtime_error const &ex) {
717  DOCA_LOG_ERR("Failed to close socket: %s", ex.what());
718  }
719  }
720  tcp_server_control_channel() = delete;
721  explicit tcp_server_control_channel(uint16_t listen_port)
722  : tcp_control_channel{},
723  m_listen_socket{},
724  m_client_socket{}
725  {
726  m_listen_socket.listen(listen_port);
727  }
728  tcp_server_control_channel(tcp_server_control_channel const &) = delete;
729  tcp_server_control_channel(tcp_server_control_channel &&) noexcept = delete;
730  tcp_server_control_channel &operator=(tcp_server_control_channel const &) = delete;
731  tcp_server_control_channel &operator=(tcp_server_control_channel &&) noexcept = delete;
732 
733  bool is_connected() override
734  {
735  m_client_socket = m_listen_socket.accept();
736  return m_client_socket.is_valid();
737  }
738 
739  void send_message(message const &msg) override
740  {
741  DOCA_LOG_DBG("%s", to_string(msg).c_str());
742  write_to_tcp_socket(m_client_socket, msg);
743  }
744 
745  storage::control::message *poll() override
746  {
747  auto *msg = poll_tcp_socket(m_client_socket);
748  if (msg) {
749  DOCA_LOG_DBG("%s", to_string(*msg).c_str());
750  }
751 
752  return msg;
753  }
754 
755 private:
756  storage::tcp_socket m_listen_socket;
757  storage::tcp_socket m_client_socket;
758 };
759 
760 } // namespace
761 
762 std::unique_ptr<storage::control::comch_channel> make_comch_client_control_channel(
763  doca_dev *dev,
764  char const *channel_name,
765  void *callback_user_data,
766  comch_channel::consumer_event_callback new_consumer_event_cb,
767  comch_channel::consumer_event_callback expired_consumer_event_cb)
768 {
769  return std::make_unique<comch_client_control_channel>(dev,
770  channel_name,
771  callback_user_data,
772  std::move(new_consumer_event_cb),
773  std::move(expired_consumer_event_cb));
774 }
775 
776 std::unique_ptr<storage::control::comch_channel> make_comch_server_control_channel(
777  doca_dev *dev,
778  doca_dev_rep *dev_rep,
779  char const *channel_name,
780  void *callback_user_data,
781  comch_channel::consumer_event_callback new_consumer_event_cb,
782  comch_channel::consumer_event_callback expired_consumer_event_cb)
783 {
784  return std::make_unique<comch_server_control_channel>(dev,
785  dev_rep,
786  channel_name,
787  callback_user_data,
788  std::move(new_consumer_event_cb),
789  std::move(expired_consumer_event_cb));
790 }
791 
792 std::unique_ptr<storage::control::channel> make_tcp_client_control_channel(storage::ip_address const &server_address)
793 {
794  return std::make_unique<tcp_client_control_channel>(server_address);
795 }
796 
797 std::unique_ptr<storage::control::channel> make_tcp_server_control_channel(uint16_t listen_port)
798 {
799  return std::make_unique<tcp_server_control_channel>(listen_port);
800 }
801 
802 } // namespace storage::control
static void cleanup(struct cache_invalidate_sample_state *state)
static doca_error_t create_pe(struct cache_invalidate_sample_state *state)
std::function< void(void *user_data, uint32_t consumer_id)> consumer_event_callback
uint16_t get_port() const noexcept
Definition: ip_address.cpp:45
std::string const & get_address() const noexcept
Definition: ip_address.cpp:40
size_t read(char *buffer, size_t buffer_capacity)
Definition: tcp_socket.cpp:305
void listen(uint16_t port)
Definition: tcp_socket.cpp:172
bool is_valid(void) const noexcept
Definition: tcp_socket.cpp:336
size_t write(char const *buffer, size_t byte_count)
Definition: tcp_socket.cpp:274
void connect(storage::ip_address const &address)
Definition: tcp_socket.cpp:138
tcp_socket accept(void)
Definition: tcp_socket.cpp:245
connection_status poll_is_connected(void)
Definition: tcp_socket.cpp:195
doca_data m_consumer_cb_data
message m_rx_message
std::string m_error_message
DOCA_LOG_REGISTER(CONTROL_CHANNEL)
comch_channel::consumer_event_callback m_expiry_cb
doca_comch_connection * m_connection
std::vector< char > m_rx_buffer
std::vector< char > m_tx_buffer
doca_pe * m_pe
comch_channel::consumer_event_callback m_connection_cb
DOCA_STABLE doca_error_t doca_comch_client_get_connection(const struct doca_comch_client *comch_client, struct doca_comch_connection **connection)
DOCA_STABLE struct doca_comch_server * doca_comch_server_get_server_ctx(const struct doca_comch_connection *connection)
DOCA_STABLE doca_error_t doca_comch_server_event_msg_recv_register(struct doca_comch_server *comch_server, doca_comch_event_msg_recv_cb_t recv_event_cb)
Configure the doca_comch recv event callback for server context.
DOCA_STABLE doca_error_t doca_comch_server_event_connection_status_changed_register(struct doca_comch_server *comch_server, doca_comch_event_connection_status_changed_cb_t connect_event_cb, doca_comch_event_connection_status_changed_cb_t disconnect_event_cb)
Configure the doca_comch connection status changed event callbacks for server context.
DOCA_STABLE doca_error_t doca_comch_client_event_consumer_register(struct doca_comch_client *comch_client, doca_comch_event_consumer_cb_t new_consumer_event_cb, doca_comch_event_consumer_cb_t expired_consumer_event_cb)
Configure the doca_comch callback for receiving consumer events on client context.
DOCA_STABLE doca_error_t doca_comch_connection_set_user_data(struct doca_comch_connection *connection, union doca_data user_data)
DOCA_STABLE union doca_data doca_comch_connection_get_user_data(const struct doca_comch_connection *connection)
DOCA_STABLE struct doca_ctx * doca_comch_client_as_ctx(struct doca_comch_client *comch_client)
DOCA_STABLE doca_error_t doca_comch_server_event_consumer_register(struct doca_comch_server *comch_server, doca_comch_event_consumer_cb_t new_consumer_event_cb, doca_comch_event_consumer_cb_t expired_consumer_event_cb)
Configure the doca_comch callback for receiving consumer events on server context.
DOCA_STABLE doca_error_t doca_comch_client_task_send_alloc_init(struct doca_comch_client *comch_client, struct doca_comch_connection *peer, const void *msg, uint32_t len, struct doca_comch_task_send **task)
DOCA_STABLE doca_error_t doca_comch_server_destroy(struct doca_comch_server *comch_server)
DOCA_STABLE doca_error_t doca_comch_server_task_send_set_conf(struct doca_comch_server *comch_server, doca_comch_task_send_completion_cb_t task_completion_cb, doca_comch_task_send_completion_cb_t task_error_cb, uint32_t num_send_tasks)
DOCA_STABLE doca_error_t doca_comch_client_task_send_set_conf(struct doca_comch_client *comch_client, doca_comch_task_send_completion_cb_t task_completion_cb, doca_comch_task_send_completion_cb_t task_error_cb, uint32_t num_send_tasks)
DOCA_STABLE doca_error_t doca_comch_client_event_msg_recv_register(struct doca_comch_client *comch_client, doca_comch_event_msg_recv_cb_t recv_event_cb)
Configure the doca_comch recv event callback for client context.
DOCA_STABLE doca_error_t doca_comch_client_destroy(struct doca_comch_client *comch_client)
DOCA_STABLE doca_error_t doca_comch_server_create(struct doca_dev *dev, struct doca_dev_rep *repr, const char *name, struct doca_comch_server **comch_server)
DOCA_STABLE struct doca_task * doca_comch_task_send_as_task(struct doca_comch_task_send *task)
DOCA_STABLE doca_error_t doca_comch_client_create(struct doca_dev *dev, const char *name, struct doca_comch_client **comch_client)
DOCA_STABLE doca_error_t doca_comch_server_task_send_alloc_init(struct doca_comch_server *comch_server, struct doca_comch_connection *peer, const void *msg, uint32_t len, struct doca_comch_task_send **task)
DOCA_STABLE struct doca_ctx * doca_comch_server_as_ctx(struct doca_comch_server *comch_server)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_set_user_data(struct doca_ctx *ctx, union doca_data user_data)
set user data to context
doca_ctx_states
This enum defines the states of a context.
Definition: doca_ctx.h:83
DOCA_STABLE doca_error_t doca_ctx_get_user_data(const struct doca_ctx *ctx, union doca_data *user_data)
get user data from context
@ DOCA_CTX_STATE_RUNNING
Definition: doca_ctx.h:98
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_CONNECTION_ABORTED
Definition: doca_error.h:50
@ DOCA_ERROR_INITIALIZATION
Definition: doca_error.h:46
@ DOCA_ERROR_IO_FAILED
Definition: doca_error.h:55
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_IN_PROGRESS
Definition: doca_error.h:64
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
Definition: doca_log.h:476
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_pe_connect_ctx(struct doca_pe *pe, struct doca_ctx *ctx)
This method connects a context to a progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_STABLE void doca_task_free(struct doca_task *task)
Free a task back to where it was allocated from.
uint32_t wire_size(storage::control::message_header const &hdr) noexcept
std::unique_ptr< storage::control::comch_channel > make_comch_client_control_channel(doca_dev *dev, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
char * encode(char *buffer, storage::control::message_header const &hdr) noexcept
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
char const * decode(char const *buffer, storage::control::message_header &hdr) noexcept
std::string to_string(storage::control::message_type type)
std::unique_ptr< storage::control::channel > make_tcp_server_control_channel(uint16_t listen_port)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
Definition: doca_utils.cpp:369
constexpr uint32_t max_concurrent_control_messages
Definition: definitions.hpp:45
Convenience type for representing opaque data.
Definition: doca_types.h:56
void * ptr
Definition: doca_types.h:57