1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* Copyright (C) 2019 Oracle. All Rights Reserved.
* Author: Darrick J. Wong <darrick.wong@oracle.com>
*/
#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_shared.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_trace.h"
#include "xfs_sysctl.h"
#include "xfs_pwork.h"
#include <linux/nmi.h>
/*
* Parallel Work Queue
* ===================
*
* Abstract away the details of running a large and "obviously" parallelizable
* task across multiple CPUs. Callers initialize the pwork control object with
* a desired level of parallelization and a work function. Next, they embed
* struct xfs_pwork in whatever structure they use to pass work context to a
* worker thread and queue that pwork. The work function will be passed the
* pwork item when it is run (from process context) and any returned error will
* be recorded in xfs_pwork_ctl.error. Work functions should check for errors
* and abort if necessary; the non-zeroness of xfs_pwork_ctl.error does not
* stop workqueue item processing.
*
* This is the rough equivalent of the xfsprogs workqueue code, though we can't
* reuse that name here.
*/
/* Invoke our caller's function. */
static void
xfs_pwork_work(
struct work_struct *work)
{
struct xfs_pwork *pwork;
struct xfs_pwork_ctl *pctl;
int error;
pwork = container_of(work, struct xfs_pwork, work);
pctl = pwork->pctl;
error = pctl->work_fn(pctl->mp, pwork);
if (error && !pctl->error)
pctl->error = error;
if (atomic_dec_and_test(&pctl->nr_work))
wake_up(&pctl->poll_wait);
}
/*
* Set up control data for parallel work. @work_fn is the function that will
* be called. @tag will be written into the kernel threads. @nr_threads is
* the level of parallelism desired, or 0 for no limit.
*/
int
xfs_pwork_init(
struct xfs_mount *mp,
struct xfs_pwork_ctl *pctl,
xfs_pwork_work_fn work_fn,
const char *tag,
unsigned int nr_threads)
{
#ifdef DEBUG
if (xfs_globals.pwork_threads >= 0)
nr_threads = xfs_globals.pwork_threads;
#endif
trace_xfs_pwork_init(mp, nr_threads, current->pid);
pctl->wq = alloc_workqueue("%s-%d", WQ_FREEZABLE, nr_threads, tag,
current->pid);
if (!pctl->wq)
return -ENOMEM;
pctl->work_fn = work_fn;
pctl->error = 0;
pctl->mp = mp;
atomic_set(&pctl->nr_work, 0);
init_waitqueue_head(&pctl->poll_wait);
return 0;
}
/* Queue some parallel work. */
void
xfs_pwork_queue(
struct xfs_pwork_ctl *pctl,
struct xfs_pwork *pwork)
{
INIT_WORK(&pwork->work, xfs_pwork_work);
pwork->pctl = pctl;
atomic_inc(&pctl->nr_work);
queue_work(pctl->wq, &pwork->work);
}
/* Wait for the work to finish and tear down the control structure. */
int
xfs_pwork_destroy(
struct xfs_pwork_ctl *pctl)
{
destroy_workqueue(pctl->wq);
pctl->wq = NULL;
return pctl->error;
}
/*
* Wait for the work to finish by polling completion status and touch the soft
* lockup watchdog. This is for callers such as mount which hold locks.
*/
void
xfs_pwork_poll(
struct xfs_pwork_ctl *pctl)
{
while (wait_event_timeout(pctl->poll_wait,
atomic_read(&pctl->nr_work) == 0, HZ) == 0)
touch_softlockup_watchdog();
}
/*
* Return the amount of parallelism that the data device can handle, or 0 for
* no limit.
*/
unsigned int
xfs_pwork_guess_datadev_parallelism(
struct xfs_mount *mp)
{
struct xfs_buftarg *btp = mp->m_ddev_targp;
/*
* For now we'll go with the most conservative setting possible,
* which is two threads for an SSD and 1 thread everywhere else.
*/
return blk_queue_nonrot(btp->bt_bdev->bd_queue) ? 2 : 1;
}
|