Home | History | Annotate | Download | only in os
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
     22 /*	  All Rights Reserved  	*/
     23 
     24 
     25 /*
     26  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     27  * Use is subject to license terms.
     28  */
     29 
     30 #include <sys/types.h>
     31 #include <sys/sysmacros.h>
     32 #include <sys/param.h>
     33 #include <sys/errno.h>
     34 #include <sys/signal.h>
     35 #include <sys/proc.h>
     36 #include <sys/conf.h>
     37 #include <sys/cred.h>
     38 #include <sys/user.h>
     39 #include <sys/vnode.h>
     40 #include <sys/file.h>
     41 #include <sys/session.h>
     42 #include <sys/stream.h>
     43 #include <sys/strsubr.h>
     44 #include <sys/stropts.h>
     45 #include <sys/poll.h>
     46 #include <sys/systm.h>
     47 #include <sys/cpuvar.h>
     48 #include <sys/uio.h>
     49 #include <sys/cmn_err.h>
     50 #include <sys/priocntl.h>
     51 #include <sys/procset.h>
     52 #include <sys/vmem.h>
     53 #include <sys/bitmap.h>
     54 #include <sys/kmem.h>
     55 #include <sys/siginfo.h>
     56 #include <sys/vtrace.h>
     57 #include <sys/callb.h>
     58 #include <sys/debug.h>
     59 #include <sys/modctl.h>
     60 #include <sys/vmsystm.h>
     61 #include <vm/page.h>
     62 #include <sys/atomic.h>
     63 #include <sys/suntpi.h>
     64 #include <sys/strlog.h>
     65 #include <sys/promif.h>
     66 #include <sys/project.h>
     67 #include <sys/vm.h>
     68 #include <sys/taskq.h>
     69 #include <sys/sunddi.h>
     70 #include <sys/sunldi_impl.h>
     71 #include <sys/strsun.h>
     72 #include <sys/isa_defs.h>
     73 #include <sys/multidata.h>
     74 #include <sys/pattr.h>
     75 #include <sys/strft.h>
     76 #include <sys/fs/snode.h>
     77 #include <sys/zone.h>
     78 #include <sys/open.h>
     79 #include <sys/sunldi.h>
     80 #include <sys/sad.h>
     81 #include <sys/netstack.h>
     82 
     83 #define	O_SAMESTR(q)	(((q)->q_next) && \
     84 	(((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR)))
     85 
     86 /*
     87  * WARNING:
     88  * The variables and routines in this file are private, belonging
     89  * to the STREAMS subsystem. These should not be used by modules
     90  * or drivers. Compatibility will not be guaranteed.
     91  */
     92 
     93 /*
     94  * Id value used to distinguish between different multiplexor links.
     95  */
     96 static int32_t lnk_id = 0;
     97 
     98 #define	STREAMS_LOPRI MINCLSYSPRI
     99 static pri_t streams_lopri = STREAMS_LOPRI;
    100 
/*
 * STRSTAT(x): bump the 64-bit counter named x in str_statistics.
 * NOTE(review): this is a plain ++ on a shared variable, not an atomic
 * increment, so concurrent updates may be lost; presumably acceptable for
 * statistics -- confirm.
 */
#define	STRSTAT(x)	(str_statistics.x.value.ui64++)
/*
 * STREAMS counters exported through the "streams" module, instance 0,
 * "strstat" named kstat that strinit() creates.  The member order must
 * match the positional initializer of str_statistics below.
 */
typedef struct str_stat {
	kstat_named_t	sqenables;
	kstat_named_t	stenables;
	kstat_named_t	syncqservice;
	kstat_named_t	freebs;
	kstat_named_t	qwr_outer;
	kstat_named_t	rservice;
	kstat_named_t	strwaits;
	kstat_named_t	taskqfails;
	kstat_named_t	bufcalls;
	kstat_named_t	qhelps;
	kstat_named_t	qremoved;
	kstat_named_t	sqremoved;
	kstat_named_t	bcwaits;
	kstat_named_t	sqtoomany;
} str_stat_t;

/* Backing storage for the virtual kstat; names are what kstat users see. */
static str_stat_t str_statistics = {
	{ "sqenables",		KSTAT_DATA_UINT64 },
	{ "stenables",		KSTAT_DATA_UINT64 },
	{ "syncqservice",	KSTAT_DATA_UINT64 },
	{ "freebs",		KSTAT_DATA_UINT64 },
	{ "qwr_outer",		KSTAT_DATA_UINT64 },
	{ "rservice",		KSTAT_DATA_UINT64 },
	{ "strwaits",		KSTAT_DATA_UINT64 },
	{ "taskqfails",		KSTAT_DATA_UINT64 },
	{ "bufcalls",		KSTAT_DATA_UINT64 },
	{ "qhelps",		KSTAT_DATA_UINT64 },
	{ "qremoved",		KSTAT_DATA_UINT64 },
	{ "sqremoved",		KSTAT_DATA_UINT64 },
	{ "bcwaits",		KSTAT_DATA_UINT64 },
	{ "sqtoomany",		KSTAT_DATA_UINT64 },
};

/* kstat handle for str_statistics; stays NULL if kstat_create() fails. */
static kstat_t *str_kstat;
    137 
    138 /*
    139  * qrunflag was used previously to control background scheduling of queues. It
    140  * is not used anymore, but kept here in case some module still wants to access
    141  * it via qready() and setqsched macros.
    142  */
    143 char qrunflag;			/*  Unused */
    144 
/*
 * Most STREAMS scheduling is done via task queues. Because non-sleeping task
 * queue dispatches may fail, two backup threads service the failed requests:
 * one for queues and one for syncqs. Both threads also service freebs
 * requests whose dispatches failed. Queues are put on the list delimited by
 * the `qhead' and `qtail' pointers, syncqs on the list delimited by `sqhead'
 * and `sqtail', and freebs requests on `freebs_list', which has no tail
 * pointer. All three lists are protected by the single `service_queue' lock,
 * and the `services_to_run' and `syncqs_to_run' condition variables are used
 * to wake the background threads. A single lock should not be a problem: it
 * is only used under heavy load, when task queue dispatches start to fail,
 * and at that point throttling scheduling requests may be a good idea anyway.
 *
 * NOTE: queues and syncqs must be serviced by two separate threads because
 * servicing a queue may block waiting for a syncq that is itself scheduled
 * for background execution. A single thread serving both could deadlock.
 */
    163 
    164 static taskq_t *streams_taskq;		/* Used for most STREAMS scheduling */
    165 
    166 static kmutex_t service_queue;		/* protects all of servicing vars */
    167 static kcondvar_t services_to_run;	/* wake up background service thread */
    168 static kcondvar_t syncqs_to_run;	/* wake up background service thread */
    169 
    170 /*
    171  * List of queues scheduled for background processing due to lack of resources
    172  * in the task queues. Protected by service_queue lock;
    173  */
    174 static struct queue *qhead;
    175 static struct queue *qtail;
    176 
    177 /*
    178  * Same list for syncqs
    179  */
    180 static syncq_t *sqhead;
    181 static syncq_t *sqtail;
    182 
    183 static mblk_t *freebs_list;	/* list of buffers to free */
    184 
    185 /*
    186  * Backup threads for servicing queues and syncqs
    187  */
    188 kthread_t *streams_qbkgrnd_thread;
    189 kthread_t *streams_sqbkgrnd_thread;
    190 
    191 /*
    192  * Bufcalls related variables.
    193  */
    194 struct bclist	strbcalls;	/* list of waiting bufcalls */
    195 kmutex_t	strbcall_lock;	/* protects bufcall list (strbcalls) */
    196 kcondvar_t	strbcall_cv;	/* Signaling when a bufcall is added */
    197 kmutex_t	bcall_monitor;	/* sleep/wakeup style monitor */
    198 kcondvar_t	bcall_cv;	/* wait 'till executing bufcall completes */
    199 kthread_t	*bc_bkgrnd_thread; /* Thread to service bufcall requests */
    200 
    201 kmutex_t	strresources;	/* protects global resources */
    202 kmutex_t	muxifier;	/* single-threads multiplexor creation */
    203 
    204 static void	*str_stack_init(netstackid_t stackid, netstack_t *ns);
    205 static void	str_stack_shutdown(netstackid_t stackid, void *arg);
    206 static void	str_stack_fini(netstackid_t stackid, void *arg);
    207 
    208 extern void	time_to_wait(clock_t *, clock_t);
    209 
    210 /*
    211  * run_queues is no longer used, but is kept in case some 3rd party
    212  * module/driver decides to use it.
    213  */
    214 int run_queues = 0;
    215 
    216 /*
    217  * sq_max_size is the depth of the syncq (in number of messages) before
    218  * qfill_syncq() starts QFULL'ing destination queues. As its primary
    219  * consumer - IP is no longer D_MTPERMOD, but there may be other
    220  * modules/drivers depend on this syncq flow control, we prefer to
    221  * choose a large number as the default value. For potential
    222  * performance gain, this value is tunable in /etc/system.
    223  */
    224 int sq_max_size = 10000;
    225 
    226 /*
    227  * The number of ciputctrl structures per syncq and stream we create when
    228  * needed.
    229  */
    230 int n_ciputctrl;
    231 int max_n_ciputctrl = 16;
    232 /*
    233  * If n_ciputctrl is < min_n_ciputctrl don't even create ciputctrl_cache.
    234  */
    235 int min_n_ciputctrl = 2;
    236 
    237 /*
    238  * Per-driver/module syncqs
    239  * ========================
    240  *
    241  * For drivers/modules that use PERMOD or outer syncqs we keep a list of
    242  * perdm structures, new entries being added (and new syncqs allocated) when
    243  * setq() encounters a module/driver with a streamtab that it hasn't seen
    244  * before.
    245  * The reason for this mechanism is that some modules and drivers share a
    246  * common streamtab and it is necessary for those modules and drivers to also
    247  * share a common PERMOD syncq.
    248  *
    249  * perdm_list --> dm_str == streamtab_1
    250  *                dm_sq == syncq_1
    251  *                dm_ref
    252  *                dm_next --> dm_str == streamtab_2
    253  *                            dm_sq == syncq_2
    254  *                            dm_ref
    255  *                            dm_next --> ... NULL
    256  *
    257  * The dm_ref field is incremented for each new driver/module that takes
    258  * a reference to the perdm structure and hence shares the syncq.
    259  * References are held in the fmodsw_impl_t structure for each STREAMS module
    260  * or the dev_impl array (indexed by device major number) for each driver.
    261  *
    262  * perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL
    263  *		     ^                 ^ ^               ^
    264  *                   |  ______________/  |               |
    265  *                   | /                 |               |
    266  * dev_impl:     ...|x|y|...          module A	      module B
    267  *
    268  * When a module/driver is unloaded the reference count is decremented and,
    269  * when it falls to zero, the perdm structure is removed from the list and
    270  * the syncq is freed (see rele_dm()).
    271  */
    272 perdm_t *perdm_list = NULL;
    273 static krwlock_t perdm_rwlock;
    274 cdevsw_impl_t *devimpl;
    275 
    276 extern struct qinit strdata;
    277 extern struct qinit stwdata;
    278 
    279 static void runservice(queue_t *);
    280 static void streams_bufcall_service(void);
    281 static void streams_qbkgrnd_service(void);
    282 static void streams_sqbkgrnd_service(void);
    283 static syncq_t *new_syncq(void);
    284 static void free_syncq(syncq_t *);
    285 static void outer_insert(syncq_t *, syncq_t *);
    286 static void outer_remove(syncq_t *, syncq_t *);
    287 static void write_now(syncq_t *);
    288 static void clr_qfull(queue_t *);
    289 static void runbufcalls(void);
    290 static void sqenable(syncq_t *);
    291 static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)());
    292 static void wait_q_syncq(queue_t *);
    293 static void backenable_insertedq(queue_t *);
    294 
    295 static void queue_service(queue_t *);
    296 static void stream_service(stdata_t *);
    297 static void syncq_service(syncq_t *);
    298 static void qwriter_outer_service(syncq_t *);
    299 static void mblk_free(mblk_t *);
    300 #ifdef DEBUG
    301 static int qprocsareon(queue_t *);
    302 #endif
    303 
    304 static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *);
    305 static void reset_nfsrv_ptr(queue_t *, queue_t *);
    306 void set_qfull(queue_t *);
    307 
    308 static void sq_run_events(syncq_t *);
    309 static int propagate_syncq(queue_t *);
    310 
    311 static void	blocksq(syncq_t *, ushort_t, int);
    312 static void	unblocksq(syncq_t *, ushort_t, int);
    313 static int	dropsq(syncq_t *, uint16_t);
    314 static void	emptysq(syncq_t *);
    315 static sqlist_t *sqlist_alloc(struct stdata *, int);
    316 static void	sqlist_free(sqlist_t *);
    317 static sqlist_t	*sqlist_build(queue_t *, struct stdata *, boolean_t);
    318 static void	sqlist_insert(sqlist_t *, syncq_t *);
    319 static void	sqlist_insertall(sqlist_t *, queue_t *);
    320 
    321 static void	strsetuio(stdata_t *);
    322 
    323 struct kmem_cache *stream_head_cache;
    324 struct kmem_cache *queue_cache;
    325 struct kmem_cache *syncq_cache;
    326 struct kmem_cache *qband_cache;
    327 struct kmem_cache *linkinfo_cache;
    328 struct kmem_cache *ciputctrl_cache = NULL;
    329 
    330 static linkinfo_t *linkinfo_list;
    331 
    332 /* Global esballoc throttling queue */
    333 static esb_queue_t	system_esbq;
    334 
    335 /*
    336  * esballoc tunable parameters.
    337  */
    338 int		esbq_max_qlen = 0x16;	/* throttled queue length */
    339 clock_t		esbq_timeout = 0x8;	/* timeout to process esb queue */
    340 
    341 /*
    342  * Routines to handle esballoc queueing.
    343  */
    344 static void esballoc_process_queue(esb_queue_t *);
    345 static void esballoc_enqueue_mblk(mblk_t *);
    346 static void esballoc_timer(void *);
    347 static void esballoc_set_timer(esb_queue_t *, clock_t);
    348 static void esballoc_mblk_free(mblk_t *);
    349 
    350 /*
    351  *  Qinit structure and Module_info structures
    352  *	for passthru read and write queues
    353  */
    354 
    355 static void pass_wput(queue_t *, mblk_t *);
    356 static queue_t *link_addpassthru(stdata_t *);
    357 static void link_rempassthru(queue_t *);
    358 
    359 struct  module_info passthru_info = {
    360 	0,
    361 	"passthru",
    362 	0,
    363 	INFPSZ,
    364 	STRHIGH,
    365 	STRLOW
    366 };
    367 
    368 struct  qinit passthru_rinit = {
    369 	(int (*)())putnext,
    370 	NULL,
    371 	NULL,
    372 	NULL,
    373 	NULL,
    374 	&passthru_info,
    375 	NULL
    376 };
    377 
    378 struct  qinit passthru_winit = {
    379 	(int (*)()) pass_wput,
    380 	NULL,
    381 	NULL,
    382 	NULL,
    383 	NULL,
    384 	&passthru_info,
    385 	NULL
    386 };
    387 
/*
 * Special form of assertion: verify that X implies Y i.e. when X is true Y
 * should also be true.
 * Because || short-circuits, Y is evaluated only when X is true.  Callers
 * rely on this for guarded checks such as IMPLY(p != NULL, p->f == 0).
 */
#define	IMPLY(X, Y)	ASSERT(!(X) || (Y))

/*
 * Logical equivalence. Verify that both X and Y are either TRUE or FALSE.
 * Expands to a brace-enclosed pair of statements, so it can only appear
 * where a compound statement is allowed.
 */
#define	EQUIV(X, Y)	{ IMPLY(X, Y); IMPLY(Y, X); }

/*
 * Verify correctness of list head/tail pointers: head and tail must be
 * NULL or non-NULL together, and the tail's `link' field must be NULL.
 */
#define	LISTCHECK(head, tail, link) {				\
	EQUIV(head, tail);					\
	IMPLY(tail != NULL, tail->link == NULL);		\
}
    406 
    407 /*
    408  * Enqueue a list element `el' in the end of a list denoted by `head' and `tail'
    409  * using a `link' field.
    410  */
    411 #define	ENQUEUE(el, head, tail, link) {				\
    412 	ASSERT(el->link == NULL);				\
    413 	LISTCHECK(head, tail, link);				\
    414 	if (head == NULL)					\
    415 		head = el;					\
    416 	else							\
    417 		tail->link = el;				\
    418 	tail = el;						\
    419 }
    420 
    421 /*
    422  * Dequeue the first element of the list denoted by `head' and `tail' pointers
    423  * using a `link' field and put result into `el'.
    424  */
    425 #define	DQ(el, head, tail, link) {				\
    426 	LISTCHECK(head, tail, link);				\
    427 	el = head;						\
    428 	if (head != NULL) {					\
    429 		head = head->link;				\
    430 		if (head == NULL)				\
    431 			tail = NULL;				\
    432 		el->link = NULL;				\
    433 	}							\
    434 }
    435 
    436 /*
    437  * Remove `el' from the list using `chase' and `curr' pointers and return result
    438  * in `succeed'.
    439  */
    440 #define	RMQ(el, head, tail, link, chase, curr, succeed) {	\
    441 	LISTCHECK(head, tail, link);				\
    442 	chase = NULL;						\
    443 	succeed = 0;						\
    444 	for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \
    445 		chase = curr;					\
    446 	if (curr != NULL) {					\
    447 		succeed = 1;					\
    448 		ASSERT(curr == el);				\
    449 		if (chase != NULL)				\
    450 			chase->link = curr->link;		\
    451 		else						\
    452 			head = curr->link;			\
    453 		curr->link = NULL;				\
    454 		if (curr == tail)				\
    455 			tail = chase;				\
    456 	}							\
    457 	LISTCHECK(head, tail, link);				\
    458 }
    459 
    460 /* Handling of delayed messages on the inner syncq. */
    461 
    462 /*
    463  * DEBUG versions should use function versions (to simplify tracing) and
    464  * non-DEBUG kernels should use macro versions.
    465  */
    466 
    467 /*
    468  * Put a queue on the syncq list of queues.
    469  * Assumes SQLOCK held.
    470  */
    471 #define	SQPUT_Q(sq, qp)							\
    472 {									\
    473 	ASSERT(MUTEX_HELD(SQLOCK(sq)));					\
    474 	if (!(qp->q_sqflags & Q_SQQUEUED)) {				\
    475 		/* The queue should not be linked anywhere */		\
    476 		ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \
    477 		/* Head and tail may only be NULL simultaneously */	\
    478 		EQUIV(sq->sq_head, sq->sq_tail);			\
    479 		/* Queue may be only enqueued on its syncq */		\
    480 		ASSERT(sq == qp->q_syncq);				\
    481 		/* Check the correctness of SQ_MESSAGES flag */		\
    482 		EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES));	\
    483 		/* Sanity check first/last elements of the list */	\
    484 		IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\
    485 		IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\
    486 		/*							\
    487 		 * Sanity check of priority field: empty queue should	\
    488 		 * have zero priority					\
    489 		 * and nqueues equal to zero.				\
    490 		 */							\
    491 		IMPLY(sq->sq_head == NULL, sq->sq_pri == 0);		\
    492 		/* Sanity check of sq_nqueues field */			\
    493 		EQUIV(sq->sq_head, sq->sq_nqueues);			\
    494 		if (sq->sq_head == NULL) {				\
    495 			sq->sq_head = sq->sq_tail = qp;			\
    496 			sq->sq_flags |= SQ_MESSAGES;			\
    497 		} else if (qp->q_spri == 0) {				\
    498 			qp->q_sqprev = sq->sq_tail;			\
    499 			sq->sq_tail->q_sqnext = qp;			\
    500 			sq->sq_tail = qp;				\
    501 		} else {						\
    502 			/*						\
    503 			 * Put this queue in priority order: higher	\
    504 			 * priority gets closer to the head.		\
    505 			 */						\
    506 			queue_t **qpp = &sq->sq_tail;			\
    507 			queue_t *qnext = NULL;				\
    508 									\
    509 			while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \
    510 				qnext = *qpp;				\
    511 				qpp = &(*qpp)->q_sqprev;		\
    512 			}						\
    513 			qp->q_sqnext = qnext;				\
    514 			qp->q_sqprev = *qpp;				\
    515 			if (*qpp != NULL) {				\
    516 				(*qpp)->q_sqnext = qp;			\
    517 			} else {					\
    518 				sq->sq_head = qp;			\
    519 				sq->sq_pri = sq->sq_head->q_spri;	\
    520 			}						\
    521 			*qpp = qp;					\
    522 		}							\
    523 		qp->q_sqflags |= Q_SQQUEUED;				\
    524 		qp->q_sqtstamp = lbolt;					\
    525 		sq->sq_nqueues++;					\
    526 	}								\
    527 }
    528 
/*
 * Remove a queue from the syncq list
 * Assumes SQLOCK held.
 *
 * qp must currently be on sq's list (Q_SQQUEUED set).  The macro:
 * - unlinks qp from the doubly linked sq_head/sq_tail list, patching
 *   sq_head/sq_tail when qp was first/last;
 * - clears qp's q_sqnext, q_sqprev and Q_SQQUEUED;
 * - sets sq_pri from the new head's q_spri;
 * - decrements sq_nqueues.
 * If the list becomes empty, SQ_MESSAGES is cleared and sq_pri is set to 0.
 * The final ASSERT checks that SQ_QUEUED is clear whenever neither queued
 * messages nor events remain.  Arguments are evaluated more than once.
 */
#define	SQRM_Q(sq, qp)							\
	{								\
		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
		ASSERT(qp->q_sqflags & Q_SQQUEUED);			\
		ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL);	\
		ASSERT((sq->sq_flags & SQ_MESSAGES) != 0);		\
		/* Check that the queue is actually in the list */	\
		ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp);	\
		ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp);	\
		ASSERT(sq->sq_nqueues != 0);				\
		if (qp->q_sqprev == NULL) {				\
			/* First queue on list, make head q_sqnext */	\
			sq->sq_head = qp->q_sqnext;			\
		} else {						\
			/* Make prev->next == next */			\
			qp->q_sqprev->q_sqnext = qp->q_sqnext;		\
		}							\
		if (qp->q_sqnext == NULL) {				\
			/* Last queue on list, make tail sqprev */	\
			sq->sq_tail = qp->q_sqprev;			\
		} else {						\
			/* Make next->prev == prev */			\
			qp->q_sqnext->q_sqprev = qp->q_sqprev;		\
		}							\
		/* clear out references on this queue */		\
		qp->q_sqprev = qp->q_sqnext = NULL;			\
		qp->q_sqflags &= ~Q_SQQUEUED;				\
		/* If there is nothing queued, clear SQ_MESSAGES */	\
		if (sq->sq_head != NULL) {				\
			sq->sq_pri = sq->sq_head->q_spri;		\
		} else	{						\
			sq->sq_flags &= ~SQ_MESSAGES;			\
			sq->sq_pri = 0;					\
		}							\
		sq->sq_nqueues--;					\
		ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL ||	\
		    (sq->sq_flags & SQ_QUEUED) == 0);			\
	}
    571 
    572 /* Hide the definition from the header file. */
    573 #ifdef SQPUT_MP
    574 #undef SQPUT_MP
    575 #endif
    576 
    577 /*
    578  * Put a message on the queue syncq.
    579  * Assumes QLOCK held.
    580  */
    581 #define	SQPUT_MP(qp, mp)						\
    582 	{								\
    583 		ASSERT(MUTEX_HELD(QLOCK(qp)));				\
    584 		ASSERT(qp->q_sqhead == NULL ||				\
    585 		    (qp->q_sqtail != NULL &&				\
    586 		    qp->q_sqtail->b_next == NULL));			\
    587 		qp->q_syncqmsgs++;					\
    588 		ASSERT(qp->q_syncqmsgs != 0);	/* Wraparound */	\
    589 		if (qp->q_sqhead == NULL) {				\
    590 			qp->q_sqhead = qp->q_sqtail = mp;		\
    591 		} else {						\
    592 			qp->q_sqtail->b_next = mp;			\
    593 			qp->q_sqtail = mp;				\
    594 		}							\
    595 		ASSERT(qp->q_syncqmsgs > 0);				\
    596 		set_qfull(qp);						\
    597 	}
    598 
    599 #define	SQ_PUTCOUNT_SETFAST_LOCKED(sq) {				\
    600 		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
    601 		if ((sq)->sq_ciputctrl != NULL) {			\
    602 			int i;						\
    603 			int nlocks = (sq)->sq_nciputctrl;		\
    604 			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
    605 			ASSERT((sq)->sq_type & SQ_CIPUT);		\
    606 			for (i = 0; i <= nlocks; i++) {			\
    607 				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
    608 				cip[i].ciputctrl_count |= SQ_FASTPUT;	\
    609 			}						\
    610 		}							\
    611 	}
    612 
    613 
    614 #define	SQ_PUTCOUNT_CLRFAST_LOCKED(sq) {				\
    615 		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
    616 		if ((sq)->sq_ciputctrl != NULL) {			\
    617 			int i;						\
    618 			int nlocks = (sq)->sq_nciputctrl;		\
    619 			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
    620 			ASSERT((sq)->sq_type & SQ_CIPUT);		\
    621 			for (i = 0; i <= nlocks; i++) {			\
    622 				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
    623 				cip[i].ciputctrl_count &= ~SQ_FASTPUT;	\
    624 			}						\
    625 		}							\
    626 	}
    627 
/*
 * Run service procedures for all queues in the stream head.
 *
 * Caller must hold stp->sd_qlock.  Queues are dequeued one at a time from
 * the sd_qhead/sd_qtail list into `q', and sd_nqueues is decremented for
 * each.  sd_qlock is dropped around each queue_service(q) call and
 * reacquired afterwards.  The list is re-checked under the lock after
 * every call, so queues added while the lock was dropped are serviced too.
 * On exit sd_qlock is held, the list is empty and `q' holds the last
 * queue serviced (it is unchanged if the list was already empty).
 */
#define	STR_SERVICE(stp, q) {						\
	ASSERT(MUTEX_HELD(&stp->sd_qlock));				\
	while (stp->sd_qhead != NULL) {					\
		DQ(q, stp->sd_qhead, stp->sd_qtail, q_link);		\
		ASSERT(stp->sd_nqueues > 0);				\
		stp->sd_nqueues--;					\
		ASSERT(!(q->q_flag & QINSERVICE));			\
		mutex_exit(&stp->sd_qlock);				\
		queue_service(q);					\
		mutex_enter(&stp->sd_qlock);				\
	}								\
	ASSERT(stp->sd_nqueues == 0);					\
	ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL));	\
}
    645 
    646 /*
    647  * Constructor/destructor routines for the stream head cache
    648  */
    649 /* ARGSUSED */
    650 static int
    651 stream_head_constructor(void *buf, void *cdrarg, int kmflags)
    652 {
    653 	stdata_t *stp = buf;
    654 
    655 	mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
    656 	mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
    657 	mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
    658 	cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
    659 	cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
    660 	cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
    661 	cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
    662 	cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
    663 	stp->sd_wrq = NULL;
    664 
    665 	return (0);
    666 }
    667 
    668 /* ARGSUSED */
    669 static void
    670 stream_head_destructor(void *buf, void *cdrarg)
    671 {
    672 	stdata_t *stp = buf;
    673 
    674 	mutex_destroy(&stp->sd_lock);
    675 	mutex_destroy(&stp->sd_reflock);
    676 	mutex_destroy(&stp->sd_qlock);
    677 	cv_destroy(&stp->sd_monitor);
    678 	cv_destroy(&stp->sd_iocmonitor);
    679 	cv_destroy(&stp->sd_refmonitor);
    680 	cv_destroy(&stp->sd_qcv);
    681 	cv_destroy(&stp->sd_zcopy_wait);
    682 }
    683 
    684 /*
    685  * Constructor/destructor routines for the queue cache
    686  */
    687 /* ARGSUSED */
    688 static int
    689 queue_constructor(void *buf, void *cdrarg, int kmflags)
    690 {
    691 	queinfo_t *qip = buf;
    692 	queue_t *qp = &qip->qu_rqueue;
    693 	queue_t *wqp = &qip->qu_wqueue;
    694 	syncq_t	*sq = &qip->qu_syncq;
    695 
    696 	qp->q_first = NULL;
    697 	qp->q_link = NULL;
    698 	qp->q_count = 0;
    699 	qp->q_mblkcnt = 0;
    700 	qp->q_sqhead = NULL;
    701 	qp->q_sqtail = NULL;
    702 	qp->q_sqnext = NULL;
    703 	qp->q_sqprev = NULL;
    704 	qp->q_sqflags = 0;
    705 	qp->q_rwcnt = 0;
    706 	qp->q_spri = 0;
    707 
    708 	mutex_init(QLOCK(qp), NULL, MUTEX_DEFAULT, NULL);
    709 	cv_init(&qp->q_wait, NULL, CV_DEFAULT, NULL);
    710 
    711 	wqp->q_first = NULL;
    712 	wqp->q_link = NULL;
    713 	wqp->q_count = 0;
    714 	wqp->q_mblkcnt = 0;
    715 	wqp->q_sqhead = NULL;
    716 	wqp->q_sqtail = NULL;
    717 	wqp->q_sqnext = NULL;
    718 	wqp->q_sqprev = NULL;
    719 	wqp->q_sqflags = 0;
    720 	wqp->q_rwcnt = 0;
    721 	wqp->q_spri = 0;
    722 
    723 	mutex_init(QLOCK(wqp), NULL, MUTEX_DEFAULT, NULL);
    724 	cv_init(&wqp->q_wait, NULL, CV_DEFAULT, NULL);
    725 
    726 	sq->sq_head = NULL;
    727 	sq->sq_tail = NULL;
    728 	sq->sq_evhead = NULL;
    729 	sq->sq_evtail = NULL;
    730 	sq->sq_callbpend = NULL;
    731 	sq->sq_outer = NULL;
    732 	sq->sq_onext = NULL;
    733 	sq->sq_oprev = NULL;
    734 	sq->sq_next = NULL;
    735 	sq->sq_svcflags = 0;
    736 	sq->sq_servcount = 0;
    737 	sq->sq_needexcl = 0;
    738 	sq->sq_nqueues = 0;
    739 	sq->sq_pri = 0;
    740 
    741 	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
    742 	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
    743 	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
    744 
    745 	return (0);
    746 }
    747 
    748 /* ARGSUSED */
    749 static void
    750 queue_destructor(void *buf, void *cdrarg)
    751 {
    752 	queinfo_t *qip = buf;
    753 	queue_t *qp = &qip->qu_rqueue;
    754 	queue_t *wqp = &qip->qu_wqueue;
    755 	syncq_t	*sq = &qip->qu_syncq;
    756 
    757 	ASSERT(qp->q_sqhead == NULL);
    758 	ASSERT(wqp->q_sqhead == NULL);
    759 	ASSERT(qp->q_sqnext == NULL);
    760 	ASSERT(wqp->q_sqnext == NULL);
    761 	ASSERT(qp->q_rwcnt == 0);
    762 	ASSERT(wqp->q_rwcnt == 0);
    763 
    764 	mutex_destroy(&qp->q_lock);
    765 	cv_destroy(&qp->q_wait);
    766 
    767 	mutex_destroy(&wqp->q_lock);
    768 	cv_destroy(&wqp->q_wait);
    769 
    770 	mutex_destroy(&sq->sq_lock);
    771 	cv_destroy(&sq->sq_wait);
    772 	cv_destroy(&sq->sq_exitwait);
    773 }
    774 
    775 /*
    776  * Constructor/destructor routines for the syncq cache
    777  */
    778 /* ARGSUSED */
    779 static int
    780 syncq_constructor(void *buf, void *cdrarg, int kmflags)
    781 {
    782 	syncq_t	*sq = buf;
    783 
    784 	bzero(buf, sizeof (syncq_t));
    785 
    786 	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
    787 	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
    788 	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
    789 
    790 	return (0);
    791 }
    792 
    793 /* ARGSUSED */
    794 static void
    795 syncq_destructor(void *buf, void *cdrarg)
    796 {
    797 	syncq_t	*sq = buf;
    798 
    799 	ASSERT(sq->sq_head == NULL);
    800 	ASSERT(sq->sq_tail == NULL);
    801 	ASSERT(sq->sq_evhead == NULL);
    802 	ASSERT(sq->sq_evtail == NULL);
    803 	ASSERT(sq->sq_callbpend == NULL);
    804 	ASSERT(sq->sq_callbflags == 0);
    805 	ASSERT(sq->sq_outer == NULL);
    806 	ASSERT(sq->sq_onext == NULL);
    807 	ASSERT(sq->sq_oprev == NULL);
    808 	ASSERT(sq->sq_next == NULL);
    809 	ASSERT(sq->sq_needexcl == 0);
    810 	ASSERT(sq->sq_svcflags == 0);
    811 	ASSERT(sq->sq_servcount == 0);
    812 	ASSERT(sq->sq_nqueues == 0);
    813 	ASSERT(sq->sq_pri == 0);
    814 	ASSERT(sq->sq_count == 0);
    815 	ASSERT(sq->sq_rmqcount == 0);
    816 	ASSERT(sq->sq_cancelid == 0);
    817 	ASSERT(sq->sq_ciputctrl == NULL);
    818 	ASSERT(sq->sq_nciputctrl == 0);
    819 	ASSERT(sq->sq_type == 0);
    820 	ASSERT(sq->sq_flags == 0);
    821 
    822 	mutex_destroy(&sq->sq_lock);
    823 	cv_destroy(&sq->sq_wait);
    824 	cv_destroy(&sq->sq_exitwait);
    825 }
    826 
    827 /* ARGSUSED */
    828 static int
    829 ciputctrl_constructor(void *buf, void *cdrarg, int kmflags)
    830 {
    831 	ciputctrl_t *cip = buf;
    832 	int i;
    833 
    834 	for (i = 0; i < n_ciputctrl; i++) {
    835 		cip[i].ciputctrl_count = SQ_FASTPUT;
    836 		mutex_init(&cip[i].ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL);
    837 	}
    838 
    839 	return (0);
    840 }
    841 
    842 /* ARGSUSED */
    843 static void
    844 ciputctrl_destructor(void *buf, void *cdrarg)
    845 {
    846 	ciputctrl_t *cip = buf;
    847 	int i;
    848 
    849 	for (i = 0; i < n_ciputctrl; i++) {
    850 		ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT);
    851 		mutex_destroy(&cip[i].ciputctrl_lock);
    852 	}
    853 }
    854 
/*
 * Init routine run from main at boot time.
 *
 * Creates the STREAMS kmem caches, sizes (and possibly creates) the
 * ciputctrl cache, starts the STREAMS background service threads,
 * installs the "streams" kstat, initializes TPI support and registers
 * the per-netstack STREAMS state.
 */
void
strinit(void)
{
	/* Use boot_max_ncpus unless it is -1, else fall back to max_ncpus. */
	int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);

	stream_head_cache = kmem_cache_create("stream_head_cache",
	    sizeof (stdata_t), 0,
	    stream_head_constructor, stream_head_destructor, NULL,
	    NULL, NULL, 0);

	queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0,
	    queue_constructor, queue_destructor, NULL, NULL, NULL, 0);

	syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0,
	    syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0);

	qband_cache = kmem_cache_create("qband_cache",
	    sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0);

	linkinfo_cache = kmem_cache_create("linkinfo_cache",
	    sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0);

	/*
	 * Round the CPU count up to a power of two and clamp it to
	 * max_n_ciputctrl.  Each ciputctrl_cache buffer is an array of that
	 * many ciputctrl_t entries; the cache is only created here when the
	 * result is at least min_n_ciputctrl.
	 */
	n_ciputctrl = ncpus;
	n_ciputctrl = 1 << highbit(n_ciputctrl - 1);
	ASSERT(n_ciputctrl >= 1);
	n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl);
	if (n_ciputctrl >= min_n_ciputctrl) {
		ciputctrl_cache = kmem_cache_create("ciputctrl_cache",
		    sizeof (ciputctrl_t) * n_ciputctrl,
		    sizeof (ciputctrl_t), ciputctrl_constructor,
		    ciputctrl_destructor, NULL, NULL, NULL, 0);
	}

	/* STREAMS shares the system taskq rather than creating its own. */
	streams_taskq = system_taskq;

	if (streams_taskq == NULL)
		panic("strinit: no memory for streams taskq!");

	/*
	 * Start the background service threads (bufcall, queue and syncq
	 * service), all in p0 at priority streams_lopri.
	 */
	bc_bkgrnd_thread = thread_create(NULL, 0,
	    streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	streams_qbkgrnd_thread = thread_create(NULL, 0,
	    streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	streams_sqbkgrnd_thread = thread_create(NULL, 0,
	    streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	/*
	 * Create STREAMS kstats.  The kstat is virtual: its data points
	 * directly at str_statistics.  Failure to create it is not fatal.
	 */
	str_kstat = kstat_create("streams", 0, "strstat",
	    "net", KSTAT_TYPE_NAMED,
	    sizeof (str_statistics) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);

	if (str_kstat != NULL) {
		str_kstat->ks_data = &str_statistics;
		kstat_install(str_kstat);
	}

	/*
	 * TPI support routine initialisation.
	 */
	tpi_init();

	/*
	 * Handle to have autopush and persistent link information per
	 * zone.
	 * Note: uses shutdown hook instead of destroy hook so that the
	 * persistent links can be torn down before the destroy hooks
	 * in the TCP/IP stack are called.
	 */
	netstack_register(NS_STR, str_stack_init, str_stack_shutdown,
	    str_stack_fini);
}
    933 
    934 void
    935 str_sendsig(vnode_t *vp, int event, uchar_t band, int error)
    936 {
    937 	struct stdata *stp;
    938 
    939 	ASSERT(vp->v_stream);
    940 	stp = vp->v_stream;
    941 	/* Have to hold sd_lock to prevent siglist from changing */
    942 	mutex_enter(&stp->sd_lock);
    943 	if (stp->sd_sigflags & event)
    944 		strsendsig(stp->sd_siglist, event, band, error);
    945 	mutex_exit(&stp->sd_lock);
    946 }
    947 
/*
 * Send the "sevent" set of signals to a process.
 * This might send more than one signal if the process is registered
 * for multiple events. The caller should pass in an sevent that only
 * includes the events for which the process has registered.
 *
 * proc	  target process; its p_lock must be held by the caller.
 * events the full event mask the target registered with; only consulted
 *	  for S_BANDURG (choose SIGURG instead of SIGPOLL for S_RDBAND).
 * sevent the events to signal now; each bit is consumed as handled and
 *	  any unrecognized leftover bit causes a panic.
 * info	  caller-supplied siginfo template.  si_code, si_band and si_errno
 *	  are overwritten here; si_band and si_errno are left zero on return.
 * band	  band number reported in si_band for S_INPUT/S_OUTPUT/S_MSG.
 * error  errno reported in si_errno for S_ERROR.
 *
 * S_ERROR, S_HANGUP, S_HIPRI, S_INPUT, S_OUTPUT and S_MSG queue a siginfo
 * via sigaddq(); S_RDBAND, S_WRBAND and S_RDNORM post a bare signal via
 * sigtoproc().
 */
static void
dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
	uchar_t band, int error)
{
	ASSERT(MUTEX_HELD(&proc->p_lock));

	info->si_band = 0;
	info->si_errno = 0;

	if (sevent & S_ERROR) {
		sevent &= ~S_ERROR;
		info->si_code = POLL_ERR;
		info->si_errno = error;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		/* Only the POLL_ERR signal carries the errno. */
		info->si_errno = 0;
	}
	if (sevent & S_HANGUP) {
		sevent &= ~S_HANGUP;
		info->si_code = POLL_HUP;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_HIPRI) {
		sevent &= ~S_HIPRI;
		info->si_code = POLL_PRI;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_RDBAND) {
		sevent &= ~S_RDBAND;
		/* Registration with S_BANDURG turns band data into SIGURG. */
		if (events & S_BANDURG)
			sigtoproc(proc, NULL, SIGURG);
		else
			sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent & S_WRBAND) {
		sevent &= ~S_WRBAND;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent & S_INPUT) {
		sevent &= ~S_INPUT;
		info->si_code = POLL_IN;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_OUTPUT) {
		sevent &= ~S_OUTPUT;
		info->si_code = POLL_OUT;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_MSG) {
		sevent &= ~S_MSG;
		info->si_code = POLL_MSG;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_RDNORM) {
		sevent &= ~S_RDNORM;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	/* Every known bit has been consumed above; anything left is a bug. */
	if (sevent != 0) {
		panic("strsendsig: unknown event(s) %x", sevent);
	}
}
   1032 
   1033 /*
   1034  * Send SIGPOLL/SIGURG signal to all processes and process groups
   1035  * registered on the given signal list that want a signal for at
   1036  * least one of the specified events.
   1037  *
   1038  * Must be called with exclusive access to siglist (caller holding sd_lock).
   1039  *
   1040  * strioctl(I_SETSIG/I_ESETSIG) will only change siglist when holding
   1041  * sd_lock and the ioctl code maintains a PID_HOLD on the pid structure
   1042  * while it is in the siglist.
   1043  *
   1044  * For performance reasons (MP scalability) the code drops pidlock
   1045  * when sending signals to a single process.
   1046  * When sending to a process group the code holds
   1047  * pidlock to prevent the membership in the process group from changing
   1048  * while walking the p_pglink list.
   1049  */
   1050 void
   1051 strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
   1052 {
   1053 	strsig_t *ssp;
   1054 	k_siginfo_t info;
   1055 	struct pid *pidp;
   1056 	proc_t  *proc;
   1057 
   1058 	info.si_signo = SIGPOLL;
   1059 	info.si_errno = 0;
   1060 	for (ssp = siglist; ssp; ssp = ssp->ss_next) {
   1061 		int sevent;
   1062 
   1063 		sevent = ssp->ss_events & event;
   1064 		if (sevent == 0)
   1065 			continue;
   1066 
   1067 		if ((pidp = ssp->ss_pidp) == NULL) {
   1068 			/* pid was released but still on event list */
   1069 			continue;
   1070 		}
   1071 
   1072 
   1073 		if (ssp->ss_pid > 0) {
   1074 			/*
   1075 			 * XXX This unfortunately still generates
   1076 			 * a signal when a fd is closed but
   1077 			 * the proc is active.
   1078 			 */
   1079 			ASSERT(ssp->ss_pid == pidp->pid_id);
   1080 
   1081 			mutex_enter(&pidlock);
   1082 			proc = prfind_zone(pidp->pid_id, ALL_ZONES);
   1083 			if (proc == NULL) {
   1084 				mutex_exit(&pidlock);
   1085 				continue;
   1086 			}
   1087 			mutex_enter(&proc->p_lock);
   1088 			mutex_exit(&pidlock);
   1089 			dosendsig(proc, ssp->ss_events, sevent, &info,
   1090 			    band, error);
   1091 			mutex_exit(&proc->p_lock);
   1092 		} else {
   1093 			/*
   1094 			 * Send to process group. Hold pidlock across
   1095 			 * calls to dosendsig().
   1096 			 */
   1097 			pid_t pgrp = -ssp->ss_pid;
   1098 
   1099 			mutex_enter(&pidlock);
   1100 			proc = pgfind_zone(pgrp, ALL_ZONES);
   1101 			while (proc != NULL) {
   1102 				mutex_enter(&proc->p_lock);
   1103 				dosendsig(proc, ssp->ss_events, sevent,
   1104 				    &info, band, error);
   1105 				mutex_exit(&proc->p_lock);
   1106 				proc = proc->p_pglink;
   1107 			}
   1108 			mutex_exit(&pidlock);
   1109 		}
   1110 	}
   1111 }
   1112 
/*
 * Attach a stream device or module.
 * qp is a read queue; the new queue goes in so its next
 * read ptr is the argument, and the write queue corresponding
 * to the argument points to this queue. Return 0 on success,
 * or a non-zero errno on failure.
 *
 * fp selects what is attached.  A non-NULL fp is a module: its streamtab,
 * perimeter flags and perdm_t come from fp, it is opened with MODOPEN, and
 * fp is remembered in q_fp so qdetach() can release it.  A NULL fp means
 * the driver for major number getmajor(*devp), configured from devimpl[].
 * is_insert (modules only) asks for insertion in the middle of the stream
 * rather than a push.
 *
 * If the open routine fails, the new queue pair is turned off if it had
 * been linked in, its q_next pointers are cleared and it is released
 * through qdetach() without calling its close routine.
 */
int
qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
    boolean_t is_insert)
{
	major_t			major;
	cdevsw_impl_t		*dp;
	struct streamtab	*str;
	queue_t			*rq;
	queue_t			*wrq;
	uint32_t		qflag;
	uint32_t		sqtype;
	perdm_t			*dmp;
	int			error;
	int			sflag;

	/* New queue pair shares the stream head of qp. */
	rq = allocq();
	wrq = _WR(rq);
	STREAM(rq) = STREAM(wrq) = STREAM(qp);

	if (fp != NULL) {
		/* Module: all settings come from its fmodsw entry. */
		str = fp->f_str;
		qflag = fp->f_qflag;
		sqtype = fp->f_sqtype;
		dmp = fp->f_dmp;
		IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
		sflag = MODOPEN;

		/*
		 * stash away a pointer to the module structure so we can
		 * unref it in qdetach.
		 */
		rq->q_fp = fp;
	} else {
		/* Driver: settings come from devimpl[] for its major. */
		ASSERT(!is_insert);

		major = getmajor(*devp);
		dp = &devimpl[major];

		str = dp->d_str;
		ASSERT(str == STREAMSTAB(major));

		qflag = dp->d_qflag;
		ASSERT(qflag & QISDRV);
		sqtype = dp->d_sqtype;

		/* create perdm_t if needed */
		if (NEED_DM(dp->d_dmp, qflag))
			dp->d_dmp = hold_dm(str, qflag, sqtype);

		dmp = dp->d_dmp;
		sflag = 0;
	}

	TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
	    "qattach:qflag == %X(%X)", qflag, *devp);

	/* setq might sleep in allocator - avoid holding locks. */
	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);

	/*
	 * Before calling the module's open routine, set up the q_next
	 * pointer for inserting a module in the middle of a stream.
	 *
	 * Note that we can always set _QINSERTING and set up q_next
	 * pointer for both inserting and pushing a module.  Then there
	 * is no need for the is_insert parameter.  In insertq(), called
	 * by qprocson(), assume that q_next of the new module always points
	 * to the correct queue and use it for insertion.  Everything should
	 * work out fine.  But in the first release of _I_INSERT, we
	 * distinguish between inserting and pushing to make sure that
	 * pushing a module follows the same code path as before.
	 */
	if (is_insert) {
		rq->q_flag |= _QINSERTING;
		rq->q_next = qp;
	}

	/*
	 * If there is an outer perimeter get exclusive access during
	 * the open procedure.  Bump up the reference count on the queue.
	 */
	entersq(rq->q_syncq, SQ_OPENCLOSE);
	error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
	if (error != 0)
		goto failed;
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	/* A successful open must have called qprocson(). */
	ASSERT(qprocsareon(rq));
	return (0);

failed:
	/*
	 * Unwind: clear _QINSERTING; if the queue behind wrq already points
	 * at it (the open routine got as far as linking the pair in), turn
	 * the queues off; leave the perimeter; unlink and release the pair.
	 * clmode 0 tells qdetach() not to call the close routine.
	 */
	rq->q_flag &= ~_QINSERTING;
	if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
		qprocsoff(rq);
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	rq->q_next = wrq->q_next = NULL;
	qdetach(rq, 0, 0, crp, B_FALSE);
	return (error);
}
   1218 
   1219 /*
   1220  * Handle second open of stream. For modules, set the
   1221  * last argument to MODOPEN and do not pass any open flags.
   1222  * Ignore dummydev since this is not the first open.
   1223  */
   1224 int
   1225 qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
   1226 {
   1227 	int	error;
   1228 	dev_t dummydev;
   1229 	queue_t *wqp = _WR(qp);
   1230 
   1231 	ASSERT(qp->q_flag & QREADR);
   1232 	entersq(qp->q_syncq, SQ_OPENCLOSE);
   1233 
   1234 	dummydev = *devp;
   1235 	if (error = ((*qp->q_qinfo->qi_qopen)(qp, &dummydev,
   1236 	    (wqp->q_next ? 0 : flag), (wqp->q_next ? MODOPEN : 0), crp))) {
   1237 		leavesq(qp->q_syncq, SQ_OPENCLOSE);
   1238 		mutex_enter(&STREAM(qp)->sd_lock);
   1239 		qp->q_stream->sd_flag |= STREOPENFAIL;
   1240 		mutex_exit(&STREAM(qp)->sd_lock);
   1241 		return (error);
   1242 	}
   1243 	leavesq(qp->q_syncq, SQ_OPENCLOSE);
   1244 
   1245 	/*
   1246 	 * successful open should have done qprocson()
   1247 	 */
   1248 	ASSERT(qprocsareon(_RD(qp)));
   1249 	return (0);
   1250 }
   1251 
/*
 * Detach a stream module or device.
 * If clmode == 1 then the module or driver was opened and its
 * close routine must be called. If clmode == 0, the module
 * or driver was never opened or the open failed, and so its close
 * should not be called.
 *
 * qp	    read queue of the pair being detached.
 * flag/crp passed through to the close routine.
 * is_remove when set (and clmode != 0), _QREMOVING is set on qp before
 *	    the close routine runs.
 *
 * The stream head must have STRCLOSE, STWOPEN or STRPLUMB set.  Any
 * pending stream service is run first.  On return the fmodsw reference
 * taken by qattach() (if any) has been released and the queue pair has
 * been freed with freeq().
 */
void
qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove)
{
	queue_t *wqp = _WR(qp);
	ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB));

	if (STREAM_NEEDSERVICE(STREAM(qp)))
		stream_runservice(STREAM(qp));

	if (clmode) {
		/*
		 * Make sure that all the messages on the write side syncq are
		 * processed and nothing is left. Since we are closing, no new
		 * messages may appear there.
		 */
		wait_q_syncq(wqp);

		entersq(qp->q_syncq, SQ_OPENCLOSE);
		if (is_remove) {
			mutex_enter(QLOCK(qp));
			qp->q_flag |= _QREMOVING;
			mutex_exit(QLOCK(qp));
		}
		(*qp->q_qinfo->qi_qclose)(qp, flag, crp);
		/*
		 * Check that qprocsoff() was actually called.
		 */
		ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE));

		leavesq(qp->q_syncq, SQ_OPENCLOSE);
	} else {
		/*
		 * The close routine is not called, so mark both queues
		 * QWCLOSE here to keep service procedures from running.
		 */
		disable_svc(qp);
	}

	/*
	 * Allow any threads blocked in entersq to proceed and discover
	 * the QWCLOSE is set.
	 * Note: This assumes that all users of entersq check QWCLOSE.
	 * Currently runservice is the only entersq that can happen
	 * after removeq has finished.
	 * Removeq will have discarded all messages destined to the closing
	 * pair of queues from the syncq.
	 * NOTE: Calling a function inside an assert is unconventional.
	 * However, it does not cause any problem since flush_syncq() does
	 * not change any state except when it returns non-zero i.e.
	 * when the assert will trigger.
	 */
	ASSERT(flush_syncq(qp->q_syncq, qp) == 0);
	ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0);
	ASSERT((qp->q_flag & QPERMOD) ||
	    ((qp->q_syncq->sq_head == NULL) &&
	    (wqp->q_syncq->sq_head == NULL)));

	/* release any fmodsw_impl_t structure held on behalf of the queue */
	ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV);
	if (qp->q_fp != NULL)
		fmodsw_rele(qp->q_fp);

	/* freeq removes us from the outer perimeter if any */
	freeq(qp);
}
   1320 
   1321 /* Prevent service procedures from being called */
   1322 void
   1323 disable_svc(queue_t *qp)
   1324 {
   1325 	queue_t *wqp = _WR(qp);
   1326 
   1327 	ASSERT(qp->q_flag & QREADR);
   1328 	mutex_enter(QLOCK(qp));
   1329 	qp->q_flag |= QWCLOSE;
   1330 	mutex_exit(QLOCK(qp));
   1331 	mutex_enter(QLOCK(wqp));
   1332 	wqp->q_flag |= QWCLOSE;
   1333 	mutex_exit(QLOCK(wqp));
   1334 }
   1335 
   1336 /* Allow service procedures to be called again */
   1337 void
   1338 enable_svc(queue_t *qp)
   1339 {
   1340 	queue_t *wqp = _WR(qp);
   1341 
   1342 	ASSERT(qp->q_flag & QREADR);
   1343 	mutex_enter(QLOCK(qp));
   1344 	qp->q_flag &= ~QWCLOSE;
   1345 	mutex_exit(QLOCK(qp));
   1346 	mutex_enter(QLOCK(wqp));
   1347 	wqp->q_flag &= ~QWCLOSE;
   1348 	mutex_exit(QLOCK(wqp));
   1349 }
   1350 
   1351 /*
   1352  * Remove queue from qhead/qtail if it is enabled.
   1353  * Only reset QENAB if the queue was removed from the runlist.
   1354  * A queue goes through 3 stages:
   1355  *	It is on the service list and QENAB is set.
   1356  *	It is removed from the service list but QENAB is still set.
   1357  *	QENAB gets changed to QINSERVICE.
   1358  *	QINSERVICE is reset (when the service procedure is done)
   1359  * Thus we can not reset QENAB unless we actually removed it from the service
   1360  * queue.
   1361  */
   1362 void
   1363 remove_runlist(queue_t *qp)
   1364 {
   1365 	if (qp->q_flag & QENAB && qhead != NULL) {
   1366 		queue_t *q_chase;
   1367 		queue_t *q_curr;
   1368 		int removed;
   1369 
   1370 		mutex_enter(&service_queue);
   1371 		RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed);
   1372 		mutex_exit(&service_queue);
   1373 		if (removed) {
   1374 			STRSTAT(qremoved);
   1375 			qp->q_flag &= ~QENAB;
   1376 		}
   1377 	}
   1378 }
   1379 
   1380 
/*
 * Wait for any pending service processing to complete.
 * The removal of queues from the runlist is not atomic with the
 * clearing of the QENABLED flag and setting the INSERVICE flag.
 * Consequently it is possible for remove_runlist in strclose
 * to not find the queue on the runlist but for it to be QENABLED
 * and not yet INSERVICE -> hence wait_svc needs to check QENABLED
 * as well as INSERVICE.
 *
 * qp must be a read queue; both qp and its write-side partner are
 * taken off the runlist (if possible) and waited for.
 */
void
wait_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);

	/*
	 * Try to remove queues from qhead/qtail list.
	 */
	if (qhead != NULL) {
		remove_runlist(qp);
		remove_runlist(wqp);
	}
	/*
	 * Wait till the syncqs associated with the queue disappear from the
	 * background processing list.
	 * This only needs to be done for non-PERMOD perimeters since
	 * for PERMOD perimeters the syncq may be shared and will only be freed
	 * when the last module/driver is unloaded.
	 * If for PERMOD perimeters queue was on the syncq list, removeq()
	 * should call propagate_syncq() or drain_syncq() for it. Both of these
	 * functions remove the queue from its syncq list, so sqthread will not
	 * try to access the queue.
	 */
	if (!(qp->q_flag & QPERMOD)) {
		syncq_t *rsq = qp->q_syncq;
		syncq_t *wsq = wqp->q_syncq;

		/*
		 * Disable rsq and wsq and wait for any background processing of
		 * syncq to complete.
		 */
		wait_sq_svc(rsq);
		if (wsq != rsq)
			wait_sq_svc(wsq);
	}

	/*
	 * Sleep on each queue's q_wait until neither QINSERVICE nor QENAB
	 * is set; read side first, then write side.
	 */
	mutex_enter(QLOCK(qp));
	while (qp->q_flag & (QINSERVICE|QENAB))
		cv_wait(&qp->q_wait, QLOCK(qp));
	mutex_exit(QLOCK(qp));
	mutex_enter(QLOCK(wqp));
	while (wqp->q_flag & (QINSERVICE|QENAB))
		cv_wait(&wqp->q_wait, QLOCK(wqp));
	mutex_exit(QLOCK(wqp));
}
   1437 
   1438 /*
   1439  * Put ioctl data from userland buffer `arg' into the mblk chain `bp'.
   1440  * `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may
   1441  * also be set, and is passed through to allocb_cred_wait().
   1442  *
   1443  * Returns errno on failure, zero on success.
   1444  */
   1445 int
   1446 putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr)
   1447 {
   1448 	mblk_t *tmp;
   1449 	ssize_t  count;
   1450 	int error = 0;
   1451 
   1452 	ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K ||
   1453 	    (flag & (U_TO_K | K_TO_K)) == K_TO_K);
   1454 
   1455 	if (bp->b_datap->db_type == M_IOCTL) {
   1456 		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
   1457 	} else {
   1458 		ASSERT(bp->b_datap->db_type == M_COPYIN);
   1459 		count = ((struct copyreq *)bp->b_rptr)->cq_size;
   1460 	}
   1461 	/*
   1462 	 * strdoioctl validates ioc_count, so if this assert fails it
   1463 	 * cannot be due to user error.
   1464 	 */
   1465 	ASSERT(count >= 0);
   1466 
   1467 	if ((tmp = allocb_cred_wait(count, (flag & STR_NOSIG), &error, cr,
   1468 	    curproc->p_pid)) == NULL) {
   1469 		return (error);
   1470 	}
   1471 	error = strcopyin(arg, tmp->b_wptr, count, flag & (U_TO_K|K_TO_K));
   1472 	if (error != 0) {
   1473 		freeb(tmp);
   1474 		return (error);
   1475 	}
   1476 	DB_CPID(tmp) = curproc->p_pid;
   1477 	tmp->b_wptr += count;
   1478 	bp->b_cont = tmp;
   1479 
   1480 	return (0);
   1481 }
   1482 
   1483 /*
   1484  * Copy ioctl data to user-land. Return non-zero errno on failure,
   1485  * 0 for success.
   1486  */
   1487 int
   1488 getiocd(mblk_t *bp, char *arg, int copymode)
   1489 {
   1490 	ssize_t count;
   1491 	size_t  n;
   1492 	int	error;
   1493 
   1494 	if (bp->b_datap->db_type == M_IOCACK)
   1495 		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
   1496 	else {
   1497 		ASSERT(bp->b_datap->db_type == M_COPYOUT);
   1498 		count = ((struct copyreq *)bp->b_rptr)->cq_size;
   1499 	}
   1500 	ASSERT(count >= 0);
   1501 
   1502 	for (bp = bp->b_cont; bp && count;
   1503 	    count -= n, bp = bp->b_cont, arg += n) {
   1504 		n = MIN(count, bp->b_wptr - bp->b_rptr);
   1505 		error = strcopyout(bp->b_rptr, arg, n, copymode);
   1506 		if (error)
   1507 			return (error);
   1508 	}
   1509 	ASSERT(count == 0);
   1510 	return (0);
   1511 }
   1512 
   1513 /*
   1514  * Allocate a linkinfo entry given the write queue of the
   1515  * bottom module of the top stream and the write queue of the
   1516  * stream head of the bottom stream.
   1517  */
   1518 linkinfo_t *
   1519 alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown)
   1520 {
   1521 	linkinfo_t *linkp;
   1522 
   1523 	linkp = kmem_cache_alloc(linkinfo_cache, KM_SLEEP);
   1524 
   1525 	linkp->li_lblk.l_qtop = qup;
   1526 	linkp->li_lblk.l_qbot = qdown;
   1527 	linkp->li_fpdown = fpdown;
   1528 
   1529 	mutex_enter(&strresources);
   1530 	linkp->li_next = linkinfo_list;
   1531 	linkp->li_prev = NULL;
   1532 	if (linkp->li_next)
   1533 		linkp->li_next->li_prev = linkp;
   1534 	linkinfo_list = linkp;
   1535 	linkp->li_lblk.l_index = ++lnk_id;
   1536 	ASSERT(lnk_id != 0);	/* this should never wrap in practice */
   1537 	mutex_exit(&strresources);
   1538 
   1539 	return (linkp);
   1540 }
   1541 
   1542 /*
   1543  * Free a linkinfo entry.
   1544  */
   1545 void
   1546 lbfree(linkinfo_t *linkp)
   1547 {
   1548 	mutex_enter(&strresources);
   1549 	if (linkp->li_next)
   1550 		linkp->li_next->li_prev = linkp->li_prev;
   1551 	if (linkp->li_prev)
   1552 		linkp->li_prev->li_next = linkp->li_next;
   1553 	else
   1554 		linkinfo_list = linkp->li_next;
   1555 	mutex_exit(&strresources);
   1556 
   1557 	kmem_cache_free(linkinfo_cache, linkp);
   1558 }
   1559 
/*
 * Check for a potential linking cycle.
 * Return 1 if a link will result in a cycle,
 * and 0 otherwise.
 *
 * Linking lostp under upstp creates a cycle if upstp's major is already
 * reachable from lostp's major in the mux graph ss->ss_mux_nodes[] (one
 * node per major, edges on mn_outp/me_nextp).  The graph is walked
 * depth-first without recursion: MUX_VISIT() marks a node, mn_startp is
 * the node's cursor into its edge list, and mn_originp records the node we
 * came from so the walk can back up.  Returning to a node with no
 * mn_originp ends the walk.
 * NOTE(review): this relies on MUX_CLEAR() resetting the visit mark and
 * mn_originp of every node - confirm against its definition.
 */
int
linkcycle(stdata_t *upstp, stdata_t *lostp, str_stack_t *ss)
{
	struct mux_node *np;
	struct mux_edge *ep;
	int i;
	major_t lomaj;
	major_t upmaj;
	/*
	 * if the lower stream is a pipe/FIFO, return, since link
	 * cycles can not happen on pipes/FIFOs
	 */
	if (lostp->sd_vnode->v_type == VFIFO)
		return (0);

	for (i = 0; i < ss->ss_devcnt; i++) {
		np = &ss->ss_mux_nodes[i];
		MUX_CLEAR(np);
	}
	lomaj = getmajor(lostp->sd_vnode->v_rdev);
	upmaj = getmajor(upstp->sd_vnode->v_rdev);
	np = &ss->ss_mux_nodes[lomaj];
	for (;;) {
		if (!MUX_DIDVISIT(np)) {
			/* First time at this node. */
			if (np->mn_imaj == upmaj)
				return (1);
			if (np->mn_outp == NULL) {
				/* Leaf: mark it and back up. */
				MUX_VISIT(np);
				if (np->mn_originp == NULL)
					return (0);
				np = np->mn_originp;
				continue;
			}
			/* Start iterating this node's outgoing edges. */
			MUX_VISIT(np);
			np->mn_startp = np->mn_outp;
		} else {
			/* Revisit: resume with the node's next edge. */
			if (np->mn_startp == NULL) {
				/* Edges exhausted: back up, or finish. */
				if (np->mn_originp == NULL)
					return (0);
				else {
					np = np->mn_originp;
					continue;
				}
			}
			/*
			 * If ep->me_nodep is a FIFO (me_nodep == NULL),
			 * ignore the edge and move on. ep->me_nodep gets
			 * set to NULL in mux_addedge() if it is a FIFO.
			 *
			 */
			ep = np->mn_startp;
			np->mn_startp = ep->me_nextp;
			if (ep->me_nodep == NULL)
				continue;
			/* Descend, remembering where we came from. */
			ep->me_nodep->mn_originp = np;
			np = ep->me_nodep;
		}
	}
}
   1624 
   1625 /*
   1626  * Find linkinfo entry corresponding to the parameters.
   1627  */
   1628 linkinfo_t *
   1629 findlinks(stdata_t *stp, int index, int type, str_stack_t *ss)
   1630 {
   1631 	linkinfo_t *linkp;
   1632 	struct mux_edge *mep;
   1633 	struct mux_node *mnp;
   1634 	queue_t *qup;
   1635 
   1636 	mutex_enter(&strresources);
   1637 	if ((type & LINKTYPEMASK) == LINKNORMAL) {
   1638 		qup = getendq(stp->sd_wrq);
   1639 		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
   1640 			if ((qup == linkp->li_lblk.l_qtop) &&
   1641 			    (!index || (index == linkp->li_lblk.l_index))) {
   1642 				mutex_exit(&strresources);
   1643 				return (linkp);
   1644 			}
   1645 		}
   1646 	} else {
   1647 		ASSERT((type & LINKTYPEMASK) == LINKPERSIST);
   1648 		mnp = &ss->ss_mux_nodes[getmajor(stp->sd_vnode->v_rdev)];
   1649 		mep = mnp->mn_outp;
   1650 		while (mep) {
   1651 			if ((index == 0) || (index == mep->me_muxid))
   1652 				break;
   1653 			mep = mep->me_nextp;
   1654 		}
   1655 		if (!mep) {
   1656 			mutex_exit(&strresources);
   1657 			return (NULL);
   1658 		}
   1659 		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
   1660 			if ((!linkp->li_lblk.l_qtop) &&
   1661 			    (mep->me_muxid == linkp->li_lblk.l_index)) {
   1662 				mutex_exit(&strresources);
   1663 				return (linkp);
   1664 			}
   1665 		}
   1666 	}
   1667 	mutex_exit(&strresources);
   1668 	return (NULL);
   1669 }
   1670 
   1671 /*
   1672  * Given a queue ptr, follow the chain of q_next pointers until you reach the
   1673  * last queue on the chain and return it.
   1674  */
   1675 queue_t *
   1676 getendq(queue_t *q)
   1677 {
   1678 	ASSERT(q != NULL);
   1679 	while (_SAMESTR(q))
   1680 		q = q->q_next;
   1681 	return (q);
   1682 }
   1683 
/*
 * Wait for the syncq count to drop to zero.
 * sq could be either outer or inner.
 *
 * The count checked is sq_count plus the put counts accumulated by
 * SUM_SQ_PUTCOUNTS(), read while SQLOCK and the put locks
 * (SQ_PUTLOCKS_ENTER) are held.  While it is non-zero, SQ_WANTWAKEUP is
 * set, the put locks are dropped and the thread sleeps on sq_wait under
 * SQLOCK; the count is recomputed after each wakeup.
 */

static void
wait_syncq(syncq_t *sq)
{
	uint16_t count;

	mutex_enter(SQLOCK(sq));
	count = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, count);
	while (count != 0) {
		sq->sq_flags |= SQ_WANTWAKEUP;
		/* Put locks must not be held across the cv_wait(). */
		SQ_PUTLOCKS_EXIT(sq);
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
	}
	SQ_PUTLOCKS_EXIT(sq);
	mutex_exit(SQLOCK(sq));
}
   1709 
   1710 /*
   1711  * Wait while there are any messages for the queue in its syncq.
   1712  */
   1713 static void
   1714 wait_q_syncq(queue_t *q)
   1715 {
   1716 	if ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
   1717 		syncq_t *sq = q->q_syncq;
   1718 
   1719 		mutex_enter(SQLOCK(sq));
   1720 		while ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
   1721 			sq->sq_flags |= SQ_WANTWAKEUP;
   1722 			cv_wait(&sq->sq_wait, SQLOCK(sq));
   1723 		}
   1724 		mutex_exit(SQLOCK(sq));
   1725 	}
   1726 }
   1727 
   1728 
   1729 int
   1730 mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
   1731     int lhlink)
   1732 {
   1733 	struct stdata *stp;
   1734 	struct strioctl strioc;
   1735 	struct linkinfo *linkp;
   1736 	struct stdata *stpdown;
   1737 	struct streamtab *str;
   1738 	queue_t *passq;
   1739 	syncq_t *passyncq;
   1740 	queue_t *rq;
   1741 	cdevsw_impl_t *dp;
   1742 	uint32_t qflag;
   1743 	uint32_t sqtype;
   1744 	perdm_t *dmp;
   1745 	int error = 0;
   1746 	netstack_t *ns;
   1747 	str_stack_t *ss;
   1748 
   1749 	stp = vp->v_stream;
   1750 	TRACE_1(TR_FAC_STREAMS_FR,
   1751 	    TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
   1752 	/*
   1753 	 * Test for invalid upper stream
   1754 	 */
   1755 	if (stp->sd_flag & STRHUP) {
   1756 		return (ENXIO);
   1757 	}
   1758 	if (vp->v_type == VFIFO) {
   1759 		return (EINVAL);
   1760 	}
   1761 	if (stp->sd_strtab == NULL) {
   1762 		return (EINVAL);
   1763 	}
   1764 	if (!stp->sd_strtab->st_muxwinit) {
   1765 		return (EINVAL);
   1766 	}
   1767 	if (fpdown == NULL) {
   1768 		return (EBADF);
   1769 	}
   1770 	ns = netstack_find_by_cred(crp);
   1771 	ASSERT(ns != NULL);
   1772 	ss = ns->netstack_str;
   1773 	ASSERT(ss != NULL);
   1774 
   1775 	if (getmajor(stp->sd_vnode->v_rdev) >= ss->ss_devcnt) {
   1776 		netstack_rele(ss->ss_netstack);
   1777 		return (EINVAL);
   1778 	}
   1779 	mutex_enter(&muxifier);
   1780 	if (stp->sd_flag & STPLEX) {
   1781 		mutex_exit(&muxifier);
   1782 		netstack_rele(ss->ss_netstack);
   1783 		return (ENXIO);
   1784 	}
   1785 
   1786 	/*
   1787 	 * Test for invalid lower stream.
   1788 	 * The check for the v_type != VFIFO and having a major
   1789 	 * number not >= devcnt is done to avoid problems with
   1790 	 * adding mux_node entry past the end of mux_nodes[].
   1791 	 * For FIFO's we don't add an entry so this isn't a
   1792 	 * problem.
   1793 	 */
   1794 	if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
   1795 	    (stpdown == stp) || (stpdown->sd_flag &
   1796 	    (STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
   1797 	    ((stpdown->sd_vnode->v_type != VFIFO) &&
   1798 	    (getmajor(stpdown->sd_vnode->v_rdev) >= ss->ss_devcnt)) ||
   1799 	    linkcycle(stp, stpdown, ss)) {
   1800 		mutex_exit(&muxifier);
   1801 		netstack_rele(ss->ss_netstack);
   1802 		return (EINVAL);
   1803 	}
   1804 	TRACE_1(TR_FAC_STREAMS_FR,
   1805 	    TR_STPDOWN, "stpdown:%p", stpdown);
   1806 	rq = getendq(stp->sd_wrq);
   1807 	if (cmd == I_PLINK)
   1808 		rq = NULL;
   1809 
   1810 	linkp = alloclink(rq, stpdown->sd_wrq, fpdown);
   1811 
   1812 	strioc.ic_cmd = cmd;
   1813 	strioc.ic_timout = INFTIM;
   1814 	strioc.ic_len = sizeof (struct linkblk);
   1815 	strioc.ic_dp = (char *)&linkp->li_lblk;
   1816 
   1817 	/*
   1818 	 * STRPLUMB protects plumbing changes and should be set before
   1819 	 * link_addpassthru()/link_rempassthru() are called, so it is set here
   1820 	 * and cleared in the end of mlink when passthru queue is removed.
   1821 	 * Setting of STRPLUMB prevents reopens of the stream while passthru
   1822 	 * queue is in-place (it is not a proper module and doesn't have open
   1823 	 * entry point).
   1824 	 *
   1825 	 * STPLEX prevents any threads from entering the stream from above. It
   1826 	 * can't be set before the call to link_addpassthru() because putnext
   1827 	 * from below may cause stream head I/O routines to be called and these
   1828 	 * routines assert that STPLEX is not set. After link_addpassthru()
   1829 	 * nothing may come from below since the pass queue syncq is blocked.
   1830 	 * Note also that STPLEX should be cleared before the call to
   1831 	 * link_rempassthru() since when messages start flowing to the stream
   1832 	 * head (e.g. because of message propagation from the pass queue) stream
   1833 	 * head I/O routines may be called with STPLEX flag set.
   1834 	 *
   1835 	 * When STPLEX is set, nothing may come into the stream from above and
   1836 	 * it is safe to do a setq which will change stream head. So, the
   1837 	 * correct sequence of actions is:
   1838 	 *
   1839 	 * 1) Set STRPLUMB
   1840 	 * 2) Call link_addpassthru()
   1841 	 * 3) Set STPLEX
   1842 	 * 4) Call setq and update the stream state
   1843 	 * 5) Clear STPLEX
   1844 	 * 6) Call link_rempassthru()
   1845 	 * 7) Clear STRPLUMB
   1846 	 *
   1847 	 * The same sequence applies to munlink() code.
   1848 	 */
   1849 	mutex_enter(&stpdown->sd_lock);
   1850 	stpdown->sd_flag |= STRPLUMB;
   1851 	mutex_exit(&stpdown->sd_lock);
   1852 	/*
   1853 	 * Add passthru queue below lower mux. This will block
   1854 	 * syncqs of lower muxs read queue during I_LINK/I_UNLINK.
   1855 	 */
   1856 	passq = link_addpassthru(stpdown);
   1857 
   1858 	mutex_enter(&stpdown->sd_lock);
   1859 	stpdown->sd_flag |= STPLEX;
   1860 	mutex_exit(&stpdown->sd_lock);
   1861 
   1862 	rq = _RD(stpdown->sd_wrq);
   1863 	/*
   1864 	 * There may be messages in the streamhead's syncq due to messages
   1865 	 * that arrived before link_addpassthru() was done. To avoid
   1866 	 * background processing of the syncq happening simultaneous with
   1867 	 * setq processing, we disable the streamhead syncq and wait until
   1868 	 * existing background thread finishes working on it.
   1869 	 */
   1870 	wait_sq_svc(rq->q_syncq);
   1871 	passyncq = passq->q_syncq;
   1872 	if (!(passyncq->sq_flags & SQ_BLOCKED))
   1873 		blocksq(passyncq, SQ_BLOCKED, 0);
   1874 
   1875 	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
   1876 	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
   1877 	rq->q_ptr = _WR(rq)->q_ptr = NULL;
   1878 
   1879 	/* setq might sleep in allocator - avoid holding locks. */
   1880 	/* Note: we are holding muxifier here. */
   1881 
   1882 	str = stp->sd_strtab;
   1883 	dp = &devimpl[getmajor(vp->v_rdev)];
   1884 	ASSERT(dp->d_str == str);
   1885 
   1886 	qflag = dp->d_qflag;
   1887 	sqtype = dp->d_sqtype;
   1888 
   1889 	/* create perdm_t if needed */
   1890 	if (NEED_DM(dp->d_dmp, qflag))
   1891 		dp->d_dmp = hold_dm(str, qflag, sqtype);
   1892 
   1893 	dmp = dp->d_dmp;
   1894 
   1895 	setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
   1896 	    B_TRUE);
   1897 
   1898 	/*
   1899 	 * XXX Remove any "odd" messages from the queue.
   1900 	 * Keep only M_DATA, M_PROTO, M_PCPROTO.
   1901 	 */
   1902 	error = strdoioctl(stp, &strioc, FNATIVE,
   1903 	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
   1904 	if (error != 0) {
   1905 		lbfree(linkp);
   1906 
   1907 		if (!(passyncq->sq_flags & SQ_BLOCKED))
   1908 			blocksq(passyncq, SQ_BLOCKED, 0);
   1909 		/*
   1910 		 * Restore the stream head queue and then remove
   1911 		 * the passq. Turn off STPLEX before we turn on
   1912 		 * the stream by removing the passq.
   1913 		 */
   1914 		rq->q_ptr = _WR(rq)->q_ptr = stpdown;
   1915 		setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
   1916 		    B_TRUE);
   1917 
   1918 		mutex_enter(&stpdown->sd_lock);
   1919 		stpdown->sd_flag &= ~STPLEX;
   1920 		mutex_exit(&stpdown->sd_lock);
   1921 
   1922 		link_rempassthru(passq);
   1923 
   1924 		mutex_enter(&stpdown->sd_lock);
   1925 		stpdown->sd_flag &= ~STRPLUMB;
   1926 		/* Wakeup anyone waiting for STRPLUMB to clear. */
   1927 		cv_broadcast(&stpdown->sd_monitor);
   1928 		mutex_exit(&stpdown->sd_lock);
   1929 
   1930 		mutex_exit(&muxifier);
   1931 		netstack_rele(ss->ss_netstack);
   1932 		return (error);
   1933 	}
   1934 	mutex_enter(&fpdown->f_tlock);
   1935 	fpdown->f_count++;
   1936 	mutex_exit(&fpdown->f_tlock);
   1937 
   1938 	/*
   1939 	 * if we've made it here the linkage is all set up so we should also
   1940 	 * set up the layered driver linkages
   1941 	 */
   1942 
   1943 	ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
   1944 	if (cmd == I_LINK) {
   1945 		ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
   1946 	} else {
   1947 		ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
   1948 	}
   1949 
   1950 	link_rempassthru(passq);
   1951 
   1952 	mux_addedge(stp, stpdown, linkp->li_lblk.l_index, ss);
   1953 
   1954 	/*
   1955 	 * Mark the upper stream as having dependent links
   1956 	 * so that strclose can clean it up.
   1957 	 */
   1958 	if (cmd == I_LINK) {
   1959 		mutex_enter(&stp->sd_lock);
   1960 		stp->sd_flag |= STRHASLINKS;
   1961 		mutex_exit(&stp->sd_lock);
   1962 	}
   1963 	/*
   1964 	 * Wake up any other processes that may have been
   1965 	 * waiting on the lower stream. These will all
   1966 	 * error out.
   1967 	 */
   1968 	mutex_enter(&stpdown->sd_lock);
   1969 	/* The passthru module is removed so we may release STRPLUMB */
   1970 	stpdown->sd_flag &= ~STRPLUMB;
   1971 	cv_broadcast(&rq->q_wait);
   1972 	cv_broadcast(&_WR(rq)->q_wait);
   1973 	cv_broadcast(&stpdown->sd_monitor);
   1974 	mutex_exit(&stpdown->sd_lock);
   1975 	mutex_exit(&muxifier);
   1976 	*rvalp = linkp->li_lblk.l_index;
   1977 	netstack_rele(ss->ss_netstack);
   1978 	return (0);
   1979 }
   1980 
   1981 int
   1982 mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
   1983 {
   1984 	int		ret;
   1985 	struct file	*fpdown;
   1986 
   1987 	fpdown = getf(arg);
   1988 	ret = mlink_file(vp, cmd, fpdown, crp, rvalp, lhlink);
   1989 	if (fpdown != NULL)
   1990 		releasef(arg);
   1991 	return (ret);
   1992 }
   1993 
/*
 * Unlink a multiplexor link. Stp is the controlling stream for the
 * link, and linkp points to the link's entry in the linkinfo list.
 * The muxifier lock must be held on entry and is dropped on exit
 * (on every return path).
 *
 * flag:	LINKTYPEMASK bits select I_UNLINK (LINKNORMAL) or
 *		I_PUNLINK (LINKPERSIST); LINKCLOSE means we are called on
 *		close, so an ioctl failure is logged and the unlink is
 *		completed anyway instead of being returned to the caller.
 * crp, rvalp:	passed through to strdoioctl().
 * ss:		the STREAMS stack whose mux graph holds the link.
 *
 * Returns 0 on success, or the strdoioctl() error when it fails and
 * LINKCLOSE is not set (in which case the link is left in place).
 *
 * NOTE : Currently it is assumed that mux would process all the messages
 * sitting on its queue before ACKing the UNLINK. It is the responsibility
 * of the mux to handle all the messages that arrive before UNLINK.
 * If the mux has to send down messages on its lower stream before
 * ACKing I_UNLINK, then it *should* know to handle messages even
 * after the UNLINK is acked (actually it should be able to handle till we
 * re-block the read side of the pass queue here). If the mux does not
 * open up the lower stream, any messages that arrive during UNLINK
 * will be put in the stream head. In the case of lower stream opening
 * up, some messages might land in the stream head depending on when
 * the message arrived and when the read side of the pass queue was
 * re-blocked.
 */
int
munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp,
    str_stack_t *ss)
{
	struct strioctl strioc;
	struct stdata *stpdown;
	queue_t *rq, *wrq;
	queue_t	*passq;
	syncq_t *passyncq;
	int error = 0;
	file_t *fpdown;

	ASSERT(MUTEX_HELD(&muxifier));

	/* The lower (linked) stream's stream head. */
	stpdown = linkp->li_fpdown->f_vnode->v_stream;

	/*
	 * See the comment in mlink() concerning STRPLUMB/STPLEX flags.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STRPLUMB;
	mutex_exit(&stpdown->sd_lock);

	/*
	 * Add passthru queue below lower mux. This will block
	 * syncqs of lower muxs read queue during I_LINK/I_UNLINK.
	 */
	passq = link_addpassthru(stpdown);

	/* Ask the upper mux to tear down the link. */
	if ((flag & LINKTYPEMASK) == LINKNORMAL)
		strioc.ic_cmd = I_UNLINK;
	else
		strioc.ic_cmd = I_PUNLINK;
	strioc.ic_timout = INFTIM;
	strioc.ic_len = sizeof (struct linkblk);
	strioc.ic_dp = (char *)&linkp->li_lblk;

	error = strdoioctl(stp, &strioc, FNATIVE,
	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);

	/*
	 * If there was an error and this is not called via strclose,
	 * return to the user. Otherwise, pretend there was no error
	 * and close the link.
	 */
	if (error) {
		if (flag & LINKCLOSE) {
			cmn_err(CE_WARN, "KERNEL: munlink: could not perform "
			    "unlink ioctl, closing anyway (%d)\n", error);
		} else {
			/* Undo the plumbing started above; link stays. */
			link_rempassthru(passq);
			mutex_enter(&stpdown->sd_lock);
			stpdown->sd_flag &= ~STRPLUMB;
			cv_broadcast(&stpdown->sd_monitor);
			mutex_exit(&stpdown->sd_lock);
			mutex_exit(&muxifier);
			return (error);
		}
	}

	/* Remove the link from the mux graph and the linkinfo list. */
	mux_rmvedge(stp, linkp->li_lblk.l_index, ss);
	fpdown = linkp->li_fpdown;
	lbfree(linkp);

	/*
	 * We go ahead and drop muxifier here--it's a nasty global lock that
	 * can slow others down. It's okay to do so since attempts to mlink()
	 * this stream will be stopped because STPLEX is still set in the
	 * stdata structure, and munlink() is stopped because mux_rmvedge() and
	 * lbfree() have removed it from mux_nodes[] and linkinfo_list,
	 * respectively.  Note that we defer the closef() of fpdown until
	 * after we drop muxifier since strclose() can call munlinkall().
	 */
	mutex_exit(&muxifier);

	wrq = stpdown->sd_wrq;
	rq = _RD(wrq);

	/*
	 * Get rid of outstanding service procedure runs, before we make
	 * it a stream head, since a stream head doesn't have any service
	 * procedure.
	 */
	disable_svc(rq);
	wait_svc(rq);

	/*
	 * Since we don't disable the syncq for QPERMOD, we wait for whatever
	 * is queued up to be finished. mux should take care that nothing is
	 * sent down to this queue. We should do it now as we're going to block
	 * passyncq if it was unblocked.
	 */
	if (wrq->q_flag & QPERMOD) {
		syncq_t	*sq = wrq->q_syncq;

		mutex_enter(SQLOCK(sq));
		while (wrq->q_sqflags & Q_SQQUEUED) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}
	passyncq = passq->q_syncq;
	if (!(passyncq->sq_flags & SQ_BLOCKED)) {

		syncq_t *sq, *outer;

		/*
		 * Messages could be flowing from underneath. We will
		 * block the read side of the passq. This would be
		 * sufficient for QPAIR and QPERQ muxes to ensure
		 * that no data is flowing up into this queue
		 * and hence no thread active in this instance of
		 * lower mux. But for QPERMOD and QMTOUTPERIM there
		 * could be messages on the inner and outer/inner
		 * syncqs respectively. We will wait for them to drain.
		 * Because passq is blocked messages end up in the syncq
		 * And qfill_syncq could possibly end up setting QFULL
		 * which will access the rq->q_flag. Hence, we have to
		 * acquire the QLOCK in setq.
		 *
		 * XXX Messages can also flow from top into this
		 * queue though the unlink is over (Ex. some instance
		 * in putnext() called from top that has still not
		 * accessed this queue. And also putq(lowerq) ?).
		 * Solution : How about blocking the l_qtop queue ?
		 * Do we really care about such pure D_MP muxes ?
		 */

		blocksq(passyncq, SQ_BLOCKED, 0);

		sq = rq->q_syncq;
		if ((outer = sq->sq_outer) != NULL) {

			/*
			 * We have to just wait for the outer sq_count
			 * drop to zero. As this does not prevent new
			 * messages to enter the outer perimeter, this
			 * is subject to starvation.
			 *
			 * NOTE :Because of blocksq above, messages could
			 * be in the inner syncq only because of some
			 * thread holding the outer perimeter exclusively.
			 * Hence it would be sufficient to wait for the
			 * exclusive holder of the outer perimeter to drain
			 * the inner and outer syncqs. But we will not depend
			 * on this feature and hence check the inner syncqs
			 * separately.
			 */
			wait_syncq(outer);
		}


		/*
		 * There could be messages destined for
		 * this queue. Let the exclusive holder
		 * drain it.
		 */

		wait_syncq(sq);
		ASSERT((rq->q_flag & QPERMOD) ||
		    ((rq->q_syncq->sq_head == NULL) &&
		    (_WR(rq)->q_syncq->sq_head == NULL)));
	}

	/*
	 * We haven't taken care of QPERMOD case yet. QPERMOD is a special
	 * case as we don't disable its syncq or remove it off the syncq
	 * service list.
	 */
	if (rq->q_flag & QPERMOD) {
		syncq_t	*sq = rq->q_syncq;

		mutex_enter(SQLOCK(sq));
		while (rq->q_sqflags & Q_SQQUEUED) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}

	/*
	 * flush_syncq changes state only when there are some messages to
	 * free, i.e. only when it returns a non-zero value.  Both syncqs
	 * are expected to be empty by now.
	 */
	ASSERT(flush_syncq(rq->q_syncq, rq) == 0);
	ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0);

	/*
	 * Nobody else should know about this queue now.
	 * If the mux did not process the messages before
	 * acking the I_UNLINK, free them now.
	 */

	flushq(rq, FLUSHALL);
	flushq(_WR(rq), FLUSHALL);

	/*
	 * Convert the mux lower queue into a stream head queue.
	 * Turn off STPLEX before we turn on the stream by removing the passq.
	 */
	rq->q_ptr = wrq->q_ptr = stpdown;
	setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE);

	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));

	/* Undo the disable_svc() done before the conversion. */
	enable_svc(rq);

	/*
	 * Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still
	 * needs to be set to prevent reopen() of the stream - such reopen may
	 * try to call non-existent pass queue open routine and panic.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag &= ~STPLEX;
	mutex_exit(&stpdown->sd_lock);

	ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) ||
	    ((flag & LINKTYPEMASK) == LINKPERSIST));

	/* clean up the layered driver linkages */
	if ((flag & LINKTYPEMASK) == LINKNORMAL) {
		ldi_munlink_fp(stp, fpdown, LINKNORMAL);
	} else {
		ldi_munlink_fp(stp, fpdown, LINKPERSIST);
	}

	link_rempassthru(passq);

	/*
	 * Now all plumbing changes are finished and STRPLUMB is no
	 * longer needed.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag &= ~STRPLUMB;
	cv_broadcast(&stpdown->sd_monitor);
	mutex_exit(&stpdown->sd_lock);

	/* Drop the file reference that was held for the link. */
	(void) closef(fpdown);
	return (0);
}
   2254 
   2255 /*
   2256  * Unlink all multiplexor links for which stp is the controlling stream.
   2257  * Return 0, or a non-zero errno on failure.
   2258  */
   2259 int
   2260 munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp, str_stack_t *ss)
   2261 {
   2262 	linkinfo_t *linkp;
   2263 	int error = 0;
   2264 
   2265 	mutex_enter(&muxifier);
   2266 	while (linkp = findlinks(stp, 0, flag, ss)) {
   2267 		/*
   2268 		 * munlink() releases the muxifier lock.
   2269 		 */
   2270 		if (error = munlink(stp, linkp, flag, crp, rvalp, ss))
   2271 			return (error);
   2272 		mutex_enter(&muxifier);
   2273 	}
   2274 	mutex_exit(&muxifier);
   2275 	return (0);
   2276 }
   2277 
   2278 /*
   2279  * A multiplexor link has been made. Add an
   2280  * edge to the directed graph.
   2281  */
   2282 void
   2283 mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid, str_stack_t *ss)
   2284 {
   2285 	struct mux_node *np;
   2286 	struct mux_edge *ep;
   2287 	major_t upmaj;
   2288 	major_t lomaj;
   2289 
   2290 	upmaj = getmajor(upstp->sd_vnode->v_rdev);
   2291 	lomaj = getmajor(lostp->sd_vnode->v_rdev);
   2292 	np = &ss->ss_mux_nodes[upmaj];
   2293 	if (np->mn_outp) {
   2294 		ep = np->mn_outp;
   2295 		while (ep->me_nextp)
   2296 			ep = ep->me_nextp;
   2297 		ep->me_nextp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
   2298 		ep = ep->me_nextp;
   2299 	} else {
   2300 		np->mn_outp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
   2301 		ep = np->mn_outp;
   2302 	}
   2303 	ep->me_nextp = NULL;
   2304 	ep->me_muxid = muxid;
   2305 	/*
   2306 	 * Save the dev_t for the purposes of str_stack_shutdown.
   2307 	 * str_stack_shutdown assumes that the device allows reopen, since
   2308 	 * this dev_t is the one after any cloning by xx_open().
   2309 	 * Would prefer finding the dev_t from before any cloning,
   2310 	 * but specfs doesn't retain that.
   2311 	 */
   2312 	ep->me_dev = upstp->sd_vnode->v_rdev;
   2313 	if (lostp->sd_vnode->v_type == VFIFO)
   2314 		ep->me_nodep = NULL;
   2315 	else
   2316 		ep->me_nodep = &ss->ss_mux_nodes[lomaj];
   2317 }
   2318 
   2319 /*
   2320  * A multiplexor link has been removed. Remove the
   2321  * edge in the directed graph.
   2322  */
   2323 void
   2324 mux_rmvedge(stdata_t *upstp, int muxid, str_stack_t *ss)
   2325 {
   2326 	struct mux_node *np;
   2327 	struct mux_edge *ep;
   2328 	struct mux_edge *pep = NULL;
   2329 	major_t upmaj;
   2330 
   2331 	upmaj = getmajor(upstp->sd_vnode->v_rdev);
   2332 	np = &ss->ss_mux_nodes[upmaj];
   2333 	ASSERT(np->mn_outp != NULL);
   2334 	ep = np->mn_outp;
   2335 	while (ep) {
   2336 		if (ep->me_muxid == muxid) {
   2337 			if (pep)
   2338 				pep->me_nextp = ep->me_nextp;
   2339 			else
   2340 				np->mn_outp = ep->me_nextp;
   2341 			kmem_free(ep, sizeof (struct mux_edge));
   2342 			return;
   2343 		}
   2344 		pep = ep;
   2345 		ep = ep->me_nextp;
   2346 	}
   2347 	ASSERT(0);	/* should not reach here */
   2348 }
   2349 
   2350 /*
   2351  * Translate the device flags (from conf.h) to the corresponding
   2352  * qflag and sq_flag (type) values.
   2353  */
   2354 int
   2355 devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp,
   2356 	uint32_t *sqtypep)
   2357 {
   2358 	uint32_t qflag = 0;
   2359 	uint32_t sqtype = 0;
   2360 
   2361 	if (devflag & _D_OLD)
   2362 		goto bad;
   2363 
   2364 	/* Inner perimeter presence and scope */
   2365 	switch (devflag & D_MTINNER_MASK) {
   2366 	case D_MP:
   2367 		qflag |= QMTSAFE;
   2368 		sqtype |= SQ_CI;
   2369 		break;
   2370 	case D_MTPERQ|D_MP:
   2371 		qflag |= QPERQ;
   2372 		break;
   2373 	case D_MTQPAIR|D_MP:
   2374 		qflag |= QPAIR;
   2375 		break;
   2376 	case D_MTPERMOD|D_MP:
   2377 		qflag |= QPERMOD;
   2378 		break;
   2379 	default:
   2380 		goto bad;
   2381 	}
   2382 
   2383 	/* Outer perimeter */
   2384 	if (devflag & D_MTOUTPERIM) {
   2385 		switch (devflag & D_MTINNER_MASK) {
   2386 		case D_MP:
   2387 		case D_MTPERQ|D_MP:
   2388 		case D_MTQPAIR|D_MP:
   2389 			break;
   2390 		default:
   2391 			goto bad;
   2392 		}
   2393 		qflag |= QMTOUTPERIM;
   2394 	}
   2395 
   2396 	/* Inner perimeter modifiers */
   2397 	if (devflag & D_MTINNER_MOD) {
   2398 		switch (devflag & D_MTINNER_MASK) {
   2399 		case D_MP:
   2400 			goto bad;
   2401 		default:
   2402 			break;
   2403 		}
   2404 		if (devflag & D_MTPUTSHARED)
   2405 			sqtype |= SQ_CIPUT;
   2406 		if (devflag & _D_MTOCSHARED) {
   2407 			/*
   2408 			 * The code in putnext assumes that it has the
   2409 			 * highest concurrency by not checking sq_count.
   2410 			 * Thus _D_MTOCSHARED can only be supported when
   2411 			 * D_MTPUTSHARED is set.
   2412 			 */
   2413 			if (!(devflag & D_MTPUTSHARED))
   2414 				goto bad;
   2415 			sqtype |= SQ_CIOC;
   2416 		}
   2417 		if (devflag & _D_MTCBSHARED) {
   2418 			/*
   2419 			 * The code in putnext assumes that it has the
   2420 			 * highest concurrency by not checking sq_count.
   2421 			 * Thus _D_MTCBSHARED can only be supported when
   2422 			 * D_MTPUTSHARED is set.
   2423 			 */
   2424 			if (!(devflag & D_MTPUTSHARED))
   2425 				goto bad;
   2426 			sqtype |= SQ_CICB;
   2427 		}
   2428 		if (devflag & _D_MTSVCSHARED) {
   2429 			/*
   2430 			 * The code in putnext assumes that it has the
   2431 			 * highest concurrency by not checking sq_count.
   2432 			 * Thus _D_MTSVCSHARED can only be supported when
   2433 			 * D_MTPUTSHARED is set. Also _D_MTSVCSHARED is
   2434 			 * supported only for QPERMOD.
   2435 			 */
   2436 			if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD))
   2437 				goto bad;
   2438 			sqtype |= SQ_CISVC;
   2439 		}
   2440 	}
   2441 
   2442 	/* Default outer perimeter concurrency */
   2443 	sqtype |= SQ_CO;
   2444 
   2445 	/* Outer perimeter modifiers */
   2446 	if (devflag & D_MTOCEXCL) {
   2447 		if (!(devflag & D_MTOUTPERIM)) {
   2448 			/* No outer perimeter */
   2449 			goto bad;
   2450 		}
   2451 		sqtype &= ~SQ_COOC;
   2452 	}
   2453 
   2454 	/* Synchronous Streams extended qinit structure */
   2455 	if (devflag & D_SYNCSTR)
   2456 		qflag |= QSYNCSTR;
   2457 
   2458 	/*
   2459 	 * Private flag used by a transport module to indicate
   2460 	 * to sockfs that it supports direct-access mode without
   2461 	 * having to go through STREAMS.
   2462 	 */
   2463 	if (devflag & _D_DIRECT) {
   2464 		/* Reject unless the module is fully-MT (no perimeter) */
   2465 		if ((qflag & QMT_TYPEMASK) != QMTSAFE)
   2466 			goto bad;
   2467 		qflag |= _QDIRECT;
   2468 	}
   2469 
   2470 	*qflagp = qflag;
   2471 	*sqtypep = sqtype;
   2472 	return (0);
   2473 
   2474 bad:
   2475 	cmn_err(CE_WARN,
   2476 	    "stropen: bad MT flags (0x%x) in driver '%s'",
   2477 	    (int)(qflag & D_MTSAFETY_MASK),
   2478 	    stp->st_rdinit->qi_minfo->mi_idname);
   2479 
   2480 	return (EINVAL);
   2481 }
   2482 
/*
 * Set the interface values for a pair of queues (qinit structure,
 * packet sizes, water marks) and rebuild the pair's syncqs to match
 * the requested MT configuration.
 *
 * rq		- read queue of the pair (QREADR must be set); the write
 *		  queue is _WR(rq).
 * rinit, winit	- qinit structures for the read and write side; their
 *		  qi_minfo supplies hiwat/lowat/minpsz/maxpsz.
 * dmp		- per-streamtab perimeter state; must be non-NULL when qflag
 *		  includes QPERMOD or QMTOUTPERIM, because dmp->dm_sq is then
 *		  used as the inner (QPERMOD) or outer (QMTOUTPERIM) syncq.
 * qflag	- queue MT flags to install (some QMT_TYPEMASK bit required).
 * sqtype	- syncq type for the pair's own syncq, SQ(rq).
 * lock_needed	- when B_TRUE, q_flag is rewritten under QLOCK because other
 *		  threads may read it concurrently; the mlink()/munlink()
 *		  paths pass B_TRUE.
 *
 * setq assumes that the caller does not have a claim (entersq or claimq)
 * on the queue.
 */
void
setq(queue_t *rq, struct qinit *rinit, struct qinit *winit,
    perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed)
{
	queue_t *wq;
	syncq_t	*sq, *outer;

	ASSERT(rq->q_flag & QREADR);
	ASSERT((qflag & QMT_TYPEMASK) != 0);
	IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);

	/* Install the qinit and copy module_info limits on both sides. */
	wq = _WR(rq);
	rq->q_qinfo = rinit;
	rq->q_hiwat = rinit->qi_minfo->mi_hiwat;
	rq->q_lowat = rinit->qi_minfo->mi_lowat;
	rq->q_minpsz = rinit->qi_minfo->mi_minpsz;
	rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz;
	wq->q_qinfo = winit;
	wq->q_hiwat = winit->qi_minfo->mi_hiwat;
	wq->q_lowat = winit->qi_minfo->mi_lowat;
	wq->q_minpsz = winit->qi_minfo->mi_minpsz;
	wq->q_maxpsz = winit->qi_minfo->mi_maxpsz;

	/* Remove old syncqs */
	/* First detach both syncqs from any outer perimeter. */
	sq = rq->q_syncq;
	outer = sq->sq_outer;
	if (outer != NULL) {
		ASSERT(wq->q_syncq->sq_outer == outer);
		outer_remove(outer, rq->q_syncq);
		if (wq->q_syncq != rq->q_syncq)
			outer_remove(outer, wq->q_syncq);
	}
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);

	/*
	 * Drop syncqs other than the one embedded in the queue pair,
	 * SQ(rq).  A QPERMOD syncq is dmp->dm_sq (owned by hold_dm() /
	 * rele_dm()), so it is only disassociated here, never freed.
	 */
	if (sq != SQ(rq)) {
		if (!(rq->q_flag & QPERMOD))
			free_syncq(sq);
		if (wq->q_syncq == rq->q_syncq)
			wq->q_syncq = NULL;
		rq->q_syncq = NULL;
	}
	/* A separate write-side syncq (e.g. from QPERQ) is freed too. */
	if (wq->q_syncq != NULL && wq->q_syncq != sq &&
	    wq->q_syncq != SQ(rq)) {
		free_syncq(wq->q_syncq);
		wq->q_syncq = NULL;
	}
	ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL &&
	    rq->q_syncq->sq_tail == NULL));
	ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL &&
	    wq->q_syncq->sq_tail == NULL));

	/* Free any ciputctrl array still attached to a surviving syncq. */
	if (!(rq->q_flag & QPERMOD) &&
	    rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) {
		ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl,
		    rq->q_syncq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl);
		rq->q_syncq->sq_ciputctrl = NULL;
		rq->q_syncq->sq_nciputctrl = 0;
	}

	if (!(wq->q_flag & QPERMOD) &&
	    wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) {
		ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl,
		    wq->q_syncq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl);
		wq->q_syncq->sq_ciputctrl = NULL;
		wq->q_syncq->sq_nciputctrl = 0;
	}

	/* Start over from the pair's embedded syncq. */
	sq = SQ(rq);
	ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);

	/*
	 * Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS
	 * bits in sq_flag based on the sqtype.
	 */
	ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0);

	rq->q_syncq = wq->q_syncq = sq;
	sq->sq_type = sqtype;
	sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS);

	/*
	 *  We are making sq_svcflags zero,
	 *  resetting SQ_DISABLED in case it was set by
	 *  wait_svc() in the munlink path.
	 *
	 */
	ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0);
	sq->sq_svcflags = 0;

	/*
	 * We need to acquire the lock here for the mlink and munlink case,
	 * where canputnext, backenable, etc can access the q_flag.
	 */
	if (lock_needed) {
		mutex_enter(QLOCK(rq));
		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		mutex_exit(QLOCK(rq));
		mutex_enter(QLOCK(wq));
		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		mutex_exit(QLOCK(wq));
	} else {
		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
	}

	if (qflag & QPERQ) {
		/* Allocate a separate syncq for the write side */
		sq = new_syncq();
		sq->sq_type = rq->q_syncq->sq_type;
		sq->sq_flags = rq->q_syncq->sq_flags;
		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
		    sq->sq_oprev == NULL);
		wq->q_syncq = sq;
	}
	if (qflag & QPERMOD) {
		/* Both sides use the per-streamtab inner perimeter syncq. */
		sq = dmp->dm_sq;

		/*
		 * Assert that we do have an inner perimeter syncq and that it
		 * does not have an outer perimeter associated with it.
		 */
		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
		    sq->sq_oprev == NULL);
		rq->q_syncq = wq->q_syncq = sq;
	}
	if (qflag & QMTOUTPERIM) {
		/* Join the per-streamtab outer perimeter. */
		outer = dmp->dm_sq;

		ASSERT(outer->sq_outer == NULL);
		outer_insert(outer, rq->q_syncq);
		if (wq->q_syncq != rq->q_syncq)
			outer_insert(outer, wq->q_syncq);
	}
	ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
	    (rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
	ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
	    (wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
	ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK));

	/*
	 * Initialize struio() types.
	 */
	rq->q_struiot =
	    (rq->q_flag & QSYNCSTR) ? rinit->qi_struiot : STRUIOT_NONE;
	wq->q_struiot =
	    (wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE;
}
   2645 
   2646 perdm_t *
   2647 hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype)
   2648 {
   2649 	syncq_t	*sq;
   2650 	perdm_t	**pp;
   2651 	perdm_t	*p;
   2652 	perdm_t	*dmp;
   2653 
   2654 	ASSERT(str != NULL);
   2655 	ASSERT(qflag & (QPERMOD | QMTOUTPERIM));
   2656 
   2657 	rw_enter(&perdm_rwlock, RW_READER);
   2658 	for (p = perdm_list; p != NULL; p = p->dm_next) {
   2659 		if (p->dm_str == str) {	/* found one */
   2660 			atomic_add_32(&(p->dm_ref), 1);
   2661 			rw_exit(&perdm_rwlock);
   2662 			return (p);
   2663 		}
   2664 	}
   2665 	rw_exit(&perdm_rwlock);
   2666 
   2667 	sq = new_syncq();
   2668 	if (qflag & QPERMOD) {
   2669 		sq->sq_type = sqtype | SQ_PERMOD;
   2670 		sq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS;
   2671 	} else {
   2672 		ASSERT(qflag & QMTOUTPERIM);
   2673 		sq->sq_onext = sq->sq_oprev = sq;
   2674 	}
   2675 
   2676 	dmp = kmem_alloc(sizeof (perdm_t), KM_SLEEP);
   2677 	dmp->dm_sq = sq;
   2678 	dmp->dm_str = str;
   2679 	dmp->dm_ref = 1;
   2680 	dmp->dm_next = NULL;
   2681 
   2682 	rw_enter(&perdm_rwlock, RW_WRITER);
   2683 	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) {
   2684 		if (p->dm_str == str) {	/* already present */
   2685 			p->dm_ref++;
   2686 			rw_exit(&perdm_rwlock);
   2687 			free_syncq(sq);
   2688 			kmem_free(dmp, sizeof (perdm_t));
   2689 			return (p);
   2690 		}
   2691 	}
   2692 
   2693 	*pp = dmp;
   2694 	rw_exit(&perdm_rwlock);
   2695 	return (dmp);
   2696 }
   2697 
   2698 void
   2699 rele_dm(perdm_t *dmp)
   2700 {
   2701 	perdm_t **pp;
   2702 	perdm_t *p;
   2703 
   2704 	rw_enter(&perdm_rwlock, RW_WRITER);
   2705 	ASSERT(dmp->dm_ref > 0);
   2706 
   2707 	if (--dmp->dm_ref > 0) {
   2708 		rw_exit(&perdm_rwlock);
   2709 		return;
   2710 	}
   2711 
   2712 	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next))
   2713 		if (p == dmp)
   2714 			break;
   2715 	ASSERT(p == dmp);
   2716 	*pp = p->dm_next;
   2717 	rw_exit(&perdm_rwlock);
   2718 
   2719 	/*
   2720 	 * Wait for any background processing that relies on the
   2721 	 * syncq to complete before it is freed.
   2722 	 */
   2723 	wait_sq_svc(p->dm_sq);
   2724 	free_syncq(p->dm_sq);
   2725 	kmem_free(p, sizeof (perdm_t));
   2726 }
   2727 
   2728 /*
   2729  * Make a protocol message given control and data buffers.
   2730  * n.b., this can block; be careful of what locks you hold when calling it.
   2731  *
   2732  * If sd_maxblk is less than *iosize this routine can fail part way through
   2733  * (due to an allocation failure). In this case on return *iosize will contain
   2734  * the amount that was consumed. Otherwise *iosize will not be modified
   2735  * i.e. it will contain the amount that was consumed.
   2736  */
   2737 int
   2738 strmakemsg(
   2739 	struct strbuf *mctl,
   2740 	ssize_t *iosize,
   2741 	struct uio *uiop,
   2742 	stdata_t *stp,
   2743 	int32_t flag,
   2744 	mblk_t **mpp)
   2745 {
   2746 	mblk_t *mpctl = NULL;
   2747 	mblk_t *mpdata = NULL;
   2748 	int error;
   2749 
   2750 	ASSERT(uiop != NULL);
   2751 
   2752 	*mpp = NULL;
   2753 	/* Create control part, if any */
   2754 	if ((mctl != NULL) && (mctl->len >= 0)) {
   2755 		error = strmakectl(mctl, flag, uiop->uio_fmode, &mpctl);
   2756 		if (error)
   2757 			return (error);
   2758 	}
   2759 	/* Create data part, if any */
   2760 	if (*iosize >= 0) {
   2761 		error = strmakedata(iosize, uiop, stp, flag, &mpdata);
   2762 		if (error) {
   2763 			freemsg(mpctl);
   2764 			return (error);
   2765 		}
   2766 	}
   2767 	if (mpctl != NULL) {
   2768 		if (mpdata != NULL)
   2769 			linkb(mpctl, mpdata);
   2770 		*mpp = mpctl;
   2771 	} else {
   2772 		*mpp = mpdata;
   2773 	}
   2774 	return (0);
   2775 }
   2776 
   2777 /*
   2778  * Make the control part of a protocol message given a control buffer.
   2779  * n.b., this can block; be careful of what locks you hold when calling it.
   2780  */
   2781 int
   2782 strmakectl(
   2783 	struct strbuf *mctl,
   2784 	int32_t flag,
   2785 	int32_t fflag,
   2786 	mblk_t **mpp)
   2787 {
   2788 	mblk_t *bp = NULL;
   2789 	unsigned char msgtype;
   2790 	int error = 0;
   2791 	cred_t *cr = CRED();
   2792 
   2793 	/* We do not support interrupt threads using the stream head to send */
   2794 	ASSERT(cr != NULL);
   2795 
   2796 	*mpp = NULL;
   2797 	/*
   2798 	 * Create control part of message, if any.
   2799 	 */
   2800 	if ((mctl != NULL) && (mctl->len >= 0)) {
   2801 		caddr_t base;
   2802 		int ctlcount;
   2803 		int allocsz;
   2804 
   2805 		if (flag & RS_HIPRI)
   2806 			msgtype = M_PCPROTO;
   2807 		else
   2808 			msgtype = M_PROTO;
   2809 
   2810 		ctlcount = mctl->len;
   2811 		base = mctl->buf;
   2812 
   2813 		/*
   2814 		 * Give modules a better chance to reuse M_PROTO/M_PCPROTO
   2815 		 * blocks by increasing the size to something more usable.
   2816 		 */
   2817 		allocsz = MAX(ctlcount, 64);
   2818 
   2819 		/*
   2820 		 * Range checking has already been done; simply try
   2821 		 * to allocate a message block for the ctl part.
   2822 		 */
   2823 		while ((bp = allocb_cred(allocsz, cr,
   2824 		    curproc->p_pid)) == NULL) {
   2825 			if (fflag & (FNDELAY|FNONBLOCK))
   2826 				return (EAGAIN);
   2827 			if (error = strwaitbuf(allocsz, BPRI_MED))
   2828 				return (error);
   2829 		}
   2830 
   2831 		bp->b_datap->db_type = msgtype;
   2832 		if (copyin(base, bp->b_wptr, ctlcount)) {
   2833 			freeb(bp);
   2834 			return (EFAULT);
   2835 		}
   2836 		bp->b_wptr += ctlcount;
   2837 	}
   2838 	*mpp = bp;
   2839 	return (0);
   2840 }
   2841 
/*
 * Make a protocol message given data buffers.
 * n.b., this can block; be careful of what locks you hold when calling it.
 *
 * Builds a chain of message blocks (joined with linkb()) for the next
 * *iosize bytes described by uiop and returns it through *mpp.
 *
 * Block layout:
 *  - Each block carries at most sd_maxblk bytes of payload, or all of it
 *    when sd_maxblk is INFPSZ.
 *  - Each block is allocated with sd_wroff bytes of headroom in front of
 *    the data and room for sd_tail bytes after it.
 *
 * A negative *iosize means there is no data part: 0 is returned and *mpp is
 * NULL.  Because the loop below always runs at least once, *iosize == 0
 * yields a single zero-length block.
 *
 * If STRUIO_POSTPONE is set in flag, the data is not copied here.  The
 * blocks are only sized and marked STRUIO_SPEC for a later struioget().
 * Otherwise the data is copied in with uiomove().  When the stream head has
 * an sd_wputdatafunc, each block is also passed through it.
 *
 * If sd_maxblk is less than *iosize this routine can fail part way through
 * (due to an allocation failure).  In that case, on return *iosize contains
 * the amount that was consumed, and the blocks built so far are returned
 * with a 0 return value.  Otherwise *iosize is not modified, i.e. it still
 * equals the amount that was consumed.
 *
 * Errors (all blocks are freed and *mpp is NULL):
 *  - EAGAIN, or the strwaitbuf() error, if the very first allocation fails;
 *  - the uiomove() error;
 *  - ECOMM if sd_wputdatafunc returns NULL.
 */
int
strmakedata(
	ssize_t   *iosize,
	struct uio *uiop,
	stdata_t *stp,
	int32_t flag,
	mblk_t **mpp)
{
	mblk_t *mp = NULL;		/* head of the chain built so far */
	mblk_t *bp;
	int wroff = (int)stp->sd_wroff;
	int tail_len = (int)stp->sd_tail;
	int extra = wroff + tail_len;	/* headroom + tailroom per block */
	int error = 0;
	ssize_t maxblk;
	ssize_t count = *iosize;	/* bytes not yet placed in a block */
	cred_t *cr;

	*mpp = NULL;
	if (count < 0)
		return (0);

	/* We do not support interrupt threads using the stream head to send */
	cr = CRED();
	ASSERT(cr != NULL);

	maxblk = stp->sd_maxblk;
	if (maxblk == INFPSZ)
		maxblk = count;

	/*
	 * Create data part of message, if any.
	 */
	do {
		ssize_t size;
		dblk_t  *dp;

		ASSERT(uiop);

		size = MIN(count, maxblk);

		while ((bp = allocb_cred(size + extra, cr,
		    curproc->p_pid)) == NULL) {
			error = EAGAIN;
			if ((uiop->uio_fmode & (FNDELAY|FNONBLOCK)) ||
			    (error = strwaitbuf(size + extra, BPRI_MED)) != 0) {
				/*
				 * No buffer.  If nothing has been built yet,
				 * fail with the error.  Otherwise hand back
				 * the partial chain and report through
				 * *iosize how many bytes it holds.
				 */
				if (count == *iosize) {
					freemsg(mp);
					return (error);
				} else {
					*iosize -= count;
					*mpp = mp;
					return (0);
				}
			}
		}
		dp = bp->b_datap;
		dp->db_cpid = curproc->p_pid;
		ASSERT(wroff <= dp->db_lim - bp->b_wptr);
		/* Leave sd_wroff bytes of headroom in front of the data. */
		bp->b_wptr = bp->b_rptr = bp->b_rptr + wroff;

		if (flag & STRUIO_POSTPONE) {
			/*
			 * Setup the stream uio portion of the
			 * dblk for subsequent use by struioget().
			 */
			dp->db_struioflag = STRUIO_SPEC;
			dp->db_cksumstart = 0;
			dp->db_cksumstuff = 0;
			dp->db_cksumend = size;
			*(long long *)dp->db_struioun.data = 0ll;
			/* Reserve room for the data; it is not copied yet. */
			bp->b_wptr += size;
		} else {
			if (stp->sd_copyflag & STRCOPYCACHED)
				uiop->uio_extflg |= UIO_COPY_CACHED;

			if (size != 0) {
				error = uiomove(bp->b_wptr, size, UIO_WRITE,
				    uiop);
				if (error != 0) {
					freeb(bp);
					freemsg(mp);
					return (error);
				}
			}
			bp->b_wptr += size;

			if (stp->sd_wputdatafunc != NULL) {
				mblk_t *newbp;

				/*
				 * Let the stream head's write-data hook
				 * transform the block; NULL means failure.
				 */
				newbp = (stp->sd_wputdatafunc)(stp->sd_vnode,
				    bp, NULL, NULL, NULL, NULL);
				if (newbp == NULL) {
					freeb(bp);
					freemsg(mp);
					return (ECOMM);
				}
				bp = newbp;
			}
		}

		count -= size;

		/* Append the new block to the chain. */
		if (mp == NULL)
			mp = bp;
		else
			linkb(mp, bp);
	} while (count > 0);

	*mpp = mp;
	return (0);
}
   2963 
   2964 /*
   2965  * Wait for a buffer to become available. Return non-zero errno
   2966  * if not able to wait, 0 if buffer is probably there.
   2967  */
   2968 int
   2969 strwaitbuf(size_t size, int pri)
   2970 {
   2971 	bufcall_id_t id;
   2972 
   2973 	mutex_enter(&bcall_monitor);
   2974 	if ((id = bufcall(size, pri, (void (*)(void *))cv_broadcast,
   2975 	    &ttoproc(curthread)->p_flag_cv)) == 0) {
   2976 		mutex_exit(&bcall_monitor);
   2977 		return (ENOSR);
   2978 	}
   2979 	if (!cv_wait_sig(&(ttoproc(curthread)->p_flag_cv), &bcall_monitor)) {
   2980 		unbufcall(id);
   2981 		mutex_exit(&bcall_monitor);
   2982 		return (EINTR);
   2983 	}
   2984 	unbufcall(id);
   2985 	mutex_exit(&bcall_monitor);
   2986 	return (0);
   2987 }
   2988 
   2989 /*
   2990  * This function waits for a read or write event to happen on a stream.
   2991  * fmode can specify FNDELAY and/or FNONBLOCK.
   2992  * The timeout is in ms with -1 meaning infinite.
   2993  * The flag values work as follows:
   2994  *	READWAIT	Check for read side errors, send M_READ
   2995  *	GETWAIT		Check for read side errors, no M_READ
   2996  *	WRITEWAIT	Check for write side errors.
   2997  *	NOINTR		Do not return error if nonblocking or timeout.
   2998  * 	STR_NOERROR	Ignore all errors except STPLEX.
   2999  *	STR_NOSIG	Ignore/hold signals during the duration of the call.
   3000  *	STR_PEEK	Pass through the strgeterr().
   3001  */
   3002 int
   3003 strwaitq(stdata_t *stp, int flag, ssize_t count, int fmode, clock_t timout,
   3004     int *done)
   3005 {
   3006 	int slpflg, errs;
   3007 	int error;
   3008 	kcondvar_t *sleepon;
   3009 	mblk_t *mp;
   3010 	ssize_t *rd_count;
   3011 	clock_t rval;
   3012 
   3013 	ASSERT(MUTEX_HELD(&stp->sd_lock));
   3014 	if ((flag & READWAIT) || (flag & GETWAIT)) {
   3015 		slpflg = RSLEEP;
   3016 		sleepon = &_RD(stp->sd_wrq)->q_wait;
   3017 		errs = STRDERR|STPLEX;
   3018 	} else {
   3019 		slpflg = WSLEEP;
   3020 		sleepon = &stp->sd_wrq->q_wait;
   3021 		errs = STWRERR|STRHUP|STPLEX;
   3022 	}
   3023 	if (flag & STR_NOERROR)
   3024 		errs = STPLEX;
   3025 
   3026 	if (stp->sd_wakeq & slpflg) {
   3027 		/*
   3028 		 * A strwakeq() is pending, no need to sleep.
   3029 		 */
   3030 		stp->sd_wakeq &= ~slpflg;
   3031 		*done = 0;
   3032 		return (0);
   3033 	}
   3034 
   3035 	if (stp->sd_flag & errs) {
   3036 		/*
   3037 		 * Check for errors before going to sleep since the
   3038 		 * caller might not have checked this while holding
   3039 		 * sd_lock.
   3040 		 */
   3041 		error = strgeterr(stp, errs, (flag & STR_PEEK));
   3042 		if (error != 0) {
   3043 			*done = 1;
   3044 			return (error);
   3045 		}
   3046 	}
   3047 
   3048 	/*
   3049 	 * If any module downstream has requested read notification
   3050 	 * by setting SNDMREAD flag using M_SETOPTS, send a message
   3051 	 * down stream.
   3052 	 */
   3053 	if ((flag & READWAIT) && (stp->sd_flag & SNDMREAD)) {
   3054 		mutex_exit(&stp->sd_lock);
   3055 		if (!(mp = allocb_wait(sizeof (ssize_t), BPRI_MED,
   3056 		    (flag & STR_NOSIG), &error))) {
   3057 			mutex_enter(&stp->sd_lock);
   3058 			*done = 1;
   3059 			return (error);
   3060 		}
   3061 		mp->b_datap->db_type = M_READ;
   3062 		rd_count = (ssize_t *)mp->b_wptr;
   3063 		*rd_count = count;
   3064 		mp->b_wptr += sizeof (ssize_t);
   3065 		/*
   3066 		 * Send the number of bytes requested by the
   3067 		 * read as the argument to M_READ.
   3068 		 */
   3069 		stream_willservice(stp);
   3070 		putnext(stp->sd_wrq, mp);
   3071 		stream_runservice(stp);
   3072 		mutex_enter(&stp->sd_lock);
   3073 
   3074 		/*
   3075 		 * If any data arrived due to inline processing
   3076 		 * of putnext(), don't sleep.
   3077 		 */
   3078 		if (_RD(stp->sd_wrq)->q_first != NULL) {
   3079 			*done = 0;
   3080 			return (0);
   3081 		}
   3082 	}
   3083 
   3084 	if (fmode & (FNDELAY|FNONBLOCK)) {
   3085 		if (!(flag & NOINTR))
   3086 			error = EAGAIN;
   3087 		else
   3088 			error = 0;
   3089 		*done = 1;
   3090 		return (error);
   3091 	}
   3092 
   3093 	stp->sd_flag |= slpflg;
   3094 	TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAIT2,
   3095 	    "strwaitq sleeps (2):%p, %X, %lX, %X, %p",
   3096 	    stp, flag, count, fmode, done);
   3097 
   3098 	rval = str_cv_wait(sleepon, &stp->sd_lock, timout, flag & STR_NOSIG);
   3099 	if (rval > 0) {
   3100 		/* EMPTY */
   3101 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAKE2,
   3102 		    "strwaitq awakes(2):%X, %X, %X, %X, %X",
   3103 		    stp, flag, count, fmode, done);
   3104 	} else if (rval == 0) {
   3105 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_INTR2,
   3106 		    "strwaitq interrupt #2:%p, %X, %lX, %X, %p",
   3107 		    stp, flag, count, fmode, done);
   3108 		stp->sd_flag &= ~slpflg;
   3109 		cv_broadcast(sleepon);
   3110 		if (!(flag & NOINTR))
   3111 			error = EINTR;
   3112 		else
   3113 			error = 0;
   3114 		*done = 1;
   3115 		return (error);
   3116 	} else {
   3117 		/* timeout */
   3118 		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_TIME,
   3119 		    "strwaitq timeout:%p, %X, %lX, %X, %p",
   3120 		    stp, flag, count, fmode, done);
   3121 		*done = 1;
   3122 		if (!(flag & NOINTR))
   3123 			return (ETIME);
   3124 		else
   3125 			return (0);
   3126 	}
   3127 	/*
   3128 	 * If the caller implements delayed errors (i.e. queued after data)
   3129 	 * we can not check for errors here since data as well as an
   3130 	 * error might have arrived at the stream head. We return to
   3131 	 * have the caller check the read queue before checking for errors.
   3132 	 */
   3133 	if ((stp->sd_flag & errs) && !(flag & STR_DELAYERR)) {
   3134 		error = strgeterr(stp, errs, (flag & STR_PEEK));
   3135 		if (error != 0) {
   3136 			*done = 1;
   3137 			return (error);
   3138 		}
   3139 	}
   3140 	*done = 0;
   3141 	return (0);
   3142 }
   3143 
/*
 * Perform job control discipline access checks.
 * Return 0 for success and the errno for failure.
 *
 * Must be called with stp->sd_lock held (asserted).  The lock is dropped
 * and re-acquired while job-control signals are posted and while waiting,
 * so stream state may have changed when this returns.
 *
 * Access is always granted for a stream with no session (sd_sidp == NULL),
 * for a FIFO, when the stream is not the caller's controlling terminal, and
 * when the caller's process group is the terminal's foreground group.
 * Otherwise, by mode:
 *	JCGETP	access is granted.
 *	JCREAD	EIO if the process is detached or SIGTTIN cannot be sent;
 *		otherwise SIGTTIN is sent to the process group and we wait.
 *	JCWRITE	granted unless STRTOSTOP is set; otherwise handled as JCSETP.
 *	JCSETP	granted if SIGTTOU is ignored or blocked; EIO if the process
 *		is detached; otherwise SIGTTOU is sent to the process group
 *		and we wait.
 * If the controlling terminal has been deallocated (s_vp == NULL), SIGHUP
 * is sent when deliverable and EIO is returned.  EINTR is returned if the
 * wait is interrupted.  After a wait the checks are repeated from the top.
 */

/*
 * True when signal sig would not be delivered to thread t of process p:
 * the process ignores it or the thread has it blocked.
 */
#define	cantsend(p, t, sig) \
	(sigismember(&(p)->p_ignore, sig) || signal_is_blocked((t), sig))

int
straccess(struct stdata *stp, enum jcaccess mode)
{
	extern kcondvar_t lbolt_cv;	/* XXX: should be in a header file */
	kthread_t *t = curthread;
	proc_t *p = ttoproc(t);
	sess_t *sp;

	ASSERT(mutex_owned(&stp->sd_lock));

	if (stp->sd_sidp == NULL || stp->sd_vnode->v_type == VFIFO)
		return (0);

	mutex_enter(&p->p_lock);		/* protects p_pgidp */

	for (;;) {
		mutex_enter(&p->p_splock);	/* protects p->p_sessp */
		sp = p->p_sessp;
		mutex_enter(&sp->s_lock);	/* protects sp->* */

		/*
		 * If this is not the calling process's controlling terminal
		 * or if the calling process is already in the foreground
		 * then allow access.
		 */
		if (sp->s_dev != stp->sd_vnode->v_rdev ||
		    p->p_pgidp == stp->sd_pgidp) {
			mutex_exit(&sp->s_lock);
			mutex_exit(&p->p_splock);
			mutex_exit(&p->p_lock);
			return (0);
		}

		/*
		 * Check to see if controlling terminal has been deallocated.
		 */
		if (sp->s_vp == NULL) {
			if (!cantsend(p, t, SIGHUP))
				sigtoproc(p, t, SIGHUP);
			mutex_exit(&sp->s_lock);
			mutex_exit(&p->p_splock);
			mutex_exit(&p->p_lock);
			return (EIO);
		}

		mutex_exit(&sp->s_lock);
		mutex_exit(&p->p_splock);

		if (mode == JCGETP) {
			mutex_exit(&p->p_lock);
			return (0);
		}

		if (mode == JCREAD) {
			if (p->p_detached || cantsend(p, t, SIGTTIN)) {
				mutex_exit(&p->p_lock);
				return (EIO);
			}
			/*
			 * Both p_lock and sd_lock are dropped around
			 * pgsignal() and re-taken in sd_lock, p_lock order.
			 */
			mutex_exit(&p->p_lock);
			mutex_exit(&stp->sd_lock);
			pgsignal(p->p_pgidp, SIGTTIN);
			mutex_enter(&stp->sd_lock);
			mutex_enter(&p->p_lock);
		} else {  /* mode == JCWRITE or JCSETP */
			/*
			 * Writes are only policed when TOSTOP is set; in
			 * every mode an ignored/blocked SIGTTOU grants access.
			 */
			if ((mode == JCWRITE && !(stp->sd_flag & STRTOSTOP)) ||
			    cantsend(p, t, SIGTTOU)) {
				mutex_exit(&p->p_lock);
				return (0);
			}
			if (p->p_detached) {
				mutex_exit(&p->p_lock);
				return (EIO);
			}
			mutex_exit(&p->p_lock);
			mutex_exit(&stp->sd_lock);
			pgsignal(p->p_pgidp, SIGTTOU);
			mutex_enter(&stp->sd_lock);
			mutex_enter(&p->p_lock);
		}

		/*
		 * We call cv_wait_sig_swap() to cause the appropriate
		 * action for the jobcontrol signal to take place.
		 * If the signal is being caught, we will take the
		 * EINTR error return.  Otherwise, the default action
		 * of causing the process to stop will take place.
		 * In this case, we rely on the periodic cv_broadcast() on
		 * &lbolt_cv to wake us up to loop around and test again.
		 * We can't get here if the signal is ignored or
		 * if the current thread is blocking the signal.
		 */
		mutex_exit(&stp->sd_lock);
		if (!cv_wait_sig_swap(&lbolt_cv, &p->p_lock)) {
			mutex_exit(&p->p_lock);
			mutex_enter(&stp->sd_lock);
			return (EINTR);
		}
		/* Re-take the locks in sd_lock, p_lock order and re-check. */
		mutex_exit(&p->p_lock);
		mutex_enter(&stp->sd_lock);
		mutex_enter(&p->p_lock);
	}
}
   3254 
   3255 /*
   3256  * Return size of message of block type (bp->b_datap->db_type)
   3257  */
   3258 size_t
   3259 xmsgsize(mblk_t *bp)
   3260 {
   3261 	unsigned char type;
   3262 	size_t count = 0;
   3263 
   3264 	type = bp->b_datap->db_type;
   3265 
   3266 	for (; bp; bp = bp->b_cont) {
   3267 		if (type != bp->b_datap->db_type)
   3268 			break;
   3269 		ASSERT(bp->b_wptr >= bp->b_rptr);
   3270 		count += bp->b_wptr - bp->b_rptr;
   3271 	}
   3272 	return (count);
   3273 }
   3274 
   3275 /*
   3276  * Allocate a stream head.
   3277  */
   3278 struct stdata *
   3279 shalloc(queue_t *qp)
   3280 {
   3281 	stdata_t *stp;
   3282 
   3283 	stp = kmem_cache_alloc(stream_head_cache, KM_SLEEP);
   3284 
   3285 	stp->sd_wrq = _WR(qp);
   3286 	stp->sd_strtab = NULL;
   3287 	stp->sd_iocid = 0;
   3288 	stp->sd_mate = NULL;
   3289 	stp->sd_freezer = NULL;
   3290 	stp->sd_refcnt = 0;
   3291 	stp->sd_wakeq = 0;
   3292 	stp->sd_anchor = 0;
   3293 	stp->sd_struiowrq = NULL;
   3294 	stp->sd_struiordq = NULL;
   3295 	stp->sd_struiodnak = 0;
   3296 	stp->sd_struionak = NULL;
   3297 	stp->sd_t_audit_data = NULL;
   3298 	stp->sd_rput_opt = 0;
   3299 	stp->sd_wput_opt = 0;
   3300 	stp->sd_read_opt = 0;
   3301 	stp->sd_rprotofunc = strrput_proto;
   3302 	stp->sd_rmiscfunc = strrput_misc;
   3303 	stp->sd_rderrfunc = stp->sd_wrerrfunc = NULL;
   3304 	stp->sd_rputdatafunc = stp->sd_wputdatafunc = NULL;
   3305 	stp->sd_ciputctrl = NULL;
   3306 	stp->sd_nciputctrl = 0;
   3307 	stp->sd_qhead = NULL;
   3308 	stp->sd_qtail = NULL;
   3309 	stp->sd_servid = NULL;
   3310 	stp->sd_nqueues = 0;
   3311 	stp->sd_svcflags = 0;
   3312 	stp->sd_copyflag = 0;
   3313 
   3314 	return (stp);
   3315 }
   3316 
/*
 * Free a stream head.
 *
 * Must be called without sd_lock held (asserted).  The steps are, in order:
 *  - clear sd_wrq;
 *  - wait on sd_qcv under sd_qlock until STRS_SCHEDULED is no longer set
 *    in sd_svcflags;
 *  - free the per-CPU putctrl array, if any;
 *  - return the stdata to stream_head_cache.
 * The stream's service list is expected to be empty (asserted).
 */
void
shfree(stdata_t *stp)
{
	ASSERT(MUTEX_NOT_HELD(&stp->sd_lock));

	stp->sd_wrq = NULL;

	/* Wait until no service of this stream is scheduled. */
	mutex_enter(&stp->sd_qlock);
	while (stp->sd_svcflags & STRS_SCHEDULED) {
		STRSTAT(strwaits);
		cv_wait(&stp->sd_qcv, &stp->sd_qlock);
	}
	mutex_exit(&stp->sd_qlock);

	/* Release the per-CPU putctrl array; its counts must all be zero. */
	if (stp->sd_ciputctrl != NULL) {
		ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl,
		    stp->sd_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl);
		stp->sd_ciputctrl = NULL;
		stp->sd_nciputctrl = 0;
	}
	ASSERT(stp->sd_qhead == NULL);
	ASSERT(stp->sd_qtail == NULL);
	ASSERT(stp->sd_nqueues == 0);
	kmem_cache_free(stream_head_cache, stp);
}
   3348 
   3349 /*
   3350  * Allocate a pair of queues and a syncq for the pair
   3351  */
   3352 queue_t *
   3353 allocq(void)
   3354 {
   3355 	queinfo_t *qip;
   3356 	queue_t *qp, *wqp;
   3357 	syncq_t	*sq;
   3358 
   3359 	qip = kmem_cache_alloc(queue_cache, KM_SLEEP);
   3360 
   3361 	qp = &qip->qu_rqueue;
   3362 	wqp = &qip->qu_wqueue;
   3363 	sq = &qip->qu_syncq;
   3364 
   3365 	qp->q_last	= NULL;
   3366 	qp->q_next	= NULL;
   3367 	qp->q_ptr	= NULL;
   3368 	qp->q_flag	= QUSE | QREADR;
   3369 	qp->q_bandp	= NULL;
   3370 	qp->q_stream	= NULL;
   3371 	qp->q_syncq	= sq;
   3372 	qp->q_nband	= 0;
   3373 	qp->q_nfsrv	= NULL;
   3374 	qp->q_draining	= 0;
   3375 	qp->q_syncqmsgs	= 0;
   3376 	qp->q_spri	= 0;
   3377 	qp->q_qtstamp	= 0;
   3378 	qp->q_sqtstamp	= 0;
   3379 	qp->q_fp	= NULL;
   3380 
   3381 	wqp->q_last	= NULL;
   3382 	wqp->q_next	= NULL;
   3383 	wqp->q_ptr	= NULL;
   3384 	wqp->q_flag	= QUSE;
   3385 	wqp->q_bandp	= NULL;
   3386 	wqp->q_stream	= NULL;
   3387 	wqp->q_syncq	= sq;
   3388 	wqp->q_nband	= 0;
   3389 	wqp->q_nfsrv	= NULL;
   3390 	wqp->q_draining	= 0;
   3391 	wqp->q_syncqmsgs = 0;
   3392 	wqp->q_qtstamp	= 0;
   3393 	wqp->q_sqtstamp	= 0;
   3394 	wqp->q_spri	= 0;
   3395 
   3396 	sq->sq_count	= 0;
   3397 	sq->sq_rmqcount	= 0;
   3398 	sq->sq_flags	= 0;
   3399 	sq->sq_type	= 0;
   3400 	sq->sq_callbflags = 0;
   3401 	sq->sq_cancelid	= 0;
   3402 	sq->sq_ciputctrl = NULL;
   3403 	sq->sq_nciputctrl = 0;
   3404 	sq->sq_needexcl = 0;
   3405 	sq->sq_svcflags = 0;
   3406 
   3407 	return (qp);
   3408 }
   3409 
/*
 * Free a pair of queues and the "attached" syncq.
 * Discard any messages left on the syncq(s), remove the syncq(s) from the
 * outer perimeter, and free the syncq(s) if they are not the "attached" syncq.
 *
 * qp must be the read queue of the pair (asserted).  The pair must already
 * have been removed from its stream (see the note before flushq() below).
 * On return both queues, their priority bands, and the queue_cache object
 * holding them have been freed.  kmem_cache_free() is passed qp, which
 * relies on qp being the address allocq() got from queue_cache
 * (NOTE(review): i.e. qu_rqueue is presumably the first member of
 * queinfo_t).
 */
void
freeq(queue_t *qp)
{
	qband_t *qbp, *nqbp;
	syncq_t *sq, *outer;
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);

	/*
	 * If a previously dispatched taskq job is scheduled to run
	 * sync_service() or a service routine is scheduled for the
	 * queues about to be freed, wait here until all service is
	 * done on the queue and all associated queues and syncqs.
	 */
	wait_svc(qp);

	/* Discard anything still queued on the syncq(s) for either queue. */
	(void) flush_syncq(qp->q_syncq, qp);
	(void) flush_syncq(wqp->q_syncq, wqp);
	ASSERT(qp->q_syncqmsgs == 0 && wqp->q_syncqmsgs == 0);

	/*
	 * Flush the queues before q_next is set to NULL This is needed
	 * in order to backenable any downstream queue before we go away.
	 * Note: we are already removed from the stream so that the
	 * backenabling will not cause any messages to be delivered to our
	 * put procedures.
	 */
	flushq(qp, FLUSHALL);
	flushq(wqp, FLUSHALL);

	/* Tidy up - removeq only does a half-remove from stream */
	qp->q_next = wqp->q_next = NULL;
	ASSERT(!(qp->q_flag & QENAB));
	ASSERT(!(wqp->q_flag & QENAB));

	/* Detach the syncq(s) from the outer perimeter, if there is one. */
	outer = qp->q_syncq->sq_outer;
	if (outer != NULL) {
		outer_remove(outer, qp->q_syncq);
		if (wqp->q_syncq != qp->q_syncq)
			outer_remove(outer, wqp->q_syncq);
	}
	/*
	 * Free any syncqs that are outside what allocq returned.
	 */
	if (qp->q_syncq != SQ(qp) && !(qp->q_flag & QPERMOD))
		free_syncq(qp->q_syncq);
	if (qp->q_syncq != wqp->q_syncq && wqp->q_syncq != SQ(qp))
		free_syncq(wqp->q_syncq);

	/*
	 * The embedded syncq (SQ(qp)) is freed together with the queues
	 * below; it must be idle and detached by now.
	 */
	ASSERT((qp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
	ASSERT((wqp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
	ASSERT(MUTEX_NOT_HELD(QLOCK(qp)));
	ASSERT(MUTEX_NOT_HELD(QLOCK(wqp)));
	sq = SQ(qp);
	ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
	ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
	ASSERT(sq->sq_callbpend == NULL);
	ASSERT(sq->sq_needexcl == 0);

	/* Release the embedded syncq's per-CPU putctrl array, if any. */
	if (sq->sq_ciputctrl != NULL) {
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl,
		    sq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl);
		sq->sq_ciputctrl = NULL;
		sq->sq_nciputctrl = 0;
	}

	ASSERT(qp->q_first == NULL && wqp->q_first == NULL);
	ASSERT(qp->q_count == 0 && wqp->q_count == 0);
	ASSERT(qp->q_mblkcnt == 0 && wqp->q_mblkcnt == 0);

	qp->q_flag &= ~QUSE;
	wqp->q_flag &= ~QUSE;

	/* NOTE: Uncomment the assert below once bugid 1159635 is fixed. */
	/* ASSERT((qp->q_flag & QWANTW) == 0 && (wqp->q_flag & QWANTW) == 0); */

	/* Free the priority band structures of both queues. */
	qbp = qp->q_bandp;
	while (qbp) {
		nqbp = qbp->qb_next;
		freeband(qbp);
		qbp = nqbp;
	}
	qbp = wqp->q_bandp;
	while (qbp) {
		nqbp = qbp->qb_next;
		freeband(qbp);
		qbp = nqbp;
	}
	kmem_cache_free(queue_cache, qp);
}
   3511 
   3512 /*
   3513  * Allocate a qband structure.
   3514  */
   3515 qband_t *
   3516 allocband(void)
   3517 {
   3518 	qband_t *qbp;
   3519 
   3520 	qbp = kmem_cache_alloc(qband_cache, KM_NOSLEEP);
   3521 	if (qbp == NULL)
   3522 		return (NULL);
   3523 
   3524 	qbp->qb_next	= NULL;
   3525 	qbp->qb_count	= 0;
   3526 	qbp->qb_mblkcnt	= 0;
   3527 	qbp->qb_first	= NULL;
   3528 	qbp->qb_last	= NULL;
   3529 	qbp->qb_flag	= 0;
   3530 
   3531 	return (qbp);
   3532 }
   3533 
/*
 * Free a qband structure.
 * Returns qbp to qband_cache, the cache allocband() allocates from.
 */
void
freeband(qband_t *qbp)
{
	kmem_cache_free(qband_cache, qbp);
}
   3542 
   3543 /*
   3544  * Just like putnextctl(9F), except that allocb_wait() is used.
   3545  *
   3546  * Consolidation Private, and of course only callable from the stream head or
   3547  * routines that may block.
   3548  */
   3549 int
   3550 putnextctl_wait(queue_t *q, int type)
   3551 {
   3552 	mblk_t *bp;
   3553 	int error;
   3554 
   3555 	if ((datamsg(type) && (type != M_DELAY)) ||
   3556 	    (bp = allocb_wait(0, BPRI_HI, 0, &error)) == NULL)
   3557 		return (0);
   3558 
   3559 	bp->b_datap->db_type = (unsigned char)type;
   3560 	putnext(q, bp);
   3561 	return (1);
   3562 }
   3563 
/*
 * Run any possible bufcalls.
 *
 * Visits at most as many entries as were on strbcalls when we started.
 * Each request whose bc_size fits within the kmem_avail() estimate is run
 * (strbcall_lock dropped, bc_executor set to this thread), then unlinked
 * and freed.  Larger requests are rotated to the tail to be retried later.
 * bcall_monitor is held throughout; strbcall_lock is held except while a
 * callback runs.
 */
void
runbufcalls(void)
{
	strbufcall_t *bcp;

	mutex_enter(&bcall_monitor);
	mutex_enter(&strbcall_lock);

	if (strbcalls.bc_head) {
		size_t count;
		int nevent;

		/*
		 * count how many events are on the list
		 * now so we can check to avoid looping
		 * in low memory situations
		 */
		nevent = 0;
		for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next)
			nevent++;

		/*
		 * get estimate of available memory from kmem_avail().
		 * awake all bufcall functions waiting for
		 * memory whose request could be satisfied
		 * by 'count' memory and let 'em fight for it.
		 */
		count = kmem_avail();
		while ((bcp = strbcalls.bc_head) != NULL && nevent) {
			STRSTAT(bufcalls);
			--nevent;
			if (bcp->bc_size <= count) {
				/*
				 * Run the callback without strbcall_lock.
				 * bc_executor marks it as in progress, and
				 * bcall_cv waiters are woken once it has
				 * finished (presumably unbufcall() callers
				 * -- NOTE(review): confirm).
				 */
				bcp->bc_executor = curthread;
				mutex_exit(&strbcall_lock);
				(*bcp->bc_func)(bcp->bc_arg);
				mutex_enter(&strbcall_lock);
				bcp->bc_executor = NULL;
				cv_broadcast(&bcall_cv);
				strbcalls.bc_head = bcp->bc_next;
				kmem_free(bcp, sizeof (strbufcall_t));
			} else {
				/*
				 * too big, try again later - note
				 * that nevent was decremented above
				 * so we won't retry this one on this
				 * iteration of the loop
				 */
				if (bcp->bc_next != NULL) {
					strbcalls.bc_head = bcp->bc_next;
					bcp->bc_next = NULL;
					strbcalls.bc_tail->bc_next = bcp;
					strbcalls.bc_tail = bcp;
				}
			}
		}
		/* Keep the tail consistent once the list is empty. */
		if (strbcalls.bc_head == NULL)
			strbcalls.bc_tail = NULL;
	}

	mutex_exit(&strbcall_lock);
	mutex_exit(&bcall_monitor);
}
   3629 
   3630 
   3631 /*
   3632  * Actually run queue's service routine.
   3633  */
   3634 static void
   3635 runservice(queue_t *q)
   3636 {
   3637 	qband_t *qbp;
   3638 
   3639 	ASSERT(q->q_qinfo->qi_srvp);
   3640 again:
   3641 	entersq(q->q_syncq, SQ_SVC);
   3642 	TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_START,
   3643 	    "runservice starts:%p", q);
   3644 
   3645 	if (!(q->q_flag & QWCLOSE))
   3646 		(*q->q_qinfo->qi_srvp)(q);
   3647 
   3648 	TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_END,
   3649 	    "runservice ends:(%p)", q);
   3650 
   3651 	leavesq(q->q_syncq, SQ_SVC);
   3652 
   3653 	mutex_enter(QLOCK(q));
   3654 	if (q->q_flag & QENAB) {
   3655 		q->q_flag &= ~QENAB;
   3656 		mutex_exit(QLOCK(q));
   3657 		goto again;
   3658 	}
   3659 	q->q_flag &= ~QINSERVICE;
   3660 	q->q_flag &= ~QBACK;
   3661 	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next)
   3662 		qbp->qb_flag &= ~QB_BACK;
   3663 	/*
   3664 	 * Wakeup thread waiting for the service procedure
   3665 	 * to be run (strclose and qdetach).
   3666 	 */
   3667 	cv_broadcast(&q->q_wait);
   3668 
   3669 	mutex_exit(QLOCK(q));
   3670 }
   3671 
/*
 * Background processing of bufcalls.
 *
 * Service thread body; the loop never exits.  Each pass, under
 * strbcall_lock:
 *  - if bufcalls are pending and kmem_avail() reports memory, run
 *    runbufcalls();
 *  - if bufcalls are still pending, wait up to 60 seconds on memavail_cv;
 *  - when the list is empty, sleep on strbcall_cv for new work.
 * The waits are marked CPR-safe with CALLB_CPR_SAFE_BEGIN/END.
 */
void
streams_bufcall_service(void)
{
	callb_cpr_t	cprinfo;

	CALLB_CPR_INIT(&cprinfo, &strbcall_lock, callb_generic_cpr,
	    "streams_bufcall_service");

	mutex_enter(&strbcall_lock);

	for (;;) {
		if (strbcalls.bc_head != NULL && kmem_avail() > 0) {
			/*
			 * runbufcalls() takes bcall_monitor and then
			 * strbcall_lock itself, so drop ours first.
			 */
			mutex_exit(&strbcall_lock);
			runbufcalls();
			mutex_enter(&strbcall_lock);
		}
		if (strbcalls.bc_head != NULL) {
			clock_t wt, tick;

			STRSTAT(bcwaits);
			/* Wait for memory to become available */
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			tick = SEC_TO_TICK(60);
			/*
			 * Presumably converts the relative 60s into the
			 * absolute time cv_timedwait() expects.
			 */
			time_to_wait(&wt, tick);
			(void) cv_timedwait(&memavail_cv, &strbcall_lock, wt);
			CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
		}

		/* Wait for new work to arrive */
		if (strbcalls.bc_head == NULL) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&strbcall_cv, &strbcall_lock);
			CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
		}
	}
}
   3711 
/*
 * Background processing of streams background tasks which failed
 * taskq_dispatch.
 *
 * Service thread body; the loop never exits.  Under service_queue it sleeps
 * on services_to_run until freebs_list or the qhead list is non-empty.  It
 * then:
 *  - frees every deferred mblk on freebs_list with mblk_free();
 *  - runs queue_service() for every queue dequeued from qhead.
 * service_queue is dropped around each mblk_free() and queue_service()
 * call.
 */
static void
streams_qbkgrnd_service(void)
{
	callb_cpr_t cprinfo;
	queue_t *q;

	CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
	    "streams_bkgrnd_service");

	mutex_enter(&service_queue);

	for (;;) {
		/*
		 * Wait for work to arrive.
		 */
		while ((freebs_list == NULL) && (qhead == NULL)) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&services_to_run, &service_queue);
			CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
		}
		/*
		 * Handle all pending freebs requests to free memory.
		 */
		while (freebs_list != NULL) {
			mblk_t *mp = freebs_list;
			freebs_list = mp->b_next;
			mutex_exit(&service_queue);
			mblk_free(mp);
			mutex_enter(&service_queue);
		}
		/*
		 * Run pending queues.
		 */
		while (qhead != NULL) {
			DQ(q, qhead, qtail, q_link);
			ASSERT(q != NULL);
			mutex_exit(&service_queue);
			queue_service(q);
			mutex_enter(&service_queue);
		}
		ASSERT(qhead == NULL && qtail == NULL);
	}
}
   3759 
/*
 * Background processing of syncqs whose service failed taskq_dispatch.
 *
 * Service thread body; the loop never exits.  Under service_queue it sleeps
 * on syncqs_to_run until the sqhead list is non-empty.  It then dequeues
 * each syncq (each must be marked SQ_BGTHREAD, asserted) and runs
 * syncq_service() on it with service_queue dropped.
 */
static void
streams_sqbkgrnd_service(void)
{
	callb_cpr_t cprinfo;
	syncq_t *sq;

	CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
	    "streams_sqbkgrnd_service");

	mutex_enter(&service_queue);

	for (;;) {
		/*
		 * Wait for work to arrive.
		 */
		while (sqhead == NULL) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&syncqs_to_run, &service_queue);
			CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
		}

		/*
		 * Run pending syncqs.
		 */
		while (sqhead != NULL) {
			DQ(sq, sqhead, sqtail, sq_next);
			ASSERT(sq != NULL);
			ASSERT(sq->sq_svcflags & SQ_BGTHREAD);
			mutex_exit(&service_queue);
			syncq_service(sq);
			mutex_enter(&service_queue);
		}
	}
}
   3798 
   3799 /*
   3800  * Disable the syncq and wait for background syncq processing to complete.
   3801  * If the syncq is placed on the sqhead/sqtail queue, try to remove it from the
   3802  * list.
   3803  */
   3804 void
   3805 wait_sq_svc(syncq_t *sq)
   3806 {
   3807 	mutex_enter(SQLOCK(sq));
   3808 	sq->sq_svcflags |= SQ_DISABLED;
   3809 	if (sq->sq_svcflags & SQ_BGTHREAD) {
   3810 		syncq_t *sq_chase;
   3811 		syncq_t *sq_curr;
   3812 		int removed;
   3813 
   3814 		ASSERT(sq->sq_servcount == 1);
   3815 		mutex_enter(&service_queue);
   3816 		RMQ(sq, sqhead, sqtail, sq_next, sq_chase, sq_curr, removed);
   3817 		mutex_exit(&service_queue);
   3818 		if (removed) {
   3819 			sq->sq_svcflags &= ~SQ_BGTHREAD;
   3820 			sq->sq_servcount = 0;
   3821 			STRSTAT(sqremoved);
   3822 			goto done;
   3823 		}
   3824 	}
   3825 	while (sq->sq_servcount != 0) {
   3826 		sq->sq_flags |= SQ_WANTWAKEUP;
   3827 		cv_wait(&sq->sq_wait, SQLOCK(sq));
   3828 	}
   3829 done:
   3830 	mutex_exit(SQLOCK(sq));
   3831 }
   3832 
/*
 * Put a syncq on the list of syncqs to be serviced by the sqthread.
 * Add the argument to the end of the sqhead list and set the flag
 * indicating this syncq has been enabled.  If it has already been
 * enabled, don't do anything.
 * This routine assumes that SQLOCK is held.
 * NOTE that the lock order is to have the SQLOCK first,
 * so if the service_queue lock (which protects the sqhead list) is held,
 * it must be released before acquiring the SQLOCK (mostly relevant for
 * the background thread, and this seems to be common among the STREAMS
 * global locks).
 * Note that the sq_svcflags are protected by the SQLOCK.
 */
   3845 void
   3846 sqenable(syncq_t *sq)
   3847 {
   3848 	/*
   3849 	 * This is probably not important except for where I believe it
   3850 	 * is being called.  At that point, it should be held (and it
   3851 	 * is a pain to release it just for this routine, so don't do