38 .chunk_release = ucs_mpool_chunk_free,
58 const ucp_am_recv_param_t *param)
68 k = kh_get(client, rdmo_worker->clients, rdmo_hdr->
id);
69 if (k == kh_end(rdmo_worker->clients)) {
74 client = kh_value(rdmo_worker->clients, k);
75 k = kh_get(ep, rdmo_worker->eps, (int64_t)param->reply_ep);
76 if (k == kh_end(rdmo_worker->eps)) {
81 ep = kh_value(rdmo_worker->eps, k);
87 req = ucs_mpool_get(&rdmo_worker->
req_mp);
89 return UCS_ERR_NO_ELEM;
91 memset(req, 0,
sizeof(*req));
94 return UCS_ERR_INVALID_PARAM;
97 memcpy(req->
header, header, header_length);
103 return UCS_ERR_NO_RESOURCE;
108 return UCS_INPROGRESS;
120 if (rdmo_worker ==
NULL)
123 kh_destroy(ep, rdmo_worker->eps);
124 kh_destroy(client, rdmo_worker->clients);
125 ucs_mpool_cleanup(&rdmo_worker->
req_mp, 0);
141 ucp_params_t ucp_params;
142 ucp_config_t *ucp_config;
143 ucp_context_h ucp_context;
144 ucp_worker_h ucp_worker;
145 ucs_mpool_params_t mp_params;
146 ucp_am_handler_param_t am_param;
147 ucp_worker_params_t worker_params;
150 rdmo_worker = calloc(1,
sizeof(*rdmo_worker));
151 if (rdmo_worker ==
NULL)
154 ctx->plugin_ctx = rdmo_worker;
156 status = ucp_config_read(
NULL,
NULL, &ucp_config);
157 if (status != UCS_OK)
161 #if UCP_API_VERSION >= UCP_VERSION(1, 17)
162 status = ucp_config_modify(ucp_config,
"TCP_PUT_ENABLE",
"n");
164 status = ucp_config_modify(ucp_config,
"PUT_ENABLE",
"n");
166 if (status != UCS_OK) {
167 ucp_config_release(ucp_config);
171 ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
172 ucp_params.features = UCP_FEATURE_AM | UCP_FEATURE_RMA | UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH;
173 ucp_params.features |= UCP_FEATURE_WAKEUP;
174 status = ucp_init(&ucp_params, ucp_config, &ucp_context);
175 ucp_config_release(ucp_config);
176 if (status != UCS_OK)
179 worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
180 worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
181 status = ucp_worker_create(ucp_context, &worker_params, &ucp_worker);
182 if (status != UCS_OK)
183 goto err_worker_create;
185 status = ucp_worker_get_address(ucp_worker,
188 if (status != UCS_OK)
189 goto err_worker_address;
194 am_param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_FLAGS |
195 UCP_AM_HANDLER_PARAM_FIELD_CB | UCP_AM_HANDLER_PARAM_FIELD_ARG;
197 am_param.flags = UCP_AM_FLAG_PERSISTENT_DATA;
199 am_param.arg = rdmo_worker;
201 status = ucp_worker_set_am_recv_handler(ucp_worker, &am_param);
202 if (status != UCS_OK)
205 rdmo_worker->clients = kh_init(client);
206 if (!rdmo_worker->clients)
209 rdmo_worker->eps = kh_init(ep);
210 if (!rdmo_worker->eps)
213 ucs_mpool_params_reset(&mp_params);
215 mp_params.elems_per_chunk = 1024;
217 mp_params.name =
"urom_rdmo_requests";
220 status = ucs_mpool_init(&mp_params, &rdmo_worker->
req_mp);
221 if (status != UCS_OK)
229 kh_destroy(
ep, rdmo_worker->eps);
231 kh_destroy(
client, rdmo_worker->clients);
236 ucp_worker_destroy(ucp_worker);
238 ucp_cleanup(ucp_context);
256 ucs_status_t ucs_status;
259 ucp_ep_params_t ep_params;
261 ucp_worker_address_attr_t addr_attr = {
262 .field_mask = UCP_WORKER_ADDRESS_ATTR_FIELD_UID,
265 ep = calloc(1,
sizeof(*
ep));
270 ep->peer_uid = addr_attr.worker_uid;
271 ucs_list_head_init(&
ep->fenced_ops);
272 ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
273 ep_params.address = peer_addr;
277 if (ucs_status != UCS_OK) {
282 k = kh_put(
ep, rdmo_worker->eps, (int64_t)
ep->ep, &ret);
284 ucp_ep_destroy(
ep->ep);
288 kh_value(rdmo_worker->eps, k) =
ep;
313 if (client->dest_id ==
dest_id)
340 nd = calloc(1,
sizeof(*nd) +
sizeof(*rdmo_notif));
360 if (client !=
NULL) {
366 k = kh_get(client, rdmo_worker->clients, rdmo_cmd->client_init.id);
367 if (k != kh_end(rdmo_worker->clients)) {
368 DOCA_LOG_ERR(
"Client ID unavailable: %#lx", rdmo_cmd->client_init.id);
372 client = calloc(1,
sizeof(*client));
373 if (client ==
NULL) {
378 client->rdmo_worker = rdmo_worker;
379 client->id = rdmo_cmd->client_init.id;
380 client->dest_id = cmd_desc->
dest_id;
381 ucs_list_head_init(&client->paused_ops);
383 DOCA_LOG_DBG(
"Client worker address len: %lu", rdmo_cmd->client_init.addr_len);
392 client->mkeys = kh_init(mkey);
393 if (client->mkeys ==
NULL) {
398 client->rqs = kh_init(rq);
399 if (client->rqs ==
NULL) {
404 k = kh_put(client, rdmo_worker->clients, client->id, &ret);
409 kh_value(rdmo_worker->clients, k) = client;
414 DOCA_LOG_DBG(
"RDMO client initialized (client: %p ep: %p)", client, client->ep);
419 kh_destroy(rq, client->rqs);
421 kh_destroy(mkey, client->mkeys);
423 ucp_ep_destroy(client->ep->ep);
444 ucs_status_t ucs_status;
454 if (client ==
NULL) {
459 k = kh_get(mkey, client->mkeys, rdmo_cmd->mr_dereg.rkey);
460 if (k == kh_end(client->mkeys)) {
461 DOCA_LOG_ERR(
"Mkey does not exists for rkey: %lu", rdmo_cmd->mr_dereg.rkey);
466 rdmo_mkey = kh_value(client->mkeys, k);
470 if (ucs_status != UCS_OK) {
471 DOCA_LOG_ERR(
"ucp_mem_unmap() returned error: %s", ucs_status_string(ucs_status));
477 ucp_rkey_destroy(rdmo_mkey->
ucp_rkey);
478 kh_destroy(mem_cache, rdmo_mkey->mem_cache);
479 kh_del(mkey, client->mkeys, k);
486 nd = calloc(1,
sizeof(*nd) +
sizeof(*rdmo_notif));
488 kh_del(rq, client->rqs, k);
497 notif->
len =
sizeof(*rdmo_notif);
506 DOCA_LOG_DBG(
"Dereg - rkey: %lu (client: %p)", rdmo_cmd->mr_dereg.rkey, client);
532 nd = calloc(1,
sizeof(*nd) +
sizeof(*rdmo_notif));
541 notif->
len =
sizeof(*rdmo_notif);
547 if (client ==
NULL) {
553 rq = calloc(1,
sizeof(*rq));
559 rq->
id = client->next_rq_id++;
560 k = kh_put(rq, client->rqs, rq->
id, &ret);
566 kh_value(client->rqs, k) = rq;
570 kh_del(rq, client->rqs, k);
577 DOCA_LOG_DBG(
"Created RQ: %p ID: %#lx (client: %p ep: %p)", rq, rq->
id, client, rq->
ep);
596 ucs_status_ptr_t ucs_status_ptr;
598 k = kh_get(ep, rdmo_worker->eps, (int64_t)rdmo_ep->
ep);
599 if (k == kh_end(rdmo_worker->eps)) {
600 DOCA_LOG_ERR(
"EP does not exist - RDMO EP: %p UCP EP: %p", rdmo_ep, rdmo_ep->
ep);
603 ucs_status_ptr = ucp_ep_close_nb(rdmo_ep->
ep, UCP_EP_CLOSE_MODE_FLUSH);
604 if (UCS_PTR_IS_ERR(ucs_status_ptr))
607 kh_del(ep, rdmo_worker->eps, k);
608 ucp_request_release(ucs_status_ptr);
635 if (client ==
NULL) {
640 k = kh_get(rq, client->rqs, rdmo_cmd->rq_destroy.rq_id);
641 if (k == kh_end(client->rqs)) {
642 DOCA_LOG_ERR(
"RQ does not exist for ID: %lu", rdmo_cmd->rq_destroy.rq_id);
646 rq = kh_value(client->rqs, k);
652 kh_del(rq, client->rqs, k);
659 nd = calloc(1,
sizeof(*nd) +
sizeof(*rdmo_notif));
661 kh_del(rq, client->rqs, k);
670 notif->
len =
sizeof(*rdmo_notif);
698 ucs_status_t ucs_status;
699 ucp_mem_h ucp_memh =
NULL;
700 ucp_mem_map_params_t map_params;
710 nd = calloc(1,
sizeof(*nd) +
sizeof(*rdmo_notif));
719 notif->
len =
sizeof(*rdmo_notif);
724 rdmo_mkey = malloc(
sizeof(*rdmo_mkey));
725 if (rdmo_mkey ==
NULL) {
731 if (client ==
NULL) {
738 ucs_status = ucp_ep_rkey_unpack(client->ep->ep, rdmo_cmd->mr_reg.packed_rkey, &ucp_rkey);
739 if (ucs_status != UCS_OK) {
745 if (rdmo_cmd->mr_reg.packed_memh) {
746 map_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER;
747 map_params.exported_memh_buffer = rdmo_cmd->mr_reg.packed_memh;
749 if (ucs_status == UCS_ERR_UNREACHABLE) {
752 }
else if (ucs_status != UCS_OK) {
759 rdmo_mkey->mem_cache = kh_init(mem_cache);
760 if (rdmo_mkey->mem_cache ==
NULL) {
768 rdmo_mkey->
va = rdmo_cmd->mr_reg.va;
769 rdmo_mkey->
len = rdmo_cmd->mr_reg.len;
772 k = kh_put(mkey, client->mkeys, rkey, &ret);
778 kh_value(client->mkeys, k) = rdmo_mkey;
782 DOCA_LOG_DBG(
"Allocated rkey: %lu ucp_memh: %p (client: %p)", rkey, ucp_memh, client);
803 uint64_t extended_mem = 0;
814 switch (rdmo_cmd->
type) {
854 while (!ucs_list_is_empty(cmd_list)) {
863 switch (rdmo_cmd->
type) {
910 ucs_list_add_tail(notif_list, &nd->
entry);
925 size_t *packed_notif_len,
930 void *pack_tail = packed_notif;
936 memcpy(pack_head, notif, pack_len);
937 *packed_notif_len = pack_len;
940 switch (rdmo_notif->
type) {
945 *packed_notif_len += pack_len;
if(bitoffset % 64+bitlength > 64) result|
static ucs_mpool_ops_t urom_rdmo_req_mpool_ops
static doca_error_t urom_worker_rdmo_cmd_unpack(void *packed_cmd, size_t packed_cmd_len, struct urom_worker_cmd **cmd)
static uint64_t plugin_version
static doca_error_t urom_worker_rdmo_rq_destroy_cmd(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_cmd_desc *cmd_desc)
static doca_error_t urom_worker_rdmo_ep_get(struct urom_worker_rdmo *rdmo_worker, void *peer_addr, struct urom_worker_rdmo_ep **rdmo_ep)
static doca_error_t urom_worker_rdmo_mr_dereg_cmd(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_cmd_desc *cmd_desc)
static doca_error_t urom_worker_rdmo_progress(struct urom_worker_ctx *ctx, ucs_list_link_t *notif_list)
static doca_error_t urom_worker_rdmo_notif_pack(struct urom_worker_notify *notif, size_t *packed_notif_len, void *packed_notif)
static doca_error_t urom_worker_rdmo_open(struct urom_worker_ctx *ctx)
static ucs_status_t urom_worker_rdmo_am_cb(void *ctx, const void *header, size_t header_length, void *data, size_t length, const ucp_am_recv_param_t *param)
DOCA_LOG_REGISTER(UROM::WORKER::RDMO)
doca_error_t urom_plugin_get_version(uint64_t *version)
doca_error_t urom_plugin_get_iface(struct urom_plugin_iface *iface)
static doca_error_t urom_worker_rdmo_rq_create_cmd(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_cmd_desc *cmd_desc)
static void urom_worker_rdmo_close(struct urom_worker_ctx *worker_ctx)
static struct urom_worker_rdmo_client * urom_worker_rdmo_dest_to_client(struct urom_worker_rdmo *rdmo_worker, uint64_t dest_id)
static doca_error_t urom_worker_rdmo_addr(struct urom_worker_ctx *worker_ctx, void *addr, uint64_t *addr_len)
static doca_error_t urom_worker_rdmo_worker_cmd(struct urom_worker_ctx *ctx, ucs_list_link_t *cmd_list)
static doca_error_t urom_worker_rdmo_ep_del(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_rdmo_ep *rdmo_ep)
static doca_error_t urom_worker_rdmo_mr_reg_cmd(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_cmd_desc *cmd_desc)
static doca_error_t urom_worker_rdmo_client_init_cmd(struct urom_worker_rdmo *rdmo_worker, struct urom_worker_cmd_desc *cmd_desc)
struct urom_worker_rdmo_req_ops * urom_worker_rdmo_ops_table[]
doca_error_t urom_worker_rdmo_req_queue(struct urom_worker_rdmo_req *req)
#define UROM_RDMO_HDR_LEN_MAX
static struct doca_flow_pipe_entry * entry[MAX_ENTRIES]
#define DOCA_STRUCT_CTOR(_X_)
enum doca_error doca_error_t
DOCA API return codes.
@ DOCA_ERROR_INVALID_VALUE
@ DOCA_ERROR_INITIALIZATION
@ DOCA_ERROR_ALREADY_EXIST
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
__UINTPTR_TYPE__ uintptr_t
ucp_context_h ucp_context
ucp_address_t * worker_address
UROM Worker plugin interface.
UROM Worker command descriptor structure.
struct urom_worker_cmd worker_cmd
UROM Worker command structure.
UROM Worker notification descriptor structure.
struct urom_worker_notify worker_notif
struct urom_worker_rdmo_notify_mr_reg mr_reg
struct urom_worker_rdmo_notify_rq_create rq_create
struct urom_worker_rdmo_notify_rq_destroy rq_destroy
struct urom_worker_rdmo_notify_mr_dereg mr_dereg
struct urom_worker_rdmo_notify_client_init client_init
UROM Worker notification structure.
struct urom_worker_rdmo * rdmo_worker
struct urom_worker_rdmo_cmd_mr_reg mr_reg
struct urom_worker_rdmo_cmd_client_init client_init
struct urom_worker_rdmo_cmd_rq_create rq_create
doca_error_t(* progress)(struct urom_worker_rdmo_req *req)
uint8_t header[UROM_RDMO_HDR_LEN_MAX]
struct urom_worker_rdmo_ep * ep
struct urom_worker_rdmo_req_ops * ops
struct urom_worker_rdmo_client * client
ucp_am_recv_param_t param
struct urom_worker_rdmo_ep * ep
ucs_list_link_t completed_reqs
struct upf_accel_ctx * ctx
@ UROM_WORKER_NOTIFY_RDMO_CLIENT_INIT
@ UROM_WORKER_NOTIFY_RDMO_RQ_CREATE
@ UROM_WORKER_NOTIFY_RDMO_RQ_DESTROY
@ UROM_WORKER_NOTIFY_RDMO_MR_REG
@ UROM_WORKER_NOTIFY_RDMO_MR_DEREG
@ UROM_WORKER_CMD_RDMO_MR_REG
@ UROM_WORKER_CMD_RDMO_MR_DEREG
@ UROM_WORKER_CMD_RDMO_RQ_CREATE
@ UROM_WORKER_CMD_RDMO_RQ_DESTROY
@ UROM_WORKER_CMD_RDMO_CLIENT_INIT
#define urom_rdmo_serialize_next_raw(_iter, _type, _offset)