NVIDIA DOCA SDK Data Center on a Chip Framework Documentation
tcp_cpu_rss_func.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2023-2024 NVIDIA CORPORATION AND AFFILIATES. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification, are permitted
5  * provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright notice, this list of
7  * conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright notice, this list of
9  * conditions and the following disclaimer in the documentation and/or other materials
10  * provided with the distribution.
11  * * Neither the name of the NVIDIA CORPORATION nor the names of its contributors may be used
12  * to endorse or promote products derived from this software without specific prior written
13  * permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
19  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
20  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21  * STRICT LIABILITY, OR TOR (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
22  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  *
24  */
25 
26 #include "tcp_cpu_rss_func.h"
27 #include "tcp_session_table.h"
28 
/* Log source named TCP_CPU_RSS, used by the DOCA_LOG_* calls in this file */
DOCA_LOG_REGISTER(TCP_CPU_RSS);
30 
31 int tcp_cpu_rss_func(void *lcore_args)
32 {
33  struct rte_mbuf **rx_packets;
34  struct rte_mbuf **tx_packets;
35  uint32_t num_tx_packets = 0;
36  uint16_t port_id = DPDK_DEFAULT_PORT;
37  const struct rxq_tcp_queues *tcp_queues = lcore_args;
38  struct rte_mbuf *ack;
39  int num_sent;
41  uint16_t queue_id;
42 
43  if (tcp_queues == NULL) {
44  DOCA_LOG_ERR("%s: 'tcp_queues argument cannot be NULL", __func__);
45  DOCA_GPUNETIO_VOLATILE(force_quit) = true;
46  return -1;
47  }
48  if (tcp_queues->port == NULL) {
49  DOCA_LOG_ERR("%s: 'tcp_queues->port argument cannot be NULL", __func__);
50  DOCA_GPUNETIO_VOLATILE(force_quit) = true;
51  return -1;
52  }
53  if (tcp_queues->rxq_pipe_gpu == NULL) {
54  DOCA_LOG_ERR("%s: 'tcp_queues->rxq_pipe_gpu argument cannot be NULL", __func__);
55  DOCA_GPUNETIO_VOLATILE(force_quit) = true;
56  return -1;
57  }
58 
59  queue_id = rte_lcore_index(rte_lcore_id()) - tcp_queues->lcore_idx_start;
60 
61  rx_packets = (struct rte_mbuf **)calloc(TCP_PACKET_MAX_BURST_SIZE, sizeof(struct rte_mbuf *));
62  if (rx_packets == NULL) {
63  DOCA_LOG_ERR("No memory available to allocate DPDK rx packets");
64  return -1;
65  }
66 
67  tx_packets = (struct rte_mbuf **)calloc(TCP_PACKET_MAX_BURST_SIZE, sizeof(struct rte_mbuf *));
68  if (tx_packets == NULL) {
69  free(rx_packets);
70  DOCA_LOG_ERR("No memory available to allocate DPDK tx packets");
71  return -1;
72  }
73 
74  DOCA_LOG_INFO("Core %u is performing TCP SYN/FIN processing on queue %u", rte_lcore_id(), queue_id);
75 
76  /* read global force_quit */
77  while (DOCA_GPUNETIO_VOLATILE(force_quit) == false) {
78  int num_rx_packets = rte_eth_rx_burst(port_id, queue_id, rx_packets, TCP_PACKET_MAX_BURST_SIZE);
79 
80  for (int i = 0; i < num_rx_packets; i++) {
81  const struct rte_mbuf *pkt = rx_packets[i];
82  const struct rte_tcp_hdr *tcp_hdr = extract_tcp_hdr(pkt);
83 
84  if (!tcp_hdr) {
85  DOCA_LOG_WARN("Not a TCP packet");
86  continue;
87  }
88 
89  if (!tcp_hdr->syn && !tcp_hdr->fin && !tcp_hdr->rst) {
90  DOCA_LOG_WARN("Unexpected TCP packet flags: 0x%x, expected SYN/RST/FIN",
92  continue;
93  }
94 
95  if (tcp_hdr->rst) {
96  log_tcp_flag(pkt, "RST");
98  continue; // Do not bother to ack
99  } else if (tcp_hdr->fin) {
100  log_tcp_flag(pkt, "FIN");
102  } else if (tcp_hdr->syn) {
103  log_tcp_flag(pkt, "SYN");
105  if (result != DOCA_SUCCESS)
106  goto error;
107  } else {
108  DOCA_LOG_WARN("Unexpected TCP packet flags: 0x%x, expected SYN/RST/FIN",
109  tcp_hdr->tcp_flags);
110  continue;
111  }
112 
114  if (ack)
115  tx_packets[num_tx_packets++] = ack;
116  }
117 
118  while (num_tx_packets > 0) {
119  num_sent = rte_eth_tx_burst(port_id, queue_id, tx_packets, num_tx_packets);
120  DOCA_LOG_DBG("DPDK tx_burst sent %d packets", num_sent);
121  num_tx_packets -= num_sent;
122  }
123 
124  for (int i = 0; i < num_rx_packets; i++)
125  rte_pktmbuf_free(rx_packets[i]);
126  }
127 
128  free(rx_packets);
129  free(tx_packets);
130 
131  return 0;
132 error:
133 
134  free(rx_packets);
135  free(tx_packets);
136 
137  return -1;
138 }
139 
140 const struct rte_tcp_hdr *extract_tcp_hdr(const struct rte_mbuf *packet)
141 {
142  const struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(packet, struct rte_ether_hdr *);
143 
144  if (((uint16_t)htons(eth_hdr->ether_type)) != RTE_ETHER_TYPE_IPV4) {
145  DOCA_LOG_ERR("Expected ether_type 0x%x, got 0x%x",
146  RTE_ETHER_TYPE_IPV4,
147  ((uint16_t)htons(eth_hdr->ether_type)));
148  return NULL;
149  }
150 
151  const struct rte_ipv4_hdr *ipv4_hdr = (struct rte_ipv4_hdr *)&eth_hdr[1];
152 
153  if (ipv4_hdr->next_proto_id != IPPROTO_TCP) {
154  DOCA_LOG_ERR("Expected next_proto_id %d, got %d", IPPROTO_TCP, ipv4_hdr->next_proto_id);
155  return NULL;
156  }
157 
158  const struct rte_tcp_hdr *tcp_hdr = (struct rte_tcp_hdr *)&ipv4_hdr[1];
159 
160  return tcp_hdr;
161 }
162 
164  const struct rte_mbuf *pkt,
165  struct doca_flow_port *port,
166  struct doca_flow_pipe *gpu_rss_pipe)
167 {
168  int ret;
169  struct tcp_session_entry *session_entry;
170 
171  session_entry = rte_zmalloc("tcp_session", sizeof(struct tcp_session_entry), 0);
172  if (!session_entry) {
173  DOCA_LOG_ERR("Failed to allocate TCP session object");
174  return DOCA_ERROR_NO_MEMORY;
175  }
176  session_entry->key = extract_session_key(pkt);
177  enable_tcp_gpu_offload(port, queue_id, gpu_rss_pipe, session_entry);
178 
179  ret = rte_hash_add_key_data(tcp_session_table, &session_entry->key, session_entry);
180  if (ret != 0) {
181  DOCA_LOG_ERR("Couldn't add new has key data err %d", ret);
182  return DOCA_ERROR_DRIVER;
183  }
184 
185  return DOCA_SUCCESS;
186 }
187 
188 void destroy_tcp_session(const uint16_t queue_id, const struct rte_mbuf *pkt, struct doca_flow_port *port)
189 {
190  const struct tcp_session_key key = extract_session_key(pkt);
191  struct tcp_session_entry *session_entry = NULL;
192 
193  if (rte_hash_lookup_data(tcp_session_table, &key, (void **)&session_entry) < 0 || !session_entry)
194  return;
195 
196  disable_tcp_gpu_offload(port, queue_id, session_entry);
197 
198  rte_hash_del_key(tcp_session_table, &key);
199  rte_free(session_entry);
200 }
201 
202 void log_tcp_flag(const struct rte_mbuf *packet, const char *flags)
203 {
204  const struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(packet, struct rte_ether_hdr *);
205  const struct rte_ipv4_hdr *ipv4_hdr = (struct rte_ipv4_hdr *)&eth_hdr[1];
206  const struct rte_tcp_hdr *tcp_hdr = (struct rte_tcp_hdr *)&ipv4_hdr[1];
207  char src_addr[INET_ADDRSTRLEN];
208  char dst_addr[INET_ADDRSTRLEN];
209 
210  inet_ntop(AF_INET, &ipv4_hdr->src_addr, src_addr, INET_ADDRSTRLEN);
211  inet_ntop(AF_INET, &ipv4_hdr->dst_addr, dst_addr, INET_ADDRSTRLEN);
212  DOCA_LOG_INFO("Received %s for TCP %s:%d>%s:%d",
213  flags,
214  src_addr,
215  htons(tcp_hdr->src_port),
216  dst_addr,
217  htons(tcp_hdr->dst_port));
218 }
219 
220 struct rte_mbuf *create_ack_packet(const struct rte_mbuf *src_packet, struct rte_mempool *tcp_ack_pkt_pool)
221 {
222  uint32_t RTE_TCP_OPT_NOP_bytes = 1;
223  uint32_t RTE_TCP_OPT_MSS_nbytes = 4;
224  uint32_t RTE_TCP_OPT_WND_SCALE_nbytes = 3;
225  uint32_t RTE_TCP_OPT_SACK_PERMITTED_nbytes = 2;
226  uint32_t RTE_TCP_OPT_TIMESTAMP_nbytes = 10;
227  uint16_t mss = 8192; /* pick something */
228  size_t tcp_option_array_len = RTE_TCP_OPT_MSS_nbytes + RTE_TCP_OPT_SACK_PERMITTED_nbytes +
229  RTE_TCP_OPT_TIMESTAMP_nbytes + RTE_TCP_OPT_NOP_bytes +
230  RTE_TCP_OPT_WND_SCALE_nbytes;
231 
232  struct rte_ether_hdr *dst_eth_hdr;
233  struct rte_ipv4_hdr *dst_ipv4_hdr;
234  struct rte_tcp_hdr *dst_tcp_hdr;
235  uint8_t *dst_tcp_opts;
236  struct rte_mbuf *dst_packet;
237  const struct rte_ether_hdr *src_eth_hdr = rte_pktmbuf_mtod(src_packet, struct rte_ether_hdr *);
238  const struct rte_ipv4_hdr *src_ipv4_hdr = (struct rte_ipv4_hdr *)&src_eth_hdr[1];
239  const struct rte_tcp_hdr *src_tcp_hdr = (struct rte_tcp_hdr *)&src_ipv4_hdr[1];
240 
241  if (!src_tcp_hdr->syn) {
242  /* Do not bother with TCP options unless responding to SYN */
243  tcp_option_array_len = 0;
244  }
245 
246  dst_packet = rte_pktmbuf_alloc(tcp_ack_pkt_pool);
247  if (!dst_packet) {
248  DOCA_LOG_ERR("Failed to allocate TCP ACK packet");
249  return NULL;
250  }
251 
252  dst_eth_hdr =
253  (struct rte_ether_hdr *)rte_pktmbuf_append(dst_packet,
254  sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr) +
255  sizeof(struct rte_tcp_hdr) + tcp_option_array_len);
256  if (dst_eth_hdr == NULL)
257  goto release_dst;
258 
259  dst_ipv4_hdr = (struct rte_ipv4_hdr *)&dst_eth_hdr[1];
260  dst_tcp_hdr = (struct rte_tcp_hdr *)&dst_ipv4_hdr[1];
261  dst_tcp_opts = (uint8_t *)&dst_tcp_hdr[1];
262 
263  dst_eth_hdr->src_addr = src_eth_hdr->dst_addr;
264  dst_eth_hdr->dst_addr = src_eth_hdr->src_addr;
265  dst_eth_hdr->ether_type = src_eth_hdr->ether_type;
266 
267  /* Reminder: double-check remaining ack fields */
268  dst_ipv4_hdr->version = 4;
269  dst_ipv4_hdr->ihl = 5;
270  dst_ipv4_hdr->src_addr = src_ipv4_hdr->dst_addr;
271  dst_ipv4_hdr->dst_addr = src_ipv4_hdr->src_addr;
272  dst_ipv4_hdr->total_length =
273  RTE_BE16(sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_tcp_hdr) + tcp_option_array_len);
274  dst_ipv4_hdr->fragment_offset = htons(RTE_IPV4_HDR_DF_FLAG);
275  dst_ipv4_hdr->time_to_live = 64;
276  dst_ipv4_hdr->next_proto_id = IPPROTO_TCP;
277 
278  dst_tcp_hdr->src_port = src_tcp_hdr->dst_port;
279  dst_tcp_hdr->dst_port = src_tcp_hdr->src_port;
280  dst_tcp_hdr->recv_ack = RTE_BE32(RTE_BE32(src_tcp_hdr->sent_seq) + 1);
281  dst_tcp_hdr->sent_seq = src_tcp_hdr->syn ? RTE_BE32(1000) : src_tcp_hdr->recv_ack;
282  dst_tcp_hdr->rx_win = RTE_BE16(60000);
283  dst_tcp_hdr->dt_off = 5 + tcp_option_array_len / 4;
284 
285  if (!src_tcp_hdr->ack) {
286  dst_tcp_hdr->syn = src_tcp_hdr->syn;
287  dst_tcp_hdr->fin = src_tcp_hdr->fin;
288  }
289  dst_tcp_hdr->ack = 1;
290 
291  if (tcp_option_array_len) {
292  uint8_t *mss_opt = dst_tcp_opts;
293  uint8_t *sack_ok_opt = dst_tcp_opts + RTE_TCP_OPT_MSS_nbytes;
294  uint8_t *ts_opt = sack_ok_opt + RTE_TCP_OPT_SACK_PERMITTED_nbytes;
295  uint8_t *nop_opt = ts_opt + RTE_TCP_OPT_TIMESTAMP_nbytes;
296  uint8_t *ws_opt = nop_opt + 1;
297  time_t seconds = htonl(time(NULL));
298 
299  mss_opt[0] = RTE_TCP_OPT_MSS;
300  mss_opt[1] = RTE_TCP_OPT_MSS_nbytes;
301  mss_opt[2] = (uint8_t)(mss >> 8);
302  mss_opt[3] = (uint8_t)mss;
303 
304  sack_ok_opt[0] = RTE_TCP_OPT_SACK_PERMITTED;
305  sack_ok_opt[1] = RTE_TCP_OPT_SACK_PERMITTED_nbytes;
306 
307  ts_opt[0] = RTE_TCP_OPT_TIMESTAMP;
308  ts_opt[1] = RTE_TCP_OPT_TIMESTAMP_nbytes;
309  memcpy(ts_opt + 2, &seconds, 4);
310  // ts_opt+6 (ECR) set below
311 
312  nop_opt[0] = RTE_TCP_OPT_NOP;
313 
314  ws_opt[0] = RTE_TCP_OPT_WND_SCALE;
315  ws_opt[1] = RTE_TCP_OPT_WND_SCALE_nbytes;
316  ws_opt[2] = 7; // pick a scale
317 
318  const uint8_t *src_tcp_option = (uint8_t *)&src_tcp_hdr[1];
319  const uint8_t *src_tcp_options_end = src_tcp_option + 4 * src_tcp_hdr->data_off;
320  uint32_t opt_len = 0;
321 
322  while (src_tcp_option < src_tcp_options_end) {
323  DOCA_LOG_DBG("Processing TCP Option 0x%x", *src_tcp_option);
324  switch (*src_tcp_option) {
325  case RTE_TCP_OPT_END:
326  src_tcp_option = src_tcp_options_end; // end loop
327  break;
328  case RTE_TCP_OPT_NOP:
329  ++src_tcp_option;
330  break;
331  case RTE_TCP_OPT_MSS:
332  opt_len = *src_tcp_option;
333  src_tcp_option += 4; // don't care
334  break;
335  case RTE_TCP_OPT_WND_SCALE:
336  src_tcp_option += 3; // don't care
337  break;
338  case RTE_TCP_OPT_SACK_PERMITTED:
339  src_tcp_option += 2; // don't care
340  break;
341  case RTE_TCP_OPT_SACK:
342  opt_len = *src_tcp_option; // variable length; don't care
343  src_tcp_option += opt_len;
344  break;
345  case RTE_TCP_OPT_TIMESTAMP: {
346  const uint8_t *src_tsval = src_tcp_option + 2;
347  uint8_t *dst_tsecr = ts_opt + 6;
348 
349  memcpy(dst_tsecr, src_tsval, 4);
350  src_tcp_option += 10;
351  break;
352  }
353  }
354  }
355  } /* tcp options */
356 
357  /* Use offloaded checksum operations */
358  dst_packet->ol_flags |= RTE_MBUF_F_TX_IPV4 | RTE_MBUF_F_TX_IP_CKSUM | RTE_MBUF_F_TX_TCP_CKSUM;
359 
360  return dst_packet;
361 
362 release_dst:
363  rte_pktmbuf_free(dst_packet);
364  return NULL;
365 }
366 
367 struct tcp_session_key extract_session_key(const struct rte_mbuf *packet)
368 {
369  const struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(packet, struct rte_ether_hdr *);
370  const struct rte_ipv4_hdr *ipv4_hdr = (struct rte_ipv4_hdr *)&eth_hdr[1];
371  const struct rte_tcp_hdr *tcp_hdr = (struct rte_tcp_hdr *)&ipv4_hdr[1];
372 
373  struct tcp_session_key key = {
375  .dst_addr = ipv4_hdr->dst_addr,
376  .src_port = tcp_hdr->src_port,
377  .dst_port = tcp_hdr->dst_port,
378  };
379 
380  return key;
381 }
#define NULL
Definition: __stddef_null.h:26
int32_t result
doca_error_t enable_tcp_gpu_offload(struct doca_flow_port *port, uint16_t queue_id, struct doca_flow_pipe *gpu_rss_pipe, struct tcp_session_entry *session_entry)
Definition: flow.c:841
doca_error_t disable_tcp_gpu_offload(struct doca_flow_port *port, uint16_t queue_id, struct tcp_session_entry *session_entry)
Definition: flow.c:901
#define DPDK_DEFAULT_PORT
Definition: defines.h:88
static volatile bool force_quit
Definition: flow_skeleton.c:49
static struct rxq_tcp_queues tcp_queues
enum doca_error doca_error_t
DOCA API return codes.
@ DOCA_SUCCESS
Definition: doca_error.h:38
@ DOCA_ERROR_NO_MEMORY
Definition: doca_error.h:45
@ DOCA_ERROR_DRIVER
Definition: doca_error.h:59
#define DOCA_LOG_ERR(format,...)
Generates an ERROR application log message.
Definition: doca_log.h:466
#define DOCA_LOG_WARN(format,...)
Generates a WARNING application log message.
Definition: doca_log.h:476
#define DOCA_LOG_INFO(format,...)
Generates an INFO application log message.
Definition: doca_log.h:486
#define DOCA_LOG_DBG(format,...)
Generates a DEBUG application log message.
Definition: doca_log.h:496
uint16_t queue_id
Definition: ip_frag_dp.c:1
uint32_t src_addr
Definition: packets.h:8
uint32_t dst_addr
Definition: packets.h:9
uint32_t src_addr
Definition: packets.h:75
uint32_t dst_addr
Definition: packets.h:76
uint8_t next_proto_id
Definition: packets.h:73
struct doca_flow_pipe * rxq_pipe_gpu
Definition: common.h:59
struct rte_mempool * tcp_ack_pkt_pool
Definition: common.h:50
struct doca_flow_port * port
Definition: common.h:58
uint16_t lcore_idx_start
Definition: common.h:49
uint16_t src_port
Definition: packets.h:80
uint8_t tcp_flags
Definition: packets.h:85
uint16_t dst_port
Definition: packets.h:81
struct tcp_session_key key
const struct rte_tcp_hdr * extract_tcp_hdr(const struct rte_mbuf *packet)
doca_error_t create_tcp_session(const uint16_t queue_id, const struct rte_mbuf *pkt, struct doca_flow_port *port, struct doca_flow_pipe *gpu_rss_pipe)
void destroy_tcp_session(const uint16_t queue_id, const struct rte_mbuf *pkt, struct doca_flow_port *port)
struct tcp_session_key extract_session_key(const struct rte_mbuf *packet)
DOCA_LOG_REGISTER(TCP_CPU_RSS)
void log_tcp_flag(const struct rte_mbuf *packet, const char *flags)
int tcp_cpu_rss_func(void *lcore_args)
struct rte_mbuf * create_ack_packet(const struct rte_mbuf *src_packet, struct rte_mempool *tcp_ack_pkt_pool)
#define TCP_PACKET_MAX_BURST_SIZE
struct rte_hash * tcp_session_table