net/ceph: make ceph_msgr_wq non-reentrant

ceph messenger code does a rather complex dancing around multithread
workqueue to make sure the same work item isn't executed concurrently
on different CPUs.  This restriction can be provided by workqueue with
WQ_NON_REENTRANT.

Make ceph_msgr_wq non-reentrant workqueue with the default concurrency
level and remove the QUEUED/BUSY logic.

* This removes backoff handling in con_work() but it couldn't reliably
  block execution of con_work() to begin with - queue_con() can be
  called after the work started but before BUSY is set.  It seems that
  it was an optimization for a rather cold path and can be safely
  removed.

* The number of concurrent work items is bound by the number of
  connections and connetions are independent from each other.  With
  the default concurrency level, different connections will be
  executed independently.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Sage Weil <sage@newdream.net>
Cc: ceph-devel@vger.kernel.org
Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Tejun Heo 2011-01-03 14:49:46 +01:00 committed by Sage Weil
parent 01e6acc4ea
commit f363e45fd1
2 changed files with 2 additions and 49 deletions

View file

@ -110,17 +110,12 @@ struct ceph_msg_pos {
/* /*
* ceph_connection state bit flags * ceph_connection state bit flags
*
* QUEUED and BUSY are used together to ensure that only a single
* thread is currently opening, reading or writing data to the socket.
*/ */
#define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define CONNECTING 1 #define CONNECTING 1
#define NEGOTIATING 2 #define NEGOTIATING 2
#define KEEPALIVE_PENDING 3 #define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */ #define WRITE_PENDING 4 /* we have data ready to send */
#define QUEUED 5 /* there is work queued on this connection */
#define BUSY 6 /* work is being done */
#define STANDBY 8 /* no outgoing messages, socket closed. we keep #define STANDBY 8 /* no outgoing messages, socket closed. we keep
* the ceph_connection around to maintain shared * the ceph_connection around to maintain shared
* state with the peer. */ * state with the peer. */

View file

@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
int ceph_msgr_init(void) int ceph_msgr_init(void)
{ {
ceph_msgr_wq = create_workqueue("ceph-msgr"); ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (!ceph_msgr_wq) { if (!ceph_msgr_wq) {
pr_err("msgr_init failed to create workqueue\n"); pr_err("msgr_init failed to create workqueue\n");
return -ENOMEM; return -ENOMEM;
@ -1920,20 +1920,6 @@ static int try_read(struct ceph_connection *con)
/* /*
* Atomically queue work on a connection. Bump @con reference to * Atomically queue work on a connection. Bump @con reference to
* avoid races with connection teardown. * avoid races with connection teardown.
*
* There is some trickery going on with QUEUED and BUSY because we
* only want a _single_ thread operating on each connection at any
* point in time, but we want to use all available CPUs.
*
* The worker thread only proceeds if it can atomically set BUSY. It
* clears QUEUED and does it's thing. When it thinks it's done, it
* clears BUSY, then rechecks QUEUED.. if it's set again, it loops
* (tries again to set BUSY).
*
* To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
* try to queue work. If that fails (work is already queued, or BUSY)
* we give up (work also already being done or is queued) but leave QUEUED
* set so that the worker thread will loop if necessary.
*/ */
static void queue_con(struct ceph_connection *con) static void queue_con(struct ceph_connection *con)
{ {
@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
return; return;
} }
set_bit(QUEUED, &con->state); if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
if (test_bit(BUSY, &con->state)) {
dout("queue_con %p - already BUSY\n", con);
con->ops->put(con);
} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
dout("queue_con %p - already queued\n", con); dout("queue_con %p - already queued\n", con);
con->ops->put(con); con->ops->put(con);
} else { } else {
@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
{ {
struct ceph_connection *con = container_of(work, struct ceph_connection, struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work); work.work);
int backoff = 0;
more:
if (test_and_set_bit(BUSY, &con->state) != 0) {
dout("con_work %p BUSY already set\n", con);
goto out;
}
dout("con_work %p start, clearing QUEUED\n", con);
clear_bit(QUEUED, &con->state);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
@ -1994,28 +1967,13 @@ static void con_work(struct work_struct *work)
try_read(con) < 0 || try_read(con) < 0 ||
try_write(con) < 0) { try_write(con) < 0) {
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
backoff = 1;
ceph_fault(con); /* error/fault path */ ceph_fault(con); /* error/fault path */
goto done_unlocked; goto done_unlocked;
} }
done: done:
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
done_unlocked: done_unlocked:
clear_bit(BUSY, &con->state);
dout("con->state=%lu\n", con->state);
if (test_bit(QUEUED, &con->state)) {
if (!backoff || test_bit(OPENING, &con->state)) {
dout("con_work %p QUEUED reset, looping\n", con);
goto more;
}
dout("con_work %p QUEUED reset, but just faulted\n", con);
clear_bit(QUEUED, &con->state);
}
dout("con_work %p done\n", con);
out:
con->ops->put(con); con->ops->put(con);
} }