Branch data Line data Source code
1 : : /* SPDX-License-Identifier: GPL-2.0-or-later */
2 : : /*
3 : : * Definitions for the 'struct ptr_ring' datastructure.
4 : : *
5 : : * Author:
6 : : * Michael S. Tsirkin <mst@redhat.com>
7 : : *
8 : : * Copyright (C) 2016 Red Hat, Inc.
9 : : *
10 : : * This is a limited-size FIFO maintaining pointers in FIFO order, with
11 : : * one CPU producing entries and another consuming entries from a FIFO.
12 : : *
13 : : * This implementation tries to minimize cache-contention when there is a
14 : : * single producer and a single consumer CPU.
15 : : */
16 : :
17 : : #ifndef _LINUX_PTR_RING_H
18 : : #define _LINUX_PTR_RING_H 1
19 : :
20 : : #ifdef __KERNEL__
21 : : #include <linux/spinlock.h>
22 : : #include <linux/cache.h>
23 : : #include <linux/types.h>
24 : : #include <linux/compiler.h>
25 : : #include <linux/slab.h>
26 : : #include <asm/errno.h>
27 : : #endif
28 : :
29 : : struct ptr_ring {
30 : : int producer ____cacheline_aligned_in_smp;
31 : : spinlock_t producer_lock;
32 : : int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
33 : : int consumer_tail; /* next entry to invalidate */
34 : : spinlock_t consumer_lock;
35 : : /* Shared consumer/producer data */
36 : : /* Read-only by both the producer and the consumer */
37 : : int size ____cacheline_aligned_in_smp; /* max entries in queue */
38 : : int batch; /* number of entries to consume in a batch */
39 : : void **queue;
40 : : };
41 : :
42 : : /* Note: callers invoking this in a loop must use a compiler barrier,
43 : : * for example cpu_relax().
44 : : *
45 : : * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
46 : : * see e.g. ptr_ring_full.
47 : : */
48 : : static inline bool __ptr_ring_full(struct ptr_ring *r)
49 : : {
50 : : return r->queue[r->producer];
51 : : }
52 : :
53 : : static inline bool ptr_ring_full(struct ptr_ring *r)
54 : : {
55 : : bool ret;
56 : :
57 : : spin_lock(&r->producer_lock);
58 : : ret = __ptr_ring_full(r);
59 : : spin_unlock(&r->producer_lock);
60 : :
61 : : return ret;
62 : : }
63 : :
64 : : static inline bool ptr_ring_full_irq(struct ptr_ring *r)
65 : : {
66 : : bool ret;
67 : :
68 : : spin_lock_irq(&r->producer_lock);
69 : : ret = __ptr_ring_full(r);
70 : : spin_unlock_irq(&r->producer_lock);
71 : :
72 : : return ret;
73 : : }
74 : :
75 : : static inline bool ptr_ring_full_any(struct ptr_ring *r)
76 : : {
77 : : unsigned long flags;
78 : : bool ret;
79 : :
80 : : spin_lock_irqsave(&r->producer_lock, flags);
81 : : ret = __ptr_ring_full(r);
82 : : spin_unlock_irqrestore(&r->producer_lock, flags);
83 : :
84 : : return ret;
85 : : }
86 : :
87 : : static inline bool ptr_ring_full_bh(struct ptr_ring *r)
88 : : {
89 : : bool ret;
90 : :
91 : : spin_lock_bh(&r->producer_lock);
92 : : ret = __ptr_ring_full(r);
93 : : spin_unlock_bh(&r->producer_lock);
94 : :
95 : : return ret;
96 : : }
97 : :
98 : : /* Note: callers invoking this in a loop must use a compiler barrier,
99 : : * for example cpu_relax(). Callers must hold producer_lock.
100 : : * Callers are responsible for making sure pointer that is being queued
101 : : * points to a valid data.
102 : : */
103 : 3 : static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
104 : : {
105 : 3 : if (unlikely(!r->size) || r->queue[r->producer])
106 : : return -ENOSPC;
107 : :
108 : : /* Make sure the pointer we are storing points to a valid data. */
109 : : /* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
110 : 3 : smp_wmb();
111 : :
112 : 3 : WRITE_ONCE(r->queue[r->producer++], ptr);
113 : 3 : if (unlikely(r->producer >= r->size))
114 : 1 : r->producer = 0;
115 : : return 0;
116 : : }
117 : :
118 : : /*
119 : : * Note: resize (below) nests producer lock within consumer lock, so if you
120 : : * consume in interrupt or BH context, you must disable interrupts/BH when
121 : : * calling this.
122 : : */
123 : 3 : static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
124 : : {
125 : : int ret;
126 : :
127 : : spin_lock(&r->producer_lock);
128 : 3 : ret = __ptr_ring_produce(r, ptr);
129 : : spin_unlock(&r->producer_lock);
130 : :
131 : 3 : return ret;
132 : : }
133 : :
134 : : static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
135 : : {
136 : : int ret;
137 : :
138 : : spin_lock_irq(&r->producer_lock);
139 : : ret = __ptr_ring_produce(r, ptr);
140 : : spin_unlock_irq(&r->producer_lock);
141 : :
142 : : return ret;
143 : : }
144 : :
145 : : static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
146 : : {
147 : : unsigned long flags;
148 : : int ret;
149 : :
150 : : spin_lock_irqsave(&r->producer_lock, flags);
151 : : ret = __ptr_ring_produce(r, ptr);
152 : : spin_unlock_irqrestore(&r->producer_lock, flags);
153 : :
154 : : return ret;
155 : : }
156 : :
157 : : static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
158 : : {
159 : : int ret;
160 : :
161 : : spin_lock_bh(&r->producer_lock);
162 : : ret = __ptr_ring_produce(r, ptr);
163 : : spin_unlock_bh(&r->producer_lock);
164 : :
165 : : return ret;
166 : : }
167 : :
168 : : static inline void *__ptr_ring_peek(struct ptr_ring *r)
169 : : {
170 : 3 : if (likely(r->size))
171 : 3 : return READ_ONCE(r->queue[r->consumer_head]);
172 : : return NULL;
173 : : }
174 : :
175 : : /*
176 : : * Test ring empty status without taking any locks.
177 : : *
178 : : * NB: This is only safe to call if ring is never resized.
179 : : *
180 : : * However, if some other CPU consumes ring entries at the same time, the value
181 : : * returned is not guaranteed to be correct.
182 : : *
183 : : * In this case - to avoid incorrectly detecting the ring
184 : : * as empty - the CPU consuming the ring entries is responsible
185 : : * for either consuming all ring entries until the ring is empty,
186 : : * or synchronizing with some other CPU and causing it to
187 : : * re-test __ptr_ring_empty and/or consume the ring enteries
188 : : * after the synchronization point.
189 : : *
190 : : * Note: callers invoking this in a loop must use a compiler barrier,
191 : : * for example cpu_relax().
192 : : */
193 : : static inline bool __ptr_ring_empty(struct ptr_ring *r)
194 : : {
195 : 3 : if (likely(r->size))
196 : 3 : return !r->queue[READ_ONCE(r->consumer_head)];
197 : : return true;
198 : : }
199 : :
200 : : static inline bool ptr_ring_empty(struct ptr_ring *r)
201 : : {
202 : : bool ret;
203 : :
204 : : spin_lock(&r->consumer_lock);
205 : : ret = __ptr_ring_empty(r);
206 : : spin_unlock(&r->consumer_lock);
207 : :
208 : : return ret;
209 : : }
210 : :
211 : : static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
212 : : {
213 : : bool ret;
214 : :
215 : : spin_lock_irq(&r->consumer_lock);
216 : : ret = __ptr_ring_empty(r);
217 : : spin_unlock_irq(&r->consumer_lock);
218 : :
219 : : return ret;
220 : : }
221 : :
222 : : static inline bool ptr_ring_empty_any(struct ptr_ring *r)
223 : : {
224 : : unsigned long flags;
225 : : bool ret;
226 : :
227 : : spin_lock_irqsave(&r->consumer_lock, flags);
228 : : ret = __ptr_ring_empty(r);
229 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
230 : :
231 : : return ret;
232 : : }
233 : :
234 : : static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
235 : : {
236 : : bool ret;
237 : :
238 : : spin_lock_bh(&r->consumer_lock);
239 : : ret = __ptr_ring_empty(r);
240 : : spin_unlock_bh(&r->consumer_lock);
241 : :
242 : : return ret;
243 : : }
244 : :
245 : : /* Must only be called after __ptr_ring_peek returned !NULL */
246 : 3 : static inline void __ptr_ring_discard_one(struct ptr_ring *r)
247 : : {
248 : : /* Fundamentally, what we want to do is update consumer
249 : : * index and zero out the entry so producer can reuse it.
250 : : * Doing it naively at each consume would be as simple as:
251 : : * consumer = r->consumer;
252 : : * r->queue[consumer++] = NULL;
253 : : * if (unlikely(consumer >= r->size))
254 : : * consumer = 0;
255 : : * r->consumer = consumer;
256 : : * but that is suboptimal when the ring is full as producer is writing
257 : : * out new entries in the same cache line. Defer these updates until a
258 : : * batch of entries has been consumed.
259 : : */
260 : : /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
261 : : * to work correctly.
262 : : */
263 : 3 : int consumer_head = r->consumer_head;
264 : 3 : int head = consumer_head++;
265 : :
266 : : /* Once we have processed enough entries invalidate them in
267 : : * the ring all at once so producer can reuse their space in the ring.
268 : : * We also do this when we reach end of the ring - not mandatory
269 : : * but helps keep the implementation simple.
270 : : */
271 : 3 : if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
272 : : consumer_head >= r->size)) {
273 : : /* Zero out entries in the reverse order: this way we touch the
274 : : * cache line that producer might currently be reading the last;
275 : : * producer won't make progress and touch other cache lines
276 : : * besides the first one until we write out all entries.
277 : : */
278 : 3 : while (likely(head >= r->consumer_tail))
279 : 3 : r->queue[head--] = NULL;
280 : 3 : r->consumer_tail = consumer_head;
281 : : }
282 : 3 : if (unlikely(consumer_head >= r->size)) {
283 : : consumer_head = 0;
284 : 1 : r->consumer_tail = 0;
285 : : }
286 : : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
287 : : WRITE_ONCE(r->consumer_head, consumer_head);
288 : 3 : }
289 : :
290 : 3 : static inline void *__ptr_ring_consume(struct ptr_ring *r)
291 : : {
292 : : void *ptr;
293 : :
294 : : /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
295 : : * accessing data through the pointer is up to date. Pairs
296 : : * with smp_wmb in __ptr_ring_produce.
297 : : */
298 : : ptr = __ptr_ring_peek(r);
299 : 3 : if (ptr)
300 : 3 : __ptr_ring_discard_one(r);
301 : :
302 : 3 : return ptr;
303 : : }
304 : :
305 : 0 : static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
306 : : void **array, int n)
307 : : {
308 : : void *ptr;
309 : : int i;
310 : :
311 : 0 : for (i = 0; i < n; i++) {
312 : 0 : ptr = __ptr_ring_consume(r);
313 : 0 : if (!ptr)
314 : : break;
315 : 0 : array[i] = ptr;
316 : : }
317 : :
318 : 0 : return i;
319 : : }
320 : :
321 : : /*
322 : : * Note: resize (below) nests producer lock within consumer lock, so if you
323 : : * call this in interrupt or BH context, you must disable interrupts/BH when
324 : : * producing.
325 : : */
326 : 0 : static inline void *ptr_ring_consume(struct ptr_ring *r)
327 : : {
328 : : void *ptr;
329 : :
330 : : spin_lock(&r->consumer_lock);
331 : 0 : ptr = __ptr_ring_consume(r);
332 : : spin_unlock(&r->consumer_lock);
333 : :
334 : 0 : return ptr;
335 : : }
336 : :
337 : : static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
338 : : {
339 : : void *ptr;
340 : :
341 : : spin_lock_irq(&r->consumer_lock);
342 : : ptr = __ptr_ring_consume(r);
343 : : spin_unlock_irq(&r->consumer_lock);
344 : :
345 : : return ptr;
346 : : }
347 : :
348 : : static inline void *ptr_ring_consume_any(struct ptr_ring *r)
349 : : {
350 : : unsigned long flags;
351 : : void *ptr;
352 : :
353 : : spin_lock_irqsave(&r->consumer_lock, flags);
354 : : ptr = __ptr_ring_consume(r);
355 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
356 : :
357 : : return ptr;
358 : : }
359 : :
360 : : static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
361 : : {
362 : : void *ptr;
363 : :
364 : : spin_lock_bh(&r->consumer_lock);
365 : : ptr = __ptr_ring_consume(r);
366 : : spin_unlock_bh(&r->consumer_lock);
367 : :
368 : : return ptr;
369 : : }
370 : :
371 : 0 : static inline int ptr_ring_consume_batched(struct ptr_ring *r,
372 : : void **array, int n)
373 : : {
374 : : int ret;
375 : :
376 : : spin_lock(&r->consumer_lock);
377 : 0 : ret = __ptr_ring_consume_batched(r, array, n);
378 : : spin_unlock(&r->consumer_lock);
379 : :
380 : 0 : return ret;
381 : : }
382 : :
383 : : static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
384 : : void **array, int n)
385 : : {
386 : : int ret;
387 : :
388 : : spin_lock_irq(&r->consumer_lock);
389 : : ret = __ptr_ring_consume_batched(r, array, n);
390 : : spin_unlock_irq(&r->consumer_lock);
391 : :
392 : : return ret;
393 : : }
394 : :
395 : : static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
396 : : void **array, int n)
397 : : {
398 : : unsigned long flags;
399 : : int ret;
400 : :
401 : : spin_lock_irqsave(&r->consumer_lock, flags);
402 : : ret = __ptr_ring_consume_batched(r, array, n);
403 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
404 : :
405 : : return ret;
406 : : }
407 : :
408 : : static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
409 : : void **array, int n)
410 : : {
411 : : int ret;
412 : :
413 : : spin_lock_bh(&r->consumer_lock);
414 : : ret = __ptr_ring_consume_batched(r, array, n);
415 : : spin_unlock_bh(&r->consumer_lock);
416 : :
417 : : return ret;
418 : : }
419 : :
420 : : /* Cast to structure type and call a function without discarding from FIFO.
421 : : * Function must return a value.
422 : : * Callers must take consumer_lock.
423 : : */
424 : : #define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
425 : :
426 : : #define PTR_RING_PEEK_CALL(r, f) ({ \
427 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
428 : : \
429 : : spin_lock(&(r)->consumer_lock); \
430 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
431 : : spin_unlock(&(r)->consumer_lock); \
432 : : __PTR_RING_PEEK_CALL_v; \
433 : : })
434 : :
435 : : #define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
436 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
437 : : \
438 : : spin_lock_irq(&(r)->consumer_lock); \
439 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
440 : : spin_unlock_irq(&(r)->consumer_lock); \
441 : : __PTR_RING_PEEK_CALL_v; \
442 : : })
443 : :
444 : : #define PTR_RING_PEEK_CALL_BH(r, f) ({ \
445 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
446 : : \
447 : : spin_lock_bh(&(r)->consumer_lock); \
448 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
449 : : spin_unlock_bh(&(r)->consumer_lock); \
450 : : __PTR_RING_PEEK_CALL_v; \
451 : : })
452 : :
453 : : #define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
454 : : typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
455 : : unsigned long __PTR_RING_PEEK_CALL_f;\
456 : : \
457 : : spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
458 : : __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
459 : : spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
460 : : __PTR_RING_PEEK_CALL_v; \
461 : : })
462 : :
463 : : /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
464 : : * documentation for vmalloc for which of them are legal.
465 : : */
466 : : static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
467 : : {
468 : 3 : if (size > KMALLOC_MAX_SIZE / sizeof(void *))
469 : : return NULL;
470 : 3 : return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
471 : : }
472 : :
473 : : static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
474 : : {
475 : 3 : r->size = size;
476 : 3 : r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
477 : : /* We need to set batch at least to 1 to make logic
478 : : * in __ptr_ring_discard_one work correctly.
479 : : * Batching too much (because ring is small) would cause a lot of
480 : : * burstiness. Needs tuning, for now disable batching.
481 : : */
482 : 3 : if (r->batch > r->size / 2 || !r->batch)
483 : 0 : r->batch = 1;
484 : : }
485 : :
486 : 3 : static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
487 : : {
488 : 3 : r->queue = __ptr_ring_init_queue_alloc(size, gfp);
489 : 3 : if (!r->queue)
490 : : return -ENOMEM;
491 : :
492 : : __ptr_ring_set_size(r, size);
493 : 3 : r->producer = r->consumer_head = r->consumer_tail = 0;
494 : 3 : spin_lock_init(&r->producer_lock);
495 : 3 : spin_lock_init(&r->consumer_lock);
496 : :
497 : 3 : return 0;
498 : : }
499 : :
500 : : /*
501 : : * Return entries into ring. Destroy entries that don't fit.
502 : : *
503 : : * Note: this is expected to be a rare slow path operation.
504 : : *
505 : : * Note: producer lock is nested within consumer lock, so if you
506 : : * resize you must make sure all uses nest correctly.
507 : : * In particular if you consume ring in interrupt or BH context, you must
508 : : * disable interrupts/BH when doing so.
509 : : */
510 : : static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
511 : : void (*destroy)(void *))
512 : : {
513 : : unsigned long flags;
514 : : int head;
515 : :
516 : : spin_lock_irqsave(&r->consumer_lock, flags);
517 : : spin_lock(&r->producer_lock);
518 : :
519 : : if (!r->size)
520 : : goto done;
521 : :
522 : : /*
523 : : * Clean out buffered entries (for simplicity). This way following code
524 : : * can test entries for NULL and if not assume they are valid.
525 : : */
526 : : head = r->consumer_head - 1;
527 : : while (likely(head >= r->consumer_tail))
528 : : r->queue[head--] = NULL;
529 : : r->consumer_tail = r->consumer_head;
530 : :
531 : : /*
532 : : * Go over entries in batch, start moving head back and copy entries.
533 : : * Stop when we run into previously unconsumed entries.
534 : : */
535 : : while (n) {
536 : : head = r->consumer_head - 1;
537 : : if (head < 0)
538 : : head = r->size - 1;
539 : : if (r->queue[head]) {
540 : : /* This batch entry will have to be destroyed. */
541 : : goto done;
542 : : }
543 : : r->queue[head] = batch[--n];
544 : : r->consumer_tail = head;
545 : : /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
546 : : WRITE_ONCE(r->consumer_head, head);
547 : : }
548 : :
549 : : done:
550 : : /* Destroy all entries left in the batch. */
551 : : while (n)
552 : : destroy(batch[--n]);
553 : : spin_unlock(&r->producer_lock);
554 : : spin_unlock_irqrestore(&r->consumer_lock, flags);
555 : : }
556 : :
557 : 0 : static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
558 : : int size, gfp_t gfp,
559 : : void (*destroy)(void *))
560 : : {
561 : : int producer = 0;
562 : : void **old;
563 : : void *ptr;
564 : :
565 : 0 : while ((ptr = __ptr_ring_consume(r)))
566 : 0 : if (producer < size)
567 : 0 : queue[producer++] = ptr;
568 : 0 : else if (destroy)
569 : 0 : destroy(ptr);
570 : :
571 : 0 : if (producer >= size)
572 : : producer = 0;
573 : : __ptr_ring_set_size(r, size);
574 : 0 : r->producer = producer;
575 : 0 : r->consumer_head = 0;
576 : 0 : r->consumer_tail = 0;
577 : 0 : old = r->queue;
578 : 0 : r->queue = queue;
579 : :
580 : 0 : return old;
581 : : }
582 : :
583 : : /*
584 : : * Note: producer lock is nested within consumer lock, so if you
585 : : * resize you must make sure all uses nest correctly.
586 : : * In particular if you consume ring in interrupt or BH context, you must
587 : : * disable interrupts/BH when doing so.
588 : : */
589 : : static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
590 : : void (*destroy)(void *))
591 : : {
592 : : unsigned long flags;
593 : : void **queue = __ptr_ring_init_queue_alloc(size, gfp);
594 : : void **old;
595 : :
596 : : if (!queue)
597 : : return -ENOMEM;
598 : :
599 : : spin_lock_irqsave(&(r)->consumer_lock, flags);
600 : : spin_lock(&(r)->producer_lock);
601 : :
602 : : old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
603 : :
604 : : spin_unlock(&(r)->producer_lock);
605 : : spin_unlock_irqrestore(&(r)->consumer_lock, flags);
606 : :
607 : : kvfree(old);
608 : :
609 : : return 0;
610 : : }
611 : :
612 : : /*
613 : : * Note: producer lock is nested within consumer lock, so if you
614 : : * resize you must make sure all uses nest correctly.
615 : : * In particular if you consume ring in interrupt or BH context, you must
616 : : * disable interrupts/BH when doing so.
617 : : */
618 : 0 : static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
619 : : unsigned int nrings,
620 : : int size,
621 : : gfp_t gfp, void (*destroy)(void *))
622 : : {
623 : : unsigned long flags;
624 : : void ***queues;
625 : : int i;
626 : :
627 : 0 : queues = kmalloc_array(nrings, sizeof(*queues), gfp);
628 : 0 : if (!queues)
629 : : goto noqueues;
630 : :
631 : 0 : for (i = 0; i < nrings; ++i) {
632 : 0 : queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
633 : 0 : if (!queues[i])
634 : : goto nomem;
635 : : }
636 : :
637 : 0 : for (i = 0; i < nrings; ++i) {
638 : 0 : spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
639 : 0 : spin_lock(&(rings[i])->producer_lock);
640 : 0 : queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
641 : : size, gfp, destroy);
642 : 0 : spin_unlock(&(rings[i])->producer_lock);
643 : 0 : spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
644 : : }
645 : :
646 : 0 : for (i = 0; i < nrings; ++i)
647 : 0 : kvfree(queues[i]);
648 : :
649 : 0 : kfree(queues);
650 : :
651 : 0 : return 0;
652 : :
653 : : nomem:
654 : 0 : while (--i >= 0)
655 : 0 : kvfree(queues[i]);
656 : :
657 : 0 : kfree(queues);
658 : :
659 : : noqueues:
660 : : return -ENOMEM;
661 : : }
662 : :
663 : 0 : static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
664 : : {
665 : : void *ptr;
666 : :
667 : 0 : if (destroy)
668 : 0 : while ((ptr = ptr_ring_consume(r)))
669 : 0 : destroy(ptr);
670 : 0 : kvfree(r->queue);
671 : 0 : }
672 : :
673 : : #endif /* _LINUX_PTR_RING_H */
|