Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /* Copyright (c) 1990 Mentat Inc. */
     26 
     27 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     28 /*	  All Rights Reserved  	*/
     29 
     30 /*
     31  * Kernel RPC filtering module
     32  */
     33 
     34 #include <sys/param.h>
     35 #include <sys/types.h>
     36 #include <sys/stream.h>
     37 #include <sys/stropts.h>
     38 #include <sys/strsubr.h>
     39 #include <sys/tihdr.h>
     40 #include <sys/timod.h>
     41 #include <sys/tiuser.h>
     42 #include <sys/debug.h>
     43 #include <sys/signal.h>
     44 #include <sys/pcb.h>
     45 #include <sys/user.h>
     46 #include <sys/errno.h>
     47 #include <sys/cred.h>
     48 #include <sys/policy.h>
     49 #include <sys/inline.h>
     50 #include <sys/cmn_err.h>
     51 #include <sys/kmem.h>
     52 #include <sys/file.h>
     53 #include <sys/sysmacros.h>
     54 #include <sys/systm.h>
     55 #include <sys/t_lock.h>
     56 #include <sys/ddi.h>
     57 #include <sys/vtrace.h>
     58 #include <sys/callb.h>
     59 #include <sys/strsun.h>
     60 
     61 #include <sys/strlog.h>
     62 #include <rpc/rpc_com.h>
     63 #include <inet/common.h>
     64 #include <rpc/types.h>
     65 #include <sys/time.h>
     66 #include <rpc/xdr.h>
     67 #include <rpc/auth.h>
     68 #include <rpc/clnt.h>
     69 #include <rpc/rpc_msg.h>
     70 #include <rpc/clnt.h>
     71 #include <rpc/svc.h>
     72 #include <rpc/rpcsys.h>
     73 #include <rpc/rpc_rdma.h>
     74 
     75 /*
     76  * This is the loadable module wrapper.
     77  */
     78 #include <sys/conf.h>
     79 #include <sys/modctl.h>
     80 #include <sys/syscall.h>
     81 
     82 extern struct streamtab rpcinfo;
     83 
     84 static struct fmodsw fsw = {
     85 	"rpcmod",
     86 	&rpcinfo,
     87 	D_NEW|D_MP,
     88 };
     89 
     90 /*
     91  * Module linkage information for the kernel.
     92  */
     93 
     94 static struct modlstrmod modlstrmod = {
     95 	&mod_strmodops, "rpc interface str mod", &fsw
     96 };
     97 
     98 /*
     99  * For the RPC system call.
    100  */
    101 static struct sysent rpcsysent = {
    102 	2,
    103 	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
    104 	rpcsys
    105 };
    106 
    107 static struct modlsys modlsys = {
    108 	&mod_syscallops,
    109 	"RPC syscall",
    110 	&rpcsysent
    111 };
    112 
    113 #ifdef _SYSCALL32_IMPL
    114 static struct modlsys modlsys32 = {
    115 	&mod_syscallops32,
    116 	"32-bit RPC syscall",
    117 	&rpcsysent
    118 };
    119 #endif /* _SYSCALL32_IMPL */
    120 
    121 static struct modlinkage modlinkage = {
    122 	MODREV_1,
    123 	{
    124 		&modlsys,
    125 #ifdef _SYSCALL32_IMPL
    126 		&modlsys32,
    127 #endif
    128 		&modlstrmod,
    129 		NULL
    130 	}
    131 };
    132 
    133 int
    134 _init(void)
    135 {
    136 	int error = 0;
    137 	callb_id_t cid;
    138 	int status;
    139 
    140 	svc_init();
    141 	clnt_init();
    142 	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
    143 
    144 	if (error = mod_install(&modlinkage)) {
    145 		/*
    146 		 * Could not install module, cleanup previous
    147 		 * initialization work.
    148 		 */
    149 		clnt_fini();
    150 		if (cid != NULL)
    151 			(void) callb_delete(cid);
    152 
    153 		return (error);
    154 	}
    155 
    156 	/*
    157 	 * Load up the RDMA plugins and initialize the stats. Even if the
    158 	 * plugins loadup fails, but rpcmod was successfully installed the
    159 	 * counters still get initialized.
    160 	 */
    161 	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
    162 	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
    163 
    164 	cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
    165 	mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);
    166 
    167 	mt_kstat_init();
    168 
    169 	/*
    170 	 * Get our identification into ldi.  This is used for loading
    171 	 * other modules, e.g. rpcib.
    172 	 */
    173 	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
    174 	if (status != 0) {
    175 		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
    176 		rpcmod_li = NULL;
    177 	}
    178 
    179 	return (error);
    180 }
    181 
    182 /*
    183  * The unload entry point fails, because we advertise entry points into
    184  * rpcmod from the rest of kRPC: rpcmod_release().
    185  */
    186 int
    187 _fini(void)
    188 {
    189 	return (EBUSY);
    190 }
    191 
    192 int
    193 _info(struct modinfo *modinfop)
    194 {
    195 	return (mod_info(&modlinkage, modinfop));
    196 }
    197 
    198 extern int nulldev();
    199 
    200 #define	RPCMOD_ID	2049
    201 
    202 int rmm_open(), rmm_close();
    203 
    204 /*
    205  * To save instructions, since STREAMS ignores the return value
    206  * from these functions, they are defined as void here. Kind of icky, but...
    207  */
    208 void rmm_rput(queue_t *, mblk_t *);
    209 void rmm_wput(queue_t *, mblk_t *);
    210 void rmm_rsrv(queue_t *);
    211 void rmm_wsrv(queue_t *);
    212 
    213 int rpcmodopen(), rpcmodclose();
    214 void rpcmodrput(), rpcmodwput();
    215 void rpcmodrsrv(), rpcmodwsrv();
    216 
    217 static	void	rpcmodwput_other(queue_t *, mblk_t *);
    218 static	int	mir_close(queue_t *q);
    219 static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
    220 		    cred_t *credp);
    221 static	void	mir_rput(queue_t *q, mblk_t *mp);
    222 static	void	mir_rsrv(queue_t *q);
    223 static	void	mir_wput(queue_t *q, mblk_t *mp);
    224 static	void	mir_wsrv(queue_t *q);
    225 
    226 static struct module_info rpcmod_info =
    227 	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
    228 
    229 /*
    230  * Read side has no service procedure.
    231  */
    232 static struct qinit rpcmodrinit = {
    233 	(int (*)())rmm_rput,
    234 	(int (*)())rmm_rsrv,
    235 	rmm_open,
    236 	rmm_close,
    237 	nulldev,
    238 	&rpcmod_info,
    239 	NULL
    240 };
    241 
    242 /*
    243  * The write put procedure is simply putnext to conserve stack space.
    244  * The write service procedure is not used to queue data, but instead to
    245  * synchronize with flow control.
    246  */
    247 static struct qinit rpcmodwinit = {
    248 	(int (*)())rmm_wput,
    249 	(int (*)())rmm_wsrv,
    250 	rmm_open,
    251 	rmm_close,
    252 	nulldev,
    253 	&rpcmod_info,
    254 	NULL
    255 };
    256 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
    257 
    258 struct xprt_style_ops {
    259 	int (*xo_open)();
    260 	int (*xo_close)();
    261 	void (*xo_wput)();
    262 	void (*xo_wsrv)();
    263 	void (*xo_rput)();
    264 	void (*xo_rsrv)();
    265 };
    266 
    267 static struct xprt_style_ops xprt_clts_ops = {
    268 	rpcmodopen,
    269 	rpcmodclose,
    270 	rpcmodwput,
    271 	rpcmodwsrv,
    272 	rpcmodrput,
    273 	NULL
    274 };
    275 
    276 static struct xprt_style_ops xprt_cots_ops = {
    277 	mir_open,
    278 	mir_close,
    279 	mir_wput,
    280 	mir_wsrv,
    281 	mir_rput,
    282 	mir_rsrv
    283 };
    284 
    285 /*
    286  * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
    287  */
    288 struct rpcm {
    289 	void		*rm_krpc_cell;	/* Reserved for use by KRPC */
    290 	struct		xprt_style_ops	*rm_ops;
    291 	int		rm_type;	/* Client or server side stream */
    292 #define	RM_CLOSING	0x1		/* somebody is trying to close slot */
    293 	uint_t		rm_state;	/* state of the slot. see above */
    294 	uint_t		rm_ref;		/* cnt of external references to slot */
    295 	kmutex_t	rm_lock;	/* mutex protecting above fields */
    296 	kcondvar_t	rm_cwait;	/* condition for closing */
    297 	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
    298 };
    299 
    300 struct temp_slot {
    301 	void *cell;
    302 	struct xprt_style_ops *ops;
    303 	int type;
    304 	mblk_t *info_ack;
    305 	kmutex_t lock;
    306 	kcondvar_t wait;
    307 };
    308 
    309 typedef struct mir_s {
    310 	void	*mir_krpc_cell;	/* Reserved for KRPC use. This field */
    311 					/* must be first in the structure. */
    312 	struct xprt_style_ops	*rm_ops;
    313 	int	mir_type;		/* Client or server side stream */
    314 
    315 	mblk_t	*mir_head_mp;		/* RPC msg in progress */
    316 		/*
    317 		 * mir_head_mp points the first mblk being collected in
    318 		 * the current RPC message.  Record headers are removed
    319 		 * before data is linked into mir_head_mp.
    320 		 */
    321 	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
    322 		/*
    323 		 * mir_tail_mp points to the last mblk in the message
    324 		 * chain starting at mir_head_mp.  It is only valid
    325 		 * if mir_head_mp is non-NULL and is used to add new
    326 		 * data blocks to the end of chain quickly.
    327 		 */
    328 
    329 	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
    330 		/*
    331 		 * mir_frag_len starts at -4 for beginning of each fragment.
    332 		 * When this length is negative, it indicates the number of
    333 		 * bytes that rpcmod needs to complete the record marker
    334 		 * header.  When it is positive or zero, it holds the number
    335 		 * of bytes that have arrived for the current fragment and
    336 		 * are held in mir_header_mp.
    337 		 */
    338 
    339 	int32_t	mir_frag_header;
    340 		/*
    341 		 * Fragment header as collected for the current fragment.
    342 		 * It holds the last-fragment indicator and the number
    343 		 * of bytes in the fragment.
    344 		 */
    345 
    346 	unsigned int
    347 		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
    348 		mir_hold_inbound : 1,	/* Hold inbound messages on server */
    349 					/* side until outbound flow control */
    350 					/* is relieved. */
    351 		mir_closing : 1,	/* The stream is being closed */
    352 		mir_inrservice : 1,	/* data queued or rd srv proc running */
    353 		mir_inwservice : 1,	/* data queued or wr srv proc running */
    354 		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
    355 		/*
    356 		 * On client streams, mir_clntreq is 0 or 1; it is set
    357 		 * to 1 whenever a new request is sent out (mir_wput)
    358 		 * and cleared when the timer fires (mir_timer).  If
    359 		 * the timer fires with this value equal to 0, then the
    360 		 * stream is considered idle and KRPC is notified.
    361 		 */
    362 		mir_clntreq : 1,
    363 		/*
    364 		 * On server streams, stop accepting messages
    365 		 */
    366 		mir_svc_no_more_msgs : 1,
    367 		mir_listen_stream : 1,	/* listen end point */
    368 		mir_unused : 1,	/* no longer used */
    369 		mir_timer_call : 1,
    370 		mir_junk_fill_thru_bit_31 : 21;
    371 
    372 	int	mir_setup_complete;	/* server has initialized everything */
    373 	timeout_id_t mir_timer_id;	/* Timer for idle checks */
    374 	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
    375 		/*
    376 		 * This value is copied from clnt_idle_timeout or
    377 		 * svc_idle_timeout during the appropriate ioctl.
    378 		 * Kept in milliseconds
    379 		 */
    380 	clock_t	mir_use_timestamp;	/* updated on client with each use */
    381 		/*
    382 		 * This value is set to lbolt
    383 		 * every time a client stream sends or receives data.
    384 		 * Even if the timer message arrives, we don't shutdown
    385 		 * client unless:
    386 		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
    387 		 * This value is kept in HZ.
    388 		 */
    389 
    390 	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
    391 		/*
    392 		 * This pointer is set to &clnt_max_msg_size or
    393 		 * &svc_max_msg_size during the appropriate ioctl.
    394 		 */
    395 	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
    396 	/* Server-side fields. */
    397 	int	mir_ref_cnt;		/* Reference count: server side only */
    398 					/* counts the number of references */
    399 					/* that a kernel RPC server thread */
    400 					/* (see svc_run()) has on this rpcmod */
    401 					/* slot. Effectively, it is the */
    402 					/* number * of unprocessed messages */
    403 					/* that have been passed up to the */
    404 					/* KRPC layer */
    405 
    406 	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
    407 					/* T_DISCON_IND */
    408 
    409 	/*
    410 	 * these fields are for both client and server, but for debugging,
    411 	 * it is easier to have these last in the structure.
    412 	 */
    413 	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
    414 	kcondvar_t	mir_condvar;	/* synchronization. */
    415 	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
    416 } mir_t;
    417 
    418 void tmp_rput(queue_t *q, mblk_t *mp);
    419 
    420 struct xprt_style_ops tmpops = {
    421 	NULL,
    422 	NULL,
    423 	putnext,
    424 	NULL,
    425 	tmp_rput,
    426 	NULL
    427 };
    428 
    429 void
    430 tmp_rput(queue_t *q, mblk_t *mp)
    431 {
    432 	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
    433 	struct T_info_ack *pptr;
    434 
    435 	switch (mp->b_datap->db_type) {
    436 	case M_PCPROTO:
    437 		pptr = (struct T_info_ack *)mp->b_rptr;
    438 		switch (pptr->PRIM_type) {
    439 		case T_INFO_ACK:
    440 			mutex_enter(&t->lock);
    441 			t->info_ack = mp;
    442 			cv_signal(&t->wait);
    443 			mutex_exit(&t->lock);
    444 			return;
    445 		default:
    446 			break;
    447 		}
    448 	default:
    449 		break;
    450 	}
    451 
    452 	/*
    453 	 * Not an info-ack, so free it. This is ok because we should
    454 	 * not be receiving data until the open finishes: rpcmod
    455 	 * is pushed well before the end-point is bound to an address.
    456 	 */
    457 	freemsg(mp);
    458 }
    459 
    460 int
    461 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    462 {
    463 	mblk_t *bp;
    464 	struct temp_slot ts, *t;
    465 	struct T_info_ack *pptr;
    466 	int error = 0;
    467 
    468 	ASSERT(q != NULL);
    469 	/*
    470 	 * Check for re-opens.
    471 	 */
    472 	if (q->q_ptr) {
    473 		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
    474 		    "rpcmodopen_end:(%s)", "q->qptr");
    475 		return (0);
    476 	}
    477 
    478 	t = &ts;
    479 	bzero(t, sizeof (*t));
    480 	q->q_ptr = (void *)t;
    481 	WR(q)->q_ptr = (void *)t;
    482 
    483 	/*
    484 	 * Allocate the required messages upfront.
    485 	 */
    486 	if ((bp = allocb_cred(sizeof (struct T_info_req) +
    487 	    sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
    488 		return (ENOBUFS);
    489 	}
    490 
    491 	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
    492 	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
    493 
    494 	t->ops = &tmpops;
    495 
    496 	qprocson(q);
    497 	bp->b_datap->db_type = M_PCPROTO;
    498 	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
    499 	bp->b_wptr += sizeof (struct T_info_req);
    500 	putnext(WR(q), bp);
    501 
    502 	mutex_enter(&t->lock);
    503 	while (t->info_ack == NULL) {
    504 		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
    505 			error = EINTR;
    506 			break;
    507 		}
    508 	}
    509 	mutex_exit(&t->lock);
    510 
    511 	if (error)
    512 		goto out;
    513 
    514 	pptr = (struct T_info_ack *)t->info_ack->b_rptr;
    515 
    516 	if (pptr->SERV_type == T_CLTS) {
    517 		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
    518 			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
    519 	} else {
    520 		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
    521 			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
    522 	}
    523 
    524 out:
    525 	if (error)
    526 		qprocsoff(q);
    527 
    528 	freemsg(t->info_ack);
    529 	mutex_destroy(&t->lock);
    530 	cv_destroy(&t->wait);
    531 
    532 	return (error);
    533 }
    534 
    535 void
    536 rmm_rput(queue_t *q, mblk_t  *mp)
    537 {
    538 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
    539 }
    540 
    541 void
    542 rmm_rsrv(queue_t *q)
    543 {
    544 	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
    545 }
    546 
    547 void
    548 rmm_wput(queue_t *q, mblk_t *mp)
    549 {
    550 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
    551 }
    552 
    553 void
    554 rmm_wsrv(queue_t *q)
    555 {
    556 	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
    557 }
    558 
    559 int
    560 rmm_close(queue_t *q, int flag, cred_t *crp)
    561 {
    562 	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
    563 }
    564 
    565 static void rpcmod_release(queue_t *, mblk_t *);
    566 /*
    567  * rpcmodopen -	open routine gets called when the module gets pushed
    568  *		onto the stream.
    569  */
    570 /*ARGSUSED*/
    571 int
    572 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
    573 {
    574 	struct rpcm *rmp;
    575 
    576 	extern void (*rpc_rele)(queue_t *, mblk_t *);
    577 
    578 	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
    579 
    580 	/*
    581 	 * Initialize entry points to release a rpcmod slot (and an input
    582 	 * message if supplied) and to send an output message to the module
    583 	 * below rpcmod.
    584 	 */
    585 	if (rpc_rele == NULL)
    586 		rpc_rele = rpcmod_release;
    587 
    588 	/*
    589 	 * Only sufficiently privileged users can use this module, and it
    590 	 * is assumed that they will use this module properly, and NOT send
    591 	 * bulk data from downstream.
    592 	 */
    593 	if (secpolicy_rpcmod_open(crp) != 0)
    594 		return (EPERM);
    595 
    596 	/*
    597 	 * Allocate slot data structure.
    598 	 */
    599 	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
    600 
    601 	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
    602 	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
    603 	rmp->rm_zoneid = rpc_zoneid();
    604 	/*
    605 	 * slot type will be set by kRPC client and server ioctl's
    606 	 */
    607 	rmp->rm_type = 0;
    608 
    609 	q->q_ptr = (void *)rmp;
    610 	WR(q)->q_ptr = (void *)rmp;
    611 
    612 	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
    613 	return (0);
    614 }
    615 
    616 /*
    617  * rpcmodclose - This routine gets called when the module gets popped
    618  * off of the stream.
    619  */
    620 /*ARGSUSED*/
    621 int
    622 rpcmodclose(queue_t *q, int flag, cred_t *crp)
    623 {
    624 	struct rpcm *rmp;
    625 
    626 	ASSERT(q != NULL);
    627 	rmp = (struct rpcm *)q->q_ptr;
    628 
    629 	/*
    630 	 * Mark our state as closing.
    631 	 */
    632 	mutex_enter(&rmp->rm_lock);
    633 	rmp->rm_state |= RM_CLOSING;
    634 
    635 	/*
    636 	 * Check and see if there are any messages on the queue.  If so, send
    637 	 * the messages, regardless whether the downstream module is ready to
    638 	 * accept data.
    639 	 */
    640 	if (rmp->rm_type == RPC_SERVER) {
    641 		flushq(q, FLUSHDATA);
    642 
    643 		qenable(WR(q));
    644 
    645 		if (rmp->rm_ref) {
    646 			mutex_exit(&rmp->rm_lock);
    647 			/*
    648 			 * call into SVC to clean the queue
    649 			 */
    650 			svc_queueclean(q);
    651 			mutex_enter(&rmp->rm_lock);
    652 
    653 			/*
    654 			 * Block while there are kRPC threads with a reference
    655 			 * to this message.
    656 			 */
    657 			while (rmp->rm_ref)
    658 				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
    659 		}
    660 
    661 		mutex_exit(&rmp->rm_lock);
    662 
    663 		/*
    664 		 * It is now safe to remove this queue from the stream. No kRPC
    665 		 * threads have a reference to the stream, and none ever will,
    666 		 * because RM_CLOSING is set.
    667 		 */
    668 		qprocsoff(q);
    669 
    670 		/* Notify kRPC that this stream is going away. */
    671 		svc_queueclose(q);
    672 	} else {
    673 		mutex_exit(&rmp->rm_lock);
    674 		qprocsoff(q);
    675 	}
    676 
    677 	q->q_ptr = NULL;
    678 	WR(q)->q_ptr = NULL;
    679 	mutex_destroy(&rmp->rm_lock);
    680 	cv_destroy(&rmp->rm_cwait);
    681 	kmem_free(rmp, sizeof (*rmp));
    682 	return (0);
    683 }
    684 
    685 #ifdef	DEBUG
    686 int	rpcmod_send_msg_up = 0;
    687 int	rpcmod_send_uderr = 0;
    688 int	rpcmod_send_dup = 0;
    689 int	rpcmod_send_dup_cnt = 0;
    690 #endif
    691 
    692 /*
    693  * rpcmodrput -	Module read put procedure.  This is called from
    694  *		the module, driver, or stream head downstream.
    695  */
    696 void
    697 rpcmodrput(queue_t *q, mblk_t *mp)
    698 {
    699 	struct rpcm *rmp;
    700 	union T_primitives *pptr;
    701 	int hdrsz;
    702 
    703 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
    704 
    705 	ASSERT(q != NULL);
    706 	rmp = (struct rpcm *)q->q_ptr;
    707 
    708 	if (rmp->rm_type == 0) {
    709 		freemsg(mp);
    710 		return;
    711 	}
    712 
    713 #ifdef DEBUG
    714 	if (rpcmod_send_msg_up > 0) {
    715 		mblk_t *nmp = copymsg(mp);
    716 		if (nmp) {
    717 			putnext(q, nmp);
    718 			rpcmod_send_msg_up--;
    719 		}
    720 	}
    721 	if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
    722 		mblk_t *nmp;
    723 		struct T_unitdata_ind *data;
    724 		struct T_uderror_ind *ud;
    725 		int d;
    726 		data = (struct T_unitdata_ind *)mp->b_rptr;
    727 		if (data->PRIM_type == T_UNITDATA_IND) {
    728 			d = sizeof (*ud) - sizeof (*data);
    729 			nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
    730 			if (nmp) {
    731 				ud = (struct T_uderror_ind *)nmp->b_rptr;
    732 				ud->PRIM_type = T_UDERROR_IND;
    733 				ud->DEST_length = data->SRC_length;
    734 				ud->DEST_offset = data->SRC_offset + d;
    735 				ud->OPT_length = data->OPT_length;
    736 				ud->OPT_offset = data->OPT_offset + d;
    737 				ud->ERROR_type = ENETDOWN;
    738 				if (data->SRC_length) {
    739 					bcopy(mp->b_rptr +
    740 					    data->SRC_offset,
    741 					    nmp->b_rptr +
    742 					    ud->DEST_offset,
    743 					    data->SRC_length);
    744 				}
    745 				if (data->OPT_length) {
    746 					bcopy(mp->b_rptr +
    747 					    data->OPT_offset,
    748 					    nmp->b_rptr +
    749 					    ud->OPT_offset,
    750 					    data->OPT_length);
    751 				}
    752 				nmp->b_wptr += d;
    753 				nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
    754 				nmp->b_datap->db_type = M_PROTO;
    755 				putnext(q, nmp);
    756 				rpcmod_send_uderr--;
    757 			}
    758 		}
    759 	}
    760 #endif
    761 	switch (mp->b_datap->db_type) {
    762 	default:
    763 		putnext(q, mp);
    764 		break;
    765 
    766 	case M_PROTO:
    767 	case M_PCPROTO:
    768 		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
    769 		pptr = (union T_primitives *)mp->b_rptr;
    770 
    771 		/*
    772 		 * Forward this message to krpc if it is data.
    773 		 */
    774 		if (pptr->type == T_UNITDATA_IND) {
    775 			mblk_t *nmp;
    776 
    777 		/*
    778 		 * Check if the module is being popped.
    779 		 */
    780 			mutex_enter(&rmp->rm_lock);
    781 			if (rmp->rm_state & RM_CLOSING) {
    782 				mutex_exit(&rmp->rm_lock);
    783 				putnext(q, mp);
    784 				break;
    785 			}
    786 
    787 			switch (rmp->rm_type) {
    788 			case RPC_CLIENT:
    789 				mutex_exit(&rmp->rm_lock);
    790 				hdrsz = mp->b_wptr - mp->b_rptr;
    791 
    792 				/*
    793 				 * Make sure the header is sane.
    794 				 */
    795 				if (hdrsz < TUNITDATAINDSZ ||
    796 				    hdrsz < (pptr->unitdata_ind.OPT_length +
    797 				    pptr->unitdata_ind.OPT_offset) ||
    798 				    hdrsz < (pptr->unitdata_ind.SRC_length +
    799 				    pptr->unitdata_ind.SRC_offset)) {
    800 					freemsg(mp);
    801 					return;
    802 				}
    803 
    804 				/*
    805 				 * Call clnt_clts_dispatch_notify, so that it
    806 				 * can pass the message to the proper caller.
    807 				 * Don't discard the header just yet since the
    808 				 * client may need the sender's address.
    809 				 */
    810 				clnt_clts_dispatch_notify(mp, hdrsz,
    811 				    rmp->rm_zoneid);
    812 				return;
    813 			case RPC_SERVER:
    814 				/*
    815 				 * rm_krpc_cell is exclusively used by the kRPC
    816 				 * CLTS server
    817 				 */
    818 				if (rmp->rm_krpc_cell) {
    819 #ifdef DEBUG
    820 					/*
    821 					 * Test duplicate request cache and
    822 					 * rm_ref count handling by sending a
    823 					 * duplicate every so often, if
    824 					 * desired.
    825 					 */
    826 					if (rpcmod_send_dup &&
    827 					    rpcmod_send_dup_cnt++ %
    828 					    rpcmod_send_dup)
    829 						nmp = copymsg(mp);
    830 					else
    831 						nmp = NULL;
    832 #endif
    833 					/*
    834 					 * Raise the reference count on this
    835 					 * module to prevent it from being
    836 					 * popped before krpc generates the
    837 					 * reply.
    838 					 */
    839 					rmp->rm_ref++;
    840 					mutex_exit(&rmp->rm_lock);
    841 
    842 					/*
    843 					 * Submit the message to krpc.
    844 					 */
    845 					svc_queuereq(q, mp);
    846 #ifdef DEBUG
    847 					/*
    848 					 * Send duplicate if we created one.
    849 					 */
    850 					if (nmp) {
    851 						mutex_enter(&rmp->rm_lock);
    852 						rmp->rm_ref++;
    853 						mutex_exit(&rmp->rm_lock);
    854 						svc_queuereq(q, nmp);
    855 					}
    856 #endif
    857 				} else {
    858 					mutex_exit(&rmp->rm_lock);
    859 					freemsg(mp);
    860 				}
    861 				return;
    862 			default:
    863 				mutex_exit(&rmp->rm_lock);
    864 				freemsg(mp);
    865 				return;
    866 			} /* end switch(rmp->rm_type) */
    867 		} else if (pptr->type == T_UDERROR_IND) {
    868 			mutex_enter(&rmp->rm_lock);
    869 			hdrsz = mp->b_wptr - mp->b_rptr;
    870 
    871 			/*
    872 			 * Make sure the header is sane
    873 			 */
    874 			if (hdrsz < TUDERRORINDSZ ||
    875 			    hdrsz < (pptr->uderror_ind.OPT_length +
    876 			    pptr->uderror_ind.OPT_offset) ||
    877 			    hdrsz < (pptr->uderror_ind.DEST_length +
    878 			    pptr->uderror_ind.DEST_offset)) {
    879 				mutex_exit(&rmp->rm_lock);
    880 				freemsg(mp);
    881 				return;
    882 			}
    883 
    884 			/*
    885 			 * In the case where a unit data error has been
    886 			 * received, all we need to do is clear the message from
    887 			 * the queue.
    888 			 */
    889 			mutex_exit(&rmp->rm_lock);
    890 			freemsg(mp);
    891 			RPCLOG(32, "rpcmodrput: unitdata error received at "
    892 			    "%ld\n", gethrestime_sec());
    893 			return;
    894 		} /* end else if (pptr->type == T_UDERROR_IND) */
    895 
    896 		putnext(q, mp);
    897 		break;
    898 	} /* end switch (mp->b_datap->db_type) */
    899 
    900 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
    901 	    "rpcmodrput_end:");
    902 	/*
    903 	 * Return codes are not looked at by the STREAMS framework.
    904 	 */
    905 }
    906 
    907 /*
    908  * write put procedure
    909  */
    910 void
    911 rpcmodwput(queue_t *q, mblk_t *mp)
    912 {
    913 	struct rpcm	*rmp;
    914 
    915 	ASSERT(q != NULL);
    916 
    917 	switch (mp->b_datap->db_type) {
    918 		case M_PROTO:
    919 		case M_PCPROTO:
    920 			break;
    921 		default:
    922 			rpcmodwput_other(q, mp);
    923 			return;
    924 	}
    925 
    926 	/*
    927 	 * Check to see if we can send the message downstream.
    928 	 */
    929 	if (canputnext(q)) {
    930 		putnext(q, mp);
    931 		return;
    932 	}
    933 
    934 	rmp = (struct rpcm *)q->q_ptr;
    935 	ASSERT(rmp != NULL);
    936 
    937 	/*
    938 	 * The first canputnext failed.  Try again except this time with the
    939 	 * lock held, so that we can check the state of the stream to see if
    940 	 * it is closing.  If either of these conditions evaluate to true
    941 	 * then send the meesage.
    942 	 */
    943 	mutex_enter(&rmp->rm_lock);
    944 	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
    945 		mutex_exit(&rmp->rm_lock);
    946 		putnext(q, mp);
    947 	} else {
    948 		/*
    949 		 * canputnext failed again and the stream is not closing.
    950 		 * Place the message on the queue and let the service
    951 		 * procedure handle the message.
    952 		 */
    953 		mutex_exit(&rmp->rm_lock);
    954 		(void) putq(q, mp);
    955 	}
    956 }
    957 
    958 static void
    959 rpcmodwput_other(queue_t *q, mblk_t *mp)
    960 {
    961 	struct rpcm	*rmp;
    962 	struct iocblk	*iocp;
    963 
    964 	rmp = (struct rpcm *)q->q_ptr;
    965 	ASSERT(rmp != NULL);
    966 
    967 	switch (mp->b_datap->db_type) {
    968 		case M_IOCTL:
    969 			iocp = (struct iocblk *)mp->b_rptr;
    970 			ASSERT(iocp != NULL);
    971 			switch (iocp->ioc_cmd) {
    972 				case RPC_CLIENT:
    973 				case RPC_SERVER:
    974 					mutex_enter(&rmp->rm_lock);
    975 					rmp->rm_type = iocp->ioc_cmd;
    976 					mutex_exit(&rmp->rm_lock);
    977 					mp->b_datap->db_type = M_IOCACK;
    978 					qreply(q, mp);
    979 					return;
    980 				default:
    981 				/*
    982 				 * pass the ioctl downstream and hope someone
    983 				 * down there knows how to handle it.
    984 				 */
    985 					putnext(q, mp);
    986 					return;
    987 			}
    988 		default:
    989 			break;
    990 	}
    991 	/*
    992 	 * This is something we definitely do not know how to handle, just
    993 	 * pass the message downstream
    994 	 */
    995 	putnext(q, mp);
    996 }
    997 
    998 /*
    999  * Module write service procedure. This is called by downstream modules
   1000  * for back enabling during flow control.
   1001  */
   1002 void
   1003 rpcmodwsrv(queue_t *q)
   1004 {
   1005 	struct rpcm	*rmp;
   1006 	mblk_t		*mp = NULL;
   1007 
   1008 	rmp = (struct rpcm *)q->q_ptr;
   1009 	ASSERT(rmp != NULL);
   1010 
   1011 	/*
   1012 	 * Get messages that may be queued and send them down stream
   1013 	 */
   1014 	while ((mp = getq(q)) != NULL) {
   1015 		/*
   1016 		 * Optimize the service procedure for the server-side, by
   1017 		 * avoiding a call to canputnext().
   1018 		 */
   1019 		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
   1020 			putnext(q, mp);
   1021 			continue;
   1022 		}
   1023 		(void) putbq(q, mp);
   1024 		return;
   1025 	}
   1026 }
   1027 
        /*
         * Release a message and the reference it held on the stream.  bp may be
         * NULL, in which case only the reference is dropped.
         */
   1028 static void
   1029 rpcmod_release(queue_t *q, mblk_t *bp)
   1030 {
   1031 	struct rpcm *rmp = (struct rpcm *)q->q_ptr;
   1032 
   1033 	/* Nothing more is done with the message, for now, than freeing it. */
   1034 	if (bp != NULL)
   1035 		freemsg(bp);
   1036 
   1037 	/*
   1038 	 * Drop one reference.  If that was the last one and RM_CLOSING is
   1039 	 * set, wake every thread waiting on rm_cwait.
   1040 	 */
   1041 	mutex_enter(&rmp->rm_lock);
   1042 	if (--rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING) != 0)
   1043 		cv_broadcast(&rmp->rm_cwait);
   1044 	mutex_exit(&rmp->rm_lock);
   1045 }
   1049 
   1050 /*
   1051  * This part of rpcmod is pushed on a connection-oriented transport for use
   1052  * by RPC.  It serves to bypass the Stream head, implements
   1053  * the record marking protocol, and dispatches incoming RPC messages.
   1054  */
   1055 
   1056 /* Default idle timer values, in milliseconds */
   1057 #define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
   1058 #define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
   1059 #define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
   1060 #define	MIR_LASTFRAG	0x80000000	/* Last-fragment bit of record marker */
   1061 
        /*
         * True when the stream has no outstanding references and nothing is
         * pending for the read service procedure.  The argument is parenthesized
         * so that any pointer expression can be passed safely.
         */
   1062 #define	MIR_SVC_QUIESCED(mir)	\
   1063 	((mir)->mir_ref_cnt == 0 && (mir)->mir_inrservice == 0)
   1064 
        /*
         * Clear mir_inrservice and, on a server stream that is closing, signal
         * mir_condvar (mir_close() waits on it).  This expands to a bare brace
         * block and evaluates mir_ptr several times, so pass a plain pointer.
         */
   1065 #define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
   1066 	(mir_ptr)->mir_inrservice = 0;	\
   1067 	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
   1068 		(mir_ptr)->mir_closing)	\
   1069 		cv_signal(&(mir_ptr)->mir_condvar);	\
   1070 }
   1071 
   1072 /*
   1073  * Don't block service procedure (and mir_close) if
   1074  * we are in the process of closing.
   1075  */
   1076 #define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
   1077 	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
   1078 
        /* Forward declarations for the connection-oriented (mir_*) routines. */
   1079 static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
   1080 static void	mir_rput_proto(queue_t *q, mblk_t *mp);
   1081 static int	mir_svc_policy_notify(queue_t *q, int event);
   1082 static void	mir_svc_release(queue_t *wq, mblk_t *mp);
   1083 static void	mir_svc_start(queue_t *wq);
   1084 static void	mir_svc_idle_start(queue_t *, mir_t *);
   1085 static void	mir_svc_idle_stop(queue_t *, mir_t *);
   1086 static void	mir_svc_start_close(queue_t *, mir_t *);
   1087 static void	mir_clnt_idle_do_stop(queue_t *);
   1088 static void	mir_clnt_idle_stop(queue_t *, mir_t *);
   1089 static void	mir_clnt_idle_start(queue_t *, mir_t *);
   1090 static void	mir_wput(queue_t *q, mblk_t *mp);
   1091 static void	mir_wput_other(queue_t *q, mblk_t *mp);
   1092 static void	mir_wsrv(queue_t *q);
   1093 static	void	mir_disconnect(queue_t *, mir_t *ir);
   1094 static	int	mir_check_len(queue_t *, int32_t, mblk_t *);
   1095 static	void	mir_timer(void *);
   1096 
        /*
         * Hooks defined elsewhere and used directly by KRPC.  mir_open() points
         * each one at the matching routine in this file if it is still unset.
         */
   1097 extern void	(*mir_rele)(queue_t *, mblk_t *);
   1098 extern void	(*mir_start)(queue_t *);
   1099 extern void	(*clnt_stop_idle)(queue_t *);
   1100 
        /* Global idle timeouts; the defaults are expressed in milliseconds. */
   1101 clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
   1102 clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
   1103 
   1104 /*
   1105  * Timeout for subsequent notifications of idle connection.  This is
   1106  * typically used to clean up after a wedged orderly release.
   1107  */
   1108 clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
   1109 
        /*
         * Maximum message sizes.  The extern pointers are aimed at the variables
         * below by mir_open() when they are still NULL.
         */
   1110 extern	uint_t	*clnt_max_msg_sizep;
   1111 extern	uint_t	*svc_max_msg_sizep;
   1112 uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
   1113 uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
        /* Incremented when a server request is dropped for lack of mir_krpc_cell. */
   1114 uint_t	mir_krpc_cell_null;
   1115 
        /*
         * Cancel the pending timer on mir, if there is one.  The caller holds
         * mir_mutex; it is released and retaken around untimeout().
         */
   1116 static void
   1117 mir_timer_stop(mir_t *mir)
   1118 {
   1119 	timeout_id_t pending;
   1120 
   1121 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1122 
   1123 	/*
   1124 	 * untimeout() has to be called with mir_mutex released, so the
   1125 	 * mir_timer_call flag marks a timer update as being in progress and
   1126 	 * keeps other threads from starting or stopping the timer (and thus
   1127 	 * changing mir_timer_id) meanwhile.  Wait on mir_timer_cv until the
   1128 	 * flag is free, then claim it.  While it is set, mir_timer() (see
   1129 	 * the comments there) knows it has nothing to do.
   1130 	 */
   1131 	while (mir->mir_timer_call)
   1132 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1133 	mir->mir_timer_call = B_TRUE;
   1134 
   1135 	/* Forget the id first, then cancel it without holding the mutex. */
   1136 	pending = mir->mir_timer_id;
   1137 	if (pending != 0) {
   1138 		mir->mir_timer_id = 0;
   1139 		mutex_exit(&mir->mir_mutex);
   1140 		(void) untimeout(pending);
   1141 		mutex_enter(&mir->mir_mutex);
   1142 	}
   1143 
   1144 	/* Hand the flag back and wake any thread waiting for it. */
   1145 	mir->mir_timer_call = B_FALSE;
   1146 	cv_broadcast(&mir->mir_timer_cv);
   1147 }
   1145 
        /*
         * (Re)arm the timer for q so that mir_timer() is called after intrvl
         * milliseconds.  A timer that is already pending is cancelled first, and
         * no new timer is armed once mir_closing is set.  The caller holds
         * mir_mutex; it is released and retaken around untimeout().
         */
   1146 static void
   1147 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
   1148 {
   1149 	timeout_id_t tid;
   1150 
   1151 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1152 
        	/* Serialize with other timer updates; see mir_timer_stop(). */
   1153 	while (mir->mir_timer_call)
   1154 		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
   1155 	mir->mir_timer_call = B_TRUE;
   1156 
   1157 	if ((tid = mir->mir_timer_id) != 0) {
        		/*
        		 * Clear the id before cancelling, as mir_timer_stop() does.
        		 * Otherwise, when mir_closing prevents re-arming below, the
        		 * cancelled id would stay in mir_timer_id and be handed to
        		 * untimeout() again by a later stop.
        		 */
        		mir->mir_timer_id = 0;
   1158 		mutex_exit(&mir->mir_mutex);
   1159 		(void) untimeout(tid);
   1160 		mutex_enter(&mir->mir_mutex);
   1161 	}
   1162 	/* Only start the timer when it is not closing. */
   1163 	if (!mir->mir_closing) {
   1164 		mir->mir_timer_id = timeout(mir_timer, q,
   1165 		    MSEC_TO_TICK(intrvl));
   1166 	}
   1167 	mir->mir_timer_call = B_FALSE;
   1168 	cv_broadcast(&mir->mir_timer_cv);
   1169 }
   1170 
        /*
         * Report whether a message with the same XID as mp is already sitting on
         * q.  The XID is the big-endian 32-bit word at byte offset 4 of a message.
         * Returns 1 for a duplicate and 0 otherwise; mir_mutex must be held.
         */
   1171 static int
   1172 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
   1173 {
   1174 	mblk_t		*queued;
   1175 	uint32_t	xid;
   1176 
   1177 	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
   1178 	xid = BE32_TO_U32(&mp->b_rptr[4]);
   1179 
   1180 	/*
   1181 	 * Not pretty: this reaches into the STREAMS queue and walks its
   1182 	 * list of flow-controlled messages directly.
   1183 	 */
   1184 	for (queued = q->q_first; queued != NULL; queued = queued->b_next) {
   1185 		if (BE32_TO_U32(&queued->b_rptr[4]) == xid)
   1186 			return (1);
   1187 	}
   1188 	return (0);
   1189 }
   1193 
        /*
         * Close routine for a mir stream.  Discards any partially assembled
         * record, stops the timer, waits (server side) for the stream to quiesce,
         * turns off queue processing and frees the per-stream mir_t.
         * Always returns 0.
         *
         * NOTE(review): an indication parked in mir_svc_pend_mp is not freed
         * here -- confirm it is always delivered or released before close.
         */
   1194 static int
   1195 mir_close(queue_t *q)
   1196 {
   1197 	mir_t	*mir = q->q_ptr;
   1198 	mblk_t	*mp;
        	/* svc_queueclean() is called at most once from the wait loop below */
   1199 	bool_t queue_cleaned = FALSE;
   1200 
   1201 	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
   1202 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1203 	mutex_enter(&mir->mir_mutex);
        	/* Throw away any partially assembled inbound record. */
   1204 	if ((mp = mir->mir_head_mp) != NULL) {
   1205 		mir->mir_head_mp = NULL;
   1206 		mir->mir_tail_mp = NULL;
   1207 		freemsg(mp);
   1208 	}
   1209 	/*
   1210 	 * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
   1211 	 * is TRUE.  And mir_timer_start() won't start the timer again.
   1212 	 */
   1213 	mir->mir_closing = B_TRUE;
   1214 	mir_timer_stop(mir);
   1215 
   1216 	if (mir->mir_type == RPC_SERVER) {
   1217 		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */
   1218 
   1219 		/*
   1220 		 * This will prevent more requests from arriving and
   1221 		 * will force rpcmod to ignore flow control.
   1222 		 */
   1223 		mir_svc_start_close(WR(q), mir);
   1224 
        		/*
        		 * Wait until no references remain, nothing is pending for
        		 * the read service procedure, and the write service
        		 * procedure is no longer scheduled.
        		 */
   1225 		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
   1226 
   1227 			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
   1228 			    (queue_cleaned == FALSE)) {
   1229 				/*
   1230 				 * call into SVC to clean the queue
   1231 				 */
        				/* mir_mutex is dropped across the call. */
   1232 				mutex_exit(&mir->mir_mutex);
   1233 				svc_queueclean(q);
   1234 				queue_cleaned = TRUE;
   1235 				mutex_enter(&mir->mir_mutex);
   1236 				continue;
   1237 			}
   1238 
   1239 			/*
   1240 			 * Bugid 1253810 - Force the write service
   1241 			 * procedure to send its messages, regardless
   1242 			 * whether the downstream  module is ready
   1243 			 * to accept data.
   1244 			 */
   1245 			if (mir->mir_inwservice == 1)
   1246 				qenable(WR(q));
   1247 
        			/* Signalled via mir_condvar once progress is made. */
   1248 			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
   1249 		}
   1250 
   1251 		mutex_exit(&mir->mir_mutex);
   1252 		qprocsoff(q);
   1253 
   1254 		/* Notify KRPC that this stream is going away. */
   1255 		svc_queueclose(q);
   1256 	} else {
        		/* Other stream types have nothing to wait for. */
   1257 		mutex_exit(&mir->mir_mutex);
   1258 		qprocsoff(q);
   1259 	}
   1260 
        	/* Queue processing is off; release the per-stream state. */
   1261 	mutex_destroy(&mir->mir_mutex);
   1262 	cv_destroy(&mir->mir_condvar);
   1263 	cv_destroy(&mir->mir_timer_cv);
   1264 	kmem_free(mir, sizeof (mir_t));
   1265 	return (0);
   1266 }
   1267 
   1268 /*
   1269  * This is server side only (RPC_SERVER).
   1270  *
   1271  * Exit idle mode by stopping the timer.  q must be the write-side queue
   1272  * and the caller must hold mir_mutex; mir_timer_stop() may drop and retake it.
   1273  */
   1274 static void
   1275 mir_svc_idle_stop(queue_t *q, mir_t *mir)
   1276 {
   1277 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1278 	ASSERT((q->q_flag & QREADR) == 0);
   1279 	ASSERT(mir->mir_type == RPC_SERVER);
   1280 	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
   1281 
   1282 	mir_timer_stop(mir);
   1283 }
   1283 
   1284 /*
   1285  * Server side (RPC_SERVER) only.
   1286  *
   1287  * Enter idle processing.  On a live stream this arms the idle timer; on a
   1288  * stream that is being closed it wakes the closing thread instead.  q is the
   1289  * write-side queue and mir_mutex is held by the caller.
   1290  */
   1291 static void
   1292 mir_svc_idle_start(queue_t *q, mir_t *mir)
   1293 {
   1294 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   1295 	ASSERT((q->q_flag & QREADR) == 0);
   1296 	ASSERT(mir->mir_type == RPC_SERVER);
   1297 	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
   1298 
   1299 	if (!mir->mir_closing) {
   1300 		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
   1301 		    mir->mir_ordrel_pending ? "ordrel" : "normal");
   1302 		/*
   1303 		 * The usual case: arm the idle timer.  After an orderly
   1304 		 * release has been sent, the timeout covers the wait for the
   1305 		 * client to close its end; otherwise it is the stream's
   1306 		 * normal idle timeout.
   1307 		 */
   1308 		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
   1309 		    svc_ordrel_timeout : mir->mir_idle_timeout);
   1310 		return;
   1311 	}
   1312 
   1313 	/*
   1314 	 * The queues are being closed, so the idle timer is not restarted.
   1315 	 * This routine is called whenever MIR_SVC_QUIESCED() becomes true,
   1316 	 * which makes it the place to signal a thread waiting in
   1317 	 * mir_close(), provided the write service procedure is not pending.
   1318 	 */
   1319 	RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
   1320 	    (void *)q);
   1321 	if (mir->mir_inwservice == 0)
   1322 		cv_signal(&mir->mir_condvar);
   1323 }
   1327 
        /*
         * Open routine for a mir stream: allocates and initializes the mir_t
         * shared by both queues of the pair, then turns on queue processing.
         * Always returns 0.
         */
   1328 /* ARGSUSED */
   1329 static int
   1330 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
   1331 {
   1332 	mir_t	*mir;
   1333 
   1334 	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
   1335 
   1336 	/* Fill in whichever of the variables KRPC uses directly are unset. */
   1337 	if (mir_rele == NULL)
   1338 		mir_rele = mir_svc_release;
   1339 	if (mir_start == NULL)
   1340 		mir_start = mir_svc_start;
   1341 	if (clnt_stop_idle == NULL)
   1342 		clnt_stop_idle = mir_clnt_idle_do_stop;
   1343 	if (clnt_max_msg_sizep == NULL)
   1344 		clnt_max_msg_sizep = &clnt_max_msg_size;
   1345 	if (svc_max_msg_sizep == NULL)
   1346 		svc_max_msg_sizep = &svc_max_msg_size;
   1347 
   1348 	/* Per-stream state starts out zero-filled. */
   1349 	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
   1350 
   1351 	/*
   1352 	 * Hold inbound messages on the read-side queue until the stream
   1353 	 * has been fully initialized by an RPC_CLIENT or RPC_SERVER ioctl.
   1354 	 * Processing that ioctl clears the flag and delivers to KRPC any
   1355 	 * messages that arrived between the open and the ioctl.
   1356 	 *
   1357 	 * A client stream should never see early data, because servers
   1358 	 * only answer our requests and we send none until the stream is
   1359 	 * initialized.  A server stream sees it routinely: the client
   1360 	 * starts sending as soon as the connection is made, and with TCP
   1361 	 * in particular the protocol accepts the connection before nfsd
   1362 	 * or KRPC is told about it.
   1363 	 */
   1364 	mir->mir_hold_inbound = 1;
   1365 
   1366 	/*
   1367 	 * Begin by looking for a 4-byte record marker header.  A negative
   1368 	 * length means rpcmod is still consuming bytes of that header; a
   1369 	 * positive one is the number of bytes that have arrived for the
   1370 	 * current fragment and are being held in mir_header_mp.
   1371 	 */
   1372 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   1373 
   1374 	mir->mir_zoneid = rpc_zoneid();
   1375 	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
   1376 	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
   1377 	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
   1378 
   1379 	/* Both queues of the pair point at the same mir_t. */
   1380 	q->q_ptr = (char *)mir;
   1381 	WR(q)->q_ptr = (char *)mir;
   1382 
   1383 	/*
   1384 	 * putq must not schedule the read-side queue automatically, so it
   1385 	 * is marked noenable; mir_wsrv enables it explicitly when that is
   1386 	 * appropriate.  (The flow control comments at the beginning of
   1387 	 * mir_rsrv have more detail.)
   1388 	 */
   1389 	noenable(q);
   1390 
   1391 	qprocson(q);
   1392 	return (0);
   1393 }
   1397 
   1398 /*
   1399  * Read-side put routine for both the client and server side.  Does the
   1400  * record marking for incoming RPC messages, and when complete, dispatches
   1401  * the message to either the client or server.
        *
        * q is the read-side queue and mp the inbound message, which is always
        * consumed (freed, passed upstream, queued or handed to KRPC).  mir_mutex
        * must not be held on entry.  Record-marking state that spans calls
        * (mir_frag_len, mir_frag_header, mir_head_mp, mir_tail_mp) is kept in
        * local copies while the chain is walked and written back at the end.
   1402  */
   1403 static void
   1404 mir_rput(queue_t *q, mblk_t *mp)
   1405 {
   1406 	int	excess;
   1407 	int32_t	frag_len, frag_header;
   1408 	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
   1409 	mir_t	*mir = q->q_ptr;
   1410 	boolean_t stop_timer = B_FALSE;
   1411 
   1412 	ASSERT(mir != NULL);
   1413 
   1414 	/*
   1415 	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
   1416 	 * with the corresponding ioctl, then don't accept
   1417 	 * any inbound data.  This should never happen for streams
   1418 	 * created by nfsd or client-side KRPC because they are careful
   1419 	 * to set the mode of the stream before doing anything else.
   1420 	 */
   1421 	if (mir->mir_type == 0) {
   1422 		freemsg(mp);
   1423 		return;
   1424 	}
   1425 
   1426 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1427 
   1428 	switch (mp->b_datap->db_type) {
   1429 	case M_DATA:
   1430 		break;
   1431 	case M_PROTO:
   1432 	case M_PCPROTO:
   1433 		if (MBLKL(mp) < sizeof (t_scalar_t)) {
   1434 			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
   1435 			    (int)MBLKL(mp));
   1436 			freemsg(mp);
   1437 			return;
   1438 		}
   1439 		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
   1440 			mir_rput_proto(q, mp);
   1441 			return;
   1442 		}
   1443 
   1444 		/* Throw away the T_DATA_IND block and continue with data. */
   1445 		mp1 = mp;
   1446 		mp = mp->b_cont;
   1447 		freeb(mp1);
        		/*
        		 * A T_DATA_IND with no data blocks attached leaves nothing
        		 * to process; the loop below would dereference NULL.
        		 */
        		if (mp == NULL)
        			return;
   1448 		break;
   1449 	case M_SETOPTS:
   1450 		/*
   1451 		 * If a module on the stream is trying set the Stream head's
   1452 		 * high water mark, then set our hiwater to the requested
   1453 		 * value.  We are the "stream head" for all inbound
   1454 		 * data messages since messages are passed directly to KRPC.
   1455 		 */
   1456 		if (MBLKL(mp) >= sizeof (struct stroptions)) {
   1457 			struct stroptions	*stropts;
   1458 
   1459 			stropts = (struct stroptions *)mp->b_rptr;
   1460 			if ((stropts->so_flags & SO_HIWAT) &&
   1461 			    !(stropts->so_flags & SO_BAND)) {
   1462 				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
   1463 			}
   1464 		}
   1465 		putnext(q, mp);
   1466 		return;
   1467 	case M_FLUSH:
   1468 		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
   1469 		RPCLOG(32, "on q 0x%p\n", (void *)q);
   1470 		putnext(q, mp);
   1471 		return;
   1472 	default:
   1473 		putnext(q, mp);
   1474 		return;
   1475 	}
   1476 
   1477 	mutex_enter(&mir->mir_mutex);
   1478 
   1479 	/*
   1480 	 * If this connection is closing, don't accept any new messages.
   1481 	 */
   1482 	if (mir->mir_svc_no_more_msgs) {
   1483 		ASSERT(mir->mir_type == RPC_SERVER);
   1484 		mutex_exit(&mir->mir_mutex);
   1485 		freemsg(mp);
   1486 		return;
   1487 	}
   1488 
   1489 	/* Get local copies for quicker access. */
   1490 	frag_len = mir->mir_frag_len;
   1491 	frag_header = mir->mir_frag_header;
   1492 	head_mp = mir->mir_head_mp;
   1493 	tail_mp = mir->mir_tail_mp;
   1494 
   1495 	/* Loop, processing each message block in the mp chain separately. */
   1496 	do {
   1497 		cont_mp = mp->b_cont;
   1498 		mp->b_cont = NULL;
   1499 
   1500 		/*
   1501 		 * Drop zero-length mblks to prevent unbounded kernel memory
   1502 		 * consumption.
   1503 		 */
   1504 		if (MBLKL(mp) == 0) {
   1505 			freeb(mp);
   1506 			continue;
   1507 		}
   1508 
   1509 		/*
   1510 		 * If frag_len is negative, we're still in the process of
   1511 		 * building frag_header -- try to complete it with this mblk.
   1512 		 */
   1513 		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
   1514 			frag_len++;
   1515 			frag_header <<= 8;
   1516 			frag_header += *mp->b_rptr++;
   1517 		}
   1518 
   1519 		if (MBLKL(mp) == 0 && frag_len < 0) {
   1520 			/*
   1521 			 * We consumed this mblk while trying to complete the
   1522 			 * fragment header.  Free it and move on.
   1523 			 */
   1524 			freeb(mp);
   1525 			continue;
   1526 		}
   1527 
   1528 		ASSERT(frag_len >= 0);
   1529 
   1530 		/*
   1531 		 * Now frag_header has the number of bytes in this fragment
   1532 		 * and we're just waiting to collect them all.  Chain our
   1533 		 * latest mblk onto the list and see if we now have enough
   1534 		 * bytes to complete the fragment.
   1535 		 */
   1536 		if (head_mp == NULL) {
   1537 			ASSERT(tail_mp == NULL);
   1538 			head_mp = tail_mp = mp;
   1539 		} else {
   1540 			tail_mp->b_cont = mp;
   1541 			tail_mp = mp;
   1542 		}
   1543 
   1544 		frag_len += MBLKL(mp);
   1545 		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
   1546 		if (excess < 0) {
   1547 			/*
   1548 			 * We still haven't received enough data to complete
   1549 			 * the fragment, so continue on to the next mblk.
   1550 			 */
   1551 			continue;
   1552 		}
   1553 
   1554 		/*
   1555 		 * We've got a complete fragment.  If there are excess bytes,
   1556 		 * then they're part of the next fragment's header (of either
   1557 		 * this RPC message or the next RPC message).  Split that part
   1558 		 * into its own mblk so that we can safely freeb() it when
   1559 		 * building frag_header above.
   1560 		 */
   1561 		if (excess > 0) {
   1562 			if ((mp1 = dupb(mp)) == NULL &&
   1563 			    (mp1 = copyb(mp)) == NULL) {
   1564 				freemsg(head_mp);
   1565 				freemsg(cont_mp);
   1566 				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
   1567 				mir->mir_frag_header = 0;
   1568 				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   1569 				mir->mir_head_mp = NULL;
   1570 				mir->mir_tail_mp = NULL;
   1571 				mir_disconnect(q, mir);	/* drops mir_mutex */
   1572 				return;
   1573 			}
   1574 
   1575 			/*
   1576 			 * Relink the message chain so that the next mblk is
   1577 			 * the next fragment header, followed by the rest of
   1578 			 * the message chain.
   1579 			 */
   1580 			mp1->b_cont = cont_mp;
   1581 			cont_mp = mp1;
   1582 
   1583 			/*
   1584 			 * Data in the new mblk begins at the next fragment,
   1585 			 * and data in the old mblk ends at the next fragment.
   1586 			 */
   1587 			mp1->b_rptr = mp1->b_wptr - excess;
   1588 			mp->b_wptr -= excess;
   1589 		}
   1590 
   1591 		/*
   1592 		 * Reset frag_len and frag_header for the next fragment.
   1593 		 */
   1594 		frag_len = -(int32_t)sizeof (uint32_t);
   1595 		if (!(frag_header & MIR_LASTFRAG)) {
   1596 			/*
   1597 			 * The current fragment is complete, but more
   1598 			 * fragments need to be processed before we can
   1599 			 * pass along the RPC message headed at head_mp.
   1600 			 */
   1601 			frag_header = 0;
   1602 			continue;
   1603 		}
   1604 		frag_header = 0;
   1605 
   1606 		/*
   1607 		 * We've got a complete RPC message; pass it to the
   1608 		 * appropriate consumer.
   1609 		 */
   1610 		switch (mir->mir_type) {
   1611 		case RPC_CLIENT:
   1612 			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
   1613 				/*
   1614 				 * Mark this stream as active.  This marker
   1615 				 * is used in mir_timer().
   1616 				 */
   1617 				mir->mir_clntreq = 1;
   1618 				mir->mir_use_timestamp = ddi_get_lbolt();
   1619 			} else {
   1620 				freemsg(head_mp);
   1621 			}
   1622 			break;
   1623 
   1624 		case RPC_SERVER:
   1625 			/*
   1626 			 * Check for flow control before passing the
   1627 			 * message to KRPC.
   1628 			 */
   1629 			if (!mir->mir_hold_inbound) {
   1630 				if (mir->mir_krpc_cell) {
        					/*
        					 * The request is about to leave
        					 * our hands (queued to KRPC or
        					 * freed), so keep no stale
        					 * pointers to it in mir.
        					 */
        					mir->mir_head_mp = NULL;
        					mir->mir_tail_mp = NULL;

        					/*
        					 * Size-check the whole assembled
        					 * request (head_mp, not only its
        					 * last mblk) before taking a
        					 * reference.  On failure the
        					 * message has been disposed of and
        					 * mir_mutex dropped; only the
        					 * unprocessed remainder of the
        					 * chain is still ours to free.
        					 */
        					if (mir_check_len(q,
        					    (int32_t)msgdsize(head_mp),
        					    head_mp)) {
        						freemsg(cont_mp);
        						return;
        					}

   1631 					/*
   1632 					 * If the reference count is 0
   1633 					 * (not including this request),
   1634 					 * then the stream is transitioning
   1635 					 * from idle to non-idle.  In this case,
   1636 					 * we cancel the idle timer.
   1637 					 */
   1638 					if (mir->mir_ref_cnt++ == 0)
   1639 						stop_timer = B_TRUE;
   1643 					svc_queuereq(q, head_mp); /* to KRPC */
   1644 				} else {
   1645 					/*
   1646 					 * Count # of times this happens. Should
   1647 					 * be never, but experience shows
   1648 					 * otherwise.
   1649 					 */
   1650 					mir_krpc_cell_null++;
   1651 					freemsg(head_mp);
   1652 				}
   1653 			} else {
   1654 				/*
   1655 				 * If the outbound side of the stream is
   1656 				 * flow controlled, then hold this message
   1657 				 * until client catches up. mir_hold_inbound
   1658 				 * is set in mir_wput and cleared in mir_wsrv.
   1659 				 */
   1660 				(void) putq(q, head_mp);
   1661 				mir->mir_inrservice = B_TRUE;
   1662 			}
   1663 			break;
   1664 		default:
   1665 			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
   1666 			    mir->mir_type);
   1667 			freemsg(head_mp);
   1668 			break;
   1669 		}
   1670 
   1671 		/*
   1672 		 * Reset the chain since we're starting on a new RPC message.
   1673 		 */
   1674 		head_mp = tail_mp = NULL;
   1675 	} while ((mp = cont_mp) != NULL);
   1676 
   1677 	/*
   1678 	 * Sanity check the message length; if it's too large mir_check_len()
   1679 	 * will shutdown the connection, drop mir_mutex, and return non-zero.
   1680 	 */
   1681 	if (head_mp != NULL && mir->mir_setup_complete &&
   1682 	    mir_check_len(q, frag_len, head_mp))
   1683 		return;
   1684 
   1685 	/* Save our local copies back in the mir structure. */
   1686 	mir->mir_frag_header = frag_header;
   1687 	mir->mir_frag_len = frag_len;
   1688 	mir->mir_head_mp = head_mp;
   1689 	mir->mir_tail_mp = tail_mp;
   1690 
   1691 	/*
   1692 	 * The timer is stopped after the whole message chain is processed.
   1693 	 * The reason is that stopping the timer releases the mir_mutex
   1694 	 * lock temporarily.  This means that the request can be serviced
   1695 	 * while we are still processing the message chain.  This is not
   1696 	 * good.  So we stop the timer here instead.
   1697 	 *
   1698 	 * Note that if the timer fires before we stop it, it will not
   1699 	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
   1700 	 * will just return.
   1701 	 */
   1702 	if (stop_timer) {
   1703 		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
   1704 		    "ref cnt going to non zero\n", (void *)WR(q));
   1705 		mir_svc_idle_stop(WR(q), mir);
   1706 	}
   1707 	mutex_exit(&mir->mir_mutex);
   1708 }
   1709 
        /*
         * Handle an inbound M_PROTO/M_PCPROTO message other than T_DATA_IND on
         * behalf of mir_rput().  Depending on the stream type and the TPI
         * primitive, the message is consumed here (freed, handed to the client
         * dispatch code, or parked in mir_svc_pend_mp) or passed upstream with
         * putnext().  mir_mutex must not be held on entry.
         *
         * NOTE(review): mir_rput() only verifies that the message holds at least
         * sizeof (t_scalar_t) bytes before calling here; the larger TPI
         * structures read below are not length-checked -- confirm the transport
         * always supplies complete primitives.
         */
   1710 static void
   1711 mir_rput_proto(queue_t *q, mblk_t *mp)
   1712 {
   1713 	mir_t	*mir = (mir_t *)q->q_ptr;
   1714 	uint32_t	type;
        	/* stays 0 unless a T_DISCON_IND supplies a DISCON_reason */
   1715 	uint32_t reason = 0;
   1716 
   1717 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   1718 
   1719 	type = ((union T_primitives *)mp->b_rptr)->type;
   1720 	switch (mir->mir_type) {
   1721 	case RPC_CLIENT:
   1722 		switch (type) {
   1723 		case T_DISCON_IND:
   1724 			reason = ((struct T_discon_ind *)
   1725 			    (mp->b_rptr))->DISCON_reason;
   1726 			/*FALLTHROUGH*/
   1727 		case T_ORDREL_IND:
   1728 			mutex_enter(&mir->mir_mutex);
        			/* Discard any partially assembled record. */
   1729 			if (mir->mir_head_mp) {
   1730 				freemsg(mir->mir_head_mp);
   1731 				mir->mir_head_mp = (mblk_t *)0;
   1732 				mir->mir_tail_mp = (mblk_t *)0;
   1733 			}
   1734 			/*
   1735 			 * We are disconnecting, but not necessarily
   1736 			 * closing. By not closing, we will fail to
   1737 			 * pick up a possibly changed global timeout value,
   1738 			 * unless we store it now.
   1739 			 */
   1740 			mir->mir_idle_timeout = clnt_idle_timeout;
   1741 			mir_clnt_idle_stop(WR(q), mir);
   1742 
   1743 			/*
   1744 			 * Even though we are unconnected, we still
   1745 			 * leave the idle timer going on the client. The
   1746 			 * reason is that if we've disconnected due
   1747 			 * to a server-side disconnect, reset, or connection
   1748 			 * timeout, there is a possibility the client may
   1749 			 * retry the RPC request. This retry needs to be done
   1750 			 * on the same bound address for the server to
   1751 			 * interpret it as such. However, we don't want
   1752 			 * to wait forever for that possibility. If the
   1753 			 * end-point stays unconnected for mir_idle_timeout
   1754 			 * units of time, then that is a signal to the
   1755 			 * connection manager to give up waiting for the
   1756 			 * application (eg. NFS) to send a retry.
   1757 			 */
   1758 			mir_clnt_idle_start(WR(q), mir);
   1759 			mutex_exit(&mir->mir_mutex);
   1760 			clnt_dispatch_notifyall(WR(q), type, reason);
   1761 			freemsg(mp);
   1762 			return;
   1763 		case T_ERROR_ACK:
   1764 		{
   1765 			struct T_error_ack	*terror;
   1766 
   1767 			terror = (struct T_error_ack *)mp->b_rptr;
   1768 			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
   1769 			    (void *)q);
   1770 			RPCLOG(1, " ERROR_prim: %s,",
   1771 			    rpc_tpiprim2name(terror->ERROR_prim));
   1772 			RPCLOG(1, " TLI_error: %s,",
   1773 			    rpc_tpierr2name(terror->TLI_error));
   1774 			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
        			/*
        			 * A failed T_DISCON_REQ is reported to all callers
        			 * and the ack is freed here.  For any other failed
        			 * primitive a non-zero return from
        			 * clnt_dispatch_notifyconn() ends processing
        			 * (presumably mp was consumed -- confirm); otherwise
        			 * the message falls through to putnext() below.
        			 */
   1775 			if (terror->ERROR_prim == T_DISCON_REQ)  {
   1776 				clnt_dispatch_notifyall(WR(q), type, reason);
   1777 				freemsg(mp);
   1778 				return;
   1779 			} else {
   1780 				if (clnt_dispatch_notifyconn(WR(q), mp))
   1781 					return;
   1782 			}
   1783 			break;
   1784 		}
   1785 		case T_OK_ACK:
   1786 		{
   1787 			struct T_ok_ack	*tok = (struct T_ok_ack *)mp->b_rptr;
   1788 
        			/* Same split as T_ERROR_ACK, for the success case. */
   1789 			if (tok->CORRECT_prim == T_DISCON_REQ) {
   1790 				clnt_dispatch_notifyall(WR(q), type, reason);
   1791 				freemsg(mp);
   1792 				return;
   1793 			} else {
   1794 				if (clnt_dispatch_notifyconn(WR(q), mp))
   1795 					return;
   1796 			}
   1797 			break;
   1798 		}
   1799 		case T_CONN_CON:
   1800 		case T_INFO_ACK:
   1801 		case T_OPTMGMT_ACK:
   1802 			if (clnt_dispatch_notifyconn(WR(q), mp))
   1803 				return;
   1804 			break;
   1805 		case T_BIND_ACK:
        			/* Nothing to do here; passed upstream below. */
   1806 			break;
   1807 		default:
   1808 			RPCLOG(1, "mir_rput: unexpected message %d "
   1809 			    "for KRPC client\n",
   1810 			    ((union T_primitives *)mp->b_rptr)->type);
   1811 			break;
   1812 		}
   1813 		break;
   1814 
   1815 	case RPC_SERVER:
   1816 		switch (type) {
   1817 		case T_BIND_ACK:
   1818 		{
   1819 			struct T_bind_ack	*tbind;
   1820 
   1821 			/*
   1822 			 * If this is a listening stream, then shut
   1823 			 * off the idle timer.
   1824 			 */
   1825 			tbind = (struct T_bind_ack *)mp->b_rptr;
   1826 			if (tbind->CONIND_number > 0) {
   1827 				mutex_enter(&mir->mir_mutex);
   1828 				mir_svc_idle_stop(WR(q), mir);
   1829 
   1830 				/*
   1831 				 * mark this as a listen endpoint
   1832 				 * for special handling.
   1833 				 */
   1834 
   1835 				mir->mir_listen_stream = 1;
   1836 				mutex_exit(&mir->mir_mutex);
   1837 			}
   1838 			break;
   1839 		}
   1840 		case T_DISCON_IND:
   1841 		case T_ORDREL_IND:
   1842 			RPCLOG(16, "mir_rput_proto: got %s indication\n",
   1843 			    type == T_DISCON_IND ? "disconnect"
   1844 			    : "orderly release");
   1845 
   1846 			/*
   1847 			 * For listen endpoint just pass
   1848 			 * on the message.
   1849 			 */
   1850 
   1851 			if (mir->mir_listen_stream)
   1852 				break;
   1853 
   1854 			mutex_enter(&mir->mir_mutex);
   1855 
   1856 			/*
   1857 			 * If client wants to break off connection, record
   1858 			 * that fact.
   1859 			 */
   1860 			mir_svc_start_close(WR(q), mir);
   1861 
   1862 			/*
   1863 			 * If we are idle, then send the orderly release
   1864 			 * or disconnect indication to nfsd.
   1865 			 */
   1866 			if (MIR_SVC_QUIESCED(mir)) {
   1867 				mutex_exit(&mir->mir_mutex);
   1868 				break;
   1869 			}
   1870 
   1871 			RPCLOG(16, "mir_rput_proto: not idle, so "
   1872 			    "disconnect/ord rel indication not passed "
   1873 			    "upstream on 0x%p\n", (void *)q);
   1874 
   1875 			/*
   1876 			 * Hold the indication until we get idle
   1877 			 * If there already is an indication stored,
   1878 			 * replace it if the new one is a disconnect. The
   1879 			 * reasoning is that disconnection takes less time
   1880 			 * to process, and once a client decides to
   1881 			 * disconnect, we should do that.
   1882 			 */
   1883 			if (mir->mir_svc_pend_mp) {
   1884 				if (type == T_DISCON_IND) {
   1885 					RPCLOG(16, "mir_rput_proto: replacing"
   1886 					    " held disconnect/ord rel"
   1887 					    " indication with disconnect on"
   1888 					    " 0x%p\n", (void *)q);
   1889 
   1890 					freemsg(mir->mir_svc_pend_mp);
   1891 					mir->mir_svc_pend_mp = mp;
   1892 				} else {
   1893 					RPCLOG(16, "mir_rput_proto: already "
   1894 					    "held a disconnect/ord rel "
   1895 					    "indication. freeing ord rel "
   1896 					    "ind on 0x%p\n", (void *)q);
   1897 					freemsg(mp);
   1898 				}
   1899 			} else
   1900 				mir->mir_svc_pend_mp = mp;
   1901 
        			/* mp was stored or freed above; do not pass it on. */
   1902 			mutex_exit(&mir->mir_mutex);
   1903 			return;
   1904 
   1905 		default:
   1906 			/* nfsd handles server-side non-data messages. */
   1907 			break;
   1908 		}
   1909 		break;
   1910 
   1911 	default:
   1912 		break;
   1913 	}
   1914 
        	/* Everything not consumed above continues upstream. */
   1915 	putnext(q, mp);
   1916 }
   1917 
   1918 /*
   1919  * The server-side read queues are used to hold inbound messages while
   1920  * outbound flow control is exerted.  When outbound flow control is
   1921  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
   1922  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
   1923  *
   1924  * For the server side, we have two types of messages queued. The first type
   1925  * are messages that are ready to be XDR decoded and then sent to the
   1926  * RPC program's dispatch routine. The second type are "raw" messages that
   1927  * haven't been processed, i.e. assembled from RPC record fragments into
   1928  * full requests. The only time we will see the second type of message
   1929  * queued is if we have a memory allocation failure while processing
   1930  * a raw message. The field mir_first_non_processed_mblk will mark the
   1931  * first such raw message. So the flow for server side is:
   1932  *
   1933  *	- send processed queued messages to kRPC until we run out or find
   1934  *	  one that needs additional processing because we were short on memory
   1935  *	  earlier
   1936  *	- process a message that was deferred because of lack of
   1937  *	  memory
   1938  *	- continue processing messages until the queue empties or we
   1939  *	  have to stop because of lack of memory
   1940  *	- during each of the above phase, if the queue is empty and
   1941  *	  there are no pending messages that were passed to the RPC
   1942  *	  layer, send upstream the pending disconnect/ordrel indication if
   1943  *	  there is one
   1944  *
   1945  * The read-side queue is also enabled by a bufcall callback if dupmsg
   1946  * fails in mir_rput.
   1947  */
   1948 static void
   1949 mir_rsrv(queue_t *q)
   1950 {
   1951 	mir_t	*mir;
   1952 	mblk_t	*mp;
   1953 	mblk_t	*cmp = NULL;
   1954 	boolean_t stop_timer = B_FALSE;
   1955 
   1956 	mir = (mir_t *)q->q_ptr;
   1957 	mutex_enter(&mir->mir_mutex);
   1958 
   1959 	mp = NULL;
   1960 	switch (mir->mir_type) {
   1961 	case RPC_SERVER:
   1962 		if (mir->mir_ref_cnt == 0)
   1963 			mir->mir_hold_inbound = 0;
   1964 		if (mir->mir_hold_inbound) {
   1965 
   1966 			ASSERT(cmp == NULL);
   1967 			if (q->q_first == NULL) {
   1968 
   1969 				MIR_CLEAR_INRSRV(mir);
   1970 
   1971 				if (MIR_SVC_QUIESCED(mir)) {
   1972 					cmp = mir->mir_svc_pend_mp;
   1973 					mir->mir_svc_pend_mp = NULL;
   1974 				}
   1975 			}
   1976 
   1977 			mutex_exit(&mir->mir_mutex);
   1978 
   1979 			if (cmp != NULL) {
   1980 				RPCLOG(16, "mir_rsrv: line %d: sending a held "
   1981 				    "disconnect/ord rel indication upstream\n",
   1982 				    __LINE__);
   1983 				putnext(q, cmp);
   1984 			}
   1985 
   1986 			return;
   1987 		}
   1988 		while (mp = getq(q)) {
   1989 			if (mir->mir_krpc_cell &&
   1990 			    (mir->mir_svc_no_more_msgs == 0)) {
   1991 				/*
   1992 				 * If we were idle, turn off idle timer since
   1993 				 * we aren't idle any more.
   1994 				 */
   1995 				if (mir->mir_ref_cnt++ == 0)
   1996 					stop_timer = B_TRUE;
   1997 				if (mir_check_len(q,
   1998 				    (int32_t)msgdsize(mp), mp))
   1999 					return;
   2000 				svc_queuereq(q, mp);
   2001 			} else {
   2002 				/*
   2003 				 * Count # of times this happens. Should be
   2004 				 * never, but experience shows otherwise.
   2005 				 */
   2006 				if (mir->mir_krpc_cell == NULL)
   2007 					mir_krpc_cell_null++;
   2008 				freemsg(mp);
   2009 			}
   2010 		}
   2011 		break;
   2012 	case RPC_CLIENT:
   2013 		break;
   2014 	default:
   2015 		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
   2016 
   2017 		if (q->q_first == NULL)
   2018 			MIR_CLEAR_INRSRV(mir);
   2019 
   2020 		mutex_exit(&mir->mir_mutex);
   2021 
   2022 		return;
   2023 	}
   2024 
   2025 	/*
   2026 	 * The timer is stopped after all the messages are processed.
   2027 	 * The reason is that stopping the timer releases the mir_mutex
   2028 	 * lock temporarily.  This means that the request can be serviced
   2029 	 * while we are still processing the message queue.  This is not
   2030 	 * good.  So we stop the timer here instead.
   2031 	 */
   2032 	if (stop_timer)  {
   2033 		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
   2034 		    "cnt going to non zero\n", (void *)WR(q));
   2035 		mir_svc_idle_stop(WR(q), mir);
   2036 	}
   2037 
   2038 	if (q->q_first == NULL) {
   2039 
   2040 		MIR_CLEAR_INRSRV(mir);
   2041 
   2042 		ASSERT(cmp == NULL);
   2043 		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
   2044 			cmp = mir->mir_svc_pend_mp;
   2045 			mir->mir_svc_pend_mp = NULL;
   2046 		}
   2047 
   2048 		mutex_exit(&mir->mir_mutex);
   2049 
   2050 		if (cmp != NULL) {
   2051 			RPCLOG(16, "mir_rsrv: line %d: sending a held "
   2052 			    "disconnect/ord rel indication upstream\n",
   2053 			    __LINE__);
   2054 			putnext(q, cmp);
   2055 		}
   2056 
   2057 		return;
   2058 	}
   2059 	mutex_exit(&mir->mir_mutex);
   2060 }
   2061 
   2062 static int mir_svc_policy_fails;
   2063 
   2064 /*
   2065  * Called to send an event code to nfsd/lockd so that it initiates
   2066  * connection close.
   2067  */
   2068 static int
   2069 mir_svc_policy_notify(queue_t *q, int event)
   2070 {
   2071 	mblk_t	*mp;
   2072 #ifdef DEBUG
   2073 	mir_t *mir = (mir_t *)q->q_ptr;
   2074 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2075 #endif
   2076 	ASSERT(q->q_flag & QREADR);
   2077 
   2078 	/*
   2079 	 * Create an M_DATA message with the event code and pass it to the
   2080 	 * Stream head (nfsd or whoever created the stream will consume it).
   2081 	 */
   2082 	mp = allocb(sizeof (int), BPRI_HI);
   2083 
   2084 	if (!mp) {
   2085 
   2086 		mir_svc_policy_fails++;
   2087 		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
   2088 		    "%d\n", event);
   2089 		return (ENOMEM);
   2090 	}
   2091 
   2092 	U32_TO_BE32(event, mp->b_rptr);
   2093 	mp->b_wptr = mp->b_rptr + sizeof (int);
   2094 	putnext(q, mp);
   2095 	return (0);
   2096 }
   2097 
   2098 /*
   2099  * Server side: start the close phase. We want to get this rpcmod slot in an
   2100  * idle state before mir_close() is called.
   2101  */
   2102 static void
   2103 mir_svc_start_close(queue_t *wq, mir_t *mir)
   2104 {
   2105 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2106 	ASSERT((wq->q_flag & QREADR) == 0);
   2107 	ASSERT(mir->mir_type == RPC_SERVER);
   2108 
   2109 
   2110 	/*
   2111 	 * Do not accept any more messages.
   2112 	 */
   2113 	mir->mir_svc_no_more_msgs = 1;
   2114 
   2115 	/*
   2116 	 * Next two statements will make the read service procedure invoke
   2117 	 * svc_queuereq() on everything stuck in the streams read queue.
   2118 	 * It's not necessary because enabling the write queue will
   2119 	 * have the same effect, but why not speed the process along?
   2120 	 */
   2121 	mir->mir_hold_inbound = 0;
   2122 	qenable(RD(wq));
   2123 
   2124 	/*
   2125 	 * Meanwhile force the write service procedure to send the
   2126 	 * responses downstream, regardless of flow control.
   2127 	 */
   2128 	qenable(wq);
   2129 }
   2130 
   2131 /*
   2132  * This routine is called directly by KRPC after a request is completed,
   2133  * whether a reply was sent or the request was dropped.
   2134  */
   2135 static void
   2136 mir_svc_release(queue_t *wq, mblk_t *mp)
   2137 {
   2138 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2139 	mblk_t	*cmp = NULL;
   2140 
   2141 	ASSERT((wq->q_flag & QREADR) == 0);
   2142 	if (mp)
   2143 		freemsg(mp);
   2144 
   2145 	mutex_enter(&mir->mir_mutex);
   2146 
   2147 	/*
   2148 	 * Start idle processing if this is the last reference.
   2149 	 */
   2150 	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
   2151 		cmp = mir->mir_svc_pend_mp;
   2152 		mir->mir_svc_pend_mp = NULL;
   2153 	}
   2154 
   2155 	if (cmp) {
   2156 		RPCLOG(16, "mir_svc_release: sending a held "
   2157 		    "disconnect/ord rel indication upstream on queue 0x%p\n",
   2158 		    (void *)RD(wq));
   2159 
   2160 		mutex_exit(&mir->mir_mutex);
   2161 
   2162 		putnext(RD(wq), cmp);
   2163 
   2164 		mutex_enter(&mir->mir_mutex);
   2165 	}
   2166 
   2167 	/*
   2168 	 * Start idle processing if this is the last reference.
   2169 	 */
   2170 	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {
   2171 
   2172 		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
   2173 		    "because ref cnt is zero\n", (void *) wq);
   2174 
   2175 		mir_svc_idle_start(wq, mir);
   2176 	}
   2177 
   2178 	mir->mir_ref_cnt--;
   2179 	ASSERT(mir->mir_ref_cnt >= 0);
   2180 
   2181 	/*
   2182 	 * Wake up the thread waiting to close.
   2183 	 */
   2184 
   2185 	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
   2186 		cv_signal(&mir->mir_condvar);
   2187 
   2188 	mutex_exit(&mir->mir_mutex);
   2189 }
   2190 
   2191 /*
   2192  * This routine is called by server-side KRPC when it is ready to
   2193  * handle inbound messages on the stream.
   2194  */
   2195 static void
   2196 mir_svc_start(queue_t *wq)
   2197 {
   2198 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2199 
   2200 	/*
   2201 	 * no longer need to take the mir_mutex because the
   2202 	 * mir_setup_complete field has been moved out of
   2203 	 * the binary field protected by the mir_mutex.
   2204 	 */
   2205 
   2206 	mir->mir_setup_complete = 1;
   2207 	qenable(RD(wq));
   2208 }
   2209 
   2210 /*
   2211  * client side wrapper for stopping timer with normal idle timeout.
   2212  */
   2213 static void
   2214 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
   2215 {
   2216 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2217 	ASSERT((wq->q_flag & QREADR) == 0);
   2218 	ASSERT(mir->mir_type == RPC_CLIENT);
   2219 
   2220 	mir_timer_stop(mir);
   2221 }
   2222 
   2223 /*
   2224  * client side wrapper for stopping timer with normal idle timeout.
   2225  */
   2226 static void
   2227 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
   2228 {
   2229 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2230 	ASSERT((wq->q_flag & QREADR) == 0);
   2231 	ASSERT(mir->mir_type == RPC_CLIENT);
   2232 
   2233 	mir_timer_start(wq, mir, mir->mir_idle_timeout);
   2234 }
   2235 
   2236 /*
   2237  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
   2238  * end-points that aren't connected.
   2239  */
   2240 static void
   2241 mir_clnt_idle_do_stop(queue_t *wq)
   2242 {
   2243 	mir_t   *mir = (mir_t *)wq->q_ptr;
   2244 
   2245 	RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
   2246 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2247 	mutex_enter(&mir->mir_mutex);
   2248 	mir_clnt_idle_stop(wq, mir);
   2249 	mutex_exit(&mir->mir_mutex);
   2250 }
   2251 
   2252 /*
   2253  * Timer handler.  It handles idle timeout and memory shortage problem.
   2254  */
   2255 static void
   2256 mir_timer(void *arg)
   2257 {
   2258 	queue_t *wq = (queue_t *)arg;
   2259 	mir_t *mir = (mir_t *)wq->q_ptr;
   2260 	boolean_t notify;
   2261 	clock_t now;
   2262 
   2263 	mutex_enter(&mir->mir_mutex);
   2264 
   2265 	/*
   2266 	 * mir_timer_call is set only when either mir_timer_[start|stop]
   2267 	 * is progressing.  And mir_timer() can only be run while they
   2268 	 * are progressing if the timer is being stopped.  So just
   2269 	 * return.
   2270 	 */
   2271 	if (mir->mir_timer_call) {
   2272 		mutex_exit(&mir->mir_mutex);
   2273 		return;
   2274 	}
   2275 	mir->mir_timer_id = 0;
   2276 
   2277 	switch (mir->mir_type) {
   2278 	case RPC_CLIENT:
   2279 
   2280 		/*
   2281 		 * For clients, the timer fires at clnt_idle_timeout
   2282 		 * intervals.  If the activity marker (mir_clntreq) is
   2283 		 * zero, then the stream has been idle since the last
   2284 		 * timer event and we notify KRPC.  If mir_clntreq is
   2285 		 * non-zero, then the stream is active and we just
   2286 		 * restart the timer for another interval.  mir_clntreq
   2287 		 * is set to 1 in mir_wput for every request passed
   2288 		 * downstream.
   2289 		 *
   2290 		 * If this was a memory shortage timer reset the idle
   2291 		 * timeout regardless; the mir_clntreq will not be a
   2292 		 * valid indicator.
   2293 		 *
   2294 		 * The timer is initially started in mir_wput during
   2295 		 * RPC_CLIENT ioctl processing.
   2296 		 *
   2297 		 * The timer interval can be changed for individual
   2298 		 * streams with the ND variable "mir_idle_timeout".
   2299 		 */
   2300 		now = ddi_get_lbolt();
   2301 		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
   2302 		    MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
   2303 			clock_t tout;
   2304 
   2305 			tout = mir->mir_idle_timeout -
   2306 			    TICK_TO_MSEC(now - mir->mir_use_timestamp);
   2307 			if (tout < 0)
   2308 				tout = 1000;
   2309 #if 0
   2310 			printf("mir_timer[%d < %d + %d]: reset client timer "
   2311 			    "to %d (ms)\n", TICK_TO_MSEC(now),
   2312 			    TICK_TO_MSEC(mir->mir_use_timestamp),
   2313 			    mir->mir_idle_timeout, tout);
   2314 #endif
   2315 			mir->mir_clntreq = 0;
   2316 			mir_timer_start(wq, mir, tout);
   2317 			mutex_exit(&mir->mir_mutex);
   2318 			return;
   2319 		}
   2320 #if 0
   2321 printf("mir_timer[%d]: doing client timeout\n", now / hz);
   2322 #endif
   2323 		/*
   2324 		 * We are disconnecting, but not necessarily
   2325 		 * closing. By not closing, we will fail to
   2326 		 * pick up a possibly changed global timeout value,
   2327 		 * unless we store it now.
   2328 		 */
   2329 		mir->mir_idle_timeout = clnt_idle_timeout;
   2330 		mir_clnt_idle_start(wq, mir);
   2331 
   2332 		mutex_exit(&mir->mir_mutex);
   2333 		/*
   2334 		 * We pass T_ORDREL_REQ as an integer value
   2335 		 * to KRPC as the indication that the stream
   2336 		 * is idle.  This is not a T_ORDREL_REQ message,
   2337 		 * it is just a convenient value since we call
   2338 		 * the same KRPC routine for T_ORDREL_INDs and
   2339 		 * T_DISCON_INDs.
   2340 		 */
   2341 		clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
   2342 		return;
   2343 
   2344 	case RPC_SERVER:
   2345 
   2346 		/*
   2347 		 * For servers, the timer is only running when the stream
   2348 		 * is really idle or memory is short.  The timer is started
   2349 		 * by mir_wput when mir_type is set to RPC_SERVER and
   2350 		 * by mir_svc_idle_start whenever the stream goes idle
   2351 		 * (mir_ref_cnt == 0).  The timer is cancelled in
   2352 		 * mir_rput whenever a new inbound request is passed to KRPC
   2353 		 * and the stream was previously idle.
   2354 		 *
   2355 		 * The timer interval can be changed for individual
   2356 		 * streams with the ND variable "mir_idle_timeout".
   2357 		 *
   2358 		 * If the stream is not idle do nothing.
   2359 		 */
   2360 		if (!MIR_SVC_QUIESCED(mir)) {
   2361 			mutex_exit(&mir->mir_mutex);
   2362 			return;
   2363 		}
   2364 
   2365 		notify = !mir->mir_inrservice;
   2366 		mutex_exit(&mir->mir_mutex);
   2367 
   2368 		/*
   2369 		 * If there is no packet queued up in read queue, the stream
   2370 		 * is really idle so notify nfsd to close it.
   2371 		 */
   2372 		if (notify) {
   2373 			RPCLOG(16, "mir_timer: telling stream head listener "
   2374 			    "to close stream (0x%p)\n", (void *) RD(wq));
   2375 			(void) mir_svc_policy_notify(RD(wq), 1);
   2376 		}
   2377 		return;
   2378 	default:
   2379 		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
   2380 		    mir->mir_type);
   2381 		mutex_exit(&mir->mir_mutex);
   2382 		return;
   2383 	}
   2384 }
   2385 
   2386 /*
   2387  * Called by the RPC package to send either a call or a return, or a
   2388  * transport connection request.  Adds the record marking header.
   2389  */
   2390 static void
   2391 mir_wput(queue_t *q, mblk_t *mp)
   2392 {
   2393 	uint_t	frag_header;
   2394 	mir_t	*mir = (mir_t *)q->q_ptr;
   2395 	uchar_t	*rptr = mp->b_rptr;
   2396 
   2397 	if (!mir) {
   2398 		freemsg(mp);
   2399 		return;
   2400 	}
   2401 
   2402 	if (mp->b_datap->db_type != M_DATA) {
   2403 		mir_wput_other(q, mp);
   2404 		return;
   2405 	}
   2406 
   2407 	if (mir->mir_ordrel_pending == 1) {
   2408 		freemsg(mp);
   2409 		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
   2410 		    (void *)q);
   2411 		return;
   2412 	}
   2413 
   2414 	frag_header = (uint_t)DLEN(mp);
   2415 	frag_header |= MIR_LASTFRAG;
   2416 
   2417 	/* Stick in the 4 byte record marking header. */
   2418 	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
   2419 	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
   2420 		/*
   2421 		 * Since we know that M_DATA messages are created exclusively
   2422 		 * by KRPC, we expect that KRPC will leave room for our header
   2423 		 * and 4 byte align which is normal for XDR.
   2424 		 * If KRPC (or someone else) does not cooperate, then we
   2425 		 * just throw away the message.
   2426 		 */
   2427 		RPCLOG(1, "mir_wput: KRPC did not leave space for record "
   2428 		    "fragment header (%d bytes left)\n",
   2429 		    (int)(rptr - mp->b_datap->db_base));
   2430 		freemsg(mp);
   2431 		return;
   2432 	}
   2433 	rptr -= sizeof (uint32_t);
   2434 	*(uint32_t *)rptr = htonl(frag_header);
   2435 	mp->b_rptr = rptr;
   2436 
   2437 	mutex_enter(&mir->mir_mutex);
   2438 	if (mir->mir_type == RPC_CLIENT) {
   2439 		/*
   2440 		 * For the client, set mir_clntreq to indicate that the
   2441 		 * connection is active.
   2442 		 */
   2443 		mir->mir_clntreq = 1;
   2444 		mir->mir_use_timestamp = ddi_get_lbolt();
   2445 	}
   2446 
   2447 	/*
   2448 	 * If we haven't already queued some data and the downstream module
   2449 	 * can accept more data, send it on, otherwise we queue the message
   2450 	 * and take other actions depending on mir_type.
   2451 	 */
   2452 	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
   2453 		mutex_exit(&mir->mir_mutex);
   2454 
   2455 		/*
   2456 		 * Now we pass the RPC message downstream.
   2457 		 */
   2458 		putnext(q, mp);
   2459 		return;
   2460 	}
   2461 
   2462 	switch (mir->mir_type) {
   2463 	case RPC_CLIENT:
   2464 		/*
   2465 		 * Check for a previous duplicate request on the
   2466 		 * queue.  If there is one, then we throw away
   2467 		 * the current message and let the previous one
   2468 		 * go through.  If we can't find a duplicate, then
   2469 		 * send this one.  This tap dance is an effort
   2470 		 * to reduce traffic and processing requirements
   2471 		 * under load conditions.
   2472 		 */
   2473 		if (mir_clnt_dup_request(q, mp)) {
   2474 			mutex_exit(&mir->mir_mutex);
   2475 			freemsg(mp);
   2476 			return;
   2477 		}
   2478 		break;
   2479 	case RPC_SERVER:
   2480 		/*
   2481 		 * Set mir_hold_inbound so that new inbound RPC
   2482 		 * messages will be held until the client catches
   2483 		 * up on the earlier replies.  This flag is cleared
   2484 		 * in mir_wsrv after flow control is relieved;
   2485 		 * the read-side queue is also enabled at that time.
   2486 		 */
   2487 		mir->mir_hold_inbound = 1;
   2488 		break;
   2489 	default:
   2490 		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
   2491 		break;
   2492 	}
   2493 	mir->mir_inwservice = 1;
   2494 	(void) putq(q, mp);
   2495 	mutex_exit(&mir->mir_mutex);
   2496 }
   2497 
   2498 static void
   2499 mir_wput_other(queue_t *q, mblk_t *mp)
   2500 {
   2501 	mir_t	*mir = (mir_t *)q->q_ptr;
   2502 	struct iocblk	*iocp;
   2503 	uchar_t	*rptr = mp->b_rptr;
   2504 	bool_t	flush_in_svc = FALSE;
   2505 
   2506 	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
   2507 	switch (mp->b_datap->db_type) {
   2508 	case M_IOCTL:
   2509 		iocp = (struct iocblk *)rptr;
   2510 		switch (iocp->ioc_cmd) {
   2511 		case RPC_CLIENT:
   2512 			mutex_enter(&mir->mir_mutex);
   2513 			if (mir->mir_type != 0 &&
   2514 			    mir->mir_type != iocp->ioc_cmd) {
   2515 ioc_eperm:
   2516 				mutex_exit(&mir->mir_mutex);
   2517 				iocp->ioc_error = EPERM;
   2518 				iocp->ioc_count = 0;
   2519 				mp->b_datap->db_type = M_IOCACK;
   2520 				qreply(q, mp);
   2521 				return;
   2522 			}
   2523 
   2524 			mir->mir_type = iocp->ioc_cmd;
   2525 
   2526 			/*
   2527 			 * Clear mir_hold_inbound which was set to 1 by
   2528 			 * mir_open.  This flag is not used on client
   2529 			 * streams.
   2530 			 */
   2531 			mir->mir_hold_inbound = 0;
   2532 			mir->mir_max_msg_sizep = &clnt_max_msg_size;
   2533 
   2534 			/*
   2535 			 * Start the idle timer.  See mir_timer() for more
   2536 			 * information on how client timers work.
   2537 			 */
   2538 			mir->mir_idle_timeout = clnt_idle_timeout;
   2539 			mir_clnt_idle_start(q, mir);
   2540 			mutex_exit(&mir->mir_mutex);
   2541 
   2542 			mp->b_datap->db_type = M_IOCACK;
   2543 			qreply(q, mp);
   2544 			return;
   2545 		case RPC_SERVER:
   2546 			mutex_enter(&mir->mir_mutex);
   2547 			if (mir->mir_type != 0 &&
   2548 			    mir->mir_type != iocp->ioc_cmd)
   2549 				goto ioc_eperm;
   2550 
   2551 			/*
   2552 			 * We don't clear mir_hold_inbound here because
   2553 			 * mir_hold_inbound is used in the flow control
   2554 			 * model. If we cleared it here, then we'd commit
   2555 			 * a small violation to the model where the transport
   2556 			 * might immediately block downstream flow.
   2557 			 */
   2558 
   2559 			mir->mir_type = iocp->ioc_cmd;
   2560 			mir->mir_max_msg_sizep = &svc_max_msg_size;
   2561 
   2562 			/*
   2563 			 * Start the idle timer.  See mir_timer() for more
   2564 			 * information on how server timers work.
   2565 			 *
   2566 			 * Note that it is important to start the idle timer
   2567 			 * here so that connections time out even if we
   2568 			 * never receive any data on them.
   2569 			 */
   2570 			mir->mir_idle_timeout = svc_idle_timeout;
   2571 			RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
   2572 			    "because we got RPC_SERVER ioctl\n", (void *)q);
   2573 			mir_svc_idle_start(q, mir);
   2574 			mutex_exit(&mir->mir_mutex);
   2575 
   2576 			mp->b_datap->db_type = M_IOCACK;
   2577 			qreply(q, mp);
   2578 			return;
   2579 		default:
   2580 			break;
   2581 		}
   2582 		break;
   2583 
   2584 	case M_PROTO:
   2585 		if (mir->mir_type == RPC_CLIENT) {
   2586 			/*
   2587 			 * We are likely being called from the context of a
   2588 			 * service procedure. So we need to enqueue. However
   2589 			 * enqueing may put our message behind data messages.
   2590 			 * So flush the data first.
   2591 			 */
   2592 			flush_in_svc = TRUE;
   2593 		}
   2594 		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
   2595 		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
   2596 			break;
   2597 
   2598 		switch (((union T_primitives *)rptr)->type) {
   2599 		case T_DATA_REQ:
   2600 			/* Don't pass T_DATA_REQ messages downstream. */
   2601 			freemsg(mp);
   2602 			return;
   2603 		case T_ORDREL_REQ:
   2604 			RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
   2605 			    (void *)q);
   2606 			mutex_enter(&mir->mir_mutex);
   2607 			if (mir->mir_type != RPC_SERVER) {
   2608 				/*
   2609 				 * We are likely being called from
   2610 				 * clnt_dispatch_notifyall(). Sending
   2611 				 * a T_ORDREL_REQ will result in
   2612 				 * a some kind of _IND message being sent,
   2613 				 * will be another call to
   2614 				 * clnt_dispatch_notifyall(). To keep the stack
   2615 				 * lean, queue this message.
   2616 				 */
   2617 				mir->mir_inwservice = 1;
   2618 				(void) putq(q, mp);
   2619 				mutex_exit(&mir->mir_mutex);
   2620 				return;
   2621 			}
   2622 
   2623 			/*
   2624 			 * Mark the structure such that we don't accept any
   2625 			 * more requests from client. We could defer this
   2626 			 * until we actually send the orderly release
   2627 			 * request downstream, but all that does is delay
   2628 			 * the closing of this stream.
   2629 			 */
   2630 			RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
   2631 			    " so calling mir_svc_start_close\n", (void *)q);
   2632 
   2633 			mir_svc_start_close(q, mir);
   2634 
   2635 			/*
   2636 			 * If we have sent down a T_ORDREL_REQ, don't send
   2637 			 * any more.
   2638 			 */
   2639 			if (mir->mir_ordrel_pending) {
   2640 				freemsg(mp);
   2641 				mutex_exit(&mir->mir_mutex);
   2642 				return;
   2643 			}
   2644 
   2645 			/*
   2646 			 * If the stream is not idle, then we hold the
   2647 			 * orderly release until it becomes idle.  This
   2648 			 * ensures that KRPC will be able to reply to
   2649 			 * all requests that we have passed to it.
   2650 			 *
   2651 			 * We also queue the request if there is data already
   2652 			 * queued, because we cannot allow the T_ORDREL_REQ
   2653 			 * to go before data. When we had a separate reply
   2654 			 * count, this was not a problem, because the
   2655 			 * reply count was reconciled when mir_wsrv()
   2656 			 * completed.
   2657 			 */
   2658 			if (!MIR_SVC_QUIESCED(mir) ||
   2659 			    mir->mir_inwservice == 1) {
   2660 				mir->mir_inwservice = 1;
   2661 				(void) putq(q, mp);
   2662 
   2663 				RPCLOG(16, "mir_wput_other: queuing "
   2664 				    "T_ORDREL_REQ on 0x%p\n", (void *)q);
   2665 
   2666 				mutex_exit(&mir->mir_mutex);
   2667 				return;
   2668 			}
   2669 
   2670 			/*
   2671 			 * Mark the structure so that we know we sent
   2672 			 * an orderly release request, and reset the idle timer.
   2673 			 */
   2674 			mir->mir_ordrel_pending = 1;
   2675 
   2676 			RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
   2677 			    " on 0x%p because we got T_ORDREL_REQ\n",
   2678 			    (void *)q);
   2679 
   2680 			mir_svc_idle_start(q, mir);
   2681 			mutex_exit(&mir->mir_mutex);
   2682 
   2683 			/*
   2684 			 * When we break, we will putnext the T_ORDREL_REQ.
   2685 			 */
   2686 			break;
   2687 
   2688 		case T_CONN_REQ:
   2689 			mutex_enter(&mir->mir_mutex);
   2690 			if (mir->mir_head_mp != NULL) {
   2691 				freemsg(mir->mir_head_mp);
   2692 				mir->mir_head_mp = NULL;
   2693 				mir->mir_tail_mp = NULL;
   2694 			}
   2695 			mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   2696 			/*
   2697 			 * Restart timer in case mir_clnt_idle_do_stop() was
   2698 			 * called.
   2699 			 */
   2700 			mir->mir_idle_timeout = clnt_idle_timeout;
   2701 			mir_clnt_idle_stop(q, mir);
   2702 			mir_clnt_idle_start(q, mir);
   2703 			mutex_exit(&mir->mir_mutex);
   2704 			break;
   2705 
   2706 		default:
   2707 			/*
   2708 			 * T_DISCON_REQ is one of the interesting default
   2709 			 * cases here. Ideally, an M_FLUSH is done before
   2710 			 * T_DISCON_REQ is done. However, that is somewhat
   2711 			 * cumbersome for clnt_cots.c to do. So we queue
   2712 			 * T_DISCON_REQ, and let the service procedure
   2713 			 * flush all M_DATA.
   2714 			 */
   2715 			break;
   2716 		}
   2717 		/* fallthru */;
   2718 	default:
   2719 		if (mp->b_datap->db_type >= QPCTL) {
   2720 			if (mp->b_datap->db_type == M_FLUSH) {
   2721 				if (mir->mir_type == RPC_CLIENT &&
   2722 				    *mp->b_rptr & FLUSHW) {
   2723 					RPCLOG(32, "mir_wput_other: flushing "
   2724 					    "wq 0x%p\n", (void *)q);
   2725 					if (*mp->b_rptr & FLUSHBAND) {
   2726 						flushband(q, *(mp->b_rptr + 1),
   2727 						    FLUSHDATA);
   2728 					} else {
   2729 						flushq(q, FLUSHDATA);
   2730 					}
   2731 				} else {
   2732 					RPCLOG(32, "mir_wput_other: ignoring "
   2733 					    "M_FLUSH on wq 0x%p\n", (void *)q);
   2734 				}
   2735 			}
   2736 			break;
   2737 		}
   2738 
   2739 		mutex_enter(&mir->mir_mutex);
   2740 		if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
   2741 			mutex_exit(&mir->mir_mutex);
   2742 			break;
   2743 		}
   2744 		mir->mir_inwservice = 1;
   2745 		mir->mir_inwflushdata = flush_in_svc;
   2746 		(void) putq(q, mp);
   2747 		mutex_exit(&mir->mir_mutex);
   2748 		qenable(q);
   2749 
   2750 		return;
   2751 	}
   2752 	putnext(q, mp);
   2753 }
   2754 
   2755 static void
   2756 mir_wsrv(queue_t *q)
   2757 {
   2758 	mblk_t	*mp;
   2759 	mir_t	*mir;
   2760 	bool_t flushdata;
   2761 
   2762 	mir = (mir_t *)q->q_ptr;
   2763 	mutex_enter(&mir->mir_mutex);
   2764 
   2765 	flushdata = mir->mir_inwflushdata;
   2766 	mir->mir_inwflushdata = 0;
   2767 
   2768 	while (mp = getq(q)) {
   2769 		if (mp->b_datap->db_type == M_DATA) {
   2770 			/*
   2771 			 * Do not send any more data if we have sent
   2772 			 * a T_ORDREL_REQ.
   2773 			 */
   2774 			if (flushdata || mir->mir_ordrel_pending == 1) {
   2775 				freemsg(mp);
   2776 				continue;
   2777 			}
   2778 
   2779 			/*
   2780 			 * Make sure that the stream can really handle more
   2781 			 * data.
   2782 			 */
   2783 			if (!MIR_WCANPUTNEXT(mir, q)) {
   2784 				(void) putbq(q, mp);
   2785 				mutex_exit(&mir->mir_mutex);
   2786 				return;
   2787 			}
   2788 
   2789 			/*
   2790 			 * Now we pass the RPC message downstream.
   2791 			 */
   2792 			mutex_exit(&mir->mir_mutex);
   2793 			putnext(q, mp);
   2794 			mutex_enter(&mir->mir_mutex);
   2795 			continue;
   2796 		}
   2797 
   2798 		/*
   2799 		 * This is not an RPC message, pass it downstream
   2800 		 * (ignoring flow control) if the server side is not sending a
   2801 		 * T_ORDREL_REQ downstream.
   2802 		 */
   2803 		if (mir->mir_type != RPC_SERVER ||
   2804 		    ((union T_primitives *)mp->b_rptr)->type !=
   2805 		    T_ORDREL_REQ) {
   2806 			mutex_exit(&mir->mir_mutex);
   2807 			putnext(q, mp);
   2808 			mutex_enter(&mir->mir_mutex);
   2809 			continue;
   2810 		}
   2811 
   2812 		if (mir->mir_ordrel_pending == 1) {
   2813 			/*
   2814 			 * Don't send two T_ORDRELs
   2815 			 */
   2816 			freemsg(mp);
   2817 			continue;
   2818 		}
   2819 
   2820 		/*
   2821 		 * Mark the structure so that we know we sent an orderly
   2822 		 * release request.  We will check to see slot is idle at the
   2823 		 * end of this routine, and if so, reset the idle timer to
   2824 		 * handle orderly release timeouts.
   2825 		 */
   2826 		mir->mir_ordrel_pending = 1;
   2827 		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
   2828 		    (void *)q);
   2829 		/*
   2830 		 * Send the orderly release downstream. If there are other
   2831 		 * pending replies we won't be able to send them.  However,
   2832 		 * the only reason we should send the orderly release is if
   2833 		 * we were idle, or if an unusual event occurred.
   2834 		 */
   2835 		mutex_exit(&mir->mir_mutex);
   2836 		putnext(q, mp);
   2837 		mutex_enter(&mir->mir_mutex);
   2838 	}
   2839 
   2840 	if (q->q_first == NULL)
   2841 		/*
   2842 		 * If we call mir_svc_idle_start() below, then
   2843 		 * clearing mir_inwservice here will also result in
   2844 		 * any thread waiting in mir_close() to be signaled.
   2845 		 */
   2846 		mir->mir_inwservice = 0;
   2847 
   2848 	if (mir->mir_type != RPC_SERVER) {
   2849 		mutex_exit(&mir->mir_mutex);
   2850 		return;
   2851 	}
   2852 
   2853 	/*
   2854 	 * If idle we call mir_svc_idle_start to start the timer (or wakeup
   2855 	 * a close). Also make sure not to start the idle timer on the
   2856 	 * listener stream. This can cause nfsd to send an orderly release
   2857 	 * command on the listener stream.
   2858 	 */
   2859 	if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
   2860 		RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
   2861 		    "because mir slot is idle\n", (void *)q);
   2862 		mir_svc_idle_start(q, mir);
   2863 	}
   2864 
   2865 	/*
   2866 	 * If outbound flow control has been relieved, then allow new
   2867 	 * inbound requests to be processed.
   2868 	 */
   2869 	if (mir->mir_hold_inbound) {
   2870 		mir->mir_hold_inbound = 0;
   2871 		qenable(RD(q));
   2872 	}
   2873 	mutex_exit(&mir->mir_mutex);
   2874 }
   2875 
   2876 static void
   2877 mir_disconnect(queue_t *q, mir_t *mir)
   2878 {
   2879 	ASSERT(MUTEX_HELD(&mir->mir_mutex));
   2880 
   2881 	switch (mir->mir_type) {
   2882 	case RPC_CLIENT:
   2883 		/*
   2884 		 * We are disconnecting, but not necessarily
   2885 		 * closing. By not closing, we will fail to
   2886 		 * pick up a possibly changed global timeout value,
   2887 		 * unless we store it now.
   2888 		 */
   2889 		mir->mir_idle_timeout = clnt_idle_timeout;
   2890 		mir_clnt_idle_start(WR(q), mir);
   2891 		mutex_exit(&mir->mir_mutex);
   2892 
   2893 		/*
   2894 		 * T_DISCON_REQ is passed to KRPC as an integer value
   2895 		 * (this is not a TPI message).  It is used as a
   2896 		 * convenient value to indicate a sanity check
   2897 		 * failure -- the same KRPC routine is also called
   2898 		 * for T_DISCON_INDs and T_ORDREL_INDs.
   2899 		 */
   2900 		clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
   2901 		break;
   2902 
   2903 	case RPC_SERVER:
   2904 		mir->mir_svc_no_more_msgs = 1;
   2905 		mir_svc_idle_stop(WR(q), mir);
   2906 		mutex_exit(&mir->mir_mutex);
   2907 		RPCLOG(16, "mir_disconnect: telling "
   2908 		    "stream head listener to disconnect stream "
   2909 		    "(0x%p)\n", (void *) q);
   2910 		(void) mir_svc_policy_notify(q, 2);
   2911 		break;
   2912 
   2913 	default:
   2914 		mutex_exit(&mir->mir_mutex);
   2915 		break;
   2916 	}
   2917 }
   2918 
   2919 /*
   2920  * Sanity check the message length, and if it's too large, shutdown the
   2921  * connection.  Returns 1 if the connection is shutdown; 0 otherwise.
   2922  */
   2923 static int
   2924 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
   2925 {
   2926 	mir_t *mir = q->q_ptr;
   2927 	uint_t maxsize = 0;
   2928 
   2929 	if (mir->mir_max_msg_sizep != NULL)
   2930 		maxsize = *mir->mir_max_msg_sizep;
   2931 
   2932 	if (maxsize == 0 || frag_len <= (int)maxsize)
   2933 		return (0);
   2934 
   2935 	freemsg(head_mp);
   2936 	mir->mir_head_mp = NULL;
   2937 	mir->mir_tail_mp = NULL;
   2938 	mir->mir_frag_header = 0;
   2939 	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
   2940 	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
   2941 		cmn_err(CE_NOTE,
   2942 		    "KRPC: record fragment from %s of size(%d) exceeds "
   2943 		    "maximum (%u). Disconnecting",
   2944 		    (mir->mir_type == RPC_CLIENT) ? "server" :
   2945 		    (mir->mir_type == RPC_SERVER) ? "client" :
   2946 		    "test tool", frag_len, maxsize);
   2947 	}
   2948 
   2949 	mir_disconnect(q, mir);
   2950 	return (1);
   2951 }
   2952