DPDK 25.11.0
rte_ring_hts_elem_pvt.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_HTS_ELEM_PVT_H_
#define _RTE_RING_HTS_ELEM_PVT_H_

#include <rte_stdatomic.h>

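/**
 * @file rte_ring_hts_elem_pvt.h
 * It is not recommended to include this file directly,
 * instead include <rte_ring.h>.
 * Contains internal helper functions for head/tail sync (HTS) ring mode.
 * For more information please refer to <rte_ring_hts.h>.
 */

/**
 * @internal
 * Complete an enqueue/dequeue in HTS mode: advance the tail by @p num
 * slots past @p old_tail with a store-release, publishing this thread's
 * writes to the ring array to the opposing side.
 */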
static __rte_always_inline void
__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
        uint32_t num, uint32_t enqueue)
{
        uint32_t tail;

        RTE_SET_USED(enqueue);

        tail = old_tail + num;

        /*
         * R0: Release the tail update. Establishes a synchronization edge with
         * the load-acquire at A1. This release ensures that all updates to *ht
         * and the ring array made by this thread become visible to the opposing
         * thread once the tail value written here is observed.
         */
        rte_atomic_store_explicit(&ht->ht.pos.tail, tail, rte_memory_order_release);
}

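/**
 * @internal
 * Wait until the head/tail pair of @p ht becomes consistent (head == tail),
 * i.e. until no enqueue/dequeue of the same type is still in progress, and
 * return the observed position.
 */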
static __rte_always_inline union __rte_ring_hts_pos
__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
        rte_memory_order memorder)
{
        union __rte_ring_hts_pos p;

        p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);

        while (p.pos.head != p.pos.tail) {
                rte_pause();
                p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);
        }

        return p;
}

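/**
 * @internal
 * Move the head of @p d forward to reserve up to @p num slots, checking
 * availability against the opposing tail in @p s. With RTE_RING_QUEUE_FIXED
 * behavior the reservation is all-or-nothing; otherwise it is trimmed to
 * what is available. Returns the number of slots reserved, the head value
 * before the move in @p old_head and the number of available entries in
 * @p entries.
 */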
static __rte_always_inline uint32_t
__rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
        const struct rte_ring_headtail *s, uint32_t capacity, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
{
        uint32_t n, stail;
        union __rte_ring_hts_pos np, op;

        do {
                /* Reset n to the initial burst count */
                n = num;

                /*
                 * Wait for tail to be equal to head; make sure that we read
                 * the prod head/tail *before* reading the cons tail.
                 *
                 * A0: Synchronizes with the CAS at R1. Establishes a
                 * happens-before relationship with a thread of the same type
                 * that released ht.raw, ensuring this thread observes all of
                 * its memory effects needed to maintain a safe partial order.
                 */
                op = __rte_ring_hts_head_wait(d, rte_memory_order_acquire);

                /*
                 * A1: Establish a synchronizes-with edge using the
                 * store-release at R0. This ensures that all memory effects
                 * from the preceding opposing thread are observed.
                 */
                stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);

                /*
                 * The subtraction is done between two unsigned 32-bit values
                 * (the result is always modulo 32 bits even if we have
                 * *old_head > stail). So 'entries' is always between 0
                 * and capacity (which is < size).
                 */
                *entries = capacity + stail - op.pos.head;

                /* check that we have enough room in ring */
                if (unlikely(n > *entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
                                        0 : *entries;

                if (n == 0)
                        break;

                np.pos.tail = op.pos.tail;
                np.pos.head = op.pos.head + n;

                /*
                 * R1: Establishes a synchronizes-with edge with the load-acquire
                 * of ht.raw at A0. This makes sure that the store-release to the
                 * tail by this thread, if it was of the opposite type, becomes
                 * visible to another thread of the current type. That thread will
                 * then observe the updates in the same order, keeping a safe
                 * partial order.
                 */
        } while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
                        (uint64_t *)(uintptr_t)&op.raw, np.raw,
                        rte_memory_order_release,
                        rte_memory_order_relaxed) == 0);

        *old_head = op.pos.head;
        return n;
}
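
/**
 * @internal
 * Reserve room for an HTS enqueue: move the producer head against the
 * consumer tail, bounded by the ring capacity.
 */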
static __rte_always_inline unsigned int
__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *free_entries)
{
        return __rte_ring_hts_move_head(&r->hts_prod, &r->cons,
                        r->capacity, num, behavior, old_head, free_entries);
}

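/**
 * @internal
 * Reserve entries for an HTS dequeue: move the consumer head against the
 * producer tail. Capacity is passed as 0, so the computed number of
 * entries is exactly the number of filled slots.
 */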
static __rte_always_inline unsigned int
__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
        enum rte_ring_queue_behavior behavior, uint32_t *old_head,
        uint32_t *entries)
{
        return __rte_ring_hts_move_head(&r->hts_cons, &r->prod,
                        0, num, behavior, old_head, entries);
}

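/**
 * @internal
 * Enqueue several objects on the HTS ring: reserve slots by moving the
 * producer head, copy @p n objects of size @p esize into the ring, then
 * publish them by updating the producer tail. Returns the number of
 * objects enqueued; if @p free_space is non-NULL it receives the amount
 * of space left in the ring afterwards.
 */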
static __rte_always_inline unsigned int
__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table,
        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
        uint32_t *free_space)
{
        uint32_t free, head;

        n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);

        if (n != 0) {
                __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
                __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1);
        }

        if (free_space != NULL)
                *free_space = free - n;
        return n;
}

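/**
 * @internal
 * Dequeue several objects from the HTS ring: reserve entries by moving
 * the consumer head, copy @p n objects of size @p esize out of the ring,
 * then release the slots by updating the consumer tail. Returns the
 * number of objects dequeued; if @p available is non-NULL it receives
 * the number of entries left in the ring afterwards.
 */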
static __rte_always_inline unsigned int
__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table,
        uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
        uint32_t *available)
{
        uint32_t entries, head;

        n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries);

        if (n != 0) {
                __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
                __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0);
        }

        if (available != NULL)
                *available = entries - n;
        return n;
}

#endif /* _RTE_RING_HTS_ELEM_PVT_H_ */
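
These helpers are private; applications reach them through the public rte_ring API by creating the ring with the HTS synchronization flags. A minimal usage sketch (ring name and sizes here are arbitrary, chosen only for illustration):

#include <rte_ring.h>

/* Create a ring whose producer and consumer sides both use HTS mode. */
struct rte_ring *r = rte_ring_create("hts_ring", 1024, SOCKET_ID_ANY,
                RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ);

void *objs[32];
/* Burst enqueue/dequeue on an HTS ring dispatch to the
 * __rte_ring_do_hts_{enqueue,dequeue}_elem helpers above. */
unsigned int enq = rte_ring_enqueue_burst(r, objs, 32, NULL);
unsigned int deq = rte_ring_dequeue_burst(r, objs, 32, NULL);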
#define unlikely(x)
#define RTE_SET_USED(x)
Definition: rte_common.h:264
#define __rte_always_inline
Definition: rte_common.h:490
static void rte_pause(void)
rte_ring_queue_behavior
Definition: rte_ring_core.h:40
@ RTE_RING_QUEUE_FIXED
Definition: rte_ring_core.h:42
uint32_t capacity