NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
worker_rdmo_ops.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2024 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <stdint.h>
27 #include <stdlib.h>
28 
29 #include "worker_rdmo.h"
30 #include "urom_rdmo.h"
31 
32 DOCA_LOG_REGISTER(UROM::WORKER::RDMO::OPS);
33 
34 #define urom_rdmo_compiler_fence() asm volatile("" ::: "memory") /* Memory barrier */
35 
36 /*
37  * Get memory address by id from mkey cache
38  *
39  * @rdmo_mkey [in]: RDMO mkey structure
40  * @addr [in]: address id
41  * @val [out]: set address value from cache
42  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
43  */
45  uint64_t addr,
46  uint64_t *val)
47 {
48  khint_t k;
49 
50  k = kh_get(mem_cache, rdmo_mkey->mem_cache, addr);
51  if (k == kh_end(rdmo_mkey->mem_cache)) {
52  DOCA_LOG_DBG("Cache miss addr: %#lx", addr);
53  return DOCA_ERROR_NOT_FOUND;
54  }
55 
56  *val = kh_value(rdmo_mkey->mem_cache, k);
57  DOCA_LOG_DBG("Cache hit addr: %#lx val: %#lx", addr, *val);
58 
59  return DOCA_SUCCESS;
60 }
61 
62 /*
63  * Set memory address by id in mkey cache
64  *
65  * @rdmo_mkey [in]: RDMO mkey structure
66  * @addr [in]: address id
67  * @val [in]: address value
68  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
69  */
70 static doca_error_t urom_worker_rdmo_mem_cache_put(struct urom_worker_rdmo_mkey *rdmo_mkey, uint64_t addr, uint64_t val)
71 {
72  khint_t k;
73  int ret;
74 
75  k = kh_get(mem_cache, rdmo_mkey->mem_cache, addr);
76  if (k != kh_end(rdmo_mkey->mem_cache)) {
77  kh_value(rdmo_mkey->mem_cache, k) = val;
78  DOCA_LOG_DBG("Cache update addr: %#lx val: %#lx", addr, val);
79  } else {
80  k = kh_put(mem_cache, rdmo_mkey->mem_cache, addr, &ret);
81  if (ret < 0) {
82  DOCA_LOG_ERR("Failed to rdmo mkey");
83  return DOCA_ERROR_DRIVER;
84  }
85  kh_value(rdmo_mkey->mem_cache, k) = val;
86  DOCA_LOG_DBG("Cache insert addr: %#lx val: %#lx", addr, val);
87  }
88  return DOCA_SUCCESS;
89 }
90 
91 /*
92  * RDMO address flush callback
93  *
94  * @request [in]: flush request
95  * @ucs_status [in]: operation status
96  * @user_data [in]: user data
97  */
98 static void urom_worker_rdmo_addr_flush_cb(void *request, ucs_status_t ucs_status, void *user_data)
99 {
100  if (ucs_status != UCS_OK)
101  return;
102 
103  ucs_mpool_put(user_data);
104  ucp_request_free(request);
105 }
106 
107 /*
108  * Register address flush handle
109  *
110  * @client [in]: RDMO client
111  * @addr [in]: address id
112  * @val [in]: address value
113  * @rdmo_mkey [in]: RDMO mkey
114  * @return: UCX_OK on success and error status otherwise
115  */
116 static ucs_status_ptr_t urom_worker_rdmo_addr_flush_slow(struct urom_worker_rdmo_client *client,
117  uint64_t addr,
118  uint64_t val,
119  struct urom_worker_rdmo_mkey *rdmo_mkey)
120 {
121  ucp_request_param_t req_param = {0};
122  uint64_t *req;
123 
124  DOCA_LOG_DBG("Slow flush: %#lx = %#lx (client: %p)", addr, val, client);
125 
126  req = ucs_mpool_get(&client->rdmo_worker->req_mp);
127  if (req == NULL)
128  return NULL;
129  *req = val;
130 
131  req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA;
132  req_param.cb.send = urom_worker_rdmo_addr_flush_cb;
133  req_param.user_data = req;
134 
135  return ucp_put_nbx(client->ep->ep, req, 8, addr, rdmo_mkey->ucp_rkey, &req_param);
136 }
137 
138 /*
139  * Handle memory cache flush
140  *
141  * @client [in]: RDMO client
142  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
143  */
145 {
146  khint_t k;
147  khint_t j;
148  struct urom_worker_rdmo_mkey *rdmo_mkey;
149  uint64_t addr;
150  uint64_t val;
151  ucp_request_param_t req_param;
152  ucs_status_ptr_t ucs_status_ptr;
153  uint64_t get_addr = 0;
154  ucp_rkey_h get_rkey;
155 
156  /* For each client mkey */
157  for (k = kh_begin(client->mkeys); k != kh_end(client->mkeys); ++k) {
158  if (!kh_exist(client->mkeys, k))
159  continue;
160 
161  rdmo_mkey = kh_value(client->mkeys, k);
162 
163  /* flush each cached addr */
164  for (j = kh_begin(rdmo_mkey->mem_cache); j != kh_end(rdmo_mkey->mem_cache); ++j) {
165  if (!kh_exist(rdmo_mkey->mem_cache, j))
166  continue;
167 
168  addr = kh_key(rdmo_mkey->mem_cache, j);
169  val = kh_value(rdmo_mkey->mem_cache, j);
170  kh_del(mem_cache, rdmo_mkey->mem_cache, j);
171 
172  /* Try send from stack */
173  memset(&req_param, 0, sizeof(req_param));
174  req_param.op_attr_mask = UCP_OP_ATTR_FLAG_FORCE_IMM_CMPL;
175 
176  ucs_status_ptr = ucp_put_nbx(client->ep->ep, &val, 8, addr, rdmo_mkey->ucp_rkey, &req_param);
177  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_ERR_NO_RESOURCE) {
178  /* Fall back to send from heap */
179  ucs_status_ptr = urom_worker_rdmo_addr_flush_slow(client, addr, val, rdmo_mkey);
180  } else {
181  if (UCS_PTR_IS_PTR(ucs_status_ptr))
182  ucp_request_free(ucs_status_ptr);
183  }
184 
185  if (UCS_PTR_IS_ERR(ucs_status_ptr))
186  return DOCA_ERROR_DRIVER;
187 
188  DOCA_LOG_DBG("Flushed %#lx = %#lx (client: %p)", addr, val, client);
189 
190  /* Save one of the flushed addresses to use with a flushing get */
191  if (!get_addr) {
192  get_addr = addr;
193  get_rkey = rdmo_mkey->ucp_rkey;
194  }
195  }
196  }
197 
198  /* Use a Get to ensure Put data visibility */
199  if (get_addr) {
200  memset(&req_param, 0, sizeof(req_param));
201  ucs_status_ptr = ucp_get_nbx(client->ep->ep, &client->get_result, 8, get_addr, get_rkey, &req_param);
202 
203  if (UCS_PTR_IS_ERR(ucs_status_ptr))
204  return DOCA_ERROR_DRIVER;
205 
206  if (UCS_PTR_IS_PTR(ucs_status_ptr))
207  ucp_request_free(ucs_status_ptr);
208 
209  DOCA_LOG_DBG("Issued flushing Get to %#lx", addr);
210  }
211 
212  return DOCA_SUCCESS;
213 }
214 
215 /*
216  * Free RDMO request
217  *
218  * @req [in]: RDMO request
219  */
221 {
222  req->ep->oreqs--;
223  ucs_mpool_put(req);
224 }
225 
226 /*
227  * Free RDMO request's data
228  *
229  * @req [in]: RDMO request
230  */
232 {
233  struct urom_worker_rdmo *rdmo_worker = req->client->rdmo_worker;
234 
235  if (!(req->param.recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV))
236  ucp_am_data_release(rdmo_worker->ucp_data.ucp_worker, req->data);
237 }
238 
239 /*
240  * Launch RDMO request
241  *
242  * @req [in]: RDMO request
243  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
244  */
246 {
247  struct urom_worker_rdmo_ep *ep = req->ep;
248  struct urom_rdmo_hdr *rdmo_hdr = (struct urom_rdmo_hdr *)req->header;
249  doca_error_t status;
250 
251  if (!ucs_list_is_empty(&ep->fenced_ops) || (rdmo_hdr->flags & UROM_RDMO_REQ_FLAG_FENCE && ep->oreqs)) {
252  DOCA_LOG_DBG("New fenced request: %p", req);
253  ucs_list_add_tail(&ep->fenced_ops, &req->entry);
254  return DOCA_ERROR_IN_PROGRESS;
255  }
256 
257  ep->oreqs++;
258  status = req->ops->progress(req);
259 
260  return status;
261 }
262 
263 /*
264  * Check if fence operation is required for the end-point
265  *
266  * @ep [in]: RDMO end-point
267  */
269 {
270  struct urom_worker_rdmo_req *req;
271  const struct urom_rdmo_hdr *rdmo_hdr;
272  doca_error_t status;
273 
274  if (ep->oreqs || ucs_list_is_empty(&ep->fenced_ops))
275  return;
276 
277  /* Progress fenced ops */
278  while (!ucs_list_is_empty(&ep->fenced_ops)) {
279  req = ucs_list_head(&ep->fenced_ops, struct urom_worker_rdmo_req, entry);
280 
281  rdmo_hdr = (const struct urom_rdmo_hdr *)req->header;
282  if (rdmo_hdr->flags & UROM_RDMO_REQ_FLAG_FENCE && ep->oreqs) {
283  DOCA_LOG_DBG("Fenced on req: %p", req);
284  break;
285  }
286 
287  ucs_list_del(&req->entry);
288 
289  DOCA_LOG_DBG("Starting req: %p", req);
290  ep->oreqs++;
291  status = req->ops->progress(req);
292  if (status == DOCA_SUCCESS) {
295  }
296  }
297 }
298 
299 /*
300  * Progress client paused RDMO operations
301  *
302  * @client [in]: RDMO client
303  */
305 {
306  struct urom_worker_rdmo_req *req;
307  doca_error_t status;
308 
309  if (client->pause || ucs_list_is_empty(&client->paused_ops))
310  return;
311 
312  /* Progress paused ops */
313  while (!client->pause && !ucs_list_is_empty(&client->paused_ops)) {
314  req = ucs_list_extract_head(&client->paused_ops, struct urom_worker_rdmo_req, entry);
315 
316  DOCA_LOG_DBG("Starting req: %p", req);
317 
318  status = urom_worker_rdmo_req_start(req);
319  if (status == DOCA_SUCCESS) {
322  }
323  }
324 }
325 
326 /*
327  * RDMO request completion callback
328  *
329  * @req [in]: scatter request
330  */
332 {
336 }
337 
338 /*
339  * RDMO operation callback
340  *
341  * @request [in]: scatter request
342  * @ucs_status [in]: operation status
343  * @user_data [in]: user data
344  */
345 static void urom_worker_rdmo_op_cb(void *request, ucs_status_t ucs_status, void *user_data)
346 {
347  (void)ucs_status;
348 
349  doca_error_t status;
350  struct urom_worker_rdmo_req *req = (struct urom_worker_rdmo_req *)user_data;
351 
352  status = req->ops->progress(req);
353  if (status == DOCA_SUCCESS) {
356  }
357 
358  ucp_request_free(request);
359 }
360 
361 /*
362  * RDMO send operation callback
363  *
364  * @request [in]: scatter request
365  * @ucs_status [in]: operation status
366  * @user_data [in]: user data
367  */
static void urom_worker_rdmo_op_send_cb(void *request, ucs_status_t ucs_status, void *user_data)
{
	/* Send completions funnel into the generic op completion path, which
	 * re-runs the request's progress function and frees the UCP request */
	urom_worker_rdmo_op_cb(request, ucs_status, user_data);
}
372 
373 /*
374  * RDMO receive data operation callback
375  *
376  * @request [in]: scatter request
377  * @ucs_status [in]: operation status
378  * @length [in]: data length
379  * @user_data [in]: user data
380  */
static void urom_worker_rdmo_am_recv_data_cb(void *request, ucs_status_t ucs_status, size_t length, void *user_data)
{
	/* Received length is not needed here; the request tracks its own
	 * expected length. Delegate to the generic op completion path */
	(void)length;
	urom_worker_rdmo_op_cb(request, ucs_status, user_data);
}
386 
387 /*
388  * RDMO scatter operation callback
389  *
390  * @request [in]: scatter request
391  * @ucs_status [in]: operation status
392  * @user_data [in]: user data
393  */
394 static void urom_worker_rdmo_scatter_op_send_cb(void *request, ucs_status_t ucs_status, void *user_data)
395 {
396  struct urom_worker_rdmo_req *req __attribute__((unused)) = (struct urom_worker_rdmo_req *)user_data;
397 
398  req->ctx[1]--; /* pending completions */
399  DOCA_LOG_DBG("Pending completions: %lu req: %p", req->ctx[1], req);
400  urom_worker_rdmo_op_cb(request, ucs_status, user_data);
401 }
402 
403 /*
404  * Progress function for flush operations
405  *
406  * @req [in]: RDMO request
407  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
408  */
410 {
411  ucs_status_t ucs_status;
412  ucs_status_ptr_t ucs_status_ptr;
413  ucp_request_param_t req_param = {
414  .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA,
415  .cb.send = urom_worker_rdmo_op_send_cb,
416  .user_data = req,
417  };
418  const struct urom_rdmo_hdr *rdmo_hdr = (const struct urom_rdmo_hdr *)req->header;
419  const struct urom_rdmo_flush_hdr *flush_hdr = (struct urom_rdmo_flush_hdr *)(rdmo_hdr + 1);
420  struct urom_rdmo_rsp_hdr *rsp_hdr;
421  struct urom_rdmo_flush_rsp_hdr *flush_rsp;
422  size_t rsp_len = sizeof(*rsp_hdr) + sizeof(*flush_rsp);
423 
424  if (req->ctx[0] == 0) {
425  /* Stage 1: flush RMA */
426  /* Ensure payload is written before the queue pointer */
427  ucs_status = ucp_worker_fence(req->client->rdmo_worker->ucp_data.ucp_worker);
428  if (ucs_status != UCS_OK)
429  return DOCA_ERROR_DRIVER;
430 
432 
433  ucs_status_ptr = ucp_ep_flush_nbx(req->client->ep->ep, &req_param);
434  if (UCS_PTR_IS_ERR(ucs_status_ptr))
435  return DOCA_ERROR_DRIVER;
436 
437  req->ctx[0] = 1;
438 
439  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_INPROGRESS) {
440  DOCA_LOG_DBG("Initiated RMA flush, req: %p", req);
441  return DOCA_ERROR_IN_PROGRESS;
442  }
443 
444  DOCA_LOG_DBG("Completed RMA flush, req: %p", req);
445  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_OK)
446  return DOCA_ERROR_DRIVER;
447 
448  /* Fall through */
449  }
450 
451  if (req->ctx[0] == 1) {
452  rsp_hdr = (struct urom_rdmo_rsp_hdr *)&req->ctx[1];
453  flush_rsp = (struct urom_rdmo_flush_rsp_hdr *)(rsp_hdr + 1);
454 
455  rsp_hdr->rsp_id = UROM_RDMO_OP_FLUSH;
456  flush_rsp->flush_id = flush_hdr->flush_id;
457 
458  /* Stage 2: send response to initiator */
459  ucs_status_ptr =
460  ucp_am_send_nbx(req->param.reply_ep, UROM_RDMO_AM_ID, rsp_hdr, rsp_len, NULL, 0, &req_param);
461  if (UCS_PTR_IS_ERR(ucs_status_ptr))
462  return DOCA_ERROR_DRIVER;
463 
464  req->ctx[0] = 2;
465 
466  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_INPROGRESS) {
467  DOCA_LOG_DBG("Initiated flush response, req: %p", req);
468  return DOCA_ERROR_IN_PROGRESS;
469  }
470  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_OK)
471  return DOCA_ERROR_DRIVER;
472 
473  DOCA_LOG_DBG("Completed flush response, req: %p", req);
474 
475  /* Fall through */
476  }
477 
478  /* Complete request */
479  DOCA_LOG_DBG("Completed flush request: %p", req);
480 
481  return DOCA_SUCCESS;
482 }
483 
484 /* RDMO flush operations */
487 };
488 
489 /*
490  * Progress function for append operations
491  *
492  * @req [in]: RDMO request
493  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
494  */
496 {
497  ucs_status_ptr_t ucs_status_ptr;
498  ucp_request_param_t req_param;
499  const struct urom_rdmo_hdr *rdmo_hdr = (const struct urom_rdmo_hdr *)req->header;
500  const struct urom_rdmo_append_hdr *append_hdr = (struct urom_rdmo_append_hdr *)(rdmo_hdr + 1);
501  khint_t k;
502  struct urom_worker_rdmo_mkey *rdmo_mkey;
503  ucp_rkey_h ucp_rkey;
504  uint64_t *sm_addr;
506  uint64_t *sm_ptr = NULL;
507 
508  if (req->ctx[0] == 0) {
509  /* Stage 1: FADD */
510 
511  k = kh_get(mkey, req->client->mkeys, append_hdr->ptr_rkey);
512  if (k == kh_end(req->client->mkeys)) {
513  DOCA_LOG_ERR("Unknown ptr_rkey: %lu", append_hdr->ptr_rkey);
514  return DOCA_SUCCESS;
515  }
516 
517  rdmo_mkey = kh_value(req->client->mkeys, k);
518  req->ctx[2] = (uint64_t)rdmo_mkey;
519  ucp_rkey = rdmo_mkey->ucp_rkey;
520  if ((uintptr_t)ucp_rkey != append_hdr->ptr_rkey)
522 
523  if (ucp_rkey_ptr(ucp_rkey, append_hdr->ptr_addr, (void **)&sm_addr) == UCS_OK) {
524  sm_ptr = sm_addr;
525  req->ctx[1] = *sm_addr;
526  req->ctx[0] = 2; /* Next: put */
527  DOCA_LOG_DBG("Performed SM FADD, req: %p", req);
528  } else if (urom_worker_rdmo_mem_cache_get(rdmo_mkey, append_hdr->ptr_addr, &req->ctx[1]) ==
529  DOCA_SUCCESS) {
530  DOCA_LOG_DBG("Using cached pointer val: %#lx, req: %p", req->ctx[1], req);
531  req->ctx[0] = 1; /* Next: cache update */
532  } else {
533  memset(&req_param, 0, sizeof(req_param));
534  req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA;
535  req_param.cb.send = urom_worker_rdmo_op_send_cb;
536  req_param.user_data = req;
537 
538  ucs_status_ptr = ucp_get_nbx(req->client->ep->ep,
539  &req->ctx[1],
540  8,
541  append_hdr->ptr_addr,
542  ucp_rkey,
543  &req_param);
544  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_INPROGRESS)
545  return DOCA_ERROR_DRIVER;
546 
547  DOCA_LOG_DBG("Initiated Get, req: %p", req);
548  req->ctx[0] = 1; /* Next: cache update */
549 
550  req->client->pause = 1; /* Prevent concurrent AMOs */
551 
552  return DOCA_ERROR_IN_PROGRESS;
553  }
554  }
555 
556  if (req->ctx[0] == 1) {
557  /* Stage 2: update cache */
558  rdmo_mkey = (struct urom_worker_rdmo_mkey *)req->ctx[2];
559  result = urom_worker_rdmo_mem_cache_put(rdmo_mkey, append_hdr->ptr_addr, req->ctx[1] + req->length);
560  if (result != DOCA_SUCCESS)
561  return result;
562  req->ctx[0] = 2; /* Next: put */
563  req->client->pause = 0;
564  }
565 
566  if (req->ctx[0] == 2) {
567  /* Stage 3: Put */
568  if (append_hdr->ptr_rkey != append_hdr->data_rkey) {
569  k = kh_get(mkey, req->client->mkeys, append_hdr->data_rkey);
570  if (k == kh_end(req->client->mkeys)) {
571  DOCA_LOG_ERR("Unknown data_rkey: %lu", append_hdr->data_rkey);
572  return DOCA_SUCCESS;
573  }
574 
575  rdmo_mkey = kh_value(req->client->mkeys, k);
576  ucp_rkey = rdmo_mkey->ucp_rkey;
577  } else {
578  rdmo_mkey = (struct urom_worker_rdmo_mkey *)req->ctx[2];
579  ucp_rkey = rdmo_mkey->ucp_rkey;
580  }
581 
582  if (req->ctx[1] < rdmo_mkey->va || (req->ctx[1] + req->length) > (rdmo_mkey->va + rdmo_mkey->len)) {
583  DOCA_LOG_ERR("Append out of bounds, put: %#lx-%#lx mkey: %#lx-%#lx",
584  req->ctx[1],
585  req->ctx[1] + req->length,
586  rdmo_mkey->va,
587  rdmo_mkey->va + rdmo_mkey->len);
588  return DOCA_ERROR_UNEXPECTED;
589  }
590 
591  if (req->param.recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) {
592  if (rdmo_mkey->ucp_memh == NULL) {
593  DOCA_LOG_ERR("RNDV AM requires xGVMI support");
595  }
596 
597  memset(&req_param, 0, sizeof(req_param));
598  req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA |
599  UCP_OP_ATTR_FIELD_MEMH;
600  req_param.cb.recv_am = urom_worker_rdmo_am_recv_data_cb;
601  req_param.user_data = req;
602  req_param.memh = rdmo_mkey->ucp_memh;
603 
604  ucs_status_ptr = ucp_am_recv_data_nbx(req->client->rdmo_worker->ucp_data.ucp_worker,
605  req->data,
606  (void *)req->ctx[1],
607  req->length,
608  &req_param);
609  if (UCS_PTR_IS_ERR(ucs_status_ptr))
610  return DOCA_ERROR_DRIVER;
611 
612  req->ctx[0] = 3;
613 
614  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_INPROGRESS) {
615  DOCA_LOG_DBG("Initiated Get to: %#lx len: %lu req %p", req->ctx[1], req->length, req);
616  return DOCA_ERROR_IN_PROGRESS;
617  }
618  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_OK)
619  return DOCA_ERROR_DRIVER;
620 
621  DOCA_LOG_DBG("Completed Get, req: %p", req);
622  } else if (ucp_rkey_ptr(ucp_rkey, req->ctx[1], (void **)&sm_addr) != UCS_OK) {
623  memset(&req_param, 0, sizeof(req_param));
624  req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA;
625  req_param.cb.send = urom_worker_rdmo_op_send_cb;
626  req_param.user_data = req;
627 
628  /* Estimate ceiling benefit of Put aggregation */
629  ucs_status_ptr = ucp_put_nbx(req->client->ep->ep,
630  req->data,
631  req->length,
632  req->ctx[1],
633  ucp_rkey,
634  &req_param);
635 
636  if (UCS_PTR_IS_ERR(ucs_status_ptr))
637  return DOCA_ERROR_DRIVER;
638 
639  req->ctx[0] = 3;
640 
641  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_INPROGRESS) {
642  DOCA_LOG_DBG("Initiated Put to: %#lx len: %lu req %p", req->ctx[1], req->length, req);
643  return DOCA_ERROR_IN_PROGRESS;
644  }
645  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_OK)
646  return DOCA_ERROR_DRIVER;
647 
648  DOCA_LOG_DBG("Completed Put, req: %p", req);
649  } else {
650  /* Buffer is in shared memory between client and urom_worker */
651  memcpy((void *)sm_addr, req->data, req->length);
653  *sm_ptr += req->length;
654  DOCA_LOG_DBG("Completed copy, req: %p", req);
655  }
656 
657  /* Send complete, fall through to completion */
658  }
659 
660  /* Stage 3: Put ack */
661  DOCA_LOG_DBG("Completed Append request: %p", req);
662 
663  return DOCA_SUCCESS;
664 }
665 
666 /* RDMO append operations */
669 };
670 
671 /*
672  * Progress function for scatter operations
673  *
674  * @req [in]: RDMO request
675  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
676  */
678 {
679  ucs_status_ptr_t ucs_status_ptr;
680  ucp_request_param_t req_param;
681  const struct urom_rdmo_hdr *rdmo_hdr = (const struct urom_rdmo_hdr *)req->header;
682  const struct urom_rdmo_scatter_hdr *scatter_hdr = (struct urom_rdmo_scatter_hdr *)(rdmo_hdr + 1);
683  khint_t k;
684  uint64_t prev_rkey = UINT64_MAX;
685  struct urom_worker_rdmo_mkey *rdmo_mkey = NULL;
686  ucp_rkey_h ucp_rkey = NULL;
687  uint64_t *sm_addr;
688  struct urom_rdmo_scatter_iov *iov;
689  void *iov_data;
690  uint64_t i;
691 
692  if (req->ctx[0] == 0) {
693  if (req->param.recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) {
694  DOCA_LOG_ERR("Rendezvous scatter not supported");
696  }
697 
698  /* Stage 2: Do Puts */
699  iov = (struct urom_rdmo_scatter_iov *)req->data;
700  iov_data = (void *)(iov + 1);
701 
702  for (i = 0; i < scatter_hdr->count; i++) {
703  if (iov->rkey != prev_rkey) {
704  k = kh_get(mkey, req->client->mkeys, iov->rkey);
705  if (k == kh_end(req->client->mkeys)) {
706  DOCA_LOG_ERR("Unknown rkey: %lu", iov->rkey);
707  return DOCA_SUCCESS;
708  }
709 
710  rdmo_mkey = kh_value(req->client->mkeys, k);
711  ucp_rkey = rdmo_mkey->ucp_rkey;
712  prev_rkey = iov->rkey;
713  }
714 
715  if (iov->addr < rdmo_mkey->va || (iov->addr + iov->len) > (rdmo_mkey->va + rdmo_mkey->len)) {
716  DOCA_LOG_ERR("Scatter IOV out of bounds, put: %#lx-%#lx mkey: %#lx-%#lx",
717  iov->addr,
718  iov->addr + iov->len,
719  rdmo_mkey->va,
720  rdmo_mkey->va + rdmo_mkey->len);
721  return DOCA_ERROR_UNEXPECTED;
722  }
723 
724  if (ucp_rkey_ptr(ucp_rkey, iov->addr, (void **)&sm_addr) != UCS_OK) {
725  memset(&req_param, 0, sizeof(req_param));
726  req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA;
727  req_param.cb.send = urom_worker_rdmo_scatter_op_send_cb;
728  req_param.user_data = req;
729 
730  ucs_status_ptr = ucp_put_nbx(req->client->ep->ep,
731  iov_data,
732  iov->len,
733  iov->addr,
734  ucp_rkey,
735  &req_param);
736  if (UCS_PTR_IS_ERR(ucs_status_ptr))
737  return DOCA_ERROR_DRIVER;
738 
739  if (UCS_PTR_STATUS(ucs_status_ptr) == UCS_INPROGRESS) {
740  DOCA_LOG_DBG("Initiated Put to: %#lx len: %u req %p", iov->addr, iov->len, req);
741  req->ctx[1]++; /* Pending completion */
742  } else {
743  if (UCS_PTR_STATUS(ucs_status_ptr) != UCS_OK)
744  return DOCA_ERROR_DRIVER;
745 
746  DOCA_LOG_DBG("Completed Scatter Put, req: %p", req);
747  }
748  } else {
749  /* Buffer is in shared memory between client and urom_worker */
750  memcpy((void *)sm_addr, iov_data, iov->len);
751 
752  DOCA_LOG_DBG("Completed copy, req: %p", req);
753  }
754 
755  iov = (struct urom_rdmo_scatter_iov *)((uintptr_t)iov + sizeof(*iov) + iov->len);
756  iov_data = (void *)(iov + 1);
757  }
758 
759  /* all sends complete */
760  req->ctx[0] = 1;
761  }
762 
763  if (req->ctx[0] == 1) {
764  /* Stage 3: Wait for all completions */
765  if (req->ctx[1])
766  return DOCA_ERROR_IN_PROGRESS;
767  }
768 
769  DOCA_LOG_DBG("Completed Scatter request: %p", req);
770 
771  return DOCA_SUCCESS;
772 }
773 
774 /* RDMO scatter operations */
777 };
778 
779 /* RDMO worker requests operations */
784 };
785 
787 {
788  struct urom_worker_rdmo_client *client = req->client;
789  doca_error_t status;
790 
791  if (!ucs_list_is_empty(&client->paused_ops) || client->pause) {
792  DOCA_LOG_DBG("New paused request: %p", req);
793  ucs_list_add_tail(&client->paused_ops, &req->entry);
794  return DOCA_ERROR_IN_PROGRESS;
795  }
796 
797  status = urom_worker_rdmo_req_start(req);
798  if (status == DOCA_SUCCESS)
800 
801  return status;
802 }
#define NULL
Definition: __stddef_null.h:26
int32_t result
uintptr_t addr
if(bitoffset % 64+bitlength > 64) result|
#define UROM_RDMO_AM_ID
Definition: worker_rdmo.h:39
static struct doca_flow_pipe_entry * entry[MAX_ENTRIES]
#define __attribute__(_x_)
To allow compiling functions and structs that are using GCC attributes using attribute() in compilers...
Definition: doca_compat.h:81
enum doca_error doca_error_t
DOCA API return codes.
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_UNEXPECTED
Definition: doca_error.h:60
@ DOCA_ERROR_NOT_FOUND
Definition: doca_error.h:54
@ DOCA_ERROR_NOT_SUPPORTED
Definition: doca_error.h:42
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_IN_PROGRESS
Definition: doca_error.h:64
@ DOCA_ERROR_DRIVER
Definition: doca_error.h:59
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
__UINTPTR_TYPE__ uintptr_t
Definition: stdint.h:298
ucp_worker_h ucp_worker
Definition: worker_rdmo.h:45
uint32_t flags
Definition: urom_rdmo.h:163
uint32_t rsp_id
Definition: urom_rdmo.h:201
ucs_list_link_t paused_ops
Definition: worker_rdmo.h:95
struct urom_worker_rdmo * rdmo_worker
Definition: worker_rdmo.h:88
struct urom_worker_rdmo_ep * ep
Definition: worker_rdmo.h:91
ucs_list_link_t fenced_ops
Definition: worker_rdmo.h:55
doca_error_t(* progress)(struct urom_worker_rdmo_req *req)
Definition: worker_rdmo.h:108
uint8_t header[UROM_RDMO_HDR_LEN_MAX]
Definition: worker_rdmo.h:117
struct urom_worker_rdmo_ep * ep
Definition: worker_rdmo.h:115
ucs_list_link_t entry
Definition: worker_rdmo.h:113
struct urom_worker_rdmo_req_ops * ops
Definition: worker_rdmo.h:121
struct urom_worker_rdmo_client * client
Definition: worker_rdmo.h:114
ucp_am_recv_param_t param
Definition: worker_rdmo.h:120
struct ucp_data ucp_data
Definition: worker_rdmo.h:127
@ UROM_RDMO_OP_APPEND
Definition: urom_rdmo.h:155
@ UROM_RDMO_OP_SCATTER
Definition: urom_rdmo.h:156
@ UROM_RDMO_OP_FLUSH
Definition: urom_rdmo.h:154
@ UROM_RDMO_REQ_FLAG_FENCE
Definition: urom_rdmo.h:149
static void urom_worker_rdmo_addr_flush_cb(void *request, ucs_status_t ucs_status, void *user_data)
static void urom_worker_rdmo_scatter_op_send_cb(void *request, ucs_status_t ucs_status, void *user_data)
static void urom_worker_rdmo_am_recv_data_cb(void *request, ucs_status_t ucs_status, size_t length, void *user_data)
static ucs_status_ptr_t urom_worker_rdmo_addr_flush_slow(struct urom_worker_rdmo_client *client, uint64_t addr, uint64_t val, struct urom_worker_rdmo_mkey *rdmo_mkey)
static void urom_worker_rdmo_req_complete(struct urom_worker_rdmo_req *req)
#define urom_rdmo_compiler_fence()
static doca_error_t urom_worker_rdmo_flush_progress(struct urom_worker_rdmo_req *req)
static doca_error_t urom_worker_rdmo_mem_cache_flush(struct urom_worker_rdmo_client *client)
static void urom_worker_rdmo_req_free_data(struct urom_worker_rdmo_req *req)
static doca_error_t urom_worker_rdmo_mem_cache_get(struct urom_worker_rdmo_mkey *rdmo_mkey, uint64_t addr, uint64_t *val)
DOCA_LOG_REGISTER(UROM::WORKER::RDMO::OPS)
struct urom_worker_rdmo_req_ops * urom_worker_rdmo_ops_table[]
static struct urom_worker_rdmo_req_ops urom_worker_rdmo_scatter_ops
static struct urom_worker_rdmo_req_ops urom_worker_rdmo_flush_ops
static doca_error_t urom_worker_rdmo_mem_cache_put(struct urom_worker_rdmo_mkey *rdmo_mkey, uint64_t addr, uint64_t val)
doca_error_t urom_worker_rdmo_req_queue(struct urom_worker_rdmo_req *req)
static void urom_worker_rdmo_req_free(struct urom_worker_rdmo_req *req)
static doca_error_t urom_worker_rdmo_req_start(struct urom_worker_rdmo_req *req)
static void urom_worker_rdmo_op_send_cb(void *request, ucs_status_t ucs_status, void *user_data)
static void urom_worker_rdmo_check_paused(struct urom_worker_rdmo_client *client)
static void urom_worker_rdmo_op_cb(void *request, ucs_status_t ucs_status, void *user_data)
static void urom_worker_rdmo_check_fenced(struct urom_worker_rdmo_ep *ep)
static doca_error_t urom_worker_rdmo_append_progress(struct urom_worker_rdmo_req *req)
static doca_error_t urom_worker_rdmo_scatter_progress(struct urom_worker_rdmo_req *req)
static struct urom_worker_rdmo_req_ops urom_worker_rdmo_append_ops