dlm: recover locks waiting for overlap replies

When recovery looks at locks waiting for replies, it fails to consider
locks that have already received a reply for their first remote operation,
but not received a reply for secondary, overlapping unlock/cancel.  The
appropriate stub reply needs to be called for these waiters.

Appears when we start doing recovery in the presence of a many overlapping
unlock/cancel ops.

Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
David Teigland 2008-01-07 16:15:05 -06:00
parent 8a358ca8e7
commit 601342ce02

View file

@ -3846,6 +3846,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
void dlm_recover_waiters_pre(struct dlm_ls *ls) void dlm_recover_waiters_pre(struct dlm_ls *ls)
{ {
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
int wait_type, stub_unlock_result, stub_cancel_result;
mutex_lock(&ls->ls_waiters_mutex); mutex_lock(&ls->ls_waiters_mutex);
@ -3864,7 +3865,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
if (!waiter_needs_recovery(ls, lkb)) if (!waiter_needs_recovery(ls, lkb))
continue; continue;
switch (lkb->lkb_wait_type) { wait_type = lkb->lkb_wait_type;
stub_unlock_result = -DLM_EUNLOCK;
stub_cancel_result = -DLM_ECANCEL;
/* Main reply may have been received leaving a zero wait_type,
but a reply for the overlapping op may not have been
received. In that case we need to fake the appropriate
reply for the overlap op. */
if (!wait_type) {
if (is_overlap_cancel(lkb)) {
wait_type = DLM_MSG_CANCEL;
if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_cancel_result = 0;
}
if (is_overlap_unlock(lkb)) {
wait_type = DLM_MSG_UNLOCK;
if (lkb->lkb_grmode == DLM_LOCK_IV)
stub_unlock_result = -ENOENT;
}
log_debug(ls, "rwpre overlap %x %x %d %d %d",
lkb->lkb_id, lkb->lkb_flags, wait_type,
stub_cancel_result, stub_unlock_result);
}
switch (wait_type) {
case DLM_MSG_REQUEST: case DLM_MSG_REQUEST:
lkb->lkb_flags |= DLM_IFL_RESEND; lkb->lkb_flags |= DLM_IFL_RESEND;
@ -3877,7 +3904,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
case DLM_MSG_UNLOCK: case DLM_MSG_UNLOCK:
hold_lkb(lkb); hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
ls->ls_stub_ms.m_result = -DLM_EUNLOCK; ls->ls_stub_ms.m_result = stub_unlock_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags; ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_receive_unlock_reply(lkb, &ls->ls_stub_ms); _receive_unlock_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
@ -3886,15 +3913,15 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
case DLM_MSG_CANCEL: case DLM_MSG_CANCEL:
hold_lkb(lkb); hold_lkb(lkb);
ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
ls->ls_stub_ms.m_result = -DLM_ECANCEL; ls->ls_stub_ms.m_result = stub_cancel_result;
ls->ls_stub_ms.m_flags = lkb->lkb_flags; ls->ls_stub_ms.m_flags = lkb->lkb_flags;
_receive_cancel_reply(lkb, &ls->ls_stub_ms); _receive_cancel_reply(lkb, &ls->ls_stub_ms);
dlm_put_lkb(lkb); dlm_put_lkb(lkb);
break; break;
default: default:
log_error(ls, "invalid lkb wait_type %d", log_error(ls, "invalid lkb wait_type %d %d",
lkb->lkb_wait_type); lkb->lkb_wait_type, wait_type);
} }
schedule(); schedule();
} }