diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 5 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 8 | ||||
-rw-r--r-- | net/sunrpc/rpcb_clnt.c | 2 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 255 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 45 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 7 |
6 files changed, 155 insertions, 167 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 6dac38792288..ef6384961808 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -266,6 +266,7 @@ gss_release_msg(struct gss_upcall_msg *gss_msg) BUG_ON(!list_empty(&gss_msg->list)); if (gss_msg->ctx != NULL) gss_put_ctx(gss_msg->ctx); + rpc_destroy_wait_queue(&gss_msg->rpc_waitqueue); kfree(gss_msg); } @@ -408,13 +409,13 @@ gss_refresh_upcall(struct rpc_task *task) } spin_lock(&inode->i_lock); if (gss_cred->gc_upcall != NULL) - rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL, NULL); + rpc_sleep_on(&gss_cred->gc_upcall->rpc_waitqueue, task, NULL); else if (gss_msg->ctx == NULL && gss_msg->msg.errno >= 0) { task->tk_timeout = 0; gss_cred->gc_upcall = gss_msg; /* gss_upcall_callback will release the reference to gss_upcall_msg */ atomic_inc(&gss_msg->count); - rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback, NULL); + rpc_sleep_on(&gss_msg->rpc_waitqueue, task, gss_upcall_callback); } else err = gss_msg->msg.errno; spin_unlock(&inode->i_lock); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 8c6a7f1a25e9..ea14314331b0 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -548,7 +548,7 @@ EXPORT_SYMBOL_GPL(rpc_run_task); * @msg: RPC call parameters * @flags: RPC call flags */ -int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags) +int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags) { struct rpc_task *task; struct rpc_task_setup task_setup_data = { @@ -579,7 +579,7 @@ EXPORT_SYMBOL_GPL(rpc_call_sync); * @data: user call data */ int -rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, +rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags, const struct rpc_call_ops *tk_ops, void *data) { struct rpc_task *task; @@ -1066,7 +1066,7 @@ call_transmit(struct rpc_task *task) if (task->tk_msg.rpc_proc->p_decode != NULL) return; task->tk_action = rpc_exit_task; - rpc_wake_up_task(task); + rpc_wake_up_queued_task(&task->tk_xprt->pending, task); } /* @@ -1535,7 +1535,7 @@ void rpc_show_tasks(void) proc = -1; if (RPC_IS_QUEUED(t)) - rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); + rpc_waitq = rpc_qname(t->tk_waitqueue); printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n", t->tk_pid, proc, diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index 3164a0871cf0..f480c718b400 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -298,7 +298,7 @@ void rpcb_getport_async(struct rpc_task *task) /* Put self on queue before sending rpcbind request, in case * rpcb_getport_done completes before we return from rpc_run_task */ - rpc_sleep_on(&xprt->binding, task, NULL, NULL); + rpc_sleep_on(&xprt->binding, task, NULL); /* Someone else may have bound if we slept */ if (xprt_bound(xprt)) { diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 4c669121e607..cae219c8caeb 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -38,9 +38,9 @@ static struct kmem_cache *rpc_buffer_slabp __read_mostly; static mempool_t *rpc_task_mempool __read_mostly; static mempool_t *rpc_buffer_mempool __read_mostly; -static void __rpc_default_timer(struct rpc_task *task); static void rpc_async_schedule(struct work_struct *); static void rpc_release_task(struct rpc_task *task); +static void __rpc_queue_timer_fn(unsigned long ptr); /* * RPC tasks sit here while waiting for conditions to improve. @@ -57,41 +57,30 @@ struct workqueue_struct *rpciod_workqueue; * queue->lock and bh_disabled in order to avoid races within * rpc_run_timer(). */ -static inline void -__rpc_disable_timer(struct rpc_task *task) +static void +__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task) { + if (task->tk_timeout == 0) + return; dprintk("RPC: %5u disabling timer\n", task->tk_pid); - task->tk_timeout_fn = NULL; task->tk_timeout = 0; + list_del(&task->u.tk_wait.timer_list); + if (list_empty(&queue->timer_list.list)) + del_timer(&queue->timer_list.timer); } -/* - * Run a timeout function. - * We use the callback in order to allow __rpc_wake_up_task() - * and friends to disable the timer synchronously on SMP systems - * without calling del_timer_sync(). The latter could cause a - * deadlock if called while we're holding spinlocks... - */ -static void rpc_run_timer(struct rpc_task *task) +static void +rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires) { - void (*callback)(struct rpc_task *); - - callback = task->tk_timeout_fn; - task->tk_timeout_fn = NULL; - if (callback && RPC_IS_QUEUED(task)) { - dprintk("RPC: %5u running timer\n", task->tk_pid); - callback(task); - } - smp_mb__before_clear_bit(); - clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate); - smp_mb__after_clear_bit(); + queue->timer_list.expires = expires; + mod_timer(&queue->timer_list.timer, expires); } /* * Set up a timer for the current task. */ -static inline void -__rpc_add_timer(struct rpc_task *task, rpc_action timer) +static void +__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task) { if (!task->tk_timeout) return; @@ -99,27 +88,10 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer) dprintk("RPC: %5u setting alarm for %lu ms\n", task->tk_pid, task->tk_timeout * 1000 / HZ); - if (timer) - task->tk_timeout_fn = timer; - else - task->tk_timeout_fn = __rpc_default_timer; - set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate); - mod_timer(&task->tk_timer, jiffies + task->tk_timeout); -} - -/* - * Delete any timer for the current task. Because we use del_timer_sync(), - * this function should never be called while holding queue->lock. - */ -static void -rpc_delete_timer(struct rpc_task *task) -{ - if (RPC_IS_QUEUED(task)) - return; - if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) { - del_singleshot_timer_sync(&task->tk_timer); - dprintk("RPC: %5u deleting timer\n", task->tk_pid); - } + task->u.tk_wait.expires = jiffies + task->tk_timeout; + if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires)) + rpc_set_queue_timer(queue, task->u.tk_wait.expires); + list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list); } /* @@ -161,7 +133,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task * list_add(&task->u.tk_wait.list, &queue->tasks[0]); else list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); - task->u.tk_wait.rpc_waitq = queue; + task->tk_waitqueue = queue; queue->qlen++; rpc_set_queued(task); @@ -181,22 +153,18 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task) list_move(&t->u.tk_wait.list, &task->u.tk_wait.list); list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links); } - list_del(&task->u.tk_wait.list); } /* * Remove request from queue. * Note: must be called with spin lock held. */ -static void __rpc_remove_wait_queue(struct rpc_task *task) +static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) { - struct rpc_wait_queue *queue; - queue = task->u.tk_wait.rpc_waitq; - + __rpc_disable_timer(queue, task); if (RPC_IS_PRIORITY(queue)) __rpc_remove_wait_queue_priority(task); - else - list_del(&task->u.tk_wait.list); + list_del(&task->u.tk_wait.list); queue->qlen--; dprintk("RPC: %5u removed from queue %p \"%s\"\n", task->tk_pid, queue, rpc_qname(queue)); @@ -229,6 +197,9 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c INIT_LIST_HEAD(&queue->tasks[i]); queue->maxpriority = nr_queues - 1; rpc_reset_waitqueue_priority(queue); + queue->qlen = 0; + setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue); + INIT_LIST_HEAD(&queue->timer_list.list); #ifdef RPC_DEBUG queue->name = qname; #endif @@ -245,6 +216,12 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname) } EXPORT_SYMBOL_GPL(rpc_init_wait_queue); +void rpc_destroy_wait_queue(struct rpc_wait_queue *queue) +{ + del_timer_sync(&queue->timer_list.timer); +} +EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); + static int rpc_wait_bit_killable(void *word) { if (fatal_signal_pending(current)) @@ -313,7 +290,6 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); */ static void rpc_make_runnable(struct rpc_task *task) { - BUG_ON(task->tk_timeout_fn); rpc_clear_queued(task); if (rpc_test_and_set_running(task)) return; @@ -326,7 +302,7 @@ static void rpc_make_runnable(struct rpc_task *task) int status; INIT_WORK(&task->u.tk_work, rpc_async_schedule); - status = queue_work(task->tk_workqueue, &task->u.tk_work); + status = queue_work(rpciod_workqueue, &task->u.tk_work); if (status < 0) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); task->tk_status = status; @@ -343,7 +319,7 @@ static void rpc_make_runnable(struct rpc_task *task) * as it's on a wait queue. */ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, - rpc_action action, rpc_action timer) + rpc_action action) { dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n", task->tk_pid, rpc_qname(q), jiffies); @@ -357,11 +333,11 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, BUG_ON(task->tk_callback != NULL); task->tk_callback = action; - __rpc_add_timer(task, timer); + __rpc_add_timer(q, task); } void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, - rpc_action action, rpc_action timer) + rpc_action action) { /* Mark the task as being activated if so needed */ rpc_set_active(task); @@ -370,18 +346,19 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, * Protect the queue operations. */ spin_lock_bh(&q->lock); - __rpc_sleep_on(q, task, action, timer); + __rpc_sleep_on(q, task, action); spin_unlock_bh(&q->lock); } EXPORT_SYMBOL_GPL(rpc_sleep_on); /** * __rpc_do_wake_up_task - wake up a single rpc_task + * @queue: wait queue * @task: task to be woken up * * Caller must hold queue->lock, and have cleared the task queued flag. */ -static void __rpc_do_wake_up_task(struct rpc_task *task) +static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) { dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", task->tk_pid, jiffies); @@ -395,8 +372,7 @@ static void __rpc_do_wake_up_task(struct rpc_task *task) return; } - __rpc_disable_timer(task); - __rpc_remove_wait_queue(task); + __rpc_remove_wait_queue(queue, task); rpc_make_runnable(task); @@ -404,48 +380,32 @@ static void __rpc_do_wake_up_task(struct rpc_task *task) } /* - * Wake up the specified task + * Wake up a queued task while the queue lock is being held */ -static void __rpc_wake_up_task(struct rpc_task *task) +static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) { - if (rpc_start_wakeup(task)) { - if (RPC_IS_QUEUED(task)) - __rpc_do_wake_up_task(task); - rpc_finish_wakeup(task); - } + if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue) + __rpc_do_wake_up_task(queue, task); } /* - * Default timeout handler if none specified by user + * Wake up a task on a specific queue */ -static void -__rpc_default_timer(struct rpc_task *task) +void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task) { - dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid); - task->tk_status = -ETIMEDOUT; - rpc_wake_up_task(task); + spin_lock_bh(&queue->lock); + rpc_wake_up_task_queue_locked(queue, task); + spin_unlock_bh(&queue->lock); } +EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task); /* * Wake up the specified task */ -void rpc_wake_up_task(struct rpc_task *task) +static void rpc_wake_up_task(struct rpc_task *task) { - rcu_read_lock_bh(); - if (rpc_start_wakeup(task)) { - if (RPC_IS_QUEUED(task)) { - struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq; - - /* Note: we're already in a bh-safe context */ - spin_lock(&queue->lock); - __rpc_do_wake_up_task(task); - spin_unlock(&queue->lock); - } - rpc_finish_wakeup(task); - } - rcu_read_unlock_bh(); + rpc_wake_up_queued_task(task->tk_waitqueue, task); } -EXPORT_SYMBOL_GPL(rpc_wake_up_task); /* * Wake up the next task on a priority queue. @@ -495,7 +455,7 @@ new_queue: new_owner: rpc_set_waitqueue_owner(queue, task->tk_owner); out: - __rpc_wake_up_task(task); + rpc_wake_up_task_queue_locked(queue, task); return task; } @@ -508,16 +468,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); - rcu_read_lock_bh(); - spin_lock(&queue->lock); + spin_lock_bh(&queue->lock); if (RPC_IS_PRIORITY(queue)) task = __rpc_wake_up_next_priority(queue); else { task_for_first(task, &queue->tasks[0]) - __rpc_wake_up_task(task); + rpc_wake_up_task_queue_locked(queue, task); } - spin_unlock(&queue->lock); - rcu_read_unlock_bh(); + spin_unlock_bh(&queue->lock); return task; } @@ -534,18 +492,16 @@ void rpc_wake_up(struct rpc_wait_queue *queue) struct rpc_task *task, *next; struct list_head *head; - rcu_read_lock_bh(); - spin_lock(&queue->lock); + spin_lock_bh(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { list_for_each_entry_safe(task, next, head, u.tk_wait.list) - __rpc_wake_up_task(task); + rpc_wake_up_task_queue_locked(queue, task); if (head == &queue->tasks[0]) break; head--; } - spin_unlock(&queue->lock); - rcu_read_unlock_bh(); + spin_unlock_bh(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up); @@ -561,26 +517,48 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) struct rpc_task *task, *next; struct list_head *head; - rcu_read_lock_bh(); - spin_lock(&queue->lock); + spin_lock_bh(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { list_for_each_entry_safe(task, next, head, u.tk_wait.list) { task->tk_status = status; - __rpc_wake_up_task(task); + rpc_wake_up_task_queue_locked(queue, task); } if (head == &queue->tasks[0]) break; head--; } - spin_unlock(&queue->lock); - rcu_read_unlock_bh(); + spin_unlock_bh(&queue->lock); } EXPORT_SYMBOL_GPL(rpc_wake_up_status); +static void __rpc_queue_timer_fn(unsigned long ptr) +{ + struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr; + struct rpc_task *task, *n; + unsigned long expires, now, timeo; + + spin_lock(&queue->lock); + expires = now = jiffies; + list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) { + timeo = task->u.tk_wait.expires; + if (time_after_eq(now, timeo)) { + dprintk("RPC: %5u timeout\n", task->tk_pid); + task->tk_status = -ETIMEDOUT; + rpc_wake_up_task_queue_locked(queue, task); + continue; + } + if (expires == now || time_after(expires, timeo)) + expires = timeo; + } + if (!list_empty(&queue->timer_list.list)) + rpc_set_queue_timer(queue, expires); + spin_unlock(&queue->lock); +} + static void __rpc_atrun(struct rpc_task *task) { - rpc_wake_up_task(task); + task->tk_status = 0; } /* @@ -589,7 +567,7 @@ static void __rpc_atrun(struct rpc_task *task) void rpc_delay(struct rpc_task *task, unsigned long delay) { task->tk_timeout = delay; - rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); + rpc_sleep_on(&delay_queue, task, __rpc_atrun); } EXPORT_SYMBOL_GPL(rpc_delay); @@ -644,10 +622,6 @@ static void __rpc_execute(struct rpc_task *task) BUG_ON(RPC_IS_QUEUED(task)); for (;;) { - /* - * Garbage collection of pending timers... - */ - rpc_delete_timer(task); /* * Execute any pending callback. @@ -816,8 +790,6 @@ EXPORT_SYMBOL_GPL(rpc_free); static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data) { memset(task, 0, sizeof(*task)); - setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer, - (unsigned long)task); atomic_set(&task->tk_count, 1); task->tk_flags = task_setup_data->flags; task->tk_ops = task_setup_data->callback_ops; @@ -832,7 +804,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta task->tk_owner = current->tgid; /* Initialize workqueue for async tasks */ - task->tk_workqueue = rpciod_workqueue; + task->tk_workqueue = task_setup_data->workqueue; task->tk_client = task_setup_data->rpc_client; if (task->tk_client != NULL) { @@ -868,13 +840,6 @@ rpc_alloc_task(void) return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); } -static void rpc_free_task(struct rcu_head *rcu) -{ - struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu); - dprintk("RPC: %5u freeing task\n", task->tk_pid); - mempool_free(task, rpc_task_mempool); -} - /* * Create a new task for the specified client. */ @@ -898,12 +863,25 @@ out: return task; } - -void rpc_put_task(struct rpc_task *task) +static void rpc_free_task(struct rpc_task *task) { const struct rpc_call_ops *tk_ops = task->tk_ops; void *calldata = task->tk_calldata; + if (task->tk_flags & RPC_TASK_DYNAMIC) { + dprintk("RPC: %5u freeing task\n", task->tk_pid); + mempool_free(task, rpc_task_mempool); + } + rpc_release_calldata(tk_ops, calldata); +} + +static void rpc_async_release(struct work_struct *work) +{ + rpc_free_task(container_of(work, struct rpc_task, u.tk_work)); +} + +void rpc_put_task(struct rpc_task *task) +{ if (!atomic_dec_and_test(&task->tk_count)) return; /* Release resources */ @@ -915,9 +893,11 @@ void rpc_put_task(struct rpc_task *task) rpc_release_client(task->tk_client); task->tk_client = NULL; } - if (task->tk_flags & RPC_TASK_DYNAMIC) - call_rcu_bh(&task->u.tk_rcu, rpc_free_task); - rpc_release_calldata(tk_ops, calldata); + if (task->tk_workqueue != NULL) { + INIT_WORK(&task->u.tk_work, rpc_async_release); + queue_work(task->tk_workqueue, &task->u.tk_work); + } else + rpc_free_task(task); } EXPORT_SYMBOL_GPL(rpc_put_task); @@ -937,9 +917,6 @@ static void rpc_release_task(struct rpc_task *task) } BUG_ON (RPC_IS_QUEUED(task)); - /* Synchronously delete any running timer */ - rpc_delete_timer(task); - #ifdef RPC_DEBUG task->tk_magic = 0; #endif @@ -1029,11 +1006,20 @@ rpc_destroy_mempool(void) kmem_cache_destroy(rpc_task_slabp); if (rpc_buffer_slabp) kmem_cache_destroy(rpc_buffer_slabp); + rpc_destroy_wait_queue(&delay_queue); } int rpc_init_mempool(void) { + /* + * The following is not strictly a mempool initialisation, + * but there is no harm in doing it here + */ + rpc_init_wait_queue(&delay_queue, "delayq"); + if (!rpciod_start()) + goto err_nomem; + rpc_task_slabp = kmem_cache_create("rpc_tasks", sizeof(struct rpc_task), 0, SLAB_HWCACHE_ALIGN, @@ -1054,13 +1040,6 @@ rpc_init_mempool(void) rpc_buffer_slabp); if (!rpc_buffer_mempool) goto err_nomem; - if (!rpciod_start()) - goto err_nomem; - /* - * The following is not strictly a mempool initialisation, - * but there is no harm in doing it here - */ - rpc_init_wait_queue(&delay_queue, "delayq"); return 0; err_nomem: rpc_destroy_mempool(); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d5553b8179f9..85199c647022 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -188,9 +188,9 @@ out_sleep: task->tk_timeout = 0; task->tk_status = -EAGAIN; if (req && req->rq_ntrans) - rpc_sleep_on(&xprt->resend, task, NULL, NULL); + rpc_sleep_on(&xprt->resend, task, NULL); else - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + rpc_sleep_on(&xprt->sending, task, NULL); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt); @@ -238,9 +238,9 @@ out_sleep: task->tk_timeout = 0; task->tk_status = -EAGAIN; if (req && req->rq_ntrans) - rpc_sleep_on(&xprt->resend, task, NULL, NULL); + rpc_sleep_on(&xprt->resend, task, NULL); else - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + rpc_sleep_on(&xprt->sending, task, NULL); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); @@ -453,7 +453,7 @@ void xprt_wait_for_buffer_space(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; task->tk_timeout = req->rq_timeout; - rpc_sleep_on(&xprt->pending, task, NULL, NULL); + rpc_sleep_on(&xprt->pending, task, NULL); } EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); @@ -472,7 +472,7 @@ void xprt_write_space(struct rpc_xprt *xprt) if (xprt->snd_task) { dprintk("RPC: write space: waking waiting task on " "xprt %p\n", xprt); - rpc_wake_up_task(xprt->snd_task); + rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); } spin_unlock_bh(&xprt->transport_lock); } @@ -602,8 +602,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) queue_work(rpciod_workqueue, &xprt->task_cleanup); - else if (xprt->snd_task != NULL) - rpc_wake_up_task(xprt->snd_task); + xprt_wake_pending_tasks(xprt, -ENOTCONN); spin_unlock_bh(&xprt->transport_lock); } EXPORT_SYMBOL_GPL(xprt_force_disconnect); @@ -653,7 +652,7 @@ void xprt_connect(struct rpc_task *task) task->tk_rqstp->rq_bytes_sent = 0; task->tk_timeout = xprt->connect_timeout; - rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); + rpc_sleep_on(&xprt->pending, task, xprt_connect_status); xprt->stat.connect_start = jiffies; xprt->ops->connect(task); } @@ -749,18 +748,19 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt); void xprt_complete_rqst(struct rpc_task *task, int copied) { struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", task->tk_pid, ntohl(req->rq_xid), copied); - task->tk_xprt->stat.recvs++; + xprt->stat.recvs++; task->tk_rtt = (long)jiffies - req->rq_xtime; list_del_init(&req->rq_list); /* Ensure all writes are done before we update req->rq_received */ smp_wmb(); req->rq_received = req->rq_private_buf.len = copied; - rpc_wake_up_task(task); + rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); @@ -769,17 +769,17 @@ static void xprt_timer(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; + if (task->tk_status != -ETIMEDOUT) + return; dprintk("RPC: %5u xprt_timer\n", task->tk_pid); - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); if (!req->rq_received) { if (xprt->ops->timer) xprt->ops->timer(task); - task->tk_status = -ETIMEDOUT; - } - task->tk_timeout = 0; - rpc_wake_up_task(task); - spin_unlock(&xprt->transport_lock); + } else + task->tk_status = 0; + spin_unlock_bh(&xprt->transport_lock); } /** @@ -864,7 +864,7 @@ void xprt_transmit(struct rpc_task *task) if (!xprt_connected(xprt)) task->tk_status = -ENOTCONN; else if (!req->rq_received) - rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); + rpc_sleep_on(&xprt->pending, task, xprt_timer); spin_unlock_bh(&xprt->transport_lock); return; } @@ -875,7 +875,7 @@ void xprt_transmit(struct rpc_task *task) */ task->tk_status = status; if (status == -ECONNREFUSED) - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + rpc_sleep_on(&xprt->sending, task, NULL); } static inline void do_xprt_reserve(struct rpc_task *task) @@ -895,7 +895,7 @@ static inline void do_xprt_reserve(struct rpc_task *task) dprintk("RPC: waiting for request slot\n"); task->tk_status = -EAGAIN; task->tk_timeout = 0; - rpc_sleep_on(&xprt->backlog, task, NULL, NULL); + rpc_sleep_on(&xprt->backlog, task, NULL); } /** @@ -1052,6 +1052,11 @@ static void xprt_destroy(struct kref *kref) xprt->shutdown = 1; del_timer_sync(&xprt->timer); + rpc_destroy_wait_queue(&xprt->binding); + rpc_destroy_wait_queue(&xprt->pending); + rpc_destroy_wait_queue(&xprt->sending); + rpc_destroy_wait_queue(&xprt->resend); + rpc_destroy_wait_queue(&xprt->backlog); /* * Tear down transport state and free the rpc_xprt */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 30e7ac243a90..8bd3b0f73ac0 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1073,6 +1073,7 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes) { struct rpc_xprt *xprt; read_descriptor_t rd_desc; + int read; dprintk("RPC: xs_tcp_data_ready...\n"); @@ -1084,8 +1085,10 @@ static void xs_tcp_data_ready(struct sock *sk, int bytes) /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ rd_desc.arg.data = xprt; - rd_desc.count = 65536; - tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + do { + rd_desc.count = 65536; + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + } while (read > 0); out: read_unlock(&sk->sk_callback_lock); } |