Line data Source code
1 : // SPDX-License-Identifier: GPL-2.0
2 : /*
3 : * Copyright (C) 2007 Oracle. All rights reserved.
4 : * Copyright (C) 2014 Fujitsu. All rights reserved.
5 : */
6 :
7 : #include <linux/kthread.h>
8 : #include <linux/slab.h>
9 : #include <linux/list.h>
10 : #include <linux/spinlock.h>
11 : #include <linux/freezer.h>
12 : #include "async-thread.h"
13 : #include "ctree.h"
14 :
15 : enum {
16 : WORK_DONE_BIT,
17 : WORK_ORDER_DONE_BIT,
18 : };
19 :
20 : #define NO_THRESHOLD (-1)
21 : #define DFT_THRESHOLD (32)
22 :
23 : struct btrfs_workqueue {
24 : struct workqueue_struct *normal_wq;
25 :
26 : /* File system this workqueue services */
27 : struct btrfs_fs_info *fs_info;
28 :
29 : /* List head pointing to ordered work list */
30 : struct list_head ordered_list;
31 :
32 : /* Spinlock for ordered_list */
33 : spinlock_t list_lock;
34 :
35 : /* Thresholding related variables */
36 : atomic_t pending;
37 :
38 : /* Upper limit of concurrent workers */
39 : int limit_active;
40 :
41 : /* Current number of concurrent workers */
42 : int current_active;
43 :
44 : /* Threshold to change current_active */
45 : int thresh;
46 : unsigned int count;
47 : spinlock_t thres_lock;
48 : };
49 :
50 0 : struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct btrfs_workqueue *wq)
51 : {
52 0 : return wq->fs_info;
53 : }
54 :
55 39989 : struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
56 : {
57 39989 : return work->wq->fs_info;
58 : }
59 :
60 1550958 : bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
61 : {
62 : /*
63 : * We could compare wq->pending with num_online_cpus()
64 : * to support the "thresh == NO_THRESHOLD" case, but it would
65 : * require moving the atomic_inc/dec up into thresh_queue/exec_hook.
66 : * Postpone that until someone actually needs it.
67 : */
68 1550958 : if (wq->thresh == NO_THRESHOLD)
69 : return false;
70 :
71 1550958 : return atomic_read(&wq->pending) > wq->thresh * 2;
72 : }
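/*
 * Illustrative sketch (not part of the original source): a hypothetical
 * submitter could use this check to back off while the queue is congested.
 * With the default thresh of 32, congestion is reported once more than
 * 64 works are pending:
 *
 *	while (btrfs_workqueue_normal_congested(wq))
 *		cond_resched();
 */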
73 :
74 28908 : static void btrfs_init_workqueue(struct btrfs_workqueue *wq,
75 : struct btrfs_fs_info *fs_info)
76 : {
77 28908 : wq->fs_info = fs_info;
78 28908 : atomic_set(&wq->pending, 0);
79 28908 : INIT_LIST_HEAD(&wq->ordered_list);
80 28908 : spin_lock_init(&wq->list_lock);
81 28908 : spin_lock_init(&wq->thres_lock);
82 28908 : }
83 :
84 22484 : struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
85 : const char *name, unsigned int flags,
86 : int limit_active, int thresh)
87 : {
88 22484 : struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);
89 :
90 22484 : if (!ret)
91 : return NULL;
92 :
93 22484 : btrfs_init_workqueue(ret, fs_info);
94 :
95 22484 : ret->limit_active = limit_active;
96 22484 : if (thresh == 0)
97 : thresh = DFT_THRESHOLD;
98 : /* For a low threshold, disabling thresholding is a better choice */
99 9636 : if (thresh < DFT_THRESHOLD) {
100 9636 : ret->current_active = limit_active;
101 9636 : ret->thresh = NO_THRESHOLD;
102 : } else {
103 : /*
104 : * For a thresholded wq, let its concurrency grow on demand.
105 : * Use a minimal max_active at alloc time to reduce resource
106 : * usage.
107 : */
108 12848 : ret->current_active = 1;
109 12848 : ret->thresh = thresh;
110 : }
111 :
112 22484 : ret->normal_wq = alloc_workqueue("btrfs-%s", flags, ret->current_active,
113 : name);
114 22484 : if (!ret->normal_wq) {
115 0 : kfree(ret);
116 0 : return NULL;
117 : }
118 :
119 22484 : trace_btrfs_workqueue_alloc(ret, name);
120 22484 : return ret;
121 : }
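/*
 * Illustrative allocation sketch (hypothetical caller and values, not part
 * of the original source):
 *
 *	fs_info->workers = btrfs_alloc_workqueue(fs_info, "worker",
 *						 WQ_HIGHPRI, max_active, 0);
 *
 * A thresh of 0 selects DFT_THRESHOLD (32), so the wq starts with
 * max_active = 1 and thresh_exec_hook() grows it on demand; a non-zero
 * thresh below 32 disables thresholding and the wq runs at limit_active
 * from the start.
 */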
122 :
123 6424 : struct btrfs_workqueue *btrfs_alloc_ordered_workqueue(
124 : struct btrfs_fs_info *fs_info, const char *name,
125 : unsigned int flags)
126 : {
127 6424 : struct btrfs_workqueue *ret;
128 :
129 6424 : ret = kzalloc(sizeof(*ret), GFP_KERNEL);
130 6424 : if (!ret)
131 : return NULL;
132 :
133 6424 : btrfs_init_workqueue(ret, fs_info);
134 :
135 : /* Ordered workqueues don't allow @max_active adjustments. */
136 6424 : ret->limit_active = 1;
137 6424 : ret->current_active = 1;
138 6424 : ret->thresh = NO_THRESHOLD;
139 :
140 6424 : ret->normal_wq = alloc_ordered_workqueue("btrfs-%s", flags, name);
141 6424 : if (!ret->normal_wq) {
142 0 : kfree(ret);
143 0 : return NULL;
144 : }
145 :
146 6424 : trace_btrfs_workqueue_alloc(ret, name);
147 6424 : return ret;
148 : }
149 :
150 : /*
151 : * Hook for the threshold mechanism, called from btrfs_queue_work().
152 : * This hook WILL be called in IRQ handler context, so
153 : * workqueue_set_max_active() MUST NOT be called from it.
154 : */
155 4263645 : static inline void thresh_queue_hook(struct btrfs_workqueue *wq)
156 : {
157 4263645 : if (wq->thresh == NO_THRESHOLD)
158 : return;
159 812917 : atomic_inc(&wq->pending);
160 : }
161 :
162 : /*
163 : * Hook for the threshold mechanism, called before executing the work.
164 : * This hook runs in kthread context, so
165 : * workqueue_set_max_active() can be called here.
166 : */
167 4264714 : static inline void thresh_exec_hook(struct btrfs_workqueue *wq)
168 : {
169 4264714 : int new_current_active;
170 4264714 : long pending;
171 4264714 : int need_change = 0;
172 :
173 4264714 : if (wq->thresh == NO_THRESHOLD)
174 : return;
175 :
176 813994 : atomic_dec(&wq->pending);
177 813993 : spin_lock(&wq->thres_lock);
178 : /*
179 : * Use wq->count to limit the calling frequency of
180 : * workqueue_set_max_active.
181 : */
182 813995 : wq->count++;
183 813995 : wq->count %= (wq->thresh / 4);
184 813995 : if (!wq->count)
185 100900 : goto out;
186 713095 : new_current_active = wq->current_active;
187 :
188 : /*
189 : * pending may change later, but that's OK since it does not need
190 : * to be exact for calculating new_current_active.
191 : */
192 713095 : pending = atomic_read(&wq->pending);
193 713095 : if (pending > wq->thresh)
194 205251 : new_current_active++;
195 713095 : if (pending < wq->thresh / 2)
196 387983 : new_current_active--;
197 713095 : new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
198 713095 : if (new_current_active != wq->current_active) {
199 62970 : need_change = 1;
200 62970 : wq->current_active = new_current_active;
201 : }
202 650125 : out:
203 813995 : spin_unlock(&wq->thres_lock);
204 :
205 813995 : if (need_change) {
206 62970 : workqueue_set_max_active(wq->normal_wq, wq->current_active);
207 : }
208 : }
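/*
 * Worked example (illustrative, with the default thresh of 32): when the
 * concurrency is re-evaluated, current_active is incremented if more than
 * 32 works are pending, decremented if fewer than 16 are pending, and the
 * result is clamped to [1, limit_active]; workqueue_set_max_active() is
 * only invoked when the value actually changes.
 */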
209 :
210 39977 : static void run_ordered_work(struct btrfs_workqueue *wq,
211 : struct btrfs_work *self)
212 : {
213 39977 : struct list_head *list = &wq->ordered_list;
214 39977 : struct btrfs_work *work;
215 39977 : spinlock_t *lock = &wq->list_lock;
216 39977 : unsigned long flags;
217 39977 : bool free_self = false;
218 :
219 79966 : while (1) {
220 79966 : spin_lock_irqsave(lock, flags);
221 79978 : if (list_empty(list))
222 : break;
223 79363 : work = list_entry(list->next, struct btrfs_work,
224 : ordered_list);
225 79363 : if (!test_bit(WORK_DONE_BIT, &work->flags))
226 : break;
227 : /*
228 : * Orders all subsequent loads after reading WORK_DONE_BIT;
229 : * paired with the smp_mb__before_atomic() in btrfs_work_helper(),
230 : * this guarantees that the ordered function will see all
231 : * updates from the ordinary work function.
232 : */
233 49732 : smp_rmb();
234 :
235 : /*
236 : * We are going to call the ordered done function, but
237 : * we leave the work item on the list as a barrier so
238 : * that later work items that are done don't have their
239 : * functions called before this one returns.
240 : */
241 49732 : if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
242 : break;
243 39989 : trace_btrfs_ordered_sched(work);
244 39989 : spin_unlock_irqrestore(lock, flags);
245 39989 : work->ordered_func(work);
246 :
247 : /* now take the lock again and drop our item from the list */
248 39989 : spin_lock_irqsave(lock, flags);
249 39989 : list_del(&work->ordered_list);
250 39989 : spin_unlock_irqrestore(lock, flags);
251 :
252 39989 : if (work == self) {
253 : /*
254 : * This is the work item that the worker is currently
255 : * executing.
256 : *
257 : * The kernel workqueue code guarantees non-reentrancy
258 : * of work items. I.e., if a work item with the same
259 : * address and work function is queued twice, the second
260 : * execution is blocked until the first one finishes. A
261 : * work item may be freed and recycled with the same
262 : * work function; the workqueue code assumes that the
263 : * original work item cannot depend on the recycled work
264 : * item in that case (see find_worker_executing_work()).
265 : *
266 : * Note that different types of Btrfs work can depend on
267 : * each other, and one type of work on one Btrfs
268 : * filesystem may even depend on the same type of work
269 : * on another Btrfs filesystem via, e.g., a loop device.
270 : * Therefore, we must not allow the current work item to
271 : * be recycled until we are really done, otherwise we
272 : * break the above assumption and can deadlock.
273 : */
274 : free_self = true;
275 : } else {
276 : /*
277 : * We don't want to call the ordered free functions with
278 : * the lock held.
279 : */
280 33588 : work->ordered_free(work);
281 : /* NB: work must not be dereferenced past this point. */
282 33588 : trace_btrfs_all_work_done(wq->fs_info, work);
283 : }
284 : }
285 39989 : spin_unlock_irqrestore(lock, flags);
286 :
287 39988 : if (free_self) {
288 6401 : self->ordered_free(self);
289 : /* NB: self must not be dereferenced past this point. */
290 6401 : trace_btrfs_all_work_done(wq->fs_info, self);
291 : }
292 39988 : }
293 :
294 4264720 : static void btrfs_work_helper(struct work_struct *normal_work)
295 : {
296 4264720 : struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
297 : normal_work);
298 4264720 : struct btrfs_workqueue *wq = work->wq;
299 4264720 : int need_order = 0;
300 :
301 : /*
302 : * We must not touch anything inside the work in the following cases:
303 : * 1) after work->func(), if it has no ordered_free,
304 : *    since the struct is freed inside work->func();
305 : * 2) after setting WORK_DONE_BIT, since the work may be freed
306 : *    by other threads almost instantly.
307 : * So save the needed things here first.
308 : */
309 4264720 : if (work->ordered_func)
310 39989 : need_order = 1;
311 :
312 4264720 : trace_btrfs_work_sched(work);
313 4264714 : thresh_exec_hook(wq);
314 4264651 : work->func(work);
315 4263697 : if (need_order) {
316 : /*
317 : * Ensures all memory accesses done in the work function are
318 : * ordered before setting the WORK_DONE_BIT, so that the thread
319 : * which is going to execute the ordered work sees them.
320 : * Pairs with the smp_rmb() in run_ordered_work().
321 : */
322 39968 : smp_mb__before_atomic();
323 39968 : set_bit(WORK_DONE_BIT, &work->flags);
324 39979 : run_ordered_work(wq, work);
325 : } else {
326 : /* NB: work must not be dereferenced past this point. */
327 4223729 : trace_btrfs_all_work_done(wq->fs_info, work);
328 : }
329 4263130 : }
330 :
331 5855002 : void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
332 : btrfs_func_t ordered_func, btrfs_func_t ordered_free)
333 : {
334 5855002 : work->func = func;
335 5855002 : work->ordered_func = ordered_func;
336 5855002 : work->ordered_free = ordered_free;
337 5855002 : INIT_WORK(&work->normal_work, btrfs_work_helper);
338 5855002 : INIT_LIST_HEAD(&work->ordered_list);
339 5855002 : work->flags = 0;
340 5855002 : }
341 :
342 4263582 : void btrfs_queue_work(struct btrfs_workqueue *wq, struct btrfs_work *work)
343 : {
344 4263582 : unsigned long flags;
345 :
346 4263582 : work->wq = wq;
347 4263582 : thresh_queue_hook(wq);
348 4264119 : if (work->ordered_func) {
349 39989 : spin_lock_irqsave(&wq->list_lock, flags);
350 39989 : list_add_tail(&work->ordered_list, &wq->ordered_list);
351 39989 : spin_unlock_irqrestore(&wq->list_lock, flags);
352 : }
353 4264119 : trace_btrfs_work_queued(work);
354 4263897 : queue_work(wq->normal_wq, &work->normal_work);
355 4264724 : }
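/*
 * Illustrative usage sketch (hypothetical caller, not part of the original
 * source): a submitter embeds a struct btrfs_work in its own context,
 * initializes it with a normal, an ordered and a free callback, then
 * queues it:
 *
 *	btrfs_init_work(&job->work, my_func, my_ordered_func, my_free_func);
 *	btrfs_queue_work(wq, &job->work);
 *
 * my_func() instances run concurrently on the normal workqueue, while the
 * my_ordered_func() callbacks are invoked strictly in queueing order by
 * run_ordered_work().
 */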
356 :
357 28917 : void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
358 : {
359 28917 : if (!wq)
360 : return;
361 28917 : destroy_workqueue(wq->normal_wq);
362 28917 : trace_btrfs_workqueue_destroy(wq);
363 28917 : kfree(wq);
364 : }
365 :
366 30 : void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
367 : {
368 30 : if (wq)
369 30 : wq->limit_active = limit_active;
370 30 : }
371 :
372 9581 : void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
373 : {
374 9581 : flush_workqueue(wq->normal_wq);
375 9581 : }