ceph: fix cap_snap and realm split

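The cap_snap creation/queueing relies on both the current i_head_snapc
_and_ the i_snap_realm pointers being correct, so that the new cap_snap
can properly reference the old context and the new i_head_snapc can be
updated to reference the new snaprealm's context.  Previously, inodes
affected by a realm split were only added to the new realm after
ceph_update_snap_trace() had run, and cap_snaps were queued before the
new snap contexts had been built, so the two pointers were not both
correct at the time a cap_snap was queued.  To fix this, we: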

 - move inodes completely to the new (split) realm so that i_snap_realm
   is correct, and
 - generate the new snap contexts (snapcs) _before_ queueing the
   cap_snaps in ceph_update_snap_trace().

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2010-09-16 16:26:51 -07:00
parent cfc0bf6640
commit ae00d4f37f
3 changed files with 33 additions and 61 deletions

View file

@ -411,8 +411,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
if (i_size < page_off + len) if (i_size < page_off + len)
len = i_size - page_off; len = i_size - page_off;
dout("writepage %p page %p index %lu on %llu~%u\n", dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
inode, page, page->index, page_off, len); inode, page, page->index, page_off, len, snapc);
writeback_stat = atomic_long_inc_return(&client->writeback_count); writeback_stat = atomic_long_inc_return(&client->writeback_count);
if (writeback_stat > if (writeback_stat >

View file

@ -119,6 +119,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
INIT_LIST_HEAD(&realm->children); INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item); INIT_LIST_HEAD(&realm->child_item);
INIT_LIST_HEAD(&realm->empty_item); INIT_LIST_HEAD(&realm->empty_item);
INIT_LIST_HEAD(&realm->dirty_item);
INIT_LIST_HEAD(&realm->inodes_with_caps); INIT_LIST_HEAD(&realm->inodes_with_caps);
spin_lock_init(&realm->inodes_with_caps_lock); spin_lock_init(&realm->inodes_with_caps_lock);
__insert_snap_realm(&mdsc->snap_realms, realm); __insert_snap_realm(&mdsc->snap_realms, realm);
@ -604,6 +605,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm; struct ceph_snap_realm *realm;
int invalidate = 0; int invalidate = 0;
int err = -ENOMEM; int err = -ENOMEM;
LIST_HEAD(dirty_realms);
dout("update_snap_trace deletion=%d\n", deletion); dout("update_snap_trace deletion=%d\n", deletion);
more: more:
@ -626,24 +628,6 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
} }
} }
if (le64_to_cpu(ri->seq) > realm->seq) {
dout("update_snap_trace updating %llx %p %lld -> %lld\n",
realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
/*
* if the realm seq has changed, queue a cap_snap for every
* inode with open caps. we do this _before_ we update
* the realm info so that we prepare for writeback under the
* _previous_ snap context.
*
* ...unless it's a snap deletion!
*/
if (!deletion)
queue_realm_cap_snaps(realm);
} else {
dout("update_snap_trace %llx %p seq %lld unchanged\n",
realm->ino, realm, realm->seq);
}
/* ensure the parent is correct */ /* ensure the parent is correct */
err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent)); err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
if (err < 0) if (err < 0)
@ -651,6 +635,8 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
invalidate += err; invalidate += err;
if (le64_to_cpu(ri->seq) > realm->seq) { if (le64_to_cpu(ri->seq) > realm->seq) {
dout("update_snap_trace updating %llx %p %lld -> %lld\n",
realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
/* update realm parameters, snap lists */ /* update realm parameters, snap lists */
realm->seq = le64_to_cpu(ri->seq); realm->seq = le64_to_cpu(ri->seq);
realm->created = le64_to_cpu(ri->created); realm->created = le64_to_cpu(ri->created);
@ -668,9 +654,17 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
if (err < 0) if (err < 0)
goto fail; goto fail;
/* queue realm for cap_snap creation */
list_add(&realm->dirty_item, &dirty_realms);
invalidate = 1; invalidate = 1;
} else if (!realm->cached_context) { } else if (!realm->cached_context) {
dout("update_snap_trace %llx %p seq %lld new\n",
realm->ino, realm, realm->seq);
invalidate = 1; invalidate = 1;
} else {
dout("update_snap_trace %llx %p seq %lld unchanged\n",
realm->ino, realm, realm->seq);
} }
dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
@ -683,6 +677,14 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
if (invalidate) if (invalidate)
rebuild_snap_realms(realm); rebuild_snap_realms(realm);
/*
* queue cap snaps _after_ we've built the new snap contexts,
* so that i_head_snapc can be set appropriately.
*/
list_for_each_entry(realm, &dirty_realms, dirty_item) {
queue_realm_cap_snaps(realm);
}
__cleanup_empty_realms(mdsc); __cleanup_empty_realms(mdsc);
return 0; return 0;
@ -816,6 +818,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
}; };
struct inode *inode = ceph_find_inode(sb, vino); struct inode *inode = ceph_find_inode(sb, vino);
struct ceph_inode_info *ci; struct ceph_inode_info *ci;
struct ceph_snap_realm *oldrealm;
if (!inode) if (!inode)
continue; continue;
@ -841,18 +844,19 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
dout(" will move %p to split realm %llx %p\n", dout(" will move %p to split realm %llx %p\n",
inode, realm->ino, realm); inode, realm->ino, realm);
/* /*
* Remove the inode from the realm's inode * Move the inode to the new realm
* list, but don't add it to the new realm
* yet. We don't want the cap_snap to be
* queued (again) by ceph_update_snap_trace()
* below. Queue it _now_, under the old context.
*/ */
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item); list_del_init(&ci->i_snap_realm_item);
list_add(&ci->i_snap_realm_item,
&realm->inodes_with_caps);
oldrealm = ci->i_snap_realm;
ci->i_snap_realm = realm;
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
ceph_queue_cap_snap(ci); ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm);
iput(inode); iput(inode);
continue; continue;
@ -880,43 +884,9 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
ceph_update_snap_trace(mdsc, p, e, ceph_update_snap_trace(mdsc, p, e,
op == CEPH_SNAP_OP_DESTROY); op == CEPH_SNAP_OP_DESTROY);
if (op == CEPH_SNAP_OP_SPLIT) { if (op == CEPH_SNAP_OP_SPLIT)
/*
* ok, _now_ add the inodes into the new realm.
*/
for (i = 0; i < num_split_inos; i++) {
struct ceph_vino vino = {
.ino = le64_to_cpu(split_inos[i]),
.snap = CEPH_NOSNAP,
};
struct inode *inode = ceph_find_inode(sb, vino);
struct ceph_inode_info *ci;
if (!inode)
continue;
ci = ceph_inode(inode);
spin_lock(&inode->i_lock);
if (list_empty(&ci->i_snap_realm_item)) {
struct ceph_snap_realm *oldrealm =
ci->i_snap_realm;
dout(" moving %p to split realm %llx %p\n",
inode, realm->ino, realm);
spin_lock(&realm->inodes_with_caps_lock);
list_add(&ci->i_snap_realm_item,
&realm->inodes_with_caps);
ci->i_snap_realm = realm;
spin_unlock(&realm->inodes_with_caps_lock);
ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm);
}
spin_unlock(&inode->i_lock);
iput(inode);
}
/* we took a reference when we created the realm, above */ /* we took a reference when we created the realm, above */
ceph_put_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, realm);
}
__cleanup_empty_realms(mdsc); __cleanup_empty_realms(mdsc);

View file

@ -690,6 +690,8 @@ struct ceph_snap_realm {
struct list_head empty_item; /* if i have ref==0 */ struct list_head empty_item; /* if i have ref==0 */
struct list_head dirty_item; /* if realm needs new context */
/* the current set of snaps for this realm */ /* the current set of snaps for this realm */
struct ceph_snap_context *cached_context; struct ceph_snap_context *cached_context;