ceph: generalize mon requests, add pool op support

Generalize the current statfs synchronous requests, and support pool_ops.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Yehuda Sadeh 2010-05-17 12:40:28 -07:00 committed by Sage Weil
parent 0eb6cd49f6
commit e56fa10e92
2 changed files with 163 additions and 22 deletions

View file

@ -349,7 +349,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
} }
/* /*
* statfs * generic requests (e.g., statfs, poolop)
*/ */
static struct ceph_mon_generic_request *__lookup_generic_req( static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid) struct ceph_mon_client *monc, u64 tid)
@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m; return m;
} }
static int do_generic_request(struct ceph_mon_client *monc,
struct ceph_mon_generic_request *req)
{
int err;
/* register request */
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
ceph_con_send(monc->con, ceph_msg_get(req->request));
mutex_unlock(&monc->mutex);
err = wait_for_completion_interruptible(&req->completion);
mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
mutex_unlock(&monc->mutex);
if (!err)
err = req->result;
return err;
}
/*
* statfs
*/
static void handle_statfs_reply(struct ceph_mon_client *monc, static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg) struct ceph_msg *msg)
{ {
@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
return; return;
bad: bad:
pr_err("corrupt generic reply, no tid\n"); pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg); ceph_msg_dump(msg);
} }
@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
kref_init(&req->kref); kref_init(&req->kref);
req->buf = buf; req->buf = buf;
req->buf_len = sizeof(*buf);
init_completion(&req->completion); init_completion(&req->completion);
err = -ENOMEM; err = -ENOMEM;
@ -504,25 +534,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
h->monhdr.session_mon_tid = 0; h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid; h->fsid = monc->monmap->fsid;
/* register request */ err = do_generic_request(monc, req);
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
mutex_unlock(&monc->mutex);
/* send request and wait */
ceph_con_send(monc->con, ceph_msg_get(req->request));
err = wait_for_completion_interruptible(&req->completion);
mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
mutex_unlock(&monc->mutex);
if (!err)
err = req->result;
out: out:
kref_put(&req->kref, release_generic_request); kref_put(&req->kref, release_generic_request);
@ -530,7 +542,126 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
} }
/* /*
* Resend pending statfs requests. * pool ops
*/
static int get_poolop_reply_buf(const char *src, size_t src_len,
char *dst, size_t dst_len)
{
u32 buf_len;
if (src_len != sizeof(u32) + dst_len)
return -EINVAL;
buf_len = le32_to_cpu(*(u32 *)src);
if (buf_len != dst_len)
return -EINVAL;
memcpy(dst, src + sizeof(u32), dst_len);
return 0;
}
static void handle_poolop_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*reply))
goto bad;
dout("handle_poolop_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
req = __lookup_generic_req(monc, tid);
if (req) {
if (req->buf_len &&
get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
msg->front.iov_len - sizeof(*reply),
req->buf, req->buf_len) < 0) {
mutex_unlock(&monc->mutex);
goto bad;
}
req->result = le32_to_cpu(reply->reply_code);
get_generic_request(req);
}
mutex_unlock(&monc->mutex);
if (req) {
complete(&req->completion);
put_generic_request(req);
}
return;
bad:
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
/*
* Do a synchronous pool op.
*/
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop *h;
int err;
req = kzalloc(sizeof(*req), GFP_NOFS);
if (!req)
return -ENOMEM;
kref_init(&req->kref);
req->buf = buf;
req->buf_len = len;
init_completion(&req->completion);
err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
if (!req->request)
goto out;
req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
if (!req->reply)
goto out;
/* fill out request */
req->request->hdr.version = cpu_to_le16(2);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->pool = cpu_to_le32(pool);
h->op = cpu_to_le32(op);
h->auid = 0;
h->snapid = cpu_to_le64(snapid);
h->name_len = 0;
err = do_generic_request(monc, req);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, 0, 0);
}
/*
* Resend pending generic requests.
*/ */
static void __resend_generic_request(struct ceph_mon_client *monc) static void __resend_generic_request(struct ceph_mon_client *monc)
{ {
@ -783,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg); handle_statfs_reply(monc, msg);
break; break;
case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg);
break;
case CEPH_MSG_MON_MAP: case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg); ceph_monc_handle_map(monc, msg);
break; break;
@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK: case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack); m = ceph_msg_get(monc->m_subscribe_ack);
break; break;
case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY: case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip); return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_AUTH_REPLY:

View file

@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
struct rb_node node; struct rb_node node;
int result; int result;
void *buf; void *buf;
int buf_len;
struct completion completion; struct completion completion;
struct ceph_msg *request; /* original request */ struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */ struct ceph_msg *reply; /* and reply */
@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid);
extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid);
#endif #endif