Branch data Line data Source code
1 : : /*
2 : : * Copyright (C) 2002 Sistina Software (UK) Limited.
3 : : * Copyright (C) 2006 Red Hat GmbH
4 : : *
5 : : * This file is released under the GPL.
6 : : *
7 : : * Kcopyd provides a simple interface for copying an area of one
8 : : * block-device to one or more other block-devices, with an asynchronous
9 : : * completion notification.
10 : : */
11 : :
12 : : #include <linux/types.h>
13 : : #include <linux/atomic.h>
14 : : #include <linux/blkdev.h>
15 : : #include <linux/fs.h>
16 : : #include <linux/init.h>
17 : : #include <linux/list.h>
18 : : #include <linux/mempool.h>
19 : : #include <linux/module.h>
20 : : #include <linux/pagemap.h>
21 : : #include <linux/slab.h>
22 : : #include <linux/vmalloc.h>
23 : : #include <linux/workqueue.h>
24 : : #include <linux/mutex.h>
25 : : #include <linux/delay.h>
26 : : #include <linux/device-mapper.h>
27 : : #include <linux/dm-kcopyd.h>
28 : :
29 : : #include "dm-core.h"
30 : :
31 : : #define SPLIT_COUNT 8
32 : : #define MIN_JOBS 8
33 : :
34 : : #define DEFAULT_SUB_JOB_SIZE_KB 512
35 : : #define MAX_SUB_JOB_SIZE_KB 1024
36 : :
37 : : static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;
38 : :
39 : : module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
40 : : MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");
41 : :
42 : 0 : static unsigned dm_get_kcopyd_subjob_size(void)
43 : : {
44 : 0 : unsigned sub_job_size_kb;
45 : :
46 : 0 : sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
47 : : DEFAULT_SUB_JOB_SIZE_KB,
48 : : MAX_SUB_JOB_SIZE_KB);
49 : :
50 : 0 : return sub_job_size_kb << 1;
51 : : }
52 : :
53 : : /*-----------------------------------------------------------------
54 : : * Each kcopyd client has its own little pool of preallocated
55 : : * pages for kcopyd io.
56 : : *---------------------------------------------------------------*/
57 : : struct dm_kcopyd_client {
58 : : struct page_list *pages;
59 : : unsigned nr_reserved_pages;
60 : : unsigned nr_free_pages;
61 : : unsigned sub_job_size;
62 : :
63 : : struct dm_io_client *io_client;
64 : :
65 : : wait_queue_head_t destroyq;
66 : :
67 : : mempool_t job_pool;
68 : :
69 : : struct workqueue_struct *kcopyd_wq;
70 : : struct work_struct kcopyd_work;
71 : :
72 : : struct dm_kcopyd_throttle *throttle;
73 : :
74 : : atomic_t nr_jobs;
75 : :
76 : : /*
77 : : * We maintain four lists of jobs:
78 : : *
79 : : * i) jobs waiting for pages
80 : : * ii) jobs that have pages, and are waiting for the io to be issued.
81 : : * iii) jobs that don't need to do any IO and just run a callback
82 : : * iv) jobs that have completed.
83 : : *
84 : : * All four of these are protected by job_lock.
85 : : */
86 : : spinlock_t job_lock;
87 : : struct list_head callback_jobs;
88 : : struct list_head complete_jobs;
89 : : struct list_head io_jobs;
90 : : struct list_head pages_jobs;
91 : : };
92 : :
93 : : static struct page_list zero_page_list;
94 : :
95 : : static DEFINE_SPINLOCK(throttle_spinlock);
96 : :
97 : : /*
98 : : * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
99 : : * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
100 : : * by 2.
101 : : */
102 : : #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ
103 : :
104 : : /*
105 : : * Sleep this number of milliseconds.
106 : : *
107 : : * The value was decided experimentally.
108 : : * Smaller values seem to cause an increased copy rate above the limit.
109 : : * The reason for this is unknown but possibly due to jiffies rounding errors
110 : : * or read/write cache inside the disk.
111 : : */
112 : : #define SLEEP_MSEC 100
113 : :
114 : : /*
115 : : * Maximum number of sleep events. There is a theoretical livelock if more
116 : : * kcopyd clients do work simultaneously which this limit avoids.
117 : : */
118 : : #define MAX_SLEEPS 10
119 : :
120 : 0 : static void io_job_start(struct dm_kcopyd_throttle *t)
121 : : {
122 : 0 : unsigned throttle, now, difference;
123 : 0 : int slept = 0, skew;
124 : :
125 [ # # ]: 0 : if (unlikely(!t))
126 : : return;
127 : :
128 : 0 : try_again:
129 : 0 : spin_lock_irq(&throttle_spinlock);
130 : :
131 [ # # ]: 0 : throttle = READ_ONCE(t->throttle);
132 : :
133 [ # # ]: 0 : if (likely(throttle >= 100))
134 : 0 : goto skip_limit;
135 : :
136 : 0 : now = jiffies;
137 : 0 : difference = now - t->last_jiffies;
138 : 0 : t->last_jiffies = now;
139 [ # # ]: 0 : if (t->num_io_jobs)
140 : 0 : t->io_period += difference;
141 : 0 : t->total_period += difference;
142 : :
143 : : /*
144 : : * Maintain sane values if we got a temporary overflow.
145 : : */
146 [ # # ]: 0 : if (unlikely(t->io_period > t->total_period))
147 : 0 : t->io_period = t->total_period;
148 : :
149 [ # # ]: 0 : if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
150 : 0 : int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
151 : 0 : t->total_period >>= shift;
152 : 0 : t->io_period >>= shift;
153 : : }
154 : :
155 : 0 : skew = t->io_period - throttle * t->total_period / 100;
156 : :
157 [ # # # # ]: 0 : if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
158 : 0 : slept++;
159 : 0 : spin_unlock_irq(&throttle_spinlock);
160 : 0 : msleep(SLEEP_MSEC);
161 : 0 : goto try_again;
162 : : }
163 : :
164 : 0 : skip_limit:
165 : 0 : t->num_io_jobs++;
166 : :
167 : 0 : spin_unlock_irq(&throttle_spinlock);
168 : : }
169 : :
170 : 0 : static void io_job_finish(struct dm_kcopyd_throttle *t)
171 : : {
172 : 0 : unsigned long flags;
173 : :
174 [ # # ]: 0 : if (unlikely(!t))
175 : : return;
176 : :
177 : 0 : spin_lock_irqsave(&throttle_spinlock, flags);
178 : :
179 : 0 : t->num_io_jobs--;
180 : :
181 [ # # ]: 0 : if (likely(READ_ONCE(t->throttle) >= 100))
182 : 0 : goto skip_limit;
183 : :
184 [ # # ]: 0 : if (!t->num_io_jobs) {
185 : 0 : unsigned now, difference;
186 : :
187 : 0 : now = jiffies;
188 : 0 : difference = now - t->last_jiffies;
189 : 0 : t->last_jiffies = now;
190 : :
191 : 0 : t->io_period += difference;
192 : 0 : t->total_period += difference;
193 : :
194 : : /*
195 : : * Maintain sane values if we got a temporary overflow.
196 : : */
197 [ # # ]: 0 : if (unlikely(t->io_period > t->total_period))
198 : 0 : t->io_period = t->total_period;
199 : : }
200 : :
201 : 0 : skip_limit:
202 : 0 : spin_unlock_irqrestore(&throttle_spinlock, flags);
203 : : }
204 : :
205 : :
206 : 0 : static void wake(struct dm_kcopyd_client *kc)
207 : : {
208 : 0 : queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
209 : 0 : }
210 : :
211 : : /*
212 : : * Obtain one page for the use of kcopyd.
213 : : */
214 : 0 : static struct page_list *alloc_pl(gfp_t gfp)
215 : : {
216 : 0 : struct page_list *pl;
217 : :
218 [ # # ]: 0 : pl = kmalloc(sizeof(*pl), gfp);
219 [ # # ]: 0 : if (!pl)
220 : : return NULL;
221 : :
222 : 0 : pl->page = alloc_page(gfp);
223 [ # # ]: 0 : if (!pl->page) {
224 : 0 : kfree(pl);
225 : 0 : return NULL;
226 : : }
227 : :
228 : : return pl;
229 : : }
230 : :
231 : 0 : static void free_pl(struct page_list *pl)
232 : : {
233 : 0 : __free_page(pl->page);
234 : 0 : kfree(pl);
235 : 0 : }
236 : :
237 : : /*
238 : : * Add the provided pages to a client's free page list, releasing
239 : : * back to the system any beyond the reserved_pages limit.
240 : : */
241 : 0 : static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
242 : : {
243 : 0 : struct page_list *next;
244 : :
245 : 0 : do {
246 : 0 : next = pl->next;
247 : :
248 [ # # ]: 0 : if (kc->nr_free_pages >= kc->nr_reserved_pages)
249 : 0 : free_pl(pl);
250 : : else {
251 : 0 : pl->next = kc->pages;
252 : 0 : kc->pages = pl;
253 : 0 : kc->nr_free_pages++;
254 : : }
255 : :
256 : 0 : pl = next;
257 [ # # ]: 0 : } while (pl);
258 : 0 : }
259 : :
260 : 0 : static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
261 : : unsigned int nr, struct page_list **pages)
262 : : {
263 : 0 : struct page_list *pl;
264 : :
265 : 0 : *pages = NULL;
266 : :
267 : 0 : do {
268 : 0 : pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
269 [ # # ]: 0 : if (unlikely(!pl)) {
270 : : /* Use reserved pages */
271 : 0 : pl = kc->pages;
272 [ # # ]: 0 : if (unlikely(!pl))
273 : 0 : goto out_of_memory;
274 : 0 : kc->pages = pl->next;
275 : 0 : kc->nr_free_pages--;
276 : : }
277 : 0 : pl->next = *pages;
278 : 0 : *pages = pl;
279 [ # # ]: 0 : } while (--nr);
280 : :
281 : : return 0;
282 : :
283 : : out_of_memory:
284 [ # # ]: 0 : if (*pages)
285 : 0 : kcopyd_put_pages(kc, *pages);
286 : : return -ENOMEM;
287 : : }
288 : :
289 : : /*
290 : : * These three functions resize the page pool.
291 : : */
292 : 0 : static void drop_pages(struct page_list *pl)
293 : : {
294 : 0 : struct page_list *next;
295 : :
296 [ # # ]: 0 : while (pl) {
297 : 0 : next = pl->next;
298 : 0 : free_pl(pl);
299 : 0 : pl = next;
300 : : }
301 : 0 : }
302 : :
303 : : /*
304 : : * Allocate and reserve nr_pages for the use of a specific client.
305 : : */
306 : 0 : static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
307 : : {
308 : 0 : unsigned i;
309 : 0 : struct page_list *pl = NULL, *next;
310 : :
311 [ # # ]: 0 : for (i = 0; i < nr_pages; i++) {
312 : 0 : next = alloc_pl(GFP_KERNEL);
313 [ # # ]: 0 : if (!next) {
314 [ # # ]: 0 : if (pl)
315 : 0 : drop_pages(pl);
316 : 0 : return -ENOMEM;
317 : : }
318 : 0 : next->next = pl;
319 : 0 : pl = next;
320 : : }
321 : :
322 : 0 : kc->nr_reserved_pages += nr_pages;
323 : 0 : kcopyd_put_pages(kc, pl);
324 : :
325 : 0 : return 0;
326 : : }
327 : :
328 : 0 : static void client_free_pages(struct dm_kcopyd_client *kc)
329 : : {
330 [ # # ]: 0 : BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
331 : 0 : drop_pages(kc->pages);
332 : 0 : kc->pages = NULL;
333 : 0 : kc->nr_free_pages = kc->nr_reserved_pages = 0;
334 : 0 : }
335 : :
336 : : /*-----------------------------------------------------------------
337 : : * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
338 : : * for this reason we use a mempool to prevent the client from
339 : : * ever having to do io (which could cause a deadlock).
340 : : *---------------------------------------------------------------*/
341 : : struct kcopyd_job {
342 : : struct dm_kcopyd_client *kc;
343 : : struct list_head list;
344 : : unsigned long flags;
345 : :
346 : : /*
347 : : * Error state of the job.
348 : : */
349 : : int read_err;
350 : : unsigned long write_err;
351 : :
352 : : /*
353 : : * Either READ or WRITE
354 : : */
355 : : int rw;
356 : : struct dm_io_region source;
357 : :
358 : : /*
359 : : * The destinations for the transfer.
360 : : */
361 : : unsigned int num_dests;
362 : : struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
363 : :
364 : : struct page_list *pages;
365 : :
366 : : /*
367 : : * Set this to ensure you are notified when the job has
368 : : * completed. 'context' is for callback to use.
369 : : */
370 : : dm_kcopyd_notify_fn fn;
371 : : void *context;
372 : :
373 : : /*
374 : : * These fields are only used if the job has been split
375 : : * into more manageable parts.
376 : : */
377 : : struct mutex lock;
378 : : atomic_t sub_jobs;
379 : : sector_t progress;
380 : : sector_t write_offset;
381 : :
382 : : struct kcopyd_job *master_job;
383 : : };
384 : :
385 : : static struct kmem_cache *_job_cache;
386 : :
387 : 28 : int __init dm_kcopyd_init(void)
388 : : {
389 : 28 : _job_cache = kmem_cache_create("kcopyd_job",
390 : : sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
391 : : __alignof__(struct kcopyd_job), 0, NULL);
392 [ + - ]: 28 : if (!_job_cache)
393 : : return -ENOMEM;
394 : :
395 : 28 : zero_page_list.next = &zero_page_list;
396 [ - + ]: 28 : zero_page_list.page = ZERO_PAGE(0);
397 : :
398 : 28 : return 0;
399 : : }
400 : :
401 : 0 : void dm_kcopyd_exit(void)
402 : : {
403 : 0 : kmem_cache_destroy(_job_cache);
404 : 0 : _job_cache = NULL;
405 : 0 : }
406 : :
407 : : /*
408 : : * Functions to push and pop a job onto the head of a given job
409 : : * list.
410 : : */
411 : : static struct kcopyd_job *pop_io_job(struct list_head *jobs,
412 : : struct dm_kcopyd_client *kc)
413 : : {
414 : : struct kcopyd_job *job;
415 : :
416 : : /*
417 : : * For I/O jobs, pop any read, any write without sequential write
418 : : * constraint and sequential writes that are at the right position.
419 : : */
420 : : list_for_each_entry(job, jobs, list) {
421 : : if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
422 : : list_del(&job->list);
423 : : return job;
424 : : }
425 : :
426 : : if (job->write_offset == job->master_job->write_offset) {
427 : : job->master_job->write_offset += job->source.count;
428 : : list_del(&job->list);
429 : : return job;
430 : : }
431 : : }
432 : :
433 : : return NULL;
434 : : }
435 : :
436 : 0 : static struct kcopyd_job *pop(struct list_head *jobs,
437 : : struct dm_kcopyd_client *kc)
438 : : {
439 : 0 : struct kcopyd_job *job = NULL;
440 : 0 : unsigned long flags;
441 : :
442 : 0 : spin_lock_irqsave(&kc->job_lock, flags);
443 : :
444 [ # # ]: 0 : if (!list_empty(jobs)) {
445 [ # # ]: 0 : if (jobs == &kc->io_jobs)
446 : 0 : job = pop_io_job(jobs, kc);
447 : : else {
448 : 0 : job = list_entry(jobs->next, struct kcopyd_job, list);
449 : 0 : list_del(&job->list);
450 : : }
451 : : }
452 : 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
453 : :
454 : 0 : return job;
455 : : }
456 : :
457 : 0 : static void push(struct list_head *jobs, struct kcopyd_job *job)
458 : : {
459 : 0 : unsigned long flags;
460 : 0 : struct dm_kcopyd_client *kc = job->kc;
461 : :
462 : 0 : spin_lock_irqsave(&kc->job_lock, flags);
463 : 0 : list_add_tail(&job->list, jobs);
464 : 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
465 : 0 : }
466 : :
467 : :
468 : 0 : static void push_head(struct list_head *jobs, struct kcopyd_job *job)
469 : : {
470 : 0 : unsigned long flags;
471 : 0 : struct dm_kcopyd_client *kc = job->kc;
472 : :
473 : 0 : spin_lock_irqsave(&kc->job_lock, flags);
474 : 0 : list_add(&job->list, jobs);
475 : 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
476 : 0 : }
477 : :
478 : : /*
479 : : * These three functions process 1 item from the corresponding
480 : : * job list.
481 : : *
482 : : * They return:
483 : : * < 0: error
484 : : * 0: success
485 : : * > 0: can't process yet.
486 : : */
487 : 0 : static int run_complete_job(struct kcopyd_job *job)
488 : : {
489 : 0 : void *context = job->context;
490 : 0 : int read_err = job->read_err;
491 : 0 : unsigned long write_err = job->write_err;
492 : 0 : dm_kcopyd_notify_fn fn = job->fn;
493 : 0 : struct dm_kcopyd_client *kc = job->kc;
494 : :
495 [ # # # # ]: 0 : if (job->pages && job->pages != &zero_page_list)
496 : 0 : kcopyd_put_pages(kc, job->pages);
497 : : /*
498 : : * If this is the master job, the sub jobs have already
499 : : * completed so we can free everything.
500 : : */
501 [ # # ]: 0 : if (job->master_job == job) {
502 : 0 : mutex_destroy(&job->lock);
503 : 0 : mempool_free(job, &kc->job_pool);
504 : : }
505 : 0 : fn(read_err, write_err, context);
506 : :
507 [ # # ]: 0 : if (atomic_dec_and_test(&kc->nr_jobs))
508 : 0 : wake_up(&kc->destroyq);
509 : :
510 : 0 : cond_resched();
511 : :
512 : 0 : return 0;
513 : : }
514 : :
515 : 0 : static void complete_io(unsigned long error, void *context)
516 : : {
517 : 0 : struct kcopyd_job *job = (struct kcopyd_job *) context;
518 : 0 : struct dm_kcopyd_client *kc = job->kc;
519 : :
520 : 0 : io_job_finish(kc->throttle);
521 : :
522 [ # # ]: 0 : if (error) {
523 [ # # ]: 0 : if (op_is_write(job->rw))
524 : 0 : job->write_err |= error;
525 : : else
526 : 0 : job->read_err = 1;
527 : :
528 [ # # ]: 0 : if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
529 : 0 : push(&kc->complete_jobs, job);
530 : 0 : wake(kc);
531 : 0 : return;
532 : : }
533 : : }
534 : :
535 [ # # ]: 0 : if (op_is_write(job->rw))
536 : 0 : push(&kc->complete_jobs, job);
537 : :
538 : : else {
539 : 0 : job->rw = WRITE;
540 : 0 : push(&kc->io_jobs, job);
541 : : }
542 : :
543 : 0 : wake(kc);
544 : : }
545 : :
546 : : /*
547 : : * Request io on as many buffer heads as we can currently get for
548 : : * a particular job.
549 : : */
550 : 0 : static int run_io_job(struct kcopyd_job *job)
551 : : {
552 : 0 : int r;
553 : 0 : struct dm_io_request io_req = {
554 : 0 : .bi_op = job->rw,
555 : : .bi_op_flags = 0,
556 : : .mem.type = DM_IO_PAGE_LIST,
557 : 0 : .mem.ptr.pl = job->pages,
558 : : .mem.offset = 0,
559 : : .notify.fn = complete_io,
560 : : .notify.context = job,
561 : 0 : .client = job->kc->io_client,
562 : : };
563 : :
564 : : /*
565 : : * If we need to write sequentially and some reads or writes failed,
566 : : * no point in continuing.
567 : : */
568 [ # # ]: 0 : if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
569 [ # # ]: 0 : job->master_job->write_err) {
570 : 0 : job->write_err = job->master_job->write_err;
571 : 0 : return -EIO;
572 : : }
573 : :
574 : 0 : io_job_start(job->kc->throttle);
575 : :
576 [ # # ]: 0 : if (job->rw == READ)
577 : 0 : r = dm_io(&io_req, 1, &job->source, NULL);
578 : : else
579 : 0 : r = dm_io(&io_req, job->num_dests, job->dests, NULL);
580 : :
581 : : return r;
582 : : }
583 : :
584 : 0 : static int run_pages_job(struct kcopyd_job *job)
585 : : {
586 : 0 : int r;
587 : 0 : unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
588 : :
589 : 0 : r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
590 [ # # ]: 0 : if (!r) {
591 : : /* this job is ready for io */
592 : 0 : push(&job->kc->io_jobs, job);
593 : 0 : return 0;
594 : : }
595 : :
596 [ # # ]: 0 : if (r == -ENOMEM)
597 : : /* can't complete now */
598 : 0 : return 1;
599 : :
600 : : return r;
601 : : }
602 : :
603 : : /*
604 : : * Run through a list for as long as possible. Returns the count
605 : : * of successful jobs.
606 : : */
607 : 0 : static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
608 : : int (*fn) (struct kcopyd_job *))
609 : : {
610 : 0 : struct kcopyd_job *job;
611 : 0 : int r, count = 0;
612 : :
613 [ # # ]: 0 : while ((job = pop(jobs, kc))) {
614 : :
615 : 0 : r = fn(job);
616 : :
617 [ # # ]: 0 : if (r < 0) {
618 : : /* error this rogue job */
619 [ # # ]: 0 : if (op_is_write(job->rw))
620 : 0 : job->write_err = (unsigned long) -1L;
621 : : else
622 : 0 : job->read_err = 1;
623 : 0 : push(&kc->complete_jobs, job);
624 : 0 : wake(kc);
625 : : break;
626 : : }
627 : :
628 [ # # ]: 0 : if (r > 0) {
629 : : /*
630 : : * We couldn't service this job ATM, so
631 : : * push this job back onto the list.
632 : : */
633 : 0 : push_head(jobs, job);
634 : 0 : break;
635 : : }
636 : :
637 : 0 : count++;
638 : : }
639 : :
640 : 0 : return count;
641 : : }
642 : :
643 : : /*
644 : : * kcopyd does this every time it's woken up.
645 : : */
646 : 0 : static void do_work(struct work_struct *work)
647 : : {
648 : 0 : struct dm_kcopyd_client *kc = container_of(work,
649 : : struct dm_kcopyd_client, kcopyd_work);
650 : 0 : struct blk_plug plug;
651 : 0 : unsigned long flags;
652 : :
653 : : /*
654 : : * The order that these are called is *very* important.
655 : : * complete jobs can free some pages for pages jobs.
656 : : * Pages jobs when successful will jump onto the io jobs
657 : : * list. io jobs call wake when they complete and it all
658 : : * starts again.
659 : : */
660 : 0 : spin_lock_irqsave(&kc->job_lock, flags);
661 [ # # ]: 0 : list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
662 : 0 : spin_unlock_irqrestore(&kc->job_lock, flags);
663 : :
664 : 0 : blk_start_plug(&plug);
665 : 0 : process_jobs(&kc->complete_jobs, kc, run_complete_job);
666 : 0 : process_jobs(&kc->pages_jobs, kc, run_pages_job);
667 : 0 : process_jobs(&kc->io_jobs, kc, run_io_job);
668 : 0 : blk_finish_plug(&plug);
669 : 0 : }
670 : :
671 : : /*
672 : : * If we are copying a small region we just dispatch a single job
673 : : * to do the copy, otherwise the io has to be split up into many
674 : : * jobs.
675 : : */
676 : 0 : static void dispatch_job(struct kcopyd_job *job)
677 : : {
678 : 0 : struct dm_kcopyd_client *kc = job->kc;
679 : 0 : atomic_inc(&kc->nr_jobs);
680 [ # # ]: 0 : if (unlikely(!job->source.count))
681 : 0 : push(&kc->callback_jobs, job);
682 [ # # ]: 0 : else if (job->pages == &zero_page_list)
683 : 0 : push(&kc->io_jobs, job);
684 : : else
685 : 0 : push(&kc->pages_jobs, job);
686 : 0 : wake(kc);
687 : 0 : }
688 : :
689 : 0 : static void segment_complete(int read_err, unsigned long write_err,
690 : : void *context)
691 : : {
692 : : /* FIXME: tidy this function */
693 : 0 : sector_t progress = 0;
694 : 0 : sector_t count = 0;
695 : 0 : struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
696 : 0 : struct kcopyd_job *job = sub_job->master_job;
697 : 0 : struct dm_kcopyd_client *kc = job->kc;
698 : :
699 : 0 : mutex_lock(&job->lock);
700 : :
701 : : /* update the error */
702 [ # # ]: 0 : if (read_err)
703 : 0 : job->read_err = 1;
704 : :
705 [ # # ]: 0 : if (write_err)
706 : 0 : job->write_err |= write_err;
707 : :
708 : : /*
709 : : * Only dispatch more work if there hasn't been an error.
710 : : */
711 [ # # # # : 0 : if ((!job->read_err && !job->write_err) ||
# # ]
712 : 0 : test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
713 : : /* get the next chunk of work */
714 : 0 : progress = job->progress;
715 : 0 : count = job->source.count - progress;
716 [ # # ]: 0 : if (count) {
717 : 0 : if (count > kc->sub_job_size)
718 : : count = kc->sub_job_size;
719 : :
720 : 0 : job->progress += count;
721 : : }
722 : : }
723 : 0 : mutex_unlock(&job->lock);
724 : :
725 [ # # ]: 0 : if (count) {
726 : 0 : int i;
727 : :
728 : 0 : *sub_job = *job;
729 : 0 : sub_job->write_offset = progress;
730 : 0 : sub_job->source.sector += progress;
731 : 0 : sub_job->source.count = count;
732 : :
733 [ # # ]: 0 : for (i = 0; i < job->num_dests; i++) {
734 : 0 : sub_job->dests[i].sector += progress;
735 : 0 : sub_job->dests[i].count = count;
736 : : }
737 : :
738 : 0 : sub_job->fn = segment_complete;
739 : 0 : sub_job->context = sub_job;
740 : 0 : dispatch_job(sub_job);
741 : :
742 [ # # ]: 0 : } else if (atomic_dec_and_test(&job->sub_jobs)) {
743 : :
744 : : /*
745 : : * Queue the completion callback to the kcopyd thread.
746 : : *
747 : : * Some callers assume that all the completions are called
748 : : * from a single thread and don't race with each other.
749 : : *
750 : : * We must not call the callback directly here because this
751 : : * code may not be executing in the thread.
752 : : */
753 : 0 : push(&kc->complete_jobs, job);
754 : 0 : wake(kc);
755 : : }
756 : 0 : }
757 : :
758 : : /*
759 : : * Create some sub jobs to share the work between them.
760 : : */
761 : 0 : static void split_job(struct kcopyd_job *master_job)
762 : : {
763 : 0 : int i;
764 : :
765 : 0 : atomic_inc(&master_job->kc->nr_jobs);
766 : :
767 : 0 : atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
768 [ # # ]: 0 : for (i = 0; i < SPLIT_COUNT; i++) {
769 : 0 : master_job[i + 1].master_job = master_job;
770 : 0 : segment_complete(0, 0u, &master_job[i + 1]);
771 : : }
772 : 0 : }
773 : :
774 : 0 : void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
775 : : unsigned int num_dests, struct dm_io_region *dests,
776 : : unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
777 : : {
778 : 0 : struct kcopyd_job *job;
779 : 0 : int i;
780 : :
781 : : /*
782 : : * Allocate an array of jobs consisting of one master job
783 : : * followed by SPLIT_COUNT sub jobs.
784 : : */
785 : 0 : job = mempool_alloc(&kc->job_pool, GFP_NOIO);
786 : 0 : mutex_init(&job->lock);
787 : :
788 : : /*
789 : : * set up for the read.
790 : : */
791 : 0 : job->kc = kc;
792 : 0 : job->flags = flags;
793 : 0 : job->read_err = 0;
794 : 0 : job->write_err = 0;
795 : :
796 : 0 : job->num_dests = num_dests;
797 : 0 : memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
798 : :
799 : : /*
800 : : * If one of the destination is a host-managed zoned block device,
801 : : * we need to write sequentially. If one of the destination is a
802 : : * host-aware device, then leave it to the caller to choose what to do.
803 : : */
804 [ # # ]: 0 : if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
805 [ # # ]: 0 : for (i = 0; i < job->num_dests; i++) {
806 [ # # # # ]: 0 : if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
807 : 0 : set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
808 : 0 : break;
809 : : }
810 : : }
811 : : }
812 : :
813 : : /*
814 : : * If we need to write sequentially, errors cannot be ignored.
815 : : */
816 [ # # # # ]: 0 : if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
817 : : test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
818 : 0 : clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
819 : :
820 [ # # ]: 0 : if (from) {
821 : 0 : job->source = *from;
822 : 0 : job->pages = NULL;
823 : 0 : job->rw = READ;
824 : : } else {
825 : 0 : memset(&job->source, 0, sizeof job->source);
826 : 0 : job->source.count = job->dests[0].count;
827 : 0 : job->pages = &zero_page_list;
828 : :
829 : : /*
830 : : * Use WRITE ZEROES to optimize zeroing if all dests support it.
831 : : */
832 : 0 : job->rw = REQ_OP_WRITE_ZEROES;
833 [ # # ]: 0 : for (i = 0; i < job->num_dests; i++)
834 [ # # # # ]: 0 : if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
835 : 0 : job->rw = WRITE;
836 : 0 : break;
837 : : }
838 : : }
839 : :
840 : 0 : job->fn = fn;
841 : 0 : job->context = context;
842 : 0 : job->master_job = job;
843 : 0 : job->write_offset = 0;
844 : :
845 [ # # ]: 0 : if (job->source.count <= kc->sub_job_size)
846 : 0 : dispatch_job(job);
847 : : else {
848 : 0 : job->progress = 0;
849 : 0 : split_job(job);
850 : : }
851 : 0 : }
852 : : EXPORT_SYMBOL(dm_kcopyd_copy);
853 : :
854 : 0 : void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
855 : : unsigned num_dests, struct dm_io_region *dests,
856 : : unsigned flags, dm_kcopyd_notify_fn fn, void *context)
857 : : {
858 : 0 : dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
859 : 0 : }
860 : : EXPORT_SYMBOL(dm_kcopyd_zero);
861 : :
862 : 0 : void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
863 : : dm_kcopyd_notify_fn fn, void *context)
864 : : {
865 : 0 : struct kcopyd_job *job;
866 : :
867 : 0 : job = mempool_alloc(&kc->job_pool, GFP_NOIO);
868 : :
869 : 0 : memset(job, 0, sizeof(struct kcopyd_job));
870 : 0 : job->kc = kc;
871 : 0 : job->fn = fn;
872 : 0 : job->context = context;
873 : 0 : job->master_job = job;
874 : :
875 : 0 : atomic_inc(&kc->nr_jobs);
876 : :
877 : 0 : return job;
878 : : }
879 : : EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
880 : :
881 : 0 : void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
882 : : {
883 : 0 : struct kcopyd_job *job = j;
884 : 0 : struct dm_kcopyd_client *kc = job->kc;
885 : :
886 : 0 : job->read_err = read_err;
887 : 0 : job->write_err = write_err;
888 : :
889 : 0 : push(&kc->callback_jobs, job);
890 : 0 : wake(kc);
891 : 0 : }
892 : : EXPORT_SYMBOL(dm_kcopyd_do_callback);
893 : :
894 : : /*
895 : : * Cancels a kcopyd job, eg. someone might be deactivating a
896 : : * mirror.
897 : : */
898 : : #if 0
899 : : int kcopyd_cancel(struct kcopyd_job *job, int block)
900 : : {
901 : : /* FIXME: finish */
902 : : return -1;
903 : : }
904 : : #endif /* 0 */
905 : :
906 : : /*-----------------------------------------------------------------
907 : : * Client setup
908 : : *---------------------------------------------------------------*/
909 : 0 : struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
910 : : {
911 : 0 : int r;
912 : 0 : unsigned reserve_pages;
913 : 0 : struct dm_kcopyd_client *kc;
914 : :
915 : 0 : kc = kzalloc(sizeof(*kc), GFP_KERNEL);
916 [ # # ]: 0 : if (!kc)
917 : : return ERR_PTR(-ENOMEM);
918 : :
919 : 0 : spin_lock_init(&kc->job_lock);
920 : 0 : INIT_LIST_HEAD(&kc->callback_jobs);
921 : 0 : INIT_LIST_HEAD(&kc->complete_jobs);
922 : 0 : INIT_LIST_HEAD(&kc->io_jobs);
923 : 0 : INIT_LIST_HEAD(&kc->pages_jobs);
924 : 0 : kc->throttle = throttle;
925 : :
926 : 0 : r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
927 [ # # ]: 0 : if (r)
928 : 0 : goto bad_slab;
929 : :
930 : 0 : INIT_WORK(&kc->kcopyd_work, do_work);
931 : 0 : kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
932 [ # # ]: 0 : if (!kc->kcopyd_wq) {
933 : 0 : r = -ENOMEM;
934 : 0 : goto bad_workqueue;
935 : : }
936 : :
937 : 0 : kc->sub_job_size = dm_get_kcopyd_subjob_size();
938 : 0 : reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);
939 : :
940 : 0 : kc->pages = NULL;
941 : 0 : kc->nr_reserved_pages = kc->nr_free_pages = 0;
942 : 0 : r = client_reserve_pages(kc, reserve_pages);
943 [ # # ]: 0 : if (r)
944 : 0 : goto bad_client_pages;
945 : :
946 : 0 : kc->io_client = dm_io_client_create();
947 [ # # ]: 0 : if (IS_ERR(kc->io_client)) {
948 : 0 : r = PTR_ERR(kc->io_client);
949 : 0 : goto bad_io_client;
950 : : }
951 : :
952 : 0 : init_waitqueue_head(&kc->destroyq);
953 : 0 : atomic_set(&kc->nr_jobs, 0);
954 : :
955 : 0 : return kc;
956 : :
957 : : bad_io_client:
958 : 0 : client_free_pages(kc);
959 : 0 : bad_client_pages:
960 : 0 : destroy_workqueue(kc->kcopyd_wq);
961 : 0 : bad_workqueue:
962 : 0 : mempool_exit(&kc->job_pool);
963 : 0 : bad_slab:
964 : 0 : kfree(kc);
965 : :
966 : 0 : return ERR_PTR(r);
967 : : }
968 : : EXPORT_SYMBOL(dm_kcopyd_client_create);
969 : :
970 : 0 : void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
971 : : {
972 : : /* Wait for completion of all jobs submitted by this client. */
973 [ # # # # ]: 0 : wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
974 : :
975 [ # # ]: 0 : BUG_ON(!list_empty(&kc->callback_jobs));
976 [ # # ]: 0 : BUG_ON(!list_empty(&kc->complete_jobs));
977 [ # # ]: 0 : BUG_ON(!list_empty(&kc->io_jobs));
978 [ # # ]: 0 : BUG_ON(!list_empty(&kc->pages_jobs));
979 : 0 : destroy_workqueue(kc->kcopyd_wq);
980 : 0 : dm_io_client_destroy(kc->io_client);
981 : 0 : client_free_pages(kc);
982 : 0 : mempool_exit(&kc->job_pool);
983 : 0 : kfree(kc);
984 : 0 : }
985 : : EXPORT_SYMBOL(dm_kcopyd_client_destroy);
|