1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * linux/net/sunrpc/clnt.c
4 *
5 * This file contains the high-level RPC interface.
6 * It is modeled as a finite state machine to support both synchronous
7 * and asynchronous requests.
8 *
9 * - RPC header generation and argument serialization.
10 * - Credential refresh.
11 * - TCP connect handling.
12 * - Retry of operation when it is suspected the operation failed because
13 * of uid squashing on the server, or when the credentials were stale
14 * and need to be refreshed, or when a packet was damaged in transit.
 * This may have to be moved to the VFS layer.
16 *
17 * Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18 * Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19 */
20
21
22#include <linux/module.h>
23#include <linux/types.h>
24#include <linux/kallsyms.h>
25#include <linux/mm.h>
26#include <linux/namei.h>
27#include <linux/mount.h>
28#include <linux/slab.h>
29#include <linux/rcupdate.h>
30#include <linux/utsname.h>
31#include <linux/workqueue.h>
32#include <linux/in.h>
33#include <linux/in6.h>
34#include <linux/un.h>
35
36#include <linux/sunrpc/clnt.h>
37#include <linux/sunrpc/addr.h>
38#include <linux/sunrpc/rpc_pipe_fs.h>
39#include <linux/sunrpc/metrics.h>
40#include <linux/sunrpc/bc_xprt.h>
41#include <trace/events/sunrpc.h>
42
43#include "sunrpc.h"
44#include "sysfs.h"
45#include "netns.h"
46
47#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48# define RPCDBG_FACILITY RPCDBG_CALL
49#endif
50
51static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
52
53static void call_start(struct rpc_task *task);
54static void call_reserve(struct rpc_task *task);
55static void call_reserveresult(struct rpc_task *task);
56static void call_allocate(struct rpc_task *task);
57static void call_encode(struct rpc_task *task);
58static void call_decode(struct rpc_task *task);
59static void call_bind(struct rpc_task *task);
60static void call_bind_status(struct rpc_task *task);
61static void call_transmit(struct rpc_task *task);
62static void call_status(struct rpc_task *task);
63static void call_transmit_status(struct rpc_task *task);
64static void call_refresh(struct rpc_task *task);
65static void call_refreshresult(struct rpc_task *task);
66static void call_connect(struct rpc_task *task);
67static void call_connect_status(struct rpc_task *task);
68
69static int rpc_encode_header(struct rpc_task *task,
70 struct xdr_stream *xdr);
71static int rpc_decode_header(struct rpc_task *task,
72 struct xdr_stream *xdr);
73static int rpc_ping(struct rpc_clnt *clnt);
74static int rpc_ping_noreply(struct rpc_clnt *clnt);
75static void rpc_check_timeout(struct rpc_task *task);
76
77static void rpc_register_client(struct rpc_clnt *clnt)
78{
79 struct net *net = rpc_net_ns(clnt);
80 struct sunrpc_net *sn = net_generic(net, id: sunrpc_net_id);
81
82 spin_lock(lock: &sn->rpc_client_lock);
83 list_add(new: &clnt->cl_clients, head: &sn->all_clients);
84 spin_unlock(lock: &sn->rpc_client_lock);
85}
86
87static void rpc_unregister_client(struct rpc_clnt *clnt)
88{
89 struct net *net = rpc_net_ns(clnt);
90 struct sunrpc_net *sn = net_generic(net, id: sunrpc_net_id);
91
92 spin_lock(lock: &sn->rpc_client_lock);
93 list_del(entry: &clnt->cl_clients);
94 spin_unlock(lock: &sn->rpc_client_lock);
95}
96
/*
 * Thin wrapper that hands @clnt to rpc_remove_client_dir(). It does no
 * superblock lookup or comparison of its own; see
 * rpc_clnt_remove_pipedir() for the variant that does.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
101
102static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103{
104 struct net *net = rpc_net_ns(clnt);
105 struct super_block *pipefs_sb;
106
107 pipefs_sb = rpc_get_sb_net(net);
108 if (pipefs_sb) {
109 if (pipefs_sb == clnt->pipefs_sb)
110 __rpc_clnt_remove_pipedir(clnt);
111 rpc_put_sb_net(net);
112 }
113}
114
115static int rpc_setup_pipedir_sb(struct super_block *sb,
116 struct rpc_clnt *clnt)
117{
118 static uint32_t clntid;
119 const char *dir_name = clnt->cl_program->pipe_dir_name;
120 char name[15];
121 struct dentry *dir;
122 int err;
123
124 dir = rpc_d_lookup_sb(sb, dir_name);
125 if (dir == NULL) {
126 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
127 return -ENOENT;
128 }
129 for (;;) {
130 snprintf(buf: name, size: sizeof(name), fmt: "clnt%x", (unsigned int)clntid++);
131 name[sizeof(name) - 1] = '\0';
132 err = rpc_create_client_dir(dir, name, clnt);
133 if (!err)
134 break;
135 if (err == -EEXIST)
136 continue;
137 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
138 " %s/%s, error %d\n",
139 dir_name, name, err);
140 break;
141 }
142 dput(dir);
143 return err;
144}
145
146static int
147rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
148{
149 clnt->pipefs_sb = pipefs_sb;
150
151 if (clnt->cl_program->pipe_dir_name != NULL) {
152 int err = rpc_setup_pipedir_sb(sb: pipefs_sb, clnt);
153 if (err && err != -ENOENT)
154 return err;
155 }
156 return 0;
157}
158
159static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
160{
161 if (clnt->cl_program->pipe_dir_name == NULL)
162 return 1;
163
164 switch (event) {
165 case RPC_PIPEFS_MOUNT:
166 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
167 return 1;
168 if (refcount_read(r: &clnt->cl_count) == 0)
169 return 1;
170 break;
171 case RPC_PIPEFS_UMOUNT:
172 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
173 return 1;
174 break;
175 }
176 return 0;
177}
178
179static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
180 struct super_block *sb)
181{
182 switch (event) {
183 case RPC_PIPEFS_MOUNT:
184 return rpc_setup_pipedir_sb(sb, clnt);
185 case RPC_PIPEFS_UMOUNT:
186 __rpc_clnt_remove_pipedir(clnt);
187 break;
188 default:
189 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
190 return -ENOTSUPP;
191 }
192 return 0;
193}
194
195static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
196 struct super_block *sb)
197{
198 int error = 0;
199
200 for (;; clnt = clnt->cl_parent) {
201 if (!rpc_clnt_skip_event(clnt, event))
202 error = __rpc_clnt_handle_event(clnt, event, sb);
203 if (error || clnt == clnt->cl_parent)
204 break;
205 }
206 return error;
207}
208
209static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
210{
211 struct sunrpc_net *sn = net_generic(net, id: sunrpc_net_id);
212 struct rpc_clnt *clnt;
213
214 spin_lock(lock: &sn->rpc_client_lock);
215 list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
216 if (rpc_clnt_skip_event(clnt, event))
217 continue;
218 spin_unlock(lock: &sn->rpc_client_lock);
219 return clnt;
220 }
221 spin_unlock(lock: &sn->rpc_client_lock);
222 return NULL;
223}
224
225static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
226 void *ptr)
227{
228 struct super_block *sb = ptr;
229 struct rpc_clnt *clnt;
230 int error = 0;
231
232 while ((clnt = rpc_get_client_for_event(net: sb->s_fs_info, event))) {
233 error = __rpc_pipefs_event(clnt, event, sb);
234 if (error)
235 break;
236 }
237 return error;
238}
239
240static struct notifier_block rpc_clients_block = {
241 .notifier_call = rpc_pipefs_event,
242 .priority = SUNRPC_PIPEFS_RPC_PRIO,
243};
244
245int rpc_clients_notifier_register(void)
246{
247 return rpc_pipefs_notifier_register(&rpc_clients_block);
248}
249
250void rpc_clients_notifier_unregister(void)
251{
252 return rpc_pipefs_notifier_unregister(&rpc_clients_block);
253}
254
255static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
256 struct rpc_xprt *xprt,
257 const struct rpc_timeout *timeout)
258{
259 struct rpc_xprt *old;
260
261 spin_lock(lock: &clnt->cl_lock);
262 old = rcu_dereference_protected(clnt->cl_xprt,
263 lockdep_is_held(&clnt->cl_lock));
264
265 clnt->cl_timeout = timeout;
266 rcu_assign_pointer(clnt->cl_xprt, xprt);
267 spin_unlock(lock: &clnt->cl_lock);
268
269 return old;
270}
271
272static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
273{
274 ssize_t copied;
275
276 copied = strscpy(clnt->cl_nodename,
277 nodename, sizeof(clnt->cl_nodename));
278
279 clnt->cl_nodelen = copied < 0
280 ? sizeof(clnt->cl_nodename) - 1
281 : copied;
282}
283
284static int rpc_client_register(struct rpc_clnt *clnt,
285 rpc_authflavor_t pseudoflavor,
286 const char *client_name)
287{
288 struct rpc_auth_create_args auth_args = {
289 .pseudoflavor = pseudoflavor,
290 .target_name = client_name,
291 };
292 struct rpc_auth *auth;
293 struct net *net = rpc_net_ns(clnt);
294 struct super_block *pipefs_sb;
295 int err;
296
297 rpc_clnt_debugfs_register(clnt);
298
299 pipefs_sb = rpc_get_sb_net(net);
300 if (pipefs_sb) {
301 err = rpc_setup_pipedir(pipefs_sb, clnt);
302 if (err)
303 goto out;
304 }
305
306 rpc_register_client(clnt);
307 if (pipefs_sb)
308 rpc_put_sb_net(net);
309
310 auth = rpcauth_create(&auth_args, clnt);
311 if (IS_ERR(ptr: auth)) {
312 dprintk("RPC: Couldn't create auth handle (flavor %u)\n",
313 pseudoflavor);
314 err = PTR_ERR(ptr: auth);
315 goto err_auth;
316 }
317 return 0;
318err_auth:
319 pipefs_sb = rpc_get_sb_net(net);
320 rpc_unregister_client(clnt);
321 __rpc_clnt_remove_pipedir(clnt);
322out:
323 if (pipefs_sb)
324 rpc_put_sb_net(net);
325 rpc_sysfs_client_destroy(clnt);
326 rpc_clnt_debugfs_unregister(clnt);
327 return err;
328}
329
330static DEFINE_IDA(rpc_clids);
331
332void rpc_cleanup_clids(void)
333{
334 ida_destroy(ida: &rpc_clids);
335}
336
337static int rpc_alloc_clid(struct rpc_clnt *clnt)
338{
339 int clid;
340
341 clid = ida_alloc(ida: &rpc_clids, GFP_KERNEL);
342 if (clid < 0)
343 return clid;
344 clnt->cl_clid = clid;
345 return 0;
346}
347
348static void rpc_free_clid(struct rpc_clnt *clnt)
349{
350 ida_free(&rpc_clids, id: clnt->cl_clid);
351}
352
353static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
354 struct rpc_xprt_switch *xps,
355 struct rpc_xprt *xprt,
356 struct rpc_clnt *parent)
357{
358 const struct rpc_program *program = args->program;
359 const struct rpc_version *version;
360 struct rpc_clnt *clnt = NULL;
361 const struct rpc_timeout *timeout;
362 const char *nodename = args->nodename;
363 int err;
364
365 err = rpciod_up();
366 if (err)
367 goto out_no_rpciod;
368
369 err = -EINVAL;
370 if (args->version >= program->nrvers)
371 goto out_err;
372 version = program->version[args->version];
373 if (version == NULL)
374 goto out_err;
375
376 err = -ENOMEM;
377 clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
378 if (!clnt)
379 goto out_err;
380 clnt->cl_parent = parent ? : clnt;
381 clnt->cl_xprtsec = args->xprtsec;
382
383 err = rpc_alloc_clid(clnt);
384 if (err)
385 goto out_no_clid;
386
387 clnt->cl_cred = get_cred(cred: args->cred);
388 clnt->cl_procinfo = version->procs;
389 clnt->cl_maxproc = version->nrprocs;
390 clnt->cl_prog = args->prognumber ? : program->number;
391 clnt->cl_vers = version->number;
392 clnt->cl_stats = args->stats ? : program->stats;
393 clnt->cl_metrics = rpc_alloc_iostats(clnt);
394 rpc_init_pipe_dir_head(pdh: &clnt->cl_pipedir_objects);
395 err = -ENOMEM;
396 if (clnt->cl_metrics == NULL)
397 goto out_no_stats;
398 clnt->cl_program = program;
399 INIT_LIST_HEAD(list: &clnt->cl_tasks);
400 spin_lock_init(&clnt->cl_lock);
401
402 timeout = xprt->timeout;
403 if (args->timeout != NULL) {
404 memcpy(to: &clnt->cl_timeout_default, from: args->timeout,
405 len: sizeof(clnt->cl_timeout_default));
406 timeout = &clnt->cl_timeout_default;
407 }
408
409 rpc_clnt_set_transport(clnt, xprt, timeout);
410 xprt->main = true;
411 xprt_iter_init(xpi: &clnt->cl_xpi, xps);
412 xprt_switch_put(xps);
413
414 clnt->cl_rtt = &clnt->cl_rtt_default;
415 rpc_init_rtt(rt: &clnt->cl_rtt_default, timeo: clnt->cl_timeout->to_initval);
416
417 refcount_set(r: &clnt->cl_count, n: 1);
418
419 if (nodename == NULL)
420 nodename = utsname()->nodename;
421 /* save the nodename */
422 rpc_clnt_set_nodename(clnt, nodename);
423
424 rpc_sysfs_client_setup(clnt, xprt_switch: xps, net: rpc_net_ns(clnt));
425 err = rpc_client_register(clnt, pseudoflavor: args->authflavor, client_name: args->client_name);
426 if (err)
427 goto out_no_path;
428 if (parent)
429 refcount_inc(r: &parent->cl_count);
430
431 trace_rpc_clnt_new(clnt, xprt, args);
432 return clnt;
433
434out_no_path:
435 rpc_free_iostats(clnt->cl_metrics);
436out_no_stats:
437 put_cred(cred: clnt->cl_cred);
438 rpc_free_clid(clnt);
439out_no_clid:
440 kfree(objp: clnt);
441out_err:
442 rpciod_down();
443out_no_rpciod:
444 xprt_switch_put(xps);
445 xprt_put(xprt);
446 trace_rpc_clnt_new_err(program: program->name, server: args->servername, error: err);
447 return ERR_PTR(error: err);
448}
449
450static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
451 struct rpc_xprt *xprt)
452{
453 struct rpc_clnt *clnt = NULL;
454 struct rpc_xprt_switch *xps;
455
456 if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
457 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
458 xps = args->bc_xprt->xpt_bc_xps;
459 xprt_switch_get(xps);
460 } else {
461 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
462 if (xps == NULL) {
463 xprt_put(xprt);
464 return ERR_PTR(error: -ENOMEM);
465 }
466 if (xprt->bc_xprt) {
467 xprt_switch_get(xps);
468 xprt->bc_xprt->xpt_bc_xps = xps;
469 }
470 }
471 clnt = rpc_new_client(args, xps, xprt, NULL);
472 if (IS_ERR(ptr: clnt))
473 return clnt;
474
475 if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
476 int err = rpc_ping(clnt);
477 if (err != 0) {
478 rpc_shutdown_client(clnt);
479 return ERR_PTR(error: err);
480 }
481 } else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
482 int err = rpc_ping_noreply(clnt);
483 if (err != 0) {
484 rpc_shutdown_client(clnt);
485 return ERR_PTR(error: err);
486 }
487 }
488
489 clnt->cl_softrtry = 1;
490 if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
491 clnt->cl_softrtry = 0;
492 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
493 clnt->cl_softerr = 1;
494 }
495
496 if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
497 clnt->cl_autobind = 1;
498 if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
499 clnt->cl_noretranstimeo = 1;
500 if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
501 clnt->cl_discrtry = 1;
502 if (!(args->flags & RPC_CLNT_CREATE_QUIET))
503 clnt->cl_chatty = 1;
504 if (args->flags & RPC_CLNT_CREATE_NETUNREACH_FATAL)
505 clnt->cl_netunreach_fatal = 1;
506
507 return clnt;
508}
509
510/**
511 * rpc_create - create an RPC client and transport with one call
512 * @args: rpc_clnt create argument structure
513 *
514 * Creates and initializes an RPC transport and an RPC client.
515 *
516 * It can ping the server in order to determine if it is up, and to see if
517 * it supports this program and version. RPC_CLNT_CREATE_NOPING disables
518 * this behavior so asynchronous tasks can also use rpc_create.
519 */
520struct rpc_clnt *rpc_create(struct rpc_create_args *args)
521{
522 struct rpc_xprt *xprt;
523 struct xprt_create xprtargs = {
524 .net = args->net,
525 .ident = args->protocol,
526 .srcaddr = args->saddress,
527 .dstaddr = args->address,
528 .addrlen = args->addrsize,
529 .servername = args->servername,
530 .bc_xprt = args->bc_xprt,
531 .xprtsec = args->xprtsec,
532 .connect_timeout = args->connect_timeout,
533 .reconnect_timeout = args->reconnect_timeout,
534 };
535 char servername[RPC_MAXNETNAMELEN];
536 struct rpc_clnt *clnt;
537 int i;
538
539 if (args->bc_xprt) {
540 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
541 xprt = args->bc_xprt->xpt_bc_xprt;
542 if (xprt) {
543 xprt_get(xprt);
544 return rpc_create_xprt(args, xprt);
545 }
546 }
547
548 if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
549 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
550 if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
551 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
552 /*
553 * If the caller chooses not to specify a hostname, whip
554 * up a string representation of the passed-in address.
555 */
556 if (xprtargs.servername == NULL) {
557 struct sockaddr_un *sun =
558 (struct sockaddr_un *)args->address;
559 struct sockaddr_in *sin =
560 (struct sockaddr_in *)args->address;
561 struct sockaddr_in6 *sin6 =
562 (struct sockaddr_in6 *)args->address;
563
564 servername[0] = '\0';
565 switch (args->address->sa_family) {
566 case AF_LOCAL:
567 if (sun->sun_path[0])
568 snprintf(buf: servername, size: sizeof(servername), fmt: "%s",
569 sun->sun_path);
570 else
571 snprintf(buf: servername, size: sizeof(servername), fmt: "@%s",
572 sun->sun_path+1);
573 break;
574 case AF_INET:
575 snprintf(buf: servername, size: sizeof(servername), fmt: "%pI4",
576 &sin->sin_addr.s_addr);
577 break;
578 case AF_INET6:
579 snprintf(buf: servername, size: sizeof(servername), fmt: "%pI6",
580 &sin6->sin6_addr);
581 break;
582 default:
583 /* caller wants default server name, but
584 * address family isn't recognized. */
585 return ERR_PTR(error: -EINVAL);
586 }
587 xprtargs.servername = servername;
588 }
589
590 xprt = xprt_create_transport(args: &xprtargs);
591 if (IS_ERR(ptr: xprt))
592 return (struct rpc_clnt *)xprt;
593
594 /*
595 * By default, kernel RPC client connects from a reserved port.
596 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
597 * but it is always enabled for rpciod, which handles the connect
598 * operation.
599 */
600 xprt->resvport = 1;
601 if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
602 xprt->resvport = 0;
603 xprt->reuseport = 0;
604 if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
605 xprt->reuseport = 1;
606
607 clnt = rpc_create_xprt(args, xprt);
608 if (IS_ERR(ptr: clnt) || args->nconnect <= 1)
609 return clnt;
610
611 for (i = 0; i < args->nconnect - 1; i++) {
612 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
613 break;
614 }
615 return clnt;
616}
617EXPORT_SYMBOL_GPL(rpc_create);
618
619/*
620 * This function clones the RPC client structure. It allows us to share the
621 * same transport while varying parameters such as the authentication
622 * flavour.
623 */
624static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
625 struct rpc_clnt *clnt)
626{
627 struct rpc_xprt_switch *xps;
628 struct rpc_xprt *xprt;
629 struct rpc_clnt *new;
630 int err;
631
632 err = -ENOMEM;
633 rcu_read_lock();
634 xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
635 xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
636 rcu_read_unlock();
637 if (xprt == NULL || xps == NULL) {
638 xprt_put(xprt);
639 xprt_switch_put(xps);
640 goto out_err;
641 }
642 args->servername = xprt->servername;
643 args->nodename = clnt->cl_nodename;
644
645 new = rpc_new_client(args, xps, xprt, parent: clnt);
646 if (IS_ERR(ptr: new))
647 return new;
648
649 /* Turn off autobind on clones */
650 new->cl_autobind = 0;
651 new->cl_softrtry = clnt->cl_softrtry;
652 new->cl_softerr = clnt->cl_softerr;
653 new->cl_noretranstimeo = clnt->cl_noretranstimeo;
654 new->cl_discrtry = clnt->cl_discrtry;
655 new->cl_chatty = clnt->cl_chatty;
656 new->cl_netunreach_fatal = clnt->cl_netunreach_fatal;
657 new->cl_principal = clnt->cl_principal;
658 new->cl_max_connect = clnt->cl_max_connect;
659 return new;
660
661out_err:
662 trace_rpc_clnt_clone_err(clnt, error: err);
663 return ERR_PTR(error: err);
664}
665
666/**
667 * rpc_clone_client - Clone an RPC client structure
668 *
669 * @clnt: RPC client whose parameters are copied
670 *
671 * Returns a fresh RPC client or an ERR_PTR.
672 */
673struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
674{
675 struct rpc_create_args args = {
676 .program = clnt->cl_program,
677 .prognumber = clnt->cl_prog,
678 .version = clnt->cl_vers,
679 .authflavor = clnt->cl_auth->au_flavor,
680 .cred = clnt->cl_cred,
681 .stats = clnt->cl_stats,
682 };
683 return __rpc_clone_client(args: &args, clnt);
684}
685EXPORT_SYMBOL_GPL(rpc_clone_client);
686
687/**
688 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
689 *
690 * @clnt: RPC client whose parameters are copied
691 * @flavor: security flavor for new client
692 *
693 * Returns a fresh RPC client or an ERR_PTR.
694 */
695struct rpc_clnt *
696rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
697{
698 struct rpc_create_args args = {
699 .program = clnt->cl_program,
700 .prognumber = clnt->cl_prog,
701 .version = clnt->cl_vers,
702 .authflavor = flavor,
703 .cred = clnt->cl_cred,
704 .stats = clnt->cl_stats,
705 };
706 return __rpc_clone_client(args: &args, clnt);
707}
708EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
709
710/**
711 * rpc_switch_client_transport: switch the RPC transport on the fly
712 * @clnt: pointer to a struct rpc_clnt
713 * @args: pointer to the new transport arguments
714 * @timeout: pointer to the new timeout parameters
715 *
716 * This function allows the caller to switch the RPC transport for the
717 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
718 * server, for instance. It assumes that the caller has ensured that
719 * there are no active RPC tasks by using some form of locking.
720 *
721 * Returns zero if "clnt" is now using the new xprt. Otherwise a
722 * negative errno is returned, and "clnt" continues to use the old
723 * xprt.
724 */
725int rpc_switch_client_transport(struct rpc_clnt *clnt,
726 struct xprt_create *args,
727 const struct rpc_timeout *timeout)
728{
729 const struct rpc_timeout *old_timeo;
730 rpc_authflavor_t pseudoflavor;
731 struct rpc_xprt_switch *xps, *oldxps;
732 struct rpc_xprt *xprt, *old;
733 struct rpc_clnt *parent;
734 int err;
735
736 args->xprtsec = clnt->cl_xprtsec;
737 xprt = xprt_create_transport(args);
738 if (IS_ERR(ptr: xprt))
739 return PTR_ERR(ptr: xprt);
740
741 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
742 if (xps == NULL) {
743 xprt_put(xprt);
744 return -ENOMEM;
745 }
746
747 pseudoflavor = clnt->cl_auth->au_flavor;
748
749 old_timeo = clnt->cl_timeout;
750 old = rpc_clnt_set_transport(clnt, xprt, timeout);
751 oldxps = xprt_iter_xchg_switch(xpi: &clnt->cl_xpi, newswitch: xps);
752
753 rpc_unregister_client(clnt);
754 __rpc_clnt_remove_pipedir(clnt);
755 rpc_sysfs_client_destroy(clnt);
756 rpc_clnt_debugfs_unregister(clnt);
757
758 /*
759 * A new transport was created. "clnt" therefore
760 * becomes the root of a new cl_parent tree. clnt's
761 * children, if it has any, still point to the old xprt.
762 */
763 parent = clnt->cl_parent;
764 clnt->cl_parent = clnt;
765
766 /*
767 * The old rpc_auth cache cannot be re-used. GSS
768 * contexts in particular are between a single
769 * client and server.
770 */
771 err = rpc_client_register(clnt, pseudoflavor, NULL);
772 if (err)
773 goto out_revert;
774
775 synchronize_rcu();
776 if (parent != clnt)
777 rpc_release_client(parent);
778 xprt_switch_put(xps: oldxps);
779 xprt_put(xprt: old);
780 trace_rpc_clnt_replace_xprt(clnt);
781 return 0;
782
783out_revert:
784 xps = xprt_iter_xchg_switch(xpi: &clnt->cl_xpi, newswitch: oldxps);
785 rpc_clnt_set_transport(clnt, xprt: old, timeout: old_timeo);
786 clnt->cl_parent = parent;
787 rpc_client_register(clnt, pseudoflavor, NULL);
788 xprt_switch_put(xps);
789 xprt_put(xprt);
790 trace_rpc_clnt_replace_xprt_err(clnt);
791 return err;
792}
793EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
794
795static struct rpc_xprt_switch *rpc_clnt_xprt_switch_get(struct rpc_clnt *clnt)
796{
797 struct rpc_xprt_switch *xps;
798
799 rcu_read_lock();
800 xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
801 rcu_read_unlock();
802
803 return xps;
804}
805
806static
807int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
808 void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
809{
810 struct rpc_xprt_switch *xps;
811
812 xps = rpc_clnt_xprt_switch_get(clnt);
813 if (xps == NULL)
814 return -EAGAIN;
815 func(xpi, xps);
816 xprt_switch_put(xps);
817 return 0;
818}
819
820static
821int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
822{
823 return _rpc_clnt_xprt_iter_init(clnt, xpi, func: xprt_iter_init_listall);
824}
825
826static
827int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
828 struct rpc_xprt_iter *xpi)
829{
830 return _rpc_clnt_xprt_iter_init(clnt, xpi, func: xprt_iter_init_listoffline);
831}
832
833/**
834 * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
835 * @clnt: pointer to client
836 * @fn: function to apply
837 * @data: void pointer to function data
838 *
839 * Iterates through the list of RPC transports currently attached to the
840 * client and applies the function fn(clnt, xprt, data).
841 *
842 * On error, the iteration stops, and the function returns the error value.
843 */
844int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
845 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
846 void *data)
847{
848 struct rpc_xprt_iter xpi;
849 int ret;
850
851 ret = rpc_clnt_xprt_iter_init(clnt, xpi: &xpi);
852 if (ret)
853 return ret;
854 for (;;) {
855 struct rpc_xprt *xprt = xprt_iter_get_next(xpi: &xpi);
856
857 if (!xprt)
858 break;
859 ret = fn(clnt, xprt, data);
860 xprt_put(xprt);
861 if (ret < 0)
862 break;
863 }
864 xprt_iter_destroy(xpi: &xpi);
865 return ret;
866}
867EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
868
869/*
870 * Kill all tasks for the given client.
871 * XXX: kill their descendants as well?
872 */
873void rpc_killall_tasks(struct rpc_clnt *clnt)
874{
875 struct rpc_task *rovr;
876
877
878 if (list_empty(head: &clnt->cl_tasks))
879 return;
880
881 /*
882 * Spin lock all_tasks to prevent changes...
883 */
884 trace_rpc_clnt_killall(clnt);
885 spin_lock(lock: &clnt->cl_lock);
886 list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
887 rpc_signal_task(rovr);
888 spin_unlock(lock: &clnt->cl_lock);
889}
890EXPORT_SYMBOL_GPL(rpc_killall_tasks);
891
892/**
893 * rpc_cancel_tasks - try to cancel a set of RPC tasks
894 * @clnt: Pointer to RPC client
895 * @error: RPC task error value to set
896 * @fnmatch: Pointer to selector function
897 * @data: User data
898 *
899 * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
900 * The argument @error must be a negative error value.
901 */
902unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
903 bool (*fnmatch)(const struct rpc_task *,
904 const void *),
905 const void *data)
906{
907 struct rpc_task *task;
908 unsigned long count = 0;
909
910 if (list_empty(head: &clnt->cl_tasks))
911 return 0;
912 /*
913 * Spin lock all_tasks to prevent changes...
914 */
915 spin_lock(lock: &clnt->cl_lock);
916 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
917 if (!RPC_IS_ACTIVATED(task))
918 continue;
919 if (!fnmatch(task, data))
920 continue;
921 rpc_task_try_cancel(task, error);
922 count++;
923 }
924 spin_unlock(lock: &clnt->cl_lock);
925 return count;
926}
927EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
928
/*
 * Per-transport callback for rpc_clnt_disconnect(): force a disconnect
 * if @xprt is currently connected. @clnt and @dummy are unused.
 * Always returns 0 so that the iteration continues.
 */
static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
				    struct rpc_xprt *xprt, void *dummy)
{
	if (!xprt_connected(xprt))
		return 0;

	xprt_force_disconnect(xprt);
	return 0;
}
936
937void rpc_clnt_disconnect(struct rpc_clnt *clnt)
938{
939 rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
940}
941EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
942
943/*
944 * Properly shut down an RPC client, terminating all outstanding
945 * requests.
946 */
947void rpc_shutdown_client(struct rpc_clnt *clnt)
948{
949 might_sleep();
950
951 trace_rpc_clnt_shutdown(clnt);
952
953 clnt->cl_shutdown = 1;
954 while (!list_empty(head: &clnt->cl_tasks)) {
955 rpc_killall_tasks(clnt);
956 wait_event_timeout(destroy_wait,
957 list_empty(&clnt->cl_tasks), 1*HZ);
958 }
959
960 /* wait for tasks still in workqueue or waitqueue */
961 wait_event_timeout(destroy_wait,
962 atomic_read(&clnt->cl_task_count) == 0, 1 * HZ);
963
964 rpc_release_client(clnt);
965}
966EXPORT_SYMBOL_GPL(rpc_shutdown_client);
967
968/*
969 * Free an RPC client
970 */
971static void rpc_free_client_work(struct work_struct *work)
972{
973 struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
974
975 trace_rpc_clnt_free(clnt);
976
977 /* These might block on processes that might allocate memory,
978 * so they cannot be called in rpciod, so they are handled separately
979 * here.
980 */
981 rpc_sysfs_client_destroy(clnt);
982 rpc_clnt_debugfs_unregister(clnt);
983 rpc_free_clid(clnt);
984 rpc_clnt_remove_pipedir(clnt);
985 xprt_put(rcu_dereference_raw(clnt->cl_xprt));
986
987 kfree(objp: clnt);
988 rpciod_down();
989}
990static struct rpc_clnt *
991rpc_free_client(struct rpc_clnt *clnt)
992{
993 struct rpc_clnt *parent = NULL;
994
995 trace_rpc_clnt_release(clnt);
996 if (clnt->cl_parent != clnt)
997 parent = clnt->cl_parent;
998 rpc_unregister_client(clnt);
999 rpc_free_iostats(clnt->cl_metrics);
1000 clnt->cl_metrics = NULL;
1001 xprt_iter_destroy(xpi: &clnt->cl_xpi);
1002 put_cred(cred: clnt->cl_cred);
1003
1004 INIT_WORK(&clnt->cl_work, rpc_free_client_work);
1005 schedule_work(work: &clnt->cl_work);
1006 return parent;
1007}
1008
1009/*
1010 * Free an RPC client
1011 */
1012static struct rpc_clnt *
1013rpc_free_auth(struct rpc_clnt *clnt)
1014{
1015 /*
1016 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
1017 * release remaining GSS contexts. This mechanism ensures
1018 * that it can do so safely.
1019 */
1020 if (clnt->cl_auth != NULL) {
1021 rpcauth_release(clnt->cl_auth);
1022 clnt->cl_auth = NULL;
1023 }
1024 if (refcount_dec_and_test(r: &clnt->cl_count))
1025 return rpc_free_client(clnt);
1026 return NULL;
1027}
1028
1029/*
1030 * Release reference to the RPC client
1031 */
1032void
1033rpc_release_client(struct rpc_clnt *clnt)
1034{
1035 do {
1036 if (list_empty(head: &clnt->cl_tasks))
1037 wake_up(&destroy_wait);
1038 if (refcount_dec_not_one(r: &clnt->cl_count))
1039 break;
1040 clnt = rpc_free_auth(clnt);
1041 } while (clnt != NULL);
1042}
1043EXPORT_SYMBOL_GPL(rpc_release_client);
1044
1045/**
1046 * rpc_bind_new_program - bind a new RPC program to an existing client
1047 * @old: old rpc_client
1048 * @program: rpc program to set
1049 * @vers: rpc program version
1050 *
1051 * Clones the rpc client and sets up a new RPC program. This is mainly
1052 * of use for enabling different RPC programs to share the same transport.
1053 * The Sun NFSv2/v3 ACL protocol can do this.
1054 */
1055struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1056 const struct rpc_program *program,
1057 u32 vers)
1058{
1059 struct rpc_create_args args = {
1060 .program = program,
1061 .prognumber = program->number,
1062 .version = vers,
1063 .authflavor = old->cl_auth->au_flavor,
1064 .cred = old->cl_cred,
1065 .stats = old->cl_stats,
1066 .timeout = old->cl_timeout,
1067 };
1068 struct rpc_clnt *clnt;
1069 int err;
1070
1071 clnt = __rpc_clone_client(args: &args, clnt: old);
1072 if (IS_ERR(ptr: clnt))
1073 goto out;
1074 err = rpc_ping(clnt);
1075 if (err != 0) {
1076 rpc_shutdown_client(clnt);
1077 clnt = ERR_PTR(error: err);
1078 }
1079out:
1080 return clnt;
1081}
1082EXPORT_SYMBOL_GPL(rpc_bind_new_program);
1083
1084struct rpc_xprt *
1085rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1086{
1087 struct rpc_xprt_switch *xps;
1088
1089 if (!xprt)
1090 return NULL;
1091 rcu_read_lock();
1092 xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1093 atomic_long_inc(v: &xps->xps_queuelen);
1094 rcu_read_unlock();
1095 atomic_long_inc(v: &xprt->queuelen);
1096
1097 return xprt;
1098}
1099
1100static void
1101rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1102{
1103 struct rpc_xprt_switch *xps;
1104
1105 atomic_long_dec(v: &xprt->queuelen);
1106 rcu_read_lock();
1107 xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1108 atomic_long_dec(v: &xps->xps_queuelen);
1109 rcu_read_unlock();
1110
1111 xprt_put(xprt);
1112}
1113
1114void rpc_task_release_transport(struct rpc_task *task)
1115{
1116 struct rpc_xprt *xprt = task->tk_xprt;
1117
1118 if (xprt) {
1119 task->tk_xprt = NULL;
1120 if (task->tk_client)
1121 rpc_task_release_xprt(clnt: task->tk_client, xprt);
1122 else
1123 xprt_put(xprt);
1124 }
1125}
1126EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1127
1128void rpc_task_release_client(struct rpc_task *task)
1129{
1130 struct rpc_clnt *clnt = task->tk_client;
1131
1132 rpc_task_release_transport(task);
1133 if (clnt != NULL) {
1134 /* Remove from client task list */
1135 spin_lock(lock: &clnt->cl_lock);
1136 list_del(entry: &task->tk_task);
1137 spin_unlock(lock: &clnt->cl_lock);
1138 task->tk_client = NULL;
1139 atomic_dec(v: &clnt->cl_task_count);
1140
1141 rpc_release_client(clnt);
1142 }
1143}
1144
1145static struct rpc_xprt *
1146rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1147{
1148 struct rpc_xprt *xprt;
1149
1150 rcu_read_lock();
1151 xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1152 rcu_read_unlock();
1153 return rpc_task_get_xprt(clnt, xprt);
1154}
1155
1156static struct rpc_xprt *
1157rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1158{
1159 return rpc_task_get_xprt(clnt, xprt: xprt_iter_get_next(xpi: &clnt->cl_xpi));
1160}
1161
1162static
1163void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1164{
1165 if (task->tk_xprt) {
1166 if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1167 (task->tk_flags & RPC_TASK_MOVEABLE)))
1168 return;
1169 xprt_release(task);
1170 xprt_put(xprt: task->tk_xprt);
1171 }
1172 if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1173 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1174 else
1175 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1176}
1177
1178static
1179void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1180{
1181 rpc_task_set_transport(task, clnt);
1182 task->tk_client = clnt;
1183 refcount_inc(r: &clnt->cl_count);
1184 if (clnt->cl_softrtry)
1185 task->tk_flags |= RPC_TASK_SOFT;
1186 if (clnt->cl_softerr)
1187 task->tk_flags |= RPC_TASK_TIMEOUT;
1188 if (clnt->cl_noretranstimeo)
1189 task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1190 if (clnt->cl_netunreach_fatal)
1191 task->tk_flags |= RPC_TASK_NETUNREACH_FATAL;
1192 atomic_inc(v: &clnt->cl_task_count);
1193}
1194
1195static void
1196rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1197{
1198 if (msg != NULL) {
1199 task->tk_msg.rpc_proc = msg->rpc_proc;
1200 task->tk_msg.rpc_argp = msg->rpc_argp;
1201 task->tk_msg.rpc_resp = msg->rpc_resp;
1202 task->tk_msg.rpc_cred = msg->rpc_cred;
1203 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1204 get_cred(cred: task->tk_msg.rpc_cred);
1205 }
1206}
1207
/*
 * Default rpc_call_done callback: deliberately does nothing.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* nothing to do */
}
1215
1216static const struct rpc_call_ops rpc_default_ops = {
1217 .rpc_call_done = rpc_default_callback,
1218};
1219
1220/**
1221 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1222 * @task_setup_data: pointer to task initialisation data
1223 */
1224struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1225{
1226 struct rpc_task *task;
1227
1228 task = rpc_new_task(task_setup_data);
1229 if (IS_ERR(ptr: task))
1230 return task;
1231
1232 if (!RPC_IS_ASYNC(task))
1233 task->tk_flags |= RPC_TASK_CRED_NOREF;
1234
1235 rpc_task_set_client(task, clnt: task_setup_data->rpc_client);
1236 rpc_task_set_rpc_message(task, msg: task_setup_data->rpc_message);
1237
1238 if (task->tk_action == NULL)
1239 rpc_call_start(task);
1240
1241 atomic_inc(v: &task->tk_count);
1242 rpc_execute(task);
1243 return task;
1244}
1245EXPORT_SYMBOL_GPL(rpc_run_task);
1246
1247/**
1248 * rpc_call_sync - Perform a synchronous RPC call
1249 * @clnt: pointer to RPC client
1250 * @msg: RPC call parameters
1251 * @flags: RPC call flags
1252 */
1253int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1254{
1255 struct rpc_task *task;
1256 struct rpc_task_setup task_setup_data = {
1257 .rpc_client = clnt,
1258 .rpc_message = msg,
1259 .callback_ops = &rpc_default_ops,
1260 .flags = flags,
1261 };
1262 int status;
1263
1264 WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1265 if (flags & RPC_TASK_ASYNC) {
1266 rpc_release_calldata(task_setup_data.callback_ops,
1267 task_setup_data.callback_data);
1268 return -EINVAL;
1269 }
1270
1271 task = rpc_run_task(&task_setup_data);
1272 if (IS_ERR(ptr: task))
1273 return PTR_ERR(ptr: task);
1274 status = task->tk_status;
1275 rpc_put_task(task);
1276 return status;
1277}
1278EXPORT_SYMBOL_GPL(rpc_call_sync);
1279
1280/**
1281 * rpc_call_async - Perform an asynchronous RPC call
1282 * @clnt: pointer to RPC client
1283 * @msg: RPC call parameters
1284 * @flags: RPC call flags
1285 * @tk_ops: RPC call ops
1286 * @data: user call data
1287 */
1288int
1289rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1290 const struct rpc_call_ops *tk_ops, void *data)
1291{
1292 struct rpc_task *task;
1293 struct rpc_task_setup task_setup_data = {
1294 .rpc_client = clnt,
1295 .rpc_message = msg,
1296 .callback_ops = tk_ops,
1297 .callback_data = data,
1298 .flags = flags|RPC_TASK_ASYNC,
1299 };
1300
1301 task = rpc_run_task(&task_setup_data);
1302 if (IS_ERR(ptr: task))
1303 return PTR_ERR(ptr: task);
1304 rpc_put_task(task);
1305 return 0;
1306}
1307EXPORT_SYMBOL_GPL(rpc_call_async);
1308
1309#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1310static void call_bc_encode(struct rpc_task *task);
1311
1312/**
1313 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1314 * rpc_execute against it
1315 * @req: RPC request
1316 * @timeout: timeout values to use for this task
1317 */
1318struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
1319 struct rpc_timeout *timeout)
1320{
1321 struct rpc_task *task;
1322 struct rpc_task_setup task_setup_data = {
1323 .callback_ops = &rpc_default_ops,
1324 .flags = RPC_TASK_SOFTCONN |
1325 RPC_TASK_NO_RETRANS_TIMEOUT,
1326 };
1327
1328 dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1329 /*
1330 * Create an rpc_task to send the data
1331 */
1332 task = rpc_new_task(&task_setup_data);
1333 if (IS_ERR(task)) {
1334 xprt_free_bc_request(req);
1335 return task;
1336 }
1337
1338 xprt_init_bc_request(req, task, timeout);
1339
1340 task->tk_action = call_bc_encode;
1341 atomic_inc(&task->tk_count);
1342 WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1343 rpc_execute(task);
1344
1345 dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1346 return task;
1347}
1348#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1349
1350/**
1351 * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1352 * @req: RPC request to prepare
1353 * @pages: vector of struct page pointers
1354 * @base: offset in first page where receive should start, in bytes
1355 * @len: expected size of the upper layer data payload, in bytes
1356 * @hdrsize: expected size of upper layer reply header, in XDR words
1357 *
1358 */
1359void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1360 unsigned int base, unsigned int len,
1361 unsigned int hdrsize)
1362{
1363 hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1364
1365 xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1366 trace_rpc_xdr_reply_pages(task: req->rq_task, xdr: &req->rq_rcv_buf);
1367}
1368EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1369
1370void
1371rpc_call_start(struct rpc_task *task)
1372{
1373 task->tk_action = call_start;
1374}
1375EXPORT_SYMBOL_GPL(rpc_call_start);
1376
1377/**
1378 * rpc_peeraddr - extract remote peer address from clnt's xprt
1379 * @clnt: RPC client structure
1380 * @buf: target buffer
1381 * @bufsize: length of target buffer
1382 *
1383 * Returns the number of bytes that are actually in the stored address.
1384 */
1385size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1386{
1387 size_t bytes;
1388 struct rpc_xprt *xprt;
1389
1390 rcu_read_lock();
1391 xprt = rcu_dereference(clnt->cl_xprt);
1392
1393 bytes = xprt->addrlen;
1394 if (bytes > bufsize)
1395 bytes = bufsize;
1396 memcpy(to: buf, from: &xprt->addr, len: bytes);
1397 rcu_read_unlock();
1398
1399 return bytes;
1400}
1401EXPORT_SYMBOL_GPL(rpc_peeraddr);
1402
1403/**
1404 * rpc_peeraddr2str - return remote peer address in printable format
1405 * @clnt: RPC client structure
1406 * @format: address format
1407 *
1408 * NB: the lifetime of the memory referenced by the returned pointer is
1409 * the same as the rpc_xprt itself. As long as the caller uses this
1410 * pointer, it must hold the RCU read lock.
1411 */
1412const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1413 enum rpc_display_format_t format)
1414{
1415 struct rpc_xprt *xprt;
1416
1417 xprt = rcu_dereference(clnt->cl_xprt);
1418
1419 if (xprt->address_strings[format] != NULL)
1420 return xprt->address_strings[format];
1421 else
1422 return "unprintable";
1423}
1424EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1425
1426static const struct sockaddr_in rpc_inaddr_loopback = {
1427 .sin_family = AF_INET,
1428 .sin_addr.s_addr = htonl(INADDR_ANY),
1429};
1430
1431static const struct sockaddr_in6 rpc_in6addr_loopback = {
1432 .sin6_family = AF_INET6,
1433 .sin6_addr = IN6ADDR_ANY_INIT,
1434};
1435
1436/*
1437 * Try a getsockname() on a connected datagram socket. Using a
1438 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1439 * This conserves the ephemeral port number space.
1440 *
1441 * Returns zero and fills in "buf" if successful; otherwise, a
1442 * negative errno is returned.
1443 */
1444static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1445 struct sockaddr *buf)
1446{
1447 struct socket *sock;
1448 int err;
1449
1450 err = __sock_create(net, family: sap->sa_family,
1451 type: SOCK_DGRAM, IPPROTO_UDP, res: &sock, kern: 1);
1452 if (err < 0) {
1453 dprintk("RPC: can't create UDP socket (%d)\n", err);
1454 goto out;
1455 }
1456
1457 switch (sap->sa_family) {
1458 case AF_INET:
1459 err = kernel_bind(sock,
1460 addr: (struct sockaddr *)&rpc_inaddr_loopback,
1461 addrlen: sizeof(rpc_inaddr_loopback));
1462 break;
1463 case AF_INET6:
1464 err = kernel_bind(sock,
1465 addr: (struct sockaddr *)&rpc_in6addr_loopback,
1466 addrlen: sizeof(rpc_in6addr_loopback));
1467 break;
1468 default:
1469 err = -EAFNOSUPPORT;
1470 goto out_release;
1471 }
1472 if (err < 0) {
1473 dprintk("RPC: can't bind UDP socket (%d)\n", err);
1474 goto out_release;
1475 }
1476
1477 err = kernel_connect(sock, addr: sap, addrlen: salen, flags: 0);
1478 if (err < 0) {
1479 dprintk("RPC: can't connect UDP socket (%d)\n", err);
1480 goto out_release;
1481 }
1482
1483 err = kernel_getsockname(sock, addr: buf);
1484 if (err < 0) {
1485 dprintk("RPC: getsockname failed (%d)\n", err);
1486 goto out_release;
1487 }
1488
1489 err = 0;
1490 if (buf->sa_family == AF_INET6) {
1491 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1492 sin6->sin6_scope_id = 0;
1493 }
1494 dprintk("RPC: %s succeeded\n", __func__);
1495
1496out_release:
1497 sock_release(sock);
1498out:
1499 return err;
1500}
1501
1502/*
1503 * Scraping a connected socket failed, so we don't have a useable
1504 * local address. Fallback: generate an address that will prevent
1505 * the server from calling us back.
1506 *
1507 * Returns zero and fills in "buf" if successful; otherwise, a
1508 * negative errno is returned.
1509 */
1510static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1511{
1512 switch (family) {
1513 case AF_INET:
1514 if (buflen < sizeof(rpc_inaddr_loopback))
1515 return -EINVAL;
1516 memcpy(to: buf, from: &rpc_inaddr_loopback,
1517 len: sizeof(rpc_inaddr_loopback));
1518 break;
1519 case AF_INET6:
1520 if (buflen < sizeof(rpc_in6addr_loopback))
1521 return -EINVAL;
1522 memcpy(to: buf, from: &rpc_in6addr_loopback,
1523 len: sizeof(rpc_in6addr_loopback));
1524 break;
1525 default:
1526 dprintk("RPC: %s: address family not supported\n",
1527 __func__);
1528 return -EAFNOSUPPORT;
1529 }
1530 dprintk("RPC: %s: succeeded\n", __func__);
1531 return 0;
1532}
1533
1534/**
1535 * rpc_localaddr - discover local endpoint address for an RPC client
1536 * @clnt: RPC client structure
1537 * @buf: target buffer
1538 * @buflen: size of target buffer, in bytes
1539 *
1540 * Returns zero and fills in "buf" and "buflen" if successful;
1541 * otherwise, a negative errno is returned.
1542 *
1543 * This works even if the underlying transport is not currently connected,
1544 * or if the upper layer never previously provided a source address.
1545 *
1546 * The result of this function call is transient: multiple calls in
1547 * succession may give different results, depending on how local
1548 * networking configuration changes over time.
1549 */
1550int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1551{
1552 struct sockaddr_storage address;
1553 struct sockaddr *sap = (struct sockaddr *)&address;
1554 struct rpc_xprt *xprt;
1555 struct net *net;
1556 size_t salen;
1557 int err;
1558
1559 rcu_read_lock();
1560 xprt = rcu_dereference(clnt->cl_xprt);
1561 salen = xprt->addrlen;
1562 memcpy(to: sap, from: &xprt->addr, len: salen);
1563 net = get_net(net: xprt->xprt_net);
1564 rcu_read_unlock();
1565
1566 rpc_set_port(sap, port: 0);
1567 err = rpc_sockname(net, sap, salen, buf);
1568 put_net(net);
1569 if (err != 0)
1570 /* Couldn't discover local address, return ANYADDR */
1571 return rpc_anyaddr(family: sap->sa_family, buf, buflen);
1572 return 0;
1573}
1574EXPORT_SYMBOL_GPL(rpc_localaddr);
1575
1576void
1577rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1578{
1579 struct rpc_xprt *xprt;
1580
1581 rcu_read_lock();
1582 xprt = rcu_dereference(clnt->cl_xprt);
1583 if (xprt->ops->set_buffer_size)
1584 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1585 rcu_read_unlock();
1586}
1587EXPORT_SYMBOL_GPL(rpc_setbufsize);
1588
1589/**
1590 * rpc_net_ns - Get the network namespace for this RPC client
1591 * @clnt: RPC client to query
1592 *
1593 */
1594struct net *rpc_net_ns(struct rpc_clnt *clnt)
1595{
1596 struct net *ret;
1597
1598 rcu_read_lock();
1599 ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1600 rcu_read_unlock();
1601 return ret;
1602}
1603EXPORT_SYMBOL_GPL(rpc_net_ns);
1604
1605/**
1606 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1607 * @clnt: RPC client to query
1608 *
1609 * For stream transports, this is one RPC record fragment (see RFC
1610 * 1831), as we don't support multi-record requests yet. For datagram
1611 * transports, this is the size of an IP packet minus the IP, UDP, and
1612 * RPC header sizes.
1613 */
1614size_t rpc_max_payload(struct rpc_clnt *clnt)
1615{
1616 size_t ret;
1617
1618 rcu_read_lock();
1619 ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1620 rcu_read_unlock();
1621 return ret;
1622}
1623EXPORT_SYMBOL_GPL(rpc_max_payload);
1624
1625/**
1626 * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1627 * @clnt: RPC client to query
1628 */
1629size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1630{
1631 struct rpc_xprt *xprt;
1632 size_t ret;
1633
1634 rcu_read_lock();
1635 xprt = rcu_dereference(clnt->cl_xprt);
1636 ret = xprt->ops->bc_maxpayload(xprt);
1637 rcu_read_unlock();
1638 return ret;
1639}
1640EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1641
1642unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1643{
1644 struct rpc_xprt *xprt;
1645 unsigned int ret;
1646
1647 rcu_read_lock();
1648 xprt = rcu_dereference(clnt->cl_xprt);
1649 ret = xprt->ops->bc_num_slots(xprt);
1650 rcu_read_unlock();
1651 return ret;
1652}
1653EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1654
1655/**
1656 * rpc_force_rebind - force transport to check that remote port is unchanged
1657 * @clnt: client to rebind
1658 *
1659 */
1660void rpc_force_rebind(struct rpc_clnt *clnt)
1661{
1662 if (clnt->cl_autobind) {
1663 rcu_read_lock();
1664 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1665 rcu_read_unlock();
1666 }
1667}
1668EXPORT_SYMBOL_GPL(rpc_force_rebind);
1669
1670static int
1671__rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1672{
1673 task->tk_status = 0;
1674 task->tk_rpc_status = 0;
1675 task->tk_action = action;
1676 return 1;
1677}
1678
1679/*
1680 * Restart an (async) RPC call. Usually called from within the
1681 * exit handler.
1682 */
1683int
1684rpc_restart_call(struct rpc_task *task)
1685{
1686 return __rpc_restart_call(task, action: call_start);
1687}
1688EXPORT_SYMBOL_GPL(rpc_restart_call);
1689
1690/*
1691 * Restart an (async) RPC call from the call_prepare state.
1692 * Usually called from within the exit handler.
1693 */
1694int
1695rpc_restart_call_prepare(struct rpc_task *task)
1696{
1697 if (task->tk_ops->rpc_call_prepare != NULL)
1698 return __rpc_restart_call(task, action: rpc_prepare_task);
1699 return rpc_restart_call(task);
1700}
1701EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1702
1703const char
1704*rpc_proc_name(const struct rpc_task *task)
1705{
1706 const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1707
1708 if (proc) {
1709 if (proc->p_name)
1710 return proc->p_name;
1711 else
1712 return "NULL";
1713 } else
1714 return "no proc";
1715}
1716
static void
__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
{
	/* Trace, record the RPC-level status, then exit with tk_status */
	trace_rpc_call_rpcerror(task, tk_status, rpc_status);
	rpc_task_set_rpc_status(task, rpc_status);
	rpc_exit(task, tk_status);
}
1724
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
	/* Same value is used for both the task and the RPC status */
	__rpc_call_rpcerror(task, status, status);
}
1730
1731/*
1732 * 0. Initial state
1733 *
1734 * Other FSM states can be visited zero or more times, but
1735 * this state is visited exactly once for each RPC.
1736 */
1737static void
1738call_start(struct rpc_task *task)
1739{
1740 struct rpc_clnt *clnt = task->tk_client;
1741 int idx = task->tk_msg.rpc_proc->p_statidx;
1742
1743 trace_rpc_request(task);
1744
1745 if (task->tk_client->cl_shutdown) {
1746 rpc_call_rpcerror(task, status: -EIO);
1747 return;
1748 }
1749
1750 /* Increment call count (version might not be valid for ping) */
1751 if (clnt->cl_program->version[clnt->cl_vers])
1752 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1753 clnt->cl_stats->rpccnt++;
1754 task->tk_action = call_reserve;
1755 rpc_task_set_transport(task, clnt);
1756}
1757
1758/*
1759 * 1. Reserve an RPC call slot
1760 */
1761static void
1762call_reserve(struct rpc_task *task)
1763{
1764 task->tk_status = 0;
1765 task->tk_action = call_reserveresult;
1766 xprt_reserve(task);
1767}
1768
1769static void call_retry_reserve(struct rpc_task *task);
1770
1771/*
1772 * 1b. Grok the result of xprt_reserve()
1773 */
1774static void
1775call_reserveresult(struct rpc_task *task)
1776{
1777 int status = task->tk_status;
1778
1779 /*
1780 * After a call to xprt_reserve(), we must have either
1781 * a request slot or else an error status.
1782 */
1783 task->tk_status = 0;
1784 if (status >= 0) {
1785 if (task->tk_rqstp) {
1786 task->tk_action = call_refresh;
1787
1788 /* Add to the client's list of all tasks */
1789 spin_lock(lock: &task->tk_client->cl_lock);
1790 if (list_empty(head: &task->tk_task))
1791 list_add_tail(new: &task->tk_task, head: &task->tk_client->cl_tasks);
1792 spin_unlock(lock: &task->tk_client->cl_lock);
1793 return;
1794 }
1795 rpc_call_rpcerror(task, status: -EIO);
1796 return;
1797 }
1798
1799 switch (status) {
1800 case -ENOMEM:
1801 rpc_delay(task, HZ >> 2);
1802 fallthrough;
1803 case -EAGAIN: /* woken up; retry */
1804 task->tk_action = call_retry_reserve;
1805 return;
1806 default:
1807 rpc_call_rpcerror(task, status);
1808 }
1809}
1810
1811/*
1812 * 1c. Retry reserving an RPC call slot
1813 */
1814static void
1815call_retry_reserve(struct rpc_task *task)
1816{
1817 task->tk_status = 0;
1818 task->tk_action = call_reserveresult;
1819 xprt_retry_reserve(task);
1820}
1821
1822/*
1823 * 2. Bind and/or refresh the credentials
1824 */
1825static void
1826call_refresh(struct rpc_task *task)
1827{
1828 task->tk_action = call_refreshresult;
1829 task->tk_status = 0;
1830 task->tk_client->cl_stats->rpcauthrefresh++;
1831 rpcauth_refreshcred(task);
1832}
1833
1834/*
1835 * 2a. Process the results of a credential refresh
1836 */
1837static void
1838call_refreshresult(struct rpc_task *task)
1839{
1840 int status = task->tk_status;
1841
1842 task->tk_status = 0;
1843 task->tk_action = call_refresh;
1844 switch (status) {
1845 case 0:
1846 if (rpcauth_uptodatecred(task)) {
1847 task->tk_action = call_allocate;
1848 return;
1849 }
1850 /* Use rate-limiting and a max number of retries if refresh
1851 * had status 0 but failed to update the cred.
1852 */
1853 fallthrough;
1854 case -ETIMEDOUT:
1855 rpc_delay(task, 3*HZ);
1856 fallthrough;
1857 case -EAGAIN:
1858 status = -EACCES;
1859 if (!task->tk_cred_retry)
1860 break;
1861 task->tk_cred_retry--;
1862 trace_rpc_retry_refresh_status(task);
1863 return;
1864 case -EKEYEXPIRED:
1865 break;
1866 case -ENOMEM:
1867 rpc_delay(task, HZ >> 4);
1868 return;
1869 }
1870 trace_rpc_refresh_status(task);
1871 rpc_call_rpcerror(task, status);
1872}
1873
1874/*
1875 * 2b. Allocate the buffer. For details, see sched.c:rpc_malloc.
1876 * (Note: buffer memory is freed in xprt_release).
1877 */
1878static void
1879call_allocate(struct rpc_task *task)
1880{
1881 const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1882 struct rpc_rqst *req = task->tk_rqstp;
1883 struct rpc_xprt *xprt = req->rq_xprt;
1884 const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1885 int status;
1886
1887 task->tk_status = 0;
1888 task->tk_action = call_encode;
1889
1890 if (req->rq_buffer)
1891 return;
1892
1893 /*
1894 * Calculate the size (in quads) of the RPC call
1895 * and reply headers, and convert both values
1896 * to byte sizes.
1897 */
1898 req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1899 proc->p_arglen;
1900 req->rq_callsize <<= 2;
1901 /*
1902 * Note: the reply buffer must at minimum allocate enough space
1903 * for the 'struct accepted_reply' from RFC5531.
1904 */
1905 req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1906 max_t(size_t, proc->p_replen, 2);
1907 req->rq_rcvsize <<= 2;
1908
1909 status = xprt->ops->buf_alloc(task);
1910 trace_rpc_buf_alloc(task, status);
1911 if (status == 0)
1912 return;
1913 if (status != -ENOMEM) {
1914 rpc_call_rpcerror(task, status);
1915 return;
1916 }
1917
1918 if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1919 task->tk_action = call_allocate;
1920 rpc_delay(task, HZ>>4);
1921 return;
1922 }
1923
1924 rpc_call_rpcerror(task, status: -ERESTARTSYS);
1925}
1926
1927static int
1928rpc_task_need_encode(struct rpc_task *task)
1929{
1930 return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1931 (!(task->tk_flags & RPC_TASK_SENT) ||
1932 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1933 xprt_request_need_retransmit(task));
1934}
1935
1936static void
1937rpc_xdr_encode(struct rpc_task *task)
1938{
1939 struct rpc_rqst *req = task->tk_rqstp;
1940 struct xdr_stream xdr;
1941
1942 xdr_buf_init(buf: &req->rq_snd_buf,
1943 start: req->rq_buffer,
1944 len: req->rq_callsize);
1945 xdr_buf_init(buf: &req->rq_rcv_buf,
1946 start: req->rq_rbuffer,
1947 len: req->rq_rcvsize);
1948
1949 req->rq_reply_bytes_recvd = 0;
1950 req->rq_snd_buf.head[0].iov_len = 0;
1951 xdr_init_encode(xdr: &xdr, buf: &req->rq_snd_buf,
1952 p: req->rq_snd_buf.head[0].iov_base, rqst: req);
1953 if (rpc_encode_header(task, xdr: &xdr))
1954 return;
1955
1956 task->tk_status = rpcauth_wrap_req(task, xdr: &xdr);
1957}
1958
1959/*
1960 * 3. Encode arguments of an RPC call
1961 */
1962static void
1963call_encode(struct rpc_task *task)
1964{
1965 if (!rpc_task_need_encode(task))
1966 goto out;
1967
1968 /* Dequeue task from the receive queue while we're encoding */
1969 xprt_request_dequeue_xprt(task);
1970 /* Encode here so that rpcsec_gss can use correct sequence number. */
1971 rpc_xdr_encode(task);
1972 /* Add task to reply queue before transmission to avoid races */
1973 if (task->tk_status == 0 && rpc_reply_expected(task))
1974 task->tk_status = xprt_request_enqueue_receive(task);
1975 /* Did the encode result in an error condition? */
1976 if (task->tk_status != 0) {
1977 /* Was the error nonfatal? */
1978 switch (task->tk_status) {
1979 case -EAGAIN:
1980 case -ENOMEM:
1981 rpc_delay(task, HZ >> 4);
1982 break;
1983 case -EKEYEXPIRED:
1984 if (!task->tk_cred_retry) {
1985 rpc_call_rpcerror(task, status: task->tk_status);
1986 } else {
1987 task->tk_action = call_refresh;
1988 task->tk_cred_retry--;
1989 trace_rpc_retry_refresh_status(task);
1990 }
1991 break;
1992 default:
1993 rpc_call_rpcerror(task, status: task->tk_status);
1994 }
1995 return;
1996 }
1997
1998 xprt_request_enqueue_transmit(task);
1999out:
2000 task->tk_action = call_transmit;
2001 /* Check that the connection is OK */
2002 if (!xprt_bound(xprt: task->tk_xprt))
2003 task->tk_action = call_bind;
2004 else if (!xprt_connected(xprt: task->tk_xprt))
2005 task->tk_action = call_connect;
2006}
2007
2008/*
2009 * Helpers to check if the task was already transmitted, and
2010 * to take action when that is the case.
2011 */
2012static bool
2013rpc_task_transmitted(struct rpc_task *task)
2014{
2015 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
2016}
2017
2018static void
2019rpc_task_handle_transmitted(struct rpc_task *task)
2020{
2021 xprt_end_transmit(task);
2022 task->tk_action = call_transmit_status;
2023}
2024
2025/*
2026 * 4. Get the server port number if not yet set
2027 */
2028static void
2029call_bind(struct rpc_task *task)
2030{
2031 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2032
2033 if (rpc_task_transmitted(task)) {
2034 rpc_task_handle_transmitted(task);
2035 return;
2036 }
2037
2038 if (xprt_bound(xprt)) {
2039 task->tk_action = call_connect;
2040 return;
2041 }
2042
2043 task->tk_action = call_bind_status;
2044 if (!xprt_prepare_transmit(task))
2045 return;
2046
2047 xprt->ops->rpcbind(task);
2048}
2049
/*
 * 4a.	Sort out bind result
 */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	/* Errors that do not set a specific status are reported as -EIO */
	int status = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	if (task->tk_status >= 0) {
		task->tk_action = call_connect;
		return;
	}
	/* The bind failed, but the transport is bound anyway: carry on */
	if (xprt_bound(xprt)) {
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -EAGAIN:
		goto retry_timeout;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		break;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		goto retry_timeout;
	case -ENETDOWN:
	case -ENETUNREACH:
		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
			break;
		fallthrough;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		/* Soft-connect tasks fail with the original error */
		status = task->tk_status;
		break;
	default:
		trace_rpcb_unrecognized_err(task);
	}

	rpc_call_rpcerror(task, status);
	return;

retry_timeout:
	/* Try the bind again, subject to the task's timeout handling */
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2132
2133/*
2134 * 4b. Connect to the RPC server
2135 */
2136static void
2137call_connect(struct rpc_task *task)
2138{
2139 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2140
2141 if (rpc_task_transmitted(task)) {
2142 rpc_task_handle_transmitted(task);
2143 return;
2144 }
2145
2146 if (xprt_connected(xprt)) {
2147 task->tk_action = call_transmit;
2148 return;
2149 }
2150
2151 task->tk_action = call_connect_status;
2152 if (task->tk_status < 0)
2153 return;
2154 if (task->tk_flags & RPC_TASK_NOCONNECT) {
2155 rpc_call_rpcerror(task, status: -ENOTCONN);
2156 return;
2157 }
2158 if (!xprt_prepare_transmit(task))
2159 return;
2160 xprt_connect(task);
2161}
2162
/*
 * 4c.	Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	trace_rpc_connect_status(task);

	if (task->tk_status == 0) {
		/* The connect attempt succeeded: count the reconnect */
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}
	if (xprt_connected(xprt)) {
		/* Error reported, but the transport is connected anyway */
		task->tk_status = 0;
		task->tk_action = call_transmit;
		return;
	}

	task->tk_status = 0;
	switch (status) {
	case -ENETDOWN:
	case -ENETUNREACH:
		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
			break;
		fallthrough;
	case -ECONNREFUSED:
	case -ECONNRESET:
		/* A positive refusal suggests a rebind is needed. */
		if (clnt->cl_autobind) {
			rpc_force_rebind(clnt);
			if (RPC_IS_SOFTCONN(task))
				break;
			goto out_retry;
		}
		fallthrough;
	case -ECONNABORTED:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EPROTO:
		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
					    task->tk_rqstp->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EAGAIN:
	case -ETIMEDOUT:
		/*
		 * A moveable, round-robin task whose transport is marked
		 * XPRT_REMOVE may be restarted without its transport, as
		 * long as the switch holds more than one transport.
		 */
		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
		    test_bit(XPRT_REMOVE, &xprt->state)) {
			struct rpc_xprt *saved = task->tk_xprt;
			struct rpc_xprt_switch *xps;

			xps = rpc_clnt_xprt_switch_get(clnt);
			if (xps->xps_nxprts > 1) {
				long remaining;

				xprt_release(task);
				remaining = atomic_long_dec_return(&xprt->queuelen);
				/* Last queued user: take it off the switch */
				if (remaining == 0)
					rpc_xprt_switch_remove_xprt(xps, saved,
								    true);
				xprt_put(saved);
				task->tk_xprt = NULL;
				task->tk_action = call_start;
			}
			xprt_switch_put(xps);
			if (!task->tk_xprt)
				goto out;
		}
		goto out_retry;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	}

	/* Everything that reaches this point is a hard failure */
	rpc_call_rpcerror(task, status);
	return;

out_retry:
	/* Check for timeouts before looping back to call_bind */
	task->tk_action = call_bind;
out:
	rpc_check_timeout(task);
}
2260
2261/*
2262 * 5. Transmit the RPC request, and wait for reply
2263 */
2264static void
2265call_transmit(struct rpc_task *task)
2266{
2267 if (rpc_task_transmitted(task)) {
2268 rpc_task_handle_transmitted(task);
2269 return;
2270 }
2271
2272 task->tk_action = call_transmit_status;
2273 if (!xprt_prepare_transmit(task))
2274 return;
2275 task->tk_status = 0;
2276 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2277 if (!xprt_connected(xprt: task->tk_xprt)) {
2278 task->tk_status = -ENOTCONN;
2279 return;
2280 }
2281 xprt_transmit(task);
2282 }
2283 xprt_end_transmit(task);
2284}
2285
/*
 * 5a.	Handle cleanup after a transmission
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (rpc_task_transmitted(task)) {
		task->tk_status = 0;
		xprt_request_wait_receive(task);
		return;
	}

	switch (task->tk_status) {
	case -EBADMSG:
		/* Go back and encode the request again */
		task->tk_status = 0;
		task->tk_action = call_encode;
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		task->tk_action = call_transmit;
		task->tk_status = 0;
		break;
	case -ECONNREFUSED:
		if (RPC_IS_SOFTCONN(task)) {
			/* p_proc == 0: trace the failure as a ping */
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_call_rpcerror(task, task->tk_status);
			return;
		}
		fallthrough;
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Connection trouble: start over from the bind step */
		task->tk_action = call_bind;
		task->tk_status = 0;
		break;
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		/* Status is left in place for call_status */
		break;
	default:
		break;
	}
	rpc_check_timeout(task);
}
2352
2353#if defined(CONFIG_SUNRPC_BACKCHANNEL)
2354static void call_bc_transmit(struct rpc_task *task);
2355static void call_bc_transmit_status(struct rpc_task *task);
2356
2357static void
2358call_bc_encode(struct rpc_task *task)
2359{
2360 xprt_request_enqueue_transmit(task);
2361 task->tk_action = call_bc_transmit;
2362}
2363
2364/*
2365 * 5b. Send the backchannel RPC reply. On error, drop the reply. In
2366 * addition, disconnect on connectivity errors.
2367 */
2368static void
2369call_bc_transmit(struct rpc_task *task)
2370{
2371 task->tk_action = call_bc_transmit_status;
2372 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2373 if (!xprt_prepare_transmit(task))
2374 return;
2375 task->tk_status = 0;
2376 xprt_transmit(task);
2377 }
2378 xprt_end_transmit(task);
2379}
2380
/*
 * Decide what to do after an attempt to send a backchannel reply:
 * retry via call_bc_transmit for transient errors, otherwise finish the
 * task through rpc_exit_task.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int status;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;
	status = task->tk_status;

	switch (status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		/* Transient: try the transmit again */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Exit quietly */
		break;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", status);
		xprt_conditional_disconnect(req->rq_xprt,
					    req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", status);
		break;
	}
	task->tk_action = rpc_exit_task;
}
2435#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2436
/*
 * 6.	Sort out the RPC call status
 *
 * A non-negative status moves the task on to call_decode.  A negative
 * status is cleared from the task and then either retried (via
 * call_encode, after a timeout check) or reported with
 * rpc_call_rpcerror().
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	/* Procedure number 0 is traced as a ping */
	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, status);

	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	switch (status) {
	case -ENETDOWN:
	case -ENETUNREACH:
		if (task->tk_flags & RPC_TASK_NETUNREACH_FATAL)
			goto exit_error;
		fallthrough;
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			goto exit_error;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3 * HZ);
		fallthrough;
	case -ETIMEDOUT:
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		break;
	case -EADDRINUSE:
		rpc_delay(task, 3 * HZ);
		fallthrough;
	case -EPIPE:
	case -EAGAIN:
		break;
	case -ENFILE:
	case -ENOBUFS:
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		break;
	case -EIO:
		/* shutdown or soft timeout */
		goto exit_error;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -status);
		goto exit_error;
	}

	/* Retryable error: encode the request again */
	task->tk_action = call_encode;
	rpc_check_timeout(task);
	return;

exit_error:
	rpc_call_rpcerror(task, status);
}
2508
2509static bool
2510rpc_check_connected(const struct rpc_rqst *req)
2511{
2512 /* No allocated request or transport? return true */
2513 if (!req || !req->rq_xprt)
2514 return true;
2515 return xprt_connected(xprt: req->rq_xprt);
2516}
2517
2518static void
2519rpc_check_timeout(struct rpc_task *task)
2520{
2521 struct rpc_clnt *clnt = task->tk_client;
2522
2523 if (RPC_SIGNALLED(task))
2524 return;
2525
2526 if (xprt_adjust_timeout(req: task->tk_rqstp) == 0)
2527 return;
2528
2529 trace_rpc_timeout_status(task);
2530 task->tk_timeouts++;
2531
2532 if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(req: task->tk_rqstp)) {
2533 rpc_call_rpcerror(task, status: -ETIMEDOUT);
2534 return;
2535 }
2536
2537 if (RPC_IS_SOFT(task)) {
2538 /*
2539 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2540 * been sent, it should time out only if the transport
2541 * connection gets terminally broken.
2542 */
2543 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2544 rpc_check_connected(req: task->tk_rqstp))
2545 return;
2546
2547 if (clnt->cl_chatty) {
2548 pr_notice_ratelimited(
2549 "%s: server %s not responding, timed out\n",
2550 clnt->cl_program->name,
2551 task->tk_xprt->servername);
2552 }
2553 if (task->tk_flags & RPC_TASK_TIMEOUT)
2554 rpc_call_rpcerror(task, status: -ETIMEDOUT);
2555 else
2556 __rpc_call_rpcerror(task, tk_status: -EIO, rpc_status: -ETIMEDOUT);
2557 return;
2558 }
2559
2560 if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2561 task->tk_flags |= RPC_CALL_MAJORSEEN;
2562 if (clnt->cl_chatty) {
2563 pr_notice_ratelimited(
2564 "%s: server %s not responding, still trying\n",
2565 clnt->cl_program->name,
2566 task->tk_xprt->servername);
2567 }
2568 }
2569 rpc_force_rebind(clnt);
2570 /*
2571 * Did our request time out due to an RPCSEC_GSS out-of-sequence
2572 * event? RFC2203 requires the server to drop all such requests.
2573 */
2574 rpcauth_invalcred(task);
2575}
2576
2577/*
2578 * 7. Decode the RPC reply
2579 */
2580static void
2581call_decode(struct rpc_task *task)
2582{
2583 struct rpc_clnt *clnt = task->tk_client;
2584 struct rpc_rqst *req = task->tk_rqstp;
2585 struct xdr_stream xdr;
2586 int err;
2587
2588 if (!task->tk_msg.rpc_proc->p_decode) {
2589 task->tk_action = rpc_exit_task;
2590 return;
2591 }
2592
2593 if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2594 if (clnt->cl_chatty) {
2595 pr_notice_ratelimited("%s: server %s OK\n",
2596 clnt->cl_program->name,
2597 task->tk_xprt->servername);
2598 }
2599 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2600 }
2601
2602 /*
2603 * Did we ever call xprt_complete_rqst()? If not, we should assume
2604 * the message is incomplete.
2605 */
2606 err = -EAGAIN;
2607 if (!req->rq_reply_bytes_recvd)
2608 goto out;
2609
2610 /* Ensure that we see all writes made by xprt_complete_rqst()
2611 * before it changed req->rq_reply_bytes_recvd.
2612 */
2613 smp_rmb();
2614
2615 req->rq_rcv_buf.len = req->rq_private_buf.len;
2616 trace_rpc_xdr_recvfrom(task, xdr: &req->rq_rcv_buf);
2617
2618 /* Check that the softirq receive buffer is valid */
2619 WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2620 sizeof(req->rq_rcv_buf)) != 0);
2621
2622 xdr_init_decode(xdr: &xdr, buf: &req->rq_rcv_buf,
2623 p: req->rq_rcv_buf.head[0].iov_base, rqst: req);
2624 err = rpc_decode_header(task, xdr: &xdr);
2625out:
2626 switch (err) {
2627 case 0:
2628 task->tk_action = rpc_exit_task;
2629 task->tk_status = rpcauth_unwrap_resp(task, xdr: &xdr);
2630 xdr_finish_decode(xdr: &xdr);
2631 return;
2632 case -EAGAIN:
2633 task->tk_status = 0;
2634 if (task->tk_client->cl_discrtry)
2635 xprt_conditional_disconnect(xprt: req->rq_xprt,
2636 cookie: req->rq_connect_cookie);
2637 task->tk_action = call_encode;
2638 rpc_check_timeout(task);
2639 break;
2640 case -EKEYREJECTED:
2641 task->tk_action = call_reserve;
2642 rpc_check_timeout(task);
2643 rpcauth_invalcred(task);
2644 /* Ensure we obtain a new XID if we retry! */
2645 xprt_release(task);
2646 }
2647}
2648
2649static int
2650rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2651{
2652 struct rpc_clnt *clnt = task->tk_client;
2653 struct rpc_rqst *req = task->tk_rqstp;
2654 __be32 *p;
2655 int error;
2656
2657 error = -EMSGSIZE;
2658 p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2659 if (!p)
2660 goto out_fail;
2661 *p++ = req->rq_xid;
2662 *p++ = rpc_call;
2663 *p++ = cpu_to_be32(RPC_VERSION);
2664 *p++ = cpu_to_be32(clnt->cl_prog);
2665 *p++ = cpu_to_be32(clnt->cl_vers);
2666 *p = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2667
2668 error = rpcauth_marshcred(task, xdr);
2669 if (error < 0)
2670 goto out_fail;
2671 return 0;
2672out_fail:
2673 trace_rpc_bad_callhdr(task);
2674 rpc_call_rpcerror(task, status: error);
2675 return error;
2676}
2677
2678static noinline int
2679rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2680{
2681 struct rpc_clnt *clnt = task->tk_client;
2682 int error;
2683 __be32 *p;
2684
2685 /* RFC-1014 says that the representation of XDR data must be a
2686 * multiple of four bytes
2687 * - if it isn't pointer subtraction in the NFS client may give
2688 * undefined results
2689 */
2690 if (task->tk_rqstp->rq_rcv_buf.len & 3)
2691 goto out_unparsable;
2692
2693 p = xdr_inline_decode(xdr, nbytes: 3 * sizeof(*p));
2694 if (!p)
2695 goto out_unparsable;
2696 p++; /* skip XID */
2697 if (*p++ != rpc_reply)
2698 goto out_unparsable;
2699 if (*p++ != rpc_msg_accepted)
2700 goto out_msg_denied;
2701
2702 error = rpcauth_checkverf(task, xdr);
2703 if (error) {
2704 struct rpc_cred *cred = task->tk_rqstp->rq_cred;
2705
2706 if (!test_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags)) {
2707 rpcauth_invalcred(task);
2708 if (!task->tk_cred_retry)
2709 goto out_err;
2710 task->tk_cred_retry--;
2711 trace_rpc__stale_creds(task);
2712 return -EKEYREJECTED;
2713 }
2714 goto out_verifier;
2715 }
2716
2717 p = xdr_inline_decode(xdr, nbytes: sizeof(*p));
2718 if (!p)
2719 goto out_unparsable;
2720 switch (*p) {
2721 case rpc_success:
2722 return 0;
2723 case rpc_prog_unavail:
2724 trace_rpc__prog_unavail(task);
2725 error = -EPFNOSUPPORT;
2726 goto out_err;
2727 case rpc_prog_mismatch:
2728 trace_rpc__prog_mismatch(task);
2729 error = -EPROTONOSUPPORT;
2730 goto out_err;
2731 case rpc_proc_unavail:
2732 trace_rpc__proc_unavail(task);
2733 error = -EOPNOTSUPP;
2734 goto out_err;
2735 case rpc_garbage_args:
2736 case rpc_system_err:
2737 trace_rpc__garbage_args(task);
2738 error = -EIO;
2739 break;
2740 default:
2741 goto out_unparsable;
2742 }
2743
2744out_garbage:
2745 clnt->cl_stats->rpcgarbage++;
2746 if (task->tk_garb_retry) {
2747 task->tk_garb_retry--;
2748 task->tk_action = call_encode;
2749 return -EAGAIN;
2750 }
2751out_err:
2752 rpc_call_rpcerror(task, status: error);
2753 return error;
2754
2755out_unparsable:
2756 trace_rpc__unparsable(task);
2757 error = -EIO;
2758 goto out_garbage;
2759
2760out_verifier:
2761 trace_rpc_bad_verifier(task);
2762 switch (error) {
2763 case -EPROTONOSUPPORT:
2764 goto out_err;
2765 case -EACCES:
2766 /* possible RPCSEC_GSS out-of-sequence event (RFC2203),
2767 * reset recv state and keep waiting, don't retransmit
2768 */
2769 task->tk_rqstp->rq_reply_bytes_recvd = 0;
2770 task->tk_status = xprt_request_enqueue_receive(task);
2771 task->tk_action = call_transmit_status;
2772 return -EBADMSG;
2773 default:
2774 goto out_garbage;
2775 }
2776
2777out_msg_denied:
2778 error = -EACCES;
2779 p = xdr_inline_decode(xdr, nbytes: sizeof(*p));
2780 if (!p)
2781 goto out_unparsable;
2782 switch (*p++) {
2783 case rpc_auth_error:
2784 break;
2785 case rpc_mismatch:
2786 trace_rpc__mismatch(task);
2787 error = -EPROTONOSUPPORT;
2788 goto out_err;
2789 default:
2790 goto out_unparsable;
2791 }
2792
2793 p = xdr_inline_decode(xdr, nbytes: sizeof(*p));
2794 if (!p)
2795 goto out_unparsable;
2796 switch (*p++) {
2797 case rpc_autherr_rejectedcred:
2798 case rpc_autherr_rejectedverf:
2799 case rpcsec_gsserr_credproblem:
2800 case rpcsec_gsserr_ctxproblem:
2801 rpcauth_invalcred(task);
2802 if (!task->tk_cred_retry)
2803 break;
2804 task->tk_cred_retry--;
2805 trace_rpc__stale_creds(task);
2806 return -EKEYREJECTED;
2807 case rpc_autherr_badcred:
2808 case rpc_autherr_badverf:
2809 /* possibly garbled cred/verf? */
2810 if (!task->tk_garb_retry)
2811 break;
2812 task->tk_garb_retry--;
2813 trace_rpc__bad_creds(task);
2814 task->tk_action = call_encode;
2815 return -EAGAIN;
2816 case rpc_autherr_tooweak:
2817 trace_rpc__auth_tooweak(task);
2818 pr_warn("RPC: server %s requires stronger authentication.\n",
2819 task->tk_xprt->servername);
2820 break;
2821 default:
2822 goto out_unparsable;
2823 }
2824 goto out_err;
2825}
2826
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		const void *obj)
{
}
2831
/* Result decoder for the NULL procedure: nothing to decode, always 0. */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		void *obj)
{
	return 0;
}
2837
2838static const struct rpc_procinfo rpcproc_null = {
2839 .p_encode = rpcproc_encode_null,
2840 .p_decode = rpcproc_decode_null,
2841};
2842
2843static const struct rpc_procinfo rpcproc_null_noreply = {
2844 .p_encode = rpcproc_encode_null,
2845};
2846
2847static void
2848rpc_null_call_prepare(struct rpc_task *task, void *data)
2849{
2850 task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2851 rpc_call_start(task);
2852}
2853
2854static const struct rpc_call_ops rpc_null_ops = {
2855 .rpc_call_prepare = rpc_null_call_prepare,
2856 .rpc_call_done = rpc_default_callback,
2857};
2858
2859static
2860struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2861 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2862 const struct rpc_call_ops *ops, void *data)
2863{
2864 struct rpc_message msg = {
2865 .rpc_proc = &rpcproc_null,
2866 };
2867 struct rpc_task_setup task_setup_data = {
2868 .rpc_client = clnt,
2869 .rpc_xprt = xprt,
2870 .rpc_message = &msg,
2871 .rpc_op_cred = cred,
2872 .callback_ops = ops ?: &rpc_null_ops,
2873 .callback_data = data,
2874 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2875 RPC_TASK_NULLCREDS,
2876 };
2877
2878 return rpc_run_task(&task_setup_data);
2879}
2880
2881struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2882{
2883 return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2884}
2885EXPORT_SYMBOL_GPL(rpc_call_null);
2886
2887static int rpc_ping(struct rpc_clnt *clnt)
2888{
2889 struct rpc_task *task;
2890 int status;
2891
2892 if (clnt->cl_auth->au_ops->ping)
2893 return clnt->cl_auth->au_ops->ping(clnt);
2894
2895 task = rpc_call_null_helper(clnt, NULL, NULL, flags: 0, NULL, NULL);
2896 if (IS_ERR(ptr: task))
2897 return PTR_ERR(ptr: task);
2898 status = task->tk_status;
2899 rpc_put_task(task);
2900 return status;
2901}
2902
2903static int rpc_ping_noreply(struct rpc_clnt *clnt)
2904{
2905 struct rpc_message msg = {
2906 .rpc_proc = &rpcproc_null_noreply,
2907 };
2908 struct rpc_task_setup task_setup_data = {
2909 .rpc_client = clnt,
2910 .rpc_message = &msg,
2911 .callback_ops = &rpc_null_ops,
2912 .flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2913 };
2914 struct rpc_task *task;
2915 int status;
2916
2917 task = rpc_run_task(&task_setup_data);
2918 if (IS_ERR(ptr: task))
2919 return PTR_ERR(ptr: task);
2920 status = task->tk_status;
2921 rpc_put_task(task);
2922 return status;
2923}
2924
/*
 * Callback data for the "test, then add transport" NULL call.  Both
 * pointers are released by rpc_cb_add_xprt_release().
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch the transport is added to */
	struct rpc_xprt *xprt;		/* transport being tested */
};
2929
2930static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2931{
2932 struct rpc_cb_add_xprt_calldata *data = calldata;
2933
2934 if (task->tk_status == 0)
2935 rpc_xprt_switch_add_xprt(xps: data->xps, xprt: data->xprt);
2936}
2937
2938static void rpc_cb_add_xprt_release(void *calldata)
2939{
2940 struct rpc_cb_add_xprt_calldata *data = calldata;
2941
2942 xprt_put(xprt: data->xprt);
2943 xprt_switch_put(xps: data->xps);
2944 kfree(objp: data);
2945}
2946
2947static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2948 .rpc_call_prepare = rpc_null_call_prepare,
2949 .rpc_call_done = rpc_cb_add_xprt_done,
2950 .rpc_release = rpc_cb_add_xprt_release,
2951};
2952
2953/**
2954 * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2955 * @clnt: pointer to struct rpc_clnt
2956 * @xps: pointer to struct rpc_xprt_switch,
2957 * @xprt: pointer struct rpc_xprt
2958 * @in_max_connect: pointer to the max_connect value for the passed in xprt transport
2959 */
2960int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2961 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2962 void *in_max_connect)
2963{
2964 struct rpc_cb_add_xprt_calldata *data;
2965 struct rpc_task *task;
2966 int max_connect = clnt->cl_max_connect;
2967
2968 if (in_max_connect)
2969 max_connect = *(int *)in_max_connect;
2970 if (xps->xps_nunique_destaddr_xprts + 1 > max_connect) {
2971 rcu_read_lock();
2972 pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2973 "transport to server: %s\n", max_connect,
2974 rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2975 rcu_read_unlock();
2976 return -EINVAL;
2977 }
2978
2979 data = kmalloc(sizeof(*data), GFP_KERNEL);
2980 if (!data)
2981 return -ENOMEM;
2982 data->xps = xprt_switch_get(xps);
2983 data->xprt = xprt_get(xprt);
2984 if (rpc_xprt_switch_has_addr(xps: data->xps, sap: (struct sockaddr *)&xprt->addr)) {
2985 rpc_cb_add_xprt_release(calldata: data);
2986 goto success;
2987 }
2988
2989 task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2990 ops: &rpc_cb_add_xprt_call_ops, data);
2991 if (IS_ERR(ptr: task))
2992 return PTR_ERR(ptr: task);
2993
2994 data->xps->xps_nunique_destaddr_xprts++;
2995 rpc_put_task(task);
2996success:
2997 return 1;
2998}
2999EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
3000
3001static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
3002 struct rpc_xprt *xprt,
3003 struct rpc_add_xprt_test *data)
3004{
3005 struct rpc_task *task;
3006 int status = -EADDRINUSE;
3007
3008 /* Test the connection */
3009 task = rpc_call_null_helper(clnt, xprt, NULL, flags: 0, NULL, NULL);
3010 if (IS_ERR(ptr: task))
3011 return PTR_ERR(ptr: task);
3012
3013 status = task->tk_status;
3014 rpc_put_task(task);
3015
3016 if (status < 0)
3017 return status;
3018
3019 /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
3020 data->add_xprt_test(clnt, xprt, data->data);
3021
3022 return 0;
3023}
3024
3025/**
3026 * rpc_clnt_setup_test_and_add_xprt()
3027 *
3028 * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
3029 * 1) caller of the test function must dereference the rpc_xprt_switch
3030 * and the rpc_xprt.
3031 * 2) test function must call rpc_xprt_switch_add_xprt, usually in
3032 * the rpc_call_done routine.
3033 *
3034 * Upon success (return of 1), the test function adds the new
3035 * transport to the rpc_clnt xprt switch
3036 *
3037 * @clnt: struct rpc_clnt to get the new transport
3038 * @xps: the rpc_xprt_switch to hold the new transport
3039 * @xprt: the rpc_xprt to test
3040 * @data: a struct rpc_add_xprt_test pointer that holds the test function
3041 * and test function call data
3042 */
3043int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
3044 struct rpc_xprt_switch *xps,
3045 struct rpc_xprt *xprt,
3046 void *data)
3047{
3048 int status = -EADDRINUSE;
3049
3050 xprt = xprt_get(xprt);
3051 xprt_switch_get(xps);
3052
3053 if (rpc_xprt_switch_has_addr(xps, sap: (struct sockaddr *)&xprt->addr))
3054 goto out_err;
3055
3056 status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3057 if (status < 0)
3058 goto out_err;
3059
3060 status = 1;
3061out_err:
3062 xprt_put(xprt);
3063 xprt_switch_put(xps);
3064 if (status < 0)
3065 pr_info("RPC: rpc_clnt_test_xprt failed: %d addr %s not "
3066 "added\n", status,
3067 xprt->address_strings[RPC_DISPLAY_ADDR]);
3068 /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3069 return status;
3070}
3071EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3072
3073/**
3074 * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3075 * @clnt: pointer to struct rpc_clnt
3076 * @xprtargs: pointer to struct xprt_create
3077 * @setup: callback to test and/or set up the connection
3078 * @data: pointer to setup function data
3079 *
3080 * Creates a new transport using the parameters set in args and
3081 * adds it to clnt.
3082 * If ping is set, then test that connectivity succeeds before
3083 * adding the new transport.
3084 *
3085 */
3086int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3087 struct xprt_create *xprtargs,
3088 int (*setup)(struct rpc_clnt *,
3089 struct rpc_xprt_switch *,
3090 struct rpc_xprt *,
3091 void *),
3092 void *data)
3093{
3094 struct rpc_xprt_switch *xps;
3095 struct rpc_xprt *xprt;
3096 unsigned long connect_timeout;
3097 unsigned long reconnect_timeout;
3098 unsigned char resvport, reuseport;
3099 int ret = 0, ident;
3100
3101 rcu_read_lock();
3102 xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3103 xprt = xprt_iter_xprt(xpi: &clnt->cl_xpi);
3104 if (xps == NULL || xprt == NULL) {
3105 rcu_read_unlock();
3106 xprt_switch_put(xps);
3107 return -EAGAIN;
3108 }
3109 resvport = xprt->resvport;
3110 reuseport = xprt->reuseport;
3111 connect_timeout = xprt->connect_timeout;
3112 reconnect_timeout = xprt->max_reconnect_timeout;
3113 ident = xprt->xprt_class->ident;
3114 rcu_read_unlock();
3115
3116 if (!xprtargs->ident)
3117 xprtargs->ident = ident;
3118 xprtargs->xprtsec = clnt->cl_xprtsec;
3119 xprt = xprt_create_transport(args: xprtargs);
3120 if (IS_ERR(ptr: xprt)) {
3121 ret = PTR_ERR(ptr: xprt);
3122 goto out_put_switch;
3123 }
3124 xprt->resvport = resvport;
3125 xprt->reuseport = reuseport;
3126
3127 if (xprtargs->connect_timeout)
3128 connect_timeout = xprtargs->connect_timeout;
3129 if (xprtargs->reconnect_timeout)
3130 reconnect_timeout = xprtargs->reconnect_timeout;
3131 if (xprt->ops->set_connect_timeout != NULL)
3132 xprt->ops->set_connect_timeout(xprt,
3133 connect_timeout,
3134 reconnect_timeout);
3135
3136 rpc_xprt_switch_set_roundrobin(xps);
3137 if (setup) {
3138 ret = setup(clnt, xps, xprt, data);
3139 if (ret != 0)
3140 goto out_put_xprt;
3141 }
3142 rpc_xprt_switch_add_xprt(xps, xprt);
3143out_put_xprt:
3144 xprt_put(xprt);
3145out_put_switch:
3146 xprt_switch_put(xps);
3147 return ret;
3148}
3149EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3150
3151static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3152 struct rpc_xprt *xprt,
3153 struct rpc_add_xprt_test *data)
3154{
3155 struct rpc_xprt *main_xprt;
3156 int status = 0;
3157
3158 xprt_get(xprt);
3159
3160 rcu_read_lock();
3161 main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3162 status = rpc_cmp_addr_port(sap1: (struct sockaddr *)&xprt->addr,
3163 sap2: (struct sockaddr *)&main_xprt->addr);
3164 rcu_read_unlock();
3165 xprt_put(xprt: main_xprt);
3166 if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3167 goto out;
3168
3169 status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3170out:
3171 xprt_put(xprt);
3172 return status;
3173}
3174
3175/* rpc_clnt_probe_trunked_xprt -- probe offlined transport for session trunking
3176 * @clnt rpc_clnt structure
3177 *
3178 * For each offlined transport found in the rpc_clnt structure call
3179 * the function rpc_xprt_probe_trunked() which will determine if this
3180 * transport still belongs to the trunking group.
3181 */
3182void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3183 struct rpc_add_xprt_test *data)
3184{
3185 struct rpc_xprt_iter xpi;
3186 int ret;
3187
3188 ret = rpc_clnt_xprt_iter_offline_init(clnt, xpi: &xpi);
3189 if (ret)
3190 return;
3191 for (;;) {
3192 struct rpc_xprt *xprt = xprt_iter_get_next(xpi: &xpi);
3193
3194 if (!xprt)
3195 break;
3196 ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3197 xprt_put(xprt);
3198 if (ret < 0)
3199 break;
3200 xprt_iter_rewind(xpi: &xpi);
3201 }
3202 xprt_iter_destroy(xpi: &xpi);
3203}
3204EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3205
3206static int rpc_xprt_offline(struct rpc_clnt *clnt,
3207 struct rpc_xprt *xprt,
3208 void *data)
3209{
3210 struct rpc_xprt *main_xprt;
3211 struct rpc_xprt_switch *xps;
3212 int err = 0;
3213
3214 xprt_get(xprt);
3215
3216 rcu_read_lock();
3217 main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3218 xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3219 err = rpc_cmp_addr_port(sap1: (struct sockaddr *)&xprt->addr,
3220 sap2: (struct sockaddr *)&main_xprt->addr);
3221 rcu_read_unlock();
3222 xprt_put(xprt: main_xprt);
3223 if (err)
3224 goto out;
3225
3226 if (wait_on_bit_lock(word: &xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3227 err = -EINTR;
3228 goto out;
3229 }
3230 xprt_set_offline_locked(xprt, xps);
3231
3232 xprt_release_write(xprt, NULL);
3233out:
3234 xprt_put(xprt);
3235 xprt_switch_put(xps);
3236 return err;
3237}
3238
3239/* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3240 * @clnt rpc_clnt structure
3241 *
3242 * For each active transport found in the rpc_clnt structure call
3243 * the function rpc_xprt_offline() which will identify trunked transports
3244 * and will mark them offline.
3245 */
3246void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3247{
3248 rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3249}
3250EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3251
/* Argument bundle handed to rpc_xprt_set_connect_timeout(). */
struct connect_timeout_data {
	unsigned long connect_timeout;		/* passed as connect timeout */
	unsigned long reconnect_timeout;	/* passed as reconnect timeout */
};
3256
3257static int
3258rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3259 struct rpc_xprt *xprt,
3260 void *data)
3261{
3262 struct connect_timeout_data *timeo = data;
3263
3264 if (xprt->ops->set_connect_timeout)
3265 xprt->ops->set_connect_timeout(xprt,
3266 timeo->connect_timeout,
3267 timeo->reconnect_timeout);
3268 return 0;
3269}
3270
3271void
3272rpc_set_connect_timeout(struct rpc_clnt *clnt,
3273 unsigned long connect_timeout,
3274 unsigned long reconnect_timeout)
3275{
3276 struct connect_timeout_data timeout = {
3277 .connect_timeout = connect_timeout,
3278 .reconnect_timeout = reconnect_timeout,
3279 };
3280 rpc_clnt_iterate_for_each_xprt(clnt,
3281 rpc_xprt_set_connect_timeout,
3282 &timeout);
3283}
3284EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3285
/*
 * Mark @xprt online in the transport switch of @clnt.  The switch
 * reference taken for the call is dropped again before returning.
 */
void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	struct rpc_xprt_switch *xps = rpc_clnt_xprt_switch_get(clnt);

	xprt_set_online_locked(xprt, xps);
	xprt_switch_put(xps);
}
3294
3295void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3296{
3297 struct rpc_xprt_switch *xps;
3298
3299 if (rpc_clnt_xprt_switch_has_addr(clnt,
3300 sap: (const struct sockaddr *)&xprt->addr)) {
3301 return rpc_clnt_xprt_set_online(clnt, xprt);
3302 }
3303
3304 xps = rpc_clnt_xprt_switch_get(clnt);
3305 rpc_xprt_switch_add_xprt(xps, xprt);
3306 xprt_switch_put(xps);
3307}
3308EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3309
3310void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3311{
3312 struct rpc_xprt_switch *xps;
3313
3314 rcu_read_lock();
3315 xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3316 rpc_xprt_switch_remove_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3317 xprt, offline: 0);
3318 xps->xps_nunique_destaddr_xprts--;
3319 rcu_read_unlock();
3320}
3321EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3322
3323bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3324 const struct sockaddr *sap)
3325{
3326 struct rpc_xprt_switch *xps;
3327 bool ret;
3328
3329 rcu_read_lock();
3330 xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3331 ret = rpc_xprt_switch_has_addr(xps, sap);
3332 rcu_read_unlock();
3333 return ret;
3334}
3335EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3336
3337#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3338static void rpc_show_header(struct rpc_clnt *clnt)
3339{
3340 printk(KERN_INFO "clnt[%pISpc] RPC tasks[%d]\n",
3341 (struct sockaddr *)&clnt->cl_xprt->addr,
3342 atomic_read(&clnt->cl_task_count));
3343 printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3344 "-timeout ---ops--\n");
3345}
3346
3347static void rpc_show_task(const struct rpc_clnt *clnt,
3348 const struct rpc_task *task)
3349{
3350 const char *rpc_waitq = "none";
3351
3352 if (RPC_IS_QUEUED(task))
3353 rpc_waitq = rpc_qname(task->tk_waitqueue);
3354
3355 printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3356 task->tk_pid, task->tk_flags, task->tk_status,
3357 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3358 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3359 task->tk_action, rpc_waitq);
3360}
3361
3362void rpc_show_tasks(struct net *net)
3363{
3364 struct rpc_clnt *clnt;
3365 struct rpc_task *task;
3366 int header = 0;
3367 struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3368
3369 spin_lock(&sn->rpc_client_lock);
3370 list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3371 spin_lock(&clnt->cl_lock);
3372 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3373 if (!header) {
3374 rpc_show_header(clnt);
3375 header++;
3376 }
3377 rpc_show_task(clnt, task);
3378 }
3379 spin_unlock(&clnt->cl_lock);
3380 }
3381 spin_unlock(&sn->rpc_client_lock);
3382}
3383#endif
3384
3385#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/* Per-transport callback: enable swap on @xprt and return the result. */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	return xprt_enable_swap(xprt);
}
3393
3394int
3395rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3396{
3397 while (clnt != clnt->cl_parent)
3398 clnt = clnt->cl_parent;
3399 if (atomic_inc_return(&clnt->cl_swapper) == 1)
3400 return rpc_clnt_iterate_for_each_xprt(clnt,
3401 rpc_clnt_swap_activate_callback, NULL);
3402 return 0;
3403}
3404EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3405
/* Per-transport callback: disable swap on @xprt.  Always returns 0. */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);
	return 0;
}
3414
3415void
3416rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3417{
3418 while (clnt != clnt->cl_parent)
3419 clnt = clnt->cl_parent;
3420 if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3421 rpc_clnt_iterate_for_each_xprt(clnt,
3422 rpc_clnt_swap_deactivate_callback, NULL);
3423}
3424EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3425#endif /* CONFIG_SUNRPC_SWAP */
3426