Home | History | Annotate | Download | only in rt
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     28 
     29 #include "lint.h"
     30 #include "mtlib.h"
     31 #define	_KMEMUSER
     32 #include <sys/param.h>		/* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
     33 #undef	_KMEMUSER
     34 #include <mqueue.h>
     35 #include <sys/types.h>
     36 #include <sys/file.h>
     37 #include <sys/mman.h>
     38 #include <errno.h>
     39 #include <stdarg.h>
     40 #include <limits.h>
     41 #include <pthread.h>
     42 #include <assert.h>
     43 #include <string.h>
     44 #include <unistd.h>
     45 #include <stdlib.h>
     46 #include <sys/stat.h>
     47 #include <inttypes.h>
     48 #include "sigev_thread.h"
     49 #include "pos4obj.h"
     50 
     51 /*
     52  * Default values per message queue
     53  */
     54 #define	MQ_MAXMSG	128
     55 #define	MQ_MAXSIZE	1024
     56 
     57 #define	MQ_MAGIC	0x4d534751		/* "MSGQ" */
     58 
     59 /*
 * Message header that precedes every message buffer on the linked lists
     61  */
typedef struct {
	/*
	 * Offset, relative to the start of the mapped queue, of the next
	 * message on the same list.  A value of 0 terminates the list.
	 */
	uint64_t 	msg_next;	/* offset of next message in the link */
	/* number of payload bytes stored directly after this header */
	uint64_t	msg_len;	/* length of the message */
} msghdr_t;
     66 
     67 /*
     68  * message queue description
     69  */
struct mq_dn {
	/*
	 * mq_open() stores the non-blocking flag of the open here; the
	 * send and receive paths test it to decide whether to block.
	 */
	size_t		mqdn_flags;	/* open description flags */
};
     73 
     74 /*
     75  * message queue descriptor structure
     76  */
typedef struct mq_des {
	/* doubly-linked list rooted at mq_list, modified under mq_list_lock */
	struct mq_des	*mqd_next;	/* list of all open mq descriptors, */
	struct mq_des	*mqd_prev;	/* needed for fork-safety */
	/* MQ_MAGIC while the descriptor is open; zeroed when it is closed */
	int		mqd_magic;	/* magic # to identify mq_des */
	/* FREAD/FWRITE access bits derived from the mq_open() oflag */
	int		mqd_flags;	/* operation flag per open */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn	*mqd_mqdn;	/* open	description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD notification */
	/* set once locking mq_exclusive has returned an error */
	int		mqd_ownerdead;	/* mq_exclusive is inconsistent */
} mqdes_t;
     87 
     88 /*
     89  * message queue common header, part of the mmap()ed file.
     90  * Since message queues may be shared between 32- and 64-bit processes,
     91  * care must be taken to make sure that the elements of this structure
     92  * are identical for both _LP64 and _ILP32 cases.
     93  */
typedef struct mq_header {
	/* first field must be mq_totsize, DO NOT insert before this	*/
	/*
	 * mq_open() of an existing queue read()s the first 8 bytes of the
	 * data file to learn the mapping size, and treats 0 as "not yet
	 * initialized".
	 */
	int64_t		mq_totsize;	/* total size of the Queue */
	int64_t		mq_maxsz;	/* max size of each message */
	uint32_t	mq_maxmsg;	/* max messages in the queue */
	uint32_t	mq_maxprio;	/* maximum mqueue priority */
	uint32_t	mq_curmaxprio;	/* current maximum MQ priority */
	/* bit N is set while the priority-N list holds at least one message */
	uint32_t	mq_mask;	/* priority bitmask */
	/* the next three fields are offsets from the start of this header */
	uint64_t	mq_freep;	/* free message's head pointer */
	uint64_t	mq_headpp;	/* pointer to head pointers */
	uint64_t	mq_tailpp;	/* pointer to tail pointers */
	signotify_id_t	mq_sigid;	/* notification id (3 int's) */
	uint32_t	mq_ntype;	/* notification type (SIGEV_*) */
	/* address of the mqdes_t that registered the notification, or 0 */
	uint64_t	mq_des;		/* pointer to msg Q descriptor */
	mutex_t		mq_exclusive;	/* acquire for exclusive access */
	sem_t		mq_rblocked;	/* number of processes rblocked */
	/* initialized to mq_maxmsg; posted once per message received */
	sem_t		mq_notfull;	/* mq_send()'s block on this */
	/* initialized to 0; posted once per message sent */
	sem_t		mq_notempty;	/* mq_receive()'s block on this */
	sem_t		mq_spawner;	/* spawner thread blocks on this */
} mqhdr_t;
    114 
    115 /*
    116  * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
    117  * If this assumption is somehow invalidated, mq_open() needs to be changed
    118  * back to the old version which kept a count and enforced a limit.
    119  * We make sure that this is pointed out to those changing <sys/param.h>
    120  * by checking _MQ_OPEN_MAX at compile time.
    121  */
/* Break the build if <sys/param.h> ever introduces a real limit. */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif	/* _MQ_OPEN_MAX != -1 */
    125 
    126 #define	MQ_ALIGNSIZE	8	/* 64-bit alignment */
    127 
    128 #ifdef DEBUG
    129 #define	MQ_ASSERT(x)	assert(x);
    130 
    131 #define	MQ_ASSERT_PTR(_m, _p) \
    132 	assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
    133 	    !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
    134 	    _m->mq_totsize));
    135 
    136 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
    137 	int _val; \
    138 	(void) sem_getvalue((sem), &_val); \
    139 	assert((_val) <= val); }
    140 #else
    141 #define	MQ_ASSERT(x)
    142 #define	MQ_ASSERT_PTR(_m, _p)
    143 #define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
    144 #endif
    145 
    146 #define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
    147 #define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
    148 			(uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
    149 #define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)m + \
    150 			(uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
    151 
    152 #define	MQ_RESERVED	((mqdes_t *)-1)
    153 
    154 #define	ABS_TIME	0
    155 #define	REL_TIME	1
    156 
/*
 * mq_list is the head of the list of message queue descriptors opened by
 * this process; it is only modified while holding mq_list_lock.
 */
static mutex_t mq_list_lock = DEFAULTMUTEX;
static mqdes_t *mq_list = NULL;

/* defined outside this file; called here with SN_SEND and SN_CANCEL */
extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
    161 
    162 static int
    163 mq_is_valid(mqdes_t *mqdp)
    164 {
    165 	/*
    166 	 * Any use of a message queue after it was closed is
    167 	 * undefined.  But the standard strongly favours EBADF
    168 	 * returns.  Before we dereference which could be fatal,
    169 	 * we first do some pointer sanity checks.
    170 	 */
    171 	if (mqdp != NULL && mqdp != MQ_RESERVED &&
    172 	    ((uintptr_t)mqdp & 0x7) == 0) {
    173 		return (mqdp->mqd_magic == MQ_MAGIC);
    174 	}
    175 
    176 	return (0);
    177 }
    178 
    179 static void
    180 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
    181 {
    182 	int		i;
    183 	uint64_t	temp;
    184 	uint64_t	currentp;
    185 	uint64_t	nextp;
    186 
    187 	/*
    188 	 * We only need to initialize the non-zero fields.  The use of
    189 	 * ftruncate() on the message queue file assures that the
    190 	 * pages will be zero-filled.
    191 	 */
    192 	(void) mutex_init(&mqhp->mq_exclusive,
    193 	    USYNC_PROCESS | LOCK_ROBUST, NULL);
    194 	(void) sem_init(&mqhp->mq_rblocked, 1, 0);
    195 	(void) sem_init(&mqhp->mq_notempty, 1, 0);
    196 	(void) sem_init(&mqhp->mq_spawner, 1, 0);
    197 	(void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
    198 
    199 	mqhp->mq_maxsz = msgsize;
    200 	mqhp->mq_maxmsg = maxmsg;
    201 
    202 	/*
    203 	 * As of this writing (1997), there are 32 message queue priorities.
    204 	 * If this is to change, then the size of the mq_mask will
    205 	 * also have to change.  If DEBUG is defined, assert that
    206 	 * _MQ_PRIO_MAX hasn't changed.
    207 	 */
    208 	mqhp->mq_maxprio = _MQ_PRIO_MAX;
    209 #if defined(DEBUG)
    210 	/* LINTED always true */
    211 	MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
    212 #endif
    213 
    214 	/*
    215 	 * Since the message queue can be mapped into different
    216 	 * virtual address ranges by different processes, we don't
    217 	 * keep track of pointers, only offsets into the shared region.
    218 	 */
    219 	mqhp->mq_headpp = sizeof (mqhdr_t);
    220 	mqhp->mq_tailpp = mqhp->mq_headpp +
    221 	    mqhp->mq_maxprio * sizeof (uint64_t);
    222 	mqhp->mq_freep = mqhp->mq_tailpp +
    223 	    mqhp->mq_maxprio * sizeof (uint64_t);
    224 
    225 	currentp = mqhp->mq_freep;
    226 	MQ_PTR(mqhp, currentp)->msg_next = 0;
    227 
    228 	temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
    229 	for (i = 1; i < mqhp->mq_maxmsg; i++) {
    230 		nextp = currentp + sizeof (msghdr_t) + temp;
    231 		MQ_PTR(mqhp, currentp)->msg_next = nextp;
    232 		MQ_PTR(mqhp, nextp)->msg_next = 0;
    233 		currentp = nextp;
    234 	}
    235 }
    236 
    237 static size_t
    238 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
    239 {
    240 	uint64_t currentp;
    241 	msghdr_t *curbuf;
    242 	uint64_t *headpp;
    243 	uint64_t *tailpp;
    244 
    245 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
    246 
    247 	/*
    248 	 * Get the head and tail pointers for the queue of maximum
    249 	 * priority.  We shouldn't be here unless there is a message for
    250 	 * us, so it's fair to assert that both the head and tail
    251 	 * pointers are non-NULL.
    252 	 */
    253 	headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
    254 	tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
    255 
    256 	if (msg_prio != NULL)
    257 		*msg_prio = mqhp->mq_curmaxprio;
    258 
    259 	currentp = *headpp;
    260 	MQ_ASSERT_PTR(mqhp, currentp);
    261 	curbuf = MQ_PTR(mqhp, currentp);
    262 
    263 	if ((*headpp = curbuf->msg_next) == NULL) {
    264 		/*
    265 		 * We just nuked the last message in this priority's queue.
    266 		 * Twiddle this priority's bit, and then find the next bit
    267 		 * tipped.
    268 		 */
    269 		uint_t prio = mqhp->mq_curmaxprio;
    270 
    271 		mqhp->mq_mask &= ~(1u << prio);
    272 
    273 		for (; prio != 0; prio--)
    274 			if (mqhp->mq_mask & (1u << prio))
    275 				break;
    276 		mqhp->mq_curmaxprio = prio;
    277 
    278 		*tailpp = NULL;
    279 	}
    280 
    281 	/*
    282 	 * Copy the message, and put the buffer back on the free list.
    283 	 */
    284 	(void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
    285 	curbuf->msg_next = mqhp->mq_freep;
    286 	mqhp->mq_freep = currentp;
    287 
    288 	return (curbuf->msg_len);
    289 }
    290 
    291 
    292 static void
    293 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
    294 {
    295 	uint64_t currentp;
    296 	msghdr_t *curbuf;
    297 	uint64_t *headpp;
    298 	uint64_t *tailpp;
    299 
    300 	MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
    301 
    302 	/*
    303 	 * Grab a free message block, and link it in.  We shouldn't
    304 	 * be here unless there is room in the queue for us;  it's
    305 	 * fair to assert that the free pointer is non-NULL.
    306 	 */
    307 	currentp = mqhp->mq_freep;
    308 	MQ_ASSERT_PTR(mqhp, currentp);
    309 	curbuf = MQ_PTR(mqhp, currentp);
    310 
    311 	/*
    312 	 * Remove a message from the free list, and copy in the new contents.
    313 	 */
    314 	mqhp->mq_freep = curbuf->msg_next;
    315 	curbuf->msg_next = NULL;
    316 	(void) memcpy((char *)&curbuf[1], msgp, len);
    317 	curbuf->msg_len = len;
    318 
    319 	headpp = HEAD_PTR(mqhp, prio);
    320 	tailpp = TAIL_PTR(mqhp, prio);
    321 
    322 	if (*tailpp == 0) {
    323 		/*
    324 		 * This is the first message on this queue.  Set the
    325 		 * head and tail pointers, and tip the appropriate bit
    326 		 * in the priority mask.
    327 		 */
    328 		*headpp = currentp;
    329 		*tailpp = currentp;
    330 		mqhp->mq_mask |= (1u << prio);
    331 		if (prio > mqhp->mq_curmaxprio)
    332 			mqhp->mq_curmaxprio = prio;
    333 	} else {
    334 		MQ_ASSERT_PTR(mqhp, *tailpp);
    335 		MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
    336 		*tailpp = currentp;
    337 	}
    338 }
    339 
    340 /*
    341  * Send a notification and also delete the registration.
    342  */
    343 static void
    344 do_notify(mqhdr_t *mqhp)
    345 {
    346 	(void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
    347 	if (mqhp->mq_ntype == SIGEV_THREAD ||
    348 	    mqhp->mq_ntype == SIGEV_PORT)
    349 		(void) sem_post(&mqhp->mq_spawner);
    350 	mqhp->mq_ntype = 0;
    351 	mqhp->mq_des = 0;
    352 }
    353 
    354 /*
    355  * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
    356  * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
    357  * they fail with errno == EBADMSG.  Trigger any registered notification.
    358  */
    359 static void
    360 owner_dead(mqdes_t *mqdp, int error)
    361 {
    362 	mqhdr_t *mqhp = mqdp->mqd_mq;
    363 
    364 	mqdp->mqd_ownerdead = 1;
    365 	(void) sem_post(&mqhp->mq_notfull);
    366 	(void) sem_post(&mqhp->mq_notempty);
    367 	if (error == EOWNERDEAD) {
    368 		if (mqhp->mq_sigid.sn_pid != 0)
    369 			do_notify(mqhp);
    370 		(void) mutex_unlock(&mqhp->mq_exclusive);
    371 	}
    372 	errno = EBADMSG;
    373 }
    374 
    375 mqd_t
    376 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
    377 {
    378 	va_list		ap;
    379 	mode_t		mode = 0;
    380 	struct mq_attr	*attr = NULL;
    381 	int		fd;
    382 	int		err;
    383 	int		cr_flag = 0;
    384 	int		locked = 0;
    385 	uint64_t	total_size;
    386 	size_t		msgsize;
    387 	ssize_t		maxmsg;
    388 	uint64_t	temp;
    389 	void		*ptr;
    390 	mqdes_t		*mqdp;
    391 	mqhdr_t		*mqhp;
    392 	struct mq_dn	*mqdnp;
    393 
    394 	if (__pos4obj_check(path) == -1)
    395 		return ((mqd_t)-1);
    396 
    397 	/* acquire MSGQ lock to have atomic operation */
    398 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
    399 		goto out;
    400 	locked = 1;
    401 
    402 	va_start(ap, oflag);
    403 	/* filter oflag to have READ/WRITE/CREATE modes only */
    404 	oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
    405 	if ((oflag & O_CREAT) != 0) {
    406 		mode = va_arg(ap, mode_t);
    407 		attr = va_arg(ap, struct mq_attr *);
    408 	}
    409 	va_end(ap);
    410 
    411 	if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
    412 	    mode, &cr_flag)) < 0)
    413 		goto out;
    414 
    415 	/* closing permission file */
    416 	(void) __close_nc(fd);
    417 
    418 	/* Try to open/create data file */
    419 	if (cr_flag) {
    420 		cr_flag = PFILE_CREATE;
    421 		if (attr == NULL) {
    422 			maxmsg = MQ_MAXMSG;
    423 			msgsize = MQ_MAXSIZE;
    424 		} else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
    425 			errno = EINVAL;
    426 			goto out;
    427 		} else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
    428 			errno = ENOSPC;
    429 			goto out;
    430 		} else {
    431 			maxmsg = attr->mq_maxmsg;
    432 			msgsize = attr->mq_msgsize;
    433 		}
    434 
    435 		/* adjust for message size at word boundary */
    436 		temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
    437 
    438 		total_size = sizeof (mqhdr_t) +
    439 		    maxmsg * (temp + sizeof (msghdr_t)) +
    440 		    2 * _MQ_PRIO_MAX * sizeof (uint64_t);
    441 
    442 		if (total_size > SSIZE_MAX) {
    443 			errno = ENOSPC;
    444 			goto out;
    445 		}
    446 
    447 		/*
    448 		 * data file is opened with read/write to those
    449 		 * who have read or write permission
    450 		 */
    451 		mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
    452 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
    453 		    (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
    454 			goto out;
    455 
    456 		cr_flag |= DFILE_CREATE | DFILE_OPEN;
    457 
    458 		/* force permissions to avoid umask effect */
    459 		if (fchmod(fd, mode) < 0)
    460 			goto out;
    461 
    462 		if (ftruncate64(fd, (off64_t)total_size) < 0)
    463 			goto out;
    464 	} else {
    465 		if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
    466 		    O_RDWR, 0666, &err)) < 0)
    467 			goto out;
    468 		cr_flag = DFILE_OPEN;
    469 
    470 		/* Message queue has not been initialized yet */
    471 		if (read(fd, &total_size, sizeof (total_size)) !=
    472 		    sizeof (total_size) || total_size == 0) {
    473 			errno = ENOENT;
    474 			goto out;
    475 		}
    476 
    477 		/* Message queue too big for this process to handle */
    478 		if (total_size > SSIZE_MAX) {
    479 			errno = EFBIG;
    480 			goto out;
    481 		}
    482 	}
    483 
    484 	if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
    485 		errno = ENOMEM;
    486 		goto out;
    487 	}
    488 	cr_flag |= ALLOC_MEM;
    489 
    490 	if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
    491 	    MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
    492 		goto out;
    493 	mqhp = ptr;
    494 	cr_flag |= DFILE_MMAP;
    495 
    496 	/* closing data file */
    497 	(void) __close_nc(fd);
    498 	cr_flag &= ~DFILE_OPEN;
    499 
    500 	/*
    501 	 * create, unlink, size, mmap, and close description file
    502 	 * all for a flag word in anonymous shared memory
    503 	 */
    504 	if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
    505 	    0666, &err)) < 0)
    506 		goto out;
    507 	cr_flag |= DFILE_OPEN;
    508 	(void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
    509 	if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
    510 		goto out;
    511 
    512 	if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
    513 	    PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
    514 		goto out;
    515 	mqdnp = ptr;
    516 	cr_flag |= MQDNP_MMAP;
    517 
    518 	(void) __close_nc(fd);
    519 	cr_flag &= ~DFILE_OPEN;
    520 
    521 	/*
    522 	 * we follow the same strategy as filesystem open() routine,
    523 	 * where fcntl.h flags are changed to flags defined in file.h.
    524 	 */
    525 	mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
    526 	mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
    527 
    528 	/* new message queue requires initialization */
    529 	if ((cr_flag & DFILE_CREATE) != 0) {
    530 		/* message queue header has to be initialized */
    531 		mq_init(mqhp, msgsize, maxmsg);
    532 		mqhp->mq_totsize = total_size;
    533 	}
    534 	mqdp->mqd_mq = mqhp;
    535 	mqdp->mqd_mqdn = mqdnp;
    536 	mqdp->mqd_magic = MQ_MAGIC;
    537 	mqdp->mqd_tcd = NULL;
    538 	mqdp->mqd_ownerdead = 0;
    539 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
    540 		lmutex_lock(&mq_list_lock);
    541 		mqdp->mqd_next = mq_list;
    542 		mqdp->mqd_prev = NULL;
    543 		if (mq_list)
    544 			mq_list->mqd_prev = mqdp;
    545 		mq_list = mqdp;
    546 		lmutex_unlock(&mq_list_lock);
    547 		return ((mqd_t)mqdp);
    548 	}
    549 
    550 	locked = 0;	/* fall into the error case */
    551 out:
    552 	err = errno;
    553 	if ((cr_flag & DFILE_OPEN) != 0)
    554 		(void) __close_nc(fd);
    555 	if ((cr_flag & DFILE_CREATE) != 0)
    556 		(void) __pos4obj_unlink(path, MQ_DATA_TYPE);
    557 	if ((cr_flag & PFILE_CREATE) != 0)
    558 		(void) __pos4obj_unlink(path, MQ_PERM_TYPE);
    559 	if ((cr_flag & ALLOC_MEM) != 0)
    560 		free((void *)mqdp);
    561 	if ((cr_flag & DFILE_MMAP) != 0)
    562 		(void) munmap((caddr_t)mqhp, (size_t)total_size);
    563 	if ((cr_flag & MQDNP_MMAP) != 0)
    564 		(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
    565 	if (locked)
    566 		(void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
    567 	errno = err;
    568 	return ((mqd_t)-1);
    569 }
    570 
    571 static void
    572 mq_close_cleanup(mqdes_t *mqdp)
    573 {
    574 	mqhdr_t *mqhp = mqdp->mqd_mq;
    575 	struct mq_dn *mqdnp = mqdp->mqd_mqdn;
    576 
    577 	/* invalidate the descriptor before freeing it */
    578 	mqdp->mqd_magic = 0;
    579 	if (!mqdp->mqd_ownerdead)
    580 		(void) mutex_unlock(&mqhp->mq_exclusive);
    581 
    582 	lmutex_lock(&mq_list_lock);
    583 	if (mqdp->mqd_next)
    584 		mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
    585 	if (mqdp->mqd_prev)
    586 		mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
    587 	if (mq_list == mqdp)
    588 		mq_list = mqdp->mqd_next;
    589 	lmutex_unlock(&mq_list_lock);
    590 
    591 	free(mqdp);
    592 	(void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
    593 	(void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
    594 }
    595 
/*
 * Close the message queue descriptor 'mqdes'.
 *
 * Returns 0 on success, or -1 with errno set to EBADF when the
 * descriptor fails mq_is_valid().  A notification registered through
 * this descriptor by this process is cancelled.  The final teardown is
 * done by mq_close_cleanup(), which runs whether this function finishes
 * normally or the thread is cancelled inside del_sigev_mq().
 */
int
mq_close(mqd_t mqdes)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		/*
		 * Record the failure so that mq_close_cleanup() will not
		 * try to unlock mq_exclusive again.
		 */
		mqdp->mqd_ownerdead = 1;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	/*
	 * mq_des holds the address of the registering descriptor, which is
	 * only meaningful in the registering process; hence the pid check.
	 */
	if (mqhp->mq_des == (uintptr_t)mqdp &&
	    mqhp->mq_sigid.sn_pid == getpid()) {
		/* notification is set for this descriptor, remove it */
		(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
		mqhp->mq_ntype = 0;
		mqhp->mq_des = 0;
	}

	/*
	 * The push/pop pair must stay in this lexical scope.  The handler
	 * is installed before the possible cancellation point so that the
	 * descriptor is torn down even if the thread is cancelled there.
	 */
	pthread_cleanup_push(mq_close_cleanup, mqdp);
	if ((tcdp = mqdp->mqd_tcd) != NULL) {
		mqdp->mqd_tcd = NULL;
		del_sigev_mq(tcdp);	/* possible cancellation point */
	}
	pthread_cleanup_pop(1);		/* finish in the cleanup handler */

	return (0);
}
    634 
    635 int
    636 mq_unlink(const char *path)
    637 {
    638 	int err;
    639 
    640 	if (__pos4obj_check(path) < 0)
    641 		return (-1);
    642 
    643 	if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
    644 		return (-1);
    645 	}
    646 
    647 	err = __pos4obj_unlink(path, MQ_PERM_TYPE);
    648 
    649 	if (err == 0 || (err == -1 && errno == EEXIST)) {
    650 		errno = 0;
    651 		err = __pos4obj_unlink(path, MQ_DATA_TYPE);
    652 	}
    653 
    654 	if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
    655 		return (-1);
    656 
    657 	return (err);
    658 
    659 }
    660 
    661 static int
    662 __mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    663 	uint_t msg_prio, const timespec_t *timeout, int abs_rel)
    664 {
    665 	mqdes_t *mqdp = (mqdes_t *)mqdes;
    666 	mqhdr_t *mqhp;
    667 	int err;
    668 	int notify = 0;
    669 
    670 	/*
    671 	 * sem_*wait() does cancellation, if called.
    672 	 * pthread_testcancel() ensures that cancellation takes place if
    673 	 * there is a cancellation pending when mq_*send() is called.
    674 	 */
    675 	pthread_testcancel();
    676 
    677 	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
    678 		errno = EBADF;
    679 		return (-1);
    680 	}
    681 
    682 	mqhp = mqdp->mqd_mq;
    683 
    684 	if (msg_prio >= mqhp->mq_maxprio) {
    685 		errno = EINVAL;
    686 		return (-1);
    687 	}
    688 	if (msg_len > mqhp->mq_maxsz) {
    689 		errno = EMSGSIZE;
    690 		return (-1);
    691 	}
    692 
    693 	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
    694 		err = sem_trywait(&mqhp->mq_notfull);
    695 	else {
    696 		/*
    697 		 * We might get cancelled here...
    698 		 */
    699 		if (timeout == NULL)
    700 			err = sem_wait(&mqhp->mq_notfull);
    701 		else if (abs_rel == ABS_TIME)
    702 			err = sem_timedwait(&mqhp->mq_notfull, timeout);
    703 		else
    704 			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
    705 	}
    706 	if (err == -1) {
    707 		/*
    708 		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
    709 		 * by sem_*wait(), so we can just return.
    710 		 */
    711 		return (-1);
    712 	}
    713 
    714 	/*
    715 	 * By the time we're here, we know that we've got the capacity
    716 	 * to add to the queue...now acquire the exclusive lock.
    717 	 */
    718 	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
    719 		owner_dead(mqdp, err);
    720 		return (-1);
    721 	}
    722 
    723 	/*
    724 	 * Now determine if we want to kick the notification.  POSIX
    725 	 * requires that if a process has registered for notification,
    726 	 * we must kick it when the queue makes an empty to non-empty
    727 	 * transition, and there are no blocked receivers.  Note that
    728 	 * this mechanism does _not_ guarantee that the kicked process
    729 	 * will be able to receive a message without blocking;
    730 	 * another receiver could intervene in the meantime.  Thus,
    731 	 * the notification mechanism is inherently racy; all we can
    732 	 * do is hope to minimize the window as much as possible.
    733 	 * In general, we want to avoid kicking the notification when
    734 	 * there are clearly receivers blocked.  We'll determine if
    735 	 * we want to kick the notification before the mq_putmsg(),
    736 	 * but the actual signotify() won't be done until the message
    737 	 * is on the queue.
    738 	 */
    739 	if (mqhp->mq_sigid.sn_pid != 0) {
    740 		int nmessages, nblocked;
    741 
    742 		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
    743 		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
    744 
    745 		if (nmessages == 0 && nblocked == 0)
    746 			notify = 1;
    747 	}
    748 
    749 	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
    750 	(void) sem_post(&mqhp->mq_notempty);
    751 
    752 	if (notify) {
    753 		/* notify and also delete the registration */
    754 		do_notify(mqhp);
    755 	}
    756 
    757 	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
    758 	(void) mutex_unlock(&mqhp->mq_exclusive);
    759 
    760 	return (0);
    761 }
    762 
    763 int
    764 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
    765 {
    766 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
    767 	    NULL, ABS_TIME));
    768 }
    769 
    770 int
    771 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    772 	uint_t msg_prio, const timespec_t *abs_timeout)
    773 {
    774 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
    775 	    abs_timeout, ABS_TIME));
    776 }
    777 
    778 int
    779 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    780 	uint_t msg_prio, const timespec_t *rel_timeout)
    781 {
    782 	return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
    783 	    rel_timeout, REL_TIME));
    784 }
    785 
    786 static void
    787 decrement_rblocked(mqhdr_t *mqhp)
    788 {
    789 	int cancel_state;
    790 
    791 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    792 	while (sem_wait(&mqhp->mq_rblocked) == -1)
    793 		continue;
    794 	(void) pthread_setcancelstate(cancel_state, NULL);
    795 }
    796 
/*
 * Common implementation of mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np().
 *
 * timeout	NULL to wait indefinitely, otherwise the time limit
 * abs_rel	ABS_TIME or REL_TIME: how 'timeout' is interpreted
 *
 * Returns the length of the received message, or -1 with errno set:
 * EBADF (bad descriptor or not opened for reading), EMSGSIZE (buffer
 * smaller than mq_maxsz), whatever the semaphore wait reported, or
 * EBADMSG via owner_dead() if mq_exclusive cannot be acquired.
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
	uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t	msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the buffer must be able to hold a maximum-sized message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism.  First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore.  We decrement that semaphore after waking up.
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore.  If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		/*
		 * pthread_cleanup_pop(1) runs decrement_rblocked() on the
		 * normal path as well, so the count posted above is taken
		 * back whether the wait succeeds, fails or is cancelled.
		 */
		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	/* a message is reserved for us; lock the queue and take it */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	/* one more free slot: let a blocked sender proceed */
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
    877 
    878 ssize_t
    879 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
    880 {
    881 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
    882 	    NULL, ABS_TIME));
    883 }
    884 
    885 ssize_t
    886 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    887 	uint_t *msg_prio, const timespec_t *abs_timeout)
    888 {
    889 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
    890 	    abs_timeout, ABS_TIME));
    891 }
    892 
    893 ssize_t
    894 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    895 	uint_t *msg_prio, const timespec_t *rel_timeout)
    896 {
    897 	return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
    898 	    rel_timeout, REL_TIME));
    899 }
    900 
    901 /*
    902  * Only used below, in mq_notify().
    903  * We already have a spawner thread.
    904  * Verify that the attributes match; cancel it if necessary.
    905  */
    906 static int
    907 cancel_if_necessary(thread_communication_data_t *tcdp,
    908 	const struct sigevent *sigevp)
    909 {
    910 	int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
    911 	    sigevp->sigev_notify_attributes);
    912 
    913 	if (do_cancel) {
    914 		/*
    915 		 * Attributes don't match, cancel the spawner thread.
    916 		 */
    917 		(void) pthread_cancel(tcdp->tcd_server_id);
    918 	} else {
    919 		/*
    920 		 * Reuse the existing spawner thread with possibly
    921 		 * changed notification function and value.
    922 		 */
    923 		tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
    924 		tcdp->tcd_notif.sigev_signo = 0;
    925 		tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
    926 		tcdp->tcd_notif.sigev_notify_function =
    927 		    sigevp->sigev_notify_function;
    928 	}
    929 
    930 	return (do_cancel);
    931 }
    932 
/*
 * Register or remove message-arrival notification through a message
 * queue descriptor.
 *
 * mqdes	message queue descriptor; checked with mq_is_valid()
 * sigevp	the notification request, or NULL to remove the notification
 *		registered through this descriptor
 *
 * Returns 0 on success.  Returns -1 on failure; errno values set here:
 *	EBADF	mqdes is not valid, the SIGEV_PORT port fails the
 *		fstat64()/S_ISPORT() check, or setup_sigev_handler()
 *		returns NULL
 *	EBUSY	removal was requested but no notification is registered
 *		through this descriptor by this process
 *	EINVAL	sigev_notify is not SIGEV_NONE, SIGEV_SIGNAL, SIGEV_THREAD
 *		or SIGEV_PORT
 *	EBADMSG	mutex_lock() on mq_exclusive returned an error; this
 *		overrides any errno set earlier in the call
 * Failure of launch_spawner() or __signotify(SN_PROC) also yields -1, but
 * errno is not assigned here on those paths (presumably set by the
 * callee -- TODO confirm).
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	/*
	 * If the lock cannot be taken cleanly, mark the descriptor as
	 * owner-dead and downgrade the request to a removal (sigevp is
	 * forced to NULL).  The call will end in failure with EBADMSG
	 * at the bottom of the function.
	 */
	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		/*
		 * NOTE(review): EOWNERDEAD presumably means the lock was
		 * acquired despite the error, hence the unlock -- confirm
		 * against mutex_lock(3C).
		 */
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		/*
		 * Only the descriptor recorded in the queue header, in the
		 * process recorded in mq_sigid, may remove the notification.
		 */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					/*
					 * NOTE(review): tcdp was loaded from
					 * mqd_tcd just above; this re-read
					 * looks redundant unless mqd_tcd can
					 * change concurrently.
					 */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			/* clear the registration recorded in the header */
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/*
		 * First pass over the notification type: pick up the user
		 * value and port.  For SIGEV_NONE and SIGEV_SIGNAL, userval
		 * and port stay unset; they are only consumed in the
		 * SIGEV_THREAD/SIGEV_PORT case of the last switch below.
		 */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/* sival_ptr is taken to point at a port_notify_t */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			/*
			 * From here on use a zeroed local sigevent that
			 * carries only the SIGEV_PORT type; this is what
			 * setup_sigev_handler() receives below.
			 */
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/*
		 * Second pass: fill in mq_siginfo for __signotify() and, for
		 * SIGEV_THREAD/SIGEV_PORT, make sure the descriptor has a
		 * spawner thread.
		 */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			/*
			 * An existing spawner is either updated in place or
			 * cancelled by cancel_if_necessary(); if cancelled,
			 * forget it so a new one is created below.
			 */
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				/* start disabled; enabled after registration */
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		/* record the type and the owning descriptor in the header */
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			/*
			 * Publish the enabled type under tcd_lock, then
			 * broadcast on tcd_cv (presumably to wake the
			 * spawner thread waiting for it -- confirm).
			 */
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	/*
	 * error == 0 means mq_exclusive is still held and must be dropped.
	 * Otherwise the lock is not held and the call fails with EBADMSG,
	 * even if the removal above went through.
	 */
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
   1076 
   1077 int
   1078 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
   1079 {
   1080 	mqdes_t *mqdp = (mqdes_t *)mqdes;
   1081 	mqhdr_t *mqhp;
   1082 	uint_t	flag = 0;
   1083 
   1084 	if (!mq_is_valid(mqdp)) {
   1085 		errno = EBADF;
   1086 		return (-1);
   1087 	}
   1088 
   1089 	/* store current attributes */
   1090 	if (omqstat != NULL) {
   1091 		int	count;
   1092 
   1093 		mqhp = mqdp->mqd_mq;
   1094 		omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
   1095 		omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
   1096 		omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
   1097 		(void) sem_getvalue(&mqhp->mq_notempty, &count);
   1098 		omqstat->mq_curmsgs = count;
   1099 	}
   1100 
   1101 	/* set description attributes */
   1102 	if ((mqstat->mq_flags & O_NONBLOCK) != 0)
   1103 		flag = FNONBLOCK;
   1104 	mqdp->mqd_mqdn->mqdn_flags = flag;
   1105 
   1106 	return (0);
   1107 }
   1108 
   1109 int
   1110 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
   1111 {
   1112 	mqdes_t *mqdp = (mqdes_t *)mqdes;
   1113 	mqhdr_t *mqhp;
   1114 	int count;
   1115 
   1116 	if (!mq_is_valid(mqdp)) {
   1117 		errno = EBADF;
   1118 		return (-1);
   1119 	}
   1120 
   1121 	mqhp = mqdp->mqd_mq;
   1122 
   1123 	mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
   1124 	mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
   1125 	mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
   1126 	(void) sem_getvalue(&mqhp->mq_notempty, &count);
   1127 	mqstat->mq_curmsgs = count;
   1128 	return (0);
   1129 }
   1130 
   1131 /*
   1132  * Cleanup after fork1() in the child process.
   1133  */
   1134 void
   1135 postfork1_child_sigev_mq(void)
   1136 {
   1137 	thread_communication_data_t *tcdp;
   1138 	mqdes_t *mqdp;
   1139 
   1140 	for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
   1141 		if ((tcdp = mqdp->mqd_tcd) != NULL) {
   1142 			mqdp->mqd_tcd = NULL;
   1143 			tcd_teardown(tcdp);
   1144 		}
   1145 	}
   1146 }
   1147