Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec/ocfs2: (31 commits)
  ocfs2: avoid unaligned access to dqc_bitmap
  ocfs2: Use filemap_write_and_wait() instead of write_inode_now()
  ocfs2: honor O_(D)SYNC flag in fallocate
  ocfs2: Add a missing journal credit in ocfs2_link_credits() -v2
  ocfs2: send correct UUID to cleancache initialization
  ocfs2: Commit transactions in error cases -v2
  ocfs2: make direntry invalid when deleting it
  fs/ocfs2/dlm/dlmlock.c: free kmem_cache_zalloc'd data using kmem_cache_free
  ocfs2: Avoid livelock in ocfs2_readpage()
  ocfs2: serialize unaligned aio
  ocfs2: Implement llseek()
  ocfs2: Fix ocfs2_page_mkwrite()
  ocfs2: Add comment about orphan scanning
  ocfs2: Clean up messages in the fs
  ocfs2/cluster: Cluster up now includes network connections too
  ocfs2/cluster: Add new function o2net_fill_node_map()
  ocfs2/cluster: Fix output in file elapsed_time_in_ms
  ocfs2/dlm: dlmlock_remote() needs to account for remastery
  ocfs2/dlm: Take inflight reference count for remotely mastered resources too
  ocfs2/dlm: Cleanup dlm_wait_for_node_death() and dlm_wait_for_node_recovery()
  ...
This commit is contained in:
Linus Torvalds 2011-12-01 14:55:34 -08:00
commit 0a4ebed781
31 changed files with 999 additions and 540 deletions

View file

@ -5699,7 +5699,7 @@ int ocfs2_remove_btree_range(struct inode *inode,
OCFS2_JOURNAL_ACCESS_WRITE); OCFS2_JOURNAL_ACCESS_WRITE);
if (ret) { if (ret) {
mlog_errno(ret); mlog_errno(ret);
goto out; goto out_commit;
} }
dquot_free_space_nodirty(inode, dquot_free_space_nodirty(inode,

View file

@ -290,7 +290,15 @@ static int ocfs2_readpage(struct file *file, struct page *page)
} }
if (down_read_trylock(&oi->ip_alloc_sem) == 0) { if (down_read_trylock(&oi->ip_alloc_sem) == 0) {
/*
* Unlock the page and cycle ip_alloc_sem so that we don't
* busyloop waiting for ip_alloc_sem to unlock
*/
ret = AOP_TRUNCATED_PAGE; ret = AOP_TRUNCATED_PAGE;
unlock_page(page);
unlock = 0;
down_read(&oi->ip_alloc_sem);
up_read(&oi->ip_alloc_sem);
goto out_inode_unlock; goto out_inode_unlock;
} }
@ -563,6 +571,7 @@ static void ocfs2_dio_end_io(struct kiocb *iocb,
{ {
struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode; struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode;
int level; int level;
wait_queue_head_t *wq = ocfs2_ioend_wq(inode);
/* this io's submitter should not have unlocked this before we could */ /* this io's submitter should not have unlocked this before we could */
BUG_ON(!ocfs2_iocb_is_rw_locked(iocb)); BUG_ON(!ocfs2_iocb_is_rw_locked(iocb));
@ -570,6 +579,15 @@ static void ocfs2_dio_end_io(struct kiocb *iocb,
if (ocfs2_iocb_is_sem_locked(iocb)) if (ocfs2_iocb_is_sem_locked(iocb))
ocfs2_iocb_clear_sem_locked(iocb); ocfs2_iocb_clear_sem_locked(iocb);
if (ocfs2_iocb_is_unaligned_aio(iocb)) {
ocfs2_iocb_clear_unaligned_aio(iocb);
if (atomic_dec_and_test(&OCFS2_I(inode)->ip_unaligned_aio) &&
waitqueue_active(wq)) {
wake_up_all(wq);
}
}
ocfs2_iocb_clear_rw_locked(iocb); ocfs2_iocb_clear_rw_locked(iocb);
level = ocfs2_iocb_rw_locked_level(iocb); level = ocfs2_iocb_rw_locked_level(iocb);
@ -862,6 +880,12 @@ struct ocfs2_write_ctxt {
struct page *w_pages[OCFS2_MAX_CTXT_PAGES]; struct page *w_pages[OCFS2_MAX_CTXT_PAGES];
struct page *w_target_page; struct page *w_target_page;
/*
* w_target_locked is used for page_mkwrite path indicating no unlocking
* against w_target_page in ocfs2_write_end_nolock.
*/
unsigned int w_target_locked:1;
/* /*
* ocfs2_write_end() uses this to know what the real range to * ocfs2_write_end() uses this to know what the real range to
* write in the target should be. * write in the target should be.
@ -895,6 +919,24 @@ void ocfs2_unlock_and_free_pages(struct page **pages, int num_pages)
static void ocfs2_free_write_ctxt(struct ocfs2_write_ctxt *wc) static void ocfs2_free_write_ctxt(struct ocfs2_write_ctxt *wc)
{ {
int i;
/*
* w_target_locked is only set to true in the page_mkwrite() case.
* The intent is to allow us to lock the target page from write_begin()
* to write_end(). The caller must hold a ref on w_target_page.
*/
if (wc->w_target_locked) {
BUG_ON(!wc->w_target_page);
for (i = 0; i < wc->w_num_pages; i++) {
if (wc->w_target_page == wc->w_pages[i]) {
wc->w_pages[i] = NULL;
break;
}
}
mark_page_accessed(wc->w_target_page);
page_cache_release(wc->w_target_page);
}
ocfs2_unlock_and_free_pages(wc->w_pages, wc->w_num_pages); ocfs2_unlock_and_free_pages(wc->w_pages, wc->w_num_pages);
brelse(wc->w_di_bh); brelse(wc->w_di_bh);
@ -1132,20 +1174,17 @@ static int ocfs2_grab_pages_for_write(struct address_space *mapping,
*/ */
lock_page(mmap_page); lock_page(mmap_page);
/* Exit and let the caller retry */
if (mmap_page->mapping != mapping) { if (mmap_page->mapping != mapping) {
WARN_ON(mmap_page->mapping);
unlock_page(mmap_page); unlock_page(mmap_page);
/* ret = -EAGAIN;
* Sanity check - the locking in
* ocfs2_pagemkwrite() should ensure
* that this code doesn't trigger.
*/
ret = -EINVAL;
mlog_errno(ret);
goto out; goto out;
} }
page_cache_get(mmap_page); page_cache_get(mmap_page);
wc->w_pages[i] = mmap_page; wc->w_pages[i] = mmap_page;
wc->w_target_locked = true;
} else { } else {
wc->w_pages[i] = find_or_create_page(mapping, index, wc->w_pages[i] = find_or_create_page(mapping, index,
GFP_NOFS); GFP_NOFS);
@ -1160,6 +1199,8 @@ static int ocfs2_grab_pages_for_write(struct address_space *mapping,
wc->w_target_page = wc->w_pages[i]; wc->w_target_page = wc->w_pages[i];
} }
out: out:
if (ret)
wc->w_target_locked = false;
return ret; return ret;
} }
@ -1817,11 +1858,23 @@ int ocfs2_write_begin_nolock(struct file *filp,
*/ */
ret = ocfs2_grab_pages_for_write(mapping, wc, wc->w_cpos, pos, len, ret = ocfs2_grab_pages_for_write(mapping, wc, wc->w_cpos, pos, len,
cluster_of_pages, mmap_page); cluster_of_pages, mmap_page);
if (ret) { if (ret && ret != -EAGAIN) {
mlog_errno(ret); mlog_errno(ret);
goto out_quota; goto out_quota;
} }
/*
* ocfs2_grab_pages_for_write() returns -EAGAIN if it could not lock
* the target page. In this case, we exit with no error and no target
* page. This will trigger the caller, page_mkwrite(), to re-try
* the operation.
*/
if (ret == -EAGAIN) {
BUG_ON(wc->w_target_page);
ret = 0;
goto out_quota;
}
ret = ocfs2_write_cluster_by_desc(mapping, data_ac, meta_ac, wc, pos, ret = ocfs2_write_cluster_by_desc(mapping, data_ac, meta_ac, wc, pos,
len); len);
if (ret) { if (ret) {

View file

@ -78,6 +78,7 @@ enum ocfs2_iocb_lock_bits {
OCFS2_IOCB_RW_LOCK = 0, OCFS2_IOCB_RW_LOCK = 0,
OCFS2_IOCB_RW_LOCK_LEVEL, OCFS2_IOCB_RW_LOCK_LEVEL,
OCFS2_IOCB_SEM, OCFS2_IOCB_SEM,
OCFS2_IOCB_UNALIGNED_IO,
OCFS2_IOCB_NUM_LOCKS OCFS2_IOCB_NUM_LOCKS
}; };
@ -91,4 +92,17 @@ enum ocfs2_iocb_lock_bits {
clear_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private) clear_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private)
#define ocfs2_iocb_is_sem_locked(iocb) \ #define ocfs2_iocb_is_sem_locked(iocb) \
test_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private) test_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private)
#define ocfs2_iocb_set_unaligned_aio(iocb) \
set_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
#define ocfs2_iocb_clear_unaligned_aio(iocb) \
clear_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
#define ocfs2_iocb_is_unaligned_aio(iocb) \
test_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
#define OCFS2_IOEND_WQ_HASH_SZ 37
#define ocfs2_ioend_wq(v) (&ocfs2__ioend_wq[((unsigned long)(v)) %\
OCFS2_IOEND_WQ_HASH_SZ])
extern wait_queue_head_t ocfs2__ioend_wq[OCFS2_IOEND_WQ_HASH_SZ];
#endif /* OCFS2_FILE_H */ #endif /* OCFS2_FILE_H */

View file

@ -216,6 +216,7 @@ struct o2hb_region {
struct list_head hr_all_item; struct list_head hr_all_item;
unsigned hr_unclean_stop:1, unsigned hr_unclean_stop:1,
hr_aborted_start:1,
hr_item_pinned:1, hr_item_pinned:1,
hr_item_dropped:1; hr_item_dropped:1;
@ -254,6 +255,10 @@ struct o2hb_region {
* a more complete api that doesn't lead to this sort of fragility. */ * a more complete api that doesn't lead to this sort of fragility. */
atomic_t hr_steady_iterations; atomic_t hr_steady_iterations;
/* terminate o2hb thread if it does not reach steady state
* (hr_steady_iterations == 0) within hr_unsteady_iterations */
atomic_t hr_unsteady_iterations;
char hr_dev_name[BDEVNAME_SIZE]; char hr_dev_name[BDEVNAME_SIZE];
unsigned int hr_timeout_ms; unsigned int hr_timeout_ms;
@ -324,6 +329,10 @@ static void o2hb_write_timeout(struct work_struct *work)
static void o2hb_arm_write_timeout(struct o2hb_region *reg) static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{ {
/* Arm writeout only after thread reaches steady state */
if (atomic_read(&reg->hr_steady_iterations) != 0)
return;
mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n", mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
O2HB_MAX_WRITE_TIMEOUT_MS); O2HB_MAX_WRITE_TIMEOUT_MS);
@ -537,9 +546,14 @@ static int o2hb_verify_crc(struct o2hb_region *reg,
return read == computed; return read == computed;
} }
/* We want to make sure that nobody is heartbeating on top of us -- /*
* this will help detect an invalid configuration. */ * Compare the slot data with what we wrote in the last iteration.
static void o2hb_check_last_timestamp(struct o2hb_region *reg) * If the match fails, print an appropriate error message. This is to
* detect errors like... another node hearting on the same slot,
* flaky device that is losing writes, etc.
* Returns 1 if check succeeds, 0 otherwise.
*/
static int o2hb_check_own_slot(struct o2hb_region *reg)
{ {
struct o2hb_disk_slot *slot; struct o2hb_disk_slot *slot;
struct o2hb_disk_heartbeat_block *hb_block; struct o2hb_disk_heartbeat_block *hb_block;
@ -548,13 +562,13 @@ static void o2hb_check_last_timestamp(struct o2hb_region *reg)
slot = &reg->hr_slots[o2nm_this_node()]; slot = &reg->hr_slots[o2nm_this_node()];
/* Don't check on our 1st timestamp */ /* Don't check on our 1st timestamp */
if (!slot->ds_last_time) if (!slot->ds_last_time)
return; return 0;
hb_block = slot->ds_raw_block; hb_block = slot->ds_raw_block;
if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time && if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation && le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
hb_block->hb_node == slot->ds_node_num) hb_block->hb_node == slot->ds_node_num)
return; return 1;
#define ERRSTR1 "Another node is heartbeating on device" #define ERRSTR1 "Another node is heartbeating on device"
#define ERRSTR2 "Heartbeat generation mismatch on device" #define ERRSTR2 "Heartbeat generation mismatch on device"
@ -574,6 +588,8 @@ static void o2hb_check_last_timestamp(struct o2hb_region *reg)
(unsigned long long)slot->ds_last_time, hb_block->hb_node, (unsigned long long)slot->ds_last_time, hb_block->hb_node,
(unsigned long long)le64_to_cpu(hb_block->hb_generation), (unsigned long long)le64_to_cpu(hb_block->hb_generation),
(unsigned long long)le64_to_cpu(hb_block->hb_seq)); (unsigned long long)le64_to_cpu(hb_block->hb_seq));
return 0;
} }
static inline void o2hb_prepare_block(struct o2hb_region *reg, static inline void o2hb_prepare_block(struct o2hb_region *reg,
@ -719,17 +735,24 @@ static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
o2nm_node_put(node); o2nm_node_put(node);
} }
static void o2hb_set_quorum_device(struct o2hb_region *reg, static void o2hb_set_quorum_device(struct o2hb_region *reg)
struct o2hb_disk_slot *slot)
{ {
assert_spin_locked(&o2hb_live_lock);
if (!o2hb_global_heartbeat_active()) if (!o2hb_global_heartbeat_active())
return; return;
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) /* Prevent race with o2hb_heartbeat_group_drop_item() */
if (kthread_should_stop())
return; return;
/* Tag region as quorum only after thread reaches steady state */
if (atomic_read(&reg->hr_steady_iterations) != 0)
return;
spin_lock(&o2hb_live_lock);
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
goto unlock;
/* /*
* A region can be added to the quorum only when it sees all * A region can be added to the quorum only when it sees all
* live nodes heartbeat on it. In other words, the region has been * live nodes heartbeat on it. In other words, the region has been
@ -737,13 +760,10 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg,
*/ */
if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
sizeof(o2hb_live_node_bitmap))) sizeof(o2hb_live_node_bitmap)))
return; goto unlock;
if (slot->ds_changed_samples < O2HB_LIVE_THRESHOLD) printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
return; config_item_name(&reg->hr_item), reg->hr_dev_name);
printk(KERN_NOTICE "o2hb: Region %s is now a quorum device\n",
config_item_name(&reg->hr_item));
set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
@ -754,6 +774,8 @@ static void o2hb_set_quorum_device(struct o2hb_region *reg,
if (o2hb_pop_count(&o2hb_quorum_region_bitmap, if (o2hb_pop_count(&o2hb_quorum_region_bitmap,
O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
o2hb_region_unpin(NULL); o2hb_region_unpin(NULL);
unlock:
spin_unlock(&o2hb_live_lock);
} }
static int o2hb_check_slot(struct o2hb_region *reg, static int o2hb_check_slot(struct o2hb_region *reg,
@ -925,8 +947,6 @@ static int o2hb_check_slot(struct o2hb_region *reg,
slot->ds_equal_samples = 0; slot->ds_equal_samples = 0;
} }
out: out:
o2hb_set_quorum_device(reg, slot);
spin_unlock(&o2hb_live_lock); spin_unlock(&o2hb_live_lock);
o2hb_run_event_list(&event); o2hb_run_event_list(&event);
@ -957,7 +977,8 @@ static int o2hb_highest_node(unsigned long *nodes,
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{ {
int i, ret, highest_node, change = 0; int i, ret, highest_node;
int membership_change = 0, own_slot_ok = 0;
unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
struct o2hb_bio_wait_ctxt write_wc; struct o2hb_bio_wait_ctxt write_wc;
@ -966,7 +987,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
sizeof(configured_nodes)); sizeof(configured_nodes));
if (ret) { if (ret) {
mlog_errno(ret); mlog_errno(ret);
return ret; goto bail;
} }
/* /*
@ -982,8 +1003,9 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
if (highest_node >= O2NM_MAX_NODES) { if (highest_node >= O2NM_MAX_NODES) {
mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n"); mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
return -EINVAL; ret = -EINVAL;
goto bail;
} }
/* No sense in reading the slots of nodes that don't exist /* No sense in reading the slots of nodes that don't exist
@ -993,29 +1015,27 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
ret = o2hb_read_slots(reg, highest_node + 1); ret = o2hb_read_slots(reg, highest_node + 1);
if (ret < 0) { if (ret < 0) {
mlog_errno(ret); mlog_errno(ret);
return ret; goto bail;
} }
/* With an up to date view of the slots, we can check that no /* With an up to date view of the slots, we can check that no
* other node has been improperly configured to heartbeat in * other node has been improperly configured to heartbeat in
* our slot. */ * our slot. */
o2hb_check_last_timestamp(reg); own_slot_ok = o2hb_check_own_slot(reg);
/* fill in the proper info for our next heartbeat */ /* fill in the proper info for our next heartbeat */
o2hb_prepare_block(reg, reg->hr_generation); o2hb_prepare_block(reg, reg->hr_generation);
/* And fire off the write. Note that we don't wait on this I/O
* until later. */
ret = o2hb_issue_node_write(reg, &write_wc); ret = o2hb_issue_node_write(reg, &write_wc);
if (ret < 0) { if (ret < 0) {
mlog_errno(ret); mlog_errno(ret);
return ret; goto bail;
} }
i = -1; i = -1;
while((i = find_next_bit(configured_nodes, while((i = find_next_bit(configured_nodes,
O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
change |= o2hb_check_slot(reg, &reg->hr_slots[i]); membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
} }
/* /*
@ -1030,18 +1050,39 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
* disk */ * disk */
mlog(ML_ERROR, "Write error %d on device \"%s\"\n", mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
write_wc.wc_error, reg->hr_dev_name); write_wc.wc_error, reg->hr_dev_name);
return write_wc.wc_error; ret = write_wc.wc_error;
goto bail;
} }
o2hb_arm_write_timeout(reg); /* Skip disarming the timeout if own slot has stale/bad data */
if (own_slot_ok) {
o2hb_set_quorum_device(reg);
o2hb_arm_write_timeout(reg);
}
bail:
/* let the person who launched us know when things are steady */ /* let the person who launched us know when things are steady */
if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) { if (atomic_read(&reg->hr_steady_iterations) != 0) {
if (atomic_dec_and_test(&reg->hr_steady_iterations)) if (!ret && own_slot_ok && !membership_change) {
wake_up(&o2hb_steady_queue); if (atomic_dec_and_test(&reg->hr_steady_iterations))
wake_up(&o2hb_steady_queue);
}
} }
return 0; if (atomic_read(&reg->hr_steady_iterations) != 0) {
if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
printk(KERN_NOTICE "o2hb: Unable to stabilize "
"heartbeart on region %s (%s)\n",
config_item_name(&reg->hr_item),
reg->hr_dev_name);
atomic_set(&reg->hr_steady_iterations, 0);
reg->hr_aborted_start = 1;
wake_up(&o2hb_steady_queue);
ret = -EIO;
}
}
return ret;
} }
/* Subtract b from a, storing the result in a. a *must* have a larger /* Subtract b from a, storing the result in a. a *must* have a larger
@ -1095,7 +1136,8 @@ static int o2hb_thread(void *data)
/* Pin node */ /* Pin node */
o2nm_depend_this_node(); o2nm_depend_this_node();
while (!kthread_should_stop() && !reg->hr_unclean_stop) { while (!kthread_should_stop() &&
!reg->hr_unclean_stop && !reg->hr_aborted_start) {
/* We track the time spent inside /* We track the time spent inside
* o2hb_do_disk_heartbeat so that we avoid more than * o2hb_do_disk_heartbeat so that we avoid more than
* hr_timeout_ms between disk writes. On busy systems * hr_timeout_ms between disk writes. On busy systems
@ -1103,10 +1145,7 @@ static int o2hb_thread(void *data)
* likely to time itself out. */ * likely to time itself out. */
do_gettimeofday(&before_hb); do_gettimeofday(&before_hb);
i = 0; ret = o2hb_do_disk_heartbeat(reg);
do {
ret = o2hb_do_disk_heartbeat(reg);
} while (ret && ++i < 2);
do_gettimeofday(&after_hb); do_gettimeofday(&after_hb);
elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
@ -1117,7 +1156,8 @@ static int o2hb_thread(void *data)
after_hb.tv_sec, (unsigned long) after_hb.tv_usec, after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
elapsed_msec); elapsed_msec);
if (elapsed_msec < reg->hr_timeout_ms) { if (!kthread_should_stop() &&
elapsed_msec < reg->hr_timeout_ms) {
/* the kthread api has blocked signals for us so no /* the kthread api has blocked signals for us so no
* need to record the return value. */ * need to record the return value. */
msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
@ -1134,20 +1174,20 @@ static int o2hb_thread(void *data)
* to timeout on this region when we could just as easily * to timeout on this region when we could just as easily
* write a clear generation - thus indicating to them that * write a clear generation - thus indicating to them that
* this node has left this region. * this node has left this region.
* */
* XXX: Should we skip this on unclean_stop? */ if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
o2hb_prepare_block(reg, 0); o2hb_prepare_block(reg, 0);
ret = o2hb_issue_node_write(reg, &write_wc); ret = o2hb_issue_node_write(reg, &write_wc);
if (ret == 0) { if (ret == 0)
o2hb_wait_on_io(reg, &write_wc); o2hb_wait_on_io(reg, &write_wc);
} else { else
mlog_errno(ret); mlog_errno(ret);
} }
/* Unpin node */ /* Unpin node */
o2nm_undepend_this_node(); o2nm_undepend_this_node();
mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n"); mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
return 0; return 0;
} }
@ -1158,6 +1198,7 @@ static int o2hb_debug_open(struct inode *inode, struct file *file)
struct o2hb_debug_buf *db = inode->i_private; struct o2hb_debug_buf *db = inode->i_private;
struct o2hb_region *reg; struct o2hb_region *reg;
unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long lts;
char *buf = NULL; char *buf = NULL;
int i = -1; int i = -1;
int out = 0; int out = 0;
@ -1194,9 +1235,11 @@ static int o2hb_debug_open(struct inode *inode, struct file *file)
case O2HB_DB_TYPE_REGION_ELAPSED_TIME: case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
reg = (struct o2hb_region *)db->db_data; reg = (struct o2hb_region *)db->db_data;
out += snprintf(buf + out, PAGE_SIZE - out, "%u\n", lts = reg->hr_last_timeout_start;
jiffies_to_msecs(jiffies - /* If 0, it has never been set before */
reg->hr_last_timeout_start)); if (lts)
lts = jiffies_to_msecs(jiffies - lts);
out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
goto done; goto done;
case O2HB_DB_TYPE_REGION_PINNED: case O2HB_DB_TYPE_REGION_PINNED:
@ -1426,6 +1469,8 @@ static void o2hb_region_release(struct config_item *item)
struct page *page; struct page *page;
struct o2hb_region *reg = to_o2hb_region(item); struct o2hb_region *reg = to_o2hb_region(item);
mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
if (reg->hr_tmp_block) if (reg->hr_tmp_block)
kfree(reg->hr_tmp_block); kfree(reg->hr_tmp_block);
@ -1792,7 +1837,10 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
live_threshold <<= 1; live_threshold <<= 1;
spin_unlock(&o2hb_live_lock); spin_unlock(&o2hb_live_lock);
} }
atomic_set(&reg->hr_steady_iterations, live_threshold + 1); ++live_threshold;
atomic_set(&reg->hr_steady_iterations, live_threshold);
/* unsteady_iterations is double the steady_iterations */
atomic_set(&reg->hr_unsteady_iterations, (live_threshold << 1));
hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
reg->hr_item.ci_name); reg->hr_item.ci_name);
@ -1809,14 +1857,12 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
ret = wait_event_interruptible(o2hb_steady_queue, ret = wait_event_interruptible(o2hb_steady_queue,
atomic_read(&reg->hr_steady_iterations) == 0); atomic_read(&reg->hr_steady_iterations) == 0);
if (ret) { if (ret) {
/* We got interrupted (hello ptrace!). Clean up */ atomic_set(&reg->hr_steady_iterations, 0);
spin_lock(&o2hb_live_lock); reg->hr_aborted_start = 1;
hb_task = reg->hr_task; }
reg->hr_task = NULL;
spin_unlock(&o2hb_live_lock);
if (hb_task) if (reg->hr_aborted_start) {
kthread_stop(hb_task); ret = -EIO;
goto out; goto out;
} }
@ -1833,8 +1879,8 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
ret = -EIO; ret = -EIO;
if (hb_task && o2hb_global_heartbeat_active()) if (hb_task && o2hb_global_heartbeat_active())
printk(KERN_NOTICE "o2hb: Heartbeat started on region %s\n", printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
config_item_name(&reg->hr_item)); config_item_name(&reg->hr_item), reg->hr_dev_name);
out: out:
if (filp) if (filp)
@ -2092,13 +2138,6 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
/* stop the thread when the user removes the region dir */ /* stop the thread when the user removes the region dir */
spin_lock(&o2hb_live_lock); spin_lock(&o2hb_live_lock);
if (o2hb_global_heartbeat_active()) {
clear_bit(reg->hr_region_num, o2hb_region_bitmap);
clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
quorum_region = 1;
clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
}
hb_task = reg->hr_task; hb_task = reg->hr_task;
reg->hr_task = NULL; reg->hr_task = NULL;
reg->hr_item_dropped = 1; reg->hr_item_dropped = 1;
@ -2107,19 +2146,30 @@ static void o2hb_heartbeat_group_drop_item(struct config_group *group,
if (hb_task) if (hb_task)
kthread_stop(hb_task); kthread_stop(hb_task);
if (o2hb_global_heartbeat_active()) {
spin_lock(&o2hb_live_lock);
clear_bit(reg->hr_region_num, o2hb_region_bitmap);
clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
quorum_region = 1;
clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
spin_unlock(&o2hb_live_lock);
printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
((atomic_read(&reg->hr_steady_iterations) == 0) ?
"stopped" : "start aborted"), config_item_name(item),
reg->hr_dev_name);
}
/* /*
* If we're racing a dev_write(), we need to wake them. They will * If we're racing a dev_write(), we need to wake them. They will
* check reg->hr_task * check reg->hr_task
*/ */
if (atomic_read(&reg->hr_steady_iterations) != 0) { if (atomic_read(&reg->hr_steady_iterations) != 0) {
reg->hr_aborted_start = 1;
atomic_set(&reg->hr_steady_iterations, 0); atomic_set(&reg->hr_steady_iterations, 0);
wake_up(&o2hb_steady_queue); wake_up(&o2hb_steady_queue);
} }
if (o2hb_global_heartbeat_active())
printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n",
config_item_name(&reg->hr_item));
config_item_put(item); config_item_put(item);
if (!o2hb_global_heartbeat_active() || !quorum_region) if (!o2hb_global_heartbeat_active() || !quorum_region)

View file

@ -47,6 +47,7 @@
#define SC_DEBUG_NAME "sock_containers" #define SC_DEBUG_NAME "sock_containers"
#define NST_DEBUG_NAME "send_tracking" #define NST_DEBUG_NAME "send_tracking"
#define STATS_DEBUG_NAME "stats" #define STATS_DEBUG_NAME "stats"
#define NODES_DEBUG_NAME "connected_nodes"
#define SHOW_SOCK_CONTAINERS 0 #define SHOW_SOCK_CONTAINERS 0
#define SHOW_SOCK_STATS 1 #define SHOW_SOCK_STATS 1
@ -55,6 +56,7 @@ static struct dentry *o2net_dentry;
static struct dentry *sc_dentry; static struct dentry *sc_dentry;
static struct dentry *nst_dentry; static struct dentry *nst_dentry;
static struct dentry *stats_dentry; static struct dentry *stats_dentry;
static struct dentry *nodes_dentry;
static DEFINE_SPINLOCK(o2net_debug_lock); static DEFINE_SPINLOCK(o2net_debug_lock);
@ -491,53 +493,87 @@ static const struct file_operations sc_seq_fops = {
.release = sc_fop_release, .release = sc_fop_release,
}; };
int o2net_debugfs_init(void) static int o2net_fill_bitmap(char *buf, int len)
{ {
o2net_dentry = debugfs_create_dir(O2NET_DEBUG_DIR, NULL); unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
if (!o2net_dentry) { int i = -1, out = 0;
mlog_errno(-ENOMEM);
goto bail;
}
nst_dentry = debugfs_create_file(NST_DEBUG_NAME, S_IFREG|S_IRUSR, o2net_fill_node_map(map, sizeof(map));
o2net_dentry, NULL,
&nst_seq_fops);
if (!nst_dentry) {
mlog_errno(-ENOMEM);
goto bail;
}
sc_dentry = debugfs_create_file(SC_DEBUG_NAME, S_IFREG|S_IRUSR, while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
o2net_dentry, NULL, out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
&sc_seq_fops); out += snprintf(buf + out, PAGE_SIZE - out, "\n");
if (!sc_dentry) {
mlog_errno(-ENOMEM);
goto bail;
}
stats_dentry = debugfs_create_file(STATS_DEBUG_NAME, S_IFREG|S_IRUSR, return out;
o2net_dentry, NULL, }
&stats_seq_fops);
if (!stats_dentry) { static int nodes_fop_open(struct inode *inode, struct file *file)
mlog_errno(-ENOMEM); {
goto bail; char *buf;
}
buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
if (!buf)
return -ENOMEM;
i_size_write(inode, o2net_fill_bitmap(buf, PAGE_SIZE));
file->private_data = buf;
return 0; return 0;
bail:
debugfs_remove(stats_dentry);
debugfs_remove(sc_dentry);
debugfs_remove(nst_dentry);
debugfs_remove(o2net_dentry);
return -ENOMEM;
} }
static int o2net_debug_release(struct inode *inode, struct file *file)
{
kfree(file->private_data);
return 0;
}
static ssize_t o2net_debug_read(struct file *file, char __user *buf,
size_t nbytes, loff_t *ppos)
{
return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
i_size_read(file->f_mapping->host));
}
static const struct file_operations nodes_fops = {
.open = nodes_fop_open,
.release = o2net_debug_release,
.read = o2net_debug_read,
.llseek = generic_file_llseek,
};
void o2net_debugfs_exit(void) void o2net_debugfs_exit(void)
{ {
debugfs_remove(nodes_dentry);
debugfs_remove(stats_dentry); debugfs_remove(stats_dentry);
debugfs_remove(sc_dentry); debugfs_remove(sc_dentry);
debugfs_remove(nst_dentry); debugfs_remove(nst_dentry);
debugfs_remove(o2net_dentry); debugfs_remove(o2net_dentry);
} }
int o2net_debugfs_init(void)
{
mode_t mode = S_IFREG|S_IRUSR;
o2net_dentry = debugfs_create_dir(O2NET_DEBUG_DIR, NULL);
if (o2net_dentry)
nst_dentry = debugfs_create_file(NST_DEBUG_NAME, mode,
o2net_dentry, NULL, &nst_seq_fops);
if (nst_dentry)
sc_dentry = debugfs_create_file(SC_DEBUG_NAME, mode,
o2net_dentry, NULL, &sc_seq_fops);
if (sc_dentry)
stats_dentry = debugfs_create_file(STATS_DEBUG_NAME, mode,
o2net_dentry, NULL, &stats_seq_fops);
if (stats_dentry)
nodes_dentry = debugfs_create_file(NODES_DEBUG_NAME, mode,
o2net_dentry, NULL, &nodes_fops);
if (nodes_dentry)
return 0;
o2net_debugfs_exit();
mlog_errno(-ENOMEM);
return -ENOMEM;
}
#endif /* CONFIG_DEBUG_FS */ #endif /* CONFIG_DEBUG_FS */

View file

@ -546,7 +546,7 @@ static void o2net_set_nn_state(struct o2net_node *nn,
} }
if (was_valid && !valid) { if (was_valid && !valid) {
printk(KERN_NOTICE "o2net: no longer connected to " printk(KERN_NOTICE "o2net: No longer connected to "
SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc)); SC_NODEF_FMT "\n", SC_NODEF_ARGS(old_sc));
o2net_complete_nodes_nsw(nn); o2net_complete_nodes_nsw(nn);
} }
@ -556,7 +556,7 @@ static void o2net_set_nn_state(struct o2net_node *nn,
cancel_delayed_work(&nn->nn_connect_expired); cancel_delayed_work(&nn->nn_connect_expired);
printk(KERN_NOTICE "o2net: %s " SC_NODEF_FMT "\n", printk(KERN_NOTICE "o2net: %s " SC_NODEF_FMT "\n",
o2nm_this_node() > sc->sc_node->nd_num ? o2nm_this_node() > sc->sc_node->nd_num ?
"connected to" : "accepted connection from", "Connected to" : "Accepted connection from",
SC_NODEF_ARGS(sc)); SC_NODEF_ARGS(sc));
} }
@ -644,7 +644,7 @@ static void o2net_state_change(struct sock *sk)
o2net_sc_queue_work(sc, &sc->sc_connect_work); o2net_sc_queue_work(sc, &sc->sc_connect_work);
break; break;
default: default:
printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT printk(KERN_INFO "o2net: Connection to " SC_NODEF_FMT
" shutdown, state %d\n", " shutdown, state %d\n",
SC_NODEF_ARGS(sc), sk->sk_state); SC_NODEF_ARGS(sc), sk->sk_state);
o2net_sc_queue_work(sc, &sc->sc_shutdown_work); o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
@ -1035,6 +1035,25 @@ static int o2net_tx_can_proceed(struct o2net_node *nn,
return ret; return ret;
} }
/* Get a map of all nodes to which this node is currently connected to */
void o2net_fill_node_map(unsigned long *map, unsigned bytes)
{
struct o2net_sock_container *sc;
int node, ret;
BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
memset(map, 0, bytes);
for (node = 0; node < O2NM_MAX_NODES; ++node) {
o2net_tx_can_proceed(o2net_nn_from_num(node), &sc, &ret);
if (!ret) {
set_bit(node, map);
sc_put(sc);
}
}
}
EXPORT_SYMBOL_GPL(o2net_fill_node_map);
int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec, int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
size_t caller_veclen, u8 target_node, int *status) size_t caller_veclen, u8 target_node, int *status)
{ {
@ -1285,11 +1304,11 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) { if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) {
mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol " printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " Advertised net "
"version %llu but %llu is required, disconnecting\n", "protocol version %llu but %llu is required. "
SC_NODEF_ARGS(sc), "Disconnecting.\n", SC_NODEF_ARGS(sc),
(unsigned long long)be64_to_cpu(hand->protocol_version), (unsigned long long)be64_to_cpu(hand->protocol_version),
O2NET_PROTOCOL_VERSION); O2NET_PROTOCOL_VERSION);
/* don't bother reconnecting if its the wrong version. */ /* don't bother reconnecting if its the wrong version. */
o2net_ensure_shutdown(nn, sc, -ENOTCONN); o2net_ensure_shutdown(nn, sc, -ENOTCONN);
@ -1303,33 +1322,33 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
*/ */
if (be32_to_cpu(hand->o2net_idle_timeout_ms) != if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
o2net_idle_timeout()) { o2net_idle_timeout()) {
mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of " printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a network "
"%u ms, but we use %u ms locally. disconnecting\n", "idle timeout of %u ms, but we use %u ms locally. "
SC_NODEF_ARGS(sc), "Disconnecting.\n", SC_NODEF_ARGS(sc),
be32_to_cpu(hand->o2net_idle_timeout_ms), be32_to_cpu(hand->o2net_idle_timeout_ms),
o2net_idle_timeout()); o2net_idle_timeout());
o2net_ensure_shutdown(nn, sc, -ENOTCONN); o2net_ensure_shutdown(nn, sc, -ENOTCONN);
return -1; return -1;
} }
if (be32_to_cpu(hand->o2net_keepalive_delay_ms) != if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
o2net_keepalive_delay()) { o2net_keepalive_delay()) {
mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of " printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a keepalive "
"%u ms, but we use %u ms locally. disconnecting\n", "delay of %u ms, but we use %u ms locally. "
SC_NODEF_ARGS(sc), "Disconnecting.\n", SC_NODEF_ARGS(sc),
be32_to_cpu(hand->o2net_keepalive_delay_ms), be32_to_cpu(hand->o2net_keepalive_delay_ms),
o2net_keepalive_delay()); o2net_keepalive_delay());
o2net_ensure_shutdown(nn, sc, -ENOTCONN); o2net_ensure_shutdown(nn, sc, -ENOTCONN);
return -1; return -1;
} }
if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) != if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
O2HB_MAX_WRITE_TIMEOUT_MS) { O2HB_MAX_WRITE_TIMEOUT_MS) {
mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of " printk(KERN_NOTICE "o2net: " SC_NODEF_FMT " uses a heartbeat "
"%u ms, but we use %u ms locally. disconnecting\n", "timeout of %u ms, but we use %u ms locally. "
SC_NODEF_ARGS(sc), "Disconnecting.\n", SC_NODEF_ARGS(sc),
be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
O2HB_MAX_WRITE_TIMEOUT_MS); O2HB_MAX_WRITE_TIMEOUT_MS);
o2net_ensure_shutdown(nn, sc, -ENOTCONN); o2net_ensure_shutdown(nn, sc, -ENOTCONN);
return -1; return -1;
} }
@ -1540,28 +1559,16 @@ static void o2net_idle_timer(unsigned long data)
{ {
struct o2net_sock_container *sc = (struct o2net_sock_container *)data; struct o2net_sock_container *sc = (struct o2net_sock_container *)data;
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
#ifdef CONFIG_DEBUG_FS #ifdef CONFIG_DEBUG_FS
ktime_t now = ktime_get(); unsigned long msecs = ktime_to_ms(ktime_get()) -
ktime_to_ms(sc->sc_tv_timer);
#else
unsigned long msecs = o2net_idle_timeout();
#endif #endif
printk(KERN_NOTICE "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u " printk(KERN_NOTICE "o2net: Connection to " SC_NODEF_FMT " has been "
"seconds, shutting it down.\n", SC_NODEF_ARGS(sc), "idle for %lu.%lu secs, shutting it down.\n", SC_NODEF_ARGS(sc),
o2net_idle_timeout() / 1000, msecs / 1000, msecs % 1000);
o2net_idle_timeout() % 1000);
#ifdef CONFIG_DEBUG_FS
mlog(ML_NOTICE, "Here are some times that might help debug the "
"situation: (Timer: %lld, Now %lld, DataReady %lld, Advance %lld-%lld, "
"Key 0x%08x, Func %u, FuncTime %lld-%lld)\n",
(long long)ktime_to_us(sc->sc_tv_timer), (long long)ktime_to_us(now),
(long long)ktime_to_us(sc->sc_tv_data_ready),
(long long)ktime_to_us(sc->sc_tv_advance_start),
(long long)ktime_to_us(sc->sc_tv_advance_stop),
sc->sc_msg_key, sc->sc_msg_type,
(long long)ktime_to_us(sc->sc_tv_func_start),
(long long)ktime_to_us(sc->sc_tv_func_stop));
#endif
/* /*
* Initialize the nn_timeout so that the next connection attempt * Initialize the nn_timeout so that the next connection attempt
@ -1694,8 +1701,8 @@ static void o2net_start_connect(struct work_struct *work)
out: out:
if (ret) { if (ret) {
mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed " printk(KERN_NOTICE "o2net: Connect attempt to " SC_NODEF_FMT
"with errno %d\n", SC_NODEF_ARGS(sc), ret); " failed with errno %d\n", SC_NODEF_ARGS(sc), ret);
/* 0 err so that another will be queued and attempted /* 0 err so that another will be queued and attempted
* from set_nn_state */ * from set_nn_state */
if (sc) if (sc)
@ -1718,8 +1725,8 @@ static void o2net_connect_expired(struct work_struct *work)
spin_lock(&nn->nn_lock); spin_lock(&nn->nn_lock);
if (!nn->nn_sc_valid) { if (!nn->nn_sc_valid) {
mlog(ML_ERROR, "no connection established with node %u after " printk(KERN_NOTICE "o2net: No connection established with "
"%u.%u seconds, giving up and returning errors.\n", "node %u after %u.%u seconds, giving up.\n",
o2net_num_from_nn(nn), o2net_num_from_nn(nn),
o2net_idle_timeout() / 1000, o2net_idle_timeout() / 1000,
o2net_idle_timeout() % 1000); o2net_idle_timeout() % 1000);
@ -1862,21 +1869,21 @@ static int o2net_accept_one(struct socket *sock)
node = o2nm_get_node_by_ip(sin.sin_addr.s_addr); node = o2nm_get_node_by_ip(sin.sin_addr.s_addr);
if (node == NULL) { if (node == NULL) {
mlog(ML_NOTICE, "attempt to connect from unknown node at %pI4:%d\n", printk(KERN_NOTICE "o2net: Attempt to connect from unknown "
&sin.sin_addr.s_addr, ntohs(sin.sin_port)); "node at %pI4:%d\n", &sin.sin_addr.s_addr,
ntohs(sin.sin_port));
ret = -EINVAL; ret = -EINVAL;
goto out; goto out;
} }
if (o2nm_this_node() >= node->nd_num) { if (o2nm_this_node() >= node->nd_num) {
local_node = o2nm_get_node_by_num(o2nm_this_node()); local_node = o2nm_get_node_by_num(o2nm_this_node());
mlog(ML_NOTICE, "unexpected connect attempt seen at node '%s' (" printk(KERN_NOTICE "o2net: Unexpected connect attempt seen "
"%u, %pI4:%d) from node '%s' (%u, %pI4:%d)\n", "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
local_node->nd_name, local_node->nd_num, "%pI4:%d)\n", local_node->nd_name, local_node->nd_num,
&(local_node->nd_ipv4_address), &(local_node->nd_ipv4_address),
ntohs(local_node->nd_ipv4_port), ntohs(local_node->nd_ipv4_port), node->nd_name,
node->nd_name, node->nd_num, &sin.sin_addr.s_addr, node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port));
ntohs(sin.sin_port));
ret = -EINVAL; ret = -EINVAL;
goto out; goto out;
} }
@ -1901,10 +1908,10 @@ static int o2net_accept_one(struct socket *sock)
ret = 0; ret = 0;
spin_unlock(&nn->nn_lock); spin_unlock(&nn->nn_lock);
if (ret) { if (ret) {
mlog(ML_NOTICE, "attempt to connect from node '%s' at " printk(KERN_NOTICE "o2net: Attempt to connect from node '%s' "
"%pI4:%d but it already has an open connection\n", "at %pI4:%d but it already has an open connection\n",
node->nd_name, &sin.sin_addr.s_addr, node->nd_name, &sin.sin_addr.s_addr,
ntohs(sin.sin_port)); ntohs(sin.sin_port));
goto out; goto out;
} }
@ -1984,7 +1991,7 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port)
ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret < 0) { if (ret < 0) {
mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret); printk(KERN_ERR "o2net: Error %d while creating socket\n", ret);
goto out; goto out;
} }
@ -2001,16 +2008,15 @@ static int o2net_open_listening_sock(__be32 addr, __be16 port)
sock->sk->sk_reuse = 1; sock->sk->sk_reuse = 1;
ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
if (ret < 0) { if (ret < 0) {
mlog(ML_ERROR, "unable to bind socket at %pI4:%u, " printk(KERN_ERR "o2net: Error %d while binding socket at "
"ret=%d\n", &addr, ntohs(port), ret); "%pI4:%u\n", ret, &addr, ntohs(port));
goto out; goto out;
} }
ret = sock->ops->listen(sock, 64); ret = sock->ops->listen(sock, 64);
if (ret < 0) { if (ret < 0)
mlog(ML_ERROR, "unable to listen on %pI4:%u, ret=%d\n", printk(KERN_ERR "o2net: Error %d while listening on %pI4:%u\n",
&addr, ntohs(port), ret); ret, &addr, ntohs(port));
}
out: out:
if (ret) { if (ret) {

View file

@ -106,6 +106,8 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
struct list_head *unreg_list); struct list_head *unreg_list);
void o2net_unregister_handler_list(struct list_head *list); void o2net_unregister_handler_list(struct list_head *list);
void o2net_fill_node_map(unsigned long *map, unsigned bytes);
struct o2nm_node; struct o2nm_node;
int o2net_register_hb_callbacks(void); int o2net_register_hb_callbacks(void);
void o2net_unregister_hb_callbacks(void); void o2net_unregister_hb_callbacks(void);

View file

@ -1184,8 +1184,7 @@ static int __ocfs2_delete_entry(handle_t *handle, struct inode *dir,
if (pde) if (pde)
le16_add_cpu(&pde->rec_len, le16_add_cpu(&pde->rec_len,
le16_to_cpu(de->rec_len)); le16_to_cpu(de->rec_len));
else de->inode = 0;
de->inode = 0;
dir->i_version++; dir->i_version++;
ocfs2_journal_dirty(handle, bh); ocfs2_journal_dirty(handle, bh);
goto bail; goto bail;

View file

@ -859,8 +859,8 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout);
void dlm_put(struct dlm_ctxt *dlm); void dlm_put(struct dlm_ctxt *dlm);
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
@ -877,9 +877,8 @@ static inline void dlm_lockres_get(struct dlm_lock_resource *res)
kref_get(&res->refs); kref_get(&res->refs);
} }
void dlm_lockres_put(struct dlm_lock_resource *res); void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm, void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
struct dlm_lock_resource *res);
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int len, unsigned int len,
@ -902,46 +901,15 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
const char *name, const char *name,
unsigned int namelen); unsigned int namelen);
#define dlm_lockres_set_refmap_bit(bit,res) \ void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__) struct dlm_lock_resource *res, int bit);
#define dlm_lockres_clear_refmap_bit(bit,res) \ void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__) struct dlm_lock_resource *res, int bit);
static inline void __dlm_lockres_set_refmap_bit(int bit, void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res);
const char *file, void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
int line) struct dlm_lock_resource *res);
{
//printk("%s:%d:%.*s: setting bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
set_bit(bit, res->refmap);
}
static inline void __dlm_lockres_clear_refmap_bit(int bit,
struct dlm_lock_resource *res,
const char *file,
int line)
{
//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
// res->lockname.len, res->lockname.name, bit);
clear_bit(bit, res->refmap);
}
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
const char *file,
int line);
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int new_lockres,
const char *file,
int line);
#define dlm_lockres_drop_inflight_ref(d,r) \
__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
#define dlm_lockres_grab_inflight_ref_new(d,r) \
__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);

View file

@ -157,16 +157,18 @@ static int dlm_protocol_compare(struct dlm_protocol_version *existing,
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{ {
if (!hlist_unhashed(&lockres->hash_node)) { if (hlist_unhashed(&res->hash_node))
hlist_del_init(&lockres->hash_node); return;
dlm_lockres_put(lockres);
} mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
res->lockname.name);
hlist_del_init(&res->hash_node);
dlm_lockres_put(res);
} }
void __dlm_insert_lockres(struct dlm_ctxt *dlm, void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
struct dlm_lock_resource *res)
{ {
struct hlist_head *bucket; struct hlist_head *bucket;
struct qstr *q; struct qstr *q;
@ -180,6 +182,9 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
dlm_lockres_get(res); dlm_lockres_get(res);
hlist_add_head(&res->hash_node, bucket); hlist_add_head(&res->hash_node, bucket);
mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
res->lockname.name);
} }
struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
@ -539,17 +544,17 @@ static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
static void __dlm_print_nodes(struct dlm_ctxt *dlm) static void __dlm_print_nodes(struct dlm_ctxt *dlm)
{ {
int node = -1; int node = -1, num = 0;
assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->spinlock);
printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name); printk("( ");
while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
node + 1)) < O2NM_MAX_NODES) { node + 1)) < O2NM_MAX_NODES) {
printk("%d ", node); printk("%d ", node);
++num;
} }
printk("\n"); printk(") %u nodes\n", num);
} }
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
@ -566,11 +571,10 @@ static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
node = exit_msg->node_idx; node = exit_msg->node_idx;
printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
clear_bit(node, dlm->domain_map); clear_bit(node, dlm->domain_map);
clear_bit(node, dlm->exit_domain_map); clear_bit(node, dlm->exit_domain_map);
printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
__dlm_print_nodes(dlm); __dlm_print_nodes(dlm);
/* notify anything attached to the heartbeat events */ /* notify anything attached to the heartbeat events */
@ -755,6 +759,7 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm)
dlm_mark_domain_leaving(dlm); dlm_mark_domain_leaving(dlm);
dlm_leave_domain(dlm); dlm_leave_domain(dlm);
printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
dlm_force_free_mles(dlm); dlm_force_free_mles(dlm);
dlm_complete_dlm_shutdown(dlm); dlm_complete_dlm_shutdown(dlm);
} }
@ -970,7 +975,7 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
clear_bit(assert->node_idx, dlm->exit_domain_map); clear_bit(assert->node_idx, dlm->exit_domain_map);
__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n", printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
assert->node_idx, dlm->name); assert->node_idx, dlm->name);
__dlm_print_nodes(dlm); __dlm_print_nodes(dlm);
@ -1701,8 +1706,10 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
bail: bail:
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
if (!status) if (!status) {
printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
__dlm_print_nodes(dlm); __dlm_print_nodes(dlm);
}
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
if (ctxt) { if (ctxt) {
@ -2131,13 +2138,6 @@ struct dlm_ctxt * dlm_register_domain(const char *domain,
goto leave; goto leave;
} }
if (!o2hb_check_local_node_heartbeating()) {
mlog(ML_ERROR, "the local node has not been configured, or is "
"not heartbeating\n");
ret = -EPROTO;
goto leave;
}
mlog(0, "register called for domain \"%s\"\n", domain); mlog(0, "register called for domain \"%s\"\n", domain);
retry: retry:

View file

@ -183,10 +183,6 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
kick_thread = 1; kick_thread = 1;
} }
} }
/* reduce the inflight count, this may result in the lockres
* being purged below during calc_usage */
if (lock->ml.node == dlm->node_num)
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
wake_up(&res->wq); wake_up(&res->wq);
@ -231,10 +227,16 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
lock->ml.type, res->lockname.len, lock->ml.type, res->lockname.len,
res->lockname.name, flags); res->lockname.name, flags);
/*
* Wait if resource is getting recovered, remastered, etc.
* If the resource was remastered and new owner is self, then exit.
*/
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* will exit this call with spinlock held */
__dlm_wait_on_lockres(res); __dlm_wait_on_lockres(res);
if (res->owner == dlm->node_num) {
spin_unlock(&res->spinlock);
return DLM_RECOVERING;
}
res->state |= DLM_LOCK_RES_IN_PROGRESS; res->state |= DLM_LOCK_RES_IN_PROGRESS;
/* add lock to local (secondary) queue */ /* add lock to local (secondary) queue */
@ -319,27 +321,23 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
sizeof(create), res->owner, &status); sizeof(create), res->owner, &status);
if (tmpret >= 0) { if (tmpret >= 0) {
// successfully sent and received ret = status;
ret = status; // this is already a dlm_status
if (ret == DLM_REJECTED) { if (ret == DLM_REJECTED) {
mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
"no longer owned by %u. that node is coming back " "owned by node %u. That node is coming back up "
"up currently.\n", dlm->name, create.namelen, "currently.\n", dlm->name, create.namelen,
create.name, res->owner); create.name, res->owner);
dlm_print_one_lock_resource(res); dlm_print_one_lock_resource(res);
BUG(); BUG();
} }
} else { } else {
mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
"node %u\n", tmpret, DLM_CREATE_LOCK_MSG, dlm->key, "node %u\n", dlm->name, create.namelen, create.name,
res->owner); tmpret, res->owner);
if (dlm_is_host_down(tmpret)) { if (dlm_is_host_down(tmpret))
ret = DLM_RECOVERING; ret = DLM_RECOVERING;
mlog(0, "node %u died so returning DLM_RECOVERING " else
"from lock message!\n", res->owner);
} else {
ret = dlm_err_to_dlm_status(tmpret); ret = dlm_err_to_dlm_status(tmpret);
}
} }
return ret; return ret;
@ -440,7 +438,7 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
/* zero memory only if kernel-allocated */ /* zero memory only if kernel-allocated */
lksb = kzalloc(sizeof(*lksb), GFP_NOFS); lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
if (!lksb) { if (!lksb) {
kfree(lock); kmem_cache_free(dlm_lock_cache, lock);
return NULL; return NULL;
} }
kernel_allocated = 1; kernel_allocated = 1;
@ -718,18 +716,10 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
if (status == DLM_RECOVERING || status == DLM_MIGRATING || if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
status == DLM_FORWARD) { status == DLM_FORWARD) {
mlog(0, "retrying lock with migration/"
"recovery/in progress\n");
msleep(100); msleep(100);
/* no waiting for dlm_reco_thread */
if (recovery) { if (recovery) {
if (status != DLM_RECOVERING) if (status != DLM_RECOVERING)
goto retry_lock; goto retry_lock;
mlog(0, "%s: got RECOVERING "
"for $RECOVERY lock, master "
"was %u\n", dlm->name,
res->owner);
/* wait to see the node go down, then /* wait to see the node go down, then
* drop down and allow the lockres to * drop down and allow the lockres to
* get cleaned up. need to remaster. */ * get cleaned up. need to remaster. */
@ -741,6 +731,14 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
} }
} }
/* Inflight taken in dlm_get_lock_resource() is dropped here */
spin_lock(&res->spinlock);
dlm_lockres_drop_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
dlm_lockres_calc_usage(dlm, res);
dlm_kick_thread(dlm, res);
if (status != DLM_NORMAL) { if (status != DLM_NORMAL) {
lock->lksb->flags &= ~DLM_LKSB_GET_LVB; lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
if (status != DLM_NOTQUEUED) if (status != DLM_NOTQUEUED)

View file

@ -631,39 +631,54 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
return NULL; return NULL;
} }
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res, int bit)
int new_lockres,
const char *file,
int line)
{ {
if (!new_lockres) assert_spin_locked(&res->spinlock);
assert_spin_locked(&res->spinlock);
if (!test_bit(dlm->node_num, res->refmap)) { mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
BUG_ON(res->inflight_locks != 0); res->lockname.name, bit, __builtin_return_address(0));
dlm_lockres_set_refmap_bit(dlm->node_num, res);
} set_bit(bit, res->refmap);
res->inflight_locks++;
mlog(0, "%s:%.*s: inflight++: now %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->inflight_locks);
} }
void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, struct dlm_lock_resource *res, int bit)
const char *file, {
int line) assert_spin_locked(&res->spinlock);
mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
res->lockname.name, bit, __builtin_return_address(0));
clear_bit(bit, res->refmap);
}
void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
assert_spin_locked(&res->spinlock);
res->inflight_locks++;
mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
res->lockname.len, res->lockname.name, res->inflight_locks,
__builtin_return_address(0));
}
void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{ {
assert_spin_locked(&res->spinlock); assert_spin_locked(&res->spinlock);
BUG_ON(res->inflight_locks == 0); BUG_ON(res->inflight_locks == 0);
res->inflight_locks--; res->inflight_locks--;
mlog(0, "%s:%.*s: inflight--: now %u\n",
dlm->name, res->lockname.len, res->lockname.name, mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
res->inflight_locks); res->lockname.len, res->lockname.name, res->inflight_locks,
if (res->inflight_locks == 0) __builtin_return_address(0));
dlm_lockres_clear_refmap_bit(dlm->node_num, res);
wake_up(&res->wq); wake_up(&res->wq);
} }
@ -697,7 +712,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
unsigned int hash; unsigned int hash;
int tries = 0; int tries = 0;
int bit, wait_on_recovery = 0; int bit, wait_on_recovery = 0;
int drop_inflight_if_nonlocal = 0;
BUG_ON(!lockid); BUG_ON(!lockid);
@ -709,36 +723,33 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
if (tmpres) { if (tmpres) {
int dropping_ref = 0;
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
spin_lock(&tmpres->spinlock); spin_lock(&tmpres->spinlock);
/* We wait for the other thread that is mastering the resource */ /* Wait on the thread that is mastering the resource */
if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
__dlm_wait_on_lockres(tmpres); __dlm_wait_on_lockres(tmpres);
BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN); BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
}
if (tmpres->owner == dlm->node_num) {
BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
dlm_lockres_grab_inflight_ref(dlm, tmpres);
} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
dropping_ref = 1;
spin_unlock(&tmpres->spinlock);
/* wait until done messaging the master, drop our ref to allow
* the lockres to be purged, start over. */
if (dropping_ref) {
spin_lock(&tmpres->spinlock);
__dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
spin_unlock(&tmpres->spinlock); spin_unlock(&tmpres->spinlock);
dlm_lockres_put(tmpres); dlm_lockres_put(tmpres);
tmpres = NULL; tmpres = NULL;
goto lookup; goto lookup;
} }
mlog(0, "found in hash!\n"); /* Wait on the resource purge to complete before continuing */
if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
BUG_ON(tmpres->owner == dlm->node_num);
__dlm_wait_on_lockres_flags(tmpres,
DLM_LOCK_RES_DROPPING_REF);
spin_unlock(&tmpres->spinlock);
dlm_lockres_put(tmpres);
tmpres = NULL;
goto lookup;
}
/* Grab inflight ref to pin the resource */
dlm_lockres_grab_inflight_ref(dlm, tmpres);
spin_unlock(&tmpres->spinlock);
if (res) if (res)
dlm_lockres_put(res); dlm_lockres_put(res);
res = tmpres; res = tmpres;
@ -829,8 +840,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
* but they might own this lockres. wait on them. */ * but they might own this lockres. wait on them. */
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) { if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " mlog(0, "%s: res %.*s, At least one node (%d) "
"recover before lock mastery can begin\n", "to recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit); dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1; wait_on_recovery = 1;
} }
@ -843,12 +854,11 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
/* finally add the lockres to its hash bucket */ /* finally add the lockres to its hash bucket */
__dlm_insert_lockres(dlm, res); __dlm_insert_lockres(dlm, res);
/* since this lockres is new it doesn't not require the spinlock */
dlm_lockres_grab_inflight_ref_new(dlm, res);
/* if this node does not become the master make sure to drop /* Grab inflight ref to pin the resource */
* this inflight reference below */ spin_lock(&res->spinlock);
drop_inflight_if_nonlocal = 1; dlm_lockres_grab_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
/* get an extra ref on the mle in case this is a BLOCK /* get an extra ref on the mle in case this is a BLOCK
* if so, the creator of the BLOCK may try to put the last * if so, the creator of the BLOCK may try to put the last
@ -864,8 +874,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
* dlm spinlock would be detectable be a change on the mle, * dlm spinlock would be detectable be a change on the mle,
* so we only need to clear out the recovery map once. */ * so we only need to clear out the recovery map once. */
if (dlm_is_recovery_lock(lockid, namelen)) { if (dlm_is_recovery_lock(lockid, namelen)) {
mlog(ML_NOTICE, "%s: recovery map is not empty, but " mlog(0, "%s: Recovery map is not empty, but must "
"must master $RECOVERY lock now\n", dlm->name); "master $RECOVERY lock now\n", dlm->name);
if (!dlm_pre_master_reco_lockres(dlm, res)) if (!dlm_pre_master_reco_lockres(dlm, res))
wait_on_recovery = 0; wait_on_recovery = 0;
else { else {
@ -883,8 +893,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) { if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " mlog(0, "%s: res %.*s, At least one node (%d) "
"recover before lock mastery can begin\n", "to recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit); dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1; wait_on_recovery = 1;
} else } else
@ -913,8 +923,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
* yet, keep going until it does. this is how the * yet, keep going until it does. this is how the
* master will know that asserts are needed back to * master will know that asserts are needed back to
* the lower nodes. */ * the lower nodes. */
mlog(0, "%s:%.*s: requests only up to %u but master " mlog(0, "%s: res %.*s, Requests only up to %u but "
"is %u, keep going\n", dlm->name, namelen, "master is %u, keep going\n", dlm->name, namelen,
lockid, nodenum, mle->master); lockid, nodenum, mle->master);
} }
} }
@ -924,13 +934,12 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
if (ret < 0) { if (ret < 0) {
wait_on_recovery = 1; wait_on_recovery = 1;
mlog(0, "%s:%.*s: node map changed, redo the " mlog(0, "%s: res %.*s, Node map changed, redo the master "
"master request now, blocked=%d\n", "request now, blocked=%d\n", dlm->name, res->lockname.len,
dlm->name, res->lockname.len,
res->lockname.name, blocked); res->lockname.name, blocked);
if (++tries > 20) { if (++tries > 20) {
mlog(ML_ERROR, "%s:%.*s: spinning on " mlog(ML_ERROR, "%s: res %.*s, Spinning on "
"dlm_wait_for_lock_mastery, blocked=%d\n", "dlm_wait_for_lock_mastery, blocked = %d\n",
dlm->name, res->lockname.len, dlm->name, res->lockname.len,
res->lockname.name, blocked); res->lockname.name, blocked);
dlm_print_one_lock_resource(res); dlm_print_one_lock_resource(res);
@ -940,7 +949,8 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
goto redo_request; goto redo_request;
} }
mlog(0, "lockres mastered by %u\n", res->owner); mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
res->lockname.name, res->owner);
/* make sure we never continue without this */ /* make sure we never continue without this */
BUG_ON(res->owner == O2NM_MAX_NODES); BUG_ON(res->owner == O2NM_MAX_NODES);
@ -952,8 +962,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
wake_waiters: wake_waiters:
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
dlm_lockres_drop_inflight_ref(dlm, res);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS; res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
wake_up(&res->wq); wake_up(&res->wq);
@ -1426,9 +1434,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
} }
if (res->owner == dlm->node_num) { if (res->owner == dlm->node_num) {
mlog(0, "%s:%.*s: setting bit %u in refmap\n", dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
dlm->name, namelen, name, request->node_idx);
dlm_lockres_set_refmap_bit(request->node_idx, res);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
response = DLM_MASTER_RESP_YES; response = DLM_MASTER_RESP_YES;
if (mle) if (mle)
@ -1493,10 +1499,8 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
* go back and clean the mles on any * go back and clean the mles on any
* other nodes */ * other nodes */
dispatch_assert = 1; dispatch_assert = 1;
dlm_lockres_set_refmap_bit(request->node_idx, res); dlm_lockres_set_refmap_bit(dlm, res,
mlog(0, "%s:%.*s: setting bit %u in refmap\n", request->node_idx);
dlm->name, namelen, name,
request->node_idx);
} else } else
response = DLM_MASTER_RESP_NO; response = DLM_MASTER_RESP_NO;
} else { } else {
@ -1702,7 +1706,7 @@ static int dlm_do_assert_master(struct dlm_ctxt *dlm,
"lockres, set the bit in the refmap\n", "lockres, set the bit in the refmap\n",
namelen, lockname, to); namelen, lockname, to);
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(to, res); dlm_lockres_set_refmap_bit(dlm, res, to);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
} }
} }
@ -2187,8 +2191,6 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
namelen = res->lockname.len; namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN); BUG_ON(namelen > O2NM_MAX_NAME_LEN);
mlog(0, "%s:%.*s: sending deref to %d\n",
dlm->name, namelen, lockname, res->owner);
memset(&deref, 0, sizeof(deref)); memset(&deref, 0, sizeof(deref));
deref.node_idx = dlm->node_num; deref.node_idx = dlm->node_num;
deref.namelen = namelen; deref.namelen = namelen;
@ -2197,14 +2199,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
&deref, sizeof(deref), res->owner, &r); &deref, sizeof(deref), res->owner, &r);
if (ret < 0) if (ret < 0)
mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
"node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key, dlm->name, namelen, lockname, ret, res->owner);
res->owner);
else if (r < 0) { else if (r < 0) {
/* BAD. other node says I did not have a ref. */ /* BAD. other node says I did not have a ref. */
mlog(ML_ERROR,"while dropping ref on %s:%.*s " mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
"(master=%u) got %d.\n", dlm->name, namelen, dlm->name, namelen, lockname, res->owner, r);
lockname, res->owner, r);
dlm_print_one_lock_resource(res); dlm_print_one_lock_resource(res);
BUG(); BUG();
} }
@ -2260,7 +2260,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
else { else {
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) { if (test_bit(node, res->refmap)) {
dlm_lockres_clear_refmap_bit(node, res); dlm_lockres_clear_refmap_bit(dlm, res, node);
cleared = 1; cleared = 1;
} }
} }
@ -2320,7 +2320,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) { if (test_bit(node, res->refmap)) {
__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
dlm_lockres_clear_refmap_bit(node, res); dlm_lockres_clear_refmap_bit(dlm, res, node);
cleared = 1; cleared = 1;
} }
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
@ -2802,7 +2802,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(!list_empty(&lock->bast_list));
BUG_ON(lock->ast_pending); BUG_ON(lock->ast_pending);
BUG_ON(lock->bast_pending); BUG_ON(lock->bast_pending);
dlm_lockres_clear_refmap_bit(lock->ml.node, res); dlm_lockres_clear_refmap_bit(dlm, res,
lock->ml.node);
list_del_init(&lock->list); list_del_init(&lock->list);
dlm_lock_put(lock); dlm_lock_put(lock);
/* In a normal unlock, we would have added a /* In a normal unlock, we would have added a
@ -2823,7 +2824,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
mlog(0, "%s:%.*s: node %u had a ref to this " mlog(0, "%s:%.*s: node %u had a ref to this "
"migrating lockres, clearing\n", dlm->name, "migrating lockres, clearing\n", dlm->name,
res->lockname.len, res->lockname.name, bit); res->lockname.len, res->lockname.name, bit);
dlm_lockres_clear_refmap_bit(bit, res); dlm_lockres_clear_refmap_bit(dlm, res, bit);
} }
bit++; bit++;
} }
@ -2916,9 +2917,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
&migrate, sizeof(migrate), nodenum, &migrate, sizeof(migrate), nodenum,
&status); &status);
if (ret < 0) { if (ret < 0) {
mlog(ML_ERROR, "Error %d when sending message %u (key " mlog(ML_ERROR, "%s: res %.*s, Error %d send "
"0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG, "MIGRATE_REQUEST to node %u\n", dlm->name,
dlm->key, nodenum); migrate.namelen, migrate.name, ret, nodenum);
if (!dlm_is_host_down(ret)) { if (!dlm_is_host_down(ret)) {
mlog(ML_ERROR, "unhandled error=%d!\n", ret); mlog(ML_ERROR, "unhandled error=%d!\n", ret);
BUG(); BUG();
@ -2937,7 +2938,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
dlm->name, res->lockname.len, res->lockname.name, dlm->name, res->lockname.len, res->lockname.name,
nodenum); nodenum);
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(nodenum, res); dlm_lockres_set_refmap_bit(dlm, res, nodenum);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
} }
} }
@ -3271,7 +3272,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* mastery reference here since old_master will briefly have * mastery reference here since old_master will briefly have
* a reference after the migration completes */ * a reference after the migration completes */
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(old_master, res); dlm_lockres_set_refmap_bit(dlm, res, old_master);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
mlog(0, "now time to do a migrate request to other nodes\n"); mlog(0, "now time to do a migrate request to other nodes\n");

View file

@ -362,40 +362,38 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
} }
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
{ {
if (timeout) { if (dlm_is_node_dead(dlm, node))
mlog(ML_NOTICE, "%s: waiting %dms for notification of " return;
"death of node %u\n", dlm->name, timeout, node);
printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
"domain %s\n", node, dlm->name);
if (timeout)
wait_event_timeout(dlm->dlm_reco_thread_wq, wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm, node), dlm_is_node_dead(dlm, node),
msecs_to_jiffies(timeout)); msecs_to_jiffies(timeout));
} else { else
mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
"of death of node %u\n", dlm->name, node);
wait_event(dlm->dlm_reco_thread_wq, wait_event(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm, node)); dlm_is_node_dead(dlm, node));
}
/* for now, return 0 */
return 0;
} }
int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
{ {
if (timeout) { if (dlm_is_node_recovered(dlm, node))
mlog(0, "%s: waiting %dms for notification of " return;
"recovery of node %u\n", dlm->name, timeout, node);
printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
"domain %s\n", node, dlm->name);
if (timeout)
wait_event_timeout(dlm->dlm_reco_thread_wq, wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_recovered(dlm, node), dlm_is_node_recovered(dlm, node),
msecs_to_jiffies(timeout)); msecs_to_jiffies(timeout));
} else { else
mlog(0, "%s: waiting indefinitely for notification "
"of recovery of node %u\n", dlm->name, node);
wait_event(dlm->dlm_reco_thread_wq, wait_event(dlm->dlm_reco_thread_wq,
dlm_is_node_recovered(dlm, node)); dlm_is_node_recovered(dlm, node));
}
/* for now, return 0 */
return 0;
} }
/* callers of the top-level api calls (dlmlock/dlmunlock) should /* callers of the top-level api calls (dlmlock/dlmunlock) should
@ -430,6 +428,8 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
{ {
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
dlm->name, dlm->reco.dead_node);
dlm->reco.state |= DLM_RECO_STATE_ACTIVE; dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
} }
@ -440,9 +440,18 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
wake_up(&dlm->reco.event); wake_up(&dlm->reco.event);
} }
static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
{
printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
"dead node %u in domain %s\n", dlm->reco.new_master,
(dlm->node_num == dlm->reco.new_master ? "me" : "he"),
dlm->reco.dead_node, dlm->name);
}
static int dlm_do_recovery(struct dlm_ctxt *dlm) static int dlm_do_recovery(struct dlm_ctxt *dlm)
{ {
int status = 0; int status = 0;
@ -505,9 +514,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
} }
mlog(0, "another node will master this recovery session.\n"); mlog(0, "another node will master this recovery session.\n");
} }
mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, dlm_print_recovery_master(dlm);
dlm->node_num, dlm->reco.dead_node);
/* it is safe to start everything back up here /* it is safe to start everything back up here
* because all of the dead node's lock resources * because all of the dead node's lock resources
@ -518,15 +526,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
return 0; return 0;
master_here: master_here:
mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node " dlm_print_recovery_master(dlm);
"%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task),
dlm->node_num, dlm->reco.dead_node, dlm->name);
status = dlm_remaster_locks(dlm, dlm->reco.dead_node); status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
if (status < 0) { if (status < 0) {
/* we should never hit this anymore */ /* we should never hit this anymore */
mlog(ML_ERROR, "error %d remastering locks for node %u, " mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, "
"retrying.\n", status, dlm->reco.dead_node); "retrying.\n", dlm->name, status, dlm->reco.dead_node);
/* yield a bit to allow any final network messages /* yield a bit to allow any final network messages
* to get handled on remaining nodes */ * to get handled on remaining nodes */
msleep(100); msleep(100);
@ -567,7 +573,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
ndata->state = DLM_RECO_NODE_DATA_REQUESTING; ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
mlog(0, "requesting lock info from node %u\n", mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
ndata->node_num); ndata->node_num);
if (ndata->node_num == dlm->node_num) { if (ndata->node_num == dlm->node_num) {
@ -640,7 +646,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
spin_unlock(&dlm_reco_state_lock); spin_unlock(&dlm_reco_state_lock);
} }
mlog(0, "done requesting all lock info\n"); mlog(0, "%s: Done requesting all lock info\n", dlm->name);
/* nodes should be sending reco data now /* nodes should be sending reco data now
* just need to wait */ * just need to wait */
@ -802,10 +808,9 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
/* negative status is handled by caller */ /* negative status is handled by caller */
if (ret < 0) if (ret < 0)
mlog(ML_ERROR, "Error %d when sending message %u (key " mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u "
"0x%x) to node %u\n", ret, DLM_LOCK_REQUEST_MSG, "to recover dead node %u\n", dlm->name, ret,
dlm->key, request_from); request_from, dead_node);
// return from here, then // return from here, then
// sleep until all received or error // sleep until all received or error
return ret; return ret;
@ -956,9 +961,9 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
sizeof(done_msg), send_to, &tmpret); sizeof(done_msg), send_to, &tmpret);
if (ret < 0) { if (ret < 0) {
mlog(ML_ERROR, "Error %d when sending message %u (key " mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u "
"0x%x) to node %u\n", ret, DLM_RECO_DATA_DONE_MSG, "to recover dead node %u\n", dlm->name, ret, send_to,
dlm->key, send_to); dead_node);
if (!dlm_is_host_down(ret)) { if (!dlm_is_host_down(ret)) {
BUG(); BUG();
} }
@ -1127,9 +1132,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
if (ret < 0) { if (ret < 0) {
/* XXX: negative status is not handled. /* XXX: negative status is not handled.
* this will end up killing this node. */ * this will end up killing this node. */
mlog(ML_ERROR, "Error %d when sending message %u (key " mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to "
"0x%x) to node %u\n", ret, DLM_MIG_LOCKRES_MSG, "node %u (%s)\n", dlm->name, mres->lockname_len,
dlm->key, send_to); mres->lockname, ret, send_to,
(orig_flags & DLM_MRES_MIGRATION ?
"migration" : "recovery"));
} else { } else {
/* might get an -ENOMEM back here */ /* might get an -ENOMEM back here */
ret = status; ret = status;
@ -1767,7 +1774,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
dlm->name, mres->lockname_len, mres->lockname, dlm->name, mres->lockname_len, mres->lockname,
from); from);
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
dlm_lockres_set_refmap_bit(from, res); dlm_lockres_set_refmap_bit(dlm, res, from);
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
added++; added++;
break; break;
@ -1965,7 +1972,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
mlog(0, "%s:%.*s: added lock for node %u, " mlog(0, "%s:%.*s: added lock for node %u, "
"setting refmap bit\n", dlm->name, "setting refmap bit\n", dlm->name,
res->lockname.len, res->lockname.name, ml->node); res->lockname.len, res->lockname.name, ml->node);
dlm_lockres_set_refmap_bit(ml->node, res); dlm_lockres_set_refmap_bit(dlm, res, ml->node);
added++; added++;
} }
spin_unlock(&res->spinlock); spin_unlock(&res->spinlock);
@ -2084,6 +2091,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
if (res->owner == dead_node) { if (res->owner == dead_node) {
mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->owner, new_master);
list_del_init(&res->recovering); list_del_init(&res->recovering);
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* new_master has our reference from /* new_master has our reference from
@ -2105,40 +2115,30 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
for (i = 0; i < DLM_HASH_BUCKETS; i++) { for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = dlm_lockres_hash(dlm, i); bucket = dlm_lockres_hash(dlm, i);
hlist_for_each_entry(res, hash_iter, bucket, hash_node) { hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
if (res->state & DLM_LOCK_RES_RECOVERING) { if (!(res->state & DLM_LOCK_RES_RECOVERING))
if (res->owner == dead_node) { continue;
mlog(0, "(this=%u) res %.*s owner=%u "
"was not on recovering list, but "
"clearing state anyway\n",
dlm->node_num, res->lockname.len,
res->lockname.name, new_master);
} else if (res->owner == dlm->node_num) {
mlog(0, "(this=%u) res %.*s owner=%u "
"was not on recovering list, "
"owner is THIS node, clearing\n",
dlm->node_num, res->lockname.len,
res->lockname.name, new_master);
} else
continue;
if (!list_empty(&res->recovering)) { if (res->owner != dead_node &&
mlog(0, "%s:%.*s: lockres was " res->owner != dlm->node_num)
"marked RECOVERING, owner=%u\n", continue;
dlm->name, res->lockname.len,
res->lockname.name, res->owner); if (!list_empty(&res->recovering)) {
list_del_init(&res->recovering); list_del_init(&res->recovering);
dlm_lockres_put(res); dlm_lockres_put(res);
}
spin_lock(&res->spinlock);
/* new_master has our reference from
* the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
} }
/* new_master has our reference from
* the lock state sent during recovery */
mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
dlm->name, res->lockname.len, res->lockname.name,
res->owner, new_master);
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
if (__dlm_lockres_has_locks(res))
__dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
} }
} }
} }
@ -2252,12 +2252,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
res->lockname.len, res->lockname.name, freed, dead_node); res->lockname.len, res->lockname.name, freed, dead_node);
__dlm_print_one_lock_resource(res); __dlm_print_one_lock_resource(res);
} }
dlm_lockres_clear_refmap_bit(dead_node, res); dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
} else if (test_bit(dead_node, res->refmap)) { } else if (test_bit(dead_node, res->refmap)) {
mlog(0, "%s:%.*s: dead node %u had a ref, but had " mlog(0, "%s:%.*s: dead node %u had a ref, but had "
"no locks and had not purged before dying\n", dlm->name, "no locks and had not purged before dying\n", dlm->name,
res->lockname.len, res->lockname.name, dead_node); res->lockname.len, res->lockname.name, dead_node);
dlm_lockres_clear_refmap_bit(dead_node, res); dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
} }
/* do not kick thread yet */ /* do not kick thread yet */
@ -2324,9 +2324,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
dlm_revalidate_lvb(dlm, res, dead_node); dlm_revalidate_lvb(dlm, res, dead_node);
if (res->owner == dead_node) { if (res->owner == dead_node) {
if (res->state & DLM_LOCK_RES_DROPPING_REF) { if (res->state & DLM_LOCK_RES_DROPPING_REF) {
mlog(ML_NOTICE, "Ignore %.*s for " mlog(ML_NOTICE, "%s: res %.*s, Skip "
"recovery as it is being freed\n", "recovery as it is being freed\n",
res->lockname.len, dlm->name, res->lockname.len,
res->lockname.name); res->lockname.name);
} else } else
dlm_move_lockres_to_recovery_list(dlm, dlm_move_lockres_to_recovery_list(dlm,

View file

@ -94,24 +94,26 @@ int __dlm_lockres_unused(struct dlm_lock_resource *res)
{ {
int bit; int bit;
assert_spin_locked(&res->spinlock);
if (__dlm_lockres_has_locks(res)) if (__dlm_lockres_has_locks(res))
return 0; return 0;
/* Locks are in the process of being created */
if (res->inflight_locks)
return 0;
if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY)
return 0; return 0;
if (res->state & DLM_LOCK_RES_RECOVERING) if (res->state & DLM_LOCK_RES_RECOVERING)
return 0; return 0;
/* Another node has this resource with this node as the master */
bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) if (bit < O2NM_MAX_NODES)
return 0; return 0;
/*
* since the bit for dlm->node_num is not set, inflight_locks better
* be zero
*/
BUG_ON(res->inflight_locks != 0);
return 1; return 1;
} }
@ -185,8 +187,6 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
/* clear our bit from the master's refmap, ignore errors */ /* clear our bit from the master's refmap, ignore errors */
ret = dlm_drop_lockres_ref(dlm, res); ret = dlm_drop_lockres_ref(dlm, res);
if (ret < 0) { if (ret < 0) {
mlog(ML_ERROR, "%s: deref %.*s failed %d\n", dlm->name,
res->lockname.len, res->lockname.name, ret);
if (!dlm_is_host_down(ret)) if (!dlm_is_host_down(ret))
BUG(); BUG();
} }
@ -209,7 +209,7 @@ static void dlm_purge_lockres(struct dlm_ctxt *dlm,
BUG(); BUG();
} }
__dlm_unhash_lockres(res); __dlm_unhash_lockres(dlm, res);
/* lockres is not in the hash now. drop the flag and wake up /* lockres is not in the hash now. drop the flag and wake up
* any processes waiting in dlm_get_lock_resource. */ * any processes waiting in dlm_get_lock_resource. */

View file

@ -1692,7 +1692,7 @@ int ocfs2_open_lock(struct inode *inode)
mlog(0, "inode %llu take PRMODE open lock\n", mlog(0, "inode %llu take PRMODE open lock\n",
(unsigned long long)OCFS2_I(inode)->ip_blkno); (unsigned long long)OCFS2_I(inode)->ip_blkno);
if (ocfs2_mount_local(osb)) if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
goto out; goto out;
lockres = &OCFS2_I(inode)->ip_open_lockres; lockres = &OCFS2_I(inode)->ip_open_lockres;
@ -1718,6 +1718,12 @@ int ocfs2_try_open_lock(struct inode *inode, int write)
(unsigned long long)OCFS2_I(inode)->ip_blkno, (unsigned long long)OCFS2_I(inode)->ip_blkno,
write ? "EXMODE" : "PRMODE"); write ? "EXMODE" : "PRMODE");
if (ocfs2_is_hard_readonly(osb)) {
if (write)
status = -EROFS;
goto out;
}
if (ocfs2_mount_local(osb)) if (ocfs2_mount_local(osb))
goto out; goto out;
@ -2298,7 +2304,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode,
if (ocfs2_is_hard_readonly(osb)) { if (ocfs2_is_hard_readonly(osb)) {
if (ex) if (ex)
status = -EROFS; status = -EROFS;
goto bail; goto getbh;
} }
if (ocfs2_mount_local(osb)) if (ocfs2_mount_local(osb))
@ -2356,7 +2362,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode,
mlog_errno(status); mlog_errno(status);
goto bail; goto bail;
} }
getbh:
if (ret_bh) { if (ret_bh) {
status = ocfs2_assign_bh(inode, ret_bh, local_bh); status = ocfs2_assign_bh(inode, ret_bh, local_bh);
if (status < 0) { if (status < 0) {
@ -2628,8 +2634,11 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)
BUG_ON(!dl); BUG_ON(!dl);
if (ocfs2_is_hard_readonly(osb)) if (ocfs2_is_hard_readonly(osb)) {
return -EROFS; if (ex)
return -EROFS;
return 0;
}
if (ocfs2_mount_local(osb)) if (ocfs2_mount_local(osb))
return 0; return 0;
@ -2647,7 +2656,7 @@ void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
struct ocfs2_dentry_lock *dl = dentry->d_fsdata; struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
if (!ocfs2_mount_local(osb)) if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
} }

View file

@ -832,6 +832,102 @@ int ocfs2_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
return ret; return ret;
} }
int ocfs2_seek_data_hole_offset(struct file *file, loff_t *offset, int origin)
{
struct inode *inode = file->f_mapping->host;
int ret;
unsigned int is_last = 0, is_data = 0;
u16 cs_bits = OCFS2_SB(inode->i_sb)->s_clustersize_bits;
u32 cpos, cend, clen, hole_size;
u64 extoff, extlen;
struct buffer_head *di_bh = NULL;
struct ocfs2_extent_rec rec;
BUG_ON(origin != SEEK_DATA && origin != SEEK_HOLE);
ret = ocfs2_inode_lock(inode, &di_bh, 0);
if (ret) {
mlog_errno(ret);
goto out;
}
down_read(&OCFS2_I(inode)->ip_alloc_sem);
if (*offset >= inode->i_size) {
ret = -ENXIO;
goto out_unlock;
}
if (OCFS2_I(inode)->ip_dyn_features & OCFS2_INLINE_DATA_FL) {
if (origin == SEEK_HOLE)
*offset = inode->i_size;
goto out_unlock;
}
clen = 0;
cpos = *offset >> cs_bits;
cend = ocfs2_clusters_for_bytes(inode->i_sb, inode->i_size);
while (cpos < cend && !is_last) {
ret = ocfs2_get_clusters_nocache(inode, di_bh, cpos, &hole_size,
&rec, &is_last);
if (ret) {
mlog_errno(ret);
goto out_unlock;
}
extoff = cpos;
extoff <<= cs_bits;
if (rec.e_blkno == 0ULL) {
clen = hole_size;
is_data = 0;
} else {
clen = le16_to_cpu(rec.e_leaf_clusters) -
(cpos - le32_to_cpu(rec.e_cpos));
is_data = (rec.e_flags & OCFS2_EXT_UNWRITTEN) ? 0 : 1;
}
if ((!is_data && origin == SEEK_HOLE) ||
(is_data && origin == SEEK_DATA)) {
if (extoff > *offset)
*offset = extoff;
goto out_unlock;
}
if (!is_last)
cpos += clen;
}
if (origin == SEEK_HOLE) {
extoff = cpos;
extoff <<= cs_bits;
extlen = clen;
extlen <<= cs_bits;
if ((extoff + extlen) > inode->i_size)
extlen = inode->i_size - extoff;
extoff += extlen;
if (extoff > *offset)
*offset = extoff;
goto out_unlock;
}
ret = -ENXIO;
out_unlock:
brelse(di_bh);
up_read(&OCFS2_I(inode)->ip_alloc_sem);
ocfs2_inode_unlock(inode, 0);
out:
if (ret && ret != -ENXIO)
ret = -ENXIO;
return ret;
}
int ocfs2_read_virt_blocks(struct inode *inode, u64 v_block, int nr, int ocfs2_read_virt_blocks(struct inode *inode, u64 v_block, int nr,
struct buffer_head *bhs[], int flags, struct buffer_head *bhs[], int flags,
int (*validate)(struct super_block *sb, int (*validate)(struct super_block *sb,

View file

@ -53,6 +53,8 @@ int ocfs2_extent_map_get_blocks(struct inode *inode, u64 v_blkno, u64 *p_blkno,
int ocfs2_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo, int ocfs2_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
u64 map_start, u64 map_len); u64 map_start, u64 map_len);
int ocfs2_seek_data_hole_offset(struct file *file, loff_t *offset, int origin);
int ocfs2_xattr_get_clusters(struct inode *inode, u32 v_cluster, int ocfs2_xattr_get_clusters(struct inode *inode, u32 v_cluster,
u32 *p_cluster, u32 *num_clusters, u32 *p_cluster, u32 *num_clusters,
struct ocfs2_extent_list *el, struct ocfs2_extent_list *el,

View file

@ -1950,6 +1950,9 @@ static int __ocfs2_change_file_space(struct file *file, struct inode *inode,
if (ret < 0) if (ret < 0)
mlog_errno(ret); mlog_errno(ret);
if (file->f_flags & O_SYNC)
handle->h_sync = 1;
ocfs2_commit_trans(osb, handle); ocfs2_commit_trans(osb, handle);
out_inode_unlock: out_inode_unlock:
@ -2052,6 +2055,23 @@ int ocfs2_check_range_for_refcount(struct inode *inode, loff_t pos,
return ret; return ret;
} }
static void ocfs2_aiodio_wait(struct inode *inode)
{
wait_queue_head_t *wq = ocfs2_ioend_wq(inode);
wait_event(*wq, (atomic_read(&OCFS2_I(inode)->ip_unaligned_aio) == 0));
}
static int ocfs2_is_io_unaligned(struct inode *inode, size_t count, loff_t pos)
{
int blockmask = inode->i_sb->s_blocksize - 1;
loff_t final_size = pos + count;
if ((pos & blockmask) || (final_size & blockmask))
return 1;
return 0;
}
static int ocfs2_prepare_inode_for_refcount(struct inode *inode, static int ocfs2_prepare_inode_for_refcount(struct inode *inode,
struct file *file, struct file *file,
loff_t pos, size_t count, loff_t pos, size_t count,
@ -2230,6 +2250,7 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
int full_coherency = !(osb->s_mount_opt & int full_coherency = !(osb->s_mount_opt &
OCFS2_MOUNT_COHERENCY_BUFFERED); OCFS2_MOUNT_COHERENCY_BUFFERED);
int unaligned_dio = 0;
trace_ocfs2_file_aio_write(inode, file, file->f_path.dentry, trace_ocfs2_file_aio_write(inode, file, file->f_path.dentry,
(unsigned long long)OCFS2_I(inode)->ip_blkno, (unsigned long long)OCFS2_I(inode)->ip_blkno,
@ -2297,6 +2318,10 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
goto out; goto out;
} }
if (direct_io && !is_sync_kiocb(iocb))
unaligned_dio = ocfs2_is_io_unaligned(inode, iocb->ki_left,
*ppos);
/* /*
* We can't complete the direct I/O as requested, fall back to * We can't complete the direct I/O as requested, fall back to
* buffered I/O. * buffered I/O.
@ -2311,6 +2336,18 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
goto relock; goto relock;
} }
if (unaligned_dio) {
/*
* Wait on previous unaligned aio to complete before
* proceeding.
*/
ocfs2_aiodio_wait(inode);
/* Mark the iocb as needing a decrement in ocfs2_dio_end_io */
atomic_inc(&OCFS2_I(inode)->ip_unaligned_aio);
ocfs2_iocb_set_unaligned_aio(iocb);
}
/* /*
* To later detect whether a journal commit for sync writes is * To later detect whether a journal commit for sync writes is
* necessary, we sample i_size, and cluster count here. * necessary, we sample i_size, and cluster count here.
@ -2382,8 +2419,12 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
if ((ret == -EIOCBQUEUED) || (!ocfs2_iocb_is_rw_locked(iocb))) { if ((ret == -EIOCBQUEUED) || (!ocfs2_iocb_is_rw_locked(iocb))) {
rw_level = -1; rw_level = -1;
have_alloc_sem = 0; have_alloc_sem = 0;
unaligned_dio = 0;
} }
if (unaligned_dio)
atomic_dec(&OCFS2_I(inode)->ip_unaligned_aio);
out: out:
if (rw_level != -1) if (rw_level != -1)
ocfs2_rw_unlock(inode, rw_level); ocfs2_rw_unlock(inode, rw_level);
@ -2591,6 +2632,57 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
return ret; return ret;
} }
/* Refer generic_file_llseek_unlocked() */
static loff_t ocfs2_file_llseek(struct file *file, loff_t offset, int origin)
{
struct inode *inode = file->f_mapping->host;
int ret = 0;
mutex_lock(&inode->i_mutex);
switch (origin) {
case SEEK_SET:
break;
case SEEK_END:
offset += inode->i_size;
break;
case SEEK_CUR:
if (offset == 0) {
offset = file->f_pos;
goto out;
}
offset += file->f_pos;
break;
case SEEK_DATA:
case SEEK_HOLE:
ret = ocfs2_seek_data_hole_offset(file, &offset, origin);
if (ret)
goto out;
break;
default:
ret = -EINVAL;
goto out;
}
if (offset < 0 && !(file->f_mode & FMODE_UNSIGNED_OFFSET))
ret = -EINVAL;
if (!ret && offset > inode->i_sb->s_maxbytes)
ret = -EINVAL;
if (ret)
goto out;
if (offset != file->f_pos) {
file->f_pos = offset;
file->f_version = 0;
}
out:
mutex_unlock(&inode->i_mutex);
if (ret)
return ret;
return offset;
}
const struct inode_operations ocfs2_file_iops = { const struct inode_operations ocfs2_file_iops = {
.setattr = ocfs2_setattr, .setattr = ocfs2_setattr,
.getattr = ocfs2_getattr, .getattr = ocfs2_getattr,
@ -2615,7 +2707,7 @@ const struct inode_operations ocfs2_special_file_iops = {
* ocfs2_fops_no_plocks and ocfs2_dops_no_plocks! * ocfs2_fops_no_plocks and ocfs2_dops_no_plocks!
*/ */
const struct file_operations ocfs2_fops = { const struct file_operations ocfs2_fops = {
.llseek = generic_file_llseek, .llseek = ocfs2_file_llseek,
.read = do_sync_read, .read = do_sync_read,
.write = do_sync_write, .write = do_sync_write,
.mmap = ocfs2_mmap, .mmap = ocfs2_mmap,
@ -2663,7 +2755,7 @@ const struct file_operations ocfs2_dops = {
* the cluster. * the cluster.
*/ */
const struct file_operations ocfs2_fops_no_plocks = { const struct file_operations ocfs2_fops_no_plocks = {
.llseek = generic_file_llseek, .llseek = ocfs2_file_llseek,
.read = do_sync_read, .read = do_sync_read,
.write = do_sync_write, .write = do_sync_write,
.mmap = ocfs2_mmap, .mmap = ocfs2_mmap,

View file

@ -951,7 +951,7 @@ static void ocfs2_cleanup_delete_inode(struct inode *inode,
trace_ocfs2_cleanup_delete_inode( trace_ocfs2_cleanup_delete_inode(
(unsigned long long)OCFS2_I(inode)->ip_blkno, sync_data); (unsigned long long)OCFS2_I(inode)->ip_blkno, sync_data);
if (sync_data) if (sync_data)
write_inode_now(inode, 1); filemap_write_and_wait(inode->i_mapping);
truncate_inode_pages(&inode->i_data, 0); truncate_inode_pages(&inode->i_data, 0);
} }

View file

@ -43,6 +43,9 @@ struct ocfs2_inode_info
/* protects extended attribute changes on this inode */ /* protects extended attribute changes on this inode */
struct rw_semaphore ip_xattr_sem; struct rw_semaphore ip_xattr_sem;
/* Number of outstanding AIO's which are not page aligned */
atomic_t ip_unaligned_aio;
/* These fields are protected by ip_lock */ /* These fields are protected by ip_lock */
spinlock_t ip_lock; spinlock_t ip_lock;
u32 ip_open_count; u32 ip_open_count;

View file

@ -122,7 +122,7 @@ static int ocfs2_set_inode_attr(struct inode *inode, unsigned flags,
if ((oldflags & OCFS2_IMMUTABLE_FL) || ((flags ^ oldflags) & if ((oldflags & OCFS2_IMMUTABLE_FL) || ((flags ^ oldflags) &
(OCFS2_APPEND_FL | OCFS2_IMMUTABLE_FL))) { (OCFS2_APPEND_FL | OCFS2_IMMUTABLE_FL))) {
if (!capable(CAP_LINUX_IMMUTABLE)) if (!capable(CAP_LINUX_IMMUTABLE))
goto bail_unlock; goto bail_commit;
} }
ocfs2_inode->ip_attr = flags; ocfs2_inode->ip_attr = flags;
@ -132,6 +132,7 @@ static int ocfs2_set_inode_attr(struct inode *inode, unsigned flags,
if (status < 0) if (status < 0)
mlog_errno(status); mlog_errno(status);
bail_commit:
ocfs2_commit_trans(osb, handle); ocfs2_commit_trans(osb, handle);
bail_unlock: bail_unlock:
ocfs2_inode_unlock(inode, 1); ocfs2_inode_unlock(inode, 1);
@ -381,7 +382,7 @@ int ocfs2_info_handle_freeinode(struct inode *inode,
if (!oifi) { if (!oifi) {
status = -ENOMEM; status = -ENOMEM;
mlog_errno(status); mlog_errno(status);
goto bail; goto out_err;
} }
if (o2info_from_user(*oifi, req)) if (o2info_from_user(*oifi, req))
@ -431,7 +432,7 @@ int ocfs2_info_handle_freeinode(struct inode *inode,
o2info_set_request_error(&oifi->ifi_req, req); o2info_set_request_error(&oifi->ifi_req, req);
kfree(oifi); kfree(oifi);
out_err:
return status; return status;
} }
@ -666,7 +667,7 @@ int ocfs2_info_handle_freefrag(struct inode *inode,
if (!oiff) { if (!oiff) {
status = -ENOMEM; status = -ENOMEM;
mlog_errno(status); mlog_errno(status);
goto bail; goto out_err;
} }
if (o2info_from_user(*oiff, req)) if (o2info_from_user(*oiff, req))
@ -716,7 +717,7 @@ int ocfs2_info_handle_freefrag(struct inode *inode,
o2info_set_request_error(&oiff->iff_req, req); o2info_set_request_error(&oiff->iff_req, req);
kfree(oiff); kfree(oiff);
out_err:
return status; return status;
} }

View file

@ -1544,9 +1544,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
/* we need to run complete recovery for offline orphan slots */ /* we need to run complete recovery for offline orphan slots */
ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\
node_num, slot_num, "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); MINOR(osb->sb->s_dev));
OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters); OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters);
@ -1601,6 +1601,9 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
jbd2_journal_destroy(journal); jbd2_journal_destroy(journal);
printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\
"device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev),
MINOR(osb->sb->s_dev));
done: done:
/* drop the lock on this nodes journal */ /* drop the lock on this nodes journal */
if (got_lock) if (got_lock)
@ -1808,6 +1811,20 @@ static inline unsigned long ocfs2_orphan_scan_timeout(void)
* every slot, queuing a recovery of the slot on the ocfs2_wq thread. This * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This
* is done to catch any orphans that are left over in orphan directories. * is done to catch any orphans that are left over in orphan directories.
* *
* It scans all slots, even ones that are in use. It does so to handle the
* case described below:
*
* Node 1 has an inode it was using. The dentry went away due to memory
* pressure. Node 1 closes the inode, but it's on the free list. The node
* has the open lock.
* Node 2 unlinks the inode. It grabs the dentry lock to notify others,
* but node 1 has no dentry and doesn't get the message. It trylocks the
* open lock, sees that another node has a PR, and does nothing.
* Later node 2 runs its orphan dir. It igets the inode, trylocks the
* open lock, sees the PR still, and does nothing.
* Basically, we have to trigger an orphan iput on node 1. The only way
* for this to happen is if node 1 runs node 2's orphan dir.
*
* ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT
* seconds. It gets an EX lock on os_lockres and checks sequence number * seconds. It gets an EX lock on os_lockres and checks sequence number
* stored in LVB. If the sequence number has changed, it means some other * stored in LVB. If the sequence number has changed, it means some other

View file

@ -441,10 +441,11 @@ static inline int ocfs2_mknod_credits(struct super_block *sb, int is_dir,
#define OCFS2_SIMPLE_DIR_EXTEND_CREDITS (2) #define OCFS2_SIMPLE_DIR_EXTEND_CREDITS (2)
/* file update (nlink, etc) + directory mtime/ctime + dir entry block + quota /* file update (nlink, etc) + directory mtime/ctime + dir entry block + quota
* update on dir + index leaf + dx root update for free list */ * update on dir + index leaf + dx root update for free list +
* previous dirblock update in the free list */
static inline int ocfs2_link_credits(struct super_block *sb) static inline int ocfs2_link_credits(struct super_block *sb)
{ {
return 2*OCFS2_INODE_UPDATE_CREDITS + 3 + return 2*OCFS2_INODE_UPDATE_CREDITS + 4 +
ocfs2_quota_trans_credits(sb); ocfs2_quota_trans_credits(sb);
} }

View file

@ -61,7 +61,7 @@ static int ocfs2_fault(struct vm_area_struct *area, struct vm_fault *vmf)
static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh, static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh,
struct page *page) struct page *page)
{ {
int ret; int ret = VM_FAULT_NOPAGE;
struct inode *inode = file->f_path.dentry->d_inode; struct inode *inode = file->f_path.dentry->d_inode;
struct address_space *mapping = inode->i_mapping; struct address_space *mapping = inode->i_mapping;
loff_t pos = page_offset(page); loff_t pos = page_offset(page);
@ -71,32 +71,25 @@ static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh,
void *fsdata; void *fsdata;
loff_t size = i_size_read(inode); loff_t size = i_size_read(inode);
/*
* Another node might have truncated while we were waiting on
* cluster locks.
* We don't check size == 0 before the shift. This is borrowed
* from do_generic_file_read.
*/
last_index = (size - 1) >> PAGE_CACHE_SHIFT; last_index = (size - 1) >> PAGE_CACHE_SHIFT;
if (unlikely(!size || page->index > last_index)) {
ret = -EINVAL;
goto out;
}
/* /*
* The i_size check above doesn't catch the case where nodes * There are cases that lead to the page no longer bebongs to the
* truncated and then re-extended the file. We'll re-check the * mapping.
* page mapping after taking the page lock inside of * 1) pagecache truncates locally due to memory pressure.
* ocfs2_write_begin_nolock(). * 2) pagecache truncates when another is taking EX lock against
* inode lock. see ocfs2_data_convert_worker.
*
* The i_size check doesn't catch the case where nodes truncated and
* then re-extended the file. We'll re-check the page mapping after
* taking the page lock inside of ocfs2_write_begin_nolock().
*
* Let VM retry with these cases.
*/ */
if (!PageUptodate(page) || page->mapping != inode->i_mapping) { if ((page->mapping != inode->i_mapping) ||
/* (!PageUptodate(page)) ||
* the page has been umapped in ocfs2_data_downconvert_worker. (page_offset(page) >= size))
* So return 0 here and let VFS retry.
*/
ret = 0;
goto out; goto out;
}
/* /*
* Call ocfs2_write_begin() and ocfs2_write_end() to take * Call ocfs2_write_begin() and ocfs2_write_end() to take
@ -116,17 +109,21 @@ static int __ocfs2_page_mkwrite(struct file *file, struct buffer_head *di_bh,
if (ret) { if (ret) {
if (ret != -ENOSPC) if (ret != -ENOSPC)
mlog_errno(ret); mlog_errno(ret);
if (ret == -ENOMEM)
ret = VM_FAULT_OOM;
else
ret = VM_FAULT_SIGBUS;
goto out; goto out;
} }
ret = ocfs2_write_end_nolock(mapping, pos, len, len, locked_page, if (!locked_page) {
fsdata); ret = VM_FAULT_NOPAGE;
if (ret < 0) {
mlog_errno(ret);
goto out; goto out;
} }
ret = ocfs2_write_end_nolock(mapping, pos, len, len, locked_page,
fsdata);
BUG_ON(ret != len); BUG_ON(ret != len);
ret = 0; ret = VM_FAULT_LOCKED;
out: out:
return ret; return ret;
} }
@ -168,8 +165,6 @@ static int ocfs2_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
out: out:
ocfs2_unblock_signals(&oldset); ocfs2_unblock_signals(&oldset);
if (ret)
ret = VM_FAULT_SIGBUS;
return ret; return ret;
} }

View file

@ -745,7 +745,7 @@ static int ocfs2_move_extent(struct ocfs2_move_extents_context *context,
*/ */
ocfs2_probe_alloc_group(inode, gd_bh, &goal_bit, len, move_max_hop, ocfs2_probe_alloc_group(inode, gd_bh, &goal_bit, len, move_max_hop,
new_phys_cpos); new_phys_cpos);
if (!new_phys_cpos) { if (!*new_phys_cpos) {
ret = -ENOSPC; ret = -ENOSPC;
goto out_commit; goto out_commit;
} }

View file

@ -836,18 +836,65 @@ static inline unsigned int ocfs2_clusters_to_megabytes(struct super_block *sb,
static inline void _ocfs2_set_bit(unsigned int bit, unsigned long *bitmap) static inline void _ocfs2_set_bit(unsigned int bit, unsigned long *bitmap)
{ {
__test_and_set_bit_le(bit, bitmap); __set_bit_le(bit, bitmap);
} }
#define ocfs2_set_bit(bit, addr) _ocfs2_set_bit((bit), (unsigned long *)(addr)) #define ocfs2_set_bit(bit, addr) _ocfs2_set_bit((bit), (unsigned long *)(addr))
static inline void _ocfs2_clear_bit(unsigned int bit, unsigned long *bitmap) static inline void _ocfs2_clear_bit(unsigned int bit, unsigned long *bitmap)
{ {
__test_and_clear_bit_le(bit, bitmap); __clear_bit_le(bit, bitmap);
} }
#define ocfs2_clear_bit(bit, addr) _ocfs2_clear_bit((bit), (unsigned long *)(addr)) #define ocfs2_clear_bit(bit, addr) _ocfs2_clear_bit((bit), (unsigned long *)(addr))
#define ocfs2_test_bit test_bit_le #define ocfs2_test_bit test_bit_le
#define ocfs2_find_next_zero_bit find_next_zero_bit_le #define ocfs2_find_next_zero_bit find_next_zero_bit_le
#define ocfs2_find_next_bit find_next_bit_le #define ocfs2_find_next_bit find_next_bit_le
static inline void *correct_addr_and_bit_unaligned(int *bit, void *addr)
{
#if BITS_PER_LONG == 64
*bit += ((unsigned long) addr & 7UL) << 3;
addr = (void *) ((unsigned long) addr & ~7UL);
#elif BITS_PER_LONG == 32
*bit += ((unsigned long) addr & 3UL) << 3;
addr = (void *) ((unsigned long) addr & ~3UL);
#else
#error "how many bits you are?!"
#endif
return addr;
}
static inline void ocfs2_set_bit_unaligned(int bit, void *bitmap)
{
bitmap = correct_addr_and_bit_unaligned(&bit, bitmap);
ocfs2_set_bit(bit, bitmap);
}
static inline void ocfs2_clear_bit_unaligned(int bit, void *bitmap)
{
bitmap = correct_addr_and_bit_unaligned(&bit, bitmap);
ocfs2_clear_bit(bit, bitmap);
}
static inline int ocfs2_test_bit_unaligned(int bit, void *bitmap)
{
bitmap = correct_addr_and_bit_unaligned(&bit, bitmap);
return ocfs2_test_bit(bit, bitmap);
}
static inline int ocfs2_find_next_zero_bit_unaligned(void *bitmap, int max,
int start)
{
int fix = 0, ret, tmpmax;
bitmap = correct_addr_and_bit_unaligned(&fix, bitmap);
tmpmax = max + fix;
start += fix;
ret = ocfs2_find_next_zero_bit(bitmap, tmpmax, start) - fix;
if (ret > max)
return max;
return ret;
}
#endif /* OCFS2_H */ #endif /* OCFS2_H */

View file

@ -404,7 +404,9 @@ struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
int status = 0; int status = 0;
struct ocfs2_quota_recovery *rec; struct ocfs2_quota_recovery *rec;
mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num); printk(KERN_NOTICE "ocfs2: Beginning quota recovery on device (%s) for "
"slot %u\n", osb->dev_str, slot_num);
rec = ocfs2_alloc_quota_recovery(); rec = ocfs2_alloc_quota_recovery();
if (!rec) if (!rec)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
@ -549,8 +551,8 @@ static int ocfs2_recover_local_quota_file(struct inode *lqinode,
goto out_commit; goto out_commit;
} }
lock_buffer(qbh); lock_buffer(qbh);
WARN_ON(!ocfs2_test_bit(bit, dchunk->dqc_bitmap)); WARN_ON(!ocfs2_test_bit_unaligned(bit, dchunk->dqc_bitmap));
ocfs2_clear_bit(bit, dchunk->dqc_bitmap); ocfs2_clear_bit_unaligned(bit, dchunk->dqc_bitmap);
le32_add_cpu(&dchunk->dqc_free, 1); le32_add_cpu(&dchunk->dqc_free, 1);
unlock_buffer(qbh); unlock_buffer(qbh);
ocfs2_journal_dirty(handle, qbh); ocfs2_journal_dirty(handle, qbh);
@ -596,7 +598,9 @@ int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
struct inode *lqinode; struct inode *lqinode;
unsigned int flags; unsigned int flags;
mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num); printk(KERN_NOTICE "ocfs2: Finishing quota recovery on device (%s) for "
"slot %u\n", osb->dev_str, slot_num);
mutex_lock(&sb_dqopt(sb)->dqonoff_mutex); mutex_lock(&sb_dqopt(sb)->dqonoff_mutex);
for (type = 0; type < MAXQUOTAS; type++) { for (type = 0; type < MAXQUOTAS; type++) {
if (list_empty(&(rec->r_list[type]))) if (list_empty(&(rec->r_list[type])))
@ -612,8 +616,9 @@ int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
/* Someone else is holding the lock? Then he must be /* Someone else is holding the lock? Then he must be
* doing the recovery. Just skip the file... */ * doing the recovery. Just skip the file... */
if (status == -EAGAIN) { if (status == -EAGAIN) {
mlog(ML_NOTICE, "skipping quota recovery for slot %d " printk(KERN_NOTICE "ocfs2: Skipping quota recovery on "
"because quota file is locked.\n", slot_num); "device (%s) for slot %d because quota file is "
"locked.\n", osb->dev_str, slot_num);
status = 0; status = 0;
goto out_put; goto out_put;
} else if (status < 0) { } else if (status < 0) {
@ -944,7 +949,7 @@ static struct ocfs2_quota_chunk *ocfs2_find_free_entry(struct super_block *sb,
* ol_quota_entries_per_block(sb); * ol_quota_entries_per_block(sb);
} }
found = ocfs2_find_next_zero_bit(dchunk->dqc_bitmap, len, 0); found = ocfs2_find_next_zero_bit_unaligned(dchunk->dqc_bitmap, len, 0);
/* We failed? */ /* We failed? */
if (found == len) { if (found == len) {
mlog(ML_ERROR, "Did not find empty entry in chunk %d with %u" mlog(ML_ERROR, "Did not find empty entry in chunk %d with %u"
@ -1208,7 +1213,7 @@ static void olq_alloc_dquot(struct buffer_head *bh, void *private)
struct ocfs2_local_disk_chunk *dchunk; struct ocfs2_local_disk_chunk *dchunk;
dchunk = (struct ocfs2_local_disk_chunk *)bh->b_data; dchunk = (struct ocfs2_local_disk_chunk *)bh->b_data;
ocfs2_set_bit(*offset, dchunk->dqc_bitmap); ocfs2_set_bit_unaligned(*offset, dchunk->dqc_bitmap);
le32_add_cpu(&dchunk->dqc_free, -1); le32_add_cpu(&dchunk->dqc_free, -1);
} }
@ -1289,7 +1294,7 @@ int ocfs2_local_release_dquot(handle_t *handle, struct dquot *dquot)
(od->dq_chunk->qc_headerbh->b_data); (od->dq_chunk->qc_headerbh->b_data);
/* Mark structure as freed */ /* Mark structure as freed */
lock_buffer(od->dq_chunk->qc_headerbh); lock_buffer(od->dq_chunk->qc_headerbh);
ocfs2_clear_bit(offset, dchunk->dqc_bitmap); ocfs2_clear_bit_unaligned(offset, dchunk->dqc_bitmap);
le32_add_cpu(&dchunk->dqc_free, 1); le32_add_cpu(&dchunk->dqc_free, 1);
unlock_buffer(od->dq_chunk->qc_headerbh); unlock_buffer(od->dq_chunk->qc_headerbh);
ocfs2_journal_dirty(handle, od->dq_chunk->qc_headerbh); ocfs2_journal_dirty(handle, od->dq_chunk->qc_headerbh);

View file

@ -493,8 +493,8 @@ int ocfs2_find_slot(struct ocfs2_super *osb)
goto bail; goto bail;
} }
} else } else
mlog(ML_NOTICE, "slot %d is already allocated to this node!\n", printk(KERN_INFO "ocfs2: Slot %d on device (%s) was already "
slot); "allocated to this node!\n", slot, osb->dev_str);
ocfs2_set_slot(si, slot, osb->node_num); ocfs2_set_slot(si, slot, osb->node_num);
osb->slot_num = slot; osb->slot_num = slot;

View file

@ -28,6 +28,7 @@
#include "cluster/masklog.h" #include "cluster/masklog.h"
#include "cluster/nodemanager.h" #include "cluster/nodemanager.h"
#include "cluster/heartbeat.h" #include "cluster/heartbeat.h"
#include "cluster/tcp.h"
#include "stackglue.h" #include "stackglue.h"
@ -255,6 +256,61 @@ static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb)
dlm_print_one_lock(lksb->lksb_o2dlm.lockid); dlm_print_one_lock(lksb->lksb_o2dlm.lockid);
} }
/*
* Check if this node is heartbeating and is connected to all other
* heartbeating nodes.
*/
static int o2cb_cluster_check(void)
{
u8 node_num;
int i;
unsigned long hbmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long netmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
node_num = o2nm_this_node();
if (node_num == O2NM_MAX_NODES) {
printk(KERN_ERR "o2cb: This node has not been configured.\n");
return -EINVAL;
}
/*
* o2dlm expects o2net sockets to be created. If not, then
* dlm_join_domain() fails with a stack of errors which are both cryptic
* and incomplete. The idea here is to detect upfront whether we have
* managed to connect to all nodes or not. If not, then list the nodes
* to allow the user to check the configuration (incorrect IP, firewall,
* etc.) Yes, this is racy. But its not the end of the world.
*/
#define O2CB_MAP_STABILIZE_COUNT 60
for (i = 0; i < O2CB_MAP_STABILIZE_COUNT; ++i) {
o2hb_fill_node_map(hbmap, sizeof(hbmap));
if (!test_bit(node_num, hbmap)) {
printk(KERN_ERR "o2cb: %s heartbeat has not been "
"started.\n", (o2hb_global_heartbeat_active() ?
"Global" : "Local"));
return -EINVAL;
}
o2net_fill_node_map(netmap, sizeof(netmap));
/* Force set the current node to allow easy compare */
set_bit(node_num, netmap);
if (!memcmp(hbmap, netmap, sizeof(hbmap)))
return 0;
if (i < O2CB_MAP_STABILIZE_COUNT)
msleep(1000);
}
printk(KERN_ERR "o2cb: This node could not connect to nodes:");
i = -1;
while ((i = find_next_bit(hbmap, O2NM_MAX_NODES,
i + 1)) < O2NM_MAX_NODES) {
if (!test_bit(i, netmap))
printk(" %u", i);
}
printk(".\n");
return -ENOTCONN;
}
/* /*
* Called from the dlm when it's about to evict a node. This is how the * Called from the dlm when it's about to evict a node. This is how the
* classic stack signals node death. * classic stack signals node death.
@ -263,8 +319,8 @@ static void o2dlm_eviction_cb(int node_num, void *data)
{ {
struct ocfs2_cluster_connection *conn = data; struct ocfs2_cluster_connection *conn = data;
mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n", printk(KERN_NOTICE "o2cb: o2dlm has evicted node %d from domain %.*s\n",
node_num, conn->cc_namelen, conn->cc_name); node_num, conn->cc_namelen, conn->cc_name);
conn->cc_recovery_handler(node_num, conn->cc_recovery_data); conn->cc_recovery_handler(node_num, conn->cc_recovery_data);
} }
@ -280,12 +336,11 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn)
BUG_ON(conn == NULL); BUG_ON(conn == NULL);
BUG_ON(conn->cc_proto == NULL); BUG_ON(conn->cc_proto == NULL);
/* for now we only have one cluster/node, make sure we see it /* Ensure cluster stack is up and all nodes are connected */
* in the heartbeat universe */ rc = o2cb_cluster_check();
if (!o2hb_check_local_node_heartbeating()) { if (rc) {
if (o2hb_global_heartbeat_active()) printk(KERN_ERR "o2cb: Cluster check failed. Fix errors "
mlog(ML_ERROR, "Global heartbeat not started\n"); "before retrying.\n");
rc = -EINVAL;
goto out; goto out;
} }

View file

@ -54,6 +54,7 @@
#include "ocfs1_fs_compat.h" #include "ocfs1_fs_compat.h"
#include "alloc.h" #include "alloc.h"
#include "aops.h"
#include "blockcheck.h" #include "blockcheck.h"
#include "dlmglue.h" #include "dlmglue.h"
#include "export.h" #include "export.h"
@ -1107,9 +1108,9 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
ocfs2_set_ro_flag(osb, 1); ocfs2_set_ro_flag(osb, 1);
printk(KERN_NOTICE "Readonly device detected. No cluster " printk(KERN_NOTICE "ocfs2: Readonly device (%s) detected. "
"services will be utilized for this mount. Recovery " "Cluster services will not be used for this mount. "
"will be skipped.\n"); "Recovery will be skipped.\n", osb->dev_str);
} }
if (!ocfs2_is_hard_readonly(osb)) { if (!ocfs2_is_hard_readonly(osb)) {
@ -1616,12 +1617,17 @@ static int ocfs2_show_options(struct seq_file *s, struct vfsmount *mnt)
return 0; return 0;
} }
wait_queue_head_t ocfs2__ioend_wq[OCFS2_IOEND_WQ_HASH_SZ];
static int __init ocfs2_init(void) static int __init ocfs2_init(void)
{ {
int status; int status, i;
ocfs2_print_version(); ocfs2_print_version();
for (i = 0; i < OCFS2_IOEND_WQ_HASH_SZ; i++)
init_waitqueue_head(&ocfs2__ioend_wq[i]);
status = init_ocfs2_uptodate_cache(); status = init_ocfs2_uptodate_cache();
if (status < 0) { if (status < 0) {
mlog_errno(status); mlog_errno(status);
@ -1760,7 +1766,7 @@ static void ocfs2_inode_init_once(void *data)
ocfs2_extent_map_init(&oi->vfs_inode); ocfs2_extent_map_init(&oi->vfs_inode);
INIT_LIST_HEAD(&oi->ip_io_markers); INIT_LIST_HEAD(&oi->ip_io_markers);
oi->ip_dir_start_lookup = 0; oi->ip_dir_start_lookup = 0;
atomic_set(&oi->ip_unaligned_aio, 0);
init_rwsem(&oi->ip_alloc_sem); init_rwsem(&oi->ip_alloc_sem);
init_rwsem(&oi->ip_xattr_sem); init_rwsem(&oi->ip_xattr_sem);
mutex_init(&oi->ip_io_mutex); mutex_init(&oi->ip_io_mutex);
@ -1974,7 +1980,8 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
* If we failed before we got a uuid_str yet, we can't stop * If we failed before we got a uuid_str yet, we can't stop
* heartbeat. Otherwise, do it. * heartbeat. Otherwise, do it.
*/ */
if (!mnt_err && !ocfs2_mount_local(osb) && osb->uuid_str) if (!mnt_err && !ocfs2_mount_local(osb) && osb->uuid_str &&
!ocfs2_is_hard_readonly(osb))
hangup_needed = 1; hangup_needed = 1;
if (osb->cconn) if (osb->cconn)
@ -2353,7 +2360,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
mlog_errno(status); mlog_errno(status);
goto bail; goto bail;
} }
cleancache_init_shared_fs((char *)&uuid_net_key, sb); cleancache_init_shared_fs((char *)&di->id2.i_super.s_uuid, sb);
bail: bail:
return status; return status;
@ -2462,8 +2469,8 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
goto finally; goto finally;
} }
} else { } else {
mlog(ML_NOTICE, "File system was not unmounted cleanly, " printk(KERN_NOTICE "ocfs2: File system on device (%s) was not "
"recovering volume.\n"); "unmounted cleanly, recovering it.\n", osb->dev_str);
} }
local = ocfs2_mount_local(osb); local = ocfs2_mount_local(osb);

View file

@ -2376,16 +2376,18 @@ static int ocfs2_remove_value_outside(struct inode*inode,
} }
ret = ocfs2_xattr_value_truncate(inode, vb, 0, &ctxt); ret = ocfs2_xattr_value_truncate(inode, vb, 0, &ctxt);
if (ret < 0) {
mlog_errno(ret);
break;
}
ocfs2_commit_trans(osb, ctxt.handle); ocfs2_commit_trans(osb, ctxt.handle);
if (ctxt.meta_ac) { if (ctxt.meta_ac) {
ocfs2_free_alloc_context(ctxt.meta_ac); ocfs2_free_alloc_context(ctxt.meta_ac);
ctxt.meta_ac = NULL; ctxt.meta_ac = NULL;
} }
if (ret < 0) {
mlog_errno(ret);
break;
}
} }
if (ctxt.meta_ac) if (ctxt.meta_ac)