Home | History | Annotate | Download | only in io
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License, Version 1.0 only
      6  * (the "License").  You may not use this file except in compliance
      7  * with the License.
      8  *
      9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
     10  * or http://www.opensolaris.org/os/licensing.
     11  * See the License for the specific language governing permissions
     12  * and limitations under the License.
     13  *
     14  * When distributing Covered Code, include this CDDL HEADER in each
     15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     16  * If applicable, add the following below this CDDL HEADER, with the
     17  * fields enclosed by brackets "[]" replaced with your own identifying
     18  * information: Portions Copyright [yyyy] [name of copyright owner]
     19  *
     20  * CDDL HEADER END
     21  */
     22 /*
     23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     28 
     29 /*
     30  * STREAMS Buffering module
     31  *
     32  * This streams module collects incoming messages from modules below
     33  * it on the stream and buffers them up into a smaller number of
     34  * aggregated messages.  Its main purpose is to reduce overhead by
     35  * cutting down on the number of read (or getmsg) calls its client
     36  * user process makes.
     37  *  - only M_DATA is buffered.
     38  *  - multithreading assumes configured as D_MTQPAIR
     39  *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
     40  *    allocation fails.
     41  *  - in order message transmission. This is enforced for messages other
     42  *    than high priority messages.
     43  *  - zero length messages on the read side are not passed up the
     44  *    stream but used internally for synchronization.
     45  * FLAGS:
     46  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
     47  *   (conversion is the default for backwards compatibility
     48  *    hence the negative logic).
     49  * - SB_NO_HEADER - no headers in buffered data.
     50  *   (adding headers is the default for backwards compatibility
     51  *    hence the negative logic).
     52  * - SB_DEFER_CHUNK - provides improved response time in question-answer
     53  *   applications. Buffering is not enabled until the second message
     54  *   is received on the read side within the sb_ticks interval.
     55  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
     56  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
     57  *   data being immediately sent upstream.
     58  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
     59  *   the blocked flow condition downstream. If this flag is clear (default)
     60  *   messages will be dropped if the upstream flow is blocked.
     61  */
     62 
     63 
     64 #include	<sys/types.h>
     65 #include	<sys/errno.h>
     66 #include	<sys/debug.h>
     67 #include	<sys/stropts.h>
     68 #include	<sys/time.h>
     69 #include	<sys/stream.h>
     70 #include	<sys/conf.h>
     71 #include	<sys/ddi.h>
     72 #include	<sys/sunddi.h>
     73 #include	<sys/kmem.h>
     74 #include	<sys/strsun.h>
     75 #include	<sys/bufmod.h>
     76 #include	<sys/modctl.h>
     77 #include	<sys/isa_defs.h>
     78 
/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval.  The interval begins when a read-side data
 * message is received and a timeout is not active.  If sb_snap is
 * zero, no truncation of the msg is done.
 *
 * One of these is allocated per stream in sbopen(), stored in the q_ptr
 * of both queues of the pair, and released again in sbclose().
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk */
	mblk_t  *sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size (SBIOCSCHUNK) */
	clock_t	sb_ticks;	/* timeout interval (SBIOCSTIME/SBIOCCTIME) */
	timeout_id_t sb_timeoutid; /* qtimeout() id, 0 when none is pending */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length (SBIOCSSNAP) */
	uint_t	sb_flags;	/* SB_* option flags (SBIOCSFLAGS) */
	uint_t	sb_state;	/* state variable (SB_FRCVD) */
};
    105 
/*
 * Function prototypes.
 *
 * All of these routines are private to this file.  The first five are
 * the entry points wired into the qinit tables below; the remainder are
 * internal helpers used by the put procedures.
 */
static	int	sbopen(queue_t *, dev_t *, int, int, cred_t *);
static	int	sbclose(queue_t *, int, cred_t *);
static	void	sbwput(queue_t *, mblk_t *);
static	void	sbrput(queue_t *, mblk_t *);
static	void	sbrsrv(queue_t *);
static	void	sbioctl(queue_t *, mblk_t *);
static	void	sbaddmsg(queue_t *, mblk_t *);
static	void	sbtick(void *);
static	void	sbclosechunk(struct sb *);
static	void	sbsendit(queue_t *, mblk_t *);
    119 
/*
 * Module identification and default queue limits, shared by the read
 * and write qinit tables below.
 */
static struct module_info	sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};
    128 
/*
 * Read-side queue procedures.  The open and close routines are supplied
 * here; the write-side table leaves those slots empty.
 */
static struct qinit	sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};
    138 
/*
 * Write-side queue procedures.  Only a put procedure is provided; there
 * is no write-side service routine.
 */
static struct qinit	sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};
    148 
/*
 * Stream table tying the read and write qinit tables together.  The
 * multiplexor slots are unused.
 */
static struct streamtab	sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};
    155 
    156 
/*
 * This is the loadable module wrapper.
 *
 * It names the module, points at its stream table and carries the
 * multithreading flags.  The rest of this file assumes the D_MTQPAIR
 * configuration given here (see the block comment at the top of the file).
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};
    166 
/*
 * Module linkage information for the kernel.
 *
 * Describes bufmod as a STREAMS module (mod_strmodops) and refers to the
 * fmodsw wrapper above.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};
    174 
/*
 * NULL-terminated linkage list handed to mod_install(), mod_remove()
 * and mod_info() by _init(), _fini() and _info() respectively.
 */
static struct modlinkage modlinkage = {
	MODREV_1, &modlstrmod, NULL
};
    178 
    179 
    180 int
    181 _init(void)
    182 {
    183 	return (mod_install(&modlinkage));
    184 }
    185 
    186 int
    187 _fini(void)
    188 {
    189 	return (mod_remove(&modlinkage));
    190 }
    191 
    192 int
    193 _info(struct modinfo *modinfop)
    194 {
    195 	return (mod_info(&modlinkage, modinfop));
    196 }
    197 
    198 
    199 /* ARGSUSED */
    200 static int
    201 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
    202 {
    203 	struct sb	*sbp;
    204 	ASSERT(rq);
    205 
    206 	if (sflag != MODOPEN)
    207 		return (EINVAL);
    208 
    209 	if (rq->q_ptr)
    210 		return (0);
    211 
    212 	/*
    213 	 * Allocate and initialize per-Stream structure.
    214 	 */
    215 	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
    216 	sbp->sb_rq = rq;
    217 	sbp->sb_ticks = -1;
    218 	sbp->sb_chunk = SB_DFLT_CHUNK;
    219 	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
    220 	sbp->sb_mlen = 0;
    221 	sbp->sb_mcount = 0;
    222 	sbp->sb_timeoutid = 0;
    223 	sbp->sb_drops = 0;
    224 	sbp->sb_snap = 0;
    225 	sbp->sb_flags = 0;
    226 	sbp->sb_state = 0;
    227 
    228 	rq->q_ptr = WR(rq)->q_ptr = sbp;
    229 
    230 	qprocson(rq);
    231 
    232 
    233 	return (0);
    234 }
    235 
    236 /* ARGSUSED1 */
    237 static int
    238 sbclose(queue_t *rq, int flag, cred_t *credp)
    239 {
    240 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
    241 
    242 	ASSERT(sbp);
    243 
    244 	qprocsoff(rq);
    245 	/*
    246 	 * Cancel an outstanding timeout
    247 	 */
    248 	if (sbp->sb_timeoutid != 0) {
    249 		(void) quntimeout(rq, sbp->sb_timeoutid);
    250 		sbp->sb_timeoutid = 0;
    251 	}
    252 	/*
    253 	 * Free the current chunk.
    254 	 */
    255 	if (sbp->sb_mp) {
    256 		freemsg(sbp->sb_mp);
    257 		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
    258 		sbp->sb_mlen = 0;
    259 	}
    260 
    261 	/*
    262 	 * Free the per-Stream structure.
    263 	 */
    264 	kmem_free((caddr_t)sbp, sizeof (struct sb));
    265 	rq->q_ptr = WR(rq)->q_ptr = NULL;
    266 
    267 	return (0);
    268 }
    269 
    270 /*
    271  * the correction factor is introduced to compensate for
    272  * whatever assumptions the modules below have made about
    273  * how much traffic is flowing through the stream and the fact
    274  * that bufmod may be snipping messages with the sb_snap length.
    275  */
    276 #define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
    277 #define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
    278 
    279 
    280 static void
    281 sbioc(queue_t *wq, mblk_t *mp)
    282 {
    283 	struct iocblk *iocp;
    284 	struct sb *sbp = (struct sb *)wq->q_ptr;
    285 	clock_t	ticks;
    286 	mblk_t	*mop;
    287 
    288 	iocp = (struct iocblk *)mp->b_rptr;
    289 
    290 	switch (iocp->ioc_cmd) {
    291 	case SBIOCGCHUNK:
    292 	case SBIOCGSNAP:
    293 	case SBIOCGFLAGS:
    294 	case SBIOCGTIME:
    295 		miocack(wq, mp, 0, 0);
    296 		return;
    297 
    298 	case SBIOCSTIME:
    299 #ifdef _SYSCALL32_IMPL
    300 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
    301 			struct timeval32 *t32;
    302 
    303 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
    304 			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
    305 				miocnak(wq, mp, 0, EINVAL);
    306 				break;
    307 			}
    308 			ticks = TIMEVAL_TO_TICK(t32);
    309 		} else
    310 #endif /* _SYSCALL32_IMPL */
    311 		{
    312 			struct timeval *tb;
    313 
    314 			tb = (struct timeval *)mp->b_cont->b_rptr;
    315 
    316 			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
    317 				miocnak(wq, mp, 0, EINVAL);
    318 				break;
    319 			}
    320 			ticks = TIMEVAL_TO_TICK(tb);
    321 		}
    322 		sbp->sb_ticks = ticks;
    323 		if (ticks == 0)
    324 			sbp->sb_chunk = 0;
    325 		miocack(wq, mp, 0, 0);
    326 		sbclosechunk(sbp);
    327 		return;
    328 
    329 	case SBIOCSCHUNK:
    330 		/*
    331 		 * set up hi/lo water marks on stream head read queue.
    332 		 * unlikely to run out of resources. Fix at later date.
    333 		 */
    334 		if ((mop = allocb(sizeof (struct stroptions),
    335 		    BPRI_MED)) != NULL) {
    336 			struct stroptions *sop;
    337 			uint_t chunk;
    338 
    339 			chunk = *(uint_t *)mp->b_cont->b_rptr;
    340 			mop->b_datap->db_type = M_SETOPTS;
    341 			mop->b_wptr += sizeof (struct stroptions);
    342 			sop = (struct stroptions *)mop->b_rptr;
    343 			sop->so_flags = SO_HIWAT | SO_LOWAT;
    344 			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
    345 			sop->so_lowat = SNIT_LOWAT(chunk, 1);
    346 			qreply(wq, mop);
    347 		}
    348 
    349 		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
    350 		miocack(wq, mp, 0, 0);
    351 		sbclosechunk(sbp);
    352 		return;
    353 
    354 	case SBIOCSFLAGS:
    355 		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
    356 		miocack(wq, mp, 0, 0);
    357 		return;
    358 
    359 	case SBIOCSSNAP:
    360 		/*
    361 		 * if chunking dont worry about effects of
    362 		 * snipping of message size on head flow control
    363 		 * since it has a relatively small bearing on the
    364 		 * data rate onto the streamn head.
    365 		 */
    366 		if (!sbp->sb_chunk) {
    367 			/*
    368 			 * set up hi/lo water marks on stream head read queue.
    369 			 * unlikely to run out of resources. Fix at later date.
    370 			 */
    371 			if ((mop = allocb(sizeof (struct stroptions),
    372 			    BPRI_MED)) != NULL) {
    373 				struct stroptions *sop;
    374 				uint_t snap;
    375 				int fudge;
    376 
    377 				snap = *(uint_t *)mp->b_cont->b_rptr;
    378 				mop->b_datap->db_type = M_SETOPTS;
    379 				mop->b_wptr += sizeof (struct stroptions);
    380 				sop = (struct stroptions *)mop->b_rptr;
    381 				sop->so_flags = SO_HIWAT | SO_LOWAT;
    382 				fudge = snap <= 100 ?   4 :
    383 				    snap <= 400 ?   2 :
    384 				    1;
    385 				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
    386 				sop->so_lowat = SNIT_LOWAT(snap, fudge);
    387 				qreply(wq, mop);
    388 			}
    389 		}
    390 
    391 		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
    392 		miocack(wq, mp, 0, 0);
    393 		return;
    394 
    395 	default:
    396 		ASSERT(0);
    397 		return;
    398 	}
    399 }
    400 
    401 /*
    402  * Write-side put procedure.  Its main task is to detect ioctls
    403  * for manipulating the buffering state and hand them to sbioctl.
    404  * Other message types are passed on through.
    405  */
    406 static void
    407 sbwput(queue_t *wq, mblk_t *mp)
    408 {
    409 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
    410 	struct copyresp *resp;
    411 
    412 	if (sbp->sb_flags & SB_SEND_ON_WRITE)
    413 		sbclosechunk(sbp);
    414 	switch (mp->b_datap->db_type) {
    415 	case M_IOCTL:
    416 		sbioctl(wq, mp);
    417 		break;
    418 
    419 	case M_IOCDATA:
    420 		resp = (struct copyresp *)mp->b_rptr;
    421 		if (resp->cp_rval) {
    422 			/*
    423 			 * Just free message on failure.
    424 			 */
    425 			freemsg(mp);
    426 			break;
    427 		}
    428 
    429 		switch (resp->cp_cmd) {
    430 		case SBIOCSTIME:
    431 		case SBIOCSCHUNK:
    432 		case SBIOCSFLAGS:
    433 		case SBIOCSSNAP:
    434 		case SBIOCGTIME:
    435 		case SBIOCGCHUNK:
    436 		case SBIOCGSNAP:
    437 		case SBIOCGFLAGS:
    438 			sbioc(wq, mp);
    439 			break;
    440 
    441 		default:
    442 			putnext(wq, mp);
    443 			break;
    444 		}
    445 		break;
    446 
    447 	default:
    448 		putnext(wq, mp);
    449 		break;
    450 	}
    451 }
    452 
    453 /*
    454  * Read-side put procedure.  It's responsible for buffering up incoming
    455  * messages and grouping them into aggregates according to the current
    456  * buffering parameters.
    457  */
    458 static void
    459 sbrput(queue_t *rq, mblk_t *mp)
    460 {
    461 	struct	sb	*sbp = (struct sb *)rq->q_ptr;
    462 
    463 	ASSERT(sbp);
    464 
    465 	switch (mp->b_datap->db_type) {
    466 	case M_PROTO:
    467 		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
    468 			sbclosechunk(sbp);
    469 			sbsendit(rq, mp);
    470 			break;
    471 		} else {
    472 			/*
    473 			 * Convert M_PROTO to M_DATA.
    474 			 */
    475 			mp->b_datap->db_type = M_DATA;
    476 		}
    477 		/* FALLTHRU */
    478 
    479 	case M_DATA:
    480 		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
    481 		    !(sbp->sb_state & SB_FRCVD)) {
    482 			sbclosechunk(sbp);
    483 			sbsendit(rq, mp);
    484 			sbp->sb_state |= SB_FRCVD;
    485 		} else
    486 			sbaddmsg(rq, mp);
    487 
    488 		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
    489 			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
    490 			    sbp, sbp->sb_ticks);
    491 
    492 		break;
    493 
    494 	case M_FLUSH:
    495 		if (*mp->b_rptr & FLUSHR) {
    496 			/*
    497 			 * Reset timeout, flush the chunk currently in
    498 			 * progress, and start a new chunk.
    499 			 */
    500 			if (sbp->sb_timeoutid) {
    501 				(void) quntimeout(sbp->sb_rq,
    502 				    sbp->sb_timeoutid);
    503 				sbp->sb_timeoutid = 0;
    504 			}
    505 			if (sbp->sb_mp) {
    506 				freemsg(sbp->sb_mp);
    507 				sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
    508 				sbp->sb_mlen = 0;
    509 				sbp->sb_mcount = 0;
    510 			}
    511 			flushq(rq, FLUSHALL);
    512 		}
    513 		putnext(rq, mp);
    514 		break;
    515 
    516 	case M_CTL:
    517 		/*
    518 		 * Zero-length M_CTL means our timeout() popped.
    519 		 */
    520 		if (MBLKL(mp) == 0) {
    521 			freemsg(mp);
    522 			sbclosechunk(sbp);
    523 		} else {
    524 			sbclosechunk(sbp);
    525 			sbsendit(rq, mp);
    526 		}
    527 		break;
    528 
    529 	default:
    530 		if (mp->b_datap->db_type <= QPCTL) {
    531 			sbclosechunk(sbp);
    532 			sbsendit(rq, mp);
    533 		} else {
    534 			/* Note: out of band */
    535 			putnext(rq, mp);
    536 		}
    537 		break;
    538 	}
    539 }
    540 
    541 /*
    542  *  read service procedure.
    543  */
    544 /* ARGSUSED */
    545 static void
    546 sbrsrv(queue_t *rq)
    547 {
    548 	mblk_t	*mp;
    549 
    550 	/*
    551 	 * High priority messages shouldn't get here but if
    552 	 * one does, jam it through to avoid infinite loop.
    553 	 */
    554 	while ((mp = getq(rq)) != NULL) {
    555 		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
    556 			/* should only get here if SB_NO_SROPS */
    557 			(void) putbq(rq, mp);
    558 			return;
    559 		}
    560 		putnext(rq, mp);
    561 	}
    562 }
    563 
    564 /*
    565  * Handle write-side M_IOCTL messages.
    566  */
    567 static void
    568 sbioctl(queue_t *wq, mblk_t *mp)
    569 {
    570 	struct	sb	*sbp = (struct sb *)wq->q_ptr;
    571 	struct iocblk	*iocp = (struct iocblk *)mp->b_rptr;
    572 	struct	timeval	*t;
    573 	clock_t	ticks;
    574 	mblk_t	*mop;
    575 	int	transparent = iocp->ioc_count;
    576 	mblk_t	*datamp;
    577 	int	error;
    578 
    579 	switch (iocp->ioc_cmd) {
    580 	case SBIOCSTIME:
    581 		if (iocp->ioc_count == TRANSPARENT) {
    582 #ifdef _SYSCALL32_IMPL
    583 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
    584 				mcopyin(mp, NULL, sizeof (struct timeval32),
    585 				    NULL);
    586 			} else
    587 #endif /* _SYSCALL32_IMPL */
    588 			{
    589 				mcopyin(mp, NULL, sizeof (*t), NULL);
    590 			}
    591 			qreply(wq, mp);
    592 		} else {
    593 			/*
    594 			 * Verify argument length.
    595 			 */
    596 #ifdef _SYSCALL32_IMPL
    597 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
    598 				struct timeval32 *t32;
    599 
    600 				error = miocpullup(mp,
    601 				    sizeof (struct timeval32));
    602 				if (error != 0) {
    603 					miocnak(wq, mp, 0, error);
    604 					break;
    605 				}
    606 				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
    607 				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
    608 					miocnak(wq, mp, 0, EINVAL);
    609 					break;
    610 				}
    611 				ticks = TIMEVAL_TO_TICK(t32);
    612 			} else
    613 #endif /* _SYSCALL32_IMPL */
    614 			{
    615 				error = miocpullup(mp, sizeof (struct timeval));
    616 				if (error != 0) {
    617 					miocnak(wq, mp, 0, error);
    618 					break;
    619 				}
    620 
    621 				t = (struct timeval *)mp->b_cont->b_rptr;
    622 				if (t->tv_sec < 0 || t->tv_usec < 0) {
    623 					miocnak(wq, mp, 0, EINVAL);
    624 					break;
    625 				}
    626 				ticks = TIMEVAL_TO_TICK(t);
    627 			}
    628 			sbp->sb_ticks = ticks;
    629 			if (ticks == 0)
    630 				sbp->sb_chunk = 0;
    631 			miocack(wq, mp, 0, 0);
    632 			sbclosechunk(sbp);
    633 		}
    634 		break;
    635 
    636 	case SBIOCGTIME: {
    637 		struct timeval *t;
    638 
    639 		/*
    640 		 * Verify argument length.
    641 		 */
    642 		if (transparent != TRANSPARENT) {
    643 #ifdef _SYSCALL32_IMPL
    644 			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
    645 				error = miocpullup(mp,
    646 				    sizeof (struct timeval32));
    647 				if (error != 0) {
    648 					miocnak(wq, mp, 0, error);
    649 					break;
    650 				}
    651 			} else
    652 #endif /* _SYSCALL32_IMPL */
    653 			error = miocpullup(mp, sizeof (struct timeval));
    654 			if (error != 0) {
    655 				miocnak(wq, mp, 0, error);
    656 				break;
    657 			}
    658 		}
    659 
    660 		/*
    661 		 * If infinite timeout, return range error
    662 		 * for the ioctl.
    663 		 */
    664 		if (sbp->sb_ticks < 0) {
    665 			miocnak(wq, mp, 0, ERANGE);
    666 			break;
    667 		}
    668 
    669 #ifdef _SYSCALL32_IMPL
    670 		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
    671 			struct timeval32 *t32;
    672 
    673 			if (transparent == TRANSPARENT) {
    674 				datamp = allocb(sizeof (*t32), BPRI_MED);
    675 				if (datamp == NULL) {
    676 					miocnak(wq, mp, 0, EAGAIN);
    677 					break;
    678 				}
    679 				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
    680 			}
    681 
    682 			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
    683 			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
    684 
    685 			if (transparent == TRANSPARENT)
    686 				qreply(wq, mp);
    687 			else
    688 				miocack(wq, mp, sizeof (*t32), 0);
    689 		} else
    690 #endif /* _SYSCALL32_IMPL */
    691 		{
    692 			if (transparent == TRANSPARENT) {
    693 				datamp = allocb(sizeof (*t), BPRI_MED);
    694 				if (datamp == NULL) {
    695 					miocnak(wq, mp, 0, EAGAIN);
    696 					break;
    697 				}
    698 				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
    699 			}
    700 
    701 			t = (struct timeval *)mp->b_cont->b_rptr;
    702 			TICK_TO_TIMEVAL(sbp->sb_ticks, t);
    703 
    704 			if (transparent == TRANSPARENT)
    705 				qreply(wq, mp);
    706 			else
    707 				miocack(wq, mp, sizeof (*t), 0);
    708 		}
    709 		break;
    710 	}
    711 
    712 	case SBIOCCTIME:
    713 		sbp->sb_ticks = -1;
    714 		miocack(wq, mp, 0, 0);
    715 		break;
    716 
    717 	case SBIOCSCHUNK:
    718 		if (iocp->ioc_count == TRANSPARENT) {
    719 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
    720 			qreply(wq, mp);
    721 		} else {
    722 			/*
    723 			 * Verify argument length.
    724 			 */
    725 			error = miocpullup(mp, sizeof (uint_t));
    726 			if (error != 0) {
    727 				miocnak(wq, mp, 0, error);
    728 				break;
    729 			}
    730 
    731 			/*
    732 			 * set up hi/lo water marks on stream head read queue.
    733 			 * unlikely to run out of resources. Fix at later date.
    734 			 */
    735 			if ((mop = allocb(sizeof (struct stroptions),
    736 			    BPRI_MED)) != NULL) {
    737 				struct stroptions *sop;
    738 				uint_t chunk;
    739 
    740 				chunk = *(uint_t *)mp->b_cont->b_rptr;
    741 				mop->b_datap->db_type = M_SETOPTS;
    742 				mop->b_wptr += sizeof (struct stroptions);
    743 				sop = (struct stroptions *)mop->b_rptr;
    744 				sop->so_flags = SO_HIWAT | SO_LOWAT;
    745 				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
    746 				sop->so_lowat = SNIT_LOWAT(chunk, 1);
    747 				qreply(wq, mop);
    748 			}
    749 
    750 			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
    751 			miocack(wq, mp, 0, 0);
    752 			sbclosechunk(sbp);
    753 		}
    754 		break;
    755 
    756 	case SBIOCGCHUNK:
    757 		/*
    758 		 * Verify argument length.
    759 		 */
    760 		if (transparent != TRANSPARENT) {
    761 			error = miocpullup(mp, sizeof (uint_t));
    762 			if (error != 0) {
    763 				miocnak(wq, mp, 0, error);
    764 				break;
    765 			}
    766 		}
    767 
    768 		if (transparent == TRANSPARENT) {
    769 			datamp = allocb(sizeof (uint_t), BPRI_MED);
    770 			if (datamp == NULL) {
    771 				miocnak(wq, mp, 0, EAGAIN);
    772 				break;
    773 			}
    774 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
    775 		}
    776 
    777 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
    778 
    779 		if (transparent == TRANSPARENT)
    780 			qreply(wq, mp);
    781 		else
    782 			miocack(wq, mp, sizeof (uint_t), 0);
    783 		break;
    784 
    785 	case SBIOCSSNAP:
    786 		if (iocp->ioc_count == TRANSPARENT) {
    787 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
    788 			qreply(wq, mp);
    789 		} else {
    790 			/*
    791 			 * Verify argument length.
    792 			 */
    793 			error = miocpullup(mp, sizeof (uint_t));
    794 			if (error != 0) {
    795 				miocnak(wq, mp, 0, error);
    796 				break;
    797 			}
    798 
    799 			/*
    800 			 * if chunking dont worry about effects of
    801 			 * snipping of message size on head flow control
    802 			 * since it has a relatively small bearing on the
    803 			 * data rate onto the streamn head.
    804 			 */
    805 			if (!sbp->sb_chunk) {
    806 				/*
    807 				 * set up hi/lo water marks on stream
    808 				 * head read queue.  unlikely to run out
    809 				 * of resources. Fix at later date.
    810 				 */
    811 				if ((mop = allocb(sizeof (struct stroptions),
    812 				    BPRI_MED)) != NULL) {
    813 					struct stroptions *sop;
    814 					uint_t snap;
    815 					int fudge;
    816 
    817 					snap = *(uint_t *)mp->b_cont->b_rptr;
    818 					mop->b_datap->db_type = M_SETOPTS;
    819 					mop->b_wptr += sizeof (*sop);
    820 					sop = (struct stroptions *)mop->b_rptr;
    821 					sop->so_flags = SO_HIWAT | SO_LOWAT;
    822 					fudge = (snap <= 100) ? 4 :
    823 					    (snap <= 400) ? 2 : 1;
    824 					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
    825 					sop->so_lowat = SNIT_LOWAT(snap, fudge);
    826 					qreply(wq, mop);
    827 				}
    828 			}
    829 
    830 			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
    831 
    832 			miocack(wq, mp, 0, 0);
    833 		}
    834 		break;
    835 
    836 	case SBIOCGSNAP:
    837 		/*
    838 		 * Verify argument length
    839 		 */
    840 		if (transparent != TRANSPARENT) {
    841 			error = miocpullup(mp, sizeof (uint_t));
    842 			if (error != 0) {
    843 				miocnak(wq, mp, 0, error);
    844 				break;
    845 			}
    846 		}
    847 
    848 		if (transparent == TRANSPARENT) {
    849 			datamp = allocb(sizeof (uint_t), BPRI_MED);
    850 			if (datamp == NULL) {
    851 				miocnak(wq, mp, 0, EAGAIN);
    852 				break;
    853 			}
    854 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
    855 		}
    856 
    857 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
    858 
    859 		if (transparent == TRANSPARENT)
    860 			qreply(wq, mp);
    861 		else
    862 			miocack(wq, mp, sizeof (uint_t), 0);
    863 		break;
    864 
    865 	case SBIOCSFLAGS:
    866 		/*
    867 		 * set the flags.
    868 		 */
    869 		if (iocp->ioc_count == TRANSPARENT) {
    870 			mcopyin(mp, NULL, sizeof (uint_t), NULL);
    871 			qreply(wq, mp);
    872 		} else {
    873 			error = miocpullup(mp, sizeof (uint_t));
    874 			if (error != 0) {
    875 				miocnak(wq, mp, 0, error);
    876 				break;
    877 			}
    878 			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
    879 			miocack(wq, mp, 0, 0);
    880 		}
    881 		break;
    882 
    883 	case SBIOCGFLAGS:
    884 		/*
    885 		 * Verify argument length
    886 		 */
    887 		if (transparent != TRANSPARENT) {
    888 			error = miocpullup(mp, sizeof (uint_t));
    889 			if (error != 0) {
    890 				miocnak(wq, mp, 0, error);
    891 				break;
    892 			}
    893 		}
    894 
    895 		if (transparent == TRANSPARENT) {
    896 			datamp = allocb(sizeof (uint_t), BPRI_MED);
    897 			if (datamp == NULL) {
    898 				miocnak(wq, mp, 0, EAGAIN);
    899 				break;
    900 			}
    901 			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
    902 		}
    903 
    904 		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
    905 
    906 		if (transparent == TRANSPARENT)
    907 			qreply(wq, mp);
    908 		else
    909 			miocack(wq, mp, sizeof (uint_t), 0);
    910 		break;
    911 
    912 
    913 	default:
    914 		putnext(wq, mp);
    915 		break;
    916 	}
    917 }
    918 
    919 /*
    920  * Given a length l, calculate the amount of extra storage
    921  * required to round it up to the next multiple of the alignment a.
    922  */
    923 #define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
    924 /*
    925  * Calculate additional amount of space required for alignment.
    926  */
    927 #define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
    928 /*
    929  * Smallest possible message size when headers are enabled.
    930  * This is used to calculate whether a chunk is nearly full.
    931  */
    932 #define	SMALLEST_MESSAGE	sizeof (struct sb_hdr) + _POINTER_ALIGNMENT
    933 
    934 /*
    935  * Process a read-side M_DATA message.
    936  *
    937  * If the currently accumulating chunk doesn't have enough room
    938  * for the message, close off the chunk, pass it upward, and start
    939  * a new one.  Then add the message to the current chunk, taking
    940  * account of the possibility that the message's size exceeds the
    941  * chunk size.
    942  *
    943  * If headers are enabled add an sb_hdr header and trailing alignment padding.
    944  *
    945  * To optimise performance the total number of msgbs should be kept
    946  * to a minimum. This is achieved by using any remaining space in message N
    947  * for both its own padding as well as the header of message N+1 if possible.
    948  * If there's insufficient space we allocate one message to hold this 'wrapper'.
    949  * (there's likely to be space beyond message N, since allocb would have
    950  * rounded up the required size to one of the dblk_sizes).
    951  *
    952  */
    953 static void
    954 sbaddmsg(queue_t *rq, mblk_t *mp)
    955 {
    956 	struct sb	*sbp;
    957 	struct timeval	t;
    958 	struct sb_hdr	hp;
    959 	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
    960 	mblk_t *last;		/* last mblk of current message */
    961 	size_t wrapperlen;	/* length of header + padding */
    962 	size_t origlen;		/* data length before truncation */
    963 	size_t pad;		/* bytes required to align header */
    964 
    965 	sbp = (struct sb *)rq->q_ptr;
    966 
    967 	origlen = msgdsize(mp);
    968 
    969 	/*
    970 	 * Truncate the message.
    971 	 */
    972 	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
    973 			(adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
    974 		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
    975 	else
    976 		hp.sbh_totlen = hp.sbh_msglen = origlen;
    977 
    978 	if (sbp->sb_flags & SB_NO_HEADER) {
    979 
    980 		/*
    981 		 * Would the inclusion of this message overflow the current
    982 		 * chunk? If so close the chunk off and start a new one.
    983 		 */
    984 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
    985 			sbclosechunk(sbp);
    986 		/*
    987 		 * First message too big for chunk - just send it up.
    988 		 * This will always be true when we're not chunking.
    989 		 */
    990 		if (hp.sbh_totlen > sbp->sb_chunk) {
    991 			sbsendit(rq, mp);
    992 			return;
    993 		}
    994 
    995 		/*
    996 		 * We now know that the msg will fit in the chunk.
    997 		 * Link it onto the end of the chunk.
    998 		 * Since linkb() walks the entire chain, we keep a pointer to
    999 		 * the first mblk of the last msgb added and call linkb on that
   1000 		 * that last message, rather than performing the
   1001 		 * O(n) linkb() operation on the whole chain.
   1002 		 * sb_head isn't needed in this SB_NO_HEADER mode.
   1003 		 */
   1004 		if (sbp->sb_mp)
   1005 			linkb(sbp->sb_tail, mp);
   1006 		else
   1007 			sbp->sb_mp = mp;
   1008 
   1009 		sbp->sb_tail = mp;
   1010 		sbp->sb_mlen += hp.sbh_totlen;
   1011 		sbp->sb_mcount++;
   1012 	} else {
   1013 		/* Timestamp must be done immediately */
   1014 		uniqtime(&t);
   1015 		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
   1016 
   1017 		pad = Align(hp.sbh_totlen);
   1018 		hp.sbh_totlen += sizeof (hp);
   1019 		hp.sbh_totlen += pad;
   1020 
   1021 		/*
   1022 		 * Would the inclusion of this message overflow the current
   1023 		 * chunk? If so close the chunk off and start a new one.
   1024 		 */
   1025 		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
   1026 				sbclosechunk(sbp);
   1027 
   1028 		if (sbp->sb_head == NULL) {
   1029 			/* Allocate leading header of new chunk */
   1030 			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
   1031 			if (sbp->sb_head == NULL) {
   1032 				/*
   1033 				 * Memory allocation failure.
   1034 				 * This will need to be revisited
   1035 				 * since using certain flag combinations
   1036 				 * can result in messages being dropped
   1037 				 * silently.
   1038 				 */
   1039 				freemsg(mp);
   1040 				sbp->sb_drops++;
   1041 				return;
   1042 			}
   1043 			sbp->sb_mp = sbp->sb_head;
   1044 		}
   1045 
   1046 		/*
   1047 		 * Copy header into message
   1048 		 */
   1049 		hp.sbh_drops = sbp->sb_drops;
   1050 		hp.sbh_origlen = origlen;
   1051 		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
   1052 		sbp->sb_head->b_wptr += sizeof (hp);
   1053 
   1054 		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
   1055 
   1056 		/*
   1057 		 * Join message to the chunk
   1058 		 */
   1059 		linkb(sbp->sb_head, mp);
   1060 
   1061 		sbp->sb_mcount++;
   1062 		sbp->sb_mlen += hp.sbh_totlen;
   1063 
   1064 		/*
   1065 		 * If the first message alone is too big for the chunk close
   1066 		 * the chunk now.
   1067 		 * If the next message would immediately cause the chunk to
   1068 		 * overflow we may as well close the chunk now. The next
   1069 		 * message is certain to be at least SMALLEST_MESSAGE size.
   1070 		 */
   1071 		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
   1072 			sbclosechunk(sbp);
   1073 			return;
   1074 		}
   1075 
   1076 		/*
   1077 		 * Find space for the wrapper. The wrapper consists of:
   1078 		 *
   1079 		 * 1) Padding for this message (this is to ensure each header
   1080 		 * begins on an 8 byte boundary in the userland buffer).
   1081 		 *
   1082 		 * 2) Space for the next message's header, in case the next
   1083 		 * next message will fit in this chunk.
   1084 		 *
   1085 		 * It may be possible to append the wrapper to the last mblk
   1086 		 * of the message, but only if we 'own' the data. If the dblk
   1087 		 * has been shared through dupmsg() we mustn't alter it.
   1088 		 */
   1089 
   1090 		wrapperlen = (sizeof (hp) + pad);
   1091 
   1092 		/* Is there space for the wrapper beyond the message's data ? */
   1093 		for (last = mp; last->b_cont; last = last->b_cont)
   1094 			;
   1095 
   1096 		if ((wrapperlen <= MBLKTAIL(last)) &&
   1097 			(last->b_datap->db_ref == 1)) {
   1098 			if (pad > 0) {
   1099 				/*
   1100 				 * Pad with zeroes to the next pointer boundary
   1101 				 * (we don't want to disclose kernel data to
   1102 				 * users), then advance wptr.
   1103 				 */
   1104 				(void) memset(last->b_wptr, 0, pad);
   1105 				last->b_wptr += pad;
   1106 			}
   1107 			/* Remember where to write the header information */
   1108 			sbp->sb_head = last;
   1109 		} else {
   1110 			/* Have to allocate additional space for the wrapper */
   1111 			wrapper = allocb(wrapperlen, BPRI_MED);
   1112 			if (wrapper == NULL) {
   1113 				sbclosechunk(sbp);
   1114 				return;
   1115 			}
   1116 			if (pad > 0) {
   1117 				/*
   1118 				 * Pad with zeroes (we don't want to disclose
   1119 				 * kernel data to users).
   1120 				 */
   1121 				(void) memset(wrapper->b_wptr, 0, pad);
   1122 				wrapper->b_wptr += pad;
   1123 			}
   1124 			/* Link the wrapper msg onto the end of the chunk */
   1125 			linkb(mp, wrapper);
   1126 			/* Remember to write the next header in this wrapper */
   1127 			sbp->sb_head = wrapper;
   1128 		}
   1129 	}
   1130 }
   1131 
   1132 /*
   1133  * Called from timeout().
   1134  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
   1135  * to synchronize with any active module threads (open, close, wput, rput).
   1136  */
   1137 static void
   1138 sbtick(void *arg)
   1139 {
   1140 	struct sb *sbp = arg;
   1141 	queue_t	*rq;
   1142