Branch data Line data Source code
1 : : /* SPDX-License-Identifier: GPL-2.0-or-later */
2 : : /*
3 : : * Definitions for the 'struct ptr_ring' datastructure.
4 : : *
5 : : * Author:
6 : : * Michael S. Tsirkin <mst@redhat.com>
7 : : *
8 : : * Copyright (C) 2016 Red Hat, Inc.
9 : : *
10 : : * This is a limited-size FIFO maintaining pointers in FIFO order, with
11 : : * one CPU producing entries and another consuming entries from a FIFO.
12 : : *
13 : : * This implementation tries to minimize cache-contention when there is a
14 : : * single producer and a single consumer CPU.
15 : : */
16 : :
17 : : #ifndef _LINUX_PTR_RING_H
18 : : #define _LINUX_PTR_RING_H 1
19 : :
20 : : #ifdef __KERNEL__
21 : : #include <linux/spinlock.h>
22 : : #include <linux/cache.h>
23 : : #include <linux/types.h>
24 : : #include <linux/compiler.h>
25 : : #include <linux/slab.h>
26 : : #include <linux/mm.h>
27 : : #include <asm/errno.h>
28 : : #endif
29 : :
30 : : struct ptr_ring {
31 : : int producer ____cacheline_aligned_in_smp;
32 : : spinlock_t producer_lock;
33 : : int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
34 : : int consumer_tail; /* next entry to invalidate */
35 : : spinlock_t consumer_lock;
36 : : /* Shared consumer/producer data */
37 : : /* Read-only by both the producer and the consumer */
38 : : int size ____cacheline_aligned_in_smp; /* max entries in queue */
39 : : int batch; /* number of entries to consume in a batch */
40 : : void **queue;
41 : : };
42 : :
43 : : /* Note: callers invoking this in a loop must use a compiler barrier,
44 : : * for example cpu_relax().
45 : : *
46 : : * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
47 : : * see e.g. ptr_ring_full.
48 : : */
49 : : static inline bool __ptr_ring_full(struct ptr_ring *r)
50 : : {
51 : : return r->queue[r->producer];
52 : : }
53 : :
54 : : static inline bool ptr_ring_full(struct ptr_ring *r)
55 : : {
56 : : bool ret;
57 : :
58 : : spin_lock(&r->producer_lock);
59 : : ret = __ptr_ring_full(r);
60 : : spin_unlock(&r->producer_lock);
61 : :
62 : : return ret;
63 : : }
64 : :
65 : : static inline bool ptr_ring_full_irq(struct ptr_ring *r)
66 : : {
67 : : bool ret;
68 : :
69 : : spin_lock_irq(&r->producer_lock);
70 : : ret = __ptr_ring_full(r);
71 : : spin_unlock_irq(&r->producer_lock);
72 : :
73 : : return ret;
74 : : }
75 : :
76 : : static inline bool ptr_ring_full_any(struct ptr_ring *r)
77 : : {
78 : : unsigned long flags;
79 : : bool ret;
80 : :
81 : : spin_lock_irqsave(&r->producer_lock, flags);
82 : : ret = __ptr_ring_full(r);
83 : : spin_unlock_irqrestore(&r->producer_lock, flags);
84 : :
85 : : return ret;
86 : : }
87 : :
88 : : static inline bool ptr_ring_full_bh(struct ptr_ring *r)
89 : : {
90 : : bool ret;
91 : :
92 : : spin_lock_bh(&r->producer_lock);
93 : : ret = __ptr_ring_full(r);
94 : : spin_unlock_bh(&r->producer_lock);
95 : :
96 : : return ret;
97 : : }
98 : :
99 : : /* Note: callers invoking this in a loop must use a compiler barrier,
100 : : * for example cpu_relax(). Callers must hold producer_lock.
101 : : * Callers are responsible for making sure pointer that is being queued
102 : : * points to a valid data.
103 : : */
104 : 17 : static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
105 : : {
106 [ + - + - ]: 17 : if (unlikely(!r->size) || r->queue[r->producer])
107 : : return -ENOSPC;
108 : :
109 : : /* Make sure the pointer we are storing points to a valid data. */
110 : : /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
111 : 17 : smp_wmb();
112 : :
113 [ - + ]: 17 : WRITE_ONCE(r->queue[r->producer++], ptr);
114 [ - + ]: 17 : if (unlikely(r->producer >= r->size))
115 : 0 : r->producer = 0;
116 : : return 0;
117 : : }
118 : :
119 : : /*
120 : : * Note: resize (below) nests producer lock within consumer lock, so if you
121 : : * consume in interrupt or BH context, you must disable interrupts/BH when
122 : : * calling this.
123 : : */
124 : 17 : static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
125 : : {
126 : 17 : int ret;
127 : :
128 : 17 : spin_lock(&r->producer_lock);
129 : 17 : ret = __ptr_ring_produce(r, ptr);
130 : 17 : spin_unlock(&r->producer_lock);
131 : :
132 : 17 : return ret;
133 : : }
134 : :
135 : : static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
136 : : {
137 : : int ret;
138 : :
139 : : spin_lock_irq(&r->producer_lock);
140 : : ret = __ptr_ring_produce(r, ptr);
141 : : spin_unlock_irq(&r->producer_lock);
142 : :
143 : : return ret;
144 : : }
145 : :
146 : : static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
147 : : {
148 : : unsigned long flags;
149 : : int ret;
150 : :
151 : : spin_lock_irqsave(&r->producer_lock, flags);
152 : : ret = __ptr_ring_produce(r, ptr);
153 : : spin_unlock_irqrestore(&r->producer_lock, flags);
154 : :
155 : : return ret;
156 : : }
157 : :
158 : 0 : static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
159 : : {
160 : 0 : int ret;
161 : :
162 : 0 : spin_lock_bh(&r->producer_lock);
163 : 0 : ret = __ptr_ring_produce(r, ptr);
164 : 0 : spin_unlock_bh(&r->producer_lock);
165 : :
166 : 0 : return ret;
167 : : }
168 : :
169 : 3 : static inline void *__ptr_ring_peek(struct ptr_ring *r)
170 : : {
171 [ # # ]: 0 : if (likely(r->size))
172 [ + - ]: 3 : return READ_ONCE(r->queue[r->consumer_head]);
173 : : return NULL;
174 : : }
175 : :
176 : : /*
177 : : * Test ring empty status without taking any locks.
178 : : *
179 : : * NB: This is only safe to call if ring is never resized.
180 : : *
181 : : * However, if some other CPU consumes ring entries at the same time, the value
182 : : * returned is not guaranteed to be correct.
183 : : *
184 : : * In this case - to avoid incorrectly detecting the ring
185 : : * as empty - the CPU consuming the ring entries is responsible
186 : : * for either consuming all ring entries until the ring is empty,
187 : : * or synchronizing with some other CPU and causing it to
188 : : * re-test __ptr_ring_empty and/or consume the ring enteries
189 : : * after the synchronization point.
190 : : *
191 : : * Note: callers invoking this in a loop must use a compiler barrier,
192 : : * for example cpu_relax().
193 : : */
194 : 3 : static inline bool __ptr_ring_empty(struct ptr_ring *r)
195 : : {
196 [ + - ]: 3 : if (likely(r->size))
197 [ - + ]: 3 : return !r->queue[READ_ONCE(r->consumer_head)];
198 : : return true;
199 : : }
200 : :
201 : : static inline bool ptr_ring_empty(struct ptr_ring *r)
202 : : {
203 : : bool ret;
204 : :
205 : : spin_lock(&r->consumer_lock);
206 : : ret = __ptr_ring_empty(r);
207 : : spin_unlock(&r->consumer_lock);
208 : :
209 : : return ret;
210 : : }
211 : :
212 : : static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
213 : : {
214 : : bool ret;
215 : :
216 : : spin_lock_irq(&r->consumer_lock);
217 : : ret = __ptr_ring_empty(r);
218 : : spin_unlock_irq(&r->consumer_lock);
219 : :
220 : : return ret;
221 : : }
222 : :
223 : : static inline bool ptr_ring_empty_any(struct ptr_ring *r)
224 : : {
225 : : unsigned long flags;
226 : : bool ret;
227 : :
228 : : spin_lock_irqsave(&r->consumer_lock, flags);
229 : : ret = __ptr_ring_empty(r);
230 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
231 : :
232 : : return ret;
233 : : }
234 : :
235 : : static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
236 : : {
237 : : bool ret;
238 : :
239 : : spin_lock_bh(&r->consumer_lock);
240 : : ret = __ptr_ring_empty(r);
241 : : spin_unlock_bh(&r->consumer_lock);
242 : :
243 : : return ret;
244 : : }
245 : :
246 : : /* Must only be called after __ptr_ring_peek returned !NULL */
247 : 3 : static inline void __ptr_ring_discard_one(struct ptr_ring *r)
248 : : {
249 : : /* Fundamentally, what we want to do is update consumer
250 : : * index and zero out the entry so producer can reuse it.
251 : : * Doing it naively at each consume would be as simple as:
252 : : * consumer = r->consumer;
253 : : * r->queue[consumer++] = NULL;
254 : : * if (unlikely(consumer >= r->size))
255 : : * consumer = 0;
256 : : * r->consumer = consumer;
257 : : * but that is suboptimal when the ring is full as producer is writing
258 : : * out new entries in the same cache line. Defer these updates until a
259 : : * batch of entries has been consumed.
260 : : */
261 : : /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
262 : : * to work correctly.
263 : : */
264 : 3 : int consumer_head = r->consumer_head;
265 : 3 : int head = consumer_head++;
266 : :
267 : : /* Once we have processed enough entries invalidate them in
268 : : * the ring all at once so producer can reuse their space in the ring.
269 : : * We also do this when we reach end of the ring - not mandatory
270 : : * but helps keep the implementation simple.
271 : : */
272 [ - + - + ]: 3 : if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
273 : : consumer_head >= r->size)) {
274 : : /* Zero out entries in the reverse order: this way we touch the
275 : : * cache line that producer might currently be reading the last;
276 : : * producer won't make progress and touch other cache lines
277 : : * besides the first one until we write out all entries.
278 : : */
279 [ # # ]: 0 : while (likely(head >= r->consumer_tail))
280 : 0 : r->queue[head--] = NULL;
281 : 0 : r->consumer_tail = consumer_head;
282 : : }
283 [ - + ]: 3 : if (unlikely(consumer_head >= r->size)) {
284 : 0 : consumer_head = 0;
285 : 0 : r->consumer_tail = 0;
286 : : }
287 : : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
288 : 3 : WRITE_ONCE(r->consumer_head, consumer_head);
289 : 3 : }
290 : :
291 : 3 : static inline void *__ptr_ring_consume(struct ptr_ring *r)
292 : : {
293 : 3 : void *ptr;
294 : :
295 : : /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
296 : : * accessing data through the pointer is up to date. Pairs
297 : : * with smp_wmb in __ptr_ring_produce.
298 : : */
299 [ + - ]: 3 : ptr = __ptr_ring_peek(r);
300 [ + - ]: 3 : if (ptr)
301 : 3 : __ptr_ring_discard_one(r);
302 : :
303 : 3 : return ptr;
304 : : }
305 : :
306 : : static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
307 : : void **array, int n)
308 : : {
309 : : void *ptr;
310 : : int i;
311 : :
312 : : for (i = 0; i < n; i++) {
313 : : ptr = __ptr_ring_consume(r);
314 : : if (!ptr)
315 : : break;
316 : : array[i] = ptr;
317 : : }
318 : :
319 : : return i;
320 : : }
321 : :
322 : : /*
323 : : * Note: resize (below) nests producer lock within consumer lock, so if you
324 : : * call this in interrupt or BH context, you must disable interrupts/BH when
325 : : * producing.
326 : : */
327 : : static inline void *ptr_ring_consume(struct ptr_ring *r)
328 : : {
329 : : void *ptr;
330 : :
331 : : spin_lock(&r->consumer_lock);
332 : : ptr = __ptr_ring_consume(r);
333 : : spin_unlock(&r->consumer_lock);
334 : :
335 : : return ptr;
336 : : }
337 : :
338 : : static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
339 : : {
340 : : void *ptr;
341 : :
342 : : spin_lock_irq(&r->consumer_lock);
343 : : ptr = __ptr_ring_consume(r);
344 : : spin_unlock_irq(&r->consumer_lock);
345 : :
346 : : return ptr;
347 : : }
348 : :
349 : : static inline void *ptr_ring_consume_any(struct ptr_ring *r)
350 : : {
351 : : unsigned long flags;
352 : : void *ptr;
353 : :
354 : : spin_lock_irqsave(&r->consumer_lock, flags);
355 : : ptr = __ptr_ring_consume(r);
356 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
357 : :
358 : : return ptr;
359 : : }
360 : :
361 : 0 : static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
362 : : {
363 : 0 : void *ptr;
364 : :
365 : 0 : spin_lock_bh(&r->consumer_lock);
366 : 0 : ptr = __ptr_ring_consume(r);
367 : 0 : spin_unlock_bh(&r->consumer_lock);
368 : :
369 : 0 : return ptr;
370 : : }
371 : :
372 : : static inline int ptr_ring_consume_batched(struct ptr_ring *r,
373 : : void **array, int n)
374 : : {
375 : : int ret;
376 : :
377 : : spin_lock(&r->consumer_lock);
378 : : ret = __ptr_ring_consume_batched(r, array, n);
379 : : spin_unlock(&r->consumer_lock);
380 : :
381 : : return ret;
382 : : }
383 : :
384 : : static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
385 : : void **array, int n)
386 : : {
387 : : int ret;
388 : :
389 : : spin_lock_irq(&r->consumer_lock);
390 : : ret = __ptr_ring_consume_batched(r, array, n);
391 : : spin_unlock_irq(&r->consumer_lock);
392 : :
393 : : return ret;
394 : : }
395 : :
396 : : static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
397 : : void **array, int n)
398 : : {
399 : : unsigned long flags;
400 : : int ret;
401 : :
402 : : spin_lock_irqsave(&r->consumer_lock, flags);
403 : : ret = __ptr_ring_consume_batched(r, array, n);
404 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
405 : :
406 : : return ret;
407 : : }
408 : :
409 : : static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
410 : : void **array, int n)
411 : : {
412 : : int ret;
413 : :
414 : : spin_lock_bh(&r->consumer_lock);
415 : : ret = __ptr_ring_consume_batched(r, array, n);
416 : : spin_unlock_bh(&r->consumer_lock);
417 : :
418 : : return ret;
419 : : }
420 : :
421 : : /* Cast to structure type and call a function without discarding from FIFO.
422 : : * Function must return a value.
423 : : * Callers must take consumer_lock.
424 : : */
425 : : #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
426 : :
427 : : #define PTR_RING_PEEK_CALL(r, f) ({ \
428 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
429 : : \
430 : : spin_lock(&(r)->consumer_lock); \
431 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
432 : : spin_unlock(&(r)->consumer_lock); \
433 : : __PTR_RING_PEEK_CALL_v; \
434 : : })
435 : :
436 : : #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
437 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
438 : : \
439 : : spin_lock_irq(&(r)->consumer_lock); \
440 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
441 : : spin_unlock_irq(&(r)->consumer_lock); \
442 : : __PTR_RING_PEEK_CALL_v; \
443 : : })
444 : :
445 : : #define PTR_RING_PEEK_CALL_BH(r, f) ({ \
446 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
447 : : \
448 : : spin_lock_bh(&(r)->consumer_lock); \
449 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
450 : : spin_unlock_bh(&(r)->consumer_lock); \
451 : : __PTR_RING_PEEK_CALL_v; \
452 : : })
453 : :
454 : : #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
455 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
456 : : unsigned long __PTR_RING_PEEK_CALL_f;\
457 : : \
458 : : spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
459 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
460 : : spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
461 : : __PTR_RING_PEEK_CALL_v; \
462 : : })
463 : :
464 : : /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
465 : : * documentation for vmalloc for which of them are legal.
466 : : */
467 : 9 : static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
468 : : {
469 : 9 : if (size > KMALLOC_MAX_SIZE / sizeof(void *))
470 : : return NULL;
471 : 9 : return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
472 : : }
473 : :
474 : 9 : static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
475 : : {
476 : 9 : r->size = size;
477 : 9 : r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
478 : : /* We need to set batch at least to 1 to make logic
479 : : * in __ptr_ring_discard_one work correctly.
480 : : * Batching too much (because ring is small) would cause a lot of
481 : : * burstiness. Needs tuning, for now disable batching.
482 : : */
483 : 9 : if (r->batch > r->size / 2 || !r->batch)
484 : 0 : r->batch = 1;
485 : : }
486 : :
487 : 9 : static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
488 : : {
489 [ + - ]: 9 : r->queue = __ptr_ring_init_queue_alloc(size, gfp);
490 [ + - ]: 9 : if (!r->queue)
491 : : return -ENOMEM;
492 : :
493 [ - + ]: 9 : __ptr_ring_set_size(r, size);
494 : 9 : r->producer = r->consumer_head = r->consumer_tail = 0;
495 : 9 : spin_lock_init(&r->producer_lock);
496 : 9 : spin_lock_init(&r->consumer_lock);
497 : :
498 : 9 : return 0;
499 : : }
500 : :
501 : : /*
502 : : * Return entries into ring. Destroy entries that don't fit.
503 : : *
504 : : * Note: this is expected to be a rare slow path operation.
505 : : *
506 : : * Note: producer lock is nested within consumer lock, so if you
507 : : * resize you must make sure all uses nest correctly.
508 : : * In particular if you consume ring in interrupt or BH context, you must
509 : : * disable interrupts/BH when doing so.
510 : : */
511 : : static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
512 : : void (*destroy)(void *))
513 : : {
514 : : unsigned long flags;
515 : : int head;
516 : :
517 : : spin_lock_irqsave(&r->consumer_lock, flags);
518 : : spin_lock(&r->producer_lock);
519 : :
520 : : if (!r->size)
521 : : goto done;
522 : :
523 : : /*
524 : : * Clean out buffered entries (for simplicity). This way following code
525 : : * can test entries for NULL and if not assume they are valid.
526 : : */
527 : : head = r->consumer_head - 1;
528 : : while (likely(head >= r->consumer_tail))
529 : : r->queue[head--] = NULL;
530 : : r->consumer_tail = r->consumer_head;
531 : :
532 : : /*
533 : : * Go over entries in batch, start moving head back and copy entries.
534 : : * Stop when we run into previously unconsumed entries.
535 : : */
536 : : while (n) {
537 : : head = r->consumer_head - 1;
538 : : if (head < 0)
539 : : head = r->size - 1;
540 : : if (r->queue[head]) {
541 : : /* This batch entry will have to be destroyed. */
542 : : goto done;
543 : : }
544 : : r->queue[head] = batch[--n];
545 : : r->consumer_tail = head;
546 : : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
547 : : WRITE_ONCE(r->consumer_head, head);
548 : : }
549 : :
550 : : done:
551 : : /* Destroy all entries left in the batch. */
552 : : while (n)
553 : : destroy(batch[--n]);
554 : : spin_unlock(&r->producer_lock);
555 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
556 : : }
557 : :
558 : : static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
559 : : int size, gfp_t gfp,
560 : : void (*destroy)(void *))
561 : : {
562 : : int producer = 0;
563 : : void **old;
564 : : void *ptr;
565 : :
566 : : while ((ptr = __ptr_ring_consume(r)))
567 : : if (producer < size)
568 : : queue[producer++] = ptr;
569 : : else if (destroy)
570 : : destroy(ptr);
571 : :
572 : : if (producer >= size)
573 : : producer = 0;
574 : : __ptr_ring_set_size(r, size);
575 : : r->producer = producer;
576 : : r->consumer_head = 0;
577 : : r->consumer_tail = 0;
578 : : old = r->queue;
579 : : r->queue = queue;
580 : :
581 : : return old;
582 : : }
583 : :
584 : : /*
585 : : * Note: producer lock is nested within consumer lock, so if you
586 : : * resize you must make sure all uses nest correctly.
587 : : * In particular if you consume ring in interrupt or BH context, you must
588 : : * disable interrupts/BH when doing so.
589 : : */
590 : : static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
591 : : void (*destroy)(void *))
592 : : {
593 : : unsigned long flags;
594 : : void **queue = __ptr_ring_init_queue_alloc(size, gfp);
595 : : void **old;
596 : :
597 : : if (!queue)
598 : : return -ENOMEM;
599 : :
600 : : spin_lock_irqsave(&(r)->consumer_lock, flags);
601 : : spin_lock(&(r)->producer_lock);
602 : :
603 : : old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
604 : :
605 : : spin_unlock(&(r)->producer_lock);
606 : : spin_unlock_irqrestore(&(r)->consumer_lock, flags);
607 : :
608 : : kvfree(old);
609 : :
610 : : return 0;
611 : : }
612 : :
613 : : /*
614 : : * Note: producer lock is nested within consumer lock, so if you
615 : : * resize you must make sure all uses nest correctly.
616 : : * In particular if you consume ring in interrupt or BH context, you must
617 : : * disable interrupts/BH when doing so.
618 : : */
619 : 0 : static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
620 : : unsigned int nrings,
621 : : int size,
622 : : gfp_t gfp, void (*destroy)(void *))
623 : : {
624 : 0 : unsigned long flags;
625 : 0 : void ***queues;
626 : 0 : int i;
627 : :
628 : 0 : queues = kmalloc_array(nrings, sizeof(*queues), gfp);
629 [ # # ]: 0 : if (!queues)
630 : 0 : goto noqueues;
631 : :
632 [ # # ]: 0 : for (i = 0; i < nrings; ++i) {
633 [ # # ]: 0 : queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
634 [ # # ]: 0 : if (!queues[i])
635 : 0 : goto nomem;
636 : : }
637 : :
638 [ # # ]: 0 : for (i = 0; i < nrings; ++i) {
639 : 0 : spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
640 : 0 : spin_lock(&(rings[i])->producer_lock);
641 : 0 : queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
642 : : size, gfp, destroy);
643 : 0 : spin_unlock(&(rings[i])->producer_lock);
644 : 0 : spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
645 : : }
646 : :
647 [ # # ]: 0 : for (i = 0; i < nrings; ++i)
648 : 0 : kvfree(queues[i]);
649 : :
650 : 0 : kfree(queues);
651 : :
652 : 0 : return 0;
653 : :
654 : : nomem:
655 [ # # ]: 0 : while (--i >= 0)
656 : 0 : kvfree(queues[i]);
657 : :
658 : 0 : kfree(queues);
659 : :
660 : : noqueues:
661 : : return -ENOMEM;
662 : : }
663 : :
664 : 0 : static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
665 : : {
666 : 0 : void *ptr;
667 : :
668 : 0 : if (destroy)
669 : : while ((ptr = ptr_ring_consume(r)))
670 : : destroy(ptr);
671 : 0 : kvfree(r->queue);
672 : 0 : }
673 : :
674 : : #endif /* _LINUX_PTR_RING_H */
|