Home | History | Annotate | Download | only in inet
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     27 
     28 /*
     29  * Squeues - TCP/IP serialization mechanism.
     30  *
     31  * This is a general purpose high-performance serialization mechanism. It is
     32  * similar to a taskq with a single worker thread, the difference is that it
     33  * does not imply a context switch - the thread placing a request may actually
     34  * process it. It is also biased for processing requests in interrupt context.
     35  *
     36  * Each squeue has a worker thread which may optionally be bound to a CPU.
     37  *
     38  * Only one thread may process requests from a given squeue at any time. This is
     39  * called "entering" squeue.
     40  *
     41  * Each dispatched request is processed either by
     42  *
     43  *	a) Dispatching thread or
     44  *	b) Some other thread that is currently processing squeue at the time of
     45  *		request or
     46  *	c) worker thread.
     47  *
     48  * INTERFACES:
     49  *
     50  * squeue_t *squeue_create(name, bind, wait, pri)
     51  *
     52  *	name: symbolic name for squeue.
     53  *	wait: time to wait before waking the worker thread after queueing
     54  *		request.
     55  *	bind: preferred CPU binding for the worker thread.
     56  *	pri:  thread priority for the worker thread.
     57  *
     58  *   This function never fails and may sleep. It returns a transparent pointer
     59  *   to the squeue_t structure that is passed to all other squeue operations.
     60  *
     61  * void squeue_bind(sqp, bind)
     62  *
     63  *   Bind squeue worker thread to a CPU specified by the 'bind' argument. The
     64  *   'bind' value of -1 binds to the preferred CPU specified for
     65  *   squeue_create.
     66  *
     67  *   NOTE: Any value of 'bind' other than -1 is not supported currently, but the
     68  *	 API is present - in the future it may be useful to specify different
     69  *	 binding.
     70  *
     71  * void squeue_unbind(sqp)
     72  *
     73  *   Unbind the worker thread from its preferred CPU.
     74  *
     75  * void squeue_enter(*sqp, *mp, proc, arg, tag)
     76  *
     77  *   Post a single request for processing. Each request consists of mblock 'mp',
     78  *   function 'proc' to execute and an argument 'arg' to pass to this
     79  *   function. The function is called as (*proc)(arg, mp, sqp); The tag is an
     80  *   arbitrary number from 0 to 255 which will be stored in mp to track exact
     81  *   caller of squeue_enter. The combination of function name and the tag should
     82  *   provide enough information to identify the caller.
     83  *
     84  *   If no one is processing the squeue, squeue_enter() will call the function
     85  *   immediately. Otherwise it will add the request to the queue for later
     86  *   processing. Once the function is executed, the thread may continue
     87  *   executing all other requests pending on the queue.
     88  *
     89  *   NOTE: The tagging information is only used when SQUEUE_DEBUG is set to 1.
     90  *   NOTE: The argument can be conn_t only. Ideally we'd like to have generic
     91  *	   argument, but we want to drop connection reference count here - this
     92  *	   improves tail-call optimizations.
     93  *	   XXX: The arg should have type conn_t.
     94  *
     95  * void squeue_enter_nodrain(*sqp, *mp, proc, arg, tag)
     96  *
     97  *   Same as squeue_enter(), but the entering thread will only try to execute a
     98  *   single request. It will not continue executing any pending requests.
     99  *
    100  * void squeue_fill(*sqp, *mp, proc, arg, tag)
    101  *
    102  *   Just place the request on the queue without trying to execute it. Arrange
    103  *   for the worker thread to process the request.
    104  *
    105  * void squeue_profile_enable(sqp)
    106  * void squeue_profile_disable(sqp)
    107  *
    108  *    Enable or disable profiling for specified 'sqp'. Profiling is only
    109  *    available when SQUEUE_PROFILE is set.
    110  *
    111  * void squeue_profile_reset(sqp)
    112  *
    113  *    Reset all profiling information to zero. Profiling is only
    114  *    available when SQUEUE_PROFILE is set.
    115  *
    116  * void squeue_profile_start()
    117  * void squeue_profile_stop()
    118  *
    119  *    Globally enable or disable profiling for all squeues.
    120  *
    121  * uintptr_t *squeue_getprivate(sqp, p)
    122  *
    123  *    Each squeue keeps small amount of private data space available for various
    124  *    consumers. Current consumers include TCP and NCA. Other consumers need to
    125  *    add their private tag to the sqprivate_t enum. The private information is
    126  *    limited to an uintptr_t value. The squeue has no knowledge of its content
    127  *    and does not manage it in any way.
    128  *
    129  *    The typical use may be a breakdown of data structures per CPU (since
    130  *    squeues are usually per CPU). See NCA for examples of use.
    131  *    Currently 'p' may have one legal value SQPRIVATE_TCP.
    132  *
    133  * processorid_t squeue_binding(sqp)
    134  *
    135  *    Returns the CPU binding for a given squeue.
    136  *
    137  * TUNABLES:
    138  *
    139  * squeue_intrdrain_ms: Maximum time in ms interrupts spend draining any
    140  *	squeue. Note that this is approximation - squeues have no control on the
    141  *	time it takes to process each request. This limit is only checked
    142  *	between processing individual messages.
    143  *    Default: 20 ms.
    144  *
    145  * squeue_writerdrain_ms: Maximum time in ms non-interrupts spend draining any
    146  *	squeue. Note that this is approximation - squeues have no control on the
    147  *	time it takes to process each request. This limit is only checked
    148  *	between processing individual messages.
    149  *    Default: 10 ms.
    150  *
    151  * squeue_workerdrain_ms: Maximum time in ms worker thread spends draining any
    152  *	squeue. Note that this is approximation - squeues have no control on the
    153  *	time it takes to process each request. This limit is only checked
    154  *	between processing individual messages.
    155  *    Default: 10 ms.
    156  *
    157  * squeue_workerwait_ms: When worker thread is interrupted because workerdrain
    158  *	expired, how much time to wait before waking worker thread again.
    159  *    Default: 10 ms.
    160  */
    161 
    162 #include <sys/types.h>
    163 #include <sys/cmn_err.h>
    164 #include <sys/debug.h>
    165 #include <sys/kmem.h>
    166 #include <sys/cpuvar.h>
    167 #include <sys/condvar_impl.h>
    168 #include <sys/systm.h>
    169 #include <sys/callb.h>
    170 #include <sys/sdt.h>
    171 #include <sys/ddi.h>
    172 
    173 #include <inet/ipclassifier.h>
    174 #include <inet/udp_impl.h>
    175 
    176 /*
    177  * State flags, kept as a bit mask in sq_state.
    178  * Note: The MDB IP module depends on the values of these flags.
    179  */
    180 #define	SQS_PROC	0x0001	/* squeue is being processed */
    181 #define	SQS_WORKER	0x0002	/* worker thread */
    182 #define	SQS_ENTER	0x0004	/* drain started from interrupt context */
    183 #define	SQS_FAST	0x0008	/* inline fast-path processing */
    184 #define	SQS_USER	0x0010	/* drain started by a non-interrupt thread */
    185 #define	SQS_BOUND	0x0020	/* worker thread is bound to sq_bind */
    186 #define	SQS_PROFILE	0x0040	/* profiling enabled for this squeue */
    187 #define	SQS_REENTER	0x0080	/* processing thread re-entered once */
    188 #define	SQS_TMO_PROG	0x0100	/* timeout() call is in progress */
    189 
    190 #include <sys/squeue_impl.h>
    191 
    192 static void squeue_fire(void *);
    193 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
    194 static void squeue_worker(squeue_t *sqp);
    195 
    196 #if SQUEUE_PROFILE
        /* Installed as ks_lock / ks_update on each squeue kstat by squeue_create(). */
    197 static kmutex_t squeue_kstat_lock;
    198 static int  squeue_kstat_update(kstat_t *, int);
    199 #endif
    200 
        /* kmem cache from which squeue_create() allocates; set up by squeue_init(). */
    201 kmem_cache_t *squeue_cache;
    202 
        /* Nanoseconds per millisecond. */
    203 #define	SQUEUE_MSEC_TO_NSEC 1000000
    204 
        /*
         * Tunables, in milliseconds. They are described in the block comment at
         * the top of this file and are read once, by squeue_init().
         */
    205 int squeue_intrdrain_ms = 20;
    206 int squeue_writerdrain_ms = 10;
    207 int squeue_workerdrain_ms = 10;
    208 int squeue_workerwait_ms = 10;
    209 
    210 /*
         * The values above converted to nanoseconds (or clock ticks for the
         * worker wait) by squeue_init(). They remain 0 until squeue_init() runs.
         * NOTE(review): these are plain ints, so a tunable above roughly 2147 ms
         * would overflow the nanosecond product - confirm acceptable range.
         */
    211 static int squeue_intrdrain_ns = 0;
    212 static int squeue_writerdrain_ns = 0;
    213 static int squeue_workerdrain_ns = 0;
    214 static int squeue_workerwait_tick = 0;
    215 
    216 /*
    217  * The minimum number of packets queued at which a drain by the worker
    218  * thread triggers polling (if the squeue allows it). The choice of 3 is
    219  * arbitrary. You definitely don't want it to be 1 since that will trigger
    220  * polling on very low loads as well. ssh seems to be one such example:
    221  * packet flow was very low, yet somehow 1 packet ended up getting
    222  * queued, the worker thread fired every 10ms and blanking also got
    223  * triggered.
    224  */
    225 int squeue_worker_poll_min = 3;
    226 
    227 #if SQUEUE_PROFILE
    228 /*
    229  * Set to B_TRUE to enable profiling.
         * SQ_PROFILING() is true only when this global switch is on and the
         * individual squeue also has SQS_PROFILE set in sq_state.
    230  */
    231 static int squeue_profile = B_FALSE;
    232 #define	SQ_PROFILING(sqp) (squeue_profile && ((sqp)->sq_state & SQS_PROFILE))
    233 
        /* Increment counter x, or add d to it, in the squeue's sq_stats. */
    234 #define	SQSTAT(sqp, x) ((sqp)->sq_stats.x++)
    235 #define	SQDELTA(sqp, x, d) ((sqp)->sq_stats.x += (d))
    236 
        /*
         * Named-kstat template. squeue_create() points ks_data of every squeue
         * kstat at this single instance and uses squeue_kstat_lock as ks_lock.
         */
    237 struct squeue_kstat {
    238 	kstat_named_t	sq_count;
    239 	kstat_named_t	sq_max_qlen;
    240 	kstat_named_t	sq_npackets_worker;
    241 	kstat_named_t	sq_npackets_intr;
    242 	kstat_named_t	sq_npackets_other;
    243 	kstat_named_t	sq_nqueued_intr;
    244 	kstat_named_t	sq_nqueued_other;
    245 	kstat_named_t	sq_ndrains_worker;
    246 	kstat_named_t	sq_ndrains_intr;
    247 	kstat_named_t	sq_ndrains_other;
    248 	kstat_named_t	sq_time_worker;
    249 	kstat_named_t	sq_time_intr;
    250 	kstat_named_t	sq_time_other;
    251 } squeue_kstat = {
    252 	{ "count",		KSTAT_DATA_UINT64 },
    253 	{ "max_qlen",		KSTAT_DATA_UINT64 },
    254 	{ "packets_worker",	KSTAT_DATA_UINT64 },
    255 	{ "packets_intr",	KSTAT_DATA_UINT64 },
    256 	{ "packets_other",	KSTAT_DATA_UINT64 },
    257 	{ "queued_intr",	KSTAT_DATA_UINT64 },
    258 	{ "queued_other",	KSTAT_DATA_UINT64 },
    259 	{ "ndrains_worker",	KSTAT_DATA_UINT64 },
    260 	{ "ndrains_intr",	KSTAT_DATA_UINT64 },
    261 	{ "ndrains_other",	KSTAT_DATA_UINT64 },
    262 	{ "time_worker",	KSTAT_DATA_UINT64 },
    263 	{ "time_intr",		KSTAT_DATA_UINT64 },
    264 	{ "time_other",		KSTAT_DATA_UINT64 },
    265 };
    266 #endif
    267 
        /*
         * SQUEUE_WORKER_WAKEUP(sqp) - arrange for the worker thread to process
         * requests that were left on the queue.
         *
         * Must be invoked with sq_lock held; the lock is dropped on every path.
         * NOTE: several paths execute 'return', so the macro may only be used in
         * functions returning void and nothing placed after it is guaranteed to
         * run.
         *
         * Depending on state the macro either signals sq_async right away, leaves
         * an already pending timeout alone, or arms a timeout of sq_wait ticks
         * whose handler is squeue_fire().
         */
    268 #define	SQUEUE_WORKER_WAKEUP(sqp) {					\
    269 	timeout_id_t tid = (sqp)->sq_tid;				\
    270 									\
    271 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));				\
    272 	/*								\
    273 	 * Queue isn't being processed, so take				\
    274 	 * any post enqueue actions needed before leaving.		\
    275 	 */								\
    276 	if (tid != 0) {							\
    277 		/*							\
    278 		 * Waiting for an enter() to process mblk(s).		\
        		 * 'waited' and sq_wait are both in clock ticks		\
        		 * (sq_wait is stored via MSEC_TO_TICK), so they	\
        		 * are compared directly.				\
    279 		 */							\
    280 		clock_t	waited = lbolt - (sqp)->sq_awaken;		\
    281 									\
    282 		if (waited >= (sqp)->sq_wait) {				\
    283 			/*						\
    284 			 * Times up and have a worker thread		\
    285 			 * waiting for work, so schedule it.		\
    286 			 */						\
    287 			(sqp)->sq_tid = 0;				\
    288 			(sqp)->sq_awaken = lbolt;			\
    289 			cv_signal(&(sqp)->sq_async);			\
    290 			mutex_exit(&(sqp)->sq_lock);			\
    291 			(void) untimeout(tid);				\
    292 			return;						\
    293 		}							\
    294 		mutex_exit(&(sqp)->sq_lock);				\
    295 		return;							\
    296 	} else if ((sqp)->sq_state & SQS_TMO_PROG) {			\
    297 		mutex_exit(&(sqp)->sq_lock);				\
    298 		return;							\
    299 	} else if ((sqp)->sq_wait != 0) {				\
    300 		clock_t	wait = (sqp)->sq_wait;				\
    301 		/*							\
    302 		 * Wait up to sqp->sq_wait ticks for an			\
    303 		 * enter() to process this queue. We			\
    304 		 * don't want to contend on timeout locks		\
    305 		 * with sq_lock held for performance reasons,		\
    306 		 * so drop the sq_lock before calling timeout		\
    307 		 * but we need to check if timeout is required		\
    308 		 * after re acquiring the sq_lock. Once			\
    309 		 * the sq_lock is dropped, someone else could		\
    310 		 * have processed the packet or the timeout could	\
    311 		 * have already fired.					\
    312 		 */							\
    313 		(sqp)->sq_state |= SQS_TMO_PROG;			\
    314 		mutex_exit(&(sqp)->sq_lock);				\
    315 		tid = timeout(squeue_fire, (sqp), wait);		\
    316 		mutex_enter(&(sqp)->sq_lock);				\
    317 		/* Check again if we still need the timeout */		\
    318 		if ((((sqp)->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==	\
    319 			SQS_TMO_PROG) && ((sqp)->sq_tid == 0) &&	\
    320 			((sqp)->sq_first != NULL)) {			\
    321 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
    322 				(sqp)->sq_awaken = lbolt;		\
    323 				(sqp)->sq_tid = tid;			\
    324 				mutex_exit(&(sqp)->sq_lock);		\
    325 				return;					\
    326 		} else {						\
    327 			if ((sqp)->sq_state & SQS_TMO_PROG) {		\
    328 				(sqp)->sq_state &= ~SQS_TMO_PROG;	\
    329 				mutex_exit(&(sqp)->sq_lock);		\
    330 				(void) untimeout(tid);			\
    331 			} else {					\
    332 				/*					\
    333 				 * The timer fired before we could 	\
    334 				 * reacquire the sq_lock. squeue_fire	\
    335 				 * removes the SQS_TMO_PROG flag	\
    336 				 * and we don't need to	do anything	\
    337 				 * else.				\
    338 				 */					\
    339 				mutex_exit(&(sqp)->sq_lock);		\
    340 			}						\
    341 		}							\
    342 	} else {							\
    343 		/*							\
    344 		 * Schedule the worker thread.				\
    345 		 */							\
    346 		(sqp)->sq_awaken = lbolt;				\
    347 		cv_signal(&(sqp)->sq_async);				\
    348 		mutex_exit(&(sqp)->sq_lock);				\
    349 	}								\
    350 	ASSERT(MUTEX_NOT_HELD(&(sqp)->sq_lock)); 			\
    351 }
    352 
        /*
         * ENQUEUE_MP(sqp, mp, proc, arg) - append one mblk to the tail of the
         * squeue and bump sq_count.
         *
         * The handler 'proc' is stashed in mp->b_queue and its argument 'arg' in
         * mp->b_prev; squeue_enter_chain() reads them back from those fields.
         * The caller must hold sq_lock and mp must not be linked anywhere
         * (b_prev and b_next are asserted to be NULL).
         */
    353 #define	ENQUEUE_MP(sqp, mp, proc, arg) {			\
    354 	/*							\
    355 	 * Enque our mblk.					\
    356 	 */							\
    357 	(mp)->b_queue = NULL;					\
    358 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
    359 	ASSERT((mp)->b_prev == NULL && (mp)->b_next == NULL); 	\
    360 	(mp)->b_queue = (queue_t *)(proc);			\
    361 	(mp)->b_prev = (mblk_t *)(arg);				\
    362 								\
        	/* Link after the current tail, or start the list. */	\
    363 	if ((sqp)->sq_last != NULL)				\
    364 		(sqp)->sq_last->b_next = (mp);			\
    365 	else							\
    366 		(sqp)->sq_first = (mp);				\
    367 	(sqp)->sq_last = (mp);					\
    368 	(sqp)->sq_count++;					\
    369 	ASSERT((sqp)->sq_count > 0);				\
    370 	DTRACE_PROBE2(squeue__enqueue, squeue_t *, sqp,		\
    371 	    mblk_t *, mp);					\
    372 }
    373 
    374 
        /*
         * ENQUEUE_CHAIN(sqp, mp, tail, cnt) - append an already linked chain of
         * 'cnt' mblks, running from 'mp' to 'tail', to the tail of the squeue.
         *
         * Unlike ENQUEUE_MP() this does not touch b_queue/b_prev of the mblks.
         * The caller must hold sq_lock.
         */
    375 #define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
    376 	/*							\
    377 	 * Enqueue our mblk chain.				\
    378 	 */							\
    379 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
    380 								\
    381 	if ((sqp)->sq_last != NULL)				\
    382 		(sqp)->sq_last->b_next = (mp);			\
    383 	else							\
    384 		(sqp)->sq_first = (mp);				\
    385 	(sqp)->sq_last = (tail);				\
    386 	(sqp)->sq_count += (cnt);				\
    387 	ASSERT((sqp)->sq_count > 0);				\
    388 	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
    389 		mblk_t *, mp, mblk_t *, tail, int, cnt);	\
    390 								\
    391 }
    392 
        /*
         * SQS_POLLING_ON(sqp, rx_ring) - put rx_ring into polling mode.
         *
         * Calls the ring's rr_blank callback with a time of
         * MIN(sq_avg_drain_time * sq_count, rr_max_blank_time) and a packet count
         * of rr_max_pkt_cnt, then sets ILL_POLLING in rr_poll_state and stamps
         * rr_poll_time with lbolt. The caller must hold sq_lock.
         * Both arguments are evaluated more than once.
         */
    393 #define	SQS_POLLING_ON(sqp, rx_ring) {				\
    394 	ASSERT((rx_ring) != NULL);				\
    395 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
    396 	(rx_ring)->rr_blank((rx_ring)->rr_handle,		\
    397 	    MIN(((sqp)->sq_avg_drain_time * (sqp)->sq_count),	\
    398 		(rx_ring)->rr_max_blank_time),			\
    399 		(rx_ring)->rr_max_pkt_cnt);			\
    400 	(rx_ring)->rr_poll_state |= ILL_POLLING;		\
    401 	(rx_ring)->rr_poll_time = lbolt;			\
    402 }
    403 
    404 
        /*
         * SQS_POLLING_OFF(sqp, rx_ring) - call the ring's rr_blank callback with
         * its minimum values (rr_min_blank_time, rr_min_pkt_cnt).
         *
         * The caller must hold sq_lock. rr_poll_state is not modified here.
         * rx_ring is evaluated more than once.
         */
    405 #define	SQS_POLLING_OFF(sqp, rx_ring) {				\
    406 	ASSERT((rx_ring) != NULL);				\
    407 	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
    408 	(rx_ring)->rr_blank((rx_ring)->rr_handle,		\
    409 	    (rx_ring)->rr_min_blank_time,			\
    410 	    (rx_ring)->rr_min_pkt_cnt);				\
    411 }
    412 
    413 void
    414 squeue_init(void)
    415 {
    416 	/* Convert the *_ms tunables, then create the squeue_t cache. */
    417 	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
    418 	squeue_workerdrain_ns = SQUEUE_MSEC_TO_NSEC * squeue_workerdrain_ms;
    419 	squeue_writerdrain_ns = SQUEUE_MSEC_TO_NSEC * squeue_writerdrain_ms;
    420 	squeue_intrdrain_ns = SQUEUE_MSEC_TO_NSEC * squeue_intrdrain_ms;
    421 	squeue_cache = kmem_cache_create("squeue_cache", sizeof (squeue_t),
    422 	    64, NULL, NULL, NULL, NULL, NULL, 0);
    423 }
    424 
        /*
         * squeue_create() - allocate and initialize a squeue and start its worker.
         *
         *	name: symbolic name; at most SQ_NAMELEN characters are kept.
         *	bind: preferred CPU for the worker thread, recorded in sq_bind.
         *	wait: worker wakeup delay in milliseconds; stored in sq_wait as ticks.
         *	pri:  priority of the worker thread.
         *
         * The allocation uses KM_SLEEP, so this may block. The worker thread is
         * created here but is not bound to a CPU; squeue_bind() does that.
         * Returns the new squeue.
         */
    425 /* ARGSUSED */
    426 squeue_t *
    427 squeue_create(char *name, processorid_t bind, clock_t wait, pri_t pri)
    428 {
    429 	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
    430 
    431 	bzero(sqp, sizeof (squeue_t));
    432 	(void) strncpy(sqp->sq_name, name, SQ_NAMELEN + 1);
    433 	sqp->sq_name[SQ_NAMELEN] = '\0';
    434 
    435 	sqp->sq_bind = bind;
    436 	sqp->sq_wait = MSEC_TO_TICK(wait);
        	/*
        	 * Seed the average drain time with the number of microseconds per
        	 * tick. The tick count derived from squeue_intrdrain_ns is used as
        	 * a divisor, and it is zero when the tunable is 0 or squeue_init()
        	 * has not run yet; fall back to drv_hztousec(1) in that case
        	 * rather than dividing by zero.
        	 */
    437 	sqp->sq_avg_drain_time = (squeue_intrdrain_ns <= 0) ? drv_hztousec(1) :
    438 	    drv_hztousec(NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns)) /
    439 	    NSEC_TO_TICK_ROUNDUP(squeue_intrdrain_ns);
    440 
    441 #if SQUEUE_PROFILE
        	/* All squeue kstats share the squeue_kstat template and lock. */
    442 	if ((sqp->sq_kstat = kstat_create("ip", bind, name,
    443 		"net", KSTAT_TYPE_NAMED,
    444 		sizeof (squeue_kstat) / sizeof (kstat_named_t),
    445 		KSTAT_FLAG_VIRTUAL)) != NULL) {
    446 		sqp->sq_kstat->ks_lock = &squeue_kstat_lock;
    447 		sqp->sq_kstat->ks_data = &squeue_kstat;
    448 		sqp->sq_kstat->ks_update = squeue_kstat_update;
    449 		sqp->sq_kstat->ks_private = sqp;
    450 		kstat_install(sqp->sq_kstat);
    451 	}
    452 #endif
    453 
    454 	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
    455 	    sqp, 0, &p0, TS_RUN, pri);
    456 
    457 	return (sqp);
    458 }
    459 
    460 /*
    461  * Bind the worker to CPU sq_bind unless already bound; bind must be -1.
    462  */
    463 /* ARGSUSED */
    464 void
    465 squeue_bind(squeue_t *sqp, processorid_t bind)
    466 {
    467 	ASSERT(bind == -1);
    468 	mutex_enter(&sqp->sq_lock);
    469 	if (!(sqp->sq_state & SQS_BOUND)) {
    470 		sqp->sq_state |= SQS_BOUND;
    471 		mutex_exit(&sqp->sq_lock);
    472 		thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
    473 		return;
    474 	}
    475 	mutex_exit(&sqp->sq_lock);
    476 }
    477 
    478 /* Undo squeue_bind(): drop the worker's CPU affinity if it is bound. */
    479 void
    480 squeue_unbind(squeue_t *sqp)
    481 {
    482 	mutex_enter(&sqp->sq_lock);
    483 	if (sqp->sq_state & SQS_BOUND) {
    484 		sqp->sq_state &= ~SQS_BOUND;
    485 		mutex_exit(&sqp->sq_lock);
    486 		thread_affinity_clear(sqp->sq_worker);
    487 		return;
    488 	}
    489 	/* Not bound: nothing to undo. */
    490 	mutex_exit(&sqp->sq_lock);
    491 }
    492 
    493 /*
    494  * squeue_enter_chain() - enter squeue sqp with mblk mp (which can be
    495  * a chain), while tail points to the end and cnt is the number of
    496  * mblks in the chain.
    497  *
    498  * For a chain of a single packet (cnt == 1), go through the
    499  * fast path if no one is processing the squeue and nothing is queued.
    500  *
    501  * The proc and arg for each mblk are already stored in the mblk
    502  * (proc in b_queue, arg in b_prev).
         *
         * The tag is only recorded when SQUEUE_DEBUG is set, and then only in
         * the first mblk of the chain.
    503  */
    504 void
    505 squeue_enter_chain(squeue_t *sqp, mblk_t *mp, mblk_t *tail,
    506     uint32_t cnt, uint8_t tag)
    507 {
    508 	int		interrupt = servicing_interrupt();
    509 	void 		*arg;
    510 	sqproc_t	proc;
    511 	hrtime_t	now;
    512 #if SQUEUE_PROFILE
    513 	hrtime_t 	start, delta;
    514 #endif
    515 
    516 	ASSERT(sqp != NULL);
    517 	ASSERT(mp != NULL);
    518 	ASSERT(tail != NULL);
    519 	ASSERT(cnt > 0);
    520 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    521 
    522 	mutex_enter(&sqp->sq_lock);
    523 	if (!(sqp->sq_state & SQS_PROC)) {
    524 		/*
    525 		 * See if anything is already queued. If we are the
    526 		 * first packet, do inline processing else queue the
    527 		 * packet and do the drain.
    528 		 */
    529 		sqp->sq_run = curthread;
    530 		if (sqp->sq_first == NULL && cnt == 1) {
    531 			/*
    532 			 * Fast-path, ok to process and nothing queued.
    533 			 */
    534 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
    535 			mutex_exit(&sqp->sq_lock);
    536 
    537 			/*
    538 			 * We are the chain of 1 packet so
    539 			 * go through this fast path.
    540 			 */
    541 			arg = mp->b_prev;
    542 			mp->b_prev = NULL;
    543 			proc = (sqproc_t)mp->b_queue;
    544 			mp->b_queue = NULL;
    545 
    546 			ASSERT(proc != NULL);
    547 			ASSERT(arg != NULL);
    548 			ASSERT(mp->b_next == NULL);
    549 
    550 #if SQUEUE_DEBUG
    551 			sqp->sq_isintr = interrupt;
    552 			sqp->sq_curmp = mp;
    553 			sqp->sq_curproc = proc;
    554 			sqp->sq_connp = arg;
    555 			mp->b_tag = sqp->sq_tag = tag;
    556 #endif
    557 #if SQUEUE_PROFILE
    558 			if (SQ_PROFILING(sqp)) {
    559 				if (interrupt)
    560 					SQSTAT(sqp, sq_npackets_intr);
    561 				else
    562 					SQSTAT(sqp, sq_npackets_other);
    563 				start = gethrtime();
    564 			}
    565 #endif
        			/* Run the handler inline, without sq_lock held. */
    566 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
    567 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
    568 			    sqp, mblk_t *, mp, conn_t *, arg);
    569 			(*proc)(arg, mp, sqp);
    570 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
    571 			    sqp, conn_t *, arg);
    572 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
    573 
    574 #if SQUEUE_PROFILE
    575 			if (SQ_PROFILING(sqp)) {
    576 				delta = gethrtime() - start;
    577 				if (interrupt)
    578 					SQDELTA(sqp, sq_time_intr, delta);
    579 				else
    580 					SQDELTA(sqp, sq_time_other, delta);
    581 			}
    582 #endif
    583 #if SQUEUE_DEBUG
    584 			sqp->sq_curmp = NULL;
    585 			sqp->sq_curproc = NULL;
    586 			sqp->sq_connp = NULL;
    587 			sqp->sq_isintr = 0;
    588 #endif
    589 
    590 			CONN_DEC_REF((conn_t *)arg);
    591 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    592 			mutex_enter(&sqp->sq_lock);
    593 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
    594 			if (sqp->sq_first == NULL) {
    595 				/*
    596 				 * We processed inline our packet and
    597 				 * nothing new has arrived. We are done.
    598 				 */
    599 				sqp->sq_run = NULL;
    600 				mutex_exit(&sqp->sq_lock);
    601 				return;
    602 			} else if (sqp->sq_bind != CPU->cpu_id) {
    603 				/*
    604 				 * If the current thread is not running
    605 				 * on the CPU to which this squeue is bound,
    606 				 * then don't allow it to drain.
        				 * SQUEUE_WORKER_WAKEUP() drops sq_lock.
    607 				 */
    608 				sqp->sq_run = NULL;
    609 				SQUEUE_WORKER_WAKEUP(sqp);
    610 				return;
    611 			}
    612 		} else {
    613 			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
    614 #if SQUEUE_DEBUG
    615 			mp->b_tag = tag;
    616 #endif
    617 #if SQUEUE_PROFILE
    618 			if (SQ_PROFILING(sqp)) {
    619 				if (servicing_interrupt())
    620 					SQSTAT(sqp, sq_nqueued_intr);
    621 				else
    622 					SQSTAT(sqp, sq_nqueued_other);
    623 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
    624 					sqp->sq_stats.sq_max_qlen =
    625 					    sqp->sq_count;
    626 			}
    627 #endif
    628 		}
    629 
    630 		/*
    631 		 * We are here because either we couldn't do inline
    632 		 * processing (because something was already queued),
    633 		 * or we had a chain of more than one packet,
    634 		 * or something else arrived after we were done with
    635 		 * inline processing.
    636 		 */
    637 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
    638 		ASSERT(sqp->sq_first != NULL);
    639 
    640 #if SQUEUE_PROFILE
    641 		if (SQ_PROFILING(sqp)) {
    642 			start = gethrtime();
    643 		}
    644 #endif
    645 #if SQUEUE_DEBUG
    646 		sqp->sq_isintr = interrupt;
    647 #endif
    648 
        		/*
        		 * Drain with a deadline of now plus the interrupt or
        		 * writer drain limit, depending on the calling context.
        		 */
    649 		now = gethrtime();
    650 		if (interrupt) {
    651 			squeue_drain(sqp, SQS_ENTER, now +
    652 			    squeue_intrdrain_ns);
    653 		} else {
    654 			squeue_drain(sqp, SQS_USER, now +
    655 			    squeue_writerdrain_ns);
    656 		}
    657 
    658 #if SQUEUE_PROFILE
    659 		if (SQ_PROFILING(sqp)) {
    660 			delta = gethrtime() - start;
    661 			if (interrupt)
    662 				SQDELTA(sqp, sq_time_intr, delta);
    663 			else
    664 				SQDELTA(sqp, sq_time_other, delta);
    665 		}
    666 #endif
    667 #if SQUEUE_DEBUG
    668 		sqp->sq_isintr = 0;
    669 #endif
    670 
    671 		/*
    672 		 * If we didn't do a complete drain, the worker
    673 		 * thread was already signalled by squeue_drain.
    674 		 */
    675 		sqp->sq_run = NULL;
    676 		mutex_exit(&sqp->sq_lock);
    677 		return;
    678 	} else {
    679 		ASSERT(sqp->sq_run != NULL);
    680 		/*
    681 		 * Queue is already being processed. Just enqueue
    682 		 * the packet and go away.
    683 		 */
    684 #if SQUEUE_DEBUG
    685 		mp->b_tag = tag;
    686 #endif
    687 #if SQUEUE_PROFILE
        		/*
        		 * NOTE(review): sq_max_qlen is compared against sq_count
        		 * before ENQUEUE_CHAIN() below adds cnt, so the recorded
        		 * maximum does not include this chain.
        		 */
    688 		if (SQ_PROFILING(sqp)) {
    689 			if (servicing_interrupt())
    690 				SQSTAT(sqp, sq_nqueued_intr);
    691 			else
    692 				SQSTAT(sqp, sq_nqueued_other);
    693 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
    694 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
    695 		}
    696 #endif
    697 
    698 		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
    699 		mutex_exit(&sqp->sq_lock);
    700 		return;
    701 	}
    702 }
    703 
    704 /*
    705  * squeue_enter() - enter squeue *sqp with mblk *mp with argument of *arg.
         *
         * If nobody is processing the squeue and nothing is queued, the request
         * is run inline as (*proc)(arg, mp, sqp) and anything that got queued in
         * the meantime is then drained (or handed to the worker when the caller
         * is not on the squeue's CPU). If requests are already queued, mp is
         * appended and the queue is drained. If another thread is processing the
         * squeue, mp is simply queued, except for the single-level re-entry case
         * described below.
         *
         * arg is used as a conn_t: conn_on_sqp is set around the call to proc and
         * one reference is dropped with CONN_DEC_REF() after inline processing.
         * The tag is only recorded when SQUEUE_DEBUG is set.
    706  */
    707 void
    708 squeue_enter(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
    709     uint8_t tag)
    710 {
    711 	int	interrupt = servicing_interrupt();
    712 	hrtime_t now;
    713 #if SQUEUE_PROFILE
    714 	hrtime_t start, delta;
    715 #endif
    716 #if SQUEUE_DEBUG
    717 	conn_t 	*connp = (conn_t *)arg;
    718 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
    719 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
    720 #endif
    721 
    722 	ASSERT(proc != NULL);
    723 	ASSERT(sqp != NULL);
    724 	ASSERT(mp != NULL);
    725 	ASSERT(mp->b_next == NULL);
    726 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    727 
    728 	mutex_enter(&sqp->sq_lock);
    729 	if (!(sqp->sq_state & SQS_PROC)) {
    730 		/*
    731 		 * See if anything is already queued. If we are the
    732 		 * first packet, do inline processing else queue the
    733 		 * packet and do the drain.
    734 		 */
    735 		sqp->sq_run = curthread;
    736 		if (sqp->sq_first == NULL) {
    737 			/*
    738 			 * Fast-path, ok to process and nothing queued.
    739 			 */
    740 			sqp->sq_state |= (SQS_PROC|SQS_FAST);
    741 			mutex_exit(&sqp->sq_lock);
    742 
    743 #if SQUEUE_DEBUG
    744 			sqp->sq_isintr = interrupt;
    745 			sqp->sq_curmp = mp;
    746 			sqp->sq_curproc = proc;
    747 			sqp->sq_connp = connp;
    748 			mp->b_tag = sqp->sq_tag = tag;
    749 #endif
    750 #if SQUEUE_PROFILE
    751 			if (SQ_PROFILING(sqp)) {
    752 				if (interrupt)
    753 					SQSTAT(sqp, sq_npackets_intr);
    754 				else
    755 					SQSTAT(sqp, sq_npackets_other);
    756 				start = gethrtime();
    757 			}
    758 #endif
        			/* Run the handler inline, without sq_lock held. */
    759 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
    760 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
    761 			    sqp, mblk_t *, mp, conn_t *, arg);
    762 			(*proc)(arg, mp, sqp);
    763 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
    764 			    sqp, conn_t *, arg);
    765 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
    766 
    767 #if SQUEUE_PROFILE
    768 			if (SQ_PROFILING(sqp)) {
    769 				delta = gethrtime() - start;
    770 				if (interrupt)
    771 					SQDELTA(sqp, sq_time_intr, delta);
    772 				else
    773 					SQDELTA(sqp, sq_time_other, delta);
    774 			}
    775 #endif
    776 #if SQUEUE_DEBUG
    777 			sqp->sq_curmp = NULL;
    778 			sqp->sq_curproc = NULL;
    779 			sqp->sq_connp = NULL;
    780 			sqp->sq_isintr = 0;
    781 #endif
    782 
    783 			CONN_DEC_REF((conn_t *)arg);
    784 			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    785 			mutex_enter(&sqp->sq_lock);
    786 			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
    787 			if (sqp->sq_first == NULL) {
    788 				/*
    789 				 * We processed inline our packet and
    790 				 * nothing new has arrived. We are done.
    791 				 */
    792 				sqp->sq_run = NULL;
    793 				mutex_exit(&sqp->sq_lock);
    794 				return;
    795 			} else if (sqp->sq_bind != CPU->cpu_id) {
    796 				/*
    797 				 * If the current thread is not running
    798 				 * on the CPU to which this squeue is bound,
    799 				 * then don't allow it to drain.
        				 * SQUEUE_WORKER_WAKEUP() drops sq_lock.
    800 				 */
    801 				sqp->sq_run = NULL;
    802 				SQUEUE_WORKER_WAKEUP(sqp);
    803 				return;
    804 			}
    805 		} else {
    806 			ENQUEUE_MP(sqp, mp, proc, arg);
    807 #if SQUEUE_DEBUG
    808 			mp->b_tag = tag;
    809 #endif
    810 #if SQUEUE_PROFILE
    811 			if (SQ_PROFILING(sqp)) {
    812 				if (servicing_interrupt())
    813 					SQSTAT(sqp, sq_nqueued_intr);
    814 				else
    815 					SQSTAT(sqp, sq_nqueued_other);
    816 				if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
    817 					sqp->sq_stats.sq_max_qlen =
    818 					    sqp->sq_count;
    819 			}
    820 #endif
    821 		}
    822 
    823 		/*
    824 		 * We are here because either we couldn't do inline
    825 		 * processing (because something was already queued)
    826 		 * or something else arrived after we were done with
    827 		 * inline processing.
    828 		 */
    829 		ASSERT(MUTEX_HELD(&sqp->sq_lock));
    830 		ASSERT(sqp->sq_first != NULL);
    831 
    832 #if SQUEUE_PROFILE
    833 		if (SQ_PROFILING(sqp)) {
    834 			start = gethrtime();
    835 		}
    836 #endif
    837 #if SQUEUE_DEBUG
    838 		sqp->sq_isintr = interrupt;
    839 #endif
    840 
        		/*
        		 * Drain with a deadline of now plus the interrupt or
        		 * writer drain limit, depending on the calling context.
        		 */
    841 		now = gethrtime();
    842 		if (interrupt) {
    843 			squeue_drain(sqp, SQS_ENTER, now +
    844 			    squeue_intrdrain_ns);
    845 		} else {
    846 			squeue_drain(sqp, SQS_USER, now +
    847 			    squeue_writerdrain_ns);
    848 		}
    849 
    850 #if SQUEUE_PROFILE
    851 		if (SQ_PROFILING(sqp)) {
    852 			delta = gethrtime() - start;
    853 			if (interrupt)
    854 				SQDELTA(sqp, sq_time_intr, delta);
    855 			else
    856 				SQDELTA(sqp, sq_time_other, delta);
    857 		}
    858 #endif
    859 #if SQUEUE_DEBUG
    860 		sqp->sq_isintr = 0;
    861 #endif
    862 
    863 		/*
    864 		 * If we didn't do a complete drain, the worker
    865 		 * thread was already signalled by squeue_drain.
    866 		 */
    867 		sqp->sq_run = NULL;
    868 		mutex_exit(&sqp->sq_lock);
    869 		return;
    870 	} else {
    871 		ASSERT(sqp->sq_run != NULL);
    872 		/*
    873 		 * We let a thread processing a squeue reenter only
    874 		 * once. This helps the case of incoming connection
    875 		 * where a SYN-ACK-ACK that triggers the conn_ind
    876 		 * doesn't have to queue the packet if listener and
    877 		 * eager are on the same squeue. Also helps the
    878 		 * loopback connection where the two ends are bound
    879 		 * to the same squeue (which is typical on single
    880 		 * CPU machines).
    881 		 * We let the thread reenter only once for the fear
    882 		 * of stack getting blown with multiple traversal.
        		 * Re-entry also requires that nothing is queued and
        		 * that this conn is not already being processed on
        		 * the squeue (conn_on_sqp is B_FALSE).
    883 		 */
    884 		if (!(sqp->sq_state & SQS_REENTER) &&
    885 		    (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
    886 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
    887 			sqp->sq_state |= SQS_REENTER;
    888 			mutex_exit(&sqp->sq_lock);
    889 
    890 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
    891 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
    892 			    sqp, mblk_t *, mp, conn_t *, arg);
    893 			(*proc)(arg, mp, sqp);
    894 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
    895 			    sqp, conn_t *, arg);
    896 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
    897 			CONN_DEC_REF((conn_t *)arg);
    898 
    899 			mutex_enter(&sqp->sq_lock);
    900 			sqp->sq_state &= ~SQS_REENTER;
    901 			mutex_exit(&sqp->sq_lock);
    902 			return;
    903 		}
    904 		/*
    905 		 * Queue is already being processed. Just enqueue
    906 		 * the packet and go away.
    907 		 */
    908 #if SQUEUE_DEBUG
    909 		mp->b_tag = tag;
    910 #endif
    911 #if SQUEUE_PROFILE
        		/*
        		 * NOTE(review): sq_max_qlen is compared against sq_count
        		 * before ENQUEUE_MP() below increments it, so the recorded
        		 * maximum does not include this mblk.
        		 */
    912 		if (SQ_PROFILING(sqp)) {
    913 			if (servicing_interrupt())
    914 				SQSTAT(sqp, sq_nqueued_intr);
    915 			else
    916 				SQSTAT(sqp, sq_nqueued_other);
    917 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
    918 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
    919 		}
    920 #endif
    921 
    922 		ENQUEUE_MP(sqp, mp, proc, arg);
    923 		mutex_exit(&sqp->sq_lock);
    924 		return;
    925 	}
    926 }
    927 
    928 void
    929 squeue_enter_nodrain(squeue_t *sqp, mblk_t *mp, sqproc_t proc, void *arg,
    930     uint8_t tag)
    931 {
    932 	int		interrupt = servicing_interrupt();
    933 	boolean_t	being_processed;
    934 #if SQUEUE_DEBUG
    935 	conn_t 		*connp = (conn_t *)arg;
    936 #endif
    937 #if SQUEUE_PROFILE
    938 	hrtime_t 	start, delta;
    939 #endif
    940 
    941 	ASSERT(proc != NULL);
    942 	ASSERT(sqp != NULL);
    943 	ASSERT(mp != NULL);
    944 	ASSERT(mp->b_next == NULL);
    945 	ASSERT(!IPCL_IS_TCP(connp) || connp->conn_tcp->tcp_connp == connp);
    946 	ASSERT(!IPCL_IS_UDP(connp) || connp->conn_udp->udp_connp == connp);
    947 	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
    948 
    949 	mutex_enter(&sqp->sq_lock);
    950 
    951 	being_processed = (sqp->sq_state & SQS_PROC);
    952 	if (!being_processed && (sqp->sq_first == NULL)) {
    953 		/*
    954 		 * Fast-path, ok to process and nothing queued.
    955 		 */
    956 		sqp->sq_state |= (SQS_PROC|SQS_FAST);
    957 		sqp->sq_run = curthread;
    958 		mutex_exit(&sqp->sq_lock);
    959 
    960 #if SQUEUE_DEBUG
    961 		sqp->sq_isintr = interrupt;
    962 		sqp->sq_curmp = mp;
    963 		sqp->sq_curproc = proc;
    964 		sqp->sq_connp = connp;
    965 		mp->b_tag = sqp->sq_tag = tag;
    966 #endif
    967 
    968 #if SQUEUE_PROFILE
    969 		if (SQ_PROFILING(sqp)) {
    970 			if (interrupt)
    971 				SQSTAT(sqp, sq_npackets_intr);
    972 			else
    973 				SQSTAT(sqp, sq_npackets_other);
    974 			start = gethrtime();
    975 		}
    976 #endif
    977 
    978 		((conn_t *)arg)->conn_on_sqp = B_TRUE;
    979 		DTRACE_PROBE3(squeue__proc__start, squeue_t *,
    980 		    sqp, mblk_t *, mp, conn_t *, arg);
    981 		(*proc)(arg, mp, sqp);
    982 		DTRACE_PROBE2(squeue__proc__end, squeue_t *,
    983 		    sqp, conn_t *, arg);
    984 		((conn_t *)arg)->conn_on_sqp = B_FALSE;
    985 
    986 #if SQUEUE_DEBUG
    987 		sqp->sq_curmp = NULL;
    988 		sqp->sq_curproc = NULL;
    989 		sqp->sq_connp = NULL;
    990 		sqp->sq_isintr = 0;
    991 #endif
    992 #if SQUEUE_PROFILE
    993 		if (SQ_PROFILING(sqp)) {
    994 			delta = gethrtime() - start;
    995 			if (interrupt)
    996 				SQDELTA(sqp, sq_time_intr, delta);
    997 			else
    998 				SQDELTA(sqp, sq_time_other, delta);
    999 		}
   1000 #endif
   1001 
   1002 		CONN_DEC_REF((conn_t *)arg);
   1003 		mutex_enter(&sqp->sq_lock);
   1004 		sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
   1005 		sqp->sq_run = NULL;
   1006 		if (sqp->sq_first == NULL) {
   1007 			/*
   1008 			 * We processed inline our packet and
   1009 			 * nothing new has arrived. We are done.
   1010 			 */
   1011 			mutex_exit(&sqp->sq_lock);
   1012 		} else {
   1013 			SQUEUE_WORKER_WAKEUP(sqp);
   1014 		}
   1015 		return;
   1016 	} else {
   1017 		/*
    1018 		 * We let a thread that is processing a squeue reenter
    1019 		 * it only once. This helps incoming connections,
    1020 		 * where the SYN-ACK-ACK that triggers the conn_ind
    1021 		 * need not be queued if the listener and the
    1022 		 * eager are on the same squeue. It also helps
    1023 		 * loopback connections whose two ends are bound
    1024 		 * to the same squeue (which is typical on single
    1025 		 * CPU machines).
    1026 		 * Reentry is limited to once for fear of blowing
    1027 		 * the stack with repeated nested traversals.
   1028 		 */
   1029 		if (being_processed && !(sqp->sq_state & SQS_REENTER) &&
   1030 		    (sqp->sq_run == curthread) && sqp->sq_first == NULL &&
   1031 		    (((conn_t *)arg)->conn_on_sqp == B_FALSE)) {
   1032 			sqp->sq_state |= SQS_REENTER;
   1033 			mutex_exit(&sqp->sq_lock);
   1034 
   1035 			((conn_t *)arg)->conn_on_sqp = B_TRUE;
   1036 			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
   1037 			    sqp, mblk_t *, mp, conn_t *, arg);
   1038 			(*proc)(arg, mp, sqp);
   1039 			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
   1040 			    sqp, conn_t *, arg);
   1041 			((conn_t *)arg)->conn_on_sqp = B_FALSE;
   1042 			CONN_DEC_REF((conn_t *)arg);
   1043 
   1044 			mutex_enter(&sqp->sq_lock);
   1045 			sqp->sq_state &= ~SQS_REENTER;
   1046 			mutex_exit(&sqp->sq_lock);
   1047 			return;
   1048 		}
   1049 
   1050 #if SQUEUE_DEBUG
   1051 		mp->b_tag = tag;
   1052 #endif
   1053 #if SQUEUE_PROFILE
   1054 		if (SQ_PROFILING(sqp)) {
   1055 			if (servicing_interrupt())
   1056 				SQSTAT(sqp, sq_nqueued_intr);
   1057 			else
   1058 				SQSTAT(sqp, sq_nqueued_other);
   1059 			if (sqp->sq_stats.sq_max_qlen < sqp->sq_count)
   1060 				sqp->sq_stats.sq_max_qlen = sqp->sq_count;
   1061 		}
   1062 #endif
   1063 		ENQUEUE_MP(sqp, mp, proc, arg);
   1064 		if (being_processed) {
   1065 			/*
   1066 			 * Queue is already being processed.
   1067 			 * No need to do anything.
   1068 			 */
   1069 			mutex_exit(&sqp->sq_lock);
   1070 			return;
   1071 		}
   1072 		SQUEUE_WORKER_WAKEUP(sqp);
   1073 	}