1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 22 /* All Rights Reserved */ 23 24 25 /* 26 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 27 * Use is subject to license terms. 
28 */ 29 30 #include <sys/types.h> 31 #include <sys/sysmacros.h> 32 #include <sys/param.h> 33 #include <sys/errno.h> 34 #include <sys/signal.h> 35 #include <sys/proc.h> 36 #include <sys/conf.h> 37 #include <sys/cred.h> 38 #include <sys/user.h> 39 #include <sys/vnode.h> 40 #include <sys/file.h> 41 #include <sys/session.h> 42 #include <sys/stream.h> 43 #include <sys/strsubr.h> 44 #include <sys/stropts.h> 45 #include <sys/poll.h> 46 #include <sys/systm.h> 47 #include <sys/cpuvar.h> 48 #include <sys/uio.h> 49 #include <sys/cmn_err.h> 50 #include <sys/priocntl.h> 51 #include <sys/procset.h> 52 #include <sys/vmem.h> 53 #include <sys/bitmap.h> 54 #include <sys/kmem.h> 55 #include <sys/siginfo.h> 56 #include <sys/vtrace.h> 57 #include <sys/callb.h> 58 #include <sys/debug.h> 59 #include <sys/modctl.h> 60 #include <sys/vmsystm.h> 61 #include <vm/page.h> 62 #include <sys/atomic.h> 63 #include <sys/suntpi.h> 64 #include <sys/strlog.h> 65 #include <sys/promif.h> 66 #include <sys/project.h> 67 #include <sys/vm.h> 68 #include <sys/taskq.h> 69 #include <sys/sunddi.h> 70 #include <sys/sunldi_impl.h> 71 #include <sys/strsun.h> 72 #include <sys/isa_defs.h> 73 #include <sys/multidata.h> 74 #include <sys/pattr.h> 75 #include <sys/strft.h> 76 #include <sys/fs/snode.h> 77 #include <sys/zone.h> 78 #include <sys/open.h> 79 #include <sys/sunldi.h> 80 #include <sys/sad.h> 81 #include <sys/netstack.h> 82 83 #define O_SAMESTR(q) (((q)->q_next) && \ 84 (((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR))) 85 86 /* 87 * WARNING: 88 * The variables and routines in this file are private, belonging 89 * to the STREAMS subsystem. These should not be used by modules 90 * or drivers. Compatibility will not be guaranteed. 91 */ 92 93 /* 94 * Id value used to distinguish between different multiplexor links. 
95 */ 96 static int32_t lnk_id = 0; 97 98 #define STREAMS_LOPRI MINCLSYSPRI 99 static pri_t streams_lopri = STREAMS_LOPRI; 100 101 #define STRSTAT(x) (str_statistics.x.value.ui64++) 102 typedef struct str_stat { 103 kstat_named_t sqenables; 104 kstat_named_t stenables; 105 kstat_named_t syncqservice; 106 kstat_named_t freebs; 107 kstat_named_t qwr_outer; 108 kstat_named_t rservice; 109 kstat_named_t strwaits; 110 kstat_named_t taskqfails; 111 kstat_named_t bufcalls; 112 kstat_named_t qhelps; 113 kstat_named_t qremoved; 114 kstat_named_t sqremoved; 115 kstat_named_t bcwaits; 116 kstat_named_t sqtoomany; 117 } str_stat_t; 118 119 static str_stat_t str_statistics = { 120 { "sqenables", KSTAT_DATA_UINT64 }, 121 { "stenables", KSTAT_DATA_UINT64 }, 122 { "syncqservice", KSTAT_DATA_UINT64 }, 123 { "freebs", KSTAT_DATA_UINT64 }, 124 { "qwr_outer", KSTAT_DATA_UINT64 }, 125 { "rservice", KSTAT_DATA_UINT64 }, 126 { "strwaits", KSTAT_DATA_UINT64 }, 127 { "taskqfails", KSTAT_DATA_UINT64 }, 128 { "bufcalls", KSTAT_DATA_UINT64 }, 129 { "qhelps", KSTAT_DATA_UINT64 }, 130 { "qremoved", KSTAT_DATA_UINT64 }, 131 { "sqremoved", KSTAT_DATA_UINT64 }, 132 { "bcwaits", KSTAT_DATA_UINT64 }, 133 { "sqtoomany", KSTAT_DATA_UINT64 }, 134 }; 135 136 static kstat_t *str_kstat; 137 138 /* 139 * qrunflag was used previously to control background scheduling of queues. It 140 * is not used anymore, but kept here in case some module still wants to access 141 * it via qready() and setqsched macros. 142 */ 143 char qrunflag; /* Unused */ 144 145 /* 146 * Most of the streams scheduling is done via task queues. Task queues may fail 147 * for non-sleep dispatches, so there are two backup threads servicing failed 148 * requests for queues and syncqs. Both of these threads also service failed 149 * dispatches freebs requests. 
Queues are put in the list specified by `qhead' 150 * and `qtail' pointers, syncqs use `sqhead' and `sqtail' pointers and freebs 151 * requests are put into `freebs_list' which has no tail pointer. All three 152 * lists are protected by a single `service_queue' lock and use 153 * `services_to_run' condition variable for signaling background threads. Use of 154 * a single lock should not be a problem because it is only used under heavy 155 * loads when task queues start to fail and at that time it may be a good idea 156 * to throttle scheduling requests. 157 * 158 * NOTE: queues and syncqs should be scheduled by two separate threads because 159 * queue servicing may be blocked waiting for a syncq which may be also 160 * scheduled for background execution. This may create a deadlock when only one 161 * thread is used for both. 162 */ 163 164 static taskq_t *streams_taskq; /* Used for most STREAMS scheduling */ 165 166 static kmutex_t service_queue; /* protects all of servicing vars */ 167 static kcondvar_t services_to_run; /* wake up background service thread */ 168 static kcondvar_t syncqs_to_run; /* wake up background service thread */ 169 170 /* 171 * List of queues scheduled for background processing due to lack of resources 172 * in the task queues. Protected by service_queue lock; 173 */ 174 static struct queue *qhead; 175 static struct queue *qtail; 176 177 /* 178 * Same list for syncqs 179 */ 180 static syncq_t *sqhead; 181 static syncq_t *sqtail; 182 183 static mblk_t *freebs_list; /* list of buffers to free */ 184 185 /* 186 * Backup threads for servicing queues and syncqs 187 */ 188 kthread_t *streams_qbkgrnd_thread; 189 kthread_t *streams_sqbkgrnd_thread; 190 191 /* 192 * Bufcalls related variables. 
193 */ 194 struct bclist strbcalls; /* list of waiting bufcalls */ 195 kmutex_t strbcall_lock; /* protects bufcall list (strbcalls) */ 196 kcondvar_t strbcall_cv; /* Signaling when a bufcall is added */ 197 kmutex_t bcall_monitor; /* sleep/wakeup style monitor */ 198 kcondvar_t bcall_cv; /* wait 'till executing bufcall completes */ 199 kthread_t *bc_bkgrnd_thread; /* Thread to service bufcall requests */ 200 201 kmutex_t strresources; /* protects global resources */ 202 kmutex_t muxifier; /* single-threads multiplexor creation */ 203 204 static void *str_stack_init(netstackid_t stackid, netstack_t *ns); 205 static void str_stack_shutdown(netstackid_t stackid, void *arg); 206 static void str_stack_fini(netstackid_t stackid, void *arg); 207 208 extern void time_to_wait(clock_t *, clock_t); 209 210 /* 211 * run_queues is no longer used, but is kept in case some 3rd party 212 * module/driver decides to use it. 213 */ 214 int run_queues = 0; 215 216 /* 217 * sq_max_size is the depth of the syncq (in number of messages) before 218 * qfill_syncq() starts QFULL'ing destination queues. As its primary 219 * consumer - IP is no longer D_MTPERMOD, but there may be other 220 * modules/drivers depend on this syncq flow control, we prefer to 221 * choose a large number as the default value. For potential 222 * performance gain, this value is tunable in /etc/system. 223 */ 224 int sq_max_size = 10000; 225 226 /* 227 * The number of ciputctrl structures per syncq and stream we create when 228 * needed. 229 */ 230 int n_ciputctrl; 231 int max_n_ciputctrl = 16; 232 /* 233 * If n_ciputctrl is < min_n_ciputctrl don't even create ciputctrl_cache. 
234 */ 235 int min_n_ciputctrl = 2; 236 237 /* 238 * Per-driver/module syncqs 239 * ======================== 240 * 241 * For drivers/modules that use PERMOD or outer syncqs we keep a list of 242 * perdm structures, new entries being added (and new syncqs allocated) when 243 * setq() encounters a module/driver with a streamtab that it hasn't seen 244 * before. 245 * The reason for this mechanism is that some modules and drivers share a 246 * common streamtab and it is necessary for those modules and drivers to also 247 * share a common PERMOD syncq. 248 * 249 * perdm_list --> dm_str == streamtab_1 250 * dm_sq == syncq_1 251 * dm_ref 252 * dm_next --> dm_str == streamtab_2 253 * dm_sq == syncq_2 254 * dm_ref 255 * dm_next --> ... NULL 256 * 257 * The dm_ref field is incremented for each new driver/module that takes 258 * a reference to the perdm structure and hence shares the syncq. 259 * References are held in the fmodsw_impl_t structure for each STREAMS module 260 * or the dev_impl array (indexed by device major number) for each driver. 261 * 262 * perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL 263 * ^ ^ ^ ^ 264 * | ______________/ | | 265 * | / | | 266 * dev_impl: ...|x|y|... module A module B 267 * 268 * When a module/driver is unloaded the reference count is decremented and, 269 * when it falls to zero, the perdm structure is removed from the list and 270 * the syncq is freed (see rele_dm()). 
271 */ 272 perdm_t *perdm_list = NULL; 273 static krwlock_t perdm_rwlock; 274 cdevsw_impl_t *devimpl; 275 276 extern struct qinit strdata; 277 extern struct qinit stwdata; 278 279 static void runservice(queue_t *); 280 static void streams_bufcall_service(void); 281 static void streams_qbkgrnd_service(void); 282 static void streams_sqbkgrnd_service(void); 283 static syncq_t *new_syncq(void); 284 static void free_syncq(syncq_t *); 285 static void outer_insert(syncq_t *, syncq_t *); 286 static void outer_remove(syncq_t *, syncq_t *); 287 static void write_now(syncq_t *); 288 static void clr_qfull(queue_t *); 289 static void runbufcalls(void); 290 static void sqenable(syncq_t *); 291 static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)()); 292 static void wait_q_syncq(queue_t *); 293 static void backenable_insertedq(queue_t *); 294 295 static void queue_service(queue_t *); 296 static void stream_service(stdata_t *); 297 static void syncq_service(syncq_t *); 298 static void qwriter_outer_service(syncq_t *); 299 static void mblk_free(mblk_t *); 300 #ifdef DEBUG 301 static int qprocsareon(queue_t *); 302 #endif 303 304 static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *); 305 static void reset_nfsrv_ptr(queue_t *, queue_t *); 306 void set_qfull(queue_t *); 307 308 static void sq_run_events(syncq_t *); 309 static int propagate_syncq(queue_t *); 310 311 static void blocksq(syncq_t *, ushort_t, int); 312 static void unblocksq(syncq_t *, ushort_t, int); 313 static int dropsq(syncq_t *, uint16_t); 314 static void emptysq(syncq_t *); 315 static sqlist_t *sqlist_alloc(struct stdata *, int); 316 static void sqlist_free(sqlist_t *); 317 static sqlist_t *sqlist_build(queue_t *, struct stdata *, boolean_t); 318 static void sqlist_insert(sqlist_t *, syncq_t *); 319 static void sqlist_insertall(sqlist_t *, queue_t *); 320 321 static void strsetuio(stdata_t *); 322 323 struct kmem_cache *stream_head_cache; 324 struct kmem_cache *queue_cache; 325 struct 
kmem_cache *syncq_cache; 326 struct kmem_cache *qband_cache; 327 struct kmem_cache *linkinfo_cache; 328 struct kmem_cache *ciputctrl_cache = NULL; 329 330 static linkinfo_t *linkinfo_list; 331 332 /* Global esballoc throttling queue */ 333 static esb_queue_t system_esbq; 334 335 /* 336 * esballoc tunable parameters. 337 */ 338 int esbq_max_qlen = 0x16; /* throttled queue length */ 339 clock_t esbq_timeout = 0x8; /* timeout to process esb queue */ 340 341 /* 342 * Routines to handle esballoc queueing. 343 */ 344 static void esballoc_process_queue(esb_queue_t *); 345 static void esballoc_enqueue_mblk(mblk_t *); 346 static void esballoc_timer(void *); 347 static void esballoc_set_timer(esb_queue_t *, clock_t); 348 static void esballoc_mblk_free(mblk_t *); 349 350 /* 351 * Qinit structure and Module_info structures 352 * for passthru read and write queues 353 */ 354 355 static void pass_wput(queue_t *, mblk_t *); 356 static queue_t *link_addpassthru(stdata_t *); 357 static void link_rempassthru(queue_t *); 358 359 struct module_info passthru_info = { 360 0, 361 "passthru", 362 0, 363 INFPSZ, 364 STRHIGH, 365 STRLOW 366 }; 367 368 struct qinit passthru_rinit = { 369 (int (*)())putnext, 370 NULL, 371 NULL, 372 NULL, 373 NULL, 374 &passthru_info, 375 NULL 376 }; 377 378 struct qinit passthru_winit = { 379 (int (*)()) pass_wput, 380 NULL, 381 NULL, 382 NULL, 383 NULL, 384 &passthru_info, 385 NULL 386 }; 387 388 /* 389 * Special form of assertion: verify that X implies Y i.e. when X is true Y 390 * should also be true. 391 */ 392 #define IMPLY(X, Y) ASSERT(!(X) || (Y)) 393 394 /* 395 * Logical equivalence. Verify that both X and Y are either TRUE or FALSE. 396 */ 397 #define EQUIV(X, Y) { IMPLY(X, Y); IMPLY(Y, X); } 398 399 /* 400 * Verify correctness of list head/tail pointers. 
401 */ 402 #define LISTCHECK(head, tail, link) { \ 403 EQUIV(head, tail); \ 404 IMPLY(tail != NULL, tail->link == NULL); \ 405 } 406 407 /* 408 * Enqueue a list element `el' in the end of a list denoted by `head' and `tail' 409 * using a `link' field. 410 */ 411 #define ENQUEUE(el, head, tail, link) { \ 412 ASSERT(el->link == NULL); \ 413 LISTCHECK(head, tail, link); \ 414 if (head == NULL) \ 415 head = el; \ 416 else \ 417 tail->link = el; \ 418 tail = el; \ 419 } 420 421 /* 422 * Dequeue the first element of the list denoted by `head' and `tail' pointers 423 * using a `link' field and put result into `el'. 424 */ 425 #define DQ(el, head, tail, link) { \ 426 LISTCHECK(head, tail, link); \ 427 el = head; \ 428 if (head != NULL) { \ 429 head = head->link; \ 430 if (head == NULL) \ 431 tail = NULL; \ 432 el->link = NULL; \ 433 } \ 434 } 435 436 /* 437 * Remove `el' from the list using `chase' and `curr' pointers and return result 438 * in `succeed'. 439 */ 440 #define RMQ(el, head, tail, link, chase, curr, succeed) { \ 441 LISTCHECK(head, tail, link); \ 442 chase = NULL; \ 443 succeed = 0; \ 444 for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \ 445 chase = curr; \ 446 if (curr != NULL) { \ 447 succeed = 1; \ 448 ASSERT(curr == el); \ 449 if (chase != NULL) \ 450 chase->link = curr->link; \ 451 else \ 452 head = curr->link; \ 453 curr->link = NULL; \ 454 if (curr == tail) \ 455 tail = chase; \ 456 } \ 457 LISTCHECK(head, tail, link); \ 458 } 459 460 /* Handling of delayed messages on the inner syncq. */ 461 462 /* 463 * DEBUG versions should use function versions (to simplify tracing) and 464 * non-DEBUG kernels should use macro versions. 465 */ 466 467 /* 468 * Put a queue on the syncq list of queues. 469 * Assumes SQLOCK held. 
470 */ 471 #define SQPUT_Q(sq, qp) \ 472 { \ 473 ASSERT(MUTEX_HELD(SQLOCK(sq))); \ 474 if (!(qp->q_sqflags & Q_SQQUEUED)) { \ 475 /* The queue should not be linked anywhere */ \ 476 ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \ 477 /* Head and tail may only be NULL simultaneously */ \ 478 EQUIV(sq->sq_head, sq->sq_tail); \ 479 /* Queue may be only enqueued on its syncq */ \ 480 ASSERT(sq == qp->q_syncq); \ 481 /* Check the correctness of SQ_MESSAGES flag */ \ 482 EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES)); \ 483 /* Sanity check first/last elements of the list */ \ 484 IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\ 485 IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\ 486 /* \ 487 * Sanity check of priority field: empty queue should \ 488 * have zero priority \ 489 * and nqueues equal to zero. \ 490 */ \ 491 IMPLY(sq->sq_head == NULL, sq->sq_pri == 0); \ 492 /* Sanity check of sq_nqueues field */ \ 493 EQUIV(sq->sq_head, sq->sq_nqueues); \ 494 if (sq->sq_head == NULL) { \ 495 sq->sq_head = sq->sq_tail = qp; \ 496 sq->sq_flags |= SQ_MESSAGES; \ 497 } else if (qp->q_spri == 0) { \ 498 qp->q_sqprev = sq->sq_tail; \ 499 sq->sq_tail->q_sqnext = qp; \ 500 sq->sq_tail = qp; \ 501 } else { \ 502 /* \ 503 * Put this queue in priority order: higher \ 504 * priority gets closer to the head. \ 505 */ \ 506 queue_t **qpp = &sq->sq_tail; \ 507 queue_t *qnext = NULL; \ 508 \ 509 while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \ 510 qnext = *qpp; \ 511 qpp = &(*qpp)->q_sqprev; \ 512 } \ 513 qp->q_sqnext = qnext; \ 514 qp->q_sqprev = *qpp; \ 515 if (*qpp != NULL) { \ 516 (*qpp)->q_sqnext = qp; \ 517 } else { \ 518 sq->sq_head = qp; \ 519 sq->sq_pri = sq->sq_head->q_spri; \ 520 } \ 521 *qpp = qp; \ 522 } \ 523 qp->q_sqflags |= Q_SQQUEUED; \ 524 qp->q_sqtstamp = lbolt; \ 525 sq->sq_nqueues++; \ 526 } \ 527 } 528 529 /* 530 * Remove a queue from the syncq list 531 * Assumes SQLOCK held. 
532 */ 533 #define SQRM_Q(sq, qp) \ 534 { \ 535 ASSERT(MUTEX_HELD(SQLOCK(sq))); \ 536 ASSERT(qp->q_sqflags & Q_SQQUEUED); \ 537 ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL); \ 538 ASSERT((sq->sq_flags & SQ_MESSAGES) != 0); \ 539 /* Check that the queue is actually in the list */ \ 540 ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp); \ 541 ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp); \ 542 ASSERT(sq->sq_nqueues != 0); \ 543 if (qp->q_sqprev == NULL) { \ 544 /* First queue on list, make head q_sqnext */ \ 545 sq->sq_head = qp->q_sqnext; \ 546 } else { \ 547 /* Make prev->next == next */ \ 548 qp->q_sqprev->q_sqnext = qp->q_sqnext; \ 549 } \ 550 if (qp->q_sqnext == NULL) { \ 551 /* Last queue on list, make tail sqprev */ \ 552 sq->sq_tail = qp->q_sqprev; \ 553 } else { \ 554 /* Make next->prev == prev */ \ 555 qp->q_sqnext->q_sqprev = qp->q_sqprev; \ 556 } \ 557 /* clear out references on this queue */ \ 558 qp->q_sqprev = qp->q_sqnext = NULL; \ 559 qp->q_sqflags &= ~Q_SQQUEUED; \ 560 /* If there is nothing queued, clear SQ_MESSAGES */ \ 561 if (sq->sq_head != NULL) { \ 562 sq->sq_pri = sq->sq_head->q_spri; \ 563 } else { \ 564 sq->sq_flags &= ~SQ_MESSAGES; \ 565 sq->sq_pri = 0; \ 566 } \ 567 sq->sq_nqueues--; \ 568 ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL || \ 569 (sq->sq_flags & SQ_QUEUED) == 0); \ 570 } 571 572 /* Hide the definition from the header file. */ 573 #ifdef SQPUT_MP 574 #undef SQPUT_MP 575 #endif 576 577 /* 578 * Put a message on the queue syncq. 579 * Assumes QLOCK held. 
580 */ 581 #define SQPUT_MP(qp, mp) \ 582 { \ 583 ASSERT(MUTEX_HELD(QLOCK(qp))); \ 584 ASSERT(qp->q_sqhead == NULL || \ 585 (qp->q_sqtail != NULL && \ 586 qp->q_sqtail->b_next == NULL)); \ 587 qp->q_syncqmsgs++; \ 588 ASSERT(qp->q_syncqmsgs != 0); /* Wraparound */ \ 589 if (qp->q_sqhead == NULL) { \ 590 qp->q_sqhead = qp->q_sqtail = mp; \ 591 } else { \ 592 qp->q_sqtail->b_next = mp; \ 593 qp->q_sqtail = mp; \ 594 } \ 595 ASSERT(qp->q_syncqmsgs > 0); \ 596 set_qfull(qp); \ 597 } 598 599 #define SQ_PUTCOUNT_SETFAST_LOCKED(sq) { \ 600 ASSERT(MUTEX_HELD(SQLOCK(sq))); \ 601 if ((sq)->sq_ciputctrl != NULL) { \ 602 int i; \ 603 int nlocks = (sq)->sq_nciputctrl; \ 604 ciputctrl_t *cip = (sq)->sq_ciputctrl; \ 605 ASSERT((sq)->sq_type & SQ_CIPUT); \ 606 for (i = 0; i <= nlocks; i++) { \ 607 ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \ 608 cip[i].ciputctrl_count |= SQ_FASTPUT; \ 609 } \ 610 } \ 611 } 612 613 614 #define SQ_PUTCOUNT_CLRFAST_LOCKED(sq) { \ 615 ASSERT(MUTEX_HELD(SQLOCK(sq))); \ 616 if ((sq)->sq_ciputctrl != NULL) { \ 617 int i; \ 618 int nlocks = (sq)->sq_nciputctrl; \ 619 ciputctrl_t *cip = (sq)->sq_ciputctrl; \ 620 ASSERT((sq)->sq_type & SQ_CIPUT); \ 621 for (i = 0; i <= nlocks; i++) { \ 622 ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \ 623 cip[i].ciputctrl_count &= ~SQ_FASTPUT; \ 624 } \ 625 } \ 626 } 627 628 /* 629 * Run service procedures for all queues in the stream head. 
630 */ 631 #define STR_SERVICE(stp, q) { \ 632 ASSERT(MUTEX_HELD(&stp->sd_qlock)); \ 633 while (stp->sd_qhead != NULL) { \ 634 DQ(q, stp->sd_qhead, stp->sd_qtail, q_link); \ 635 ASSERT(stp->sd_nqueues > 0); \ 636 stp->sd_nqueues--; \ 637 ASSERT(!(q->q_flag & QINSERVICE)); \ 638 mutex_exit(&stp->sd_qlock); \ 639 queue_service(q); \ 640 mutex_enter(&stp->sd_qlock); \ 641 } \ 642 ASSERT(stp->sd_nqueues == 0); \ 643 ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL)); \ 644 } 645 646 /* 647 * Constructor/destructor routines for the stream head cache 648 */ 649 /* ARGSUSED */ 650 static int 651 stream_head_constructor(void *buf, void *cdrarg, int kmflags) 652 { 653 stdata_t *stp = buf; 654 655 mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL); 656 mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL); 657 mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL); 658 cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL); 659 cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL); 660 cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL); 661 cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL); 662 cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL); 663 stp->sd_wrq = NULL; 664 665 return (0); 666 } 667 668 /* ARGSUSED */ 669 static void 670 stream_head_destructor(void *buf, void *cdrarg) 671 { 672 stdata_t *stp = buf; 673 674 mutex_destroy(&stp->sd_lock); 675 mutex_destroy(&stp->sd_reflock); 676 mutex_destroy(&stp->sd_qlock); 677 cv_destroy(&stp->sd_monitor); 678 cv_destroy(&stp->sd_iocmonitor); 679 cv_destroy(&stp->sd_refmonitor); 680 cv_destroy(&stp->sd_qcv); 681 cv_destroy(&stp->sd_zcopy_wait); 682 } 683 684 /* 685 * Constructor/destructor routines for the queue cache 686 */ 687 /* ARGSUSED */ 688 static int 689 queue_constructor(void *buf, void *cdrarg, int kmflags) 690 { 691 queinfo_t *qip = buf; 692 queue_t *qp = &qip->qu_rqueue; 693 queue_t *wqp = &qip->qu_wqueue; 694 syncq_t *sq = &qip->qu_syncq; 695 696 qp->q_first = NULL; 697 qp->q_link = NULL; 698 qp->q_count = 
0; 699 qp->q_mblkcnt = 0; 700 qp->q_sqhead = NULL; 701 qp->q_sqtail = NULL; 702 qp->q_sqnext = NULL; 703 qp->q_sqprev = NULL; 704 qp->q_sqflags = 0; 705 qp->q_rwcnt = 0; 706 qp->q_spri = 0; 707 708 mutex_init(QLOCK(qp), NULL, MUTEX_DEFAULT, NULL); 709 cv_init(&qp->q_wait, NULL, CV_DEFAULT, NULL); 710 711 wqp->q_first = NULL; 712 wqp->q_link = NULL; 713 wqp->q_count = 0; 714 wqp->q_mblkcnt = 0; 715 wqp->q_sqhead = NULL; 716 wqp->q_sqtail = NULL; 717 wqp->q_sqnext = NULL; 718 wqp->q_sqprev = NULL; 719 wqp->q_sqflags = 0; 720 wqp->q_rwcnt = 0; 721 wqp->q_spri = 0; 722 723 mutex_init(QLOCK(wqp), NULL, MUTEX_DEFAULT, NULL); 724 cv_init(&wqp->q_wait, NULL, CV_DEFAULT, NULL); 725 726 sq->sq_head = NULL; 727 sq->sq_tail = NULL; 728 sq->sq_evhead = NULL; 729 sq->sq_evtail = NULL; 730 sq->sq_callbpend = NULL; 731 sq->sq_outer = NULL; 732 sq->sq_onext = NULL; 733 sq->sq_oprev = NULL; 734 sq->sq_next = NULL; 735 sq->sq_svcflags = 0; 736 sq->sq_servcount = 0; 737 sq->sq_needexcl = 0; 738 sq->sq_nqueues = 0; 739 sq->sq_pri = 0; 740 741 mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL); 742 cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL); 743 cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL); 744 745 return (0); 746 } 747 748 /* ARGSUSED */ 749 static void 750 queue_destructor(void *buf, void *cdrarg) 751 { 752 queinfo_t *qip = buf; 753 queue_t *qp = &qip->qu_rqueue; 754 queue_t *wqp = &qip->qu_wqueue; 755 syncq_t *sq = &qip->qu_syncq; 756 757 ASSERT(qp->q_sqhead == NULL); 758 ASSERT(wqp->q_sqhead == NULL); 759 ASSERT(qp->q_sqnext == NULL); 760 ASSERT(wqp->q_sqnext == NULL); 761 ASSERT(qp->q_rwcnt == 0); 762 ASSERT(wqp->q_rwcnt == 0); 763 764 mutex_destroy(&qp->q_lock); 765 cv_destroy(&qp->q_wait); 766 767 mutex_destroy(&wqp->q_lock); 768 cv_destroy(&wqp->q_wait); 769 770 mutex_destroy(&sq->sq_lock); 771 cv_destroy(&sq->sq_wait); 772 cv_destroy(&sq->sq_exitwait); 773 } 774 775 /* 776 * Constructor/destructor routines for the syncq cache 777 */ 778 /* ARGSUSED */ 779 static int 
780 syncq_constructor(void *buf, void *cdrarg, int kmflags) 781 { 782 syncq_t *sq = buf; 783 784 bzero(buf, sizeof (syncq_t)); 785 786 mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL); 787 cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL); 788 cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL); 789 790 return (0); 791 } 792 793 /* ARGSUSED */ 794 static void 795 syncq_destructor(void *buf, void *cdrarg) 796 { 797 syncq_t *sq = buf; 798 799 ASSERT(sq->sq_head == NULL); 800 ASSERT(sq->sq_tail == NULL); 801 ASSERT(sq->sq_evhead == NULL); 802 ASSERT(sq->sq_evtail == NULL); 803 ASSERT(sq->sq_callbpend == NULL); 804 ASSERT(sq->sq_callbflags == 0); 805 ASSERT(sq->sq_outer == NULL); 806 ASSERT(sq->sq_onext == NULL); 807 ASSERT(sq->sq_oprev == NULL); 808 ASSERT(sq->sq_next == NULL); 809 ASSERT(sq->sq_needexcl == 0); 810 ASSERT(sq->sq_svcflags == 0); 811 ASSERT(sq->sq_servcount == 0); 812 ASSERT(sq->sq_nqueues == 0); 813 ASSERT(sq->sq_pri == 0); 814 ASSERT(sq->sq_count == 0); 815 ASSERT(sq->sq_rmqcount == 0); 816 ASSERT(sq->sq_cancelid == 0); 817 ASSERT(sq->sq_ciputctrl == NULL); 818 ASSERT(sq->sq_nciputctrl == 0); 819 ASSERT(sq->sq_type == 0); 820 ASSERT(sq->sq_flags == 0); 821 822 mutex_destroy(&sq->sq_lock); 823 cv_destroy(&sq->sq_wait); 824 cv_destroy(&sq->sq_exitwait); 825 } 826 827 /* ARGSUSED */ 828 static int 829 ciputctrl_constructor(void *buf, void *cdrarg, int kmflags) 830 { 831 ciputctrl_t *cip = buf; 832 int i; 833 834 for (i = 0; i < n_ciputctrl; i++) { 835 cip[i].ciputctrl_count = SQ_FASTPUT; 836 mutex_init(&cip[i].ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL); 837 } 838 839 return (0); 840 } 841 842 /* ARGSUSED */ 843 static void 844 ciputctrl_destructor(void *buf, void *cdrarg) 845 { 846 ciputctrl_t *cip = buf; 847 int i; 848 849 for (i = 0; i < n_ciputctrl; i++) { 850 ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT); 851 mutex_destroy(&cip[i].ciputctrl_lock); 852 } 853 } 854 855 /* 856 * Init routine run from main at boot time. 
857 */ 858 void 859 strinit(void) 860 { 861 int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 862 863 stream_head_cache = kmem_cache_create("stream_head_cache", 864 sizeof (stdata_t), 0, 865 stream_head_constructor, stream_head_destructor, NULL, 866 NULL, NULL, 0); 867 868 queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0, 869 queue_constructor, queue_destructor, NULL, NULL, NULL, 0); 870 871 syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0, 872 syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0); 873 874 qband_cache = kmem_cache_create("qband_cache", 875 sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0); 876 877 linkinfo_cache = kmem_cache_create("linkinfo_cache", 878 sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0); 879 880 n_ciputctrl = ncpus; 881 n_ciputctrl = 1 << highbit(n_ciputctrl - 1); 882 ASSERT(n_ciputctrl >= 1); 883 n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl); 884 if (n_ciputctrl >= min_n_ciputctrl) { 885 ciputctrl_cache = kmem_cache_create("ciputctrl_cache", 886 sizeof (ciputctrl_t) * n_ciputctrl, 887 sizeof (ciputctrl_t), ciputctrl_constructor, 888 ciputctrl_destructor, NULL, NULL, NULL, 0); 889 } 890 891 streams_taskq = system_taskq; 892 893 if (streams_taskq == NULL) 894 panic("strinit: no memory for streams taskq!"); 895 896 bc_bkgrnd_thread = thread_create(NULL, 0, 897 streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri); 898 899 streams_qbkgrnd_thread = thread_create(NULL, 0, 900 streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri); 901 902 streams_sqbkgrnd_thread = thread_create(NULL, 0, 903 streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri); 904 905 /* 906 * Create STREAMS kstats. 
907 */ 908 str_kstat = kstat_create("streams", 0, "strstat", 909 "net", KSTAT_TYPE_NAMED, 910 sizeof (str_statistics) / sizeof (kstat_named_t), 911 KSTAT_FLAG_VIRTUAL); 912 913 if (str_kstat != NULL) { 914 str_kstat->ks_data = &str_statistics; 915 kstat_install(str_kstat); 916 } 917 918 /* 919 * TPI support routine initialisation. 920 */ 921 tpi_init(); 922 923 /* 924 * Handle to have autopush and persistent link information per 925 * zone. 926 * Note: uses shutdown hook instead of destroy hook so that the 927 * persistent links can be torn down before the destroy hooks 928 * in the TCP/IP stack are called. 929 */ 930 netstack_register(NS_STR, str_stack_init, str_stack_shutdown, 931 str_stack_fini); 932 } 933 934 void 935 str_sendsig(vnode_t *vp, int event, uchar_t band, int error) 936 { 937 struct stdata *stp; 938 939 ASSERT(vp->v_stream); 940 stp = vp->v_stream; 941 /* Have to hold sd_lock to prevent siglist from changing */ 942 mutex_enter(&stp->sd_lock); 943 if (stp->sd_sigflags & event) 944 strsendsig(stp->sd_siglist, event, band, error); 945 mutex_exit(&stp->sd_lock); 946 } 947 948 /* 949 * Send the "sevent" set of signals to a process. 950 * This might send more than one signal if the process is registered 951 * for multiple events. The caller should pass in an sevent that only 952 * includes the events for which the process has registered. 
 */
static void
dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
    uchar_t band, int error)
{
	/* The caller must already hold the target process's p_lock. */
	ASSERT(MUTEX_HELD(&proc->p_lock));

	/* Start clean; only some of the events below fill these in. */
	info->si_band = 0;
	info->si_errno = 0;

	/*
	 * Each recognized bit is cleared from sevent as it is handled, so
	 * anything still set at the bottom is an event this code does not
	 * know about.
	 */
	if (sevent & S_ERROR) {
		sevent &= ~S_ERROR;
		info->si_code = POLL_ERR;
		info->si_errno = error;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		/* si_errno is only meaningful for the S_ERROR signal. */
		info->si_errno = 0;
	}
	if (sevent & S_HANGUP) {
		sevent &= ~S_HANGUP;
		info->si_code = POLL_HUP;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_HIPRI) {
		sevent &= ~S_HIPRI;
		info->si_code = POLL_PRI;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_RDBAND) {
		sevent &= ~S_RDBAND;
		/*
		 * No siginfo is queued here; the registration mask (events,
		 * not sevent) selects SIGURG over SIGPOLL.
		 */
		if (events & S_BANDURG)
			sigtoproc(proc, NULL, SIGURG);
		else
			sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent & S_WRBAND) {
		sevent &= ~S_WRBAND;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	/*
	 * S_INPUT, S_OUTPUT and S_MSG report the band in si_band; it is
	 * reset to zero after each one is queued.
	 */
	if (sevent & S_INPUT) {
		sevent &= ~S_INPUT;
		info->si_code = POLL_IN;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_OUTPUT) {
		sevent &= ~S_OUTPUT;
		info->si_code = POLL_OUT;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_MSG) {
		sevent &= ~S_MSG;
		info->si_code = POLL_MSG;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
		    "strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_RDNORM) {
		sevent &= ~S_RDNORM;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	/* Every known event bit has been consumed above. */
	if (sevent != 0) {
		panic("strsendsig: unknown event(s) %x", sevent);
	}
}

/*
 * Send SIGPOLL/SIGURG signal to all processes and process groups
 * registered on the given signal list that want a signal for at
 * least one of the specified events.
 *
 * Must be called with exclusive access to siglist (caller holding sd_lock).
 *
 * strioctl(I_SETSIG/I_ESETSIG) will only change siglist when holding
 * sd_lock and the ioctl code maintains a PID_HOLD on the pid structure
 * while it is in the siglist.
 *
 * For performance reasons (MP scalability) the code drops pidlock
 * when sending signals to a single process.
 * When sending to a process group the code holds
 * pidlock to prevent the membership in the process group from changing
 * while walking the p_pglink list.
 *
 * `event' is the mask of events that occurred; it is intersected with each
 * entry's ss_events.  `band' and `error' are handed to dosendsig() unchanged.
 * A positive ss_pid names a single process, any other value a process group
 * (the group id is -ss_pid).
 */
void
strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
{
	strsig_t *ssp;
	k_siginfo_t info;
	struct pid *pidp;
	proc_t  *proc;

	/* One siginfo is reused for every signal queued by dosendsig(). */
	info.si_signo = SIGPOLL;
	info.si_errno = 0;
	for (ssp = siglist; ssp; ssp = ssp->ss_next) {
		int sevent;

		/* Only the events this entry registered for are of interest. */
		sevent = ssp->ss_events & event;
		if (sevent == 0)
			continue;

		if ((pidp = ssp->ss_pidp) == NULL) {
			/* pid was released but still on event list */
			continue;
		}


		if (ssp->ss_pid > 0) {
			/*
			 * XXX This unfortunately still generates
			 * a signal when a fd is closed but
			 * the proc is active.
			 */
			ASSERT(ssp->ss_pid == pidp->pid_id);

			/*
			 * pidlock is held only for the lookup; p_lock is
			 * taken before pidlock is dropped.
			 */
			mutex_enter(&pidlock);
			proc = prfind_zone(pidp->pid_id, ALL_ZONES);
			if (proc == NULL) {
				mutex_exit(&pidlock);
				continue;
			}
			mutex_enter(&proc->p_lock);
			mutex_exit(&pidlock);
			dosendsig(proc, ssp->ss_events, sevent, &info,
			    band, error);
			mutex_exit(&proc->p_lock);
		} else {
			/*
			 * Send to process group. Hold pidlock across
			 * calls to dosendsig().
			 */
			pid_t pgrp = -ssp->ss_pid;

			mutex_enter(&pidlock);
			proc = pgfind_zone(pgrp, ALL_ZONES);
			while (proc != NULL) {
				mutex_enter(&proc->p_lock);
				dosendsig(proc, ssp->ss_events, sevent,
				    &info, band, error);
				mutex_exit(&proc->p_lock);
				proc = proc->p_pglink;
			}
			mutex_exit(&pidlock);
		}
	}
}

/*
 * Attach a stream device or module.
 * qp is a read queue; the new queue goes in so its next
 * read ptr is the argument, and the write queue corresponding
 * to the argument points to this queue. Return 0 on success,
 * or a non-zero errno on failure.
 *
 * A non-NULL fp selects the module case: the streamtab and perimeter
 * settings come from fp and the open is made with sflag MODOPEN.  A NULL fp
 * selects the driver case: they come from devimpl[] indexed by the major
 * number of *devp, and sflag is 0.  is_insert may only be set in the module
 * case.
 */
int
qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
    boolean_t is_insert)
{
	major_t			major;
	cdevsw_impl_t		*dp;
	struct streamtab	*str;
	queue_t			*rq;
	queue_t			*wrq;
	uint32_t		qflag;
	uint32_t		sqtype;
	perdm_t			*dmp;
	int			error;
	int			sflag;

	/* The new queue pair belongs to the same stream as qp. */
	rq = allocq();
	wrq = _WR(rq);
	STREAM(rq) = STREAM(wrq) = STREAM(qp);

	if (fp != NULL) {
		str = fp->f_str;
		qflag = fp->f_qflag;
		sqtype = fp->f_sqtype;
		dmp = fp->f_dmp;
		IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
		sflag = MODOPEN;

		/*
		 * stash away a pointer to the module structure so we can
		 * unref it in qdetach.
		 */
		rq->q_fp = fp;
	} else {
		ASSERT(!is_insert);

		major = getmajor(*devp);
		dp = &devimpl[major];

		str = dp->d_str;
		ASSERT(str == STREAMSTAB(major));

		qflag = dp->d_qflag;
		ASSERT(qflag & QISDRV);
		sqtype = dp->d_sqtype;

		/* create perdm_t if needed */
		if (NEED_DM(dp->d_dmp, qflag))
			dp->d_dmp = hold_dm(str, qflag, sqtype);

		dmp = dp->d_dmp;
		sflag = 0;
	}

	TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
	    "qattach:qflag == %X(%X)", qflag, *devp);

	/* setq might sleep in allocator - avoid holding locks. */
	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);

	/*
	 * Before calling the module's open routine, set up the q_next
	 * pointer for inserting a module in the middle of a stream.
	 *
	 * Note that we can always set _QINSERTING and set up q_next
	 * pointer for both inserting and pushing a module.  Then there
	 * is no need for the is_insert parameter.  In insertq(), called
	 * by qprocson(), assume that q_next of the new module always points
	 * to the correct queue and use it for insertion.  Everything should
	 * work out fine.  But in the first release of _I_INSERT, we
	 * distinguish between inserting and pushing to make sure that
	 * pushing a module follows the same code path as before.
	 */
	if (is_insert) {
		rq->q_flag |= _QINSERTING;
		rq->q_next = qp;
	}

	/*
	 * If there is an outer perimeter get exclusive access during
	 * the open procedure.  Bump up the reference count on the queue.
	 */
	entersq(rq->q_syncq, SQ_OPENCLOSE);
	error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
	if (error != 0)
		goto failed;
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	ASSERT(qprocsareon(rq));
	return (0);

failed:
	/*
	 * The open failed.  If the open routine already linked the queue
	 * into the stream (the queue behind wrq points at wrq), turn its
	 * procedures off again before tearing the pair down.  qdetach() is
	 * called with clmode 0, so the close routine is not invoked.
	 */
	rq->q_flag &= ~_QINSERTING;
	if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
		qprocsoff(rq);
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	rq->q_next = wrq->q_next = NULL;
	qdetach(rq, 0, 0, crp, B_FALSE);
	return (error);
}

/*
 * Handle second open of stream. For modules, set the
 * sflag argument to MODOPEN and do not pass any open flags.
 * Ignore dummydev since this is not the first open.
 *
 * A queue is treated as a module when its write side has a q_next.
 * On failure STREOPENFAIL is set in the stream's sd_flag and the error
 * from the open routine is returned; otherwise 0 is returned.
 */
int
qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
{
	int	error;
	dev_t dummydev;
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);
	entersq(qp->q_syncq, SQ_OPENCLOSE);

	/* Open on a copy so the caller's *devp is left untouched. */
	dummydev = *devp;
	if (error = ((*qp->q_qinfo->qi_qopen)(qp, &dummydev,
	    (wqp->q_next ? 0 : flag), (wqp->q_next ? MODOPEN : 0), crp))) {
		leavesq(qp->q_syncq, SQ_OPENCLOSE);
		mutex_enter(&STREAM(qp)->sd_lock);
		qp->q_stream->sd_flag |= STREOPENFAIL;
		mutex_exit(&STREAM(qp)->sd_lock);
		return (error);
	}
	leavesq(qp->q_syncq, SQ_OPENCLOSE);

	/*
	 * successful open should have done qprocson()
	 */
	ASSERT(qprocsareon(_RD(qp)));
	return (0);
}

/*
 * Detach a stream module or device.
 * If clmode == 1 then the module or driver was opened and its
 * close routine must be called. If clmode == 0, the module
 * or driver was never opened or the open failed, and so its close
 * should not be called.
1258 */ 1259 void 1260 qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove) 1261 { 1262 queue_t *wqp = _WR(qp); 1263 ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB)); 1264 1265 if (STREAM_NEEDSERVICE(STREAM(qp))) 1266 stream_runservice(STREAM(qp)); 1267 1268 if (clmode) { 1269 /* 1270 * Make sure that all the messages on the write side syncq are 1271 * processed and nothing is left. Since we are closing, no new 1272 * messages may appear there. 1273 */ 1274 wait_q_syncq(wqp); 1275 1276 entersq(qp->q_syncq, SQ_OPENCLOSE); 1277 if (is_remove) { 1278 mutex_enter(QLOCK(qp)); 1279 qp->q_flag |= _QREMOVING; 1280 mutex_exit(QLOCK(qp)); 1281 } 1282 (*qp->q_qinfo->qi_qclose)(qp, flag, crp); 1283 /* 1284 * Check that qprocsoff() was actually called. 1285 */ 1286 ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE)); 1287 1288 leavesq(qp->q_syncq, SQ_OPENCLOSE); 1289 } else { 1290 disable_svc(qp); 1291 } 1292 1293 /* 1294 * Allow any threads blocked in entersq to proceed and discover 1295 * the QWCLOSE is set. 1296 * Note: This assumes that all users of entersq check QWCLOSE. 1297 * Currently runservice is the only entersq that can happen 1298 * after removeq has finished. 1299 * Removeq will have discarded all messages destined to the closing 1300 * pair of queues from the syncq. 1301 * NOTE: Calling a function inside an assert is unconventional. 1302 * However, it does not cause any problem since flush_syncq() does 1303 * not change any state except when it returns non-zero i.e. 1304 * when the assert will trigger. 
1305 */ 1306 ASSERT(flush_syncq(qp->q_syncq, qp) == 0); 1307 ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0); 1308 ASSERT((qp->q_flag & QPERMOD) || 1309 ((qp->q_syncq->sq_head == NULL) && 1310 (wqp->q_syncq->sq_head == NULL))); 1311 1312 /* release any fmodsw_impl_t structure held on behalf of the queue */ 1313 ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV); 1314 if (qp->q_fp != NULL) 1315 fmodsw_rele(qp->q_fp); 1316 1317 /* freeq removes us from the outer perimeter if any */ 1318 freeq(qp); 1319 } 1320 1321 /* Prevent service procedures from being called */ 1322 void 1323 disable_svc(queue_t *qp) 1324 { 1325 queue_t *wqp = _WR(qp); 1326 1327 ASSERT(qp->q_flag & QREADR); 1328 mutex_enter(QLOCK(qp)); 1329 qp->q_flag |= QWCLOSE; 1330 mutex_exit(QLOCK(qp)); 1331 mutex_enter(QLOCK(wqp)); 1332 wqp->q_flag |= QWCLOSE; 1333 mutex_exit(QLOCK(wqp)); 1334 } 1335 1336 /* Allow service procedures to be called again */ 1337 void 1338 enable_svc(queue_t *qp) 1339 { 1340 queue_t *wqp = _WR(qp); 1341 1342 ASSERT(qp->q_flag & QREADR); 1343 mutex_enter(QLOCK(qp)); 1344 qp->q_flag &= ~QWCLOSE; 1345 mutex_exit(QLOCK(qp)); 1346 mutex_enter(QLOCK(wqp)); 1347 wqp->q_flag &= ~QWCLOSE; 1348 mutex_exit(QLOCK(wqp)); 1349 } 1350 1351 /* 1352 * Remove queue from qhead/qtail if it is enabled. 1353 * Only reset QENAB if the queue was removed from the runlist. 1354 * A queue goes through 3 stages: 1355 * It is on the service list and QENAB is set. 1356 * It is removed from the service list but QENAB is still set. 1357 * QENAB gets changed to QINSERVICE. 1358 * QINSERVICE is reset (when the service procedure is done) 1359 * Thus we can not reset QENAB unless we actually removed it from the service 1360 * queue. 
1361 */ 1362 void 1363 remove_runlist(queue_t *qp) 1364 { 1365 if (qp->q_flag & QENAB && qhead != NULL) { 1366 queue_t *q_chase; 1367 queue_t *q_curr; 1368 int removed; 1369 1370 mutex_enter(&service_queue); 1371 RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed); 1372 mutex_exit(&service_queue); 1373 if (removed) { 1374 STRSTAT(qremoved); 1375 qp->q_flag &= ~QENAB; 1376 } 1377 } 1378 } 1379 1380 1381 /* 1382 * Wait for any pending service processing to complete. 1383 * The removal of queues from the runlist is not atomic with the 1384 * clearing of the QENABLED flag and setting the INSERVICE flag. 1385 * consequently it is possible for remove_runlist in strclose 1386 * to not find the queue on the runlist but for it to be QENABLED 1387 * and not yet INSERVICE -> hence wait_svc needs to check QENABLED 1388 * as well as INSERVICE. 1389 */ 1390 void 1391 wait_svc(queue_t *qp) 1392 { 1393 queue_t *wqp = _WR(qp); 1394 1395 ASSERT(qp->q_flag & QREADR); 1396 1397 /* 1398 * Try to remove queues from qhead/qtail list. 1399 */ 1400 if (qhead != NULL) { 1401 remove_runlist(qp); 1402 remove_runlist(wqp); 1403 } 1404 /* 1405 * Wait till the syncqs associated with the queue disappear from the 1406 * background processing list. 1407 * This only needs to be done for non-PERMOD perimeters since 1408 * for PERMOD perimeters the syncq may be shared and will only be freed 1409 * when the last module/driver is unloaded. 1410 * If for PERMOD perimeters queue was on the syncq list, removeq() 1411 * should call propagate_syncq() or drain_syncq() for it. Both of these 1412 * functions remove the queue from its syncq list, so sqthread will not 1413 * try to access the queue. 1414 */ 1415 if (!(qp->q_flag & QPERMOD)) { 1416 syncq_t *rsq = qp->q_syncq; 1417 syncq_t *wsq = wqp->q_syncq; 1418 1419 /* 1420 * Disable rsq and wsq and wait for any background processing of 1421 * syncq to complete. 
1422 */ 1423 wait_sq_svc(rsq); 1424 if (wsq != rsq) 1425 wait_sq_svc(wsq); 1426 } 1427 1428 mutex_enter(QLOCK(qp)); 1429 while (qp->q_flag & (QINSERVICE|QENAB)) 1430 cv_wait(&qp->q_wait, QLOCK(qp)); 1431 mutex_exit(QLOCK(qp)); 1432 mutex_enter(QLOCK(wqp)); 1433 while (wqp->q_flag & (QINSERVICE|QENAB)) 1434 cv_wait(&wqp->q_wait, QLOCK(wqp)); 1435 mutex_exit(QLOCK(wqp)); 1436 } 1437 1438 /* 1439 * Put ioctl data from userland buffer `arg' into the mblk chain `bp'. 1440 * `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may 1441 * also be set, and is passed through to allocb_cred_wait(). 1442 * 1443 * Returns errno on failure, zero on success. 1444 */ 1445 int 1446 putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr) 1447 { 1448 mblk_t *tmp; 1449 ssize_t count; 1450 int error = 0; 1451 1452 ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K || 1453 (flag & (U_TO_K | K_TO_K)) == K_TO_K); 1454 1455 if (bp->b_datap->db_type == M_IOCTL) { 1456 count = ((struct iocblk *)bp->b_rptr)->ioc_count; 1457 } else { 1458 ASSERT(bp->b_datap->db_type == M_COPYIN); 1459 count = ((struct copyreq *)bp->b_rptr)->cq_size; 1460 } 1461 /* 1462 * strdoioctl validates ioc_count, so if this assert fails it 1463 * cannot be due to user error. 1464 */ 1465 ASSERT(count >= 0); 1466 1467 if ((tmp = allocb_cred_wait(count, (flag & STR_NOSIG), &error, cr, 1468 curproc->p_pid)) == NULL) { 1469 return (error); 1470 } 1471 error = strcopyin(arg, tmp->b_wptr, count, flag & (U_TO_K|K_TO_K)); 1472 if (error != 0) { 1473 freeb(tmp); 1474 return (error); 1475 } 1476 DB_CPID(tmp) = curproc->p_pid; 1477 tmp->b_wptr += count; 1478 bp->b_cont = tmp; 1479 1480 return (0); 1481 } 1482 1483 /* 1484 * Copy ioctl data to user-land. Return non-zero errno on failure, 1485 * 0 for success. 
1486 */ 1487 int 1488 getiocd(mblk_t *bp, char *arg, int copymode) 1489 { 1490 ssize_t count; 1491 size_t n; 1492 int error; 1493 1494 if (bp->b_datap->db_type == M_IOCACK) 1495 count = ((struct iocblk *)bp->b_rptr)->ioc_count; 1496 else { 1497 ASSERT(bp->b_datap->db_type == M_COPYOUT); 1498 count = ((struct copyreq *)bp->b_rptr)->cq_size; 1499 } 1500 ASSERT(count >= 0); 1501 1502 for (bp = bp->b_cont; bp && count; 1503 count -= n, bp = bp->b_cont, arg += n) { 1504 n = MIN(count, bp->b_wptr - bp->b_rptr); 1505 error = strcopyout(bp->b_rptr, arg, n, copymode); 1506 if (error) 1507 return (error); 1508 } 1509 ASSERT(count == 0); 1510 return (0); 1511 } 1512 1513 /* 1514 * Allocate a linkinfo entry given the write queue of the 1515 * bottom module of the top stream and the write queue of the 1516 * stream head of the bottom stream. 1517 */ 1518 linkinfo_t * 1519 alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown) 1520 { 1521 linkinfo_t *linkp; 1522 1523 linkp = kmem_cache_alloc(linkinfo_cache, KM_SLEEP); 1524 1525 linkp->li_lblk.l_qtop = qup; 1526 linkp->li_lblk.l_qbot = qdown; 1527 linkp->li_fpdown = fpdown; 1528 1529 mutex_enter(&strresources); 1530 linkp->li_next = linkinfo_list; 1531 linkp->li_prev = NULL; 1532 if (linkp->li_next) 1533 linkp->li_next->li_prev = linkp; 1534 linkinfo_list = linkp; 1535 linkp->li_lblk.l_index = ++lnk_id; 1536 ASSERT(lnk_id != 0); /* this should never wrap in practice */ 1537 mutex_exit(&strresources); 1538 1539 return (linkp); 1540 } 1541 1542 /* 1543 * Free a linkinfo entry. 1544 */ 1545 void 1546 lbfree(linkinfo_t *linkp) 1547 { 1548 mutex_enter(&strresources); 1549 if (linkp->li_next) 1550 linkp->li_next->li_prev = linkp->li_prev; 1551 if (linkp->li_prev) 1552 linkp->li_prev->li_next = linkp->li_next; 1553 else 1554 linkinfo_list = linkp->li_next; 1555 mutex_exit(&strresources); 1556 1557 kmem_cache_free(linkinfo_cache, linkp); 1558 } 1559 1560 /* 1561 * Check for a potential linking cycle. 
1562 * Return 1 if a link will result in a cycle, 1563 * and 0 otherwise. 1564 */ 1565 int 1566 linkcycle(stdata_t *upstp, stdata_t *lostp, str_stack_t *ss) 1567 { 1568 struct mux_node *np; 1569 struct mux_edge *ep; 1570 int i; 1571 major_t lomaj; 1572 major_t upmaj; 1573 /* 1574 * if the lower stream is a pipe/FIFO, return, since link 1575 * cycles can not happen on pipes/FIFOs 1576 */ 1577 if (lostp->sd_vnode->v_type == VFIFO) 1578 return (0); 1579 1580 for (i = 0; i < ss->ss_devcnt; i++) { 1581 np = &ss->ss_mux_nodes[i]; 1582 MUX_CLEAR(np); 1583 } 1584 lomaj = getmajor(lostp->sd_vnode->v_rdev); 1585 upmaj = getmajor(upstp->sd_vnode->v_rdev); 1586 np = &ss->ss_mux_nodes[lomaj]; 1587 for (;;) { 1588 if (!MUX_DIDVISIT(np)) { 1589 if (np->mn_imaj == upmaj) 1590 return (1); 1591 if (np->mn_outp == NULL) { 1592 MUX_VISIT(np); 1593 if (np->mn_originp == NULL) 1594 return (0); 1595 np = np->mn_originp; 1596 continue; 1597 } 1598 MUX_VISIT(np); 1599 np->mn_startp = np->mn_outp; 1600 } else { 1601 if (np->mn_startp == NULL) { 1602 if (np->mn_originp == NULL) 1603 return (0); 1604 else { 1605 np = np->mn_originp; 1606 continue; 1607 } 1608 } 1609 /* 1610 * If ep->me_nodep is a FIFO (me_nodep == NULL), 1611 * ignore the edge and move on. ep->me_nodep gets 1612 * set to NULL in mux_addedge() if it is a FIFO. 1613 * 1614 */ 1615 ep = np->mn_startp; 1616 np->mn_startp = ep->me_nextp; 1617 if (ep->me_nodep == NULL) 1618 continue; 1619 ep->me_nodep->mn_originp = np; 1620 np = ep->me_nodep; 1621 } 1622 } 1623 } 1624 1625 /* 1626 * Find linkinfo entry corresponding to the parameters. 
1627 */ 1628 linkinfo_t * 1629 findlinks(stdata_t *stp, int index, int type, str_stack_t *ss) 1630 { 1631 linkinfo_t *linkp; 1632 struct mux_edge *mep; 1633 struct mux_node *mnp; 1634 queue_t *qup; 1635 1636 mutex_enter(&strresources); 1637 if ((type & LINKTYPEMASK) == LINKNORMAL) { 1638 qup = getendq(stp->sd_wrq); 1639 for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) { 1640 if ((qup == linkp->li_lblk.l_qtop) && 1641 (!index || (index == linkp->li_lblk.l_index))) { 1642 mutex_exit(&strresources); 1643 return (linkp); 1644 } 1645 } 1646 } else { 1647 ASSERT((type & LINKTYPEMASK) == LINKPERSIST); 1648 mnp = &ss->ss_mux_nodes[getmajor(stp->sd_vnode->v_rdev)]; 1649 mep = mnp->mn_outp; 1650 while (mep) { 1651 if ((index == 0) || (index == mep->me_muxid)) 1652 break; 1653 mep = mep->me_nextp; 1654 } 1655 if (!mep) { 1656 mutex_exit(&strresources); 1657 return (NULL); 1658 } 1659 for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) { 1660 if ((!linkp->li_lblk.l_qtop) && 1661 (mep->me_muxid == linkp->li_lblk.l_index)) { 1662 mutex_exit(&strresources); 1663 return (linkp); 1664 } 1665 } 1666 } 1667 mutex_exit(&strresources); 1668 return (NULL); 1669 } 1670 1671 /* 1672 * Given a queue ptr, follow the chain of q_next pointers until you reach the 1673 * last queue on the chain and return it. 1674 */ 1675 queue_t * 1676 getendq(queue_t *q) 1677 { 1678 ASSERT(q != NULL); 1679 while (_SAMESTR(q)) 1680 q = q->q_next; 1681 return (q); 1682 } 1683 1684 /* 1685 * Wait for the syncq count to drop to zero. 1686 * sq could be either outer or inner. 
 */

static void
wait_syncq(syncq_t *sq)
{
	uint16_t count;

	/*
	 * The total is sq_count plus whatever SUM_SQ_PUTCOUNTS() adds while
	 * the put locks are held.  The put locks are released before
	 * sleeping and the total is recomputed after every wakeup.
	 */
	mutex_enter(SQLOCK(sq));
	count = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, count);
	while (count != 0) {
		/* Ask for a wakeup on sq_wait. */
		sq->sq_flags |= SQ_WANTWAKEUP;
		SQ_PUTLOCKS_EXIT(sq);
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
	}
	SQ_PUTLOCKS_EXIT(sq);
	mutex_exit(SQLOCK(sq));
}

/*
 * Wait while there are any messages for the queue in its syncq.
 */
static void
wait_q_syncq(queue_t *q)
{
	/* Unlocked peek first; the condition is rechecked under SQLOCK. */
	if ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
		syncq_t *sq = q->q_syncq;

		mutex_enter(SQLOCK(sq));
		while ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}
}


/*
 * Link the stream behind fpdown underneath the multiplexor stream on vp.
 * cmd is I_LINK or I_PLINK.  On success the new link's index is stored in
 * *rvalp, the file's f_count is bumped and 0 is returned.
 *
 * Errors returned by the checks in this function:
 *	ENXIO	the upper stream has STRHUP or STPLEX set
 *	EINVAL	the upper stream is a FIFO, has no streamtab, has no
 *		st_muxwinit, or has a major number >= ss_devcnt; or the
 *		lower stream fails the validity test below
 *	EBADF	fpdown is NULL
 * A failure of the link ioctl sent with strdoioctl() is returned after the
 * lower stream has been restored to a stream head.
 *
 * lhlink is passed through to ldi_mlink_fp().
 */
int
mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
    int lhlink)
{
	struct stdata *stp;
	struct strioctl strioc;
	struct linkinfo *linkp;
	struct stdata *stpdown;
	struct streamtab *str;
	queue_t *passq;
	syncq_t *passyncq;
	queue_t *rq;
	cdevsw_impl_t *dp;
	uint32_t qflag;
	uint32_t sqtype;
	perdm_t *dmp;
	int error = 0;
	netstack_t *ns;
	str_stack_t *ss;

	stp = vp->v_stream;
	TRACE_1(TR_FAC_STREAMS_FR,
	    TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
	/*
	 * Test for invalid upper stream
	 */
	if (stp->sd_flag & STRHUP) {
		return (ENXIO);
	}
	if (vp->v_type == VFIFO) {
		return (EINVAL);
	}
	if (stp->sd_strtab == NULL) {
		return (EINVAL);
	}
	if (!stp->sd_strtab->st_muxwinit) {
		return (EINVAL);
	}
	if (fpdown == NULL) {
		return (EBADF);
	}
	/*
	 * Every return from here on is paired with a netstack_rele() of
	 * the netstack looked up here.
	 */
	ns = netstack_find_by_cred(crp);
	ASSERT(ns != NULL);
	ss = ns->netstack_str;
	ASSERT(ss != NULL);

	if (getmajor(stp->sd_vnode->v_rdev) >= ss->ss_devcnt) {
		netstack_rele(ss->ss_netstack);
		return (EINVAL);
	}
	/* muxifier is held from here until just before returning. */
	mutex_enter(&muxifier);
	if (stp->sd_flag & STPLEX) {
		mutex_exit(&muxifier);
		netstack_rele(ss->ss_netstack);
		return (ENXIO);
	}

	/*
	 * Test for invalid lower stream.
	 * The check for the v_type != VFIFO and having a major
	 * number not >= devcnt is done to avoid problems with
	 * adding mux_node entry past the end of mux_nodes[].
	 * For FIFO's we don't add an entry so this isn't a
	 * problem.
	 */
	if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
	    (stpdown == stp) || (stpdown->sd_flag &
	    (STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
	    ((stpdown->sd_vnode->v_type != VFIFO) &&
	    (getmajor(stpdown->sd_vnode->v_rdev) >= ss->ss_devcnt)) ||
	    linkcycle(stp, stpdown, ss)) {
		mutex_exit(&muxifier);
		netstack_rele(ss->ss_netstack);
		return (EINVAL);
	}
	TRACE_1(TR_FAC_STREAMS_FR,
	    TR_STPDOWN, "stpdown:%p", stpdown);
	/* For I_PLINK the linkblk is created with a NULL top queue. */
	rq = getendq(stp->sd_wrq);
	if (cmd == I_PLINK)
		rq = NULL;

	linkp = alloclink(rq, stpdown->sd_wrq, fpdown);

	/* The ioctl sent to the mux carries the new linkblk as its data. */
	strioc.ic_cmd = cmd;
	strioc.ic_timout = INFTIM;
	strioc.ic_len = sizeof (struct linkblk);
	strioc.ic_dp = (char *)&linkp->li_lblk;

	/*
	 * STRPLUMB protects plumbing changes and should be set before
	 * link_addpassthru()/link_rempassthru() are called, so it is set here
	 * and cleared in the end of mlink when passthru queue is removed.
	 * Setting of STRPLUMB prevents reopens of the stream while passthru
	 * queue is in-place (it is not a proper module and doesn't have open
	 * entry point).
	 *
	 * STPLEX prevents any threads from entering the stream from above. It
	 * can't be set before the call to link_addpassthru() because putnext
	 * from below may cause stream head I/O routines to be called and these
	 * routines assert that STPLEX is not set. After link_addpassthru()
	 * nothing may come from below since the pass queue syncq is blocked.
	 * Note also that STPLEX should be cleared before the call to
	 * link_rempassthru() since when messages start flowing to the stream
	 * head (e.g. because of message propagation from the pass queue) stream
	 * head I/O routines may be called with STPLEX flag set.
	 *
	 * When STPLEX is set, nothing may come into the stream from above and
	 * it is safe to do a setq which will change stream head. So, the
	 * correct sequence of actions is:
	 *
	 * 1) Set STRPLUMB
	 * 2) Call link_addpassthru()
	 * 3) Set STPLEX
	 * 4) Call setq and update the stream state
	 * 5) Clear STPLEX
	 * 6) Call link_rempassthru()
	 * 7) Clear STRPLUMB
	 *
	 * The same sequence applies to munlink() code.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STRPLUMB;
	mutex_exit(&stpdown->sd_lock);
	/*
	 * Add passthru queue below lower mux. This will block
	 * syncqs of lower mux's read queue during I_LINK/I_UNLINK.
	 */
	passq = link_addpassthru(stpdown);

	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STPLEX;
	mutex_exit(&stpdown->sd_lock);

	/* From here on rq is the read queue of the lower stream's head. */
	rq = _RD(stpdown->sd_wrq);
	/*
	 * There may be messages in the streamhead's syncq due to messages
	 * that arrived before link_addpassthru() was done. To avoid
	 * background processing of the syncq happening simultaneous with
	 * setq processing, we disable the streamhead syncq and wait until
	 * existing background thread finishes working on it.
	 */
	wait_sq_svc(rq->q_syncq);
	passyncq = passq->q_syncq;
	if (!(passyncq->sq_flags & SQ_BLOCKED))
		blocksq(passyncq, SQ_BLOCKED, 0);

	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
	rq->q_ptr = _WR(rq)->q_ptr = NULL;

	/* setq might sleep in allocator - avoid holding locks. */
	/* Note: we are holding muxifier here. */

	/* Perimeter settings come from the upper (mux) driver's devimpl. */
	str = stp->sd_strtab;
	dp = &devimpl[getmajor(vp->v_rdev)];
	ASSERT(dp->d_str == str);

	qflag = dp->d_qflag;
	sqtype = dp->d_sqtype;

	/* create perdm_t if needed */
	if (NEED_DM(dp->d_dmp, qflag))
		dp->d_dmp = hold_dm(str, qflag, sqtype);

	dmp = dp->d_dmp;

	/* Re-initialize the lower queue pair with the mux's lower qinits. */
	setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
	    B_TRUE);

	/*
	 * XXX Remove any "odd" messages from the queue.
	 * Keep only M_DATA, M_PROTO, M_PCPROTO.
	 */
	error = strdoioctl(stp, &strioc, FNATIVE,
	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
	if (error != 0) {
		/* The link ioctl failed: undo everything done above. */
		lbfree(linkp);

		if (!(passyncq->sq_flags & SQ_BLOCKED))
			blocksq(passyncq, SQ_BLOCKED, 0);
		/*
		 * Restore the stream head queue and then remove
		 * the passq. Turn off STPLEX before we turn on
		 * the stream by removing the passq.
		 */
		rq->q_ptr = _WR(rq)->q_ptr = stpdown;
		setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
		    B_TRUE);

		mutex_enter(&stpdown->sd_lock);
		stpdown->sd_flag &= ~STPLEX;
		mutex_exit(&stpdown->sd_lock);

		link_rempassthru(passq);

		mutex_enter(&stpdown->sd_lock);
		stpdown->sd_flag &= ~STRPLUMB;
		/* Wakeup anyone waiting for STRPLUMB to clear. */
		cv_broadcast(&stpdown->sd_monitor);
		mutex_exit(&stpdown->sd_lock);

		mutex_exit(&muxifier);
		netstack_rele(ss->ss_netstack);
		return (error);
	}
	/* The link now holds its own reference on the lower file. */
	mutex_enter(&fpdown->f_tlock);
	fpdown->f_count++;
	mutex_exit(&fpdown->f_tlock);

	/*
	 * if we've made it here the linkage is all set up so we should also
	 * set up the layered driver linkages
	 */

	ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
	if (cmd == I_LINK) {
		ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
	} else {
		ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
	}

	link_rempassthru(passq);

	mux_addedge(stp, stpdown, linkp->li_lblk.l_index, ss);

	/*
	 * Mark the upper stream as having dependent links
	 * so that strclose can clean it up.
	 */
	if (cmd == I_LINK) {
		mutex_enter(&stp->sd_lock);
		stp->sd_flag |= STRHASLINKS;
		mutex_exit(&stp->sd_lock);
	}
	/*
	 * Wake up any other processes that may have been
	 * waiting on the lower stream. These will all
	 * error out.
	 */
	mutex_enter(&stpdown->sd_lock);
	/* The passthru module is removed so we may release STRPLUMB */
	stpdown->sd_flag &= ~STRPLUMB;
	cv_broadcast(&rq->q_wait);
	cv_broadcast(&_WR(rq)->q_wait);
	cv_broadcast(&stpdown->sd_monitor);
	mutex_exit(&stpdown->sd_lock);
	mutex_exit(&muxifier);
	*rvalp = linkp->li_lblk.l_index;
	netstack_rele(ss->ss_netstack);
	return (0);
}

/*
 * Descriptor-based front end to mlink_file(): arg is the file descriptor
 * of the lower stream.  A descriptor that getf() cannot resolve is passed
 * on as a NULL file pointer, which mlink_file() rejects with EBADF.
 */
int
mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
{
	int ret;
	struct file *fpdown;

	fpdown = getf(arg);
	ret = mlink_file(vp, cmd, fpdown, crp, rvalp, lhlink);
	/* Only release the hold if getf() actually took one. */
	if (fpdown != NULL)
		releasef(arg);
	return (ret);
}

/*
 * Unlink a multiplexor link. Stp is the controlling stream for the
 * link, and linkp points to the link's entry in the linkinfo list.
1997 * The muxifier lock must be held on entry and is dropped on exit. 1998 * 1999 * NOTE : Currently it is assumed that mux would process all the messages 2000 * sitting on it's queue before ACKing the UNLINK. It is the responsibility 2001 * of the mux to handle all the messages that arrive before UNLINK. 2002 * If the mux has to send down messages on its lower stream before 2003 * ACKing I_UNLINK, then it *should* know to handle messages even 2004 * after the UNLINK is acked (actually it should be able to handle till we 2005 * re-block the read side of the pass queue here). If the mux does not 2006 * open up the lower stream, any messages that arrive during UNLINK 2007 * will be put in the stream head. In the case of lower stream opening 2008 * up, some messages might land in the stream head depending on when 2009 * the message arrived and when the read side of the pass queue was 2010 * re-blocked. 2011 */ 2012 int 2013 munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp, 2014 str_stack_t *ss) 2015 { 2016 struct strioctl strioc; 2017 struct stdata *stpdown; 2018 queue_t *rq, *wrq; 2019 queue_t *passq; 2020 syncq_t *passyncq; 2021 int error = 0; 2022 file_t *fpdown; 2023 2024 ASSERT(MUTEX_HELD(&muxifier)); 2025 2026 stpdown = linkp->li_fpdown->f_vnode->v_stream; 2027 2028 /* 2029 * See the comment in mlink() concerning STRPLUMB/STPLEX flags. 2030 */ 2031 mutex_enter(&stpdown->sd_lock); 2032 stpdown->sd_flag |= STRPLUMB; 2033 mutex_exit(&stpdown->sd_lock); 2034 2035 /* 2036 * Add passthru queue below lower mux. This will block 2037 * syncqs of lower muxs read queue during I_LINK/I_UNLINK. 
2038 */ 2039 passq = link_addpassthru(stpdown); 2040 2041 if ((flag & LINKTYPEMASK) == LINKNORMAL) 2042 strioc.ic_cmd = I_UNLINK; 2043 else 2044 strioc.ic_cmd = I_PUNLINK; 2045 strioc.ic_timout = INFTIM; 2046 strioc.ic_len = sizeof (struct linkblk); 2047 strioc.ic_dp = (char *)&linkp->li_lblk; 2048 2049 error = strdoioctl(stp, &strioc, FNATIVE, 2050 K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp); 2051 2052 /* 2053 * If there was an error and this is not called via strclose, 2054 * return to the user. Otherwise, pretend there was no error 2055 * and close the link. 2056 */ 2057 if (error) { 2058 if (flag & LINKCLOSE) { 2059 cmn_err(CE_WARN, "KERNEL: munlink: could not perform " 2060 "unlink ioctl, closing anyway (%d)\n", error); 2061 } else { 2062 link_rempassthru(passq); 2063 mutex_enter(&stpdown->sd_lock); 2064 stpdown->sd_flag &= ~STRPLUMB; 2065 cv_broadcast(&stpdown->sd_monitor); 2066 mutex_exit(&stpdown->sd_lock); 2067 mutex_exit(&muxifier); 2068 return (error); 2069 } 2070 } 2071 2072 mux_rmvedge(stp, linkp->li_lblk.l_index, ss); 2073 fpdown = linkp->li_fpdown; 2074 lbfree(linkp); 2075 2076 /* 2077 * We go ahead and drop muxifier here--it's a nasty global lock that 2078 * can slow others down. It's okay to since attempts to mlink() this 2079 * stream will be stopped because STPLEX is still set in the stdata 2080 * structure, and munlink() is stopped because mux_rmvedge() and 2081 * lbfree() have removed it from mux_nodes[] and linkinfo_list, 2082 * respectively. Note that we defer the closef() of fpdown until 2083 * after we drop muxifier since strclose() can call munlinkall(). 2084 */ 2085 mutex_exit(&muxifier); 2086 2087 wrq = stpdown->sd_wrq; 2088 rq = _RD(wrq); 2089 2090 /* 2091 * Get rid of outstanding service procedure runs, before we make 2092 * it a stream head, since a stream head doesn't have any service 2093 * procedure. 
2094 */ 2095 disable_svc(rq); 2096 wait_svc(rq); 2097 2098 /* 2099 * Since we don't disable the syncq for QPERMOD, we wait for whatever 2100 * is queued up to be finished. mux should take care that nothing is 2101 * send down to this queue. We should do it now as we're going to block 2102 * passyncq if it was unblocked. 2103 */ 2104 if (wrq->q_flag & QPERMOD) { 2105 syncq_t *sq = wrq->q_syncq; 2106 2107 mutex_enter(SQLOCK(sq)); 2108 while (wrq->q_sqflags & Q_SQQUEUED) { 2109 sq->sq_flags |= SQ_WANTWAKEUP; 2110 cv_wait(&sq->sq_wait, SQLOCK(sq)); 2111 } 2112 mutex_exit(SQLOCK(sq)); 2113 } 2114 passyncq = passq->q_syncq; 2115 if (!(passyncq->sq_flags & SQ_BLOCKED)) { 2116 2117 syncq_t *sq, *outer; 2118 2119 /* 2120 * Messages could be flowing from underneath. We will 2121 * block the read side of the passq. This would be 2122 * sufficient for QPAIR and QPERQ muxes to ensure 2123 * that no data is flowing up into this queue 2124 * and hence no thread active in this instance of 2125 * lower mux. But for QPERMOD and QMTOUTPERIM there 2126 * could be messages on the inner and outer/inner 2127 * syncqs respectively. We will wait for them to drain. 2128 * Because passq is blocked messages end up in the syncq 2129 * And qfill_syncq could possibly end up setting QFULL 2130 * which will access the rq->q_flag. Hence, we have to 2131 * acquire the QLOCK in setq. 2132 * 2133 * XXX Messages can also flow from top into this 2134 * queue though the unlink is over (Ex. some instance 2135 * in putnext() called from top that has still not 2136 * accessed this queue. And also putq(lowerq) ?). 2137 * Solution : How about blocking the l_qtop queue ? 2138 * Do we really care about such pure D_MP muxes ? 2139 */ 2140 2141 blocksq(passyncq, SQ_BLOCKED, 0); 2142 2143 sq = rq->q_syncq; 2144 if ((outer = sq->sq_outer) != NULL) { 2145 2146 /* 2147 * We have to just wait for the outer sq_count 2148 * drop to zero. 
As this does not prevent new 2149 * messages to enter the outer perimeter, this 2150 * is subject to starvation. 2151 * 2152 * NOTE :Because of blocksq above, messages could 2153 * be in the inner syncq only because of some 2154 * thread holding the outer perimeter exclusively. 2155 * Hence it would be sufficient to wait for the 2156 * exclusive holder of the outer perimeter to drain 2157 * the inner and outer syncqs. But we will not depend 2158 * on this feature and hence check the inner syncqs 2159 * separately. 2160 */ 2161 wait_syncq(outer); 2162 } 2163 2164 2165 /* 2166 * There could be messages destined for 2167 * this queue. Let the exclusive holder 2168 * drain it. 2169 */ 2170 2171 wait_syncq(sq); 2172 ASSERT((rq->q_flag & QPERMOD) || 2173 ((rq->q_syncq->sq_head == NULL) && 2174 (_WR(rq)->q_syncq->sq_head == NULL))); 2175 } 2176 2177 /* 2178 * We haven't taken care of QPERMOD case yet. QPERMOD is a special 2179 * case as we don't disable its syncq or remove it off the syncq 2180 * service list. 2181 */ 2182 if (rq->q_flag & QPERMOD) { 2183 syncq_t *sq = rq->q_syncq; 2184 2185 mutex_enter(SQLOCK(sq)); 2186 while (rq->q_sqflags & Q_SQQUEUED) { 2187 sq->sq_flags |= SQ_WANTWAKEUP; 2188 cv_wait(&sq->sq_wait, SQLOCK(sq)); 2189 } 2190 mutex_exit(SQLOCK(sq)); 2191 } 2192 2193 /* 2194 * flush_syncq changes states only when there are some messages to 2195 * free, i.e. when it returns non-zero value to return. 2196 */ 2197 ASSERT(flush_syncq(rq->q_syncq, rq) == 0); 2198 ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0); 2199 2200 /* 2201 * Nobody else should know about this queue now. 2202 * If the mux did not process the messages before 2203 * acking the I_UNLINK, free them now. 2204 */ 2205 2206 flushq(rq, FLUSHALL); 2207 flushq(_WR(rq), FLUSHALL); 2208 2209 /* 2210 * Convert the mux lower queue into a stream head queue. 2211 * Turn off STPLEX before we turn on the stream by removing the passq. 
2212 */ 2213 rq->q_ptr = wrq->q_ptr = stpdown; 2214 setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE); 2215 2216 ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE); 2217 ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq)); 2218 2219 enable_svc(rq); 2220 2221 /* 2222 * Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still 2223 * needs to be set to prevent reopen() of the stream - such reopen may 2224 * try to call non-existent pass queue open routine and panic. 2225 */ 2226 mutex_enter(&stpdown->sd_lock); 2227 stpdown->sd_flag &= ~STPLEX; 2228 mutex_exit(&stpdown->sd_lock); 2229 2230 ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) || 2231 ((flag & LINKTYPEMASK) == LINKPERSIST)); 2232 2233 /* clean up the layered driver linkages */ 2234 if ((flag & LINKTYPEMASK) == LINKNORMAL) { 2235 ldi_munlink_fp(stp, fpdown, LINKNORMAL); 2236 } else { 2237 ldi_munlink_fp(stp, fpdown, LINKPERSIST); 2238 } 2239 2240 link_rempassthru(passq); 2241 2242 /* 2243 * Now all plumbing changes are finished and STRPLUMB is no 2244 * longer needed. 2245 */ 2246 mutex_enter(&stpdown->sd_lock); 2247 stpdown->sd_flag &= ~STRPLUMB; 2248 cv_broadcast(&stpdown->sd_monitor); 2249 mutex_exit(&stpdown->sd_lock); 2250 2251 (void) closef(fpdown); 2252 return (0); 2253 } 2254 2255 /* 2256 * Unlink all multiplexor links for which stp is the controlling stream. 2257 * Return 0, or a non-zero errno on failure. 2258 */ 2259 int 2260 munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp, str_stack_t *ss) 2261 { 2262 linkinfo_t *linkp; 2263 int error = 0; 2264 2265 mutex_enter(&muxifier); 2266 while (linkp = findlinks(stp, 0, flag, ss)) { 2267 /* 2268 * munlink() releases the muxifier lock. 2269 */ 2270 if (error = munlink(stp, linkp, flag, crp, rvalp, ss)) 2271 return (error); 2272 mutex_enter(&muxifier); 2273 } 2274 mutex_exit(&muxifier); 2275 return (0); 2276 } 2277 2278 /* 2279 * A multiplexor link has been made. Add an 2280 * edge to the directed graph. 
2281 */ 2282 void 2283 mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid, str_stack_t *ss) 2284 { 2285 struct mux_node *np; 2286 struct mux_edge *ep; 2287 major_t upmaj; 2288 major_t lomaj; 2289 2290 upmaj = getmajor(upstp->sd_vnode->v_rdev); 2291 lomaj = getmajor(lostp->sd_vnode->v_rdev); 2292 np = &ss->ss_mux_nodes[upmaj]; 2293 if (np->mn_outp) { 2294 ep = np->mn_outp; 2295 while (ep->me_nextp) 2296 ep = ep->me_nextp; 2297 ep->me_nextp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP); 2298 ep = ep->me_nextp; 2299 } else { 2300 np->mn_outp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP); 2301 ep = np->mn_outp; 2302 } 2303 ep->me_nextp = NULL; 2304 ep->me_muxid = muxid; 2305 /* 2306 * Save the dev_t for the purposes of str_stack_shutdown. 2307 * str_stack_shutdown assumes that the device allows reopen, since 2308 * this dev_t is the one after any cloning by xx_open(). 2309 * Would prefer finding the dev_t from before any cloning, 2310 * but specfs doesn't retain that. 2311 */ 2312 ep->me_dev = upstp->sd_vnode->v_rdev; 2313 if (lostp->sd_vnode->v_type == VFIFO) 2314 ep->me_nodep = NULL; 2315 else 2316 ep->me_nodep = &ss->ss_mux_nodes[lomaj]; 2317 } 2318 2319 /* 2320 * A multiplexor link has been removed. Remove the 2321 * edge in the directed graph. 
2322 */ 2323 void 2324 mux_rmvedge(stdata_t *upstp, int muxid, str_stack_t *ss) 2325 { 2326 struct mux_node *np; 2327 struct mux_edge *ep; 2328 struct mux_edge *pep = NULL; 2329 major_t upmaj; 2330 2331 upmaj = getmajor(upstp->sd_vnode->v_rdev); 2332 np = &ss->ss_mux_nodes[upmaj]; 2333 ASSERT(np->mn_outp != NULL); 2334 ep = np->mn_outp; 2335 while (ep) { 2336 if (ep->me_muxid == muxid) { 2337 if (pep) 2338 pep->me_nextp = ep->me_nextp; 2339 else 2340 np->mn_outp = ep->me_nextp; 2341 kmem_free(ep, sizeof (struct mux_edge)); 2342 return; 2343 } 2344 pep = ep; 2345 ep = ep->me_nextp; 2346 } 2347 ASSERT(0); /* should not reach here */ 2348 } 2349 2350 /* 2351 * Translate the device flags (from conf.h) to the corresponding 2352 * qflag and sq_flag (type) values. 2353 */ 2354 int 2355 devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp, 2356 uint32_t *sqtypep) 2357 { 2358 uint32_t qflag = 0; 2359 uint32_t sqtype = 0; 2360 2361 if (devflag & _D_OLD) 2362 goto bad; 2363 2364 /* Inner perimeter presence and scope */ 2365 switch (devflag & D_MTINNER_MASK) { 2366 case D_MP: 2367 qflag |= QMTSAFE; 2368 sqtype |= SQ_CI; 2369 break; 2370 case D_MTPERQ|D_MP: 2371 qflag |= QPERQ; 2372 break; 2373 case D_MTQPAIR|D_MP: 2374 qflag |= QPAIR; 2375 break; 2376 case D_MTPERMOD|D_MP: 2377 qflag |= QPERMOD; 2378 break; 2379 default: 2380 goto bad; 2381 } 2382 2383 /* Outer perimeter */ 2384 if (devflag & D_MTOUTPERIM) { 2385 switch (devflag & D_MTINNER_MASK) { 2386 case D_MP: 2387 case D_MTPERQ|D_MP: 2388 case D_MTQPAIR|D_MP: 2389 break; 2390 default: 2391 goto bad; 2392 } 2393 qflag |= QMTOUTPERIM; 2394 } 2395 2396 /* Inner perimeter modifiers */ 2397 if (devflag & D_MTINNER_MOD) { 2398 switch (devflag & D_MTINNER_MASK) { 2399 case D_MP: 2400 goto bad; 2401 default: 2402 break; 2403 } 2404 if (devflag & D_MTPUTSHARED) 2405 sqtype |= SQ_CIPUT; 2406 if (devflag & _D_MTOCSHARED) { 2407 /* 2408 * The code in putnext assumes that it has the 2409 * highest concurrency by 
not checking sq_count. 2410 * Thus _D_MTOCSHARED can only be supported when 2411 * D_MTPUTSHARED is set. 2412 */ 2413 if (!(devflag & D_MTPUTSHARED)) 2414 goto bad; 2415 sqtype |= SQ_CIOC; 2416 } 2417 if (devflag & _D_MTCBSHARED) { 2418 /* 2419 * The code in putnext assumes that it has the 2420 * highest concurrency by not checking sq_count. 2421 * Thus _D_MTCBSHARED can only be supported when 2422 * D_MTPUTSHARED is set. 2423 */ 2424 if (!(devflag & D_MTPUTSHARED)) 2425 goto bad; 2426 sqtype |= SQ_CICB; 2427 } 2428 if (devflag & _D_MTSVCSHARED) { 2429 /* 2430 * The code in putnext assumes that it has the 2431 * highest concurrency by not checking sq_count. 2432 * Thus _D_MTSVCSHARED can only be supported when 2433 * D_MTPUTSHARED is set. Also _D_MTSVCSHARED is 2434 * supported only for QPERMOD. 2435 */ 2436 if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD)) 2437 goto bad; 2438 sqtype |= SQ_CISVC; 2439 } 2440 } 2441 2442 /* Default outer perimeter concurrency */ 2443 sqtype |= SQ_CO; 2444 2445 /* Outer perimeter modifiers */ 2446 if (devflag & D_MTOCEXCL) { 2447 if (!(devflag & D_MTOUTPERIM)) { 2448 /* No outer perimeter */ 2449 goto bad; 2450 } 2451 sqtype &= ~SQ_COOC; 2452 } 2453 2454 /* Synchronous Streams extended qinit structure */ 2455 if (devflag & D_SYNCSTR) 2456 qflag |= QSYNCSTR; 2457 2458 /* 2459 * Private flag used by a transport module to indicate 2460 * to sockfs that it supports direct-access mode without 2461 * having to go through STREAMS. 
2462 */ 2463 if (devflag & _D_DIRECT) { 2464 /* Reject unless the module is fully-MT (no perimeter) */ 2465 if ((qflag & QMT_TYPEMASK) != QMTSAFE) 2466 goto bad; 2467 qflag |= _QDIRECT; 2468 } 2469 2470 *qflagp = qflag; 2471 *sqtypep = sqtype; 2472 return (0); 2473 2474 bad: 2475 cmn_err(CE_WARN, 2476 "stropen: bad MT flags (0x%x) in driver '%s'", 2477 (int)(qflag & D_MTSAFETY_MASK), 2478 stp->st_rdinit->qi_minfo->mi_idname); 2479 2480 return (EINVAL); 2481 } 2482 2483 /* 2484 * Set the interface values for a pair of queues (qinit structure, 2485 * packet sizes, water marks). 2486 * setq assumes that the caller does not have a claim (entersq or claimq) 2487 * on the queue. 2488 */ 2489 void 2490 setq(queue_t *rq, struct qinit *rinit, struct qinit *winit, 2491 perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed) 2492 { 2493 queue_t *wq; 2494 syncq_t *sq, *outer; 2495 2496 ASSERT(rq->q_flag & QREADR); 2497 ASSERT((qflag & QMT_TYPEMASK) != 0); 2498 IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL); 2499 2500 wq = _WR(rq); 2501 rq->q_qinfo = rinit; 2502 rq->q_hiwat = rinit->qi_minfo->mi_hiwat; 2503 rq->q_lowat = rinit->qi_minfo->mi_lowat; 2504 rq->q_minpsz = rinit->qi_minfo->mi_minpsz; 2505 rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz; 2506 wq->q_qinfo = winit; 2507 wq->q_hiwat = winit->qi_minfo->mi_hiwat; 2508 wq->q_lowat = winit->qi_minfo->mi_lowat; 2509 wq->q_minpsz = winit->qi_minfo->mi_minpsz; 2510 wq->q_maxpsz = winit->qi_minfo->mi_maxpsz; 2511 2512 /* Remove old syncqs */ 2513 sq = rq->q_syncq; 2514 outer = sq->sq_outer; 2515 if (outer != NULL) { 2516 ASSERT(wq->q_syncq->sq_outer == outer); 2517 outer_remove(outer, rq->q_syncq); 2518 if (wq->q_syncq != rq->q_syncq) 2519 outer_remove(outer, wq->q_syncq); 2520 } 2521 ASSERT(sq->sq_outer == NULL); 2522 ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL); 2523 2524 if (sq != SQ(rq)) { 2525 if (!(rq->q_flag & QPERMOD)) 2526 free_syncq(sq); 2527 if (wq->q_syncq == rq->q_syncq) 2528 wq->q_syncq = 
NULL; 2529 rq->q_syncq = NULL; 2530 } 2531 if (wq->q_syncq != NULL && wq->q_syncq != sq && 2532 wq->q_syncq != SQ(rq)) { 2533 free_syncq(wq->q_syncq); 2534 wq->q_syncq = NULL; 2535 } 2536 ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL && 2537 rq->q_syncq->sq_tail == NULL)); 2538 ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL && 2539 wq->q_syncq->sq_tail == NULL)); 2540 2541 if (!(rq->q_flag & QPERMOD) && 2542 rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) { 2543 ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1); 2544 SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl, 2545 rq->q_syncq->sq_nciputctrl, 0); 2546 ASSERT(ciputctrl_cache != NULL); 2547 kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl); 2548 rq->q_syncq->sq_ciputctrl = NULL; 2549 rq->q_syncq->sq_nciputctrl = 0; 2550 } 2551 2552 if (!(wq->q_flag & QPERMOD) && 2553 wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) { 2554 ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1); 2555 SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl, 2556 wq->q_syncq->sq_nciputctrl, 0); 2557 ASSERT(ciputctrl_cache != NULL); 2558 kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl); 2559 wq->q_syncq->sq_ciputctrl = NULL; 2560 wq->q_syncq->sq_nciputctrl = 0; 2561 } 2562 2563 sq = SQ(rq); 2564 ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL); 2565 ASSERT(sq->sq_outer == NULL); 2566 ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL); 2567 2568 /* 2569 * Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS 2570 * bits in sq_flag based on the sqtype. 2571 */ 2572 ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0); 2573 2574 rq->q_syncq = wq->q_syncq = sq; 2575 sq->sq_type = sqtype; 2576 sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS); 2577 2578 /* 2579 * We are making sq_svcflags zero, 2580 * resetting SQ_DISABLED in case it was set by 2581 * wait_svc() in the munlink path. 
2582 * 2583 */ 2584 ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0); 2585 sq->sq_svcflags = 0; 2586 2587 /* 2588 * We need to acquire the lock here for the mlink and munlink case, 2589 * where canputnext, backenable, etc can access the q_flag. 2590 */ 2591 if (lock_needed) { 2592 mutex_enter(QLOCK(rq)); 2593 rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag; 2594 mutex_exit(QLOCK(rq)); 2595 mutex_enter(QLOCK(wq)); 2596 wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag; 2597 mutex_exit(QLOCK(wq)); 2598 } else { 2599 rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag; 2600 wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag; 2601 } 2602 2603 if (qflag & QPERQ) { 2604 /* Allocate a separate syncq for the write side */ 2605 sq = new_syncq(); 2606 sq->sq_type = rq->q_syncq->sq_type; 2607 sq->sq_flags = rq->q_syncq->sq_flags; 2608 ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL && 2609 sq->sq_oprev == NULL); 2610 wq->q_syncq = sq; 2611 } 2612 if (qflag & QPERMOD) { 2613 sq = dmp->dm_sq; 2614 2615 /* 2616 * Assert that we do have an inner perimeter syncq and that it 2617 * does not have an outer perimeter associated with it. 2618 */ 2619 ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL && 2620 sq->sq_oprev == NULL); 2621 rq->q_syncq = wq->q_syncq = sq; 2622 } 2623 if (qflag & QMTOUTPERIM) { 2624 outer = dmp->dm_sq; 2625 2626 ASSERT(outer->sq_outer == NULL); 2627 outer_insert(outer, rq->q_syncq); 2628 if (wq->q_syncq != rq->q_syncq) 2629 outer_insert(outer, wq->q_syncq); 2630 } 2631 ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) == 2632 (rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS)); 2633 ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) == 2634 (wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS)); 2635 ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK)); 2636 2637 /* 2638 * Initialize struio() types. 2639 */ 2640 rq->q_struiot = 2641 (rq->q_flag & QSYNCSTR) ? 
rinit->qi_struiot : STRUIOT_NONE; 2642 wq->q_struiot = 2643 (wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE; 2644 } 2645 2646 perdm_t * 2647 hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype) 2648 { 2649 syncq_t *sq; 2650 perdm_t **pp; 2651 perdm_t *p; 2652 perdm_t *dmp; 2653 2654 ASSERT(str != NULL); 2655 ASSERT(qflag & (QPERMOD | QMTOUTPERIM)); 2656 2657 rw_enter(&perdm_rwlock, RW_READER); 2658 for (p = perdm_list; p != NULL; p = p->dm_next) { 2659 if (p->dm_str == str) { /* found one */ 2660 atomic_add_32(&(p->dm_ref), 1); 2661 rw_exit(&perdm_rwlock); 2662 return (p); 2663 } 2664 } 2665 rw_exit(&perdm_rwlock); 2666 2667 sq = new_syncq(); 2668 if (qflag & QPERMOD) { 2669 sq->sq_type = sqtype | SQ_PERMOD; 2670 sq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS; 2671 } else { 2672 ASSERT(qflag & QMTOUTPERIM); 2673 sq->sq_onext = sq->sq_oprev = sq; 2674 } 2675 2676 dmp = kmem_alloc(sizeof (perdm_t), KM_SLEEP); 2677 dmp->dm_sq = sq; 2678 dmp->dm_str = str; 2679 dmp->dm_ref = 1; 2680 dmp->dm_next = NULL; 2681 2682 rw_enter(&perdm_rwlock, RW_WRITER); 2683 for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) { 2684 if (p->dm_str == str) { /* already present */ 2685 p->dm_ref++; 2686 rw_exit(&perdm_rwlock); 2687 free_syncq(sq); 2688 kmem_free(dmp, sizeof (perdm_t)); 2689 return (p); 2690 } 2691 } 2692 2693 *pp = dmp; 2694 rw_exit(&perdm_rwlock); 2695 return (dmp); 2696 } 2697 2698 void 2699 rele_dm(perdm_t *dmp) 2700 { 2701 perdm_t **pp; 2702 perdm_t *p; 2703 2704 rw_enter(&perdm_rwlock, RW_WRITER); 2705 ASSERT(dmp->dm_ref > 0); 2706 2707 if (--dmp->dm_ref > 0) { 2708 rw_exit(&perdm_rwlock); 2709 return; 2710 } 2711 2712 for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) 2713 if (p == dmp) 2714 break; 2715 ASSERT(p == dmp); 2716 *pp = p->dm_next; 2717 rw_exit(&perdm_rwlock); 2718 2719 /* 2720 * Wait for any background processing that relies on the 2721 * syncq to complete before it is freed. 
2722 */ 2723 wait_sq_svc(p->dm_sq); 2724 free_syncq(p->dm_sq); 2725 kmem_free(p, sizeof (perdm_t)); 2726 } 2727 2728 /* 2729 * Make a protocol message given control and data buffers. 2730 * n.b., this can block; be careful of what locks you hold when calling it. 2731 * 2732 * If sd_maxblk is less than *iosize this routine can fail part way through 2733 * (due to an allocation failure). In this case on return *iosize will contain 2734 * the amount that was consumed. Otherwise *iosize will not be modified 2735 * i.e. it will contain the amount that was consumed. 2736 */ 2737 int 2738 strmakemsg( 2739 struct strbuf *mctl, 2740 ssize_t *iosize, 2741 struct uio *uiop, 2742 stdata_t *stp, 2743 int32_t flag, 2744 mblk_t **mpp) 2745 { 2746 mblk_t *mpctl = NULL; 2747 mblk_t *mpdata = NULL; 2748 int error; 2749 2750 ASSERT(uiop != NULL); 2751 2752 *mpp = NULL; 2753 /* Create control part, if any */ 2754 if ((mctl != NULL) && (mctl->len >= 0)) { 2755 error = strmakectl(mctl, flag, uiop->uio_fmode, &mpctl); 2756 if (error) 2757 return (error); 2758 } 2759 /* Create data part, if any */ 2760 if (*iosize >= 0) { 2761 error = strmakedata(iosize, uiop, stp, flag, &mpdata); 2762 if (error) { 2763 freemsg(mpctl); 2764 return (error); 2765 } 2766 } 2767 if (mpctl != NULL) { 2768 if (mpdata != NULL) 2769 linkb(mpctl, mpdata); 2770 *mpp = mpctl; 2771 } else { 2772 *mpp = mpdata; 2773 } 2774 return (0); 2775 } 2776 2777 /* 2778 * Make the control part of a protocol message given a control buffer. 2779 * n.b., this can block; be careful of what locks you hold when calling it. 2780 */ 2781 int 2782 strmakectl( 2783 struct strbuf *mctl, 2784 int32_t flag, 2785 int32_t fflag, 2786 mblk_t **mpp) 2787 { 2788 mblk_t *bp = NULL; 2789 unsigned char msgtype; 2790 int error = 0; 2791 cred_t *cr = CRED(); 2792 2793 /* We do not support interrupt threads using the stream head to send */ 2794 ASSERT(cr != NULL); 2795 2796 *mpp = NULL; 2797 /* 2798 * Create control part of message, if any. 
2799 */ 2800 if ((mctl != NULL) && (mctl->len >= 0)) { 2801 caddr_t base; 2802 int ctlcount; 2803 int allocsz; 2804 2805 if (flag & RS_HIPRI) 2806 msgtype = M_PCPROTO; 2807 else 2808 msgtype = M_PROTO; 2809 2810 ctlcount = mctl->len; 2811 base = mctl->buf; 2812 2813 /* 2814 * Give modules a better chance to reuse M_PROTO/M_PCPROTO 2815 * blocks by increasing the size to something more usable. 2816 */ 2817 allocsz = MAX(ctlcount, 64); 2818 2819 /* 2820 * Range checking has already been done; simply try 2821 * to allocate a message block for the ctl part. 2822 */ 2823 while ((bp = allocb_cred(allocsz, cr, 2824 curproc->p_pid)) == NULL) { 2825 if (fflag & (FNDELAY|FNONBLOCK)) 2826 return (EAGAIN); 2827 if (error = strwaitbuf(allocsz, BPRI_MED)) 2828 return (error); 2829 } 2830 2831 bp->b_datap->db_type = msgtype; 2832 if (copyin(base, bp->b_wptr, ctlcount)) { 2833 freeb(bp); 2834 return (EFAULT); 2835 } 2836 bp->b_wptr += ctlcount; 2837 } 2838 *mpp = bp; 2839 return (0); 2840 } 2841 2842 /* 2843 * Make a protocol message given data buffers. 2844 * n.b., this can block; be careful of what locks you hold when calling it. 2845 * 2846 * If sd_maxblk is less than *iosize this routine can fail part way through 2847 * (due to an allocation failure). In this case on return *iosize will contain 2848 * the amount that was consumed. Otherwise *iosize will not be modified 2849 * i.e. it will contain the amount that was consumed. 
2850 */ 2851 int 2852 strmakedata( 2853 ssize_t *iosize, 2854 struct uio *uiop, 2855 stdata_t *stp, 2856 int32_t flag, 2857 mblk_t **mpp) 2858 { 2859 mblk_t *mp = NULL; 2860 mblk_t *bp; 2861 int wroff = (int)stp->sd_wroff; 2862 int tail_len = (int)stp->sd_tail; 2863 int extra = wroff + tail_len; 2864 int error = 0; 2865 ssize_t maxblk; 2866 ssize_t count = *iosize; 2867 cred_t *cr; 2868 2869 *mpp = NULL; 2870 if (count < 0) 2871 return (0); 2872 2873 /* We do not support interrupt threads using the stream head to send */ 2874 cr = CRED(); 2875 ASSERT(cr != NULL); 2876 2877 maxblk = stp->sd_maxblk; 2878 if (maxblk == INFPSZ) 2879 maxblk = count; 2880 2881 /* 2882 * Create data part of message, if any. 2883 */ 2884 do { 2885 ssize_t size; 2886 dblk_t *dp; 2887 2888 ASSERT(uiop); 2889 2890 size = MIN(count, maxblk); 2891 2892 while ((bp = allocb_cred(size + extra, cr, 2893 curproc->p_pid)) == NULL) { 2894 error = EAGAIN; 2895 if ((uiop->uio_fmode & (FNDELAY|FNONBLOCK)) || 2896 (error = strwaitbuf(size + extra, BPRI_MED)) != 0) { 2897 if (count == *iosize) { 2898 freemsg(mp); 2899 return (error); 2900 } else { 2901 *iosize -= count; 2902 *mpp = mp; 2903 return (0); 2904 } 2905 } 2906 } 2907 dp = bp->b_datap; 2908 dp->db_cpid = curproc->p_pid; 2909 ASSERT(wroff <= dp->db_lim - bp->b_wptr); 2910 bp->b_wptr = bp->b_rptr = bp->b_rptr + wroff; 2911 2912 if (flag & STRUIO_POSTPONE) { 2913 /* 2914 * Setup the stream uio portion of the 2915 * dblk for subsequent use by struioget(). 
2916 */ 2917 dp->db_struioflag = STRUIO_SPEC; 2918 dp->db_cksumstart = 0; 2919 dp->db_cksumstuff = 0; 2920 dp->db_cksumend = size; 2921 *(long long *)dp->db_struioun.data = 0ll; 2922 bp->b_wptr += size; 2923 } else { 2924 if (stp->sd_copyflag & STRCOPYCACHED) 2925 uiop->uio_extflg |= UIO_COPY_CACHED; 2926 2927 if (size != 0) { 2928 error = uiomove(bp->b_wptr, size, UIO_WRITE, 2929 uiop); 2930 if (error != 0) { 2931 freeb(bp); 2932 freemsg(mp); 2933 return (error); 2934 } 2935 } 2936 bp->b_wptr += size; 2937 2938 if (stp->sd_wputdatafunc != NULL) { 2939 mblk_t *newbp; 2940 2941 newbp = (stp->sd_wputdatafunc)(stp->sd_vnode, 2942 bp, NULL, NULL, NULL, NULL); 2943 if (newbp == NULL) { 2944 freeb(bp); 2945 freemsg(mp); 2946 return (ECOMM); 2947 } 2948 bp = newbp; 2949 } 2950 } 2951 2952 count -= size; 2953 2954 if (mp == NULL) 2955 mp = bp; 2956 else 2957 linkb(mp, bp); 2958 } while (count > 0); 2959 2960 *mpp = mp; 2961 return (0); 2962 } 2963 2964 /* 2965 * Wait for a buffer to become available. Return non-zero errno 2966 * if not able to wait, 0 if buffer is probably there. 2967 */ 2968 int 2969 strwaitbuf(size_t size, int pri) 2970 { 2971 bufcall_id_t id; 2972 2973 mutex_enter(&bcall_monitor); 2974 if ((id = bufcall(size, pri, (void (*)(void *))cv_broadcast, 2975 &ttoproc(curthread)->p_flag_cv)) == 0) { 2976 mutex_exit(&bcall_monitor); 2977 return (ENOSR); 2978 } 2979 if (!cv_wait_sig(&(ttoproc(curthread)->p_flag_cv), &bcall_monitor)) { 2980 unbufcall(id); 2981 mutex_exit(&bcall_monitor); 2982 return (EINTR); 2983 } 2984 unbufcall(id); 2985 mutex_exit(&bcall_monitor); 2986 return (0); 2987 } 2988 2989 /* 2990 * This function waits for a read or write event to happen on a stream. 2991 * fmode can specify FNDELAY and/or FNONBLOCK. 2992 * The timeout is in ms with -1 meaning infinite. 
/*
 * This function waits for a read or write event to happen on a stream.
 * fmode can specify FNDELAY and/or FNONBLOCK.
 * The timeout is in ms with -1 meaning infinite.
 * The flag values work as follows:
 *	READWAIT	Check for read side errors, send M_READ
 *	GETWAIT		Check for read side errors, no M_READ
 *	WRITEWAIT	Check for write side errors.
 *	NOINTR		Do not return error if nonblocking, interrupted
 *			or timed out.
 *	STR_NOERROR	Ignore all errors except STPLEX.
 *	STR_NOSIG	Ignore/hold signals during the duration of the call.
 *	STR_PEEK	Pass through the strgeterr().
 *	STR_DELAYERR	Do not check for errors after waking up; the
 *			caller checks its queue first.
 *
 * Must be called with sd_lock held; it is held again on return, but it is
 * dropped while an M_READ message is allocated and sent downstream and
 * while sleeping.
 *
 * Returns 0 or an errno.  *done is set to 1 when the caller should stop
 * waiting (stream error, non-blocking mode, interrupted, timed out, or
 * M_READ allocation failure) and to 0 when the caller should go back and
 * re-examine the stream (pending wakeup consumed, data arrived, or a
 * normal wakeup).
 */
int
strwaitq(stdata_t *stp, int flag, ssize_t count, int fmode, clock_t timout,
    int *done)
{
	int slpflg, errs;
	int error;
	kcondvar_t *sleepon;
	mblk_t *mp;
	ssize_t *rd_count;
	clock_t rval;

	ASSERT(MUTEX_HELD(&stp->sd_lock));
	/*
	 * Pick the sleep flag, the condition variable and the set of
	 * stream-head error flags that apply to the side we wait on.
	 */
	if ((flag & READWAIT) || (flag & GETWAIT)) {
		slpflg = RSLEEP;
		sleepon = &_RD(stp->sd_wrq)->q_wait;
		errs = STRDERR|STPLEX;
	} else {
		slpflg = WSLEEP;
		sleepon = &stp->sd_wrq->q_wait;
		errs = STWRERR|STRHUP|STPLEX;
	}
	if (flag & STR_NOERROR)
		errs = STPLEX;

	if (stp->sd_wakeq & slpflg) {
		/*
		 * A strwakeq() is pending, no need to sleep.
		 */
		stp->sd_wakeq &= ~slpflg;
		*done = 0;
		return (0);
	}

	if (stp->sd_flag & errs) {
		/*
		 * Check for errors before going to sleep since the
		 * caller might not have checked this while holding
		 * sd_lock.
		 */
		error = strgeterr(stp, errs, (flag & STR_PEEK));
		if (error != 0) {
			*done = 1;
			return (error);
		}
	}

	/*
	 * If any module downstream has requested read notification
	 * by setting SNDMREAD flag using M_SETOPTS, send a message
	 * down stream.
	 */
	if ((flag & READWAIT) && (stp->sd_flag & SNDMREAD)) {
		/* sd_lock is not held across the allocation and putnext(). */
		mutex_exit(&stp->sd_lock);
		if (!(mp = allocb_wait(sizeof (ssize_t), BPRI_MED,
		    (flag & STR_NOSIG), &error))) {
			mutex_enter(&stp->sd_lock);
			*done = 1;
			return (error);
		}
		mp->b_datap->db_type = M_READ;
		rd_count = (ssize_t *)mp->b_wptr;
		*rd_count = count;
		mp->b_wptr += sizeof (ssize_t);
		/*
		 * Send the number of bytes requested by the
		 * read as the argument to M_READ.
		 */
		stream_willservice(stp);
		putnext(stp->sd_wrq, mp);
		stream_runservice(stp);
		mutex_enter(&stp->sd_lock);

		/*
		 * If any data arrived due to inline processing
		 * of putnext(), don't sleep.
		 */
		if (_RD(stp->sd_wrq)->q_first != NULL) {
			*done = 0;
			return (0);
		}
	}

	/* Non-blocking callers never sleep; NOINTR suppresses the EAGAIN. */
	if (fmode & (FNDELAY|FNONBLOCK)) {
		if (!(flag & NOINTR))
			error = EAGAIN;
		else
			error = 0;
		*done = 1;
		return (error);
	}

	/* Advertise that a sleeper is waiting on this side of the stream. */
	stp->sd_flag |= slpflg;
	TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAIT2,
	    "strwaitq sleeps (2):%p, %X, %lX, %X, %p",
	    stp, flag, count, fmode, done);

	/*
	 * As handled below, the result of str_cv_wait() is treated as:
	 * positive for a wakeup, zero for an interruption and negative
	 * for a timeout.
	 */
	rval = str_cv_wait(sleepon, &stp->sd_lock, timout, flag & STR_NOSIG);
	if (rval > 0) {
		/* EMPTY */
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAKE2,
		    "strwaitq awakes(2):%X, %X, %X, %X, %X",
		    stp, flag, count, fmode, done);
	} else if (rval == 0) {
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_INTR2,
		    "strwaitq interrupt #2:%p, %X, %lX, %X, %p",
		    stp, flag, count, fmode, done);
		/*
		 * Interrupted: withdraw our sleep flag and wake every
		 * other thread sleeping on the same condition variable.
		 */
		stp->sd_flag &= ~slpflg;
		cv_broadcast(sleepon);
		if (!(flag & NOINTR))
			error = EINTR;
		else
			error = 0;
		*done = 1;
		return (error);
	} else {
		/* timeout */
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_TIME,
		    "strwaitq timeout:%p, %X, %lX, %X, %p",
		    stp, flag, count, fmode, done);
		*done = 1;
		if (!(flag & NOINTR))
			return (ETIME);
		else
			return (0);
	}
	/*
	 * If the caller implements delayed errors (i.e. queued after data)
	 * we can not check for errors here since data as well as an
	 * error might have arrived at the stream head. We return to
	 * have the caller check the read queue before checking for errors.
	 */
	if ((stp->sd_flag & errs) && !(flag & STR_DELAYERR)) {
		error = strgeterr(stp, errs, (flag & STR_PEEK));
		if (error != 0) {
			*done = 1;
			return (error);
		}
	}
	*done = 0;
	return (0);
}
3187 */ 3188 if (sp->s_vp == NULL) { 3189 if (!cantsend(p, t, SIGHUP)) 3190 sigtoproc(p, t, SIGHUP); 3191 mutex_exit(&sp->s_lock); 3192 mutex_exit(&p->p_splock); 3193 mutex_exit(&p->p_lock); 3194 return (EIO); 3195 } 3196 3197 mutex_exit(&sp->s_lock); 3198 mutex_exit(&p->p_splock); 3199 3200 if (mode == JCGETP) { 3201 mutex_exit(&p->p_lock); 3202 return (0); 3203 } 3204 3205 if (mode == JCREAD) { 3206 if (p->p_detached || cantsend(p, t, SIGTTIN)) { 3207 mutex_exit(&p->p_lock); 3208 return (EIO); 3209 } 3210 mutex_exit(&p->p_lock); 3211 mutex_exit(&stp->sd_lock); 3212 pgsignal(p->p_pgidp, SIGTTIN); 3213 mutex_enter(&stp->sd_lock); 3214 mutex_enter(&p->p_lock); 3215 } else { /* mode == JCWRITE or JCSETP */ 3216 if ((mode == JCWRITE && !(stp->sd_flag & STRTOSTOP)) || 3217 cantsend(p, t, SIGTTOU)) { 3218 mutex_exit(&p->p_lock); 3219 return (0); 3220 } 3221 if (p->p_detached) { 3222 mutex_exit(&p->p_lock); 3223 return (EIO); 3224 } 3225 mutex_exit(&p->p_lock); 3226 mutex_exit(&stp->sd_lock); 3227 pgsignal(p->p_pgidp, SIGTTOU); 3228 mutex_enter(&stp->sd_lock); 3229 mutex_enter(&p->p_lock); 3230 } 3231 3232 /* 3233 * We call cv_wait_sig_swap() to cause the appropriate 3234 * action for the jobcontrol signal to take place. 3235 * If the signal is being caught, we will take the 3236 * EINTR error return. Otherwise, the default action 3237 * of causing the process to stop will take place. 3238 * In this case, we rely on the periodic cv_broadcast() on 3239 * &lbolt_cv to wake us up to loop around and test again. 3240 * We can't get here if the signal is ignored or 3241 * if the current thread is blocking the signal. 
3242 */ 3243 mutex_exit(&stp->sd_lock); 3244 if (!cv_wait_sig_swap(&lbolt_cv, &p->p_lock)) { 3245 mutex_exit(&p->p_lock); 3246 mutex_enter(&stp->sd_lock); 3247 return (EINTR); 3248 } 3249 mutex_exit(&p->p_lock); 3250 mutex_enter(&stp->sd_lock); 3251 mutex_enter(&p->p_lock); 3252 } 3253 } 3254 3255 /* 3256 * Return size of message of block type (bp->b_datap->db_type) 3257 */ 3258 size_t 3259 xmsgsize(mblk_t *bp) 3260 { 3261 unsigned char type; 3262 size_t count = 0; 3263 3264 type = bp->b_datap->db_type; 3265 3266 for (; bp; bp = bp->b_cont) { 3267 if (type != bp->b_datap->db_type) 3268 break; 3269 ASSERT(bp->b_wptr >= bp->b_rptr); 3270 count += bp->b_wptr - bp->b_rptr; 3271 } 3272 return (count); 3273 } 3274 3275 /* 3276 * Allocate a stream head. 3277 */ 3278 struct stdata * 3279 shalloc(queue_t *qp) 3280 { 3281 stdata_t *stp; 3282 3283 stp = kmem_cache_alloc(stream_head_cache, KM_SLEEP); 3284 3285 stp->sd_wrq = _WR(qp); 3286 stp->sd_strtab = NULL; 3287 stp->sd_iocid = 0; 3288 stp->sd_mate = NULL; 3289 stp->sd_freezer = NULL; 3290 stp->sd_refcnt = 0; 3291 stp->sd_wakeq = 0; 3292 stp->sd_anchor = 0; 3293 stp->sd_struiowrq = NULL; 3294 stp->sd_struiordq = NULL; 3295 stp->sd_struiodnak = 0; 3296 stp->sd_struionak = NULL; 3297 stp->sd_t_audit_data = NULL; 3298 stp->sd_rput_opt = 0; 3299 stp->sd_wput_opt = 0; 3300 stp->sd_read_opt = 0; 3301 stp->sd_rprotofunc = strrput_proto; 3302 stp->sd_rmiscfunc = strrput_misc; 3303 stp->sd_rderrfunc = stp->sd_wrerrfunc = NULL; 3304 stp->sd_rputdatafunc = stp->sd_wputdatafunc = NULL; 3305 stp->sd_ciputctrl = NULL; 3306 stp->sd_nciputctrl = 0; 3307 stp->sd_qhead = NULL; 3308 stp->sd_qtail = NULL; 3309 stp->sd_servid = NULL; 3310 stp->sd_nqueues = 0; 3311 stp->sd_svcflags = 0; 3312 stp->sd_copyflag = 0; 3313 3314 return (stp); 3315 } 3316 3317 /* 3318 * Free a stream head. 
3319 */ 3320 void 3321 shfree(stdata_t *stp) 3322 { 3323 ASSERT(MUTEX_NOT_HELD(&stp->sd_lock)); 3324 3325 stp->sd_wrq = NULL; 3326 3327 mutex_enter(&stp->sd_qlock); 3328 while (stp->sd_svcflags & STRS_SCHEDULED) { 3329 STRSTAT(strwaits); 3330 cv_wait(&stp->sd_qcv, &stp->sd_qlock); 3331 } 3332 mutex_exit(&stp->sd_qlock); 3333 3334 if (stp->sd_ciputctrl != NULL) { 3335 ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1); 3336 SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl, 3337 stp->sd_nciputctrl, 0); 3338 ASSERT(ciputctrl_cache != NULL); 3339 kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl); 3340 stp->sd_ciputctrl = NULL; 3341 stp->sd_nciputctrl = 0; 3342 } 3343 ASSERT(stp->sd_qhead == NULL); 3344 ASSERT(stp->sd_qtail == NULL); 3345 ASSERT(stp->sd_nqueues == 0); 3346 kmem_cache_free(stream_head_cache, stp); 3347 } 3348 3349 /* 3350 * Allocate a pair of queues and a syncq for the pair 3351 */ 3352 queue_t * 3353 allocq(void) 3354 { 3355 queinfo_t *qip; 3356 queue_t *qp, *wqp; 3357 syncq_t *sq; 3358 3359 qip = kmem_cache_alloc(queue_cache, KM_SLEEP); 3360 3361 qp = &qip->qu_rqueue; 3362 wqp = &qip->qu_wqueue; 3363 sq = &qip->qu_syncq; 3364 3365 qp->q_last = NULL; 3366 qp->q_next = NULL; 3367 qp->q_ptr = NULL; 3368 qp->q_flag = QUSE | QREADR; 3369 qp->q_bandp = NULL; 3370 qp->q_stream = NULL; 3371 qp->q_syncq = sq; 3372 qp->q_nband = 0; 3373 qp->q_nfsrv = NULL; 3374 qp->q_draining = 0; 3375 qp->q_syncqmsgs = 0; 3376 qp->q_spri = 0; 3377 qp->q_qtstamp = 0; 3378 qp->q_sqtstamp = 0; 3379 qp->q_fp = NULL; 3380 3381 wqp->q_last = NULL; 3382 wqp->q_next = NULL; 3383 wqp->q_ptr = NULL; 3384 wqp->q_flag = QUSE; 3385 wqp->q_bandp = NULL; 3386 wqp->q_stream = NULL; 3387 wqp->q_syncq = sq; 3388 wqp->q_nband = 0; 3389 wqp->q_nfsrv = NULL; 3390 wqp->q_draining = 0; 3391 wqp->q_syncqmsgs = 0; 3392 wqp->q_qtstamp = 0; 3393 wqp->q_sqtstamp = 0; 3394 wqp->q_spri = 0; 3395 3396 sq->sq_count = 0; 3397 sq->sq_rmqcount = 0; 3398 sq->sq_flags = 0; 3399 sq->sq_type = 0; 3400 sq->sq_callbflags = 0; 
3401 sq->sq_cancelid = 0; 3402 sq->sq_ciputctrl = NULL; 3403 sq->sq_nciputctrl = 0; 3404 sq->sq_needexcl = 0; 3405 sq->sq_svcflags = 0; 3406 3407 return (qp); 3408 } 3409 3410 /* 3411 * Free a pair of queues and the "attached" syncq. 3412 * Discard any messages left on the syncq(s), remove the syncq(s) from the 3413 * outer perimeter, and free the syncq(s) if they are not the "attached" syncq. 3414 */ 3415 void 3416 freeq(queue_t *qp) 3417 { 3418 qband_t *qbp, *nqbp; 3419 syncq_t *sq, *outer; 3420 queue_t *wqp = _WR(qp); 3421 3422 ASSERT(qp->q_flag & QREADR); 3423 3424 /* 3425 * If a previously dispatched taskq job is scheduled to run 3426 * sync_service() or a service routine is scheduled for the 3427 * queues about to be freed, wait here until all service is 3428 * done on the queue and all associated queues and syncqs. 3429 */ 3430 wait_svc(qp); 3431 3432 (void) flush_syncq(qp->q_syncq, qp); 3433 (void) flush_syncq(wqp->q_syncq, wqp); 3434 ASSERT(qp->q_syncqmsgs == 0 && wqp->q_syncqmsgs == 0); 3435 3436 /* 3437 * Flush the queues before q_next is set to NULL This is needed 3438 * in order to backenable any downstream queue before we go away. 3439 * Note: we are already removed from the stream so that the 3440 * backenabling will not cause any messages to be delivered to our 3441 * put procedures. 3442 */ 3443 flushq(qp, FLUSHALL); 3444 flushq(wqp, FLUSHALL); 3445 3446 /* Tidy up - removeq only does a half-remove from stream */ 3447 qp->q_next = wqp->q_next = NULL; 3448 ASSERT(!(qp->q_flag & QENAB)); 3449 ASSERT(!(wqp->q_flag & QENAB)); 3450 3451 outer = qp->q_syncq->sq_outer; 3452 if (outer != NULL) { 3453 outer_remove(outer, qp->q_syncq); 3454 if (wqp->q_syncq != qp->q_syncq) 3455 outer_remove(outer, wqp->q_syncq); 3456 } 3457 /* 3458 * Free any syncqs that are outside what allocq returned. 
3459 */ 3460 if (qp->q_syncq != SQ(qp) && !(qp->q_flag & QPERMOD)) 3461 free_syncq(qp->q_syncq); 3462 if (qp->q_syncq != wqp->q_syncq && wqp->q_syncq != SQ(qp)) 3463 free_syncq(wqp->q_syncq); 3464 3465 ASSERT((qp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0); 3466 ASSERT((wqp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0); 3467 ASSERT(MUTEX_NOT_HELD(QLOCK(qp))); 3468 ASSERT(MUTEX_NOT_HELD(QLOCK(wqp))); 3469 sq = SQ(qp); 3470 ASSERT(MUTEX_NOT_HELD(SQLOCK(sq))); 3471 ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL); 3472 ASSERT(sq->sq_outer == NULL); 3473 ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL); 3474 ASSERT(sq->sq_callbpend == NULL); 3475 ASSERT(sq->sq_needexcl == 0); 3476 3477 if (sq->sq_ciputctrl != NULL) { 3478 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1); 3479 SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl, 3480 sq->sq_nciputctrl, 0); 3481 ASSERT(ciputctrl_cache != NULL); 3482 kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl); 3483 sq->sq_ciputctrl = NULL; 3484 sq->sq_nciputctrl = 0; 3485 } 3486 3487 ASSERT(qp->q_first == NULL && wqp->q_first == NULL); 3488 ASSERT(qp->q_count == 0 && wqp->q_count == 0); 3489 ASSERT(qp->q_mblkcnt == 0 && wqp->q_mblkcnt == 0); 3490 3491 qp->q_flag &= ~QUSE; 3492 wqp->q_flag &= ~QUSE; 3493 3494 /* NOTE: Uncomment the assert below once bugid 1159635 is fixed. */ 3495 /* ASSERT((qp->q_flag & QWANTW) == 0 && (wqp->q_flag & QWANTW) == 0); */ 3496 3497 qbp = qp->q_bandp; 3498 while (qbp) { 3499 nqbp = qbp->qb_next; 3500 freeband(qbp); 3501 qbp = nqbp; 3502 } 3503 qbp = wqp->q_bandp; 3504 while (qbp) { 3505 nqbp = qbp->qb_next; 3506 freeband(qbp); 3507 qbp = nqbp; 3508 } 3509 kmem_cache_free(queue_cache, qp); 3510 } 3511 3512 /* 3513 * Allocate a qband structure. 
3514 */ 3515 qband_t * 3516 allocband(void) 3517 { 3518 qband_t *qbp; 3519 3520 qbp = kmem_cache_alloc(qband_cache, KM_NOSLEEP); 3521 if (qbp == NULL) 3522 return (NULL); 3523 3524 qbp->qb_next = NULL; 3525 qbp->qb_count = 0; 3526 qbp->qb_mblkcnt = 0; 3527 qbp->qb_first = NULL; 3528 qbp->qb_last = NULL; 3529 qbp->qb_flag = 0; 3530 3531 return (qbp); 3532 } 3533 3534 /* 3535 * Free a qband structure. 3536 */ 3537 void 3538 freeband(qband_t *qbp) 3539 { 3540 kmem_cache_free(qband_cache, qbp); 3541 } 3542 3543 /* 3544 * Just like putnextctl(9F), except that allocb_wait() is used. 3545 * 3546 * Consolidation Private, and of course only callable from the stream head or 3547 * routines that may block. 3548 */ 3549 int 3550 putnextctl_wait(queue_t *q, int type) 3551 { 3552 mblk_t *bp; 3553 int error; 3554 3555 if ((datamsg(type) && (type != M_DELAY)) || 3556 (bp = allocb_wait(0, BPRI_HI, 0, &error)) == NULL) 3557 return (0); 3558 3559 bp->b_datap->db_type = (unsigned char)type; 3560 putnext(q, bp); 3561 return (1); 3562 } 3563 3564 /* 3565 * Run any possible bufcalls. 3566 */ 3567 void 3568 runbufcalls(void) 3569 { 3570 strbufcall_t *bcp; 3571 3572 mutex_enter(&bcall_monitor); 3573 mutex_enter(&strbcall_lock); 3574 3575 if (strbcalls.bc_head) { 3576 size_t count; 3577 int nevent; 3578 3579 /* 3580 * count how many events are on the list 3581 * now so we can check to avoid looping 3582 * in low memory situations 3583 */ 3584 nevent = 0; 3585 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) 3586 nevent++; 3587 3588 /* 3589 * get estimate of available memory from kmem_avail(). 3590 * awake all bufcall functions waiting for 3591 * memory whose request could be satisfied 3592 * by 'count' memory and let 'em fight for it. 
3593 */ 3594 count = kmem_avail(); 3595 while ((bcp = strbcalls.bc_head) != NULL && nevent) { 3596 STRSTAT(bufcalls); 3597 --nevent; 3598 if (bcp->bc_size <= count) { 3599 bcp->bc_executor = curthread; 3600 mutex_exit(&strbcall_lock); 3601 (*bcp->bc_func)(bcp->bc_arg); 3602 mutex_enter(&strbcall_lock); 3603 bcp->bc_executor = NULL; 3604 cv_broadcast(&bcall_cv); 3605 strbcalls.bc_head = bcp->bc_next; 3606 kmem_free(bcp, sizeof (strbufcall_t)); 3607 } else { 3608 /* 3609 * too big, try again later - note 3610 * that nevent was decremented above 3611 * so we won't retry this one on this 3612 * iteration of the loop 3613 */ 3614 if (bcp->bc_next != NULL) { 3615 strbcalls.bc_head = bcp->bc_next; 3616 bcp->bc_next = NULL; 3617 strbcalls.bc_tail->bc_next = bcp; 3618 strbcalls.bc_tail = bcp; 3619 } 3620 } 3621 } 3622 if (strbcalls.bc_head == NULL) 3623 strbcalls.bc_tail = NULL; 3624 } 3625 3626 mutex_exit(&strbcall_lock); 3627 mutex_exit(&bcall_monitor); 3628 } 3629 3630 3631 /* 3632 * Actually run queue's service routine. 3633 */ 3634 static void 3635 runservice(queue_t *q) 3636 { 3637 qband_t *qbp; 3638 3639 ASSERT(q->q_qinfo->qi_srvp); 3640 again: 3641 entersq(q->q_syncq, SQ_SVC); 3642 TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_START, 3643 "runservice starts:%p", q); 3644 3645 if (!(q->q_flag & QWCLOSE)) 3646 (*q->q_qinfo->qi_srvp)(q); 3647 3648 TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_END, 3649 "runservice ends:(%p)", q); 3650 3651 leavesq(q->q_syncq, SQ_SVC); 3652 3653 mutex_enter(QLOCK(q)); 3654 if (q->q_flag & QENAB) { 3655 q->q_flag &= ~QENAB; 3656 mutex_exit(QLOCK(q)); 3657 goto again; 3658 } 3659 q->q_flag &= ~QINSERVICE; 3660 q->q_flag &= ~QBACK; 3661 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) 3662 qbp->qb_flag &= ~QB_BACK; 3663 /* 3664 * Wakeup thread waiting for the service procedure 3665 * to be run (strclose and qdetach). 3666 */ 3667 cv_broadcast(&q->q_wait); 3668 3669 mutex_exit(QLOCK(q)); 3670 } 3671 3672 /* 3673 * Background processing of bufcalls. 
3674 */ 3675 void 3676 streams_bufcall_service(void) 3677 { 3678 callb_cpr_t cprinfo; 3679 3680 CALLB_CPR_INIT(&cprinfo, &strbcall_lock, callb_generic_cpr, 3681 "streams_bufcall_service"); 3682 3683 mutex_enter(&strbcall_lock); 3684 3685 for (;;) { 3686 if (strbcalls.bc_head != NULL && kmem_avail() > 0) { 3687 mutex_exit(&strbcall_lock); 3688 runbufcalls(); 3689 mutex_enter(&strbcall_lock); 3690 } 3691 if (strbcalls.bc_head != NULL) { 3692 clock_t wt, tick; 3693 3694 STRSTAT(bcwaits); 3695 /* Wait for memory to become available */ 3696 CALLB_CPR_SAFE_BEGIN(&cprinfo); 3697 tick = SEC_TO_TICK(60); 3698 time_to_wait(&wt, tick); 3699 (void) cv_timedwait(&memavail_cv, &strbcall_lock, wt); 3700 CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock); 3701 } 3702 3703 /* Wait for new work to arrive */ 3704 if (strbcalls.bc_head == NULL) { 3705 CALLB_CPR_SAFE_BEGIN(&cprinfo); 3706 cv_wait(&strbcall_cv, &strbcall_lock); 3707 CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock); 3708 } 3709 } 3710 } 3711 3712 /* 3713 * Background processing of streams background tasks which failed 3714 * taskq_dispatch. 3715 */ 3716 static void 3717 streams_qbkgrnd_service(void) 3718 { 3719 callb_cpr_t cprinfo; 3720 queue_t *q; 3721 3722 CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr, 3723 "streams_bkgrnd_service"); 3724 3725 mutex_enter(&service_queue); 3726 3727 for (;;) { 3728 /* 3729 * Wait for work to arrive. 3730 */ 3731 while ((freebs_list == NULL) && (qhead == NULL)) { 3732 CALLB_CPR_SAFE_BEGIN(&cprinfo); 3733 cv_wait(&services_to_run, &service_queue); 3734 CALLB_CPR_SAFE_END(&cprinfo, &service_queue); 3735 } 3736 /* 3737 * Handle all pending freebs requests to free memory. 3738 */ 3739 while (freebs_list != NULL) { 3740 mblk_t *mp = freebs_list; 3741 freebs_list = mp->b_next; 3742 mutex_exit(&service_queue); 3743 mblk_free(mp); 3744 mutex_enter(&service_queue); 3745 } 3746 /* 3747 * Run pending queues. 
3748 */ 3749 while (qhead != NULL) { 3750 DQ(q, qhead, qtail, q_link); 3751 ASSERT(q != NULL); 3752 mutex_exit(&service_queue); 3753 queue_service(q); 3754 mutex_enter(&service_queue); 3755 } 3756 ASSERT(qhead == NULL && qtail == NULL); 3757 } 3758 } 3759 3760 /* 3761 * Background processing of streams background tasks which failed 3762 * taskq_dispatch. 3763 */ 3764 static void 3765 streams_sqbkgrnd_service(void) 3766 { 3767 callb_cpr_t cprinfo; 3768 syncq_t *sq; 3769 3770 CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr, 3771 "streams_sqbkgrnd_service"); 3772 3773 mutex_enter(&service_queue); 3774 3775 for (;;) { 3776 /* 3777 * Wait for work to arrive. 3778 */ 3779 while (sqhead == NULL) { 3780 CALLB_CPR_SAFE_BEGIN(&cprinfo); 3781 cv_wait(&syncqs_to_run, &service_queue); 3782 CALLB_CPR_SAFE_END(&cprinfo, &service_queue); 3783 } 3784 3785 /* 3786 * Run pending syncqs. 3787 */ 3788 while (sqhead != NULL) { 3789 DQ(sq, sqhead, sqtail, sq_next); 3790 ASSERT(sq != NULL); 3791 ASSERT(sq->sq_svcflags & SQ_BGTHREAD); 3792 mutex_exit(&service_queue); 3793 syncq_service(sq); 3794 mutex_enter(&service_queue); 3795 } 3796 } 3797 } 3798 3799 /* 3800 * Disable the syncq and wait for background syncq processing to complete. 3801 * If the syncq is placed on the sqhead/sqtail queue, try to remove it from the 3802 * list. 
3803 */ 3804 void 3805 wait_sq_svc(syncq_t *sq) 3806 { 3807 mutex_enter(SQLOCK(sq)); 3808 sq->sq_svcflags |= SQ_DISABLED; 3809 if (sq->sq_svcflags & SQ_BGTHREAD) { 3810 syncq_t *sq_chase; 3811 syncq_t *sq_curr; 3812 int removed; 3813 3814 ASSERT(sq->sq_servcount == 1); 3815 mutex_enter(&service_queue); 3816 RMQ(sq, sqhead, sqtail, sq_next, sq_chase, sq_curr, removed); 3817 mutex_exit(&service_queue); 3818 if (removed) { 3819 sq->sq_svcflags &= ~SQ_BGTHREAD; 3820 sq->sq_servcount = 0; 3821 STRSTAT(sqremoved); 3822 goto done; 3823 } 3824 } 3825 while (sq->sq_servcount != 0) { 3826 sq->sq_flags |= SQ_WANTWAKEUP; 3827 cv_wait(&sq->sq_wait, SQLOCK(sq)); 3828 } 3829 done: 3830 mutex_exit(SQLOCK(sq)); 3831 } 3832 3833 /* 3834 * Put a syncq on the list of syncq's to be serviced by the sqthread. 3835 * Add the argument to the end of the sqhead list and set the flag 3836 * indicating this syncq has been enabled. If it has already been 3837 * enabled, don't do anything. 3838 * This routine assumes that SQLOCK is held. 3839 * NOTE that the lock order is to have the SQLOCK first, 3840 * so if the service_syncq lock is held, we need to release it 3841 * before acquiring the SQLOCK (mostly relevant for the background 3842 * thread, and this seems to be common among the STREAMS global locks). 3843 * Note that the sq_svcflags are protected by the SQLOCK. 3844 */ 3845 void 3846 sqenable(syncq_t *sq) 3847 { 3848 /* 3849 * This is probably not important except for where I believe it 3850 * is being called. At that point, it should be held (and it 3851 * is a pain to release it just for this routine, so don't do