Home | History | Annotate | Download | only in rpc
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 /*
     28  * Copyright 1993 OpenVision Technologies, Inc., All Rights Reserved.
     29  */
     30 
     31 /*	Copyright (c) 1983, 1984, 1985,  1986, 1987, 1988, 1989 AT&T	*/
     32 /*	  All Rights Reserved  	*/
     33 
     34 /*
     35  * Portions of this source code were derived from Berkeley 4.3 BSD
     36  * under license from the Regents of the University of California.
     37  */
     38 
     39 /*
     40  * Server-side remote procedure call interface.
     41  *
     42  * Master transport handle (SVCMASTERXPRT).
     43  *   The master transport handle structure is shared among service
     44  *   threads processing events on the transport. Some fields in the
     45  *   master structure are protected by locks
     46  *   - xp_req_lock protects the request queue:
     47  *	xp_req_head, xp_req_tail
     48  *   - xp_thread_lock protects the thread (clone) counts
     49  *	xp_threads, xp_detached_threads, xp_wq
     50  *   Each master transport is registered to exactly one thread pool.
     51  *
     52  * Clone transport handle (SVCXPRT)
     53  *   The clone transport handle structure is a per-service-thread handle
     54  *   to the transport. The structure carries all the fields/buffers used
     55  *   for request processing. A service thread or, in other words, a clone
     56  *   structure, can be linked to an arbitrary master structure to process
     57  *   requests on this transport. The master handle keeps track of reference
     58  *   counts of threads (clones) linked to it. A service thread can switch
     59  *   to another transport by unlinking its clone handle from the current
     60  *   transport and linking to a new one. Switching is relatively inexpensive
     61  *   but it involves locking (master's xprt->xp_thread_lock).
     62  *
     63  * Pools.
     64  *   A pool represents a kernel RPC service (NFS, Lock Manager, etc.).
     65  *   Transports related to the service are registered to the service pool.
     66  *   Service threads can switch between different transports in the pool.
     67  *   Thus, each service has its own pool of service threads. The maximum
     68  *   number of threads in a pool is pool->p_maxthreads. This limit allows
     69  *   to restrict resource usage by the service. Some fields are protected
     70  *   by locks:
     71  *   - p_req_lock protects several counts and flags:
     72  *	p_reqs, p_walkers, p_asleep, p_drowsy, p_req_cv
     73  *   - p_thread_lock governs other thread counts:
     74  *	p_threads, p_detached_threads, p_reserved_threads, p_closing
     75  *
     76  *   In addition, each pool contains a doubly-linked list of transports,
     77  *   an `xprt-ready' queue and a creator thread (see below). Threads in
     78  *   the pool share some other parameters such as stack size and
     79  *   polling timeout.
     80  *
     81  *   Pools are initialized through the svc_pool_create() function called from
     82  *   the nfssys() system call. However, thread creation must be done by
     83  *   the userland agent. This is done by using SVCPOOL_WAIT and
     84  *   SVCPOOL_RUN arguments to nfssys(), which call svc_wait() and
     85  *   svc_do_run(), respectively. Once the pool has been initialized,
     86  *   the userland process must set up a 'creator' thread. This thread
     87  *   should park itself in the kernel by calling svc_wait(). If
     88  *   svc_wait() returns successfully, it should fork off a new worker
     89  *   thread, which then calls svc_do_run() in order to get work. When
     90  *   that thread is complete, svc_do_run() will return, and the user
     91  *   program should call thr_exit().
     92  *
     93  *   When we try to register a new pool and there is an old pool with
     94  *   the same id in the doubly linked pool list (this happens when we kill
     95  *   and restart nfsd or lockd), then we unlink the old pool from the list
     96  *   and mark its state as `closing'. After that the transports can still
     97  *   process requests but new transports won't be registered. When all the
     98  *   transports and service threads associated with the pool are gone the
     99  *   creator thread (see below) will clean up the pool structure and exit.
    100  *
    101  * svc_queuereq() and svc_run().
    102  *   The kernel RPC server is interrupt driven. The svc_queuereq() interrupt
    103  *   routine is called to deliver an RPC request. The service threads
    104  *   loop in svc_run(). The interrupt function queues a request on the
    105  *   transport's queue and it makes sure that the request is serviced.
    106  *   It may either wake up one of sleeping threads, or ask for a new thread
    107  *   to be created, or, if the previous request is just being picked up, do
    108  *   nothing. In the last case the service thread that is picking up the
    109  *   previous request will wake up or create the next thread. After a service
    110  *   thread processes a request and sends a reply it returns to svc_run()
    111  *   and svc_run() calls svc_poll() to find new input.
    112  *
    113  *   There is no longer an "inconsistent" but "safe" optimization in the
    114  *   svc_queuereq() code. This "inconsistent" state was leading to
    115  *   inconsistencies between the actual number of requests and the value
    116  *   of p_reqs (the total number of requests). Because of this, hangs were
    117  *   occurring in svc_poll() where p_reqs was greater than one and no
    118  *   requests were found on the request queues.
    119  *
    120  * svc_poll().
    121  *   In order to avoid unnecessary locking, which causes performance
    122  *   problems, we always look for a pending request on the current transport.
    123  *   If there is none we take a hint from the pool's `xprt-ready' queue.
    124  *   If the queue had an overflow we switch to the `drain' mode checking
    125  *   each transport  in the pool's transport list. Once we find a
    126  *   master transport handle with a pending request we latch the request
    127  *   lock on this transport and return to svc_run(). If the request
    128  *   belongs to a transport different than the one the service thread is
    129  *   linked to we need to unlink and link again.
    130  *
    131  *   A service thread goes asleep when there are no pending
    132  *   requests on the transports registered on the pool's transports.
    133  *   All the pool's threads sleep on the same condition variable.
    134  *   If a thread has been sleeping for too long a period of time
    135  *   (by default 5 seconds) it wakes up and exits.  Also when a transport
    136  *   is closing sleeping threads wake up to unlink from this transport.
    137  *
    138  * The `xprt-ready' queue.
    139  *   If a service thread finds no request on a transport it is currently linked
    140  *   to it will find another transport with a pending request. To make
    141  *   this search more efficient each pool has an `xprt-ready' queue.
    142  *   The queue is a FIFO. When the interrupt routine queues a request it also
    143  *   inserts a pointer to the transport into the `xprt-ready' queue. A
    144  *   thread looking for a transport with a pending request can pop up a
    145  *   transport and check for a request. The request can be already gone
    146  *   since it could be taken by a thread linked to that transport. In such a
    147  *   case we try the next hint. The `xprt-ready' queue has fixed size (by
    148  *   default 256 nodes). If it overflows svc_poll() has to switch to the
    149  *   less efficient but safe `drain' mode and walk through the pool's
    150  *   transport list.
    151  *
    152  *   Both the svc_poll() loop and the `xprt-ready' queue are optimized
    153  *   for the peak load case that is for the situation when the queue is not
    154  *   empty, there are all the time few pending requests, and a service
    155  *   thread which has just processed a request does not go asleep but picks
    156  *   up immediately the next request.
    157  *
    158  * Thread creator.
    159  *   Each pool has a thread creator associated with it. The creator thread
    160  *   sleeps on a condition variable and waits for a signal to create a
    161  *   service thread. The actual thread creation is done in userland by
    162  *   the method described in "Pools" above.
    163  *
    164  *   Signaling threads should turn on the `creator signaled' flag, and
    165  *   can avoid sending signals when the flag is on. The flag is cleared
    166  *   when the thread is created.
    167  *
    168  *   When the pool is in closing state (ie it has been already unregistered
    169  *   from the pool list) the last thread on the last transport in the pool
    170  *   should turn the p_creator_exit flag on. The creator thread will
    171  *   clean up the pool structure and exit.
    172  *
    173  * Thread reservation; Detaching service threads.
    174  *   A service thread can detach itself to block for an extended amount
    175  *   of time. However, to keep the service active we need to guarantee
    176  *   at least pool->p_redline non-detached threads that can process incoming
    177  *   requests. Thus, the maximum number of detached and reserved threads is
    178  *   p->p_maxthreads - p->p_redline. A service thread should first acquire
    179  *   a reservation, and if the reservation was granted it can detach itself.
    180  *   If a reservation was granted but the thread does not detach itself
    181  *   it should cancel the reservation before it returns to svc_run().
    182  */
    183 
    184 #include <sys/param.h>
    185 #include <sys/types.h>
    186 #include <rpc/types.h>
    187 #include <sys/socket.h>
    188 #include <sys/time.h>
    189 #include <sys/tiuser.h>
    190 #include <sys/t_kuser.h>
    191 #include <netinet/in.h>
    192 #include <rpc/xdr.h>
    193 #include <rpc/auth.h>
    194 #include <rpc/clnt.h>
    195 #include <rpc/rpc_msg.h>
    196 #include <rpc/svc.h>
    197 #include <sys/proc.h>
    198 #include <sys/user.h>
    199 #include <sys/stream.h>
    200 #include <sys/strsubr.h>
    201 #include <sys/tihdr.h>
    202 #include <sys/debug.h>
    203 #include <sys/cmn_err.h>
    204 #include <sys/file.h>
    205 #include <sys/systm.h>
    206 #include <sys/callb.h>
    207 #include <sys/vtrace.h>
    208 #include <sys/zone.h>
    209 #include <nfs/nfs.h>
    210 #include <sys/tsol/label_macro.h>
    211 
#define	RQCRED_SIZE	400	/* this size is excessive */

/*
 * Defines for svc_poll()
 *
 * These are small integer constants cast to SVCMASTERXPRT pointers, so
 * they can never be dereferenced as real transport handles.
 * NOTE(review): presumably svc_poll() returns them in place of a
 * transport to report a condition -- confirm against svc_poll().
 */
#define	SVC_EXPRTGONE ((SVCMASTERXPRT *)1)	/* Transport is closing */
#define	SVC_ETIMEDOUT ((SVCMASTERXPRT *)2)	/* Timeout */
#define	SVC_EINTR ((SVCMASTERXPRT *)3)		/* Interrupted by signal */

/*
 * Default stack size for service threads.
 * svc_pool_init() uses this value when the caller passes a stksize of 0.
 */
#define	DEFAULT_SVC_RUN_STKSIZE		(0)	/* default kernel stack */

int    svc_default_stksize = DEFAULT_SVC_RUN_STKSIZE;

/*
 * Default polling timeout for service threads.
 * Multiplied by hz when used (see svc_pool_init(), which stores
 * timeout * hz in the pool). Used when the caller passes a timeout of 0.
 */
#define	DEFAULT_SVC_POLL_TIMEOUT	(5)	/* seconds */

clock_t svc_default_timeout = DEFAULT_SVC_POLL_TIMEOUT;

/*
 * Size of the `xprt-ready' queue.
 * Used by svc_pool_init() when the caller passes a qsize of 0.
 */
#define	DEFAULT_SVC_QSIZE		(256)	/* qnodes */

size_t svc_default_qsize = DEFAULT_SVC_QSIZE;

/*
 * Default limit for the number of service threads.
 * Used by svc_pool_init() when the caller passes a maxthreads of 0.
 */
#define	DEFAULT_SVC_MAXTHREADS		(INT16_MAX)

int    svc_default_maxthreads = DEFAULT_SVC_MAXTHREADS;

/*
 * Maximum number of requests from the same transport (in `drain' mode).
 * Used by svc_pool_init() when the caller passes a max_same_xprt of 0.
 */
#define	DEFAULT_SVC_MAX_SAME_XPRT	(8)

int    svc_default_max_same_xprt = DEFAULT_SVC_MAX_SAME_XPRT;


/*
 * Default `Redline' of non-detached threads.
 * Total number of detached and reserved threads in an RPC server
 * thread pool is limited to pool->p_maxthreads - pool->p_redline.
 * Used by svc_pool_init() when the caller passes a redline of 0.
 */
#define	DEFAULT_SVC_REDLINE		(1)

int    svc_default_redline = DEFAULT_SVC_REDLINE;
    266 
/*
 * A node for the `xprt-ready' queue.
 * See below.
 *
 * svc_xprt_qinit() allocates the nodes as one array and links them
 * through q_next into a closed ring.
 */
struct __svcxprt_qnode {
	__SVCXPRT_QNODE	*q_next;	/* next node in the ring */
	SVCMASTERXPRT	*q_xprt;	/* transport hinted as having work */
};
    275 
/*
 * Global SVC variables (private).
 *
 * One instance is allocated per zone by svc_zoneinit() and retrieved
 * with zone_getspecific(svc_zone_key, ...).
 */
struct svc_globals {
	SVCPOOL		*svc_pools;	/* head of the doubly linked pool list */
	kmutex_t	svc_plock;	/* protects the pool list */
};
    283 
/*
 * Debug variable to check for rdma based
 * transport startup and cleanup. Controlled
 * through /etc/system. Off by default.
 */
int rdma_check = 0;

/*
 * Authentication parameters list.
 */
static caddr_t rqcred_head;
static kmutex_t rqcred_lock;

/*
 * Pointers to transport specific `rele' routines in rpcmod (set from rpcmod).
 * Both start out NULL here.
 */
void	(*rpc_rele)(queue_t *, mblk_t *) = NULL;
void	(*mir_rele)(queue_t *, mblk_t *) = NULL;

/*
 * Do-nothing `rele' routine. It is the initial value of rdma_rele, so
 * that pointer, unlike rpc_rele and mir_rele, is never NULL.
 */
/* ARGSUSED */
void
rpc_rdma_rele(queue_t *q, mblk_t *mp)
{
}
void    (*rdma_rele)(queue_t *, mblk_t *) = rpc_rdma_rele;
    309 
    310 
/*
 * This macro picks which `rele' routine to use, based on the transport type.
 *
 * Yields rdma_rele for T_RDMA transports, rpc_rele for T_CLTS transports
 * and mir_rele for every other xp_type. The argument is evaluated more
 * than once, so it must not have side effects.
 */
#define	RELE_PROC(xprt) \
	((xprt)->xp_type == T_RDMA ? rdma_rele : \
	(((xprt)->xp_type == T_CLTS) ? rpc_rele : mir_rele))

/*
 * If true, then keep quiet about version mismatch.
 * This macro is for broadcast RPC only. We have no broadcast RPC in
 * kernel now but one may define a flag in the transport structure
 * and redefine this macro.
 *
 * As defined here it ignores its argument and is always FALSE.
 */
#define	version_keepquiet(xprt)	(FALSE)
    325 
/*
 * ZSD key used to retrieve zone-specific svc globals
 * (created in svc_init(), looked up with zone_getspecific()).
 */
static zone_key_t svc_zone_key;

/*
 * Forward declarations of file-local routines that are referenced
 * before their definitions.
 */
static void svc_callout_free(SVCMASTERXPRT *);
static void svc_xprt_qinit(SVCPOOL *, size_t);
static void svc_xprt_qdestroy(SVCPOOL *);
static void svc_thread_creator(SVCPOOL *);
static void svc_creator_signal(SVCPOOL *);
static void svc_creator_signalexit(SVCPOOL *);
static void svc_pool_unregister(struct svc_globals *, SVCPOOL *);
static int svc_run(SVCPOOL *);
    338 static int svc_run(SVCPOOL *);
    339 
    340 /* ARGSUSED */
    341 static void *
    342 svc_zoneinit(zoneid_t zoneid)
    343 {
    344 	struct svc_globals *svc;
    345 
    346 	svc = kmem_alloc(sizeof (*svc), KM_SLEEP);
    347 	mutex_init(&svc->svc_plock, NULL, MUTEX_DEFAULT, NULL);
    348 	svc->svc_pools = NULL;
    349 	return (svc);
    350 }
    351 
    352 /* ARGSUSED */
    353 static void
    354 svc_zoneshutdown(zoneid_t zoneid, void *arg)
    355 {
    356 	struct svc_globals *svc = arg;
    357 	SVCPOOL *pool;
    358 
    359 	mutex_enter(&svc->svc_plock);
    360 	while ((pool = svc->svc_pools) != NULL) {
    361 		svc_pool_unregister(svc, pool);
    362 	}
    363 	mutex_exit(&svc->svc_plock);
    364 }
    365 
    366 /* ARGSUSED */
    367 static void
    368 svc_zonefini(zoneid_t zoneid, void *arg)
    369 {
    370 	struct svc_globals *svc = arg;
    371 
    372 	ASSERT(svc->svc_pools == NULL);
    373 	mutex_destroy(&svc->svc_plock);
    374 	kmem_free(svc, sizeof (*svc));
    375 }
    376 
    377 /*
    378  * Global SVC init routine.
    379  * Initialize global generic and transport type specific structures
    380  * used by the kernel RPC server side. This routine is called only
    381  * once when the module is being loaded.
    382  */
    383 void
    384 svc_init()
    385 {
    386 	zone_key_create(&svc_zone_key, svc_zoneinit, svc_zoneshutdown,
    387 	    svc_zonefini);
    388 	svc_cots_init();
    389 	svc_clts_init();
    390 }
    391 
    392 /*
    393  * Destroy the SVCPOOL structure.
    394  */
    395 static void
    396 svc_pool_cleanup(SVCPOOL *pool)
    397 {
    398 	ASSERT(pool->p_threads + pool->p_detached_threads == 0);
    399 	ASSERT(pool->p_lcount == 0);
    400 	ASSERT(pool->p_closing);
    401 
    402 	/*
    403 	 * Call the user supplied shutdown function.  This is done
    404 	 * here so the user of the pool will be able to cleanup
    405 	 * service related resources.
    406 	 */
    407 	if (pool->p_shutdown != NULL)
    408 		(pool->p_shutdown)();
    409 
    410 	/* Destroy `xprt-ready' queue */
    411 	svc_xprt_qdestroy(pool);
    412 
    413 	/* Destroy transport list */
    414 	rw_destroy(&pool->p_lrwlock);
    415 
    416 	/* Destroy locks and condition variables */
    417 	mutex_destroy(&pool->p_thread_lock);
    418 	mutex_destroy(&pool->p_req_lock);
    419 	cv_destroy(&pool->p_req_cv);
    420 
    421 	/* Destroy creator's locks and condition variables */
    422 	mutex_destroy(&pool->p_creator_lock);
    423 	cv_destroy(&pool->p_creator_cv);
    424 	mutex_destroy(&pool->p_user_lock);
    425 	cv_destroy(&pool->p_user_cv);
    426 
    427 	/* Free pool structure */
    428 	kmem_free(pool, sizeof (SVCPOOL));
    429 }
    430 
    431 /*
    432  * If all the transports and service threads are already gone
    433  * signal the creator thread to clean up and exit.
    434  */
    435 static bool_t
    436 svc_pool_tryexit(SVCPOOL *pool)
    437 {
    438 	ASSERT(MUTEX_HELD(&pool->p_thread_lock));
    439 	ASSERT(pool->p_closing);
    440 
    441 	if (pool->p_threads + pool->p_detached_threads == 0) {
    442 		rw_enter(&pool->p_lrwlock, RW_READER);
    443 		if (pool->p_lcount == 0) {
    444 			/*
    445 			 * Release the locks before sending a signal.
    446 			 */
    447 			rw_exit(&pool->p_lrwlock);
    448 			mutex_exit(&pool->p_thread_lock);
    449 
    450 			/*
    451 			 * Notify the creator thread to clean up and exit
    452 			 *
    453 			 * NOTICE: No references to the pool beyond this point!
    454 			 *		   The pool is being destroyed.
    455 			 */
    456 			ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
    457 			svc_creator_signalexit(pool);
    458 
    459 			return (TRUE);
    460 		}
    461 		rw_exit(&pool->p_lrwlock);
    462 	}
    463 
    464 	ASSERT(MUTEX_HELD(&pool->p_thread_lock));
    465 	return (FALSE);
    466 }
    467 
    468 /*
    469  * Find a pool with a given id.
    470  */
    471 static SVCPOOL *
    472 svc_pool_find(struct svc_globals *svc, int id)
    473 {
    474 	SVCPOOL *pool;
    475 
    476 	ASSERT(MUTEX_HELD(&svc->svc_plock));
    477 
    478 	/*
    479 	 * Search the list for a pool with a matching id
    480 	 * and register the transport handle with that pool.
    481 	 */
    482 	for (pool = svc->svc_pools; pool; pool = pool->p_next)
    483 		if (pool->p_id == id)
    484 			return (pool);
    485 
    486 	return (NULL);
    487 }
    488 
    489 /*
    490  * PSARC 2003/523 Contract Private Interface
    491  * svc_do_run
    492  * Changes must be reviewed by Solaris File Sharing
    493  * Changes must be communicated to contract-2003-523 (at) sun.com
    494  */
    495 int
    496 svc_do_run(int id)
    497 {
    498 	SVCPOOL *pool;
    499 	int err = 0;
    500 	struct svc_globals *svc;
    501 
    502 	svc = zone_getspecific(svc_zone_key, curproc->p_zone);
    503 	mutex_enter(&svc->svc_plock);
    504 
    505 	pool = svc_pool_find(svc, id);
    506 
    507 	mutex_exit(&svc->svc_plock);
    508 
    509 	if (pool == NULL)
    510 		return (ENOENT);
    511 
    512 	/*
    513 	 * Increment counter of pool threads now
    514 	 * that a thread has been created.
    515 	 */
    516 	mutex_enter(&pool->p_thread_lock);
    517 	pool->p_threads++;
    518 	mutex_exit(&pool->p_thread_lock);
    519 
    520 	/* Give work to the new thread. */
    521 	err = svc_run(pool);
    522 
    523 	return (err);
    524 }
    525 
    526 /*
    527  * Unregister a pool from the pool list.
    528  * Set the closing state. If all the transports and service threads
    529  * are already gone signal the creator thread to clean up and exit.
    530  */
    531 static void
    532 svc_pool_unregister(struct svc_globals *svc, SVCPOOL *pool)
    533 {
    534 	SVCPOOL *next = pool->p_next;
    535 	SVCPOOL *prev = pool->p_prev;
    536 
    537 	ASSERT(MUTEX_HELD(&svc->svc_plock));
    538 
    539 	/* Remove from the list */
    540 	if (pool == svc->svc_pools)
    541 		svc->svc_pools = next;
    542 	if (next)
    543 		next->p_prev = prev;
    544 	if (prev)
    545 		prev->p_next = next;
    546 	pool->p_next = pool->p_prev = NULL;
    547 
    548 	/*
    549 	 * Offline the pool. Mark the pool as closing.
    550 	 * If there are no transports in this pool notify
    551 	 * the creator thread to clean it up and exit.
    552 	 */
    553 	mutex_enter(&pool->p_thread_lock);
    554 	if (pool->p_offline != NULL)
    555 		(pool->p_offline)();
    556 	pool->p_closing = TRUE;
    557 	if (svc_pool_tryexit(pool))
    558 		return;
    559 	mutex_exit(&pool->p_thread_lock);
    560 }
    561 
    562 /*
    563  * Register a pool with a given id in the global doubly linked pool list.
    564  * - if there is a pool with the same id in the list then unregister it
    565  * - insert the new pool into the list.
    566  */
    567 static void
    568 svc_pool_register(struct svc_globals *svc, SVCPOOL *pool, int id)
    569 {
    570 	SVCPOOL *old_pool;
    571 
    572 	/*
    573 	 * If there is a pool with the same id then remove it from
    574 	 * the list and mark the pool as closing.
    575 	 */
    576 	mutex_enter(&svc->svc_plock);
    577 
    578 	if (old_pool = svc_pool_find(svc, id))
    579 		svc_pool_unregister(svc, old_pool);
    580 
    581 	/* Insert into the doubly linked list */
    582 	pool->p_id = id;
    583 	pool->p_next = svc->svc_pools;
    584 	pool->p_prev = NULL;
    585 	if (svc->svc_pools)
    586 		svc->svc_pools->p_prev = pool;
    587 	svc->svc_pools = pool;
    588 
    589 	mutex_exit(&svc->svc_plock);
    590 }
    591 
    592 /*
    593  * Initialize a newly created pool structure
    594  */
    595 static int
    596 svc_pool_init(SVCPOOL *pool, uint_t maxthreads, uint_t redline,
    597 	uint_t qsize, uint_t timeout, uint_t stksize, uint_t max_same_xprt)
    598 {
    599 	klwp_t *lwp = ttolwp(curthread);
    600 
    601 	ASSERT(pool);
    602 
    603 	if (maxthreads == 0)
    604 		maxthreads = svc_default_maxthreads;
    605 	if (redline == 0)
    606 		redline = svc_default_redline;
    607 	if (qsize == 0)
    608 		qsize = svc_default_qsize;
    609 	if (timeout == 0)
    610 		timeout = svc_default_timeout;
    611 	if (stksize == 0)
    612 		stksize = svc_default_stksize;
    613 	if (max_same_xprt == 0)
    614 		max_same_xprt = svc_default_max_same_xprt;
    615 
    616 	if (maxthreads < redline)
    617 		return (EINVAL);
    618 
    619 	/* Allocate and initialize the `xprt-ready' queue */
    620 	svc_xprt_qinit(pool, qsize);
    621 
    622 	/* Initialize doubly-linked xprt list */
    623 	rw_init(&pool->p_lrwlock, NULL, RW_DEFAULT, NULL);
    624 
    625 	/*
    626 	 * Setting lwp_childstksz on the current lwp so that
    627 	 * descendants of this lwp get the modified stacksize, if
    628 	 * it is defined. It is important that either this lwp or
    629 	 * one of its descendants do the actual servicepool thread
    630 	 * creation to maintain the stacksize inheritance.
    631 	 */
    632 	if (lwp != NULL)
    633 		lwp->lwp_childstksz = stksize;
    634 
    635 	/* Initialize thread limits, locks and condition variables */
    636 	pool->p_maxthreads = maxthreads;
    637 	pool->p_redline = redline;
    638 	pool->p_timeout = timeout * hz;
    639 	pool->p_stksize = stksize;
    640 	pool->p_max_same_xprt = max_same_xprt;
    641 	mutex_init(&pool->p_thread_lock, NULL, MUTEX_DEFAULT, NULL);
    642 	mutex_init(&pool->p_req_lock, NULL, MUTEX_DEFAULT, NULL);
    643 	cv_init(&pool->p_req_cv, NULL, CV_DEFAULT, NULL);
    644 
    645 	/* Initialize userland creator */
    646 	pool->p_user_exit = FALSE;
    647 	pool->p_signal_create_thread = FALSE;
    648 	pool->p_user_waiting = FALSE;
    649 	mutex_init(&pool->p_user_lock, NULL, MUTEX_DEFAULT, NULL);
    650 	cv_init(&pool->p_user_cv, NULL, CV_DEFAULT, NULL);
    651 
    652 	/* Initialize the creator and start the creator thread */
    653 	pool->p_creator_exit = FALSE;
    654 	mutex_init(&pool->p_creator_lock, NULL, MUTEX_DEFAULT, NULL);
    655 	cv_init(&pool->p_creator_cv, NULL, CV_DEFAULT, NULL);
    656 
    657 	(void) zthread_create(NULL, pool->p_stksize, svc_thread_creator,
    658 	    pool, 0, minclsyspri);
    659 
    660 	return (0);
    661 }
    662 
    663 /*
    664  * PSARC 2003/523 Contract Private Interface
    665  * svc_pool_create
    666  * Changes must be reviewed by Solaris File Sharing
    667  * Changes must be communicated to contract-2003-523 (at) sun.com
    668  *
    669  * Create an kernel RPC server-side thread/transport pool.
    670  *
    671  * This is public interface for creation of a server RPC thread pool
    672  * for a given service provider. Transports registered with the pool's id
    673  * will be served by a pool's threads. This function is called from the
    674  * nfssys() system call.
    675  */
    676 int
    677 svc_pool_create(struct svcpool_args *args)
    678 {
    679 	SVCPOOL *pool;
    680 	int error;
    681 	struct svc_globals *svc;
    682 
    683 	/*
    684 	 * Caller should check credentials in a way appropriate
    685 	 * in the context of the call.
    686 	 */
    687 
    688 	svc = zone_getspecific(svc_zone_key, curproc->p_zone);
    689 	/* Allocate a new pool */
    690 	pool = kmem_zalloc(sizeof (SVCPOOL), KM_SLEEP);
    691 
    692 	/*
    693 	 * Initialize the pool structure and create a creator thread.
    694 	 */
    695 	error = svc_pool_init(pool, args->maxthreads, args->redline,
    696 	    args->qsize, args->timeout, args->stksize, args->max_same_xprt);
    697 
    698 	if (error) {
    699 		kmem_free(pool, sizeof (SVCPOOL));
    700 		return (error);
    701 	}
    702 
    703 	/* Register the pool with the global pool list */
    704 	svc_pool_register(svc, pool, args->id);
    705 
    706 	return (0);
    707 }
    708 
    709 int
    710 svc_pool_control(int id, int cmd, void *arg)
    711 {
    712 	SVCPOOL *pool;
    713 	struct svc_globals *svc;
    714 
    715 	svc = zone_getspecific(svc_zone_key, curproc->p_zone);
    716 
    717 	switch (cmd) {
    718 	case SVCPSET_SHUTDOWN_PROC:
    719 		/*
    720 		 * Search the list for a pool with a matching id
    721 		 * and register the transport handle with that pool.
    722 		 */
    723 		mutex_enter(&svc->svc_plock);
    724 
    725 		if ((pool = svc_pool_find(svc, id)) == NULL) {
    726 			mutex_exit(&svc->svc_plock);
    727 			return (ENOENT);
    728 		}
    729 		/*
    730 		 * Grab the transport list lock before releasing the
    731 		 * pool list lock
    732 		 */
    733 		rw_enter(&pool->p_lrwlock, RW_WRITER);
    734 		mutex_exit(&svc->svc_plock);
    735 
    736 		pool->p_shutdown = *((void (*)())arg);
    737 
    738 		rw_exit(&pool->p_lrwlock);
    739 
    740 		return (0);
    741 	case SVCPSET_UNREGISTER_PROC:
    742 		/*
    743 		 * Search the list for a pool with a matching id
    744 		 * and register the unregister callback handle with that pool.
    745 		 */
    746 		mutex_enter(&svc->svc_plock);
    747 
    748 		if ((pool = svc_pool_find(svc, id)) == NULL) {
    749 			mutex_exit(&svc->svc_plock);
    750 			return (ENOENT);
    751 		}
    752 		/*
    753 		 * Grab the transport list lock before releasing the
    754 		 * pool list lock
    755 		 */
    756 		rw_enter(&pool->p_lrwlock, RW_WRITER);
    757 		mutex_exit(&svc->svc_plock);
    758 
    759 		pool->p_offline = *((void (*)())arg);
    760 
    761 		rw_exit(&pool->p_lrwlock);
    762 
    763 		return (0);
    764 	default:
    765 		return (EINVAL);
    766 	}
    767 }
    768 
    769 /*
    770  * Pool's transport list manipulation routines.
    771  * - svc_xprt_register()
    772  * - svc_xprt_unregister()
    773  *
    774  * svc_xprt_register() is called from svc_tli_kcreate() to
    775  * insert a new master transport handle into the doubly linked
    776  * list of server transport handles (one list per pool).
    777  *
    778  * The list is used by svc_poll(), when it operates in `drain'
    779  * mode, to search for a next transport with a pending request.
    780  */
    781 
    782 int
    783 svc_xprt_register(SVCMASTERXPRT *xprt, int id)
    784 {
    785 	SVCMASTERXPRT *prev, *next;
    786 	SVCPOOL *pool;
    787 	struct svc_globals *svc;
    788 
    789 	svc = zone_getspecific(svc_zone_key, curproc->p_zone);
    790 	/*
    791 	 * Search the list for a pool with a matching id
    792 	 * and register the transport handle with that pool.
    793 	 */
    794 	mutex_enter(&svc->svc_plock);
    795 
    796 	if ((pool = svc_pool_find(svc, id)) == NULL) {
    797 		mutex_exit(&svc->svc_plock);
    798 		return (ENOENT);
    799 	}
    800 
    801 	/* Grab the transport list lock before releasing the pool list lock */
    802 	rw_enter(&pool->p_lrwlock, RW_WRITER);
    803 	mutex_exit(&svc->svc_plock);
    804 
    805 	/* Don't register new transports when the pool is in closing state */
    806 	if (pool->p_closing) {
    807 		rw_exit(&pool->p_lrwlock);
    808 		return (EBUSY);
    809 	}
    810 
    811 	/*
    812 	 * Initialize xp_pool to point to the pool.
    813 	 * We don't want to go through the pool list every time.
    814 	 */
    815 	xprt->xp_pool = pool;
    816 
    817 	/*
    818 	 * Insert a transport handle into the list.
    819 	 * The list head points to the most recently inserted transport.
    820 	 */
    821 	if (pool->p_lhead == NULL)
    822 		pool->p_lhead = xprt->xp_prev = xprt->xp_next = xprt;
    823 	else {
    824 		next = pool->p_lhead;
    825 		prev = pool->p_lhead->xp_prev;
    826 
    827 		xprt->xp_next = next;
    828 		xprt->xp_prev = prev;
    829 
    830 		pool->p_lhead = prev->xp_next = next->xp_prev = xprt;
    831 	}
    832 
    833 	/* Increment the transports count */
    834 	pool->p_lcount++;
    835 
    836 	rw_exit(&pool->p_lrwlock);
    837 	return (0);
    838 }
    839 
    840 /*
    841  * Called from svc_xprt_cleanup() to remove a master transport handle
    842  * from the pool's list of server transports (when a transport is
    843  * being destroyed).
    844  */
    845 void
    846 svc_xprt_unregister(SVCMASTERXPRT *xprt)
    847 {
    848 	SVCPOOL *pool = xprt->xp_pool;
    849 
    850 	/*
    851 	 * Unlink xprt from the list.
    852 	 * If the list head points to this xprt then move it
    853 	 * to the next xprt or reset to NULL if this is the last
    854 	 * xprt in the list.
    855 	 */
    856 	rw_enter(&pool->p_lrwlock, RW_WRITER);
    857 
    858 	if (xprt == xprt->xp_next)
    859 		pool->p_lhead = NULL;
    860 	else {
    861 		SVCMASTERXPRT *next = xprt->xp_next;
    862 		SVCMASTERXPRT *prev = xprt->xp_prev;
    863 
    864 		next->xp_prev = prev;
    865 		prev->xp_next = next;
    866 
    867 		if (pool->p_lhead == xprt)
    868 			pool->p_lhead = next;
    869 	}
    870 
    871 	xprt->xp_next = xprt->xp_prev = NULL;
    872 
    873 	/* Decrement list count */
    874 	pool->p_lcount--;
    875 
    876 	rw_exit(&pool->p_lrwlock);
    877 }
    878 
    879 static void
    880 svc_xprt_qdestroy(SVCPOOL *pool)
    881 {
    882 	mutex_destroy(&pool->p_qend_lock);
    883 	kmem_free(pool->p_qbody, pool->p_qsize * sizeof (__SVCXPRT_QNODE));
    884 }
    885 
    886 /*
    887  * Initialize an `xprt-ready' queue for a given pool.
    888  */
    889 static void
    890 svc_xprt_qinit(SVCPOOL *pool, size_t qsize)
    891 {
    892 	int i;
    893 
    894 	pool->p_qsize = qsize;
    895 	pool->p_qbody = kmem_zalloc(pool->p_qsize * sizeof (__SVCXPRT_QNODE),
    896 	    KM_SLEEP);
    897 
    898 	for (i = 0; i < pool->p_qsize - 1; i++)
    899 		pool->p_qbody[i].q_next = &(pool->p_qbody[i+1]);
    900 
    901 	pool->p_qbody[pool->p_qsize-1].q_next = &(pool->p_qbody[0]);
    902 	pool->p_qtop = &(pool->p_qbody[0]);
    903 	pool->p_qend = &(pool->p_qbody[0]);
    904 
    905 	mutex_init(&pool->p_qend_lock, NULL, MUTEX_DEFAULT, NULL);
    906 }
    907 
/*
 * Called from the svc_queuereq() interrupt routine to queue
 * a hint for svc_poll() which transport has a pending request.
 * - insert a pointer to xprt into the xprt-ready queue (FIFO)
 * - if the xprt-ready queue is full turn the overflow flag on.
 *
 * Once the overflow flag is on this function no longer queues any
 * hints; the flag is not cleared here.
 *
 * NOTICE: pool->p_qtop is protected by the pool's request lock
 * and the caller (svc_queuereq()) must hold the lock.
 */
static void
svc_xprt_qput(SVCPOOL *pool, SVCMASTERXPRT *xprt)
{
	ASSERT(MUTEX_HELD(&pool->p_req_lock));

	/* If the overflow flag is on there is nothing we can do */
	if (pool->p_qoverflow)
		return;

	/*
	 * If the queue is full turn the overflow flag on and exit.
	 * The first test is made without p_qend_lock. It is repeated
	 * with the lock held because svc_xprt_qget() advances p_qend
	 * under p_qend_lock and may have freed a slot in the meantime.
	 */
	if (pool->p_qtop->q_next == pool->p_qend) {
		mutex_enter(&pool->p_qend_lock);
		if (pool->p_qtop->q_next == pool->p_qend) {
			pool->p_qoverflow = TRUE;
			mutex_exit(&pool->p_qend_lock);
			return;
		}
		mutex_exit(&pool->p_qend_lock);
	}

	/* Insert a hint and move pool->p_qtop */
	pool->p_qtop->q_xprt = xprt;
	pool->p_qtop = pool->p_qtop->q_next;
}
    941 
    942 /*
    943  * Called from svc_poll() to get a hint which transport has a
    944  * pending request. Returns a pointer to a transport or NULL if the
    945  * `xprt-ready' queue is empty.
    946  *
    947  * Since we do not acquire the pool's request lock while checking if
    948  * the queue is empty we may miss a request that is just being delivered.
    949  * However this is ok since svc_poll() will retry again until the
    950  * count indicates that there are pending requests for this pool.
    951  */
    952 static SVCMASTERXPRT *
    953 svc_xprt_qget(SVCPOOL *pool)
    954 {
    955 	SVCMASTERXPRT *xprt;
    956 
    957 	mutex_enter(&pool->p_qend_lock);
    958 	do {
    959 		/*
    960 		 * If the queue is empty return NULL.
    961 		 * Since we do not acquire the pool's request lock which
    962 		 * protects pool->p_qtop this is not exact check. However,
    963 		 * this is safe - if we miss a request here svc_poll()
    964 		 * will retry again.
    965 		 */
    966 		if (pool->p_qend == pool->p_qtop) {
    967 			mutex_exit(&pool->p_qend_lock);
    968 			return (NULL);
    969 		}
    970 
    971 		/* Get a hint and move pool->p_qend */
    972 		xprt = pool->p_qend->q_xprt;
    973 		pool->p_qend = pool->p_qend->q_next;
    974 
    975 		/* Skip fields deleted by svc_xprt_qdelete()	 */
    976 	} while (xprt == NULL);
    977 	mutex_exit(&pool->p_qend_lock);
    978 
    979 	return (xprt);
    980 }
    981 
    982 /*
    983  * Delete all the references to a transport handle that
    984  * is being destroyed from the xprt-ready queue.
    985  * Deleted pointers are replaced with NULLs.
    986  */
    987 static void
    988 svc_xprt_qdelete(SVCPOOL *pool, SVCMASTERXPRT *xprt)
    989 {
    990 	__SVCXPRT_QNODE *q = pool->p_qend;
    991 	__SVCXPRT_QNODE *qtop = pool->p_qtop;
    992 
    993 	/*
    994 	 * Delete all the references to xprt between the current
    995 	 * position of pool->p_qend and current pool->p_qtop.
    996 	 */
    997 	for (;;) {
    998 		if (q->q_xprt == xprt)
    999 			q->q_xprt = NULL;
   1000 		if (q == qtop)
   1001 			return;
   1002 		q = q->q_next;
   1003 	}
   1004 }
   1005 
   1006 /*
   1007  * Destructor for a master server transport handle.
   1008  * - if there are no more non-detached threads linked to this transport
   1009  *   then, if requested, call xp_closeproc (we don't wait for detached
   1010  *   threads linked to this transport to complete).
   1011  * - if there are no more threads linked to this
   1012  *   transport then
   1013  *   a) remove references to this transport from the xprt-ready queue
   1014  *   b) remove a reference to this transport from the pool's transport list
   1015  *   c) call a transport specific `destroy' function
   1016  *   d) cancel remaining thread reservations.
   1017  *
   1018  * NOTICE: Caller must hold the transport's thread lock.
   1019  */
   1020 static void
   1021 svc_xprt_cleanup(SVCMASTERXPRT *xprt, bool_t detached)
   1022 {
   1023 	ASSERT(MUTEX_HELD(&xprt->xp_thread_lock));
   1024 	ASSERT(xprt->xp_wq == NULL);
   1025 
   1026 	/*
   1027 	 * If called from the last non-detached thread
   1028 	 * it should call the closeproc on this transport.
   1029 	 */
   1030 	if (!detached && xprt->xp_threads == 0 && xprt->xp_closeproc) {
   1031 		(*(xprt->xp_closeproc)) (xprt);
   1032 	}
   1033 
   1034 	if (xprt->xp_threads + xprt->xp_detached_threads > 0)
   1035 		mutex_exit(&xprt->xp_thread_lock);
   1036 	else {
   1037 		/* Remove references to xprt from the `xprt-ready' queue */
   1038 		svc_xprt_qdelete(xprt->xp_pool, xprt);
   1039 
   1040 		/* Unregister xprt from the pool's transport list */
   1041 		svc_xprt_unregister(xprt);
   1042 		svc_callout_free(xprt);
   1043 		SVC_DESTROY(xprt);
   1044 	}
   1045 }
   1046 
   1047 /*
   1048  * Find a dispatch routine for a given prog/vers pair.
   1049  * This function is called from svc_getreq() to search the callout
   1050  * table for an entry with a matching RPC program number `prog'
   1051  * and a version range that covers `vers'.
   1052  * - if it finds a matching entry it returns pointer to the dispatch routine
   1053  * - otherwise it returns NULL and, if `minp' or `maxp' are not NULL,
   1054  *   fills them with, respectively, lowest version and highest version
   1055  *   supported for the program `prog'
   1056  */
   1057 static SVC_DISPATCH *
   1058 svc_callout_find(SVCXPRT *xprt, rpcprog_t prog, rpcvers_t vers,
   1059     rpcvers_t *vers_min, rpcvers_t *vers_max)
   1060 {
   1061 	SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
   1062 	int i;
   1063 
   1064 	*vers_min = ~(rpcvers_t)0;
   1065 	*vers_max = 0;
   1066 
   1067 	for (i = 0; i < sct->sct_size; i++) {
   1068 		SVC_CALLOUT *sc = &sct->sct_sc[i];
   1069 
   1070 		if (prog == sc->sc_prog) {
   1071 			if (vers >= sc->sc_versmin && vers <= sc->sc_versmax)
   1072 				return (sc->sc_dispatch);
   1073 
   1074 			if (*vers_max < sc->sc_versmax)
   1075 				*vers_max = sc->sc_versmax;
   1076 			if (*vers_min > sc->sc_versmin)
   1077 				*vers_min = sc->sc_versmin;
   1078 		}
   1079 	}
   1080 
   1081 	return (NULL);
   1082 }
   1083 
   1084 /*
   1085  * Optionally free callout table allocated for this transport by
   1086  * the service provider.
   1087  */
   1088 static void
   1089 svc_callout_free(SVCMASTERXPRT *xprt)
   1090 {
   1091 	SVC_CALLOUT_TABLE *sct = xprt->xp_sct;
   1092 
   1093 	if (sct->sct_free) {
   1094 		kmem_free(sct->sct_sc, sct->sct_size * sizeof (SVC_CALLOUT));
   1095 		kmem_free(sct, sizeof (SVC_CALLOUT_TABLE));
   1096 	}
   1097 }
   1098 
   1099 /*
   1100  * Send a reply to an RPC request
   1101  *
   1102  * PSARC 2003/523 Contract Private Interface
   1103  * svc_sendreply
   1104  * Changes must be reviewed by Solaris File Sharing
   1105  * Changes must be communicated to contract-2003-523 (at) sun.com
   1106  */
   1107 bool_t
   1108 svc_sendreply(const SVCXPRT *clone_xprt, const xdrproc_t xdr_results,
   1109     const caddr_t xdr_location)
   1110 {
   1111 	struct rpc_msg rply;
   1112 
   1113 	rply.rm_direction = REPLY;
   1114 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1115 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1116 	rply.acpted_rply.ar_stat = SUCCESS;
   1117 	rply.acpted_rply.ar_results.where = xdr_location;
   1118 	rply.acpted_rply.ar_results.proc = xdr_results;
   1119 
   1120 	return (SVC_REPLY((SVCXPRT *)clone_xprt, &rply));
   1121 }
   1122 
   1123 /*
   1124  * No procedure error reply
   1125  *
   1126  * PSARC 2003/523 Contract Private Interface
   1127  * svcerr_noproc
   1128  * Changes must be reviewed by Solaris File Sharing
   1129  * Changes must be communicated to contract-2003-523 (at) sun.com
   1130  */
   1131 void
   1132 svcerr_noproc(const SVCXPRT *clone_xprt)
   1133 {
   1134 	struct rpc_msg rply;
   1135 
   1136 	rply.rm_direction = REPLY;
   1137 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1138 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1139 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
   1140 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1141 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1142 }
   1143 
   1144 /*
   1145  * Can't decode arguments error reply
   1146  *
   1147  * PSARC 2003/523 Contract Private Interface
   1148  * svcerr_decode
   1149  * Changes must be reviewed by Solaris File Sharing
   1150  * Changes must be communicated to contract-2003-523 (at) sun.com
   1151  */
   1152 void
   1153 svcerr_decode(const SVCXPRT *clone_xprt)
   1154 {
   1155 	struct rpc_msg rply;
   1156 
   1157 	rply.rm_direction = REPLY;
   1158 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1159 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1160 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
   1161 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1162 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1163 }
   1164 
   1165 /*
   1166  * Some system error
   1167  */
   1168 void
   1169 svcerr_systemerr(const SVCXPRT *clone_xprt)
   1170 {
   1171 	struct rpc_msg rply;
   1172 
   1173 	rply.rm_direction = REPLY;
   1174 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1175 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1176 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
   1177 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1178 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1179 }
   1180 
   1181 /*
   1182  * Authentication error reply
   1183  */
   1184 void
   1185 svcerr_auth(const SVCXPRT *clone_xprt, const enum auth_stat why)
   1186 {
   1187 	struct rpc_msg rply;
   1188 
   1189 	rply.rm_direction = REPLY;
   1190 	rply.rm_reply.rp_stat = MSG_DENIED;
   1191 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
   1192 	rply.rjcted_rply.rj_why = why;
   1193 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1194 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1195 }
   1196 
   1197 /*
   1198  * Authentication too weak error reply
   1199  */
   1200 void
   1201 svcerr_weakauth(const SVCXPRT *clone_xprt)
   1202 {
   1203 	svcerr_auth((SVCXPRT *)clone_xprt, AUTH_TOOWEAK);
   1204 }
   1205 
   1206 /*
   1207  * Authentication error; bad credentials
   1208  */
   1209 void
   1210 svcerr_badcred(const SVCXPRT *clone_xprt)
   1211 {
   1212 	struct rpc_msg rply;
   1213 
   1214 	rply.rm_direction = REPLY;
   1215 	rply.rm_reply.rp_stat = MSG_DENIED;
   1216 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
   1217 	rply.rjcted_rply.rj_why = AUTH_BADCRED;
   1218 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1219 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1220 }
   1221 
   1222 /*
   1223  * Program unavailable error reply
   1224  *
   1225  * PSARC 2003/523 Contract Private Interface
   1226  * svcerr_noprog
   1227  * Changes must be reviewed by Solaris File Sharing
   1228  * Changes must be communicated to contract-2003-523 (at) sun.com
   1229  */
   1230 void
   1231 svcerr_noprog(const SVCXPRT *clone_xprt)
   1232 {
   1233 	struct rpc_msg rply;
   1234 
   1235 	rply.rm_direction = REPLY;
   1236 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1237 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1238 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
   1239 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1240 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1241 }
   1242 
   1243 /*
   1244  * Program version mismatch error reply
   1245  *
   1246  * PSARC 2003/523 Contract Private Interface
   1247  * svcerr_progvers
   1248  * Changes must be reviewed by Solaris File Sharing
   1249  * Changes must be communicated to contract-2003-523 (at) sun.com
   1250  */
   1251 void
   1252 svcerr_progvers(const SVCXPRT *clone_xprt,
   1253     const rpcvers_t low_vers, const rpcvers_t high_vers)
   1254 {
   1255 	struct rpc_msg rply;
   1256 
   1257 	rply.rm_direction = REPLY;
   1258 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
   1259 	rply.acpted_rply.ar_verf = clone_xprt->xp_verf;
   1260 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
   1261 	rply.acpted_rply.ar_vers.low = low_vers;
   1262 	rply.acpted_rply.ar_vers.high = high_vers;
   1263 	SVC_FREERES((SVCXPRT *)clone_xprt);
   1264 	SVC_REPLY((SVCXPRT *)clone_xprt, &rply);
   1265 }
   1266 
/*
 * Get server side input from some transport.
 *
 * clone_xprt	- the clone transport handle of the calling thread; it
 *		  must already be linked to a master transport.
 * mp		- the message that is handed to SVC_RECV().
 *
 * The request is received, authenticated with sec_svc_msg() and then
 * passed to the dispatch routine found by svc_callout_find(). If no
 * dispatch routine covers the program/version an error reply is sent
 * instead.
 *
 * Statement of authentication parameters management:
 * This function owns and manages all authentication parameters, specifically
 * the "raw" parameters (msg.rm_call.cb_cred and msg.rm_call.cb_verf) and
 * the "cooked" credentials (rqst->rq_clntcred).
 * However, this function does not know the structure of the cooked
 * credentials, so it makes the following assumptions:
 *   a) the structure is contiguous (no pointers), and
 *   b) the cred structure size does not exceed RQCRED_SIZE bytes.
 * In all events, all three parameters are freed upon exit from this routine.
 * The storage is trivially managed on the call stack in user land, but
 * is malloced in kernel land.
 *
 * Note: the xprt's xp_svc_lock is not held while the service's dispatch
 * routine is running.	If we decide to implement svc_unregister(), we'll
 * need to decide whether it's okay for a thread to unregister a service
 * while a request is being processed.	If we decide that this is a
 * problem, we can probably use some sort of reference counting scheme to
 * keep the callout entry from going away until the request has completed.
 */
static void
svc_getreq(
	SVCXPRT *clone_xprt,	/* clone transport handle */
	mblk_t *mp)
{
	struct rpc_msg msg;
	struct svc_req r;
	char  *cred_area;	/* too big to allocate on call stack */

	TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_START,
	    "svc_getreq_start:");

	ASSERT(clone_xprt->xp_master != NULL);
	ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
	    mp->b_datap->db_type != M_DATA);

	/*
	 * Firstly, allocate the authentication parameters' storage.
	 * A buffer is taken from the rqcred_head free list when one is
	 * available; the first pointer-sized bytes of a free buffer hold
	 * the link to the next free buffer. Otherwise a new buffer is
	 * allocated after the lock has been dropped.
	 */
	mutex_enter(&rqcred_lock);
	if (rqcred_head) {
		cred_area = rqcred_head;

		/* LINTED pointer alignment */
		rqcred_head = *(caddr_t *)rqcred_head;
		mutex_exit(&rqcred_lock);
	} else {
		mutex_exit(&rqcred_lock);
		cred_area = kmem_alloc(2 * MAX_AUTH_BYTES + RQCRED_SIZE,
		    KM_SLEEP);
	}

	/*
	 * Carve the buffer up: raw credentials, raw verifier and then
	 * the cooked credentials.
	 */
	msg.rm_call.cb_cred.oa_base = cred_area;
	msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
	r.rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);

	/*
	 * underlying transport recv routine may modify mblk data
	 * and make it difficult to extract label afterwards. So
	 * get the label from the raw mblk data now.
	 */
	if (is_system_labeled()) {
		cred_t *cr;

		r.rq_label = kmem_alloc(sizeof (bslabel_t), KM_SLEEP);
		cr = msg_getcred(mp, NULL);
		ASSERT(cr != NULL);

		bcopy(label2bslabel(crgetlabel(cr)), r.rq_label,
		    sizeof (bslabel_t));
	} else {
		r.rq_label = NULL;
	}

	/*
	 * Now receive a message from the transport.
	 * If SVC_RECV() fails nothing is dispatched and no reply is
	 * sent from here; only the storage set up above is released.
	 */
	if (SVC_RECV(clone_xprt, mp, &msg)) {
		void (*dispatchroutine) (struct svc_req *, SVCXPRT *);
		rpcvers_t vers_min;
		rpcvers_t vers_max;
		bool_t no_dispatch;
		enum auth_stat why;

		/*
		 * Find the registered program and call its
		 * dispatch routine.
		 */
		r.rq_xprt = clone_xprt;
		r.rq_prog = msg.rm_call.cb_prog;
		r.rq_vers = msg.rm_call.cb_vers;
		r.rq_proc = msg.rm_call.cb_proc;
		r.rq_cred = msg.rm_call.cb_cred;

		/*
		 * First authenticate the message.
		 */
		TRACE_0(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_START,
		    "svc_getreq_auth_start:");
		if ((why = sec_svc_msg(&r, &msg, &no_dispatch)) != AUTH_OK) {
			TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
			    "svc_getreq_auth_end:(%S)", "failed");
			svcerr_auth(clone_xprt, why);
			/*
			 * Free the arguments.
			 */
			(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
		} else if (no_dispatch) {
			/*
			 * Authentication succeeded but sec_svc_msg() asked
			 * for the request not to be dispatched.
			 *
			 * XXX - when bug id 4053736 is done, remove
			 * the SVC_FREEARGS() call.
			 */
			(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
		} else {
			TRACE_1(TR_FAC_KRPC, TR_SVC_GETREQ_AUTH_END,
			    "svc_getreq_auth_end:(%S)", "good");

			dispatchroutine = svc_callout_find(clone_xprt,
			    r.rq_prog, r.rq_vers, &vers_min, &vers_max);

			if (dispatchroutine) {
				(*dispatchroutine) (&r, clone_xprt);
			} else {
				/*
				 * If we got here, the program or version
				 * is not served ...
				 *
				 * vers_max is still 0 when no callout entry
				 * for the program was seen at all, so the
				 * program itself is reported as unavailable.
				 */
				if (vers_max == 0 ||
				    version_keepquiet(clone_xprt))
					svcerr_noprog(clone_xprt);
				else
					svcerr_progvers(clone_xprt, vers_min,
					    vers_max);

				/*
				 * Free the arguments. For successful calls
				 * this is done by the dispatch routine.
				 */
				(void) SVC_FREEARGS(clone_xprt, NULL, NULL);
				/* Fall through to ... */
			}
			/*
			 * Call cleanup procedure for RPCSEC_GSS.
			 * This is a hack since there is currently no
			 * op, such as SVC_CLEANAUTH. rpc_gss_cleanup
			 * should only be called for a non null proc.
			 * Null procs in RPC GSS are overloaded to
			 * provide context setup and control. The main
			 * purpose of rpc_gss_cleanup is to decrement the
			 * reference count associated with the cached
			 * GSS security context. We should never get here
			 * for an RPCSEC_GSS null proc since *no_dispatch
			 * would have been set to true from sec_svc_msg above.
			 */
			if (r.rq_cred.oa_flavor == RPCSEC_GSS)
				rpc_gss_cleanup(clone_xprt);
		}
	}

	/* Release the label copy made for a labeled system, if any. */
	if (r.rq_label != NULL)
		kmem_free(r.rq_label, sizeof (bslabel_t));

	/*
	 * Free authentication parameters' storage.
	 * The buffer is not returned to the allocator; it is pushed onto
	 * the front of the rqcred_head free list for reuse.
	 */
	mutex_enter(&rqcred_lock);
	/* LINTED pointer alignment */
	*(caddr_t *)cred_area = rqcred_head;
	rqcred_head = cred_area;
	mutex_exit(&rqcred_lock);
}
   1439 
   1440 /*
   1441  * Allocate new clone transport handle.
   1442  */
   1443 SVCXPRT *
   1444 svc_clone_init(void)
   1445 {
   1446 	SVCXPRT *clone_xprt;
   1447 
   1448 	clone_xprt = kmem_zalloc(sizeof (SVCXPRT), KM_SLEEP);
   1449 	clone_xprt->xp_cred = crget();
   1450 	return (clone_xprt);
   1451 }
   1452 
   1453 /*
   1454  * Free memory allocated by svc_clone_init.
   1455  */
   1456 void
   1457 svc_clone_free(SVCXPRT *clone_xprt)
   1458 {
   1459 	/* Fre credentials from crget() */
   1460 	if (clone_xprt->xp_cred)
   1461 		crfree(clone_xprt->xp_cred);
   1462 	kmem_free(clone_xprt, sizeof (SVCXPRT));
   1463 }
   1464 
   1465 /*
   1466  * Link a per-thread clone transport handle to a master
   1467  * - increment a thread reference count on the master
   1468  * - copy some of the master's fields to the clone
   1469  * - call a transport specific clone routine.
   1470  */
   1471 void
   1472 svc_clone_link(SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
   1473 {
   1474 	cred_t *cred = clone_xprt->xp_cred;
   1475 
   1476 	ASSERT(cred);
   1477 
   1478 	/*
   1479 	 * Bump up master's thread count.
   1480 	 * Linking a per-thread clone transport handle to a master
   1481 	 * associates a service thread with the master.
   1482 	 */
   1483 	mutex_enter(&xprt->xp_thread_lock);
   1484 	xprt->xp_threads++;
   1485 	mutex_exit(&xprt->xp_thread_lock);
   1486 
   1487 	/* Clear everything */
   1488 	bzero(clone_xprt, sizeof (SVCXPRT));
   1489 
   1490 	/* Set pointer to the master transport stucture */
   1491 	clone_xprt->xp_master = xprt;
   1492 
   1493 	/* Structure copy of all the common fields */
   1494 	clone_xprt->xp_xpc = xprt->xp_xpc;
   1495 
   1496 	/* Restore per-thread fields (xp_cred) */
   1497 	clone_xprt->xp_cred = cred;
   1498 
   1499 
   1500 	/*
   1501 	 * NOTICE: There is no transport-type specific code now.
   1502 	 *	   If you want to add a transport-type specific cloning code
   1503 	 *	   add one more operation (e.g. xp_clone()) to svc_ops,
   1504 	 *	   implement it for each transport type, and call it here
   1505 	 *	   through an appropriate macro (e.g. SVC_CLONE()).
   1506 	 */
   1507 }
   1508 
   1509 /*
   1510  * Unlink a non-detached clone transport handle from a master
   1511  * - decrement a thread reference count on the master
   1512  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
   1513  *   if this is the last non-detached/absolute thread on this transport
   1514  *   then it will close/destroy the transport
   1515  * - call transport specific function to destroy the clone handle
   1516  * - clear xp_master to avoid recursion.
   1517  */
   1518 void
   1519 svc_clone_unlink(SVCXPRT *clone_xprt)
   1520 {
   1521 	SVCMASTERXPRT *xprt = clone_xprt->xp_master;
   1522 
   1523 	/* This cannot be a detached thread */
   1524 	ASSERT(!clone_xprt->xp_detached);
   1525 	ASSERT(xprt->xp_threads > 0);
   1526 
   1527 	/* Decrement a reference count on the transport */
   1528 	mutex_enter(&xprt->xp_thread_lock);
   1529 	xprt->xp_threads--;
   1530 
   1531 	/* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
   1532 	if (xprt->xp_wq)
   1533 		mutex_exit(&xprt->xp_thread_lock);
   1534 	else
   1535 		svc_xprt_cleanup(xprt, FALSE);
   1536 
   1537 	/* Call a transport specific clone `destroy' function */
   1538 	SVC_CLONE_DESTROY(clone_xprt);
   1539 
   1540 	/* Clear xp_master */
   1541 	clone_xprt->xp_master = NULL;
   1542 }
   1543 
   1544 /*
   1545  * Unlink a detached clone transport handle from a master
   1546  * - decrement the thread count on the master
   1547  * - if the transport is closing (xp_wq is NULL) call svc_xprt_cleanup();
   1548  *   if this is the last thread on this transport then it will destroy
   1549  *   the transport.
   1550  * - call a transport specific function to destroy the clone handle
   1551  * - clear xp_master to avoid recursion.
   1552  */
   1553 static void
   1554 svc_clone_unlinkdetached(SVCXPRT *clone_xprt)
   1555 {
   1556 	SVCMASTERXPRT *xprt = clone_xprt->xp_master;
   1557 
   1558 	/* This must be a detached thread */
   1559 	ASSERT(clone_xprt->xp_detached);
   1560 	ASSERT(xprt->xp_detached_threads > 0);
   1561 	ASSERT(xprt->xp_threads + xprt->xp_detached_threads > 0);
   1562 
   1563 	/* Grab xprt->xp_thread_lock and decrement link counts */
   1564 	mutex_enter(&xprt->xp_thread_lock);
   1565 	xprt->xp_detached_threads--;
   1566 
   1567 	/* svc_xprt_cleanup() unlocks xp_thread_lock or destroys xprt */
   1568 	if (xprt->xp_wq)
   1569 		mutex_exit(&xprt->xp_thread_lock);
   1570 	else
   1571 		svc_xprt_cleanup(xprt, TRUE);
   1572 
   1573 	/* Call transport specific clone `destroy' function */
   1574 	SVC_CLONE_DESTROY(clone_xprt);
   1575 
   1576 	/* Clear xp_master */
   1577 	clone_xprt->xp_master = NULL;
   1578 }
   1579 
   1580 /*
   1581  * Try to exit a non-detached service thread
   1582  * - check if there are enough threads left
   1583  * - if this thread (ie its clone transport handle) are linked
   1584  *   to a master transport then unlink it
   1585  * - free the clone structure
   1586  * - return to userland for thread exit
   1587  *
   1588  * If this is the last non-detached or the last thread on this
   1589  * transport then the call to svc_clone_unlink() will, respectively,
   1590  * close and/or destroy the transport.
   1591  */
   1592 static void
   1593 svc_thread_exit(SVCPOOL *pool, SVCXPRT *clone_xprt)
   1594 {
   1595 	if (clone_xprt->xp_master)
   1596 		svc_clone_unlink(clone_xprt);
   1597 	svc_clone_free(clone_xprt);
   1598 
   1599 	mutex_enter(&pool->p_thread_lock);
   1600 	pool->p_threads--;
   1601 	if (pool->p_closing && svc_pool_tryexit(pool))
   1602 		/* return -  thread exit will be handled at user level */
   1603 		return;
   1604 	mutex_exit(&pool->p_thread_lock);
   1605 
   1606 	/* return -  thread exit will be handled at user level */
   1607 }
   1608 
   1609 /*
   1610  * Exit a detached service thread that returned to svc_run
   1611  * - decrement the `detached thread' count for the pool
   1612  * - unlink the detached clone transport handle from the master
   1613  * - free the clone structure
   1614  * - return to userland for thread exit
   1615  *
   1616  * If this is the last thread on this transport then the call
   1617  * to svc_clone_unlinkdetached() will destroy the transport.
   1618  */
   1619 static void
   1620 svc_thread_exitdetached(SVCPOOL *pool, SVCXPRT *clone_xprt)
   1621 {
   1622 	/* This must be a detached thread */
   1623 	ASSERT(clone_xprt->xp_master);
   1624 	ASSERT(clone_xprt->xp_detached);
   1625 	ASSERT(!MUTEX_HELD(&pool->p_thread_lock));
   1626 
   1627 	svc_clone_unlinkdetached(clone_xprt);
   1628 	svc_clone_free(clone_xprt);
   1629 
   1630 	mutex_enter(&pool->p_thread_lock);
   1631 
   1632 	ASSERT(pool->p_reserved_threads >= 0);
   1633 	ASSERT(pool->p_detached_threads > 0);
   1634 
   1635 	pool->p_detached_threads--;
   1636 	if (pool->p_closing && svc_pool_tryexit(pool))
   1637 		/* return -  thread exit will be handled at user level */
   1638 		return;
   1639 	mutex_exit(&pool->p_thread_lock);
   1640 
   1641 	/* return -  thread exit will be handled at user level */
   1642 }
   1643 
/*
 * Block a user thread on the pool with the given id until the pool
 * asks for a new service thread to be created or is shutting down.
 *
 * Returns:
 *   0		- a thread creation was signaled (p_signal_create_thread)
 *   ENOENT	- no pool with this id exists in the caller's zone
 *   EBUSY	- another user thread is already waiting on this pool
 *   EINTR	- the wait was interrupted; the pool has been
 *		  unregistered
 *   ECANCELED	- the pool is exiting (p_user_exit)
 *
 * PSARC 2003/523 Contract Private Interface
 * svc_wait
 * Changes must be reviewed by Solaris File Sharing
 * Changes must be communicated to contract-2003-523 (at) sun.com
 */
int
svc_wait(int id)
{
	SVCPOOL *pool;
	int	err = 0;
	struct svc_globals *svc;

	svc = zone_getspecific(svc_zone_key, curproc->p_zone);
	mutex_enter(&svc->svc_plock);
	pool = svc_pool_find(svc, id);
	mutex_exit(&svc->svc_plock);

	if (pool == NULL)
		return (ENOENT);

	mutex_enter(&pool->p_user_lock);

	/* Check if there's already a user thread waiting on this pool */
	if (pool->p_user_waiting) {
		mutex_exit(&pool->p_user_lock);
		return (EBUSY);
	}

	pool->p_user_waiting = TRUE;

	/* Go to sleep, waiting for the signaled flag. */
	while (!pool->p_signal_create_thread && !pool->p_user_exit) {
		if (cv_wait_sig(&pool->p_user_cv, &pool->p_user_lock) == 0) {
			/* Interrupted, return to handle exit or signal */
			pool->p_user_waiting = FALSE;
			pool->p_signal_create_thread = FALSE;
			mutex_exit(&pool->p_user_lock);

			/*
			 * Thread has been interrupted and therefore
			 * the service daemon is leaving as well so
			 * let's go ahead and remove the service
			 * pool at this time.
			 */
			mutex_enter(&svc->svc_plock);
			svc_pool_unregister(svc, pool);
			mutex_exit(&svc->svc_plock);

			return (EINTR);
		}
	}

	/* Consume the request and note that nobody is waiting any more. */
	pool->p_signal_create_thread = FALSE;
	pool->p_user_waiting = FALSE;

	/*
	 * About to exit the service pool. Set return value
	 * to let the userland code know our intent. Signal
	 * svc_thread_creator() so that it can clean up the
	 * pool structure.
	 */
	if (pool->p_user_exit) {
		err = ECANCELED;
		cv_signal(&pool->p_user_cv);
	}

	mutex_exit(&pool->p_user_lock);

	/* Return to userland with error code, for possible thread creation. */
	return (err);
}
   1716 
/*
 * `Service threads' creator thread.
 * The creator thread waits for a signal to create new thread.
 *
 * Each time it is woken up, and unless the pool is closing with no
 * transports left, it checks the pool's thread limit and, if there is
 * room, sets p_signal_create_thread and wakes up the user thread
 * sleeping in svc_wait(); the thread itself is not created here.
 *
 * When p_creator_exit is set the thread tells svc_wait() to exit,
 * waits until no user thread is waiting on the pool any more, cleans
 * up the pool and exits.
 */
static void
svc_thread_creator(SVCPOOL *pool)
{
	callb_cpr_t cpr_info;	/* CPR info for the creator thread */

	CALLB_CPR_INIT(&cpr_info, &pool->p_creator_lock, callb_generic_cpr,
	    "svc_thread_creator");

	for (;;) {
		mutex_enter(&pool->p_creator_lock);

		/* Check if someone set the exit flag */
		if (pool->p_creator_exit)
			break;

		/* Clear the `signaled' flag and go asleep */
		pool->p_creator_signaled = FALSE;

		CALLB_CPR_SAFE_BEGIN(&cpr_info);
		cv_wait(&pool->p_creator_cv, &pool->p_creator_lock);
		CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);

		/* Check if someone signaled to exit */
		if (pool->p_creator_exit)
			break;

		mutex_exit(&pool->p_creator_lock);

		mutex_enter(&pool->p_thread_lock);

		/*
		 * When the pool is in closing state and all the transports
		 * are gone the creator should not create any new threads.
		 */
		if (pool->p_closing) {
			rw_enter(&pool->p_lrwlock, RW_READER);
			if (pool->p_lcount == 0) {
				rw_exit(&pool->p_lrwlock);
				mutex_exit(&pool->p_thread_lock);
				continue;
			}
			rw_exit(&pool->p_lrwlock);
		}

		/*
		 * Create a new service thread now.
		 */
		ASSERT(pool->p_reserved_threads >= 0);
		ASSERT(pool->p_detached_threads >= 0);

		/* Only ask for a thread while the pool is below its limit. */
		if (pool->p_threads + pool->p_detached_threads <
		    pool->p_maxthreads) {
			/*
			 * Signal the service pool wait thread
			 * only if it hasn't already been signaled.
			 */
			mutex_enter(&pool->p_user_lock);
			if (pool->p_signal_create_thread == FALSE) {
				pool->p_signal_create_thread = TRUE;
				cv_signal(&pool->p_user_cv);
			}
			mutex_exit(&pool->p_user_lock);

		}

		mutex_exit(&pool->p_thread_lock);
	}

	/*
	 * Pool is closed. Cleanup and exit.
	 *
	 * Both `break's above leave the loop with p_creator_lock held,
	 * so it is still held from here on.
	 */

	/* Signal userland creator thread that it can stop now. */
	mutex_enter(&pool->p_user_lock);
	pool->p_user_exit = TRUE;
	cv_broadcast(&pool->p_user_cv);
	mutex_exit(&pool->p_user_lock);

	/*
	 * Wait for svc_wait() to be done with the pool.
	 * The sleep is on p_user_cv/p_user_lock, while the CPR macros
	 * keep using p_creator_lock, the lock cpr_info was initialized
	 * with.
	 */
	mutex_enter(&pool->p_user_lock);
	while (pool->p_user_waiting) {
		CALLB_CPR_SAFE_BEGIN(&cpr_info);
		cv_wait(&pool->p_user_cv, &pool->p_user_lock);
		CALLB_CPR_SAFE_END(&cpr_info, &pool->p_creator_lock);
	}
	mutex_exit(&pool->p_user_lock);

	/*
	 * NOTE(review): p_creator_lock is not released explicitly on this
	 * path; presumably CALLB_CPR_EXIT() drops it - confirm.
	 */
	CALLB_CPR_EXIT(&cpr_info);
	svc_pool_cleanup(pool);
	zthread_exit();
}
   1812 
   1813 /*
   1814  * If the creator thread  is idle signal it to create
   1815  * a new service thread.
   1816  */
   1817 static void
   1818 svc_creator_signal(SVCPOOL *pool)
   1819 {
   1820 	mutex_enter(&pool->p_creator_lock);
   1821 	if (pool->p_creator_signaled == FALSE) {
   1822 		pool->p_creator_signaled = TRUE;
   1823 		cv_signal(&pool->p_creator_cv);
   1824 	}
   1825 	mutex_exit(&pool->p_creator_lock);
   1826 }
   1827 
   1828 /*
   1829  * Notify the creator thread to clean up and exit.
   1830  */
   1831 static void
   1832 svc_creator_signalexit(SVCPOOL *pool)
   1833 {
   1834 	mutex_enter(&pool->p_creator_lock);
   1835 	pool->p_creator_exit = TRUE;
   1836 	cv_signal(&pool->p_creator_cv);
   1837 	mutex_exit(&pool->p_creator_lock);
   1838 }
   1839 
   1840 /*
   1841  * Polling part of the svc_run().
   1842  * - search for a transport with a pending request
   1843  * - when one is found then latch the request lock and return to svc_run()
   1844  * - if there is no request go asleep and wait for a signal
   1845  * - handle two exceptions:
   1846  *   a) current transport is closing
   1847  *   b) timeout waiting for a new request
   1848  *   in both cases return to svc_run()
         *
         * Arguments:
         *   pool       - service pool whose transports are searched
         *   xprt       - master transport the caller is currently linked to,
         *                or NULL if there is none
         *   clone_xprt - the caller's clone handle; only its xp_same_xprt
         *                counter is used here
         *
         * Returns:
         *   - a master transport with a pending request; its xp_req_lock is
         *     held on return, so the caller must dequeue and drop the lock
         *   - SVC_EINTR     if the timed wait returned 0 (signal received)
         *   - SVC_EXPRTGONE if `xprt' has xp_wq == NULL after the wait
         *   - SVC_ETIMEDOUT if the wait timed out and no wakeup was signaled
   1849  */
   1850 static SVCMASTERXPRT *
   1851 svc_poll(SVCPOOL *pool, SVCMASTERXPRT *xprt, SVCXPRT *clone_xprt)
   1852 {
   1853 	/*
   1854 	 * Main loop iterates until
   1855 	 * a) we find a pending request,
   1856 	 * b) detect that the current transport is closing
   1857 	 * c) time out waiting for a new request.
   1858 	 */
   1859 	for (;;) {
   1860 		SVCMASTERXPRT *next;
   1861 		clock_t timeleft;
   1862 
   1863 		/*
   1864 		 * Step 1.
   1865 		 * Check if there is a pending request on the current
   1866 		 * transport handle so that we can avoid cloning.
   1867 		 * If so then decrement the `pending-request' count for
   1868 		 * the pool and return to svc_run().
   1869 		 *
   1870 		 * We need to prevent a potential starvation. When
   1871 		 * a selected transport has all pending requests coming in
   1872 		 * all the time then the service threads will never switch to
   1873 		 * another transport. With a limited number of service
   1874 		 * threads some transports may be never serviced.
   1875 		 * To prevent such a scenario we pick up at most
   1876 		 * pool->p_max_same_xprt requests from the same transport
   1877 		 * and then take a hint from the xprt-ready queue or walk
   1878 		 * the transport list.
        		 *
        		 * The xp_same_xprt counter is only incremented while
        		 * p_qoverflow is set (the `||' short-circuits otherwise).
        		 * xp_req_head is tested without the lock first and then
        		 * again with xp_req_lock held.
   1879 		 */
   1880 		if (xprt && xprt->xp_req_head && (!pool->p_qoverflow ||
   1881 		    clone_xprt->xp_same_xprt++ < pool->p_max_same_xprt)) {
   1882 			mutex_enter(&xprt->xp_req_lock);
   1883 			if (xprt->xp_req_head) {
   1884 				mutex_enter(&pool->p_req_lock);
   1885 				pool->p_reqs--;
   1886 				if (pool->p_reqs == 0)
   1887 					pool->p_qoverflow = FALSE;
   1888 				mutex_exit(&pool->p_req_lock);
   1889 
        				/*
        				 * Return with xp_req_lock latched.
        				 * p_walkers is untouched on this path:
        				 * it has not been incremented yet.
        				 */
   1890 				return (xprt);
   1891 			}
   1892 			mutex_exit(&xprt->xp_req_lock);
   1893 		}
   1894 		clone_xprt->xp_same_xprt = 0;
   1895 
   1896 		/*
   1897 		 * Step 2.
   1898 		 * If there is no request on the current transport try to
   1899 		 * find another transport with a pending request.
   1900 		 */
   1901 		mutex_enter(&pool->p_req_lock);
   1902 		pool->p_walkers++;
   1903 		mutex_exit(&pool->p_req_lock);
   1904 
   1905 		/*
   1906 		 * Make sure that transports will not be destroyed just
   1907 		 * while we are checking them.
   1908 		 */
   1909 		rw_enter(&pool->p_lrwlock, RW_READER);
   1910 
   1911 		for (;;) {
   1912 			SVCMASTERXPRT *hint;
   1913 
   1914 			/*
   1915 			 * Get the next transport from the xprt-ready queue.
   1916 			 * This is a hint. There is no guarantee that the
   1917 			 * transport still has a pending request since it
   1918 			 * could be picked up by another thread in step 1.
   1919 			 *
   1920 			 * If the transport has a pending request then keep
   1921 			 * it locked. Decrement the `pending-requests' for
   1922 			 * the pool and `walking-threads' counts, and return
   1923 			 * to svc_run().
   1924 			 */
   1925 			hint = svc_xprt_qget(pool);
   1926 
   1927 			if (hint && hint->xp_req_head) {
   1928 				mutex_enter(&hint->xp_req_lock);
   1929 				if (hint->xp_req_head) {
   1930 					rw_exit(&pool->p_lrwlock);
   1931 
   1932 					mutex_enter(&pool->p_req_lock);
   1933 					pool->p_reqs--;
   1934 					if (pool->p_reqs == 0)
   1935 						pool->p_qoverflow = FALSE;
   1936 					pool->p_walkers--;
   1937 					mutex_exit(&pool->p_req_lock);
   1938 
   1939 					return (hint);
   1940 				}
   1941 				mutex_exit(&hint->xp_req_lock);
   1942 			}
   1943 
   1944 			/*
   1945 			 * If there was no hint in the xprt-ready queue then
   1946 			 * - if there is less pending requests than polling
   1947 			 *   threads go asleep
   1948 			 * - otherwise check if there was an overflow in the
   1949 			 *   xprt-ready queue; if so, then we need to break
   1950 			 *   the `drain' mode
        			 *
        			 * A non-NULL hint without a request simply makes
        			 * the loop fetch the next hint.
   1951 			 */
   1952 			if (hint == NULL) {
   1953 				if (pool->p_reqs < pool->p_walkers) {
   1954 					mutex_enter(&pool->p_req_lock);
   1955 					if (pool->p_reqs < pool->p_walkers)
   1956 						goto sleep;
   1957 					mutex_exit(&pool->p_req_lock);
   1958 				}
   1959 				if (pool->p_qoverflow) {
   1960 					break;
   1961 				}
   1962 			}
   1963 		}
   1964 
   1965 		/*
   1966 		 * If there was an overflow in the xprt-ready queue then we
   1967 		 * need to switch to the `drain' mode, i.e. walk through the
   1968 		 * pool's transport list and search for a transport with a
   1969 		 * pending request. If we manage to drain all the pending
   1970 		 * requests then we can clear the overflow flag. This will
   1971 		 * switch svc_poll() back to taking hints from the xprt-ready
   1972 		 * queue (which is generally more efficient).
   1973 		 *
   1974 		 * If there are no registered transports simply go asleep.
   1975 		 */
   1976 		if (xprt == NULL && pool->p_lhead == NULL) {
   1977 			mutex_enter(&pool->p_req_lock);
   1978 			goto sleep;
   1979 		}
   1980 
   1981 		/*
   1982 		 * `Walk' through the pool's list of master server
   1983 		 * transport handles. Continue to loop until there are fewer
   1984 		 * pending requests than looping threads.
        		 *
        		 * NOTE(review): the walk never tests `next' for NULL, so
        		 * the transport list is presumably circular - confirm.
   1985 		 */
   1986 		next = xprt ? xprt->xp_next : pool->p_lhead;
   1987 
   1988 		for (;;) {
   1989 			/*
   1990 			 * Check if there is a request on this transport.
   1991 			 *
   1992 			 * Since blocking on a locked mutex is very expensive
   1993 			 * check for a request without a lock first. If we miss
   1994 			 * a request that is just being delivered this will
   1995 			 * cost at most one full walk through the list.
   1996 			 */
   1997 			if (next->xp_req_head) {
   1998 				/*
   1999 				 * Check again, now with a lock.
   2000 				 */
   2001 				mutex_enter(&next->xp_req_lock);
   2002 				if (next->xp_req_head) {
   2003 					rw_exit(&pool->p_lrwlock);
   2004 
   2005 					mutex_enter(&pool->p_req_lock);
   2006 					pool->p_reqs--;
   2007 					if (pool->p_reqs == 0)
   2008 						pool->p_qoverflow = FALSE;
   2009 					pool->p_walkers--;
   2010 					mutex_exit(&pool->p_req_lock);
   2011 
   2012 					return (next);
   2013 				}
   2014 				mutex_exit(&next->xp_req_lock);
   2015 			}
   2016 
   2017 			/*
   2018 			 * Continue to `walk' through the pool's
   2019 			 * transport list until there is less requests
   2020 			 * than walkers. Check this condition without
   2021 			 * a lock first to avoid contention on a mutex.
   2022 			 */
   2023 			if (pool->p_reqs < pool->p_walkers) {
   2024 				/* Check again, now with the lock. */
   2025 				mutex_enter(&pool->p_req_lock);
   2026 				if (pool->p_reqs < pool->p_walkers)
   2027 					break;	/* goto sleep */
   2028 				mutex_exit(&pool->p_req_lock);
   2029 			}
   2030 
   2031 			next = next->xp_next;
   2032 		}
   2033 
   2034 	sleep:
   2035 		/*
   2036 		 * No work to do. Stop the `walk' and go asleep.
   2037 		 * Decrement the `walking-threads' count for the pool.
        		 *
        		 * Every path that gets here holds p_req_lock and holds
        		 * p_lrwlock as a reader.
   2038 		 */
   2039 		pool->p_walkers--;
   2040 		rw_exit(&pool->p_lrwlock);
   2041 
   2042 		/*
   2043 		 * Count us as asleep, mark this thread as safe
   2044 		 * for suspend and wait for a request.
   2045 		 */
   2046 		pool->p_asleep++;
   2047 		timeleft = cv_reltimedwait_sig(&pool->p_req_cv,
   2048 		    &pool->p_req_lock, pool->p_timeout, TR_CLOCK_TICK);
   2049 
   2050 		/*
   2051 		 * If the drowsy flag is on this means that
   2052 		 * someone has signaled a wakeup. In such a case
   2053 		 * the `asleep-threads' count has already been updated
   2054 		 * so just clear the flag.
   2055 		 *
   2056 		 * If the drowsy flag is off then we need to update
   2057 		 * the `asleep-threads' count.
   2058 		 */
   2059 		if (pool->p_drowsy) {
   2060 			pool->p_drowsy = FALSE;
   2061 			/*
   2062 			 * If the thread is here because it timedout,
   2063 			 * instead of returning SVC_ETIMEDOUT, it is
   2064 			 * time to do some more work.
   2065 			 */
   2066 			if (timeleft == -1)
   2067 				timeleft = 1;
   2068 		} else {
   2069 			pool->p_asleep--;
   2070 		}
   2071 		mutex_exit(&pool->p_req_lock);
   2072 
   2073 		/*
   2074 		 * If we received a signal while waiting for a
   2075 		 * request, inform svc_run(), so that we can return
   2076 		 * to user level and exit.
   2077 		 */
   2078 		if (timeleft == 0)
   2079 			return (SVC_EINTR);
   2080 
   2081 		/*
   2082 		 * If the current transport is gone then notify
   2083 		 * svc_run() to unlink from it.
   2084 		 */
   2085 		if (xprt && xprt->xp_wq == NULL)
   2086 			return (SVC_EXPRTGONE);
   2087 
   2088 		/*
   2089 		 * If we have timed out waiting for a request inform
   2090 		 * svc_run() that we probably don't need this thread.
   2091 		 */
   2092 		if (timeleft == -1)
   2093 			return (SVC_ETIMEDOUT);
        
        		/* Otherwise we were woken up: search again from step 1. */
   2094 	}
   2095 }
   2096 
   2097 /*
   2098  * Main loop of the kernel RPC server
   2099  * - wait for input (find a transport with a pending request).
   2100  * - dequeue the request
   2101  * - call a registered server routine to process the requests
   2102  *
   2103  * There can be many threads running concurrently in this loop
   2104  * on the same or on different transports.
         *
         * Returns:
         *   EINTR - the process is exiting/killed, or svc_poll() returned
         *           SVC_EINTR
         *   0     - svc_poll() returned SVC_ETIMEDOUT, or the thread detached
         *           itself while processing a request
   2105  */
   2106 static int
   2107 svc_run(SVCPOOL *pool)
   2108 {
   2109 	SVCMASTERXPRT *xprt = NULL;	/* master transport handle  */
   2110 	SVCXPRT *clone_xprt;	/* clone for this thread    */
   2111 	proc_t *p = ttoproc(curthread);
   2112 
   2113 	/* Allocate a clone transport handle for this thread */
   2114 	clone_xprt = svc_clone_init();
   2115 
   2116 	/*
   2117 	 * The loop iterates until the thread becomes
   2118 	 * idle too long or the transport is gone.
   2119 	 */
   2120 	for (;;) {
   2121 		SVCMASTERXPRT *next;
   2122 		mblk_t *mp;
   2123 
   2124 		TRACE_0(TR_FAC_KRPC, TR_SVC_RUN, "svc_run");
   2125 
   2126 		/*
   2127 		 * If the process is exiting/killed, return
   2128 		 * immediately without processing any more
   2129 		 * requests.
   2130 		 */
   2131 		if (p->p_flag & (SEXITING | SKILLED)) {
   2132 			svc_thread_exit(pool, clone_xprt);
   2133 			return (EINTR);
   2134 		}
   2135 
   2136 		/* Find a transport with a pending request */
   2137 		next = svc_poll(pool, xprt, clone_xprt);
   2138 
   2139 		/*
   2140 		 * If svc_poll() finds a transport with a request
   2141 		 * it latches xp_req_lock on it. Therefore we need
   2142 		 * to dequeue the request and release the lock as
   2143 		 * soon as possible.
   2144 		 */
   2145 		ASSERT(next != NULL &&
   2146 		    (next == SVC_EXPRTGONE ||
   2147 		    next == SVC_ETIMEDOUT ||
   2148 		    next == SVC_EINTR ||
   2149 		    MUTEX_HELD(&next->xp_req_lock)));
   2150 
   2151 		/* Ooops! Current transport is closing. Unlink now */
   2152 		if (next == SVC_EXPRTGONE) {
   2153 			svc_clone_unlink(clone_xprt);
   2154 			xprt = NULL;
   2155 			continue;
   2156 		}
   2157 
   2158 		/* Ooops! Timeout while waiting for a request. Exit */
   2159 		if (next == SVC_ETIMEDOUT) {
   2160 			svc_thread_exit(pool, clone_xprt);
   2161 			return (0);
   2162 		}
   2163 
   2164 		/*
   2165 		 * Interrupted by a signal while waiting for a
   2166 		 * request. Return to userspace and exit.
   2167 		 */
   2168 		if (next == SVC_EINTR) {
   2169 			svc_thread_exit(pool, clone_xprt);
   2170 			return (EINTR);
   2171 		}
   2172 
   2173 		/*
   2174 		 * De-queue the request and release the request lock
   2175 		 * on this transport (latched by svc_poll()).
        		 *
        		 * xp_req_tail is not adjusted here; svc_queuereq()
        		 * overwrites it on every enqueue and only follows it
        		 * while xp_req_head is non-NULL.
   2176 		 */
   2177 		mp = next->xp_req_head;
   2178 		next->xp_req_head = mp->b_next;
   2179 		mp->b_next = (mblk_t *)0;
   2180 
   2181 		TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_DEQ,
   2182 		    "rpc_que_req_deq:pool %p mp %p", pool, mp);
   2183 		mutex_exit(&next->xp_req_lock);
   2184 
   2185 		/*
   2186 		 * If this is a new request on a current transport then
   2187 		 * the clone structure is already properly initialized.
   2188 		 * Otherwise, if the request is on a different transport,
   2189 		 * unlink from the current master and link to
   2190 		 * the one we got a request on.
   2191 		 */
   2192 		if (next != xprt) {
   2193 			if (xprt)
   2194 				svc_clone_unlink(clone_xprt);
   2195 			svc_clone_link(next, clone_xprt);
   2196 			xprt = next;
   2197 		}
   2198 
   2199 		/*
   2200 		 * If there are more requests and req_cv hasn't
   2201 		 * been signaled yet then wake up one more thread now.
   2202 		 *
   2203 		 * We avoid signaling req_cv until the most recently
   2204 		 * signaled thread wakes up and gets CPU to clear
   2205 		 * the `drowsy' flag.
        		 *
        		 * The condition is first evaluated without p_req_lock
        		 * and then evaluated again with the lock held before
        		 * any counter is changed.
   2206 		 */
   2207 		if (!(pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
   2208 		    pool->p_asleep == 0)) {
   2209 			mutex_enter(&pool->p_req_lock);
   2210 
   2211 			if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
   2212 			    pool->p_asleep == 0)
   2213 				mutex_exit(&pool->p_req_lock);
   2214 			else {
   2215 				pool->p_asleep--;
   2216 				pool->p_drowsy = TRUE;
   2217 
   2218 				cv_signal(&pool->p_req_cv);
   2219 				mutex_exit(&pool->p_req_lock);
   2220 			}
   2221 		}
   2222 
   2223 		/*
   2224 		 * If there are no asleep/signaled threads, we are
   2225 		 * still below pool->p_maxthreads limit, and no thread is
   2226 		 * currently being created then signal the creator
   2227 		 * for one more service thread.
   2228 		 *
   2229 		 * The asleep and drowsy checks are not protected
   2230 		 * by a lock since it hurts performance and a wrong
   2231 		 * decision is not essential.
   2232 		 */
   2233 		if (pool->p_asleep == 0 && !pool->p_drowsy &&
   2234 		    pool->p_threads + pool->p_detached_threads <
   2235 		    pool->p_maxthreads)
   2236 			svc_creator_signal(pool);
   2237 
   2238 		/*
   2239 		 * Process the request.
   2240 		 */
   2241 		svc_getreq(clone_xprt, mp);
   2242 
   2243 		/* If thread had a reservation it should have been canceled */
   2244 		ASSERT(!clone_xprt->xp_reserved);
   2245 
   2246 		/*
   2247 		 * If the clone is marked detached then exit.
   2248 		 * The rpcmod slot has already been released
   2249 		 * when we detached this thread.
   2250 		 */
   2251 		if (clone_xprt->xp_detached) {
   2252 			svc_thread_exitdetached(pool, clone_xprt);
   2253 			return (0);
   2254 		}
   2255 
   2256 		/*
   2257 		 * Release our reference on the rpcmod
   2258 		 * slot attached to xp_wq->q_ptr.
   2259 		 */
   2260 		(*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
   2261 	}
   2262 	/* NOTREACHED */
   2263 }
   2264 
   2265 /*
   2266  * Flush any pending requests for the queue and
   2267  * and free the associated mblks.
   2268  */
   2269 void
   2270 svc_queueclean(queue_t *q)
   2271 {
   2272 	SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
   2273 	mblk_t *mp;
   2274 	SVCPOOL *pool;
   2275 
   2276 	/*
   2277 	 * clean up the requests
   2278 	 */
   2279 	mutex_enter(&xprt->xp_req_lock);
   2280 	pool = xprt->xp_pool;
   2281 	while ((mp = xprt->xp_req_head) != NULL) {
   2282 		/* remove the request from the list and decrement p_reqs */
   2283 		xprt->xp_req_head = mp->b_next;
   2284 		mutex_enter(&pool->p_req_lock);
   2285 		mp->b_next = (mblk_t *)0;
   2286 		pool->p_reqs--;
   2287 		mutex_exit(&pool->p_req_lock);
   2288 		(*RELE_PROC(xprt)) (xprt->xp_wq, mp);
   2289 	}
   2290 	mutex_exit(&xprt->xp_req_lock);
   2291 }
   2292 
   2293 /*
   2294  * This routine is called by rpcmod to inform kernel RPC that a
   2295  * queue is closing. It is called after all the requests have been
   2296  * picked up (that is after all the slots on the queue have
   2297  * been released by kernel RPC). It is also guaranteed that no more
   2298  * request will be delivered on this transport.
   2299  *
   2300  * - clear xp_wq to mark the master server transport handle as closing
   2301  * - if there are no more threads on this transport close/destroy it
   2302  * - otherwise, broadcast threads sleeping in svc_poll(); the last
   2303  *   thread will close/destroy the transport.
   2304  */
   2305 void
   2306 svc_queueclose(queue_t *q)
   2307 {
   2308 	SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
   2309 
   2310 	if (xprt == NULL) {
   2311 		/*
   2312 		 * If there is no master xprt associated with this stream,
   2313 		 * then there is nothing to do.  This happens regularly
   2314 		 * with connection-oriented listening streams created by
   2315 		 * nfsd.
   2316 		 */
   2317 		return;
   2318 	}
   2319 
   2320 	mutex_enter(&xprt->xp_thread_lock);
   2321 
   2322 	ASSERT(xprt->xp_req_head == NULL);
   2323 	ASSERT(xprt->xp_wq != NULL);
   2324 
   2325 	xprt->xp_wq = NULL;
   2326 
   2327 	if (xprt->xp_threads == 0) {
   2328 		SVCPOOL *pool = xprt->xp_pool;
   2329 
   2330 		/*
   2331 		 * svc_xprt_cleanup() destroys the transport
   2332 		 * or releases the transport thread lock
   2333 		 */
   2334 		svc_xprt_cleanup(xprt, FALSE);
   2335 
   2336 		mutex_enter(&pool->p_thread_lock);
   2337 
   2338 		/*
   2339 		 * If the pool is in closing state and this was
   2340 		 * the last transport in the pool then signal the creator
   2341 		 * thread to clean up and exit.
   2342 		 */
   2343 		if (pool->p_closing && svc_pool_tryexit(pool)) {
   2344 			return;
   2345 		}
   2346 		mutex_exit(&pool->p_thread_lock);
   2347 	} else {
   2348 		/*
   2349 		 * Wakeup threads sleeping in svc_poll() so that they
   2350 		 * unlink from the transport
   2351 		 */
   2352 		mutex_enter(&xprt->xp_pool->p_req_lock);
   2353 		cv_broadcast(&xprt->xp_pool->p_req_cv);
   2354 		mutex_exit(&xprt->xp_pool->p_req_lock);
   2355 
   2356 		/*
   2357 		 *  NOTICE: No references to the master transport structure
   2358 		 *	    beyond this point!
   2359 		 */
   2360 		mutex_exit(&xprt->xp_thread_lock);
   2361 	}
   2362 }
   2363 
   2364 /*
   2365  * Interrupt `request delivery' routine called from rpcmod
   2366  * - put a request at the tail of the transport request queue
   2367  * - insert a hint for svc_poll() into the xprt-ready queue
   2368  * - increment the `pending-requests' count for the pool
   2369  * - wake up a thread sleeping in svc_poll() if necessary
   2370  * - if all the threads are running ask the creator for a new one.
   2371  */
   2372 void
   2373 svc_queuereq(queue_t *q, mblk_t *mp)
   2374 {
   2375 	SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
   2376 	SVCPOOL *pool = xprt->xp_pool;
   2377 
   2378 	TRACE_0(TR_FAC_KRPC, TR_SVC_QUEUEREQ_START, "svc_queuereq_start");
   2379 
   2380 	ASSERT(!is_system_labeled() || msg_getcred(mp, NULL) != NULL ||
   2381 	    mp->b_datap->db_type != M_DATA);
   2382 
   2383 	/*
   2384 	 * Step 1.
   2385 	 * Grab the transport's request lock and the
   2386 	 * pool's request lock so that when we put
   2387 	 * the request at the tail of the transport's
   2388 	 * request queue, possibly put the request on
   2389 	 * the xprt ready queue and increment the
   2390 	 * pending request count it looks atomic.
   2391 	 */
   2392 	mutex_enter(&xprt->xp_req_lock);
   2393 	mutex_enter(&pool->p_req_lock);
   2394 	if (xprt->xp_req_head == NULL)
   2395 		xprt->xp_req_head = mp;
   2396 	else
   2397 		xprt->xp_req_tail->b_next = mp;
   2398 	xprt->xp_req_tail = mp;
   2399 
   2400 	/*
   2401 	 * Step 2.
   2402 	 * Insert a hint into the xprt-ready queue, increment
   2403 	 * `pending-requests' count for the pool, and wake up
   2404 	 * a thread sleeping in svc_poll() if necessary.
   2405 	 */
   2406 
   2407 	/* Insert pointer to this transport into the xprt-ready queue */
   2408 	svc_xprt_qput(pool, xprt);
   2409 
   2410 	/* Increment the `pending-requests' count for the pool */
   2411 	pool->p_reqs++;
   2412 
   2413 	TRACE_2(TR_FAC_KRPC, TR_NFSFP_QUE_REQ_ENQ,
   2414 	    "rpc_que_req_enq:pool %p mp %p", pool, mp);
   2415 
   2416 	/*
   2417 	 * If there are more requests and req_cv hasn't
   2418 	 * been signaled yet then wake up one more thread now.
   2419 	 *
   2420 	 * We avoid signaling req_cv until the most recently
   2421 	 * signaled thread wakes up and gets CPU to clear
   2422 	 * the `drowsy' flag.
   2423 	 */
   2424 	if (pool->p_drowsy || pool->p_reqs <= pool->p_walkers ||
   2425 	    pool->p_asleep == 0) {
   2426 		mutex_exit(&pool->p_req_lock);
   2427 	} else {
   2428 		pool->p_drowsy = TRUE;
   2429 		pool->p_asleep--;
   2430 
   2431 		/*
   2432 		 * Signal wakeup and drop the request lock.
   2433 		 */
   2434 		cv_signal(&pool->p_req_cv);
   2435 		mutex_exit(&pool->p_req_lock);
   2436 	}
   2437 	mutex_exit(&xprt->xp_req_lock);
   2438 
   2439 	/*
   2440 	 * Step 3.
   2441 	 * If there are no asleep/signaled threads, we are
   2442 	 * still below pool->p_maxthreads limit, and no thread is
   2443 	 * currently being created then signal the creator
   2444 	 * for one more service thread.
   2445 	 *
   2446 	 * The asleep and drowsy checks are not not protected
   2447 	 * by a lock since it hurts performance and a wrong
   2448 	 * decision is not essential.
   2449 	 */
   2450 	if (pool->p_asleep == 0 && !pool->p_drowsy &&
   2451 	    pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
   2452 		svc_creator_signal(pool);
   2453 
   2454 	TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
   2455 	    "svc_queuereq_end:(%S)", "end");
   2456 }
   2457 
   2458 /*
   2459  * Reserve a service thread so that it can be detached later.
   2460  * This reservation is required to make sure that when it tries to
   2461  * detach itself the total number of detached threads does not exceed
   2462  * pool->p_maxthreads - pool->p_redline (i.e. that we can have
   2463  * up to pool->p_redline non-detached threads).
   2464  *
   2465  * If the thread does not detach itself later, it should cancel the
   2466  * reservation before returning to svc_run().
   2467  *
   2468  * - check if there is room for more reserved/detached threads
   2469  * - if so, then increment the `reserved threads' count for the pool
   2470  * - mark the thread as reserved (setting the flag in the clone transport
   2471  *   handle for this thread
   2472  * - returns 1 if the reservation succeeded, 0 if it failed.
   2473  */
   2474 int
   2475 svc_reserve_thread(SVCXPRT *clone_xprt)
   2476 {
   2477 	SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
   2478 
   2479 	/* Recursive reservations are not allowed */
   2480 	ASSERT(!clone_xprt->xp_reserved);
   2481 	ASSERT(!clone_xprt->xp_detached);
   2482 
   2483 	/* Check pool counts if there is room for reservation */
   2484 	mutex_enter(&pool->p_thread_lock);
   2485 	if (pool->p_reserved_threads + pool->p_detached_threads >=
   2486 	    pool->p_maxthreads - pool->p_redline) {
   2487 		mutex_exit(&pool->p_thread_lock);
   2488 		return (0);
   2489 	}
   2490 	pool->p_reserved_threads++;
   2491 	mutex_exit(&pool->p_thread_lock);
   2492 
   2493 	/* Mark the thread (clone handle) as reserved */
   2494 	clone_xprt->xp_reserved = TRUE;
   2495 
   2496 	return (1);
   2497 }
   2498 
   2499 /*
   2500  * Cancel a reservation for a thread.
   2501  * - decrement the `reserved threads' count for the pool
   2502  * - clear the flag in the clone transport handle for this thread.
   2503  */
   2504 void
   2505 svc_unreserve_thread(SVCXPRT *clone_xprt)
   2506 {
   2507 	SVCPOOL *pool = clone_xprt->xp_master->xp_pool;
   2508 
   2509 	/* Thread must have a reservation */
   2510 	ASSERT(clone_xprt->xp_reserved);
   2511 	ASSERT(!clone_xprt->xp_detached);
   2512 
   2513 	/* Decrement global count */
   2514 	mutex_enter(&pool->p_thread_lock);
   2515 	pool->p_reserved_threads--;
   2516 	mutex_exit(&pool->p_thread_lock);
   2517 
   2518 	/* Clear reservation flag */
   2519 	clone_xprt->xp_reserved = FALSE;
   2520 }
   2521 
   2522 /*
   2523  * Detach a thread from its transport, so that it can block for an
   2524  * extended time.  Because the transport can be closed after the thread is
   2525  * detached, the thread should have already sent off a reply if it was
   2526  * going to send one.
   2527  *
   2528  * - decrement `non-detached threads' count and increment `detached threads'
   2529  *   counts for the transport
   2530  * - decrement the  `non-detached threads' and `reserved threads'
   2531  *   counts and increment the `detached threads' count for the pool
   2532  * - release the rpcmod slot
   2533  * - mark the clone (thread) as detached.
   2534  *
   2535  * No need to return a pointer to the thread's CPR information, since
   2536  * the thread has a userland identity.
   2537  *
   2538  * NOTICE: a thread must not detach itself without making a prior reservation
   2539  *	   through svc_thread_reserve().
   2540  */
   2541 callb_cpr_t *
   2542 svc_detach_thread(SVCXPRT *clone_xprt)
   2543 {
   2544 	SVCMASTERXPRT *xprt = clone_xprt->xp_master;
   2545 	SVCPOOL *pool = xprt->xp_pool;
   2546 
   2547 	/* Thread must have a reservation */
   2548 	ASSERT(clone_xprt->xp_reserved);
   2549 	ASSERT(!clone_xprt->xp_detached);
   2550 
   2551 	/* Bookkeeping for this transport */
   2552 	mutex_enter(&xprt->xp_thread_lock);
   2553 	xprt->xp_threads--;
   2554 	xprt->xp_detached_threads++;
   2555 	mutex_exit(&xprt->xp_thread_lock);
   2556 
   2557 	/* Bookkeeping for the pool */
   2558 	mutex_enter(&pool->p_thread_lock);
   2559 	pool->p_threads--;
   2560 	pool->p_reserved_threads--;
   2561 	pool->p_detached_threads++;
   2562 	mutex_exit(&pool->p_thread_lock);
   2563 
   2564 	/* Release an rpcmod slot for this request */
   2565 	(*RELE_PROC(xprt)) (clone_xprt->xp_wq, NULL);
   2566 
   2567 	/* Mark the clone (thread) as detached */
   2568 	clone_xprt->xp_reserved = FALSE;
   2569 	clone_xprt->xp_detached = TRUE;
   2570 
   2571 	return (NULL);
   2572 }
   2573 
   2574 /*
   2575  * This routine is responsible for extracting RDMA plugin master XPRT,
   2576  * unregister from the SVCPOOL and initiate plugin specific cleanup.
   2577  * It is passed a list/group of rdma transports as records which are
   2578  * active in a given registered or unregistered kRPC thread pool. Its shuts
   2579  * all active rdma transports in that pool. If the thread active on the trasport
   2580  * happens to be last thread for that pool, it will signal the creater thread
   2581  * to cleanup the pool and destroy the xprt in svc_queueclose()
   2582  */
   2583 void
   2584 rdma_stop(rdma_xprt_group_t *rdma_xprts)
   2585 {
   2586 	SVCMASTERXPRT *xprt;
   2587 	rdma_xprt_record_t *curr_rec;
   2588 	queue_t *q;
   2589 	mblk_t *mp;
   2590 	int i, rtg_count;
   2591 	SVCPOOL *pool;
   2592 
   2593 	if (rdma_xprts->rtg_count == 0)
   2594 		return;
   2595 
   2596 	rtg_count = rdma_xprts->rtg_count;
   2597 
   2598 	for (i = 0; i < rtg_count; i++) {
   2599 		curr_rec = rdma_xprts->rtg_listhead;
   2600 		rdma_xprts->rtg_listhead = curr_rec->rtr_next;
   2601 		rdma_xprts->rtg_count--;
   2602 		curr_rec->rtr_next = NULL;
   2603 		xprt = curr_rec->rtr_xprt_ptr;
   2604 		q = xprt->xp_wq;
   2605 		svc_rdma_kstop(xprt);
   2606 
   2607 		mutex_enter(&xprt->xp_req_lock);
   2608 		pool = xprt->xp_pool;
   2609 		while ((mp = xprt->xp_req_head) != NULL) {
   2610 			/*
   2611 			 * remove the request from the list and
   2612 			 * decrement p_reqs
   2613 			 */
   2614 			xprt->xp_req_head = mp->b_next;
   2615 			mutex_enter(&pool->p_req_lock);
   2616 			mp->b_next = (mblk_t *)0;
   2617 			pool->p_reqs--;
   2618 			mutex_exit(&pool->p_req_lock);
   2619 			if (mp) {
   2620 				rdma_recv_data_t *rdp = (rdma_recv_data_t *)
   2621 				    mp->b_rptr;
   2622 				RDMA_BUF_FREE(rdp->conn, &rdp->rpcmsg);
   2623 				RDMA_REL_CONN(rdp->conn);
   2624 				freemsg(mp);
   2625 			}
   2626 		}
   2627 		mutex_exit(&xprt->xp_req_lock);
   2628 		svc_queueclose(q);
   2629 #ifdef	DEBUG
   2630 		if (rdma_check)
   2631 			cmn_err(CE_NOTE, "rdma_stop: Exited svc_queueclose\n");
   2632 #endif
   2633 		/*
   2634 		 * Free the rdma transport record for the expunged rdma
   2635 		 * based master transport handle.
   2636 		 */
   2637 		kmem_free(curr_rec, sizeof (rdma_xprt_record_t));
   2638 		if (!rdma_xprts->rtg_listhead)
   2639 			break;
   2640 	}
   2641 }
   2642 
   2643 
   2644 /*
   2645  * rpc_msg_dup/rpc_msg_free
   2646  * Currently only used by svc_rpcsec_gss.c but put in this file as it
   2647  * may be useful to others in the future.
   2648  * But future consumers should be careful cuz so far
   2649  *   - only tested/used for call msgs (not reply)
   2650  *   - only tested/used with call verf oa_length==0
   2651  */
   2652 struct rpc_msg *
   2653 rpc_msg_dup(struct rpc_msg *src)
   2654 {
   2655 	struct rpc_msg *dst;
   2656 	struct opaque_auth oa_src, oa_dst;
   2657 
   2658 	dst = kmem_alloc(sizeof (*dst), KM_SLEEP);
   2659 
   2660 	dst->rm_xid = src->rm_xid;
   2661 	dst->rm_direction = src->rm_direction;
   2662 
   2663 	dst->rm_call.cb_rpcvers = src->rm_call.cb_rpcvers;
   2664 	dst->rm_call.cb_prog = src->rm_call.cb_prog;
   2665 	dst->rm_call.cb_vers = src->rm_call.cb_vers;
   2666 	dst->rm_call.cb_proc = src->rm_call.cb_proc;
   2667 
   2668 	/* dup opaque auth call body cred */
   2669 	oa_src = src->rm_call.cb_cred;
   2670 
   2671 	oa_dst.oa_flavor = oa_src.oa_flavor;
   2672 	oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
   2673 
   2674 	bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
   2675 	oa_dst.oa_length = oa_src.oa_length;
   2676 
   2677 	dst->rm_call.cb_cred = oa_dst;
   2678 
   2679 	/* dup or just alloc opaque auth call body verifier */
   2680 	if (src->rm_call.cb_verf.oa_length > 0) {
   2681 		oa_src = src->rm_call.cb_verf;
   2682 
   2683 		oa_dst.oa_flavor = oa_src.oa_flavor;
   2684 		oa_dst.oa_base = kmem_alloc(oa_src.oa_length, KM_SLEEP);
   2685 
   2686 		bcopy(oa_src.oa_base, oa_dst.oa_base, oa_src.oa_length);
   2687 		oa_dst.oa_length = oa_src.oa_length;
   2688 
   2689 		dst->rm_call.cb_verf = oa_dst;
   2690 	} else {
   2691 		oa_dst.oa_flavor = -1;  /* will be set later */
   2692 		oa_dst.oa_base = kmem_alloc(MAX_AUTH_BYTES, KM_SLEEP);
   2693 
   2694 		oa_dst.oa_length = 0;   /* will be set later */
   2695 
   2696 		dst->rm_call.cb_verf = oa_dst;
   2697 	}
   2698 	return (dst);
   2699 
   2700 error:
   2701 	kmem_free(dst->rm_call.cb_cred.oa_base,	dst->rm_call.cb_cred.oa_length);
   2702 	kmem_free(dst, sizeof (*dst));
   2703 	return (NULL);
   2704 }
   2705 
   2706 void
   2707 rpc_msg_free(struct rpc_msg **msg, int cb_verf_oa_length)
   2708 {
   2709 	struct rpc_msg *m = *msg;
   2710 
   2711 	kmem_free(m->rm_call.cb_cred.oa_base, m->rm_call.cb_cred.oa_length);
   2712 	m->rm_call.cb_cred.oa_base = NULL;
   2713 	m->rm_call.cb_cred.oa_length = 0;
   2714 
   2715 	kmem_free(m->rm_call.cb_verf.oa_base, cb_verf_oa_length);
   2716 	m->rm_call.cb_verf.oa_base = NULL;
   2717 	m->rm_call.cb_verf.oa_length = 0;
   2718 
   2719 	kmem_free(m, sizeof (*m));
   2720 	m = NULL;
   2721 }
   2722