NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
file_compression_core.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2022-2024 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TOR (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include <arpa/inet.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/stat.h>
30 #include <sys/mman.h>
31 #include <time.h>
32 #include <unistd.h>
33 #include <zlib.h>
34 
35 #include <doca_argp.h>
36 #include <doca_log.h>
37 
38 #include <pack.h>
39 #include <utils.h>
40 
41 #include "file_compression_core.h"
42 
43 #define MAX_MSG 512 /* Maximum number of messages in CC queue */
44 #define SW_MAX_FILE_SIZE 128 * 1024 * 1024 /* 128 MB */
45 #define SLEEP_IN_NANOS (10 * 1000) /* Sample the task every 10 microseconds */
46 #define DECOMPRESS_RATIO 1032 /* Maximal decompress ratio size */
47 #define DEFAULT_TIMEOUT 10 /* default timeout for receiving messages */
48 
49 DOCA_LOG_REGISTER(FILE_COMPRESSION::Core);
50 
52  uint64_t checksum; /* Checksum of file to be transferred */
53  uint32_t num_segs; /* Number of comch segments to transfer file across */
54 };
55 
56 /*
57  * Get DOCA compress maximum buffer size allowed
58  *
59  * @resources [in]: DOCA compress resources pointer
60  * @max_buf_size [out]: Maximum buffer size allowed
61  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
62  */
63 static doca_error_t get_compress_max_buf_size(struct compress_resources *resources, uint64_t *max_buf_size)
64 {
65  struct doca_devinfo *compress_dev_info = doca_dev_as_devinfo(resources->state->dev);
67 
69  result = doca_compress_cap_task_compress_deflate_get_max_buf_size(compress_dev_info, max_buf_size);
70  else
71  result = doca_compress_cap_task_decompress_deflate_get_max_buf_size(compress_dev_info, max_buf_size);
72 
73  if (result != DOCA_SUCCESS)
74  DOCA_LOG_ERR("Failed to retrieve maximum buffer size allowed from DOCA compress device");
75  else
76  DOCA_LOG_DBG("DOCA compress device supports maximum buffer size of %" PRIu64 " bytes", *max_buf_size);
77 
78  return result;
79 }
80 
81 /*
82  * Allocate DOCA compress needed resources with 2 buffers
83  *
84  * @mode [in]: Running mode
85  * @resources [out]: DOCA compress resources pointer
86  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
87  */
89 {
90  uint32_t max_bufs = 2;
92 
93  if (mode == CLIENT)
95  else
97 
99  if (result != DOCA_SUCCESS)
100  DOCA_LOG_ERR("Failed to allocate compress resources: %s", doca_error_get_descr(result));
101 
102  return result;
103 }
104 
105 /*
106  * Initiate DOCA compress needed flows
107  *
108  * @compress_cfg [in]: application config struct
109  * @resources [out]: DOCA compress resources pointer
110  * @method [out]: Compress method to execute
111  * @max_buf_size [out]: Maximum compress buffer size
112  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
113  */
117  uint64_t *max_buf_size)
118 {
120 
121  /* Default is to use HW compress */
122  *method = COMPRESS_DEFLATE_HW;
123 
124  /* Allocate compress resources */
126  if (result != DOCA_SUCCESS) {
128  DOCA_LOG_INFO("Failed to find device for compress task, running SW compress with zlib");
129  *method = COMPRESS_DEFLATE_SW;
130  *max_buf_size = SW_MAX_FILE_SIZE;
132  } else if (resources->mode == COMPRESS_MODE_DECOMPRESS_DEFLATE)
133  DOCA_LOG_ERR("Failed to allocate compress resources: %s", doca_error_get_descr(result));
134  } else {
135  result = get_compress_max_buf_size(resources, max_buf_size);
136  if (result != DOCA_SUCCESS) {
137  DOCA_LOG_ERR("Failed to retrieve DOCA compress device maximum buffer size: %s",
140  if (result != DOCA_SUCCESS)
141  DOCA_LOG_ERR("Failed to destroy DOCA compress resources: %s",
143  }
144  }
145 
146  return result;
147 }
148 
149 /*
150  * Unmap callback - free doca_buf allocated pointer
151  *
152  * @addr [in]: Memory range pointer
153  * @len [in]: Memory range length
154  * @opaque [in]: An opaque pointer passed to iterator
155  */
156 static void unmap_cb(void *addr, size_t len, void *opaque)
157 {
158  (void)opaque;
159 
160  if (addr != NULL)
161  munmap(addr, len);
162 }
163 
164 /*
165  * Populate destination doca buffer for compress tasks
166  *
167  * @state [in]: application configuration struct
168  * @dst_buffer [in]: destination buffer
169  * @dst_buf_size [in]: destination buffer size
170  * @dst_doca_buf [out]: created doca buffer
171  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
172  */
174  uint8_t *dst_buffer,
175  size_t dst_buf_size,
176  struct doca_buf **dst_doca_buf)
177 {
179 
180  result = doca_mmap_set_memrange(state->dst_mmap, dst_buffer, dst_buf_size);
181  if (result != DOCA_SUCCESS) {
182  DOCA_LOG_ERR("Unable to set memory range destination memory map: %s", doca_error_get_descr(result));
183  return result;
184  }
185 
186  result = doca_mmap_start(state->dst_mmap);
187  if (result != DOCA_SUCCESS) {
188  DOCA_LOG_ERR("Unable to start destination memory map: %s", doca_error_get_descr(result));
189  return result;
190  }
191 
193  state->dst_mmap,
194  dst_buffer,
195  dst_buf_size,
196  dst_doca_buf);
197  if (result != DOCA_SUCCESS) {
198  DOCA_LOG_ERR("Unable to acquire DOCA buffer representing destination buffer: %s",
200  return result;
201  }
202  return result;
203 }
204 
205 /*
206  * Allocate DOCA compress resources and submit compress/decompress task
207  *
208  * @file_data [in]: file data to the source buffer
209  * @file_size [in]: file size
210  * @dst_buf_size [in]: allocated destination buffer length
211  * @resources [in]: DOCA compress resources
212  * @compressed_file [out]: destination buffer with the result
213  * @compressed_file_len [out]: destination buffer size
214  * @output_chksum [out]: the returned checksum
215  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
216  */
217 static doca_error_t compress_file_hw(char *file_data,
218  size_t file_size,
219  size_t dst_buf_size,
221  uint8_t *compressed_file,
222  size_t *compressed_file_len,
223  uint64_t *output_chksum)
224 {
225  struct doca_buf *dst_doca_buf;
226  struct doca_buf *src_doca_buf;
227  uint8_t *resp_head;
228  struct program_core_objects *state = resources->state;
229  doca_error_t result, tmp_result;
230 
231  /* Start compress context */
232  result = doca_ctx_start(state->ctx);
233  if (result != DOCA_SUCCESS) {
234  DOCA_LOG_ERR("Failed to start context: %s", doca_error_get_descr(result));
235  return result;
236  }
237 
238  result = doca_mmap_set_memrange(state->src_mmap, file_data, file_size);
239  if (result != DOCA_SUCCESS) {
240  DOCA_LOG_ERR("Unable to set memory range of source memory map: %s", doca_error_get_descr(result));
241  return result;
242  }
244  if (result != DOCA_SUCCESS) {
245  DOCA_LOG_ERR("Unable to set free callback of source memory map: %s", doca_error_get_descr(result));
246  munmap(file_data, file_size);
247  return result;
248  }
249  result = doca_mmap_start(state->src_mmap);
250  if (result != DOCA_SUCCESS) {
251  DOCA_LOG_ERR("Unable to start source memory map: %s", doca_error_get_descr(result));
252  return result;
253  }
254 
255  result =
256  doca_buf_inventory_buf_get_by_addr(state->buf_inv, state->src_mmap, file_data, file_size, &src_doca_buf);
257  if (result != DOCA_SUCCESS) {
258  DOCA_LOG_ERR("Unable to acquire DOCA buffer representing source buffer: %s",
260  return result;
261  }
262 
263  doca_buf_get_data(src_doca_buf, (void **)&resp_head);
264  doca_buf_set_data(src_doca_buf, resp_head, file_size);
265 
266  result = populate_dst_buf(state, compressed_file, dst_buf_size, &dst_doca_buf);
267  if (result != DOCA_SUCCESS) {
268  DOCA_LOG_ERR("Failed to populate destination buffer: %s", doca_error_get_descr(result));
269  goto dec_src_buf;
270  }
271 
273  result = submit_compress_deflate_task(resources, src_doca_buf, dst_doca_buf, output_chksum);
274  else
275  result = submit_decompress_deflate_task(resources, src_doca_buf, dst_doca_buf, output_chksum);
276  if (result != DOCA_SUCCESS) {
277  DOCA_LOG_ERR("Failed to submit %s task: %s",
278  (resources->mode == COMPRESS_MODE_COMPRESS_DEFLATE) ? "compress" : "decompress",
280  goto dec_dst_buf;
281  }
282 
283  doca_buf_get_data_len(dst_doca_buf, compressed_file_len);
284 
285 dec_dst_buf:
286  tmp_result = doca_buf_dec_refcount(dst_doca_buf, NULL);
287  if (tmp_result != DOCA_SUCCESS) {
288  DOCA_LOG_ERR("Failed to decrease DOCA destination buffer count: %s", doca_error_get_descr(tmp_result));
289  DOCA_ERROR_PROPAGATE(result, tmp_result);
290  }
291 dec_src_buf:
292  tmp_result = doca_buf_dec_refcount(src_doca_buf, NULL);
293  if (tmp_result != DOCA_SUCCESS) {
294  DOCA_LOG_ERR("Failed to decrease DOCA source buffer count: %s", doca_error_get_descr(tmp_result));
295  DOCA_ERROR_PROPAGATE(result, tmp_result);
296  }
297 
298  return result;
299 }
300 
301 /*
302  * Compress the input file in SW
303  *
304  * @file_data [in]: file data to the source buffer
305  * @file_size [in]: file size
306  * @dst_buf_size [in]: allocated destination buffer length
307  * @compressed_file [out]: destination buffer with the result
308  * @compressed_file_len [out]: destination buffer size
309  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
310  */
311 static doca_error_t compress_file_sw(char *file_data,
312  size_t file_size,
313  size_t dst_buf_size,
314  Byte **compressed_file,
315  uLong *compressed_file_len)
316 {
317  z_stream c_stream; /* compression stream */
318  int err;
319 
320  memset(&c_stream, 0, sizeof(c_stream));
321 
322  c_stream.zalloc = NULL;
323  c_stream.zfree = NULL;
324 
325  err = deflateInit2(&c_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY);
326  if (err < 0) {
327  DOCA_LOG_ERR("Failed to initialize compression system");
328  return DOCA_ERROR_BAD_STATE;
329  }
330 
331  c_stream.next_in = (z_const unsigned char *)file_data;
332  c_stream.next_out = *compressed_file;
333 
334  c_stream.avail_in = file_size;
335  c_stream.avail_out = dst_buf_size;
336  err = deflate(&c_stream, Z_NO_FLUSH);
337  if (err < 0) {
338  DOCA_LOG_ERR("Failed to compress file");
339  return DOCA_ERROR_BAD_STATE;
340  }
341 
342  /* Finish the stream */
343  err = deflate(&c_stream, Z_FINISH);
344  if (err < 0 || err != Z_STREAM_END) {
345  DOCA_LOG_ERR("Failed to compress file");
346  return DOCA_ERROR_BAD_STATE;
347  }
348 
349  err = deflateEnd(&c_stream);
350  if (err < 0) {
351  DOCA_LOG_ERR("Failed to compress file");
352  return DOCA_ERROR_BAD_STATE;
353  }
354  *compressed_file_len = c_stream.total_out;
355  return DOCA_SUCCESS;
356 }
357 
358 /*
359  * Calculate file checksum with zlib, where the lower 32 bits contain the CRC checksum result
360  * and the upper 32 bits contain the Adler checksum result.
361  *
362  * @file_data [in]: file data to the source buffer
363  * @file_size [in]: file size
364  * @output_chksum [out]: the calculated checksum
365  */
366 static void calculate_checksum_sw(char *file_data, size_t file_size, uint64_t *output_chksum)
367 {
368  uint32_t crc;
369  uint32_t adler;
370  uint64_t result_checksum;
371 
372  crc = crc32(0L, Z_NULL, 0);
373  crc = crc32(crc, (const unsigned char *)file_data, file_size);
374  adler = adler32(0L, Z_NULL, 0);
375  adler = adler32(adler, (const unsigned char *)file_data, file_size);
376 
377  result_checksum = adler;
378  result_checksum <<= 32;
379  result_checksum += crc;
380 
381  *output_chksum = result_checksum;
382 }
383 
384 /*
385  * Compress / decompress the input file data
386  *
387  * @file_data [in]: file data to the source buffer
388  * @file_size [in]: file size
389  * @max_buf_size [in]: maximum compress buffer size allowed
390  * @resources [in]: DOCA compress resources
391  * @method [in]: Compression method to be used
392  * @compressed_file [out]: destination buffer with the result
393  * @compressed_file_len [out]: destination buffer size
394  * @output_chksum [out]: the calculated checksum
395  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
396  */
397 static doca_error_t compress_file(char *file_data,
398  size_t file_size,
399  uint64_t max_buf_size,
402  uint8_t **compressed_file,
403  size_t *compressed_file_len,
404  uint64_t *output_chksum)
405 {
406  size_t dst_buf_size = 0;
407 
408  enum compress_mode;
409 
411  dst_buf_size = MAX(file_size + 16, file_size * 2);
412  if (dst_buf_size > max_buf_size)
413  dst_buf_size = max_buf_size;
414  } else if (resources->mode == COMPRESS_MODE_DECOMPRESS_DEFLATE)
415  dst_buf_size = MIN(max_buf_size, DECOMPRESS_RATIO * file_size);
416 
417  *compressed_file = calloc(1, dst_buf_size);
418  if (*compressed_file == NULL) {
419  DOCA_LOG_ERR("Failed to allocate memory");
420  return DOCA_ERROR_NO_MEMORY;
421  }
422 
423  if (method == COMPRESS_DEFLATE_SW) {
424  calculate_checksum_sw(file_data, file_size, output_chksum);
425  return compress_file_sw(file_data, file_size, dst_buf_size, compressed_file, compressed_file_len);
426  } else
427  return compress_file_hw(file_data,
428  file_size,
429  dst_buf_size,
430  resources,
431  *compressed_file,
432  compressed_file_len,
433  output_chksum);
434 }
435 
436 /*
437  * Send the input file with comch to the server in segments of max_comch_msg length
438  *
439  * @compress_cfg [in]: compression configuration information
440  * @comch_cfg [in]: comch configuration object for sending file across
441  * @file_data [in]: file data to the source buffer
442  * @file_size [in]: file size
443  * @checksum [in]: checksum of the file
444  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
445  */
447  struct comch_cfg *comch_cfg,
448  char *file_data,
449  size_t file_size,
450  uint64_t checksum)
451 {
452  struct file_info_message file_meta = {};
453  uint32_t max_comch_msg;
454  uint32_t total_msgs;
455  size_t msg_len;
456  uint32_t i;
458  struct timespec ts = {
459  .tv_nsec = SLEEP_IN_NANOS,
460  };
461 
463  if (max_comch_msg == 0) {
464  DOCA_LOG_ERR("Comch max buffer size is zero");
466  }
467 
468  /* Send to the server the number of messages needed for receiving the file and its checksum */
469  total_msgs = (file_size + max_comch_msg - 1) / max_comch_msg;
470  file_meta.num_segs = htonl(total_msgs);
471  file_meta.checksum = htonq(checksum);
472 
474  if (result != DOCA_SUCCESS) {
475  DOCA_LOG_ERR("Failed to send file info message: %s", doca_error_get_descr(result));
476  return result;
477  }
478 
479  /* Send file to the server */
480  for (i = 0; i < total_msgs; i++) {
481  /* Stop sending if the server is done receiving */
482  if (compress_cfg->state == TRANSFER_COMPLETE)
483  return DOCA_ERROR_IO_FAILED;
484 
485  msg_len = MIN(file_size, max_comch_msg);
487  while (result == DOCA_ERROR_AGAIN) {
488  nanosleep(&ts, &ts);
490  if (result != DOCA_SUCCESS)
491  break;
493  }
494 
495  if (result != DOCA_SUCCESS) {
496  DOCA_LOG_ERR("File data was not sent: %s", doca_error_get_descr(result));
497  return result;
498  }
499  file_data += msg_len;
500  file_size -= msg_len;
501  }
502  return DOCA_SUCCESS;
503 }
504 
505 void client_recv_event_cb(struct doca_comch_event_msg_recv *event,
506  uint8_t *recv_buffer,
507  uint32_t msg_len,
508  struct doca_comch_connection *comch_connection)
509 {
510  struct file_compression_config *cfg = comch_utils_get_user_data(comch_connection);
511 
512  /* A message is only expected from the server when it has read the compressed file successfully */
513  (void)event;
514 
515  /* Print the completion message sent from the server */
516  recv_buffer[msg_len] = '\0';
517  DOCA_LOG_INFO("Received message: %s", recv_buffer);
518  cfg->state = TRANSFER_COMPLETE;
519 }
520 
524 {
525  char *file_data;
526  struct stat statbuf;
527  int fd;
528  uint8_t *compressed_file;
529  size_t compressed_file_len;
530  uint64_t checksum;
532  struct timespec ts = {
533  .tv_nsec = SLEEP_IN_NANOS,
534  };
535 
536  fd = open(compress_cfg->file_path, O_RDWR);
537  if (fd < 0) {
538  DOCA_LOG_ERR("Failed to open %s", compress_cfg->file_path);
539  return DOCA_ERROR_IO_FAILED;
540  }
541 
542  if (fstat(fd, &statbuf) < 0) {
543  DOCA_LOG_ERR("Failed to get file information");
544  close(fd);
545  return DOCA_ERROR_IO_FAILED;
546  }
547 
548  if (statbuf.st_size == 0 || (uint64_t)statbuf.st_size > compress_cfg->max_compress_file_len) {
549  DOCA_LOG_ERR("Invalid file size. Should be greater then zero and smaller than %" PRIu64 " bytes",
550  compress_cfg->max_compress_file_len);
551  close(fd);
553  }
554 
555  file_data = mmap(NULL, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
556  if (file_data == MAP_FAILED) {
557  DOCA_LOG_ERR("Unable to map file content: %s", strerror(errno));
558  close(fd);
559  return DOCA_ERROR_NO_MEMORY;
560  }
561 
562  DOCA_LOG_TRC("File size: %ld", statbuf.st_size);
563  /* Send compress task */
564  result = compress_file(file_data,
565  statbuf.st_size,
566  compress_cfg->max_compress_file_len,
567  resources,
568  compress_cfg->compress_method,
569  &compressed_file,
570  &compressed_file_len,
571  &checksum);
572  if (result != DOCA_SUCCESS) {
573  close(fd);
574  free(compressed_file);
575  return result;
576  }
577  close(fd);
578  DOCA_LOG_TRC("Compressed file size: %ld", compressed_file_len);
579 
580  /* Send the file content to the server */
581  result = send_file(compress_cfg, comch_cfg, (char *)compressed_file, compressed_file_len, checksum);
582  if (result != DOCA_SUCCESS) {
583  free(compressed_file);
584  return result;
585  }
586  free(compressed_file);
587 
588  /* Wait for a signal that the transfer has complete */
589  while (compress_cfg->state != TRANSFER_COMPLETE) {
590  nanosleep(&ts, &ts);
592  if (result != DOCA_SUCCESS) {
593  DOCA_LOG_ERR("Comch connection unexpectedly dropped: %s", doca_error_get_descr(result));
594  return result;
595  }
596  }
597 
598  return result;
599 }
600 
601 void server_recv_event_cb(struct doca_comch_event_msg_recv *event,
602  uint8_t *recv_buffer,
603  uint32_t msg_len,
604  struct doca_comch_connection *comch_connection)
605 {
606  struct file_compression_config *cfg = comch_utils_get_user_data(comch_connection);
607  struct server_runtime_data *server_data;
608 
609  (void)event;
610 
611  if (cfg == NULL) {
612  DOCA_LOG_ERR("Cannot get configuration information");
613  return;
614  }
615 
616  /* Ignore any events occurring after transfer is complete */
617  if (cfg->state == TRANSFER_COMPLETE || cfg->state == TRANSFER_ERROR)
618  return;
619 
620  server_data = &cfg->server_data;
621 
622  /* First received message should contain file metadata */
623  if (server_data->expected_file_chunks == 0) {
624  struct file_info_message *file_info = (struct file_info_message *)recv_buffer;
625 
626  if (msg_len != sizeof(struct file_info_message)) {
627  DOCA_LOG_ERR("Unexpected file info message received. Size %u, expected size %lu",
628  msg_len,
629  sizeof(struct file_info_message));
630  cfg->state = TRANSFER_ERROR;
631  return;
632  }
633 
634  server_data->expected_file_chunks = ntohl(file_info->num_segs);
635  server_data->expected_checksum = ntohq(file_info->checksum);
636  cfg->state = TRANSFER_IN_PROGRESS;
637  return;
638  }
639 
640  if (server_data->received_file_length + msg_len > cfg->max_compress_file_len) {
641  DOCA_LOG_ERR("Received file exceeded maximum file size. Received length: %u, maximum size: %lu",
642  server_data->received_file_length + msg_len,
643  cfg->max_compress_file_len);
644  cfg->state = TRANSFER_ERROR;
645  return;
646  }
647  memcpy(server_data->compressed_file + server_data->received_file_length, recv_buffer, msg_len);
648 
649  server_data->received_file_chunks += 1;
650  server_data->received_file_length += msg_len;
651 
652  if (server_data->received_file_chunks == server_data->expected_file_chunks)
653  cfg->state = TRANSFER_COMPLETE;
654 }
655 
659 {
660  struct server_runtime_data *server_data;
661  int fd;
662  char finish_msg[] = "Server was done receiving messages";
663  uint8_t *resp_head;
664  uint64_t checksum;
665  size_t data_len;
666  int counter = 0;
667  int num_of_iterations = (compress_cfg->timeout * 1000 * 1000) / (SLEEP_IN_NANOS / 1000);
668  struct timespec ts = {
669  .tv_nsec = SLEEP_IN_NANOS,
670  };
672 
673  server_data = (struct server_runtime_data *)&compress_cfg->server_data;
674 
675  /* Wait on comch to complete client to server transactions */
676  while (compress_cfg->state != TRANSFER_COMPLETE && compress_cfg->state != TRANSFER_ERROR) {
677  nanosleep(&ts, &ts);
679  if (result != DOCA_SUCCESS) {
680  DOCA_LOG_ERR("Comch connection unexpectedly dropped: %s", doca_error_get_descr(result));
681  return result;
682  }
683 
684  if (compress_cfg->state == TRANSFER_IDLE)
685  continue;
686 
687  counter++;
688  if (counter == num_of_iterations) {
689  DOCA_LOG_ERR("Message was not received at the given timeout");
691  goto finish_msg;
692  }
693  }
694 
695  if (compress_cfg->state == TRANSFER_ERROR) {
696  DOCA_LOG_ERR("Error detected during comch exchange");
698  goto finish_msg;
699  }
700 
701  result = compress_file(server_data->compressed_file,
702  server_data->received_file_length,
703  compress_cfg->max_compress_file_len,
704  resources,
705  compress_cfg->compress_method,
706  &resp_head,
707  &data_len,
708  &checksum);
709  if (result != DOCA_SUCCESS) {
710  DOCA_LOG_ERR("Failed to decompress the received file: %s", doca_error_get_descr(result));
711  free(resp_head);
712  goto finish_msg;
713  }
714  if (checksum == server_data->expected_checksum)
715  DOCA_LOG_INFO("SUCCESS: file was received and decompressed successfully");
716  else {
717  DOCA_LOG_ERR("ERROR: file checksum is different. received: 0x%lx, calculated: 0x%lx",
718  server_data->expected_checksum,
719  checksum);
720  free(resp_head);
722  goto finish_msg;
723  }
724 
725  fd = open(compress_cfg->file_path, O_CREAT | O_WRONLY, S_IRUSR | S_IRGRP);
726  if (fd < 0) {
727  DOCA_LOG_ERR("Failed to open %s", compress_cfg->file_path);
728  free(resp_head);
730  goto finish_msg;
731  }
732 
733  if ((size_t)write(fd, resp_head, data_len) != data_len) {
734  DOCA_LOG_ERR("Failed to write the decompressed into the input file");
735  free(resp_head);
736  close(fd);
738  goto finish_msg;
739  }
740  free(resp_head);
741  close(fd);
742 
743 finish_msg:
744  if (comch_utils_send(comch_util_get_connection(comch_cfg), finish_msg, sizeof(finish_msg)) != DOCA_SUCCESS)
745  DOCA_LOG_ERR("Failed to send finish message: %s", doca_error_get_descr(result));
746 
747  return result;
748 }
749 
751 {
753 
754  /* set default timeout */
755  if (compress_cfg->timeout == 0)
756  compress_cfg->timeout = DEFAULT_TIMEOUT;
757 
759  resources,
760  &compress_cfg->compress_method,
761  &compress_cfg->max_compress_file_len);
762  if (result != DOCA_SUCCESS)
763  return result;
764 
765  /* Server should preallocate memory to receive a file */
766  if (compress_cfg->mode == SERVER) {
767  compress_cfg->server_data.compressed_file = calloc(1, compress_cfg->max_compress_file_len);
768  if (compress_cfg->server_data.compressed_file == NULL) {
769  DOCA_LOG_ERR("Failed to allocate file memory");
771  return DOCA_ERROR_NO_MEMORY;
772  }
773  }
774 
775  return DOCA_SUCCESS;
776 }
777 
779 {
781 
782  (void)compress_cfg;
783 
785  if (result != DOCA_SUCCESS)
786  DOCA_LOG_ERR("Failed to destroy compress resources: %s", doca_error_get_descr(result));
787 
788  if (compress_cfg->mode == SERVER) {
789  free(compress_cfg->server_data.compressed_file);
790  compress_cfg->server_data.compressed_file = NULL;
791  }
792 }
793 
794 /*
795  * ARGP Callback - Handle file parameter
796  *
797  * @param [in]: Input parameter
798  * @config [in/out]: Program configuration context
799  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
800  */
801 static doca_error_t file_callback(void *param, void *config)
802 {
804  char *file_path = (char *)param;
805 
806  if (strnlen(file_path, MAX_FILE_NAME) == MAX_FILE_NAME) {
807  DOCA_LOG_ERR("File name is too long - MAX=%d", MAX_FILE_NAME - 1);
809  }
811  return DOCA_SUCCESS;
812 }
813 
814 /*
815  * ARGP Callback - Handle Comch DOCA device PCI address parameter
816  *
817  * @param [in]: Input parameter
818  * @config [in/out]: Program configuration context
819  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
820  */
821 static doca_error_t dev_pci_addr_callback(void *param, void *config)
822 {
824  char *pci_addr = (char *)param;
825 
826  if (strnlen(pci_addr, DOCA_DEVINFO_PCI_ADDR_SIZE) == DOCA_DEVINFO_PCI_ADDR_SIZE) {
827  DOCA_LOG_ERR("Entered device PCI address exceeding the maximum size of %d",
830  }
831  strlcpy(compress_cfg->cc_dev_pci_addr, pci_addr, DOCA_DEVINFO_PCI_ADDR_SIZE);
832  return DOCA_SUCCESS;
833 }
834 
835 /*
836  * ARGP Callback - Handle Comch DOCA device representor PCI address parameter
837  *
838  * @param [in]: Input parameter
839  * @config [in/out]: Program configuration context
840  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
841  */
842 static doca_error_t rep_pci_addr_callback(void *param, void *config)
843 {
845  const char *rep_pci_addr = (char *)param;
846 
847  if (compress_cfg->mode == SERVER) {
848  if (strnlen(rep_pci_addr, DOCA_DEVINFO_REP_PCI_ADDR_SIZE) == DOCA_DEVINFO_REP_PCI_ADDR_SIZE) {
849  DOCA_LOG_ERR("Entered device representor PCI address exceeding the maximum size of %d",
852  }
853 
854  strlcpy(compress_cfg->cc_dev_rep_pci_addr, rep_pci_addr, DOCA_DEVINFO_REP_PCI_ADDR_SIZE);
855  }
856 
857  return DOCA_SUCCESS;
858 }
859 
860 /*
861  * ARGP Callback - Handle timeout parameter
862  *
863  * @param [in]: Input parameter
864  * @config [in/out]: Program configuration context
865  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
866  */
867 static doca_error_t timeout_callback(void *param, void *config)
868 {
870  int *timeout = (int *)param;
871 
872  if (*timeout <= 0) {
873  DOCA_LOG_ERR("Timeout parameter must be positive value");
875  }
876  compress_cfg->timeout = *timeout;
877  return DOCA_SUCCESS;
878 }
879 
880 /*
881  * ARGP validation Callback - check if the running mode is valid and that the input file exists in client mode
882  *
883  * @cfg [in]: Program configuration context
884  * @return: DOCA_SUCCESS on success and DOCA_ERROR otherwise
885  */
887 {
889 
890  if (compress_cfg->mode == CLIENT && (access(compress_cfg->file_path, F_OK) == -1)) {
891  DOCA_LOG_ERR("File was not found %s", compress_cfg->file_path);
892  return DOCA_ERROR_NOT_FOUND;
893  } else if (compress_cfg->mode == SERVER && strlen(compress_cfg->cc_dev_rep_pci_addr) == 0) {
894  DOCA_LOG_ERR("Missing representor PCI address for server");
895  return DOCA_ERROR_NOT_FOUND;
896  }
897  return DOCA_SUCCESS;
898 }
899 
901 {
903 
904  struct doca_argp_param *dev_pci_addr_param, *rep_pci_addr_param, *file_param, *timeout_param;
905 
906  /* Create and register pci param */
907  result = doca_argp_param_create(&dev_pci_addr_param);
908  if (result != DOCA_SUCCESS) {
909  DOCA_LOG_ERR("Failed to create ARGP param: %s", doca_error_get_descr(result));
910  return result;
911  }
912  doca_argp_param_set_short_name(dev_pci_addr_param, "p");
913  doca_argp_param_set_long_name(dev_pci_addr_param, "pci-addr");
914  doca_argp_param_set_description(dev_pci_addr_param, "DOCA Comch device PCI address");
916  doca_argp_param_set_type(dev_pci_addr_param, DOCA_ARGP_TYPE_STRING);
917  doca_argp_param_set_mandatory(dev_pci_addr_param);
918  result = doca_argp_register_param(dev_pci_addr_param);
919  if (result != DOCA_SUCCESS) {
920  DOCA_LOG_ERR("Failed to register program param: %s", doca_error_get_descr(result));
921  return result;
922  }
923 
924  /* Create and register rep PCI address param */
925  result = doca_argp_param_create(&rep_pci_addr_param);
926  if (result != DOCA_SUCCESS) {
927  DOCA_LOG_ERR("Failed to create ARGP param: %s", doca_error_get_descr(result));
928  return result;
929  }
930  doca_argp_param_set_short_name(rep_pci_addr_param, "r");
931  doca_argp_param_set_long_name(rep_pci_addr_param, "rep-pci");
932  doca_argp_param_set_description(rep_pci_addr_param, "DOCA Comch device representor PCI address");
934  doca_argp_param_set_type(rep_pci_addr_param, DOCA_ARGP_TYPE_STRING);
935  result = doca_argp_register_param(rep_pci_addr_param);
936  if (result != DOCA_SUCCESS) {
937  DOCA_LOG_ERR("Failed to register program param: %s", doca_error_get_descr(result));
938  return result;
939  }
940 
941  /* Create and register message to send param */
942  result = doca_argp_param_create(&file_param);
943  if (result != DOCA_SUCCESS) {
944  DOCA_LOG_ERR("Failed to create ARGP param: %s", doca_error_get_descr(result));
945  return result;
946  }
947  doca_argp_param_set_short_name(file_param, "f");
948  doca_argp_param_set_long_name(file_param, "file");
949  doca_argp_param_set_description(file_param, "File to send by the client / File to write by the server");
952  doca_argp_param_set_mandatory(file_param);
953  result = doca_argp_register_param(file_param);
954  if (result != DOCA_SUCCESS) {
955  DOCA_LOG_ERR("Failed to register program param: %s", doca_error_get_descr(result));
956  return result;
957  }
958 
959  /* Create and register timeout */
960  result = doca_argp_param_create(&timeout_param);
961  if (result != DOCA_SUCCESS) {
962  DOCA_LOG_ERR("Failed to create ARGP param: %s", doca_error_get_descr(result));
963  return result;
964  }
965  doca_argp_param_set_short_name(timeout_param, "t");
966  doca_argp_param_set_long_name(timeout_param, "timeout");
967  doca_argp_param_set_description(timeout_param,
968  "Application timeout for receiving file content messages, default is 5 sec");
971  result = doca_argp_register_param(timeout_param);
972  if (result != DOCA_SUCCESS) {
973  DOCA_LOG_ERR("Failed to register program param: %s", doca_error_get_descr(result));
974  return result;
975  }
976 
977  /* Register version callback for DOCA SDK & RUNTIME */
979  if (result != DOCA_SUCCESS) {
980  DOCA_LOG_ERR("Failed to register version callback: %s", doca_error_get_descr(result));
981  return result;
982  }
983 
984  /* Register application callback */
986  if (result != DOCA_SUCCESS) {
987  DOCA_LOG_ERR("Failed to register program validation callback: %s", doca_error_get_descr(result));
988  return result;
989  }
990 
991  return result;
992 }
#define NULL
Definition: __stddef_null.h:26
int32_t result
void * comch_utils_get_user_data(struct doca_comch_connection *connection)
Definition: comch_utils.c:254
doca_error_t comch_utils_progress_connection(struct doca_comch_connection *connection)
Definition: comch_utils.c:264
struct doca_comch_connection * comch_util_get_connection(struct comch_cfg *comch_cfg)
Definition: comch_utils.c:276
doca_error_t comch_utils_send(struct doca_comch_connection *connection, const void *msg, uint32_t len)
Definition: comch_utils.c:212
uint32_t comch_utils_get_max_buffer_size(struct comch_cfg *comch_cfg)
Definition: comch_utils.c:286
doca_error_t destroy_compress_resources(struct compress_resources *resources)
doca_error_t allocate_compress_resources(const char *pci_addr, uint32_t max_bufs, struct compress_resources *resources)
doca_error_t submit_decompress_deflate_task(struct compress_resources *resources, struct doca_buf *src_buf, struct doca_buf *dst_buf, uint64_t *output_checksum)
doca_error_t submit_compress_deflate_task(struct compress_resources *resources, struct doca_buf *src_buf, struct doca_buf *dst_buf, uint64_t *output_checksum)
compress_mode
@ COMPRESS_MODE_COMPRESS_DEFLATE
@ COMPRESS_MODE_DECOMPRESS_DEFLATE
#define MIN(X, Y)
Definition: utils.h:30
#define MAX(X, Y)
Definition: utils.h:34
uintptr_t addr
doca_dpa_dev_mmap_t mmap
uint64_t len
static doca_error_t compress_file_hw(char *file_data, size_t file_size, size_t dst_buf_size, struct compress_resources *resources, uint8_t *compressed_file, size_t *compressed_file_len, uint64_t *output_chksum)
doca_error_t register_file_compression_params(void)
doca_error_t file_compression_client(struct comch_cfg *comch_cfg, struct file_compression_config *compress_cfg, struct compress_resources *resources)
static doca_error_t init_compress_resources(struct file_compression_config *compress_cfg, struct compress_resources *resources, enum file_compression_compress_method *method, uint64_t *max_buf_size)
static doca_error_t populate_dst_buf(struct program_core_objects *state, uint8_t *dst_buffer, size_t dst_buf_size, struct doca_buf **dst_doca_buf)
static doca_error_t rep_pci_addr_callback(void *param, void *config)
#define DECOMPRESS_RATIO
static doca_error_t get_compress_max_buf_size(struct compress_resources *resources, uint64_t *max_buf_size)
static void calculate_checksum_sw(char *file_data, size_t file_size, uint64_t *output_chksum)
doca_error_t file_compression_init(struct file_compression_config *compress_cfg, struct compress_resources *resources)
static doca_error_t timeout_callback(void *param, void *config)
#define DEFAULT_TIMEOUT
DOCA_LOG_REGISTER(FILE_COMPRESSION::Core)
void server_recv_event_cb(struct doca_comch_event_msg_recv *event, uint8_t *recv_buffer, uint32_t msg_len, struct doca_comch_connection *comch_connection)
void client_recv_event_cb(struct doca_comch_event_msg_recv *event, uint8_t *recv_buffer, uint32_t msg_len, struct doca_comch_connection *comch_connection)
static void unmap_cb(void *addr, size_t len, void *opaque)
static doca_error_t args_validation_callback(void *cfg)
#define SW_MAX_FILE_SIZE
static doca_error_t compress_file(char *file_data, size_t file_size, uint64_t max_buf_size, struct compress_resources *resources, enum file_compression_compress_method method, uint8_t **compressed_file, size_t *compressed_file_len, uint64_t *output_chksum)
void file_compression_cleanup(struct file_compression_config *compress_cfg, struct compress_resources *resources)
doca_error_t file_compression_server(struct comch_cfg *comch_cfg, struct file_compression_config *compress_cfg, struct compress_resources *resources)
static doca_error_t dev_pci_addr_callback(void *param, void *config)
#define SLEEP_IN_NANOS
static doca_error_t compress_file_sw(char *file_data, size_t file_size, size_t dst_buf_size, Byte **compressed_file, uLong *compressed_file_len)
static doca_error_t file_callback(void *param, void *config)
static doca_error_t get_compress_resources(enum file_compression_mode mode, struct compress_resources *resources)
static doca_error_t send_file(struct file_compression_config *compress_cfg, struct comch_cfg *comch_cfg, char *file_data, size_t file_size, uint64_t checksum)
@ TRANSFER_ERROR
@ TRANSFER_COMPLETE
@ TRANSFER_IN_PROGRESS
@ TRANSFER_IDLE
file_compression_mode
file_compression_compress_method
@ COMPRESS_DEFLATE_SW
@ COMPRESS_DEFLATE_HW
#define MAX_FILE_NAME
struct rdma_resources resources
DOCA_EXPERIMENTAL doca_error_t doca_argp_register_validation_callback(doca_argp_validation_cb_t callback)
Register program validation callback function.
DOCA_EXPERIMENTAL void doca_argp_param_set_description(struct doca_argp_param *param, const char *description)
Set the description of the program param, used during program usage.
DOCA_EXPERIMENTAL void doca_argp_param_set_long_name(struct doca_argp_param *param, const char *name)
Set the long name of the program param.
DOCA_EXPERIMENTAL void doca_argp_param_set_callback(struct doca_argp_param *param, doca_argp_param_cb_t callback)
Set the callback function of the program param.
DOCA_EXPERIMENTAL void doca_argp_param_set_mandatory(struct doca_argp_param *param)
Mark the program param as mandatory.
DOCA_EXPERIMENTAL doca_error_t doca_argp_param_create(struct doca_argp_param **param)
Create new program param.
DOCA_EXPERIMENTAL void doca_argp_param_set_type(struct doca_argp_param *param, enum doca_argp_type type)
Set the type of the param arguments.
DOCA_EXPERIMENTAL doca_error_t doca_argp_register_version_callback(doca_argp_param_cb_t callback)
Register an alternative version callback.
DOCA_EXPERIMENTAL void doca_argp_param_set_short_name(struct doca_argp_param *param, const char *name)
Set the short name of the program param.
DOCA_EXPERIMENTAL doca_error_t doca_argp_register_param(struct doca_argp_param *input_param)
Register a program flag.
@ DOCA_ARGP_TYPE_STRING
Definition: doca_argp.h:56
@ DOCA_ARGP_TYPE_INT
Definition: doca_argp.h:57
static doca_error_t doca_buf_inventory_buf_get_by_addr(struct doca_buf_inventory *inventory, struct doca_mmap *mmap, void *addr, size_t len, struct doca_buf **buf)
Allocate single element from buffer inventory and point it to the buffer defined by addr & len argume...
DOCA_STABLE doca_error_t doca_buf_dec_refcount(struct doca_buf *buf, uint16_t *refcount)
Decrease the object reference count by 1, if 0 reached, return the element back to the inventory.
DOCA_STABLE doca_error_t doca_buf_get_data(const struct doca_buf *buf, void **data)
Get the buffer's data.
DOCA_STABLE doca_error_t doca_buf_get_data_len(const struct doca_buf *buf, size_t *data_len)
Get buffer's data length.
DOCA_STABLE doca_error_t doca_buf_set_data(struct doca_buf *buf, void *data, size_t data_len)
DOCA_EXPERIMENTAL doca_error_t doca_compress_cap_task_decompress_deflate_get_max_buf_size(const struct doca_devinfo *devinfo, uint64_t *max_buffer_size)
Get decompress deflate max size.
DOCA_EXPERIMENTAL doca_error_t doca_compress_cap_task_compress_deflate_get_max_buf_size(const struct doca_devinfo *devinfo, uint64_t *max_buffer_size)
Get compress deflate max size.
DOCA_STABLE doca_error_t doca_ctx_start(struct doca_ctx *ctx)
Finalizes all configurations, and starts the DOCA CTX.
#define DOCA_DEVINFO_REP_PCI_ADDR_SIZE
Buffer size to hold PCI BDF format: "XXXX:XX:XX.X". Including a null terminator.
Definition: doca_dev.h:665
#define DOCA_DEVINFO_PCI_ADDR_SIZE
Buffer size to hold PCI BDF format: "XXXX:XX:XX.X". Including a null terminator.
Definition: doca_dev.h:313
DOCA_STABLE struct doca_devinfo * doca_dev_as_devinfo(const struct doca_dev *dev)
Get local device info from device. This should be useful when wanting to query information about devi...
#define DOCA_ERROR_PROPAGATE(r, t)
Save the first encountered doca_error_t.
Definition: doca_error.h:83
enum doca_error doca_error_t
DOCA API return codes.
DOCA_STABLE const char * doca_error_get_descr(doca_error_t error)
Returns the description string of an error code.
@ DOCA_ERROR_INVALID_VALUE
Definition: doca_error.h:44
@ DOCA_ERROR_NOT_FOUND
Definition: doca_error.h:54
@ DOCA_ERROR_BAD_STATE
Definition: doca_error.h:56
@ DOCA_ERROR_IO_FAILED
Definition: doca_error.h:55
@ DOCA_ERROR_AGAIN
Definition: doca_error.h:43
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_TRC(format,...)
Generates a TRACE application log message.
Definition: doca_log.h:513
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
DOCA_STABLE doca_error_t doca_mmap_set_memrange(struct doca_mmap *mmap, void *addr, size_t len)
Set the memory range of DOCA memory map.
DOCA_STABLE doca_error_t doca_mmap_start(struct doca_mmap *mmap)
Start DOCA Memory Map.
DOCA_STABLE doca_error_t doca_mmap_set_free_cb(struct doca_mmap *mmap, doca_mmap_memrange_free_cb_t *free_cb, void *opaque)
Set callback that will free the memory range when destroying DOCA memory map.
const struct ip_frag_config * cfg
Definition: ip_frag_dp.c:0
uint64_t ntohq(uint64_t value)
Definition: pack.c:30
#define htonq
Definition: pack.h:48
enum compress_mode mode
char file_path[MAX_FILE_NAME]
char file_path[MAX_FILE_NAME]
struct doca_mmap * src_mmap
Definition: common.h:47
struct doca_buf_inventory * buf_inv
Definition: common.h:49
struct doca_mmap * dst_mmap
Definition: common.h:48
struct doca_ctx * ctx
Definition: common.h:50
size_t strlcpy(char *dst, const char *src, size_t size)
Definition: utils.c:123
noreturn doca_error_t sdk_version_callback(void *param, void *doca_config)
Definition: utils.c:41