Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: add recovery callbacks
  dlm: add node slots and generation
  dlm: move recovery barrier calls
  dlm: convert rsb list to rb_tree
This commit is contained in:
Linus Torvalds 2012-01-10 14:55:55 -08:00
commit 49d41bae46
17 changed files with 945 additions and 274 deletions

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -17,6 +17,7 @@
#include <linux/slab.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/dlmconstants.h>
#include <net/ipv6.h>
#include <net/sock.h>
@ -36,6 +37,7 @@
static struct config_group *space_list;
static struct config_group *comm_list;
static struct dlm_comm *local_comm;
static uint32_t dlm_comm_count;
struct dlm_clusters;
struct dlm_cluster;
@ -103,6 +105,8 @@ struct dlm_cluster {
unsigned int cl_timewarn_cs;
unsigned int cl_waitwarn_us;
unsigned int cl_new_rsb_count;
unsigned int cl_recover_callbacks;
char cl_cluster_name[DLM_LOCKSPACE_LEN];
};
enum {
@ -118,6 +122,8 @@ enum {
CLUSTER_ATTR_TIMEWARN_CS,
CLUSTER_ATTR_WAITWARN_US,
CLUSTER_ATTR_NEW_RSB_COUNT,
CLUSTER_ATTR_RECOVER_CALLBACKS,
CLUSTER_ATTR_CLUSTER_NAME,
};
struct cluster_attribute {
@ -126,6 +132,27 @@ struct cluster_attribute {
ssize_t (*store)(struct dlm_cluster *, const char *, size_t);
};
static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf)
{
return sprintf(buf, "%s\n", cl->cl_cluster_name);
}
static ssize_t cluster_cluster_name_write(struct dlm_cluster *cl,
const char *buf, size_t len)
{
strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN);
strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN);
return len;
}
static struct cluster_attribute cluster_attr_cluster_name = {
.attr = { .ca_owner = THIS_MODULE,
.ca_name = "cluster_name",
.ca_mode = S_IRUGO | S_IWUSR },
.show = cluster_cluster_name_read,
.store = cluster_cluster_name_write,
};
static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
int *info_field, int check_zero,
const char *buf, size_t len)
@ -171,6 +198,7 @@ CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);
CLUSTER_ATTR(new_rsb_count, 0);
CLUSTER_ATTR(recover_callbacks, 0);
static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@ -185,6 +213,8 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
[CLUSTER_ATTR_RECOVER_CALLBACKS] = &cluster_attr_recover_callbacks.attr,
[CLUSTER_ATTR_CLUSTER_NAME] = &cluster_attr_cluster_name.attr,
NULL,
};
@ -293,6 +323,7 @@ struct dlm_comms {
struct dlm_comm {
struct config_item item;
int seq;
int nodeid;
int local;
int addr_count;
@ -309,6 +340,7 @@ struct dlm_node {
int nodeid;
int weight;
int new;
int comm_seq; /* copy of cm->seq when nd->nodeid is set */
};
static struct configfs_group_operations clusters_ops = {
@ -455,6 +487,9 @@ static struct config_group *make_cluster(struct config_group *g,
cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks;
memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name,
DLM_LOCKSPACE_LEN);
space_list = &sps->ss_group;
comm_list = &cms->cs_group;
@ -558,6 +593,11 @@ static struct config_item *make_comm(struct config_group *g, const char *name)
return ERR_PTR(-ENOMEM);
config_item_init_type_name(&cm->item, name, &comm_type);
cm->seq = dlm_comm_count++;
if (!cm->seq)
cm->seq = dlm_comm_count++;
cm->nodeid = -1;
cm->local = 0;
cm->addr_count = 0;
@ -801,7 +841,10 @@ static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf)
static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
size_t len)
{
uint32_t seq = 0;
nd->nodeid = simple_strtol(buf, NULL, 0);
dlm_comm_seq(nd->nodeid, &seq);
nd->comm_seq = seq;
return len;
}
@ -908,13 +951,13 @@ static void put_comm(struct dlm_comm *cm)
}
/* caller must free mem */
int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
int **new_out, int *new_count_out)
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out)
{
struct dlm_space *sp;
struct dlm_node *nd;
int i = 0, rv = 0, ids_count = 0, new_count = 0;
int *ids, *new;
struct dlm_config_node *nodes, *node;
int rv, count;
sp = get_space(lsname);
if (!sp)
@ -927,73 +970,42 @@ int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
goto out;
}
ids_count = sp->members_count;
count = sp->members_count;
ids = kcalloc(ids_count, sizeof(int), GFP_NOFS);
if (!ids) {
nodes = kcalloc(count, sizeof(struct dlm_config_node), GFP_NOFS);
if (!nodes) {
rv = -ENOMEM;
goto out;
}
node = nodes;
list_for_each_entry(nd, &sp->members, list) {
ids[i++] = nd->nodeid;
if (nd->new)
new_count++;
node->nodeid = nd->nodeid;
node->weight = nd->weight;
node->new = nd->new;
node->comm_seq = nd->comm_seq;
node++;
nd->new = 0;
}
if (ids_count != i)
printk(KERN_ERR "dlm: bad nodeid count %d %d\n", ids_count, i);
if (!new_count)
goto out_ids;
new = kcalloc(new_count, sizeof(int), GFP_NOFS);
if (!new) {
kfree(ids);
rv = -ENOMEM;
goto out;
}
i = 0;
list_for_each_entry(nd, &sp->members, list) {
if (nd->new) {
new[i++] = nd->nodeid;
nd->new = 0;
}
}
*new_count_out = new_count;
*new_out = new;
out_ids:
*ids_count_out = ids_count;
*ids_out = ids;
*count_out = count;
*nodes_out = nodes;
rv = 0;
out:
mutex_unlock(&sp->members_lock);
put_space(sp);
return rv;
}
int dlm_node_weight(char *lsname, int nodeid)
int dlm_comm_seq(int nodeid, uint32_t *seq)
{
struct dlm_space *sp;
struct dlm_node *nd;
int w = -EEXIST;
sp = get_space(lsname);
if (!sp)
goto out;
mutex_lock(&sp->members_lock);
list_for_each_entry(nd, &sp->members, list) {
if (nd->nodeid != nodeid)
continue;
w = nd->weight;
break;
}
mutex_unlock(&sp->members_lock);
put_space(sp);
out:
return w;
struct dlm_comm *cm = get_comm(nodeid, NULL);
if (!cm)
return -EEXIST;
*seq = cm->seq;
put_comm(cm);
return 0;
}
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
@ -1047,6 +1059,8 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
#define DEFAULT_NEW_RSB_COUNT 128
#define DEFAULT_RECOVER_CALLBACKS 0
#define DEFAULT_CLUSTER_NAME ""
struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
@ -1060,6 +1074,8 @@ struct dlm_config_info dlm_config = {
.ci_protocol = DEFAULT_PROTOCOL,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
.ci_waitwarn_us = DEFAULT_WAITWARN_US,
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
.ci_recover_callbacks = DEFAULT_RECOVER_CALLBACKS,
.ci_cluster_name = DEFAULT_CLUSTER_NAME
};

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -14,6 +14,13 @@
#ifndef __CONFIG_DOT_H__
#define __CONFIG_DOT_H__
struct dlm_config_node {
int nodeid;
int weight;
int new;
uint32_t comm_seq;
};
#define DLM_MAX_ADDR_COUNT 3
struct dlm_config_info {
@ -29,15 +36,17 @@ struct dlm_config_info {
int ci_timewarn_cs;
int ci_waitwarn_us;
int ci_new_rsb_count;
int ci_recover_callbacks;
char ci_cluster_name[DLM_LOCKSPACE_LEN];
};
extern struct dlm_config_info dlm_config;
int dlm_config_init(void);
void dlm_config_exit(void);
int dlm_node_weight(char *lsname, int nodeid);
int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
int **new_out, int *new_count_out);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq);
int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
int dlm_our_nodeid(void);

View file

@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops;
static void *table_seq_start(struct seq_file *seq, loff_t *pos)
{
struct rb_node *node;
struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri;
struct dlm_rsb *r;
@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
ri->format = 3;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
res_hashchain) {
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
if (!entry--) {
dlm_hold_rsb(r);
ri->rsb = r;
@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
}
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
struct dlm_rsb, res_hashchain);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
node = rb_first(&ls->ls_rsbtbl[bucket].keep);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
{
struct dlm_ls *ls = seq->private;
struct rsbtbl_iter *ri = iter_ptr;
struct list_head *next;
struct rb_node *next;
struct dlm_rsb *r, *rp;
loff_t n = *pos;
unsigned bucket;
@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
spin_lock(&ls->ls_rsbtbl[bucket].lock);
rp = ri->rsb;
next = rp->res_hashchain.next;
next = rb_next(&rp->res_hashnode);
if (next != &ls->ls_rsbtbl[bucket].list) {
r = list_entry(next, struct dlm_rsb, res_hashchain);
if (next) {
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
}
spin_lock(&ls->ls_rsbtbl[bucket].lock);
if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
struct dlm_rsb, res_hashchain);
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
next = rb_first(&ls->ls_rsbtbl[bucket].keep);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;

View file

@ -290,7 +290,6 @@ int dlm_recover_directory(struct dlm_ls *ls)
out_status:
error = 0;
dlm_set_recover_status(ls, DLM_RS_DIR);
log_debug(ls, "dlm_recover_directory %d entries", count);
out_free:
kfree(last_name);

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -103,8 +103,8 @@ struct dlm_dirtable {
};
struct dlm_rsbtable {
struct list_head list;
struct list_head toss;
struct rb_root keep;
struct rb_root toss;
spinlock_t lock;
};
@ -117,6 +117,10 @@ struct dlm_member {
struct list_head list;
int nodeid;
int weight;
int slot;
int slot_prev;
int comm_seq;
uint32_t generation;
};
/*
@ -125,10 +129,8 @@ struct dlm_member {
struct dlm_recover {
struct list_head list;
int *nodeids; /* nodeids of all members */
int node_count;
int *new; /* nodeids of new members */
int new_count;
struct dlm_config_node *nodes;
int nodes_count;
uint64_t seq;
};
@ -285,7 +287,10 @@ struct dlm_rsb {
unsigned long res_toss_time;
uint32_t res_first_lkid;
struct list_head res_lookup; /* lkbs waiting on first */
struct list_head res_hashchain; /* rsbtbl */
union {
struct list_head res_hashchain;
struct rb_node res_hashnode; /* rsbtbl */
};
struct list_head res_grantqueue;
struct list_head res_convertqueue;
struct list_head res_waitqueue;
@ -334,7 +339,9 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
/* dlm_header is first element of all structs sent between nodes */
#define DLM_HEADER_MAJOR 0x00030000
#define DLM_HEADER_MINOR 0x00000000
#define DLM_HEADER_MINOR 0x00000001
#define DLM_HEADER_SLOTS 0x00000001
#define DLM_MSG 1
#define DLM_RCOM 2
@ -422,10 +429,34 @@ union dlm_packet {
struct dlm_rcom rcom;
};
#define DLM_RSF_NEED_SLOTS 0x00000001
/* RCOM_STATUS data */
struct rcom_status {
__le32 rs_flags;
__le32 rs_unused1;
__le64 rs_unused2;
};
/* RCOM_STATUS_REPLY data */
struct rcom_config {
__le32 rf_lvblen;
__le32 rf_lsflags;
__le64 rf_unused;
/* DLM_HEADER_SLOTS adds: */
__le32 rf_flags;
__le16 rf_our_slot;
__le16 rf_num_slots;
__le32 rf_generation;
__le32 rf_unused1;
__le64 rf_unused2;
};
struct rcom_slot {
__le32 ro_nodeid;
__le16 ro_slot;
__le16 ro_unused1;
__le64 ro_unused2;
};
struct rcom_lock {
@ -452,6 +483,7 @@ struct dlm_ls {
struct list_head ls_list; /* list of lockspaces */
dlm_lockspace_t *ls_local_handle;
uint32_t ls_global_id; /* global unique lockspace ID */
uint32_t ls_generation;
uint32_t ls_exflags;
int ls_lvblen;
int ls_count; /* refcount of processes in
@ -490,6 +522,11 @@ struct dlm_ls {
int ls_total_weight;
int *ls_node_array;
int ls_slot;
int ls_num_slots;
int ls_slots_size;
struct dlm_slot *ls_slots;
struct dlm_rsb ls_stub_rsb; /* for returning errors */
struct dlm_lkb ls_stub_lkb; /* for returning errors */
struct dlm_message ls_stub_ms; /* for faking a reply */
@ -537,6 +574,9 @@ struct dlm_ls {
struct list_head ls_root_list; /* root resources */
struct rw_semaphore ls_root_sem; /* protect root_list */
const struct dlm_lockspace_ops *ls_ops;
void *ls_ops_arg;
int ls_namelen;
char ls_name[1];
};

View file

@ -56,6 +56,7 @@
L: receive_xxxx_reply() <- R: send_xxxx_reply()
*/
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
list_del(&r->res_hashchain);
/* Convert the empty list_head to a NULL rb_node for tree usage: */
memset(&r->res_hashnode, 0, sizeof(struct rb_node));
ls->ls_new_rsb_count--;
spin_unlock(&ls->ls_new_rsb_spin);
@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
memcpy(r->res_name, name, len);
mutex_init(&r->res_mutex);
INIT_LIST_HEAD(&r->res_hashchain);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
INIT_LIST_HEAD(&r->res_convertqueue);
@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
return 0;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
char maxname[DLM_RESNAME_MAXLEN];
memset(maxname, 0, DLM_RESNAME_MAXLEN);
memcpy(maxname, name, nlen);
return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
static int search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct rb_node *node = tree->rb_node;
struct dlm_rsb *r;
int error = 0;
int rc;
list_for_each_entry(r, head, res_hashchain) {
if (len == r->res_length && !memcmp(name, r->res_name, len))
while (node) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
rc = rsb_cmp(r, name, len);
if (rc < 0)
node = node->rb_left;
else if (rc > 0)
node = node->rb_right;
else
goto found;
}
*r_ret = NULL;
@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
return error;
}
static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
struct rb_node **newn = &tree->rb_node;
struct rb_node *parent = NULL;
int rc;
while (*newn) {
struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
res_hashnode);
parent = *newn;
rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
if (rc < 0)
newn = &parent->rb_left;
else if (rc > 0)
newn = &parent->rb_right;
else {
log_print("rsb_insert match");
dlm_dump_rsb(rsb);
dlm_dump_rsb(cur);
return -EEXIST;
}
}
rb_link_node(&rsb->res_hashnode, parent, newn);
rb_insert_color(&rsb->res_hashnode, tree);
return 0;
}
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
int error;
error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
if (!error) {
kref_get(&r->res_ref);
goto out;
}
error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
if (error)
goto out;
list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
if (error)
return error;
if (dlm_no_directory(ls))
goto out;
@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
nodeid = 0;
r->res_nodeid = nodeid;
}
list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
error = 0;
error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
out_unlock:
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
out:
@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref)
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
kref_init(&r->res_ref);
list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
r->res_toss_time = jiffies;
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r)
r->res_name, r->res_length);
}
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
found since they are in order of newest to oldest? */
/* FIXME: make this more efficient */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
struct rb_node *n;
struct dlm_rsb *r;
int count = 0, found;
for (;;) {
found = 0;
spin_lock(&ls->ls_rsbtbl[b].lock);
list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
res_hashchain) {
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!time_after_eq(jiffies, r->res_toss_time +
dlm_config.ci_toss_secs * HZ))
continue;
@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
}
if (kref_put(&r->res_ref, kill_rsb)) {
list_del(&r->res_hashchain);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
spin_unlock(&ls->ls_rsbtbl[b].lock);
if (is_master(r))
@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls)
static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
struct rb_node *n;
struct dlm_rsb *r, *r_ret = NULL;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!rsb_flag(r, RSB_LOCKS_PURGED))
continue;
hold_rsb(r);

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -386,12 +386,15 @@ static void threads_stop(void)
dlm_lowcomms_stop();
}
static int new_lockspace(const char *name, int namelen, void **lockspace,
uint32_t flags, int lvblen)
static int new_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops, void *ops_arg,
int *ops_result, dlm_lockspace_t **lockspace)
{
struct dlm_ls *ls;
int i, size, error;
int do_unreg = 0;
int namelen = strlen(name);
if (namelen > DLM_LOCKSPACE_LEN)
return -EINVAL;
@ -403,8 +406,24 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
return -EINVAL;
if (!dlm_user_daemon_available()) {
module_put(THIS_MODULE);
return -EUNATCH;
log_print("dlm user daemon not available");
error = -EUNATCH;
goto out;
}
if (ops && ops_result) {
if (!dlm_config.ci_recover_callbacks)
*ops_result = -EOPNOTSUPP;
else
*ops_result = 0;
}
if (dlm_config.ci_recover_callbacks && cluster &&
strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
log_print("dlm cluster name %s mismatch %s",
dlm_config.ci_cluster_name, cluster);
error = -EBADR;
goto out;
}
error = 0;
@ -442,6 +461,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
ls->ls_flags = 0;
ls->ls_scan_time = jiffies;
if (ops && dlm_config.ci_recover_callbacks) {
ls->ls_ops = ops;
ls->ls_ops_arg = ops_arg;
}
if (flags & DLM_LSFL_TIMEWARN)
set_bit(LSFL_TIMEWARN, &ls->ls_flags);
@ -457,8 +481,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
if (!ls->ls_rsbtbl)
goto out_lsfree;
for (i = 0; i < size; i++) {
INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
ls->ls_rsbtbl[i].keep.rb_node = NULL;
ls->ls_rsbtbl[i].toss.rb_node = NULL;
spin_lock_init(&ls->ls_rsbtbl[i].lock);
}
@ -525,6 +549,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
if (!ls->ls_recover_buf)
goto out_dirfree;
ls->ls_slot = 0;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_slots = NULL;
INIT_LIST_HEAD(&ls->ls_recover_list);
spin_lock_init(&ls->ls_recover_list_lock);
ls->ls_recover_list_count = 0;
@ -614,8 +643,10 @@ static int new_lockspace(const char *name, int namelen, void **lockspace,
return error;
}
int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
uint32_t flags, int lvblen)
int dlm_new_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops, void *ops_arg,
int *ops_result, dlm_lockspace_t **lockspace)
{
int error = 0;
@ -625,7 +656,8 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
if (error)
goto out;
error = new_lockspace(name, namelen, lockspace, flags, lvblen);
error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
ops_result, lockspace);
if (!error)
ls_count++;
if (error > 0)
@ -685,7 +717,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
static int release_lockspace(struct dlm_ls *ls, int force)
{
struct dlm_rsb *rsb;
struct list_head *head;
struct rb_node *n;
int i, busy, rv;
busy = lockspace_busy(ls, force);
@ -746,20 +778,15 @@ static int release_lockspace(struct dlm_ls *ls, int force)
*/
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
head = &ls->ls_rsbtbl[i].list;
while (!list_empty(head)) {
rsb = list_entry(head->next, struct dlm_rsb,
res_hashchain);
list_del(&rsb->res_hashchain);
while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].keep);
dlm_free_rsb(rsb);
}
head = &ls->ls_rsbtbl[i].toss;
while (!list_empty(head)) {
rsb = list_entry(head->next, struct dlm_rsb,
res_hashchain);
list_del(&rsb->res_hashchain);
while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].toss);
dlm_free_rsb(rsb);
}
}

View file

@ -1,7 +1,7 @@
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved.
** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -19,6 +19,280 @@
#include "config.h"
#include "lowcomms.h"
int dlm_slots_version(struct dlm_header *h)
{
if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
return 0;
return 1;
}
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
struct dlm_member *memb)
{
struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
if (!dlm_slots_version(&rc->rc_header))
return;
memb->slot = le16_to_cpu(rf->rf_our_slot);
memb->generation = le32_to_cpu(rf->rf_generation);
}
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
struct dlm_slot *slot;
struct rcom_slot *ro;
int i;
ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
/* ls_slots array is sparse, but not rcom_slots */
for (i = 0; i < ls->ls_slots_size; i++) {
slot = &ls->ls_slots[i];
if (!slot->nodeid)
continue;
ro->ro_nodeid = cpu_to_le32(slot->nodeid);
ro->ro_slot = cpu_to_le16(slot->slot);
ro++;
}
}
#define SLOT_DEBUG_LINE 128
static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
struct rcom_slot *ro0, struct dlm_slot *array,
int array_size)
{
char line[SLOT_DEBUG_LINE];
int len = SLOT_DEBUG_LINE - 1;
int pos = 0;
int ret, i;
if (!dlm_config.ci_log_debug)
return;
memset(line, 0, sizeof(line));
if (array) {
for (i = 0; i < array_size; i++) {
if (!array[i].nodeid)
continue;
ret = snprintf(line + pos, len - pos, " %d:%d",
array[i].slot, array[i].nodeid);
if (ret >= len - pos)
break;
pos += ret;
}
} else if (ro0) {
for (i = 0; i < num_slots; i++) {
ret = snprintf(line + pos, len - pos, " %d:%d",
ro0[i].ro_slot, ro0[i].ro_nodeid);
if (ret >= len - pos)
break;
pos += ret;
}
}
log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
}
int dlm_slots_copy_in(struct dlm_ls *ls)
{
struct dlm_member *memb;
struct dlm_rcom *rc = ls->ls_recover_buf;
struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
struct rcom_slot *ro0, *ro;
int our_nodeid = dlm_our_nodeid();
int i, num_slots;
uint32_t gen;
if (!dlm_slots_version(&rc->rc_header))
return -1;
gen = le32_to_cpu(rf->rf_generation);
if (gen <= ls->ls_generation) {
log_error(ls, "dlm_slots_copy_in gen %u old %u",
gen, ls->ls_generation);
}
ls->ls_generation = gen;
num_slots = le16_to_cpu(rf->rf_num_slots);
if (!num_slots)
return -1;
ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
ro->ro_slot = le16_to_cpu(ro->ro_slot);
}
log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
list_for_each_entry(memb, &ls->ls_nodes, list) {
for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
if (ro->ro_nodeid != memb->nodeid)
continue;
memb->slot = ro->ro_slot;
memb->slot_prev = memb->slot;
break;
}
if (memb->nodeid == our_nodeid) {
if (ls->ls_slot && ls->ls_slot != memb->slot) {
log_error(ls, "dlm_slots_copy_in our slot "
"changed %d %d", ls->ls_slot,
memb->slot);
return -1;
}
if (!ls->ls_slot)
ls->ls_slot = memb->slot;
}
if (!memb->slot) {
log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
memb->nodeid);
return -1;
}
}
return 0;
}
/* for any nodes that do not support slots, we will not have set memb->slot
in wait_status_all(), so memb->slot will remain -1, and we will not
assign slots or set ls_num_slots here */
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
struct dlm_slot **slots_out, uint32_t *gen_out)
{
struct dlm_member *memb;
struct dlm_slot *array;
int our_nodeid = dlm_our_nodeid();
int array_size, max_slots, i;
int need = 0;
int max = 0;
int num = 0;
uint32_t gen = 0;
/* our own memb struct will have slot -1 gen 0 */
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == our_nodeid) {
memb->slot = ls->ls_slot;
memb->generation = ls->ls_generation;
break;
}
}
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->generation > gen)
gen = memb->generation;
/* node doesn't support slots */
if (memb->slot == -1)
return -1;
/* node needs a slot assigned */
if (!memb->slot)
need++;
/* node has a slot assigned */
num++;
if (!max || max < memb->slot)
max = memb->slot;
/* sanity check, once slot is assigned it shouldn't change */
if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
log_error(ls, "nodeid %d slot changed %d %d",
memb->nodeid, memb->slot_prev, memb->slot);
return -1;
}
memb->slot_prev = memb->slot;
}
array_size = max + need;
array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
if (!array)
return -ENOMEM;
num = 0;
/* fill in slots (offsets) that are used */
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (!memb->slot)
continue;
if (memb->slot > array_size) {
log_error(ls, "invalid slot number %d", memb->slot);
kfree(array);
return -1;
}
array[memb->slot - 1].nodeid = memb->nodeid;
array[memb->slot - 1].slot = memb->slot;
num++;
}
/* assign new slots from unused offsets */
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->slot)
continue;
for (i = 0; i < array_size; i++) {
if (array[i].nodeid)
continue;
memb->slot = i + 1;
memb->slot_prev = memb->slot;
array[i].nodeid = memb->nodeid;
array[i].slot = memb->slot;
num++;
if (!ls->ls_slot && memb->nodeid == our_nodeid)
ls->ls_slot = memb->slot;
break;
}
if (!memb->slot) {
log_error(ls, "no free slot found");
kfree(array);
return -1;
}
}
gen++;
log_debug_slots(ls, gen, num, NULL, array, array_size);
max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
if (num > max_slots) {
log_error(ls, "num_slots %d exceeds max_slots %d",
num, max_slots);
kfree(array);
return -1;
}
*gen_out = gen;
*slots_out = array;
*slots_size = array_size;
*num_slots = num;
return 0;
}
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
struct dlm_member *memb = NULL;
@ -43,59 +317,51 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
}
}
static int dlm_add_member(struct dlm_ls *ls, int nodeid)
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
struct dlm_member *memb;
int w, error;
int error;
memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
if (!memb)
return -ENOMEM;
w = dlm_node_weight(ls->ls_name, nodeid);
if (w < 0) {
kfree(memb);
return w;
}
error = dlm_lowcomms_connect_node(nodeid);
error = dlm_lowcomms_connect_node(node->nodeid);
if (error < 0) {
kfree(memb);
return error;
}
memb->nodeid = nodeid;
memb->weight = w;
memb->nodeid = node->nodeid;
memb->weight = node->weight;
memb->comm_seq = node->comm_seq;
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
}
static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
list_move(&memb->list, &ls->ls_nodes_gone);
ls->ls_num_nodes--;
struct dlm_member *memb;
list_for_each_entry(memb, head, list) {
if (memb->nodeid == nodeid)
return memb;
}
return NULL;
}
int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
struct dlm_member *memb;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == nodeid)
return 1;
}
if (find_memb(&ls->ls_nodes, nodeid))
return 1;
return 0;
}
int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
struct dlm_member *memb;
list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
if (memb->nodeid == nodeid)
return 1;
}
if (find_memb(&ls->ls_nodes_gone, nodeid))
return 1;
return 0;
}
@ -176,7 +442,7 @@ static int ping_members(struct dlm_ls *ls)
error = dlm_recovery_stopped(ls);
if (error)
break;
error = dlm_rcom_status(ls, memb->nodeid);
error = dlm_rcom_status(ls, memb->nodeid, 0);
if (error)
break;
}
@ -186,10 +452,88 @@ static int ping_members(struct dlm_ls *ls)
return error;
}
static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
if (!ls->ls_ops || !ls->ls_ops->recover_prep)
return;
ls->ls_ops->recover_prep(ls->ls_ops_arg);
}
static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
struct dlm_slot slot;
uint32_t seq;
int error;
if (!ls->ls_ops || !ls->ls_ops->recover_slot)
return;
/* if there is no comms connection with this node
or the present comms connection is newer
than the one when this member was added, then
we consider the node to have failed (versus
being removed due to dlm_release_lockspace) */
error = dlm_comm_seq(memb->nodeid, &seq);
if (!error && seq == memb->comm_seq)
return;
slot.nodeid = memb->nodeid;
slot.slot = memb->slot;
ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
struct dlm_member *memb;
struct dlm_slot *slots;
int i, num;
if (!ls->ls_ops || !ls->ls_ops->recover_done)
return;
num = ls->ls_num_nodes;
slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
if (!slots)
return;
i = 0;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (i == num) {
log_error(ls, "dlm_lsop_recover_done bad num %d", num);
goto out;
}
slots[i].nodeid = memb->nodeid;
slots[i].slot = memb->slot;
i++;
}
ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
ls->ls_slot, ls->ls_generation);
out:
kfree(slots);
}
static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
int nodeid)
{
int i;
for (i = 0; i < rv->nodes_count; i++) {
if (rv->nodes[i].nodeid == nodeid)
return &rv->nodes[i];
}
return NULL;
}
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
struct dlm_member *memb, *safe;
int i, error, found, pos = 0, neg = 0, low = -1;
struct dlm_config_node *node;
int i, error, neg = 0, low = -1;
/* previously removed members that we've not finished removing need to
count as a negative change so the "neg" recovery steps will happen */
@ -202,46 +546,32 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
/* move departed members from ls_nodes to ls_nodes_gone */
list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
found = 0;
for (i = 0; i < rv->node_count; i++) {
if (memb->nodeid == rv->nodeids[i]) {
found = 1;
break;
}
}
if (!found) {
neg++;
dlm_remove_member(ls, memb);
log_debug(ls, "remove member %d", memb->nodeid);
}
}
/* Add an entry to ls_nodes_gone for members that were removed and
then added again, so that previous state for these nodes will be
cleared during recovery. */
for (i = 0; i < rv->new_count; i++) {
if (!dlm_is_member(ls, rv->new[i]))
node = find_config_node(rv, memb->nodeid);
if (node && !node->new)
continue;
log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
if (!memb)
return -ENOMEM;
memb->nodeid = rv->new[i];
list_add_tail(&memb->list, &ls->ls_nodes_gone);
if (!node) {
log_debug(ls, "remove member %d", memb->nodeid);
} else {
/* removed and re-added */
log_debug(ls, "remove member %d comm_seq %u %u",
memb->nodeid, memb->comm_seq, node->comm_seq);
}
neg++;
list_move(&memb->list, &ls->ls_nodes_gone);
ls->ls_num_nodes--;
dlm_lsop_recover_slot(ls, memb);
}
/* add new members to ls_nodes */
for (i = 0; i < rv->node_count; i++) {
if (dlm_is_member(ls, rv->nodeids[i]))
for (i = 0; i < rv->nodes_count; i++) {
node = &rv->nodes[i];
if (dlm_is_member(ls, node->nodeid))
continue;
dlm_add_member(ls, rv->nodeids[i]);
pos++;
log_debug(ls, "add member %d", rv->nodeids[i]);
dlm_add_member(ls, node);
log_debug(ls, "add member %d", node->nodeid);
}
list_for_each_entry(memb, &ls->ls_nodes, list) {
@ -251,7 +581,6 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
ls->ls_low_nodeid = low;
make_member_array(ls);
dlm_set_recover_status(ls, DLM_RS_NODES);
*neg_out = neg;
error = ping_members(ls);
@ -261,12 +590,8 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
ls->ls_members_result = error;
complete(&ls->ls_members_done);
}
if (error)
goto out;
error = dlm_recover_members_wait(ls);
out:
log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error;
}
@ -327,26 +652,35 @@ int dlm_ls_stop(struct dlm_ls *ls)
*/
dlm_recoverd_suspend(ls);
spin_lock(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
spin_unlock(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
if (!ls->ls_recover_begin)
ls->ls_recover_begin = jiffies;
dlm_lsop_recover_prep(ls);
return 0;
}
int dlm_ls_start(struct dlm_ls *ls)
{
struct dlm_recover *rv = NULL, *rv_old;
int *ids = NULL, *new = NULL;
int error, ids_count = 0, new_count = 0;
struct dlm_config_node *nodes;
int error, count;
rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
if (!rv)
return -ENOMEM;
error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
&new, &new_count);
error = dlm_config_nodes(ls->ls_name, &nodes, &count);
if (error < 0)
goto fail;
@ -361,10 +695,8 @@ int dlm_ls_start(struct dlm_ls *ls)
goto fail;
}
rv->nodeids = ids;
rv->node_count = ids_count;
rv->new = new;
rv->new_count = new_count;
rv->nodes = nodes;
rv->nodes_count = count;
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
@ -372,9 +704,8 @@ int dlm_ls_start(struct dlm_ls *ls)
if (rv_old) {
log_error(ls, "unused recovery %llx %d",
(unsigned long long)rv_old->seq, rv_old->node_count);
kfree(rv_old->nodeids);
kfree(rv_old->new);
(unsigned long long)rv_old->seq, rv_old->nodes_count);
kfree(rv_old->nodes);
kfree(rv_old);
}
@ -383,8 +714,7 @@ int dlm_ls_start(struct dlm_ls *ls)
fail:
kfree(rv);
kfree(ids);
kfree(new);
kfree(nodes);
return error;
}

View file

@ -1,7 +1,7 @@
/******************************************************************************
*******************************************************************************
**
** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -20,6 +20,14 @@ void dlm_clear_members_gone(struct dlm_ls *ls);
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
int dlm_is_removed(struct dlm_ls *ls, int nodeid);
int dlm_is_member(struct dlm_ls *ls, int nodeid);
int dlm_slots_version(struct dlm_header *h);
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
struct dlm_member *memb);
void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_slots_copy_in(struct dlm_ls *ls);
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
struct dlm_slot **slots_out, uint32_t *gen_out);
void dlm_lsop_recover_done(struct dlm_ls *ls);
#endif /* __MEMBER_DOT_H__ */

View file

@ -23,6 +23,7 @@
#include "memory.h"
#include "lock.h"
#include "util.h"
#include "member.h"
static int rcom_response(struct dlm_ls *ls)
@ -72,20 +73,30 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
dlm_lowcomms_commit_buffer(mh);
}
static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
uint32_t flags)
{
rs->rs_flags = cpu_to_le32(flags);
}
/* When replying to a status request, a node also sends back its
configuration values. The requesting node then checks that the remote
node is configured the same way as itself. */
static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
uint32_t num_slots)
{
rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
rf->rf_num_slots = cpu_to_le16(num_slots);
rf->rf_generation = cpu_to_le32(ls->ls_generation);
}
static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
log_error(ls, "version mismatch: %x nodeid %d: %x",
@ -94,12 +105,6 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
return -EPROTO;
}
if (rc->rc_header.h_length < conf_size) {
log_error(ls, "config too short: %d nodeid %d",
rc->rc_header.h_length, nodeid);
return -EPROTO;
}
if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@ -127,7 +132,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
spin_unlock(&ls->ls_rcom_spin);
}
int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
/*
* low nodeid gathers one slot value at a time from each node.
* it sets need_slots=0, and saves rf_our_slot returned from each
* rcom_config.
*
* other nodes gather all slot values at once from the low nodeid.
* they set need_slots=1, and ignore the rf_our_slot returned from each
* rcom_config. they use the rf_num_slots returned from the low
* node's rcom_config.
*/
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@ -141,10 +157,13 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
goto out;
}
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
sizeof(struct rcom_status), &rc, &mh);
if (error)
goto out;
set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
@ -161,8 +180,11 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
/* we pretend the remote lockspace exists with 0 status */
log_debug(ls, "remote node %d not ready", nodeid);
rc->rc_result = 0;
} else
error = check_config(ls, rc, nodeid);
error = 0;
} else {
error = check_rcom_config(ls, rc, nodeid);
}
/* the caller looks at rc_result for the remote recovery status */
out:
return error;
@ -172,17 +194,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error, nodeid = rc_in->rc_header.h_nodeid;
struct rcom_status *rs;
uint32_t status;
int nodeid = rc_in->rc_header.h_nodeid;
int len = sizeof(struct rcom_config);
int num_slots = 0;
int error;
if (!dlm_slots_version(&rc_in->rc_header)) {
status = dlm_recover_status(ls);
goto do_create;
}
rs = (struct rcom_status *)rc_in->rc_buf;
if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
status = dlm_recover_status(ls);
goto do_create;
}
spin_lock(&ls->ls_recover_lock);
status = ls->ls_recover_status;
num_slots = ls->ls_num_slots;
spin_unlock(&ls->ls_recover_lock);
len += num_slots * sizeof(struct rcom_slot);
do_create:
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
sizeof(struct rcom_config), &rc, &mh);
len, &rc, &mh);
if (error)
return;
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
rc->rc_result = dlm_recover_status(ls);
make_config(ls, (struct rcom_config *) rc->rc_buf);
rc->rc_result = status;
set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
if (!num_slots)
goto do_send;
spin_lock(&ls->ls_recover_lock);
if (ls->ls_num_slots != num_slots) {
spin_unlock(&ls->ls_recover_lock);
log_debug(ls, "receive_rcom_status num_slots %d to %d",
num_slots, ls->ls_num_slots);
rc->rc_result = 0;
set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
goto do_send;
}
dlm_slots_copy_out(ls, rc);
spin_unlock(&ls->ls_recover_lock);
do_send:
send_rcom(ls, mh, rc);
}

View file

@ -14,7 +14,7 @@
#ifndef __RCOM_DOT_H__
#define __RCOM_DOT_H__
int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);

View file

@ -85,14 +85,20 @@ uint32_t dlm_recover_status(struct dlm_ls *ls)
return status;
}
static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
ls->ls_recover_status |= status;
}
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
spin_lock(&ls->ls_recover_lock);
ls->ls_recover_status |= status;
_set_recover_status(ls, status);
spin_unlock(&ls->ls_recover_lock);
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
int save_slots)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
struct dlm_member *memb;
@ -106,10 +112,13 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
goto out;
}
error = dlm_rcom_status(ls, memb->nodeid);
error = dlm_rcom_status(ls, memb->nodeid, 0);
if (error)
goto out;
if (save_slots)
dlm_slot_save(ls, rc, memb);
if (rc->rc_result & wait_status)
break;
if (delay < 1000)
@ -121,7 +130,8 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
return error;
}
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
uint32_t status_flags)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@ -132,7 +142,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
goto out;
}
error = dlm_rcom_status(ls, nodeid);
error = dlm_rcom_status(ls, nodeid, status_flags);
if (error)
break;
@ -152,18 +162,56 @@ static int wait_status(struct dlm_ls *ls, uint32_t status)
int error;
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, status);
error = wait_status_all(ls, status, 0);
if (!error)
dlm_set_recover_status(ls, status_all);
} else
error = wait_status_low(ls, status_all);
error = wait_status_low(ls, status_all, 0);
return error;
}
int dlm_recover_members_wait(struct dlm_ls *ls)
{
return wait_status(ls, DLM_RS_NODES);
struct dlm_member *memb;
struct dlm_slot *slots;
int num_slots, slots_size;
int error, rv;
uint32_t gen;
list_for_each_entry(memb, &ls->ls_nodes, list) {
memb->slot = -1;
memb->generation = 0;
}
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, DLM_RS_NODES, 1);
if (error)
goto out;
/* slots array is sparse, slots_size may be > num_slots */
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
if (!rv) {
spin_lock(&ls->ls_recover_lock);
_set_recover_status(ls, DLM_RS_NODES_ALL);
ls->ls_num_slots = num_slots;
ls->ls_slots_size = slots_size;
ls->ls_slots = slots;
ls->ls_generation = gen;
spin_unlock(&ls->ls_recover_lock);
} else {
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
} else {
error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
if (error)
goto out;
dlm_slots_copy_in(ls);
}
out:
return error;
}
int dlm_recover_directory_wait(struct dlm_ls *ls)
@ -542,8 +590,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
out:
if (error)
recover_list_clear(ls);
else
dlm_set_recover_status(ls, DLM_RS_LOCKS);
return error;
}
@ -715,6 +761,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
int dlm_create_root_list(struct dlm_ls *ls)
{
struct rb_node *n;
struct dlm_rsb *r;
int i, error = 0;
@ -727,7 +774,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
dlm_hold_rsb(r);
}
@ -741,7 +789,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
continue;
}
list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
list_add(&r->res_root_list, &ls->ls_root_list);
dlm_hold_rsb(r);
}
@ -771,16 +820,18 @@ void dlm_release_root_list(struct dlm_ls *ls)
void dlm_clear_toss_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
struct rb_node *n, *next;
struct dlm_rsb *rsb;
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
res_hashchain) {
if (dlm_no_directory(ls) || !is_master(r)) {
list_del(&r->res_hashchain);
dlm_free_rsb(r);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);;
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
if (dlm_no_directory(ls) || !is_master(rsb)) {
rb_erase(n, &ls->ls_rsbtbl[i].toss);
dlm_free_rsb(rsb);
}
}
spin_unlock(&ls->ls_rsbtbl[i].lock);

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
unsigned long start;
int error, neg = 0;
log_debug(ls, "recover %llx", (unsigned long long)rv->seq);
log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
mutex_lock(&ls->ls_recoverd_active);
@ -76,14 +76,22 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
/*
* Add or remove nodes from the lockspace's ls_nodes list.
* Also waits for all nodes to complete dlm_recover_members.
*/
error = dlm_recover_members(ls, rv, &neg);
if (error) {
log_debug(ls, "recover_members failed %d", error);
log_debug(ls, "dlm_recover_members error %d", error);
goto fail;
}
dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls);
if (error) {
log_debug(ls, "dlm_recover_members_wait error %d", error);
goto fail;
}
start = jiffies;
/*
@ -93,17 +101,15 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_directory(ls);
if (error) {
log_debug(ls, "recover_directory failed %d", error);
log_debug(ls, "dlm_recover_directory error %d", error);
goto fail;
}
/*
* Wait for all nodes to complete directory rebuild.
*/
dlm_set_recover_status(ls, DLM_RS_DIR);
error = dlm_recover_directory_wait(ls);
if (error) {
log_debug(ls, "recover_directory_wait failed %d", error);
log_debug(ls, "dlm_recover_directory_wait error %d", error);
goto fail;
}
@ -133,7 +139,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_masters(ls);
if (error) {
log_debug(ls, "recover_masters failed %d", error);
log_debug(ls, "dlm_recover_masters error %d", error);
goto fail;
}
@ -143,13 +149,15 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_locks(ls);
if (error) {
log_debug(ls, "recover_locks failed %d", error);
log_debug(ls, "dlm_recover_locks error %d", error);
goto fail;
}
dlm_set_recover_status(ls, DLM_RS_LOCKS);
error = dlm_recover_locks_wait(ls);
if (error) {
log_debug(ls, "recover_locks_wait failed %d", error);
log_debug(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
}
@ -170,7 +178,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = dlm_recover_locks_wait(ls);
if (error) {
log_debug(ls, "recover_locks_wait failed %d", error);
log_debug(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
}
}
@ -186,9 +194,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_purge_requestqueue(ls);
dlm_set_recover_status(ls, DLM_RS_DONE);
error = dlm_recover_done_wait(ls);
if (error) {
log_debug(ls, "recover_done_wait failed %d", error);
log_debug(ls, "dlm_recover_done_wait error %d", error);
goto fail;
}
@ -200,34 +209,35 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
error = enable_locking(ls, rv->seq);
if (error) {
log_debug(ls, "enable_locking failed %d", error);
log_debug(ls, "enable_locking error %d", error);
goto fail;
}
error = dlm_process_requestqueue(ls);
if (error) {
log_debug(ls, "process_requestqueue failed %d", error);
log_debug(ls, "dlm_process_requestqueue error %d", error);
goto fail;
}
error = dlm_recover_waiters_post(ls);
if (error) {
log_debug(ls, "recover_waiters_post failed %d", error);
log_debug(ls, "dlm_recover_waiters_post error %d", error);
goto fail;
}
dlm_grant_after_purge(ls);
log_debug(ls, "recover %llx done: %u ms",
(unsigned long long)rv->seq,
log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
(unsigned long long)rv->seq, ls->ls_generation,
jiffies_to_msecs(jiffies - start));
mutex_unlock(&ls->ls_recoverd_active);
dlm_lsop_recover_done(ls);
return 0;
fail:
dlm_release_root_list(ls);
log_debug(ls, "recover %llx error %d",
log_debug(ls, "dlm_recover %llx error %d",
(unsigned long long)rv->seq, error);
mutex_unlock(&ls->ls_recoverd_active);
return error;
@ -250,8 +260,7 @@ static void do_ls_recovery(struct dlm_ls *ls)
if (rv) {
ls_recover(ls, rv);
kfree(rv->nodeids);
kfree(rv->new);
kfree(rv->nodes);
kfree(rv);
}
}

View file

@ -392,8 +392,9 @@ static int device_create_lockspace(struct dlm_lspace_params *params)
if (!capable(CAP_SYS_ADMIN))
return -EPERM;
error = dlm_new_lockspace(params->name, strlen(params->name),
&lockspace, params->flags, DLM_USER_LVB_LEN);
error = dlm_new_lockspace(params->name, NULL, params->flags,
DLM_USER_LVB_LEN, NULL, NULL, NULL,
&lockspace);
if (error)
return error;

View file

@ -195,10 +195,10 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *fsname)
return -EINVAL;
}
error = dlm_new_lockspace(fsname, strlen(fsname), &ls->ls_dlm,
error = dlm_new_lockspace(fsname, NULL,
DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
(ls->ls_nodir ? DLM_LSFL_NODIR : 0),
GDLM_LVB_SIZE);
GDLM_LVB_SIZE, NULL, NULL, NULL, &ls->ls_dlm);
if (error)
printk(KERN_ERR "dlm_new_lockspace error %d", error);

View file

@ -827,8 +827,8 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
goto out;
}
rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
&fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
NULL, NULL, NULL, &fsdlm);
if (rc) {
ocfs2_live_connection_drop(control);
goto out;

View file

@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@ -74,15 +74,76 @@ struct dlm_lksb {
#ifdef __KERNEL__
struct dlm_slot {
int nodeid; /* 1 to MAX_INT */
int slot; /* 1 to MAX_INT */
};
/*
* recover_prep: called before the dlm begins lock recovery.
* Notfies lockspace user that locks from failed members will be granted.
* recover_slot: called after recover_prep and before recover_done.
* Identifies a failed lockspace member.
* recover_done: called after the dlm completes lock recovery.
* Identifies lockspace members and lockspace generation number.
*/
struct dlm_lockspace_ops {
void (*recover_prep) (void *ops_arg);
void (*recover_slot) (void *ops_arg, struct dlm_slot *slot);
void (*recover_done) (void *ops_arg, struct dlm_slot *slots,
int num_slots, int our_slot, uint32_t generation);
};
/*
* dlm_new_lockspace
*
* Starts a lockspace with the given name. If the named lockspace exists in
* the cluster, the calling node joins it.
* Create/join a lockspace.
*
* name: lockspace name, null terminated, up to DLM_LOCKSPACE_LEN (not
* including terminating null).
*
* cluster: cluster name, null terminated, up to DLM_LOCKSPACE_LEN (not
* including terminating null). Optional. When cluster is null, it
* is not used. When set, dlm_new_lockspace() returns -EBADR if cluster
* is not equal to the dlm cluster name.
*
* flags:
* DLM_LSFL_NODIR
* The dlm should not use a resource directory, but statically assign
* resource mastery to nodes based on the name hash that is otherwise
* used to select the directory node. Must be the same on all nodes.
* DLM_LSFL_TIMEWARN
* The dlm should emit netlink messages if locks have been waiting
* for a configurable amount of time. (Unused.)
* DLM_LSFL_FS
* The lockspace user is in the kernel (i.e. filesystem). Enables
* direct bast/cast callbacks.
* DLM_LSFL_NEWEXCL
* dlm_new_lockspace() should return -EEXIST if the lockspace exists.
*
* lvblen: length of lvb in bytes. Must be multiple of 8.
* dlm_new_lockspace() returns an error if this does not match
* what other nodes are using.
*
* ops: callbacks that indicate lockspace recovery points so the
* caller can coordinate its recovery and know lockspace members.
* This is only used by the initial dlm_new_lockspace() call.
* Optional.
*
* ops_arg: arg for ops callbacks.
*
* ops_result: tells caller if the ops callbacks (if provided) will
* be used or not. 0: will be used, -EXXX will not be used.
* -EOPNOTSUPP: the dlm does not have recovery_callbacks enabled.
*
* lockspace: handle for dlm functions
*/
int dlm_new_lockspace(const char *name, int namelen,
dlm_lockspace_t **lockspace, uint32_t flags, int lvblen);
int dlm_new_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops, void *ops_arg,
int *ops_result, dlm_lockspace_t **lockspace);
/*
* dlm_release_lockspace