[PATCH] RPC: separate TCP and UDP write space callbacks
Split the socket write space callback function into a TCP version and a UDP version, eliminating one dependence on the "xprt->stream" variable. Keep the common pieces of this path in xprt.c so other transports can use it too. Test plan: write-intensive workload on a single mount point. Version: Thu, 11 Aug 2005 16:07:51 -0400 Signed-off-by: Chuck Lever <cel@netapp.com> Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
This commit is contained in:
parent
55aa4f58aa
commit
c7b2cae8a6
3 changed files with 89 additions and 31 deletions
|
@@ -240,6 +240,8 @@ int xprt_destroy(struct rpc_xprt *xprt);
|
|||
* Transport switch helper functions
|
||||
*/
|
||||
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status);
|
||||
void xprt_wait_for_buffer_space(struct rpc_task *task);
|
||||
void xprt_write_space(struct rpc_xprt *xprt);
|
||||
struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid);
|
||||
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied);
|
||||
void xprt_disconnect(struct rpc_xprt *xprt);
|
||||
|
|
|
@@ -241,6 +241,40 @@ void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
|
|||
rpc_wake_up(&xprt->pending);
|
||||
}
|
||||
|
||||
/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 * Puts @task to sleep on the transport's "pending" wait queue, using the
 * request's retransmit timeout (rq_timeout) as an upper bound on the wait.
 * The task is woken when write space becomes available again — see
 * xprt_write_space() and xprt_wake_pending_tasks() — or when the timeout
 * expires.
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	/* Bound the sleep by the per-request RPC timeout so the task is
	 * never stuck indefinitely waiting for buffer space. */
	task->tk_timeout = req->rq_timeout;
	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}
|
||||
|
||||
/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
	/* Nothing to wake if the transport is being shut down. */
	if (unlikely(xprt->shutdown))
		return;

	/* transport_lock protects snd_task; the _bh variant is used
	 * because this path can run from a socket callback in softirq
	 * context (see comment above). */
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task) {
		dprintk("RPC: write space: waking waiting task on xprt %p\n",
				xprt);
		/* Wake only the task currently holding the transport for
		 * sending; other queued tasks will follow in turn. */
		rpc_wake_up_task(xprt->snd_task);
	}
	spin_unlock_bh(&xprt->transport_lock);
}
|
||||
|
||||
static void xprt_reset_majortimeo(struct rpc_rqst *req)
|
||||
{
|
||||
struct rpc_timeout *to = &req->rq_xprt->timeout;
|
||||
|
|
|
@@ -308,15 +308,13 @@ static int xs_send_request(struct rpc_task *task)
|
|||
|
||||
if (status == -EAGAIN) {
|
||||
if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
|
||||
/* Protect against races with xs_write_space */
|
||||
/* Protect against races with write_space */
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
/* Don't race with disconnect */
|
||||
if (!xprt_connected(xprt))
|
||||
task->tk_status = -ENOTCONN;
|
||||
else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
|
||||
task->tk_timeout = req->rq_timeout;
|
||||
rpc_sleep_on(&xprt->pending, task, NULL, NULL);
|
||||
}
|
||||
else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags))
|
||||
xprt_wait_for_buffer_space(task);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
return status;
|
||||
}
|
||||
|
@@ -721,45 +719,68 @@ static void xs_tcp_state_change(struct sock *sk)
|
|||
}
|
||||
|
||||
/**
|
||||
* xs_write_space - callback invoked when socket buffer space becomes
|
||||
* available
|
||||
* xs_udp_write_space - callback invoked when socket buffer space
|
||||
* becomes available
|
||||
* @sk: socket whose state has changed
|
||||
*
|
||||
* Called when more output buffer space is available for this socket.
|
||||
* We try not to wake our writers until they can make "significant"
|
||||
* progress, otherwise we'll waste resources thrashing sock_sendmsg
|
||||
* progress, otherwise we'll waste resources thrashing kernel_sendmsg
|
||||
* with a bunch of small requests.
|
||||
*/
|
||||
static void xs_write_space(struct sock *sk)
|
||||
static void xs_udp_write_space(struct sock *sk)
|
||||
{
|
||||
struct rpc_xprt *xprt;
|
||||
struct socket *sock;
|
||||
|
||||
read_lock(&sk->sk_callback_lock);
|
||||
if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
|
||||
goto out;
|
||||
if (xprt->shutdown)
|
||||
goto out;
|
||||
|
||||
/* Wait until we have enough socket memory */
|
||||
if (xprt->stream) {
|
||||
/* from net/core/stream.c:sk_stream_write_space */
|
||||
if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
|
||||
/* from net/core/sock.c:sock_def_write_space */
|
||||
if (sock_writeable(sk)) {
|
||||
struct socket *sock;
|
||||
struct rpc_xprt *xprt;
|
||||
|
||||
if (unlikely(!(sock = sk->sk_socket)))
|
||||
goto out;
|
||||
} else {
|
||||
/* from net/core/sock.c:sock_def_write_space */
|
||||
if (!sock_writeable(sk))
|
||||
if (unlikely(!(xprt = xprt_from_sock(sk))))
|
||||
goto out;
|
||||
if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
|
||||
goto out;
|
||||
|
||||
xprt_write_space(xprt);
|
||||
}
|
||||
|
||||
if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
|
||||
goto out;
|
||||
out:
|
||||
read_unlock(&sk->sk_callback_lock);
|
||||
}
|
||||
|
||||
spin_lock_bh(&xprt->transport_lock);
|
||||
if (xprt->snd_task)
|
||||
rpc_wake_up_task(xprt->snd_task);
|
||||
spin_unlock_bh(&xprt->transport_lock);
|
||||
out:
|
||||
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 * becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	/* sk_callback_lock guards sk->sk_socket and sk->sk_user_data
	 * against concurrent teardown while we look at them. */
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		/* Socket may be going away; bail quietly. */
		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		/* Only wake writers if a sender actually blocked on
		 * lack of space (SOCK_NOSPACE was set); clear it so the
		 * next short write re-arms this callback. */
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		/* Generic transport-level wakeup lives in xprt.c. */
		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}
|
||||
|
||||
|
@@ -855,15 +876,16 @@ static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
|
|||
xprt->old_write_space = sk->sk_write_space;
|
||||
if (xprt->prot == IPPROTO_UDP) {
|
||||
sk->sk_data_ready = xs_udp_data_ready;
|
||||
sk->sk_write_space = xs_udp_write_space;
|
||||
sk->sk_no_check = UDP_CSUM_NORCV;
|
||||
xprt_set_connected(xprt);
|
||||
} else {
|
||||
tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */
|
||||
sk->sk_data_ready = xs_tcp_data_ready;
|
||||
sk->sk_state_change = xs_tcp_state_change;
|
||||
sk->sk_write_space = xs_tcp_write_space;
|
||||
xprt_clear_connected(xprt);
|
||||
}
|
||||
sk->sk_write_space = xs_write_space;
|
||||
|
||||
/* Reset to new socket */
|
||||
xprt->sock = sock;
|
||||
|
|
Loading…
Reference in a new issue