42 using namespace std::string_literals;
48 template <
typename InterfaceT>
49 class basic_control_channel :
public InterfaceT {
55 basic_control_channel(basic_control_channel
const &) =
delete;
56 basic_control_channel(basic_control_channel &&) noexcept =
delete;
57 basic_control_channel &operator=(basic_control_channel
const &) =
delete;
58 basic_control_channel &operator=(basic_control_channel &&) noexcept =
delete;
60 void set_error(std::string msg)
74 void append_rx_bytes(
char const *bytes,
size_t byte_count)
76 std::copy(bytes, bytes + byte_count, std::back_inserter(
m_rx_buffer));
79 bool extract_message_from_rx_buffer()
105 class tcp_control_channel :
public basic_control_channel<storage::control::channel> {
110 encode_message_to_tx_buffer(msg);
118 std::array<char, 256> tmp_buff{};
120 uint32_t
const nb_read = socket.
read(tmp_buff.data(), tmp_buff.size());
122 append_rx_bytes(tmp_buff.data(), nb_read);
125 if (extract_message_from_rx_buffer()) {
133 class comch_control_channel :
public basic_control_channel<storage::control::comch_channel> {
135 ~comch_control_channel()
override
137 if (
m_pe !=
nullptr) {
145 comch_control_channel()
146 : basic_control_channel<
storage::control::comch_channel>{},
154 comch_control_channel(comch_control_channel
const &) =
delete;
155 comch_control_channel(comch_control_channel &&) noexcept = delete;
156 comch_control_channel &operator=(comch_control_channel const &) = delete;
157 comch_control_channel &operator=(comch_control_channel &&) noexcept = delete;
159 storage::control::message *poll()
override
162 if (extract_message_from_rx_buffer()) {
170 doca_comch_connection *get_comch_connection() const noexcept
override
182 comch_control_channel *get_self()
195 void set_consumer_callbacks(
void *callback_user_data,
196 comch_channel::consumer_event_callback new_consumer_event_cb,
197 comch_channel::consumer_event_callback expired_consumer_event_cb)
201 m_expiry_cb = std::move(expired_consumer_event_cb);
204 static void new_consumer_event_cb(doca_comch_event_consumer *event,
205 doca_comch_connection *conn,
206 uint32_t
id) noexcept
208 static_cast<void>(event);
211 self->m_connection_cb(self->m_consumer_cb_data.ptr,
id);
214 static void expired_consumer_event_cb(doca_comch_event_consumer *event,
215 doca_comch_connection *conn,
216 uint32_t
id) noexcept
218 static_cast<void>(event);
221 self->m_expiry_cb(self->m_consumer_cb_data.ptr,
id);
231 static void task_send_cb(doca_comch_task_send *task,
doca_data task_user_data,
doca_data ctx_user_data) noexcept
233 static_cast<void>(task_user_data);
234 static_cast<void>(ctx_user_data);
246 static void task_send_error_cb(doca_comch_task_send *task,
250 static_cast<void>(task_user_data);
251 static_cast<comch_control_channel *
>(ctx_user_data.ptr)
252 ->set_error(
"Failed to complete doca_comch_task_send");
264 static void event_msg_recv_cb(doca_comch_event_msg_recv *event,
265 uint8_t *recv_buffer,
267 doca_comch_connection *comch_connection) noexcept
269 static_cast<void>(event);
273 self->append_rx_bytes(
reinterpret_cast<char const *
>(recv_buffer), msg_len);
277 class comch_client_control_channel :
public comch_control_channel {
279 ~comch_client_control_channel()
override
283 comch_client_control_channel() =
delete;
284 comch_client_control_channel(doca_dev *dev,
285 char const *channel_name,
286 void *callback_user_data,
287 comch_channel::consumer_event_callback new_consumer_event_cb,
288 comch_channel::consumer_event_callback expired_consumer_event_cb)
289 : comch_control_channel{},
290 m_comch_client{nullptr}
294 set_consumer_callbacks(callback_user_data,
295 std::move(new_consumer_event_cb),
296 std::move(expired_consumer_event_cb));
297 init(dev, channel_name);
303 comch_client_control_channel(comch_client_control_channel
const &) =
delete;
304 comch_client_control_channel(comch_client_control_channel &&) noexcept = delete;
305 comch_client_control_channel &operator=(comch_client_control_channel const &) = delete;
306 comch_client_control_channel &operator=(comch_client_control_channel &&) noexcept = delete;
308 bool is_connected()
override
314 "Failed to connect to doca_comch client"};
322 throw std::runtime_error{
"Failed to get comch client connection: "s +
336 void send_message(message
const &msg)
override
339 doca_comch_task_send *task;
342 encode_message_to_tx_buffer(msg);
361 doca_comch_client *m_comch_client;
363 void init(doca_dev *dev,
char const *channel_name)
373 comch_control_channel::task_send_cb,
374 comch_control_channel::task_send_error_cb,
381 comch_control_channel::event_msg_recv_cb);
384 "Failed to configure doca_comch_client receive task callback"};
388 comch_control_channel::new_consumer_event_cb,
389 comch_control_channel::expired_consumer_event_cb);
393 "Failed to register for doca_comch_client consumer registration events"};
418 if (m_comch_client) {
424 DOCA_LOG_DBG(
"Destroy doca_comch_client(%p)", m_comch_client);
433 class comch_server_control_channel :
public comch_control_channel {
435 ~comch_server_control_channel()
override
439 comch_server_control_channel() =
delete;
440 comch_server_control_channel(doca_dev *dev,
441 doca_dev_rep *dev_rep,
442 char const *channel_name,
443 void *callback_user_data,
444 comch_channel::consumer_event_callback new_consumer_event_cb,
445 comch_channel::consumer_event_callback expired_consumer_event_cb)
446 : comch_control_channel{},
447 m_comch_server{nullptr}
451 set_consumer_callbacks(callback_user_data,
452 std::move(new_consumer_event_cb),
453 std::move(expired_consumer_event_cb));
454 init(dev, dev_rep, channel_name);
461 comch_server_control_channel(comch_server_control_channel
const &) =
delete;
462 comch_server_control_channel(comch_server_control_channel &&) noexcept = delete;
463 comch_server_control_channel &operator=(comch_server_control_channel const &) = delete;
464 comch_server_control_channel &operator=(comch_server_control_channel &&) noexcept = delete;
466 bool is_connected()
override
472 "Failed to connect to doca_comch client"};
478 void send_message(message
const &msg)
override
481 doca_comch_task_send *task;
484 encode_message_to_tx_buffer(msg);
503 doca_comch_server *m_comch_server;
505 void init(doca_dev *dev, doca_dev_rep *dev_rep,
char const *channel_name)
524 comch_control_channel::task_send_cb,
525 comch_control_channel::task_send_error_cb,
532 comch_control_channel::event_msg_recv_cb);
535 "Failed to configure doca_comch_server receive task callback"};
539 event_connection_connected_cb,
540 event_connection_disconnected_cb);
546 comch_control_channel::new_consumer_event_cb,
547 comch_control_channel::expired_consumer_event_cb);
551 "Failed to register for doca_comch_server consumer registration events"};
562 "[application::application] Failed to start doca_comch_server"};
570 if (m_comch_server !=
nullptr) {
583 static void event_connection_connected_cb(doca_comch_event_connection_status_changed *event,
584 doca_comch_connection *conn,
585 uint8_t change_successful) noexcept
587 static_cast<void>(event);
588 DOCA_LOG_DBG(
"Connection %p %s", conn, (change_successful ?
"connected" :
"refused"));
590 if (change_successful == 0) {
604 auto *
self =
static_cast<comch_server_control_channel *
>(user_data.ptr);
606 self->m_connection = conn;
609 static void event_connection_disconnected_cb(doca_comch_event_connection_status_changed *event,
610 doca_comch_connection *conn,
611 uint8_t change_successful) noexcept
613 static_cast<void>(event);
614 static_cast<void>(change_successful);
625 auto *
self =
static_cast<comch_server_control_channel *
>(user_data.ptr);
626 if (self->m_connection != conn) {
627 DOCA_LOG_WARN(
"Ignoring disconnect of non-connected connection");
631 self->m_connection =
nullptr;
635 class tcp_client_control_channel :
public tcp_control_channel {
637 ~tcp_client_control_channel()
override
646 tcp_client_control_channel() =
delete;
648 : tcp_control_channel{},
649 m_server_address{server_address},
652 m_socket.
connect(m_server_address);
654 tcp_client_control_channel(tcp_client_control_channel
const &) =
delete;
655 tcp_client_control_channel(tcp_client_control_channel &&) noexcept = delete;
656 tcp_client_control_channel &operator=(tcp_client_control_channel const &) = delete;
657 tcp_client_control_channel &operator=(tcp_client_control_channel &&) noexcept = delete;
659 bool is_connected()
override
661 std::string
const remote_display_string =
666 DOCA_LOG_INFO(
"Connected to %s", remote_display_string.c_str());
673 m_socket.
connect(m_server_address);
677 "Unable to connect to " + remote_display_string};
684 void send_message(message
const &msg)
override
687 write_to_tcp_socket(m_socket, msg);
692 auto *msg = poll_tcp_socket(m_socket);
705 class tcp_server_control_channel :
public tcp_control_channel {
707 ~tcp_server_control_channel()
override
710 m_client_socket.
close();
715 m_listen_socket.
close();
720 tcp_server_control_channel() =
delete;
721 explicit tcp_server_control_channel(uint16_t listen_port)
722 : tcp_control_channel{},
726 m_listen_socket.
listen(listen_port);
728 tcp_server_control_channel(tcp_server_control_channel
const &) =
delete;
729 tcp_server_control_channel(tcp_server_control_channel &&) noexcept = delete;
730 tcp_server_control_channel &operator=(tcp_server_control_channel const &) = delete;
731 tcp_server_control_channel &operator=(tcp_server_control_channel &&) noexcept = delete;
733 bool is_connected()
override
735 m_client_socket = m_listen_socket.
accept();
739 void send_message(message
const &msg)
override
742 write_to_tcp_socket(m_client_socket, msg);
747 auto *msg = poll_tcp_socket(m_client_socket);
764 char const *channel_name,
765 void *callback_user_data,
769 return std::make_unique<comch_client_control_channel>(dev,
772 std::move(new_consumer_event_cb),
773 std::move(expired_consumer_event_cb));
778 doca_dev_rep *dev_rep,
779 char const *channel_name,
780 void *callback_user_data,
784 return std::make_unique<comch_server_control_channel>(dev,
788 std::move(new_consumer_event_cb),
789 std::move(expired_consumer_event_cb));
794 return std::make_unique<tcp_client_control_channel>(server_address);
799 return std::make_unique<tcp_server_control_channel>(listen_port);
static void cleanup(struct cache_invalidate_sample_state *state)
static doca_error_t create_pe(struct cache_invalidate_sample_state *state)
std::function< void(void *user_data, uint32_t consumer_id)> consumer_event_callback
uint16_t get_port() const noexcept
std::string const & get_address() const noexcept
size_t read(char *buffer, size_t buffer_capacity)
void listen(uint16_t port)
bool is_valid(void) const noexcept
size_t write(char const *buffer, size_t byte_count)
void connect(storage::ip_address const &address)
connection_status poll_is_connected(void)
doca_data m_consumer_cb_data
std::string m_error_message
DOCA_LOG_REGISTER(CONTROL_CHANNEL)
comch_channel::consumer_event_callback m_expiry_cb
doca_comch_connection * m_connection
std::vector< char > m_rx_buffer
std::vector< char > m_tx_buffer
comch_channel::consumer_event_callback m_connection_cb
DOCA_STABLE doca_error_t doca_comch_client_get_connection(const struct doca_comch_client *comch_client, struct doca_comch_connection **connection)
DOCA_STABLE struct doca_comch_server * doca_comch_server_get_server_ctx(const struct doca_comch_connection *connection)
DOCA_STABLE doca_error_t doca_comch_server_event_msg_recv_register(struct doca_comch_server *comch_server, doca_comch_event_msg_recv_cb_t recv_event_cb)
Configure the doca_comch recv event callback for server context.
DOCA_STABLE doca_error_t doca_comch_server_event_connection_status_changed_register(struct doca_comch_server *comch_server, doca_comch_event_connection_status_changed_cb_t connect_event_cb, doca_comch_event_connection_status_changed_cb_t disconnect_event_cb)
Configure the doca_comch recv event callback for server context.
DOCA_STABLE doca_error_t doca_comch_client_event_consumer_register(struct doca_comch_client *comch_client, doca_comch_event_consumer_cb_t new_consumer_event_cb, doca_comch_event_consumer_cb_t expired_consumer_event_cb)
Configure the doca_comch callback for for receiving consumer events on client context.
DOCA_STABLE doca_error_t doca_comch_connection_set_user_data(struct doca_comch_connection *connection, union doca_data user_data)
DOCA_STABLE union doca_data doca_comch_connection_get_user_data(const struct doca_comch_connection *connection)
DOCA_STABLE struct doca_ctx * doca_comch_client_as_ctx(struct doca_comch_client *comch_client)
DOCA_STABLE doca_error_t doca_comch_server_event_consumer_register(struct doca_comch_server *comch_server, doca_comch_event_consumer_cb_t new_consumer_event_cb, doca_comch_event_consumer_cb_t expired_consumer_event_cb)
Configure the doca_comch callback for for receiving consumer events on server context.
DOCA_STABLE doca_error_t doca_comch_client_task_send_alloc_init(struct doca_comch_client *comch_client, struct doca_comch_connection *peer, const void *msg, uint32_t len, struct doca_comch_task_send **task)
DOCA_STABLE doca_error_t doca_comch_server_destroy(struct doca_comch_server *comch_server)
DOCA_STABLE doca_error_t doca_comch_server_task_send_set_conf(struct doca_comch_server *comch_server, doca_comch_task_send_completion_cb_t task_completion_cb, doca_comch_task_send_completion_cb_t task_error_cb, uint32_t num_send_tasks)
DOCA_STABLE doca_error_t doca_comch_client_task_send_set_conf(struct doca_comch_client *comch_client, doca_comch_task_send_completion_cb_t task_completion_cb, doca_comch_task_send_completion_cb_t task_error_cb, uint32_t num_send_tasks)
DOCA_STABLE doca_error_t doca_comch_client_event_msg_recv_register(struct doca_comch_client *comch_client, doca_comch_event_msg_recv_cb_t recv_event_cb)
Configure the doca_comch recv event callback for client context.
DOCA_STABLE doca_error_t doca_comch_client_destroy(struct doca_comch_client *comch_client)
DOCA_STABLE doca_error_t doca_comch_server_create(struct doca_dev *dev, struct doca_dev_rep *repr, const char *name, struct doca_comch_server **comch_server)
DOCA_STABLE struct doca_task * doca_comch_task_send_as_task(struct doca_comch_task_send *task)
DOCA_STABLE doca_error_t doca_comch_client_create(struct doca_dev *dev, const char *name, struct doca_comch_client **comch_client)
DOCA_STABLE doca_error_t doca_comch_server_task_send_alloc_init(struct doca_comch_server *comch_server, struct doca_comch_connection *peer, const void *msg, uint32_t len, struct doca_comch_task_send **task)
DOCA_STABLE struct doca_ctx * doca_comch_server_as_ctx(struct doca_comch_server *comch_server)
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
DOCA_STABLE doca_error_t doca_ctx_get_state(const struct doca_ctx *ctx, enum doca_ctx_states *state)
Get context state.
DOCA_STABLE doca_error_t doca_ctx_set_user_data(struct doca_ctx *ctx, union doca_data user_data)
set user data to context
doca_ctx_states
This enum defines the states of a context.
DOCA_STABLE doca_error_t doca_ctx_get_user_data(const struct doca_ctx *ctx, union doca_data *user_data)
get user data from context
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_name(doca_error_t error)
Returns the string representation of an error code name.
@ DOCA_ERROR_CONNECTION_ABORTED
@ DOCA_ERROR_INITIALIZATION
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
DOCA_STABLE doca_error_t doca_pe_destroy(struct doca_pe *pe)
Destroy doca progress engine.
DOCA_STABLE doca_error_t doca_pe_connect_ctx(struct doca_pe *pe, struct doca_ctx *ctx)
This method connects a context to a progress engine.
DOCA_STABLE doca_error_t doca_task_submit(struct doca_task *task)
Submit a task to a progress engine.
DOCA_STABLE uint8_t doca_pe_progress(struct doca_pe *pe)
Run the progress engine.
DOCA_STABLE doca_error_t doca_pe_create(struct doca_pe **pe)
Creates DOCA progress engine.
DOCA_STABLE void doca_task_free(struct doca_task *task)
Free a task back to where it was allocated from.
uint32_t wire_size(storage::control::message_header const &hdr) noexcept
std::unique_ptr< storage::control::comch_channel > make_comch_client_control_channel(doca_dev *dev, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
std::unique_ptr< storage::control::comch_channel > make_comch_server_control_channel(doca_dev *dev, doca_dev_rep *dev_rep, char const *channel_name, void *callback_user_data, comch_channel::consumer_event_callback new_consumer_event_cb, comch_channel::consumer_event_callback expired_consumer_event_cb)
char * encode(char *buffer, storage::control::message_header const &hdr) noexcept
std::unique_ptr< storage::control::channel > make_tcp_client_control_channel(storage::ip_address const &server_address)
char const * decode(char const *buffer, storage::control::message_header &hdr) noexcept
std::string to_string(storage::control::message_type type)
std::unique_ptr< storage::control::channel > make_tcp_server_control_channel(uint16_t listen_port)
doca_error_t stop_context(doca_ctx *ctx, doca_pe *pe) noexcept
constexpr uint32_t max_concurrent_control_messages
Convenience type for representing opaque data.