/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming them.
 *
 * This implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
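
/*
 * Illustrative sketch (not part of this header): the layout above keeps the
 * producer-written fields and the consumer-written fields on separate cache
 * lines, so a single-producer/single-consumer pair does not bounce lines.
 * A minimal use, with made-up object and handler names, might look like:
 *
 *	static struct ptr_ring my_ring;
 *
 *	// producer context (one CPU):
 *	if (ptr_ring_produce(&my_ring, obj))
 *		my_drop(obj);		// -ENOSPC: ring full, caller decides
 *
 *	// consumer context (another CPU):
 *	while ((obj = ptr_ring_consume(&my_ring)))
 *		my_handle(obj);
 */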

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: unlike __ptr_ring_empty, callers must hold producer_lock when calling
 * this: see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
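
/*
 * Illustrative sketch of the "compiler barrier in a loop" rule above: a
 * caller that already holds producer_lock and knows a consumer is running
 * might spin until a slot frees up (made-up example, not a pattern this
 * header mandates):
 *
 *	while (__ptr_ring_produce(r, ptr) == -ENOSPC)
 *		cpu_relax();	// barrier: forces the slot to be re-read
 */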

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
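
/*
 * Illustrative sketch of the re-test rule described above (made-up helper
 * names): a consumer that wants to sleep once the ring drains consumes until
 * empty, arms its notification, then re-tests __ptr_ring_empty to close the
 * race with a concurrent producer.
 *
 *	for (;;) {
 *		while ((obj = ptr_ring_consume(r)))
 *			my_handle(obj);
 *		my_enable_notify();
 *		if (__ptr_ring_empty(r))
 *			break;			// safe to sleep now
 *		my_disable_notify();		// raced with producer: retry
 *	}
 */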

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Zero entries from tail up to the specified head.
 * NB: if consumer_head can be >= r->size, the caller needs to fix up
 * consumer_tail afterwards.
 */
static inline void __ptr_ring_zero_tail(struct ptr_ring *r, int consumer_head)
{
	int head = consumer_head;

	/* Zero out entries in reverse order: this way we touch last the
	 * cache line that the producer might currently be reading; the
	 * producer won't make progress and touch other cache lines
	 * besides the first one until we write out all entries.
	 */
	while (likely(head > r->consumer_tail))
		r->queue[--head] = NULL;

	r->consumer_tail = consumer_head;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	consumer = r->consumer;
	 *	r->queue[consumer++] = NULL;
	 *	if (unlikely(consumer >= r->size))
	 *		consumer = 0;
	 *	r->consumer = consumer;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head + 1;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory,
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size))
		__ptr_ring_zero_tail(r, consumer_head);

	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}
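
/*
 * Worked example of the batching above, with illustrative numbers: for
 * r->size == 8 and r->batch == 4, single-entry consumes advance
 * consumer_head to 1, 2, 3 without touching the queue; the fourth consume
 * sees consumer_head - consumer_tail == 4 and NULLs slots 0-3 in one go,
 * and the consume that takes consumer_head to 8 clears slots 4-7 and wraps
 * both indices back to 0.
 */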

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
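
/*
 * Illustrative sketch (made-up handler name): draining a bounded burst per
 * pass with the locked batched helper defined further below.
 *
 *	void *batch[16];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	for (i = 0; i < n; i++)
 *		my_handle(batch[i]);
 */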

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Cast to structure type and call a function without discarding from the FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
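
/*
 * Illustrative sketch of the PEEK_CALL macros (made-up struct and helper):
 * the callback must tolerate a NULL argument, since __ptr_ring_peek returns
 * NULL when the ring is empty.
 *
 *	static int my_obj_len(void *obj)
 *	{
 *		return obj ? ((struct my_obj *)obj)->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(r, my_obj_len);
 */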

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * the vmalloc documentation for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now, disable batching in that case.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
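
/*
 * Worked example with illustrative numbers: on a machine with 64-byte cache
 * lines and 8-byte pointers the computed batch is 2 * 64 / 8 = 16, i.e.
 * consumed slots are invalidated two cache lines at a time; any ring with
 * fewer than 32 entries falls back to batch == 1.
 */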

static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
#define ptr_ring_init(...) alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__))
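
/*
 * Illustrative init/teardown sketch (made-up names; see ptr_ring_cleanup at
 * the end of this header for the matching free):
 *
 *	struct ptr_ring my_ring;
 *	int err;
 *
 *	err = ptr_ring_init(&my_ring, 256, GFP_KERNEL);
 *	if (err)
 *		return err;
 *	...
 *	ptr_ring_cleanup(&my_ring, my_obj_free);
 */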

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the code
	 * below can test entries for NULL and, if not NULL, assume they
	 * are valid.
	 */
	__ptr_ring_zero_tail(r, r->consumer_head);

	/*
	 * Go over entries in the batch, start moving the head back and copy
	 * entries. Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		int head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
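
/*
 * Illustrative sketch (made-up helpers): a consumer that pulled a batch but
 * could only hand part of it off returns the unprocessed tail, preserving
 * FIFO order; entries that no longer fit are destroyed.
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	sent = my_try_send(batch, n);
 *	ptr_ring_unconsume(r, batch + sent, n - sent, my_obj_free);
 */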

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
					 void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
#define ptr_ring_resize(...) alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__))
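
/*
 * Illustrative sketch (made-up names): growing a ring at runtime.  On
 * allocation failure the old queue is left untouched; on success entries
 * that do not fit in the new queue are passed to the destructor.
 *
 *	err = ptr_ring_resize(&my_ring, new_size, GFP_KERNEL, my_obj_free);
 *	if (err)
 *		return err;
 */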

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in BH context, you must
 * disable BH when doing so.
 */
static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
						     unsigned int nrings,
						     int size, gfp_t gfp,
						     void (*destroy)(void *))
{
	void ***queues;
	int i;

	queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_bh(&(rings[i])->consumer_lock);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_bh(&(rings[i])->consumer_lock);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
#define ptr_ring_resize_multiple_bh(...) \
	alloc_hooks(ptr_ring_resize_multiple_bh_noprof(__VA_ARGS__))

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}
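
/*
 * Illustrative sketch (made-up destructor): passing NULL as the destructor
 * only skips draining, so it is safe when the ring is already known to be
 * empty; otherwise supply a callback that can free whatever was queued.
 *
 *	static void my_obj_free(void *ptr)
 *	{
 *		kfree(ptr);
 *	}
 *
 *	ptr_ring_cleanup(&my_ring, my_obj_free);
 */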

#endif /* _LINUX_PTR_RING_H */