ceph: set i_head_snapc when getting CEPH_CAP_FILE_WR reference

In most cases that snap context is needed, we are holding
reference of CEPH_CAP_FILE_WR. So we can set ceph inode's
i_head_snapc when getting the CEPH_CAP_FILE_WR reference,
and make codes get snap context from i_head_snapc. This makes
the code simpler.

Another benefit of this change is that we can handle snap
notification more elegantly. Especially when snap context
is updated while someone else is doing write. The old queue
cap_snap code may set cap_snap's context to ether the old
context or the new snap context, depending on if i_head_snapc
is set. The new queue capp_snap code always set cap_snap's
context to the old snap context.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
Yan, Zheng 2015-04-30 14:40:54 +08:00 committed by Ilya Dryomov
parent 7b06a826e7
commit 5dda377cf0
4 changed files with 218 additions and 143 deletions

View file

@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
inode = mapping->host;
ci = ceph_inode(inode);
/*
* Note that we're grabbing a snapc ref here without holding
* any locks!
*/
snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
/* dirty the head */
spin_lock(&ci->i_ceph_lock);
if (ci->i_head_snapc == NULL)
ci->i_head_snapc = ceph_get_snap_context(snapc);
++ci->i_wrbuffer_ref_head;
BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
if (__ceph_have_pending_cap_snap(ci)) {
struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps,
struct ceph_cap_snap,
ci_item);
snapc = ceph_get_snap_context(capsnap->context);
capsnap->dirty_pages++;
} else {
BUG_ON(!ci->i_head_snapc);
snapc = ceph_get_snap_context(ci->i_head_snapc);
++ci->i_wrbuffer_ref_head;
}
if (ci->i_wrbuffer_ref == 0)
ihold(inode);
++ci->i_wrbuffer_ref;
@ -1033,7 +1037,6 @@ static int ceph_update_writeable_page(struct file *file,
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
loff_t page_off = pos & PAGE_CACHE_MASK;
int pos_in_page = pos & ~PAGE_CACHE_MASK;
int end_in_page = pos_in_page + len;
@ -1045,10 +1048,6 @@ static int ceph_update_writeable_page(struct file *file,
/* writepages currently holds page lock, but if we change that later, */
wait_on_page_writeback(page);
/* check snap context */
BUG_ON(!ci->i_snap_realm);
down_read(&mdsc->snap_rwsem);
BUG_ON(!ci->i_snap_realm->cached_context);
snapc = page_snap_context(page);
if (snapc && snapc != ci->i_head_snapc) {
/*
@ -1056,7 +1055,6 @@ static int ceph_update_writeable_page(struct file *file,
* context! is it writeable now?
*/
oldest = get_oldest_context(inode, NULL);
up_read(&mdsc->snap_rwsem);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
@ -1113,7 +1111,6 @@ static int ceph_update_writeable_page(struct file *file,
}
/* we need to read it. */
up_read(&mdsc->snap_rwsem);
r = readpage_nounlock(file, page);
if (r < 0)
goto fail_nosnap;
@ -1158,16 +1155,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
/*
* we don't do anything in here that simple_write_end doesn't do
* except adjust dirty page accounting and drop read lock on
* mdsc->snap_rwsem.
* except adjust dirty page accounting
*/
static int ceph_write_end(struct file *file, struct address_space *mapping,
loff_t pos, unsigned len, unsigned copied,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0;
@ -1189,7 +1183,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
set_page_dirty(page);
unlock_page(page);
up_read(&mdsc->snap_rwsem);
page_cache_release(page);
if (check_cap)
@ -1315,7 +1308,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct page *page = vmf->page;
loff_t off = page_offset(page);
loff_t size = i_size_read(inode);
@ -1374,7 +1366,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
if (ret == 0) {
/* success. we'll keep the page locked. */
set_page_dirty(page);
up_read(&mdsc->snap_rwsem);
ret = VM_FAULT_LOCKED;
} else {
if (ret == -ENOMEM)

View file

@ -2073,7 +2073,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
*
* Protected by i_ceph_lock.
*/
static void __take_cap_refs(struct ceph_inode_info *ci, int got)
static void __take_cap_refs(struct ceph_inode_info *ci, int got,
bool snap_rwsem_locked)
{
if (got & CEPH_CAP_PIN)
ci->i_pin_ref++;
@ -2081,8 +2082,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
ci->i_rd_ref++;
if (got & CEPH_CAP_FILE_CACHE)
ci->i_rdcache_ref++;
if (got & CEPH_CAP_FILE_WR)
if (got & CEPH_CAP_FILE_WR) {
if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
BUG_ON(!snap_rwsem_locked);
ci->i_head_snapc = ceph_get_snap_context(
ci->i_snap_realm->cached_context);
}
ci->i_wr_ref++;
}
if (got & CEPH_CAP_FILE_BUFFER) {
if (ci->i_wb_ref == 0)
ihold(&ci->vfs_inode);
@ -2100,16 +2107,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, int *check_max, int *err)
loff_t endoff, bool nonblock, int *got, int *err)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
int ret = 0;
int have, implemented;
int file_wanted;
bool snap_rwsem_locked = false;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
again:
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
@ -2125,6 +2135,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
/* finish pending truncate */
while (ci->i_truncate_pending) {
spin_unlock(&ci->i_ceph_lock);
if (snap_rwsem_locked) {
up_read(&mdsc->snap_rwsem);
snap_rwsem_locked = false;
}
__ceph_do_pending_vmtruncate(inode);
spin_lock(&ci->i_ceph_lock);
}
@ -2136,7 +2150,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
inode, endoff, ci->i_max_size);
if (endoff > ci->i_requested_max_size) {
*check_max = 1;
*err = -EAGAIN;
ret = 1;
}
goto out_unlock;
@ -2164,8 +2178,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
if (!snap_rwsem_locked &&
!ci->i_head_snapc &&
(need & CEPH_CAP_FILE_WR)) {
if (!down_read_trylock(&mdsc->snap_rwsem)) {
/*
* we can not call down_read() when
* task isn't in TASK_RUNNING state
*/
if (nonblock) {
*err = -EAGAIN;
ret = 1;
goto out_unlock;
}
spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
snap_rwsem_locked = true;
goto again;
}
snap_rwsem_locked = true;
}
*got = need | (have & want);
__take_cap_refs(ci, *got);
__take_cap_refs(ci, *got, true);
ret = 1;
}
} else {
@ -2189,6 +2224,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
}
out_unlock:
spin_unlock(&ci->i_ceph_lock);
if (snap_rwsem_locked)
up_read(&mdsc->snap_rwsem);
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
@ -2231,54 +2268,70 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
int _got, check_max, ret, err = 0;
int _got, ret, err = 0;
ret = ceph_pool_perm_check(ci, need);
if (ret < 0)
return ret;
retry:
if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff);
_got = 0;
check_max = 0;
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff,
&_got, &check_max, &err));
if (err)
ret = err;
if (ret < 0)
return ret;
while (true) {
if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff);
if (check_max)
goto retry;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(&ci->vfs_inode) > 0) {
struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
goto out;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while holding
* caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
err = 0;
_got = 0;
ret = try_get_cap_refs(ci, need, want, endoff,
false, &_got, &err);
if (ret) {
if (err == -EAGAIN)
continue;
if (err < 0)
return err;
} else {
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff,
true, &_got, &err));
if (err == -EAGAIN)
continue;
if (err < 0)
ret = err;
if (ret < 0)
return ret;
}
/* getattr request will bring inline data into page cache */
ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret < 0)
return ret;
goto retry;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(&ci->vfs_inode) > 0) {
struct page *page =
find_get_page(ci->vfs_inode.i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
break;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while
* holding * caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/*
* getattr request will bring inline data into
* page cache
*/
ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
CEPH_STAT_CAP_INLINE_DATA,
true);
if (ret < 0)
return ret;
continue;
}
break;
}
out:
*got = _got;
return 0;
}
@ -2290,7 +2343,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
spin_lock(&ci->i_ceph_lock);
__take_cap_refs(ci, caps);
__take_cap_refs(ci, caps, false);
spin_unlock(&ci->i_ceph_lock);
}
@ -2341,6 +2394,13 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
wake = 1;
}
}
if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
}
/* see comment in __ceph_remove_cap() */
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
drop_inode_snap_realm(ci);
@ -2384,7 +2444,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (ci->i_head_snapc == snapc) {
ci->i_wrbuffer_ref_head -= nr;
if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
ci->i_wr_ref == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
@ -2775,7 +2837,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
dout(" inode %p now clean\n", inode);
BUG_ON(!list_empty(&ci->i_dirty_item));
drop = 1;
if (ci->i_wrbuffer_ref_head == 0) {
if (ci->i_wr_ref == 0 &&
ci->i_wrbuffer_ref_head == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;

View file

@ -557,13 +557,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
* objects, rollback on failure, etc.)
*/
static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
struct ceph_snap_context *snapc)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_snap_context *snapc;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct page **pages;
@ -600,7 +600,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
size_t start;
ssize_t n;
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0,
@ -674,13 +673,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
* objects, rollback on failure, etc.)
*/
static ssize_t
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
struct ceph_snap_context *snapc)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_snap_context *snapc;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct page **pages;
@ -717,7 +716,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
size_t left;
int n;
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0, 1,
@ -996,14 +994,30 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
struct ceph_snap_context *snapc;
struct iov_iter data;
mutex_unlock(&inode->i_mutex);
spin_lock(&ci->i_ceph_lock);
if (__ceph_have_pending_cap_snap(ci)) {
struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps,
struct ceph_cap_snap,
ci_item);
snapc = ceph_get_snap_context(capsnap->context);
} else {
BUG_ON(!ci->i_head_snapc);
snapc = ceph_get_snap_context(ci->i_head_snapc);
}
spin_unlock(&ci->i_ceph_lock);
/* we might need to revert back to that point */
data = *from;
if (iocb->ki_flags & IOCB_DIRECT)
written = ceph_sync_direct_write(iocb, &data, pos);
written = ceph_sync_direct_write(iocb, &data, pos,
snapc);
else
written = ceph_sync_write(iocb, &data, pos);
written = ceph_sync_write(iocb, &data, pos, snapc);
if (written == -EOLDSNAPC) {
dout("aio_write %p %llx.%llx %llu~%u"
"got EOLDSNAPC, retrying\n",
@ -1014,6 +1028,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
}
if (written > 0)
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
} else {
loff_t old_size = inode->i_size;
/*

View file

@ -455,6 +455,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
struct ceph_snap_context *old_snapc;
int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@ -467,6 +468,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
old_snapc = ci->i_head_snapc;
/*
* If there is a write in progress, treat that as a dirty Fw,
* even though it hasn't completed yet; by the time we finish
@ -481,76 +484,79 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
writes in progress now were started before the previous
cap_snap. lucky us. */
dout("queue_cap_snap %p already pending\n", inode);
kfree(capsnap);
} else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
goto update_snapc;
}
if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
dout("queue_cap_snap %p empty snapc\n", inode);
kfree(capsnap);
} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
struct ceph_snap_context *snapc = ci->i_head_snapc;
/*
* if we are a sync write, we may need to go to the snaprealm
* to get the current snapc.
*/
if (!snapc)
snapc = ci->i_snap_realm->cached_context;
dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
inode, capsnap, snapc, ceph_cap_string(dirty));
ihold(inode);
atomic_set(&capsnap->nref, 1);
capsnap->ci = ci;
INIT_LIST_HEAD(&capsnap->ci_item);
INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL);
capsnap->dirty = dirty;
capsnap->mode = inode->i_mode;
capsnap->uid = inode->i_uid;
capsnap->gid = inode->i_gid;
if (dirty & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
capsnap->xattr_blob =
ceph_buffer_get(ci->i_xattrs.blob);
capsnap->xattr_version = ci->i_xattrs.version;
} else {
capsnap->xattr_blob = NULL;
capsnap->xattr_version = 0;
}
capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
/* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this
snapshot. */
capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
ci->i_wrbuffer_ref_head = 0;
capsnap->context = snapc;
ci->i_head_snapc =
ceph_get_snap_context(ci->i_snap_realm->cached_context);
dout(" new snapc is %p\n", ci->i_head_snapc);
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
if (used & CEPH_CAP_FILE_WR) {
dout("queue_cap_snap %p cap_snap %p snapc %p"
" seq %llu used WR, now pending\n", inode,
capsnap, snapc, snapc->seq);
capsnap->writing = 1;
} else {
/* note mtime, size NOW. */
__ceph_finish_cap_snap(ci, capsnap);
}
} else {
goto update_snapc;
}
if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
dout("queue_cap_snap %p nothing dirty|writing\n", inode);
kfree(capsnap);
goto update_snapc;
}
BUG_ON(!old_snapc);
dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
inode, capsnap, old_snapc, ceph_cap_string(dirty));
ihold(inode);
atomic_set(&capsnap->nref, 1);
capsnap->ci = ci;
INIT_LIST_HEAD(&capsnap->ci_item);
INIT_LIST_HEAD(&capsnap->flushing_item);
capsnap->follows = old_snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL);
capsnap->dirty = dirty;
capsnap->mode = inode->i_mode;
capsnap->uid = inode->i_uid;
capsnap->gid = inode->i_gid;
if (dirty & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
capsnap->xattr_blob =
ceph_buffer_get(ci->i_xattrs.blob);
capsnap->xattr_version = ci->i_xattrs.version;
} else {
capsnap->xattr_blob = NULL;
capsnap->xattr_version = 0;
}
capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
/* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this
snapshot. */
capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
ci->i_wrbuffer_ref_head = 0;
capsnap->context = old_snapc;
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
old_snapc = NULL;
if (used & CEPH_CAP_FILE_WR) {
dout("queue_cap_snap %p cap_snap %p snapc %p"
" seq %llu used WR, now pending\n", inode,
capsnap, old_snapc, old_snapc->seq);
capsnap->writing = 1;
} else {
/* note mtime, size NOW. */
__ceph_finish_cap_snap(ci, capsnap);
}
capsnap = NULL;
update_snapc:
if (ci->i_head_snapc) {
ci->i_head_snapc = ceph_get_snap_context(
ci->i_snap_realm->cached_context);
dout(" new snapc is %p\n", ci->i_head_snapc);
}
spin_unlock(&ci->i_ceph_lock);
kfree(capsnap);
ceph_put_snap_context(old_snapc);
}
/*