Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
     26 /* All Rights Reserved */
     27 /*
     28  * Portions of this source code were derived from Berkeley
     29  * 4.3 BSD under license from the Regents of the University of
     30  * California.
     31  */
     32 
     33 #include <sys/param.h>
     34 #include <sys/types.h>
     35 #include <sys/user.h>
     36 #include <sys/systm.h>
     37 #include <sys/sysmacros.h>
     38 #include <sys/errno.h>
     39 #include <sys/kmem.h>
     40 #include <sys/debug.h>
     41 #include <sys/systm.h>
     42 #include <sys/kstat.h>
     43 #include <sys/t_lock.h>
     44 #include <sys/ddi.h>
     45 #include <sys/cmn_err.h>
     46 #include <sys/time.h>
     47 #include <sys/isa_defs.h>
     48 #include <sys/zone.h>
     49 #include <sys/sdt.h>
     50 
     51 #include <rpc/types.h>
     52 #include <rpc/xdr.h>
     53 #include <rpc/auth.h>
     54 #include <rpc/clnt.h>
     55 #include <rpc/rpc_msg.h>
     56 #include <rpc/rpc_rdma.h>
     57 #include <nfs/nfs.h>
     58 #include <nfs/nfs4_kprot.h>
     59 
     60 static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;
     61 
     62 static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
     63 			    XDR *, xdrproc_t, caddr_t);
     64 static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
     65 		    XDR **, uint_t *);
     66 static int clnt_setup_rlist(CONN *, XDR *, XDR *);
     67 static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
     68 static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
     69 static void clnt_check_credit(CONN *);
     70 static void clnt_return_credit(CONN *);
     71 static void clnt_decode_long_reply(CONN *, struct clist *,
     72 		struct clist *, XDR *, XDR **, struct clist *,
     73 		struct clist *, uint_t, uint_t);
     74 
     75 static void clnt_update_credit(CONN *, uint32_t);
     76 
     77 static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
     78     caddr_t, xdrproc_t, caddr_t, struct timeval);
     79 static void	clnt_rdma_kabort(CLIENT *);
     80 static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
     81 static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
     82 static void	clnt_rdma_kdestroy(CLIENT *);
     83 static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
     84 static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
     85     struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
     86 
/*
 * Operations vector for RDMA based RPC.
 *
 * Installed as cl_ops on every handle set up by clnt_rdma_kcreate() and
 * clnt_rdma_kinit().  The initializer is positional, so the entries have
 * to stay in the member order of struct clnt_ops, which is declared
 * outside this file.
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};
     99 
    100 /*
    101  * The size of the preserialized RPC header information.
    102  */
    103 #define	CKU_HDRSIZE	20
    104 #define	CLNT_RDMA_SUCCESS 0
    105 #define	CLNT_RDMA_FAIL (-1)
    106 
    107 #define	AUTH_REFRESH_COUNT 2
    108 
    109 #define	IS_RPCSEC_GSS(authh)			\
    110 	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
    111 
/*
 * Per RPC RDMA endpoint details.
 *
 * The CLIENT handle is embedded in this structure: ptoh() yields the
 * handle from the private data, and htop() recovers the private data
 * through the handle's cl_private pointer.
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	/* NOTE(review): clnt_rdma_kcallit() casts this to rdma_registry_t * */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_srcaddr;	/* source address for retries */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	/* XDR position after the last encode; set by clnt_compose_rpcmsg() */
	uint32_t		cku_outsz;
	XDR			cku_inxdr;	/* xdr stream for input */
	/*
	 * Preserialized call header (CKU_HDRSIZE bytes) followed by 4 bytes
	 * that the RPCSEC_GSS path fills with the procedure number.
	 */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	/* 0 means unassigned; clnt_rdma_kcallit() then calls alloc_xid() */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;
    130 
/*
 * Back-off applied by clnt_rdma_kcallit() when the transport is inactive
 * or a connection cannot be obtained.  The value is in seconds and is
 * converted to ticks with drv_usectohz() at the point of use.
 */
#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;

/*
 * Client side counters, bumped through RCSTAT_INCR().  Every member is a
 * kstat_named_t so the structure can be handed out as a flat array via
 * rdmarcstat_ptr / rdmarcstat_ndata below.
 */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

/*
 * Array view of rdmarcstat and its element count.  Both are non-static;
 * presumably the kstat itself is created elsewhere -- confirm.
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);

#ifdef DEBUG
/* Debug knob, off by default; not referenced in this part of the file. */
int rdma_clnt_debug = 0;
#endif
    166 
    167 #ifdef accurate_stats
    168 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
    169 
    170 #define	RCSTAT_INCR(x)			\
    171 	mutex_enter(&rdmarcstat_lock);	\
    172 	rdmarcstat.x.value.ui64++;	\
    173 	mutex_exit(&rdmarcstat_lock);
    174 #else
    175 #define	RCSTAT_INCR(x)			\
    176 	rdmarcstat.x.value.ui64++;
    177 #endif
    178 
    179 #define	ptoh(p)		(&((p)->cku_client))
    180 #define	htop(h)		((cku_private_t *)((h)->cl_private))
    181 
    182 uint_t
    183 calc_length(uint_t len)
    184 {
    185 	len = RNDUP(len);
    186 
    187 	if (len <= 64 * 1024) {
    188 		if (len > 32 * 1024) {
    189 			len = 64 * 1024;
    190 		} else {
    191 			if (len > 16 * 1024) {
    192 				len = 32 * 1024;
    193 			} else {
    194 				if (len > 8 * 1024) {
    195 					len = 16 * 1024;
    196 				} else {
    197 					len = 8 * 1024;
    198 				}
    199 			}
    200 		}
    201 	}
    202 	return (len);
    203 }
    204 int
    205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
    206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
    207 {
    208 	CLIENT *h;
    209 	struct cku_private *p;
    210 	struct rpc_msg call_msg;
    211 	rdma_registry_t *rp;
    212 
    213 	ASSERT(INGLOBALZONE(curproc));
    214 
    215 	if (cl == NULL)
    216 		return (EINVAL);
    217 	*cl = NULL;
    218 
    219 	p = kmem_zalloc(sizeof (*p), KM_SLEEP);
    220 
    221 	/*
    222 	 * Find underlying RDMATF plugin
    223 	 */
    224 	rw_enter(&rdma_lock, RW_READER);
    225 	rp = rdma_mod_head;
    226 	while (rp != NULL) {
    227 		if (strcmp(rp->r_mod->rdma_api, proto))
    228 			rp = rp->r_next;
    229 		else {
    230 			p->cku_rd_mod = rp->r_mod;
    231 			p->cku_rd_handle = handle;
    232 			break;
    233 		}
    234 	}
    235 	rw_exit(&rdma_lock);
    236 
    237 	if (p->cku_rd_mod == NULL) {
    238 		/*
    239 		 * Should not happen.
    240 		 * No matching RDMATF plugin.
    241 		 */
    242 		kmem_free(p, sizeof (struct cku_private));
    243 		return (EINVAL);
    244 	}
    245 
    246 	h = ptoh(p);
    247 	h->cl_ops = &rdma_clnt_ops;
    248 	h->cl_private = (caddr_t)p;
    249 	h->cl_auth = authkern_create();
    250 
    251 	/* call message, just used to pre-serialize below */
    252 	call_msg.rm_xid = 0;
    253 	call_msg.rm_direction = CALL;
    254 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
    255 	call_msg.rm_call.cb_prog = pgm;
    256 	call_msg.rm_call.cb_vers = vers;
    257 
    258 	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
    259 	/* pre-serialize call message header */
    260 	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
    261 		XDR_DESTROY(&p->cku_outxdr);
    262 		auth_destroy(h->cl_auth);
    263 		kmem_free(p, sizeof (struct cku_private));
    264 		return (EINVAL);
    265 	}
    266 
    267 	/*
    268 	 * Set up the rpc information
    269 	 */
    270 	p->cku_cred = cred;
    271 	p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    272 	p->cku_srcaddr.maxlen = raddr->maxlen;
    273 	p->cku_srcaddr.len = 0;
    274 	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    275 	p->cku_addr.maxlen = raddr->maxlen;
    276 	p->cku_addr.len = raddr->len;
    277 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    278 	p->cku_addrfmly = family;
    279 
    280 	*cl = h;
    281 	return (0);
    282 }
    283 
    284 static void
    285 clnt_rdma_kdestroy(CLIENT *h)
    286 {
    287 	struct cku_private *p = htop(h);
    288 
    289 	kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
    290 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    291 	kmem_free(p, sizeof (*p));
    292 }
    293 
    294 void
    295 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
    296     struct cred *cred)
    297 {
    298 	struct cku_private *p = htop(h);
    299 	rdma_registry_t *rp;
    300 
    301 	ASSERT(INGLOBALZONE(curproc));
    302 	/*
    303 	 * Find underlying RDMATF plugin
    304 	 */
    305 	p->cku_rd_mod = NULL;
    306 	rw_enter(&rdma_lock, RW_READER);
    307 	rp = rdma_mod_head;
    308 	while (rp != NULL) {
    309 		if (strcmp(rp->r_mod->rdma_api, proto))
    310 			rp = rp->r_next;
    311 		else {
    312 			p->cku_rd_mod = rp->r_mod;
    313 			p->cku_rd_handle = handle;
    314 			break;
    315 		}
    316 
    317 	}
    318 	rw_exit(&rdma_lock);
    319 
    320 	/*
    321 	 * Set up the rpc information
    322 	 */
    323 	p->cku_cred = cred;
    324 	p->cku_xid = 0;
    325 
    326 	if (p->cku_addr.maxlen < raddr->len) {
    327 		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
    328 			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
    329 		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
    330 		p->cku_addr.maxlen = raddr->maxlen;
    331 	}
    332 
    333 	p->cku_srcaddr.len = 0;
    334 
    335 	p->cku_addr.len = raddr->len;
    336 	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
    337 	h->cl_ops = &rdma_clnt_ops;
    338 }
    339 
    340 static int
    341 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
    342     rdma_buf_t *rpcmsg, XDR *xdrs,
    343     xdrproc_t xdr_args, caddr_t argsp)
    344 {
    345 	cku_private_t *p = htop(h);
    346 
    347 	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
    348 		/*
    349 		 * Copy in the preserialized RPC header
    350 		 * information.
    351 		 */
    352 		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
    353 
    354 		/*
    355 		 * transaction id is the 1st thing in the output
    356 		 * buffer.
    357 		 */
    358 		/* LINTED pointer alignment */
    359 		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
    360 
    361 		/* Skip the preserialized stuff. */
    362 		XDR_SETPOS(xdrs, CKU_HDRSIZE);
    363 
    364 		/* Serialize dynamic stuff into the output buffer. */
    365 		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
    366 		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
    367 		    (!(*xdr_args)(xdrs, argsp))) {
    368 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
    369 			return (CLNT_RDMA_FAIL);
    370 		}
    371 		p->cku_outsz = XDR_GETPOS(xdrs);
    372 	} else {
    373 		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
    374 		IXDR_PUT_U_INT32(uproc, procnum);
    375 		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
    376 		XDR_SETPOS(xdrs, 0);
    377 
    378 		/* Serialize the procedure number and the arguments. */
    379 		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
    380 		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
    381 			if (rpcmsg->addr != xdrs->x_base) {
    382 				rpcmsg->addr = xdrs->x_base;
    383 				rpcmsg->len = xdr_getbufsize(xdrs);
    384 			}
    385 			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
    386 			return (CLNT_RDMA_FAIL);
    387 		}
    388 		/*
    389 		 * If we had to allocate a new buffer while encoding
    390 		 * then update the addr and len.
    391 		 */
    392 		if (rpcmsg->addr != xdrs->x_base) {
    393 			rpcmsg->addr = xdrs->x_base;
    394 			rpcmsg->len = xdr_getbufsize(xdrs);
    395 		}
    396 
    397 		p->cku_outsz = XDR_GETPOS(xdrs);
    398 		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
    399 	}
    400 
    401 	return (CLNT_RDMA_SUCCESS);
    402 }
    403 
    404 static int
    405 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
    406     XDR **xdrs, uint_t *op)
    407 {
    408 	cku_private_t *p = htop(h);
    409 	uint_t vers;
    410 	uint32_t rdma_credit = rdma_bufs_rqst;
    411 
    412 	vers = RPCRDMA_VERS;
    413 	clmsg->type = SEND_BUFFER;
    414 
    415 	if (rdma_buf_alloc(conn, clmsg)) {
    416 		return (CLNT_RDMA_FAIL);
    417 	}
    418 
    419 	*xdrs = &p->cku_outxdr;
    420 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
    421 
    422 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
    423 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
    424 	(void) xdr_u_int(*xdrs, &vers);
    425 	(void) xdr_u_int(*xdrs, &rdma_credit);
    426 	(void) xdr_u_int(*xdrs, op);
    427 
    428 	return (CLNT_RDMA_SUCCESS);
    429 }
    430 
    431 /*
    432  * If xp_cl is NULL value, then the RPC payload will NOT carry
    433  * an RDMA READ chunk list, in this case we insert FALSE into
    434  * the XDR stream. Otherwise we use the clist and RDMA register
    435  * the memory and encode the clist into the outbound XDR stream.
    436  */
    437 static int
    438 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
    439 {
    440 	int status;
    441 	struct clist *rclp;
    442 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
    443 
    444 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
    445 
    446 	if (rclp != NULL) {
    447 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
    448 		if (status != RDMA_SUCCESS) {
    449 			return (CLNT_RDMA_FAIL);
    450 		}
    451 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
    452 	}
    453 	(void) xdr_do_clist(xdrs, &rclp);
    454 
    455 	return (CLNT_RDMA_SUCCESS);
    456 }
    457 
    458 /*
    459  * If xp_wcl is NULL value, then the RPC payload will NOT carry
    460  * an RDMA WRITE chunk list, in this case we insert FALSE into
    461  * the XDR stream. Otherwise we use the clist and  RDMA register
    462  * the memory and encode the clist into the outbound XDR stream.
    463  */
    464 static int
    465 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
    466 {
    467 	int status;
    468 	struct clist *wlist, *rndcl;
    469 	int wlen, rndlen;
    470 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
    471 
    472 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
    473 
    474 	if (wlist != NULL) {
    475 		/*
    476 		 * If we are sending a non 4-byte alligned length
    477 		 * the server will roundup the length to 4-byte
    478 		 * boundary. In such a case, a trailing chunk is
    479 		 * added to take any spill over roundup bytes.
    480 		 */
    481 		wlen = clist_len(wlist);
    482 		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
    483 		if (rndlen) {
    484 			rndcl = clist_alloc();
    485 			/*
    486 			 * calc_length() will allocate a PAGESIZE
    487 			 * buffer below.
    488 			 */
    489 			rndcl->c_len = calc_length(rndlen);
    490 			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
    491 			rndcl->rb_longbuf.len = rndcl->c_len;
    492 			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
    493 				clist_free(rndcl);
    494 				return (CLNT_RDMA_FAIL);
    495 			}
    496 
    497 			/* Roundup buffer freed back in caller */
    498 			*rndbuf = rndcl->rb_longbuf;
    499 
    500 			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
    501 			rndcl->c_next = NULL;
    502 			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
    503 			wlist->c_next = rndcl;
    504 		}
    505 
    506 		status = clist_register(conn, wlist, CLIST_REG_DST);
    507 		if (status != RDMA_SUCCESS) {
    508 			rdma_buf_free(conn, rndbuf);
    509 			bzero(rndbuf, sizeof (rdma_buf_t));
    510 			return (CLNT_RDMA_FAIL);
    511 		}
    512 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
    513 	}
    514 
    515 	if (!xdr_encode_wlist(xdrs, wlist)) {
    516 		if (rndlen) {
    517 			rdma_buf_free(conn, rndbuf);
    518 			bzero(rndbuf, sizeof (rdma_buf_t));
    519 		}
    520 		return (CLNT_RDMA_FAIL);
    521 	}
    522 
    523 	return (CLNT_RDMA_SUCCESS);
    524 }
    525 
    526 static int
    527 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
    528 {
    529 	if (length == 0) {
    530 		*clpp = NULL;
    531 		return (CLNT_RDMA_SUCCESS);
    532 	}
    533 
    534 	*clpp = clist_alloc();
    535 
    536 	(*clpp)->rb_longbuf.len = calc_length(length);
    537 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
    538 
    539 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
    540 		clist_free(*clpp);
    541 		*clpp = NULL;
    542 		return (CLNT_RDMA_FAIL);
    543 	}
    544 
    545 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
    546 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
    547 	(*clpp)->c_next = NULL;
    548 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
    549 
    550 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
    551 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
    552 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
    553 		clist_free(*clpp);
    554 		*clpp = NULL;
    555 		return (CLNT_RDMA_FAIL);
    556 	}
    557 
    558 	return (CLNT_RDMA_SUCCESS);
    559 }
    560 
    561 /* ARGSUSED */
    562 static enum clnt_stat
    563 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
    564     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
    565     struct timeval wait)
    566 {
    567 	cku_private_t *p = htop(h);
    568 
    569 	int 	try_call_again;
    570 	int	refresh_attempt = AUTH_REFRESH_COUNT;
    571 	int 	status;
    572 	int 	msglen;
    573 
    574 	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
    575 	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
    576 	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
    577 
    578 	struct rpc_msg 	reply_msg;
    579 	rdma_registry_t	*m;
    580 
    581 	struct clist *cl_sendlist;
    582 	struct clist *cl_recvlist;
    583 	struct clist *cl;
    584 	struct clist *cl_rpcmsg;
    585 	struct clist *cl_rdma_reply;
    586 	struct clist *cl_rpcreply_wlist;
    587 	struct clist *cl_long_reply;
    588 	rdma_buf_t  rndup;
    589 
    590 	uint_t vers;
    591 	uint_t op;
    592 	uint_t off;
    593 	uint32_t seg_array_len;
    594 	uint_t long_reply_len;
    595 	uint_t rpcsec_gss;
    596 	uint_t gss_i_or_p;
    597 
    598 	CONN *conn = NULL;
    599 	rdma_buf_t clmsg;
    600 	rdma_buf_t rpcmsg;
    601 	rdma_chunkinfo_lengths_t rcil;
    602 
    603 	clock_t	ticks;
    604 	bool_t wlist_exists_reply;
    605 
    606 	uint32_t rdma_credit = rdma_bufs_rqst;
    607 
    608 	RCSTAT_INCR(rccalls);
    609 
    610 call_again:
    611 
    612 	bzero(&clmsg, sizeof (clmsg));
    613 	bzero(&rpcmsg, sizeof (rpcmsg));
    614 	bzero(&rndup, sizeof (rndup));
    615 	try_call_again = 0;
    616 	cl_sendlist = NULL;
    617 	cl_recvlist = NULL;
    618 	cl = NULL;
    619 	cl_rpcmsg = NULL;
    620 	cl_rdma_reply = NULL;
    621 	call_xdrp = NULL;
    622 	reply_xdrp = NULL;
    623 	wlist_exists_reply  = FALSE;
    624 	cl_rpcreply_wlist = NULL;
    625 	cl_long_reply = NULL;
    626 	rcil.rcil_len = 0;
    627 	rcil.rcil_len_alt = 0;
    628 	long_reply_len = 0;
    629 
    630 	rw_enter(&rdma_lock, RW_READER);
    631 	m = (rdma_registry_t *)p->cku_rd_handle;
    632 	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
    633 		/*
    634 		 * If we didn't find a matching RDMA module in the registry
    635 		 * then there is no transport.
    636 		 */
    637 		rw_exit(&rdma_lock);
    638 		p->cku_err.re_status = RPC_CANTSEND;
    639 		p->cku_err.re_errno = EIO;
    640 		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
    641 		if (h->cl_nosignal == TRUE) {
    642 			delay(ticks);
    643 		} else {
    644 			if (delay_sig(ticks) == EINTR) {
    645 				p->cku_err.re_status = RPC_INTR;
    646 				p->cku_err.re_errno = EINTR;
    647 			}
    648 		}
    649 		return (RPC_CANTSEND);
    650 	}
    651 	/*
    652 	 * Get unique xid
    653 	 */
    654 	if (p->cku_xid == 0)
    655 		p->cku_xid = alloc_xid();
    656 
    657 	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
    658 	    &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
    659 	rw_exit(&rdma_lock);
    660 
    661 	/*
    662 	 * If there is a problem with the connection reflect the issue
    663 	 * back to the higher level to address, we MAY delay for a short
    664 	 * period so that we are kind to the transport.
    665 	 */
    666 	if (conn == NULL) {
    667 		/*
    668 		 * Connect failed to server. Could be because of one
    669 		 * of several things. In some cases we don't want
    670 		 * the caller to retry immediately - delay before
    671 		 * returning to caller.
    672 		 */
    673 		switch (status) {
    674 		case RDMA_TIMEDOUT:
    675 			/*
    676 			 * Already timed out. No need to delay
    677 			 * some more.
    678 			 */
    679 			p->cku_err.re_status = RPC_TIMEDOUT;
    680 			p->cku_err.re_errno = ETIMEDOUT;
    681 			break;
    682 		case RDMA_INTR:
    683 			/*
    684 			 * Failed because of an signal. Very likely
    685 			 * the caller will not retry.
    686 			 */
    687 			p->cku_err.re_status = RPC_INTR;
    688 			p->cku_err.re_errno = EINTR;
    689 			break;
    690 		default:
    691 			/*
    692 			 * All other failures - server down or service
    693 			 * down or temporary resource failure. Delay before
    694 			 * returning to caller.
    695 			 */
    696 			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
    697 			p->cku_err.re_status = RPC_CANTCONNECT;
    698 			p->cku_err.re_errno = EIO;
    699 
    700 			if (h->cl_nosignal == TRUE) {
    701 				delay(ticks);
    702 			} else {
    703 				if (delay_sig(ticks) == EINTR) {
    704 					p->cku_err.re_status = RPC_INTR;
    705 					p->cku_err.re_errno = EINTR;
    706 				}
    707 			}
    708 			break;
    709 		}
    710 
    711 		return (p->cku_err.re_status);
    712 	}
    713 
    714 	if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
    715 		if ((p->cku_srcaddr.maxlen != 0) &&
    716 		    (p->cku_srcaddr.buf != NULL))
    717 			kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
    718 		p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
    719 		    KM_SLEEP);
    720 		p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
    721 	}
    722 
    723 	p->cku_srcaddr.len = conn->c_laddr.len;
    724 	bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);
    725 
    726 	clnt_check_credit(conn);
    727 
    728 	status = CLNT_RDMA_FAIL;
    729 
    730 	rpcsec_gss = gss_i_or_p = FALSE;
    731 
    732 	if (IS_RPCSEC_GSS(h)) {
    733 		rpcsec_gss = TRUE;
    734 		if (rpc_gss_get_service_type(h->cl_auth) ==
    735 		    rpc_gss_svc_integrity ||
    736 		    rpc_gss_get_service_type(h->cl_auth) ==
    737 		    rpc_gss_svc_privacy)
    738 			gss_i_or_p = TRUE;
    739 	}
    740 
    741 	/*
    742 	 * Try a regular RDMA message if RPCSEC_GSS is not being used
    743 	 * or if RPCSEC_GSS is being used for authentication only.
    744 	 */
    745 	if (rpcsec_gss == FALSE ||
    746 	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
    747 		/*
    748 		 * Grab a send buffer for the request.  Try to
    749 		 * encode it to see if it fits. If not, then it
    750 		 * needs to be sent in a chunk.
    751 		 */
    752 		rpcmsg.type = SEND_BUFFER;
    753 		if (rdma_buf_alloc(conn, &rpcmsg)) {
    754 			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
    755 			goto done;
    756 		}
    757 
    758 		/* First try to encode into regular send buffer */
    759 		op = RDMA_MSG;
    760 
    761 		call_xdrp = &callxdr;
    762 
    763 		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
    764 		    rdma_minchunk, NULL, XDR_ENCODE, conn);
    765 
    766 		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
    767 		    xdr_args, argsp);
    768 
    769 		if (status != CLNT_RDMA_SUCCESS) {
    770 			/* Clean up from previous encode attempt */
    771 			rdma_buf_free(conn, &rpcmsg);
    772 			XDR_DESTROY(call_xdrp);
    773 		} else {
    774 			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
    775 		}
    776 	}
    777 
    778 	/* If the encode didn't work, then try a NOMSG */
    779 	if (status != CLNT_RDMA_SUCCESS) {
    780 
    781 		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
    782 		    xdr_sizeof(xdr_args, argsp);
    783 
    784 		msglen = calc_length(msglen);
    785 
    786 		/* pick up the lengths for the reply buffer needed */
    787 		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
    788 		    &rcil.rcil_len, &rcil.rcil_len_alt);
    789 
    790 		/*
    791 		 * Construct a clist to describe the CHUNK_BUFFER
    792 		 * for the rpcmsg.
    793 		 */
    794 		cl_rpcmsg = clist_alloc();
    795 		cl_rpcmsg->c_len = msglen;
    796 		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
    797 		cl_rpcmsg->rb_longbuf.len = msglen;
    798 		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
    799 			clist_free(cl_rpcmsg);
    800 			goto done;
    801 		}
    802 		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
    803 
    804 		op = RDMA_NOMSG;
    805 		call_xdrp = &callxdr;
    806 
    807 		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
    808 		    cl_rpcmsg->rb_longbuf.len, 0,
    809 		    cl_rpcmsg, XDR_ENCODE, conn);
    810 
    811 		status = clnt_compose_rpcmsg(h, procnum, &cl_rpcmsg->rb_longbuf,
    812 		    call_xdrp, xdr_args, argsp);
    813 
    814 		DTRACE_PROBE2(krpc__i__clntrdma__callit__longbuf, int, status,
    815 		    int, msglen);
    816 		if (status != CLNT_RDMA_SUCCESS) {
    817 			p->cku_err.re_status = RPC_CANTENCODEARGS;
    818 			p->cku_err.re_errno = EIO;
    819 			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
    820 			goto done;
    821 		}
    822 	}
    823 
    824 	/*
    825 	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
    826 	 * RDMA WRITE clist.
    827 	 *
    828 	 * First pull the RDMA READ chunk list from the XDR private
    829 	 * area to keep it handy.
    830 	 */
    831 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
    832 
    833 	if (gss_i_or_p) {
    834 		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
    835 		long_reply_len += MAX_AUTH_BYTES;
    836 	} else {
    837 		long_reply_len = rcil.rcil_len;
    838 	}
    839 
    840 	/*
    841 	 * Update the chunk size information for the Long RPC msg.
    842 	 */
    843 	if (cl && op == RDMA_NOMSG)
    844 		cl->c_len = p->cku_outsz;
    845 
    846 	/*
    847 	 * Prepare the RDMA header. On success xdrs will hold the result
    848 	 * of xdrmem_create() for a SEND_BUFFER.
    849 	 */
    850 	status = clnt_compose_rdma_header(conn, h, &clmsg,
    851 	    &rdmahdr_o_xdrs, &op);
    852 
    853 	if (status != CLNT_RDMA_SUCCESS) {
    854 		p->cku_err.re_status = RPC_CANTSEND;
    855 		p->cku_err.re_errno = EIO;
    856 		RCSTAT_INCR(rcnomem);
    857 		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
    858 		goto done;
    859 	}
    860 
    861 	/*
    862 	 * Now insert the RDMA READ list iff present
    863 	 */
    864 	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
    865 	if (status != CLNT_RDMA_SUCCESS) {
    866 		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
    867 		rdma_buf_free(conn, &clmsg);
    868 		p->cku_err.re_status = RPC_CANTSEND;
    869 		p->cku_err.re_errno = EIO;
    870 		goto done;
    871 	}
    872 
    873 	/*
    874 	 * Setup RDMA WRITE chunk list for nfs read operation
    875 	 * other operations will have a NULL which will result
    876 	 * as a NULL list in the XDR stream.
    877 	 */
    878 	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
    879 	if (status != CLNT_RDMA_SUCCESS) {
    880 		rdma_buf_free(conn, &clmsg);
    881 		p->cku_err.re_status = RPC_CANTSEND;
    882 		p->cku_err.re_errno = EIO;
    883 		goto done;
    884 	}
    885 
    886 	/*
    887 	 * If NULL call and RPCSEC_GSS, provide a chunk such that
    888 	 * large responses can flow back to the client.
    889 	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
    890 	 */
    891 	if ((procnum == 0 && rpcsec_gss == TRUE) ||
    892 	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
    893 		long_reply_len += 1024;
    894 
    895 	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
    896 
    897 	DTRACE_PROBE2(krpc__i__clntrdma__callit__longreply, int, status,
    898 	    int, long_reply_len);
    899 
    900 	if (status != CLNT_RDMA_SUCCESS) {
    901 		rdma_buf_free(conn, &clmsg);
    902 		p->cku_err.re_status = RPC_CANTSEND;
    903 		p->cku_err.re_errno = EIO;
    904 		goto done;
    905 	}
    906 
    907 	/*
    908 	 * XDR encode the RDMA_REPLY write chunk
    909 	 */
    910 	seg_array_len = (cl_long_reply ? 1 : 0);
    911 	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
    912 	    seg_array_len);
    913 
    914 	/*
    915 	 * Construct a clist in "sendlist" that represents what we
    916 	 * will push over the wire.
    917 	 *
    918 	 * Start with the RDMA header and clist (if any)
    919 	 */
    920 	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
    921 	    clmsg.addr, NULL, NULL);
    922 
    923 	/*
    924 	 * Put the RPC call message in  sendlist if small RPC
    925 	 */
    926 	if (op == RDMA_MSG) {
    927 		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
    928 		    rpcmsg.addr, NULL, NULL);
    929 	} else {
    930 		/* Long RPC already in chunk list */
    931 		RCSTAT_INCR(rclongrpcs);
    932 	}
    933 
    934 	/*
    935 	 * Set up a reply buffer ready for the reply
    936 	 */
    937 	status = rdma_clnt_postrecv(conn, p->cku_xid);
    938 	if (status != RDMA_SUCCESS) {
    939 		rdma_buf_free(conn, &clmsg);
    940 		p->cku_err.re_status = RPC_CANTSEND;
    941 		p->cku_err.re_errno = EIO;
    942 		goto done;
    943 	}
    944 
    945 	/*
    946 	 * sync the memory for dma
    947 	 */
    948 	if (cl != NULL) {
    949 		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
    950 		if (status != RDMA_SUCCESS) {
    951 			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
    952 			rdma_buf_free(conn, &clmsg);
    953 			p->cku_err.re_status = RPC_CANTSEND;
    954 			p->cku_err.re_errno = EIO;
    955 			goto done;
    956 		}
    957 	}
    958 
    959 	/*
    960 	 * Send the RDMA Header and RPC call message to the server
    961 	 */
    962 	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
    963 	if (status != RDMA_SUCCESS) {
    964 		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
    965 		p->cku_err.re_status = RPC_CANTSEND;
    966 		p->cku_err.re_errno = EIO;
    967 		goto done;
    968 	}
    969 
    970 	/*
    971 	 * RDMA plugin now owns the send msg buffers.
    972 	 * Clear them out and don't free them.
    973 	 */
    974 	clmsg.addr = NULL;
    975 	if (rpcmsg.type == SEND_BUFFER)
    976 		rpcmsg.addr = NULL;
    977 
    978 	/*
    979 	 * Recv rpc reply
    980 	 */
    981 	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
    982 
    983 	/*
    984 	 * Now check recv status
    985 	 */
    986 	if (status != 0) {
    987 		if (status == RDMA_INTR) {
    988 			p->cku_err.re_status = RPC_INTR;
    989 			p->cku_err.re_errno = EINTR;
    990 			RCSTAT_INCR(rcintrs);
    991 		} else if (status == RPC_TIMEDOUT) {
    992 			p->cku_err.re_status = RPC_TIMEDOUT;
    993 			p->cku_err.re_errno = ETIMEDOUT;
    994 			RCSTAT_INCR(rctimeouts);
    995 		} else {
    996 			p->cku_err.re_status = RPC_CANTRECV;
    997 			p->cku_err.re_errno = EIO;
    998 		}
    999 		goto done;
   1000 	}
   1001 
   1002 	/*
   1003 	 * Process the reply message.
   1004 	 *
   1005 	 * First the chunk list (if any)
   1006 	 */
   1007 	rdmahdr_i_xdrs = &(p->cku_inxdr);
   1008 	xdrmem_create(rdmahdr_i_xdrs,
   1009 	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
   1010 	    cl_recvlist->c_len, XDR_DECODE);
   1011 
   1012 	/*
   1013 	 * Treat xid as opaque (xid is the first entity
   1014 	 * in the rpc rdma message).
   1015 	 * Skip xid and set the xdr position accordingly.
   1016 	 */
   1017 	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
   1018 	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
   1019 	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
   1020 	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
   1021 	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
   1022 
   1023 	clnt_update_credit(conn, rdma_credit);
   1024 
   1025 	wlist_exists_reply = FALSE;
   1026 	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
   1027 	    &wlist_exists_reply)) {
   1028 		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
   1029 		p->cku_err.re_status = RPC_CANTDECODERES;
   1030 		p->cku_err.re_errno = EIO;
   1031 		goto done;
   1032 	}
   1033 
   1034 	/*
   1035 	 * The server shouldn't have sent a RDMA_SEND that
   1036 	 * the client needs to RDMA_WRITE a reply back to
   1037 	 * the server.  So silently ignoring what the
   1038 	 * server returns in the rdma_reply section of the
   1039 	 * header.
   1040 	 */
   1041 	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
   1042 	off = xdr_getpos(rdmahdr_i_xdrs);
   1043 
   1044 	clnt_decode_long_reply(conn, cl_long_reply,
   1045 	    cl_rdma_reply, &replyxdr, &reply_xdrp,
   1046 	    cl, cl_recvlist, op, off);
   1047 
   1048 	if (reply_xdrp == NULL)
   1049 		goto done;
   1050 
   1051 	if (wlist_exists_reply) {
   1052 		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
   1053 	}
   1054 
   1055 	reply_msg.rm_direction = REPLY;
   1056 	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
   1057 	reply_msg.acpted_rply.ar_stat = SUCCESS;
   1058 	reply_msg.acpted_rply.ar_verf = _null_auth;
   1059 
   1060 	/*
   1061 	 *  xdr_results will be done in AUTH_UNWRAP.
   1062 	 */
   1063 	reply_msg.acpted_rply.ar_results.where = NULL;
   1064 	reply_msg.acpted_rply.ar_results.proc = xdr_void;
   1065 
   1066 	/*
   1067 	 * Decode and validate the response.
   1068 	 */
   1069 	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
   1070 		enum clnt_stat re_status;
   1071 
   1072 		_seterr_reply(&reply_msg, &(p->cku_err));
   1073 
   1074 		re_status = p->cku_err.re_status;
   1075 		if (re_status == RPC_SUCCESS) {
   1076 			/*
   1077 			 * Reply is good, check auth.
   1078 			 */
   1079 			if (!AUTH_VALIDATE(h->cl_auth,
   1080 			    &reply_msg.acpted_rply.ar_verf)) {
   1081 				p->cku_err.re_status = RPC_AUTHERROR;
   1082 				p->cku_err.re_why = AUTH_INVALIDRESP;
   1083 				RCSTAT_INCR(rcbadverfs);
   1084 				DTRACE_PROBE(
   1085 				    krpc__e__clntrdma__callit__authvalidate);
   1086 			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
   1087 			    xdr_results, resultsp)) {
   1088 				p->cku_err.re_status = RPC_CANTDECODERES;
   1089 				p->cku_err.re_errno = EIO;
   1090 				DTRACE_PROBE(
   1091 				    krpc__e__clntrdma__callit__authunwrap);
   1092 			}
   1093 		} else {
   1094 			/* set errno in case we can't recover */
   1095 			if (re_status != RPC_VERSMISMATCH &&
   1096 			    re_status != RPC_AUTHERROR &&
   1097 			    re_status != RPC_PROGVERSMISMATCH)
   1098 				p->cku_err.re_errno = EIO;
   1099 
   1100 			if (re_status == RPC_AUTHERROR) {
   1101 				if ((refresh_attempt > 0) &&
   1102 				    AUTH_REFRESH(h->cl_auth, &reply_msg,
   1103 				    p->cku_cred)) {
   1104 					refresh_attempt--;
   1105 					try_call_again = 1;
   1106 					goto done;
   1107 				}
   1108 
   1109 				try_call_again = 0;
   1110 
   1111 				/*
   1112 				 * We have used the client handle to
   1113 				 * do an AUTH_REFRESH and the RPC status may
   1114 				 * be set to RPC_SUCCESS; Let's make sure to
   1115 				 * set it to RPC_AUTHERROR.
   1116 				 */
   1117 				p->cku_err.re_status = RPC_AUTHERROR;
   1118 
   1119 				/*
   1120 				 * Map recoverable and unrecoverable
   1121 				 * authentication errors to appropriate
   1122 				 * errno
   1123 				 */
   1124 				switch (p->cku_err.re_why) {
   1125 				case AUTH_BADCRED:
   1126 				case AUTH_BADVERF:
   1127 				case AUTH_INVALIDRESP:
   1128 				case AUTH_TOOWEAK:
   1129 				case AUTH_FAILED:
   1130 				case RPCSEC_GSS_NOCRED:
   1131 				case RPCSEC_GSS_FAILED:
   1132 					p->cku_err.re_errno = EACCES;
   1133 					break;
   1134 				case AUTH_REJECTEDCRED:
   1135 				case AUTH_REJECTEDVERF:
   1136 				default:
   1137 					p->cku_err.re_errno = EIO;
   1138 					break;
   1139 				}
   1140 			}
   1141 			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
   1142 			    int, p->cku_err.re_why);
   1143 		}
   1144 	} else {
   1145 		p->cku_err.re_status = RPC_CANTDECODERES;
   1146 		p->cku_err.re_errno = EIO;
   1147 		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
   1148 	}
   1149 
   1150 done:
   1151 	clnt_return_credit(conn);
   1152 
   1153 	if (cl_sendlist != NULL)
   1154 		clist_free(cl_sendlist);
   1155 
   1156 	/*
   1157 	 * If rpc reply is in a chunk, free it now.
   1158 	 */
   1159 	if (cl_long_reply) {
   1160 		(void) clist_deregister(conn, cl_long_reply);
   1161 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
   1162 		clist_free(cl_long_reply);
   1163 	}
   1164 
   1165 	if (call_xdrp)
   1166 		XDR_DESTROY(call_xdrp);
   1167 
   1168 	if (rndup.rb_private) {
   1169 		rdma_buf_free(conn, &rndup);
   1170 	}
   1171 
   1172 	if (reply_xdrp) {
   1173 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
   1174 		XDR_DESTROY(reply_xdrp);
   1175 	}
   1176 
   1177 	if (cl_rdma_reply) {
   1178 		clist_free(cl_rdma_reply);
   1179 	}
   1180 
   1181 	if (cl_recvlist) {
   1182 		rdma_buf_t	recvmsg = {0};
   1183 		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
   1184 		recvmsg.type = RECV_BUFFER;
   1185 		RDMA_BUF_FREE(conn, &recvmsg);
   1186 		clist_free(cl_recvlist);
   1187 	}
   1188 
   1189 	RDMA_REL_CONN(conn);
   1190 
   1191 	if (try_call_again)
   1192 		goto call_again;
   1193 
   1194 	if (p->cku_err.re_status != RPC_SUCCESS) {
   1195 		RCSTAT_INCR(rcbadcalls);
   1196 	}
   1197 	return (p->cku_err.re_status);
   1198 }
   1199 
   1200 
   1201 static void
   1202 clnt_decode_long_reply(CONN *conn,
   1203     struct clist *cl_long_reply,
   1204     struct clist *cl_rdma_reply, XDR *xdrs,
   1205     XDR **rxdrp, struct clist *cl,
   1206     struct clist *cl_recvlist,
   1207     uint_t  op, uint_t off)
   1208 {
   1209 	if (op != RDMA_NOMSG) {
   1210 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
   1211 		    int, cl_recvlist->c_len - off);
   1212 		xdrrdma_create(xdrs,
   1213 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
   1214 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
   1215 		*rxdrp = xdrs;
   1216 		return;
   1217 	}
   1218 
   1219 	/* op must be RDMA_NOMSG */
   1220 	if (cl) {
   1221 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
   1222 		return;
   1223 	}
   1224 
   1225 	if (cl_long_reply->u.c_daddr) {
   1226 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
   1227 		    int, cl_rdma_reply->c_len);
   1228 
   1229 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
   1230 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
   1231 
   1232 		*rxdrp = xdrs;
   1233 	}
   1234 }
   1235 
   1236 static void
   1237 clnt_return_credit(CONN *conn)
   1238 {
   1239 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
   1240 
   1241 	mutex_enter(&conn->c_lock);
   1242 	cc_info->clnt_cc_in_flight_ops--;
   1243 	cv_signal(&cc_info->clnt_cc_cv);
   1244 	mutex_exit(&conn->c_lock);
   1245 }
   1246 
   1247 static void
   1248 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
   1249 {
   1250 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
   1251 
   1252 	/*
   1253 	 * If the granted has not altered, avoid taking the
   1254 	 * mutex, to essentially do nothing..
   1255 	 */
   1256 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
   1257 		return;
   1258 	/*
   1259 	 * Get the granted number of buffers for credit control.
   1260 	 */
   1261 	mutex_enter(&conn->c_lock);
   1262 	cc_info->clnt_cc_granted_ops = rdma_credit;
   1263 	mutex_exit(&conn->c_lock);
   1264 }
   1265 
   1266 static void
   1267 clnt_check_credit(CONN *conn)
   1268 {
   1269 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
   1270 
   1271 	/*
   1272 	 * Make sure we are not going over our allowed buffer use
   1273 	 * (and make sure we have gotten a granted value before).
   1274 	 */
   1275 	mutex_enter(&conn->c_lock);
   1276 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
   1277 	    cc_info->clnt_cc_granted_ops != 0) {
   1278 		/*
   1279 		 * Client has maxed out its granted buffers due to
   1280 		 * credit control.  Current handling is to block and wait.
   1281 		 */
   1282 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
   1283 	}
   1284 	cc_info->clnt_cc_in_flight_ops++;
   1285 	mutex_exit(&conn->c_lock);
   1286 }
   1287 
   1288 /* ARGSUSED */
   1289 static void
   1290 clnt_rdma_kabort(CLIENT *h)
   1291 {
   1292 }
   1293 
   1294 static void
   1295 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
   1296 {
   1297 	struct cku_private *p = htop(h);
   1298 	*err = p->cku_err;
   1299 }
   1300 
   1301 static bool_t
   1302 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
   1303 {
   1304 	struct cku_private *p = htop(h);
   1305 	XDR *xdrs;
   1306 
   1307 	xdrs = &(p->cku_outxdr);
   1308 	xdrs->x_op = XDR_FREE;
   1309 	return ((*xdr_res)(xdrs, res_ptr));
   1310 }
   1311 
   1312 /* ARGSUSED */
   1313 static bool_t
   1314 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
   1315 {
   1316 	return (TRUE);
   1317 }
   1318 
   1319 /* ARGSUSED */
   1320 static int
   1321 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
   1322 	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
   1323 	uint32_t xid)
   1324 {
   1325 	RCSTAT_INCR(rctimers);
   1326 	return (0);
   1327 }
   1328 
   1329 int
   1330 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
   1331 {
   1332 	rdma_registry_t	*rp;
   1333 	void *handle = NULL;
   1334 	struct knetconfig *knc;
   1335 	char *pf, *p;
   1336 	rdma_stat status;
   1337 	int error = 0;
   1338 
   1339 	if (!INGLOBALZONE(curproc))
   1340 		return (-1);
   1341 
   1342 	/*
   1343 	 * modload the RDMA plugins if not already done.
   1344 	 */
   1345 	if (!rdma_modloaded) {
   1346 		mutex_enter(&rdma_modload_lock);
   1347 		if (!rdma_modloaded) {
   1348 			error = rdma_modload();
   1349 		}
   1350 		mutex_exit(&rdma_modload_lock);
   1351 		if (error)
   1352 			return (-1);
   1353 	}
   1354 
   1355 	if (!rdma_dev_available)
   1356 		return (-1);
   1357 
   1358 	rw_enter(&rdma_lock, RW_READER);
   1359 	rp = rdma_mod_head;
   1360 	while (rp != NULL) {
   1361 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
   1362 			rp = rp->r_next;
   1363 			continue;
   1364 		}
   1365 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
   1366 		    &handle);
   1367 		if (status == RDMA_SUCCESS) {
   1368 			knc = kmem_zalloc(sizeof (struct knetconfig),
   1369 			    KM_SLEEP);
   1370 			knc->knc_semantics = NC_TPI_RDMA;
   1371 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
   1372 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
   1373 			if (addr_type == AF_INET)
   1374 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
   1375 			else if (addr_type == AF_INET6)
   1376 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
   1377 			pf[KNC_STRSIZE - 1] = '\0';
   1378 
   1379 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
   1380 			p[KNC_STRSIZE - 1] = '\0';
   1381 
   1382 			knc->knc_protofmly = pf;
   1383 			knc->knc_proto = p;
   1384 			knc->knc_rdev = (dev_t)rp;
   1385 			*knconf = knc;
   1386 			rw_exit(&rdma_lock);
   1387 			return (0);
   1388 		}
   1389 		rp = rp->r_next;
   1390 	}
   1391 	rw_exit(&rdma_lock);
   1392 	return (-1);
   1393 }
   1394