Home | History | Annotate | Download | only in rt
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     28 
     29 #include "lint.h"
     30 #include "thr_uberdata.h"
     31 #include <sys/types.h>
     32 #include <pthread.h>
     33 #include <unistd.h>
     34 #include <stdlib.h>
     35 #include <thread.h>
     36 #include <pthread.h>
     37 #include <synch.h>
     38 #include <port.h>
     39 #include <signal.h>
     40 #include <stdio.h>
     41 #include <errno.h>
     42 #include <stdarg.h>
     43 #include <string.h>
     44 #include <sys/aiocb.h>
     45 #include <time.h>
     46 #include <signal.h>
     47 #include <fcntl.h>
     48 #include "sigev_thread.h"
     49 
/*
 * There is but one spawner for all aio operations.
 * This points to its communication data; it is assigned by
 * _aio_sigev_thread_init() and reset to NULL by
 * postfork1_child_sigev_aio().
 */
thread_communication_data_t *sigev_aio_tcd = NULL;

/*
 * Set non-zero via the _RT_DEBUG environment variable (read by
 * init_sigev_thread()) to enable the dprintf() debugging messages.
 */
static int _rt_debug = 0;
     59 
     60 void
     61 init_sigev_thread(void)
     62 {
     63 	char *ldebug;
     64 
     65 	if ((ldebug = getenv("_RT_DEBUG")) != NULL)
     66 		_rt_debug = atoi(ldebug);
     67 }
     68 
     69 /*
     70  * Routine to print debug messages:
     71  * If _rt_debug is set, printf the debug message to stderr
     72  * with an appropriate prefix.
     73  */
     74 /*PRINTFLIKE1*/
     75 static void
     76 dprintf(const char *format, ...)
     77 {
     78 	if (_rt_debug) {
     79 		va_list alist;
     80 
     81 		va_start(alist, format);
     82 		flockfile(stderr);
     83 		pthread_cleanup_push(funlockfile, stderr);
     84 		(void) fputs("DEBUG: ", stderr);
     85 		(void) vfprintf(stderr, format, alist);
     86 		pthread_cleanup_pop(1);		/* funlockfile(stderr) */
     87 		va_end(alist);
     88 	}
     89 }
     90 
     91 /*
     92  * The notify_thread() function can be used as the start function of a new
     93  * thread but it is normally called from notifier(), below, in the context
     94  * of a thread pool worker thread.  It is used as the start function of a
     95  * new thread only when individual pthread attributes differ from those
     96  * that are common to all workers.  This only occurs in the AIO case.
     97  */
     98 static void *
     99 notify_thread(void *arg)
    100 {
    101 	sigev_thread_data_t *stdp = arg;
    102 	void (*function)(union sigval) = stdp->std_func;
    103 	union sigval argument = stdp->std_arg;
    104 
    105 	lfree(stdp, sizeof (*stdp));
    106 	function(argument);
    107 	return (NULL);
    108 }
    109 
    110 /*
    111  * Thread pool interface to call the user-supplied notification function.
    112  */
    113 static void
    114 notifier(void *arg)
    115 {
    116 	(void) notify_thread(arg);
    117 }
    118 
    119 /*
    120  * This routine adds a new work request, described by function
    121  * and argument, to the list of outstanding jobs.
    122  * It returns 0 indicating success.  A value != 0 indicates an error.
    123  */
    124 static int
    125 sigev_add_work(thread_communication_data_t *tcdp,
    126 	void (*function)(union sigval), union sigval argument)
    127 {
    128 	tpool_t *tpool = tcdp->tcd_poolp;
    129 	sigev_thread_data_t *stdp;
    130 
    131 	if (tpool == NULL)
    132 		return (EINVAL);
    133 	if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
    134 		return (errno);
    135 	stdp->std_func = function;
    136 	stdp->std_arg = argument;
    137 	if (tpool_dispatch(tpool, notifier, stdp) != 0) {
    138 		lfree(stdp, sizeof (*stdp));
    139 		return (errno);
    140 	}
    141 	return (0);
    142 }
    143 
    144 static void
    145 sigev_destroy_pool(thread_communication_data_t *tcdp)
    146 {
    147 	if (tcdp->tcd_poolp != NULL)
    148 		tpool_abandon(tcdp->tcd_poolp);
    149 	tcdp->tcd_poolp = NULL;
    150 
    151 	if (tcdp->tcd_subsystem == MQ) {
    152 		/*
    153 		 * synchronize with del_sigev_mq()
    154 		 */
    155 		sig_mutex_lock(&tcdp->tcd_lock);
    156 		tcdp->tcd_server_id = 0;
    157 		if (tcdp->tcd_msg_closing) {
    158 			(void) cond_broadcast(&tcdp->tcd_cv);
    159 			sig_mutex_unlock(&tcdp->tcd_lock);
    160 			return;		/* del_sigev_mq() will free the tcd */
    161 		}
    162 		sig_mutex_unlock(&tcdp->tcd_lock);
    163 	}
    164 
    165 	/*
    166 	 * now delete everything
    167 	 */
    168 	free_sigev_handler(tcdp);
    169 }
    170 
/*
 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
 * functions for the daemon threads that get the event(s) for the
 * respective SIGEV_THREAD subsystems.  There is one timer spawner for
 * each timer_create(), one mqueue spawner for every mq_open(), and
 * exactly one aio spawner for all aio requests.  These spawners add
 * work requests to be done by a pool of daemon worker threads.  In case
 * an aio event requires creation of a worker thread with different
 * pthread attributes than those from the pool of workers, a new daemon
 * thread with these attributes is spawned apart from the pool of workers.
 * If the aio spawner fails to add work or fails to create an additional
 * thread because of lacking resources, it puts the event back into
 * the kernel queue and retries some time later.  The timer and mqueue
 * spawners leave their event loop if they fail to add work.
 */
    185 
    186 void *
    187 timer_spawner(void *arg)
    188 {
    189 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
    190 	port_event_t port_event;
    191 
    192 	/* destroy the pool if we are cancelled */
    193 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
    194 
    195 	for (;;) {
    196 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
    197 			dprintf("port_get on port %d failed with %d <%s>\n",
    198 			    tcdp->tcd_port, errno, strerror(errno));
    199 			break;
    200 		}
    201 		switch (port_event.portev_source) {
    202 		case PORT_SOURCE_TIMER:
    203 			break;
    204 		case PORT_SOURCE_ALERT:
    205 			if (port_event.portev_events != SIGEV_THREAD_TERM)
    206 				errno = EPROTO;
    207 			goto out;
    208 		default:
    209 			dprintf("port_get on port %d returned %u "
    210 			    "(not PORT_SOURCE_TIMER)\n",
    211 			    tcdp->tcd_port, port_event.portev_source);
    212 			errno = EPROTO;
    213 			goto out;
    214 		}
    215 
    216 		tcdp->tcd_overruns = port_event.portev_events - 1;
    217 		if (sigev_add_work(tcdp,
    218 		    tcdp->tcd_notif.sigev_notify_function,
    219 		    tcdp->tcd_notif.sigev_value) != 0)
    220 			break;
    221 		/* wait until job is done before looking for another */
    222 		tpool_wait(tcdp->tcd_poolp);
    223 	}
    224 out:
    225 	pthread_cleanup_pop(1);
    226 	return (NULL);
    227 }
    228 
    229 void *
    230 mqueue_spawner(void *arg)
    231 {
    232 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
    233 	int ret = 0;
    234 	int ntype;
    235 	void (*function)(union sigval);
    236 	union sigval argument;
    237 
    238 	/* destroy the pool if we are cancelled */
    239 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
    240 
    241 	while (ret == 0) {
    242 		sig_mutex_lock(&tcdp->tcd_lock);
    243 		pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
    244 		while ((ntype = tcdp->tcd_msg_enabled) == 0)
    245 			(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
    246 		pthread_cleanup_pop(1);
    247 
    248 		while (sem_wait(tcdp->tcd_msg_avail) == -1)
    249 			continue;
    250 
    251 		sig_mutex_lock(&tcdp->tcd_lock);
    252 		tcdp->tcd_msg_enabled = 0;
    253 		sig_mutex_unlock(&tcdp->tcd_lock);
    254 
    255 		/* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
    256 		if (ntype == SIGEV_THREAD) {
    257 			function = tcdp->tcd_notif.sigev_notify_function;
    258 			argument.sival_ptr = tcdp->tcd_msg_userval;
    259 			ret = sigev_add_work(tcdp, function, argument);
    260 		} else {	/* ntype == SIGEV_PORT */
    261 			ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
    262 			    0, (uintptr_t)tcdp->tcd_msg_object,
    263 			    tcdp->tcd_msg_userval);
    264 		}
    265 	}
    266 	sig_mutex_unlock(&tcdp->tcd_lock);
    267 
    268 	pthread_cleanup_pop(1);
    269 	return (NULL);
    270 }
    271 
    272 void *
    273 aio_spawner(void *arg)
    274 {
    275 	thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
    276 	int error = 0;
    277 	void (*function)(union sigval);
    278 	union sigval argument;
    279 	port_event_t port_event;
    280 	struct sigevent *sigevp;
    281 	timespec_t delta;
    282 	pthread_attr_t *attrp;
    283 
    284 	/* destroy the pool if we are cancelled */
    285 	pthread_cleanup_push(sigev_destroy_pool, tcdp);
    286 
    287 	while (error == 0) {
    288 		if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
    289 			error = errno;
    290 			dprintf("port_get on port %d failed with %d <%s>\n",
    291 			    tcdp->tcd_port, error, strerror(error));
    292 			break;
    293 		}
    294 		switch (port_event.portev_source) {
    295 		case PORT_SOURCE_AIO:
    296 			break;
    297 		case PORT_SOURCE_ALERT:
    298 			if (port_event.portev_events != SIGEV_THREAD_TERM)
    299 				errno = EPROTO;
    300 			goto out;
    301 		default:
    302 			dprintf("port_get on port %d returned %u "
    303 			    "(not PORT_SOURCE_AIO)\n",
    304 			    tcdp->tcd_port, port_event.portev_source);
    305 			errno = EPROTO;
    306 			goto out;
    307 		}
    308 		argument.sival_ptr = port_event.portev_user;
    309 		switch (port_event.portev_events) {
    310 		case AIOLIO:
    311 #if !defined(_LP64)
    312 		case AIOLIO64:
    313 #endif
    314 			sigevp = (struct sigevent *)port_event.portev_object;
    315 			function = sigevp->sigev_notify_function;
    316 			attrp = sigevp->sigev_notify_attributes;
    317 			break;
    318 		case AIOAREAD:
    319 		case AIOAWRITE:
    320 		case AIOFSYNC:
    321 			{
    322 			aiocb_t *aiocbp =
    323 			    (aiocb_t *)port_event.portev_object;
    324 			function = aiocbp->aio_sigevent.sigev_notify_function;
    325 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
    326 			break;
    327 			}
    328 #if !defined(_LP64)
    329 		case AIOAREAD64:
    330 		case AIOAWRITE64:
    331 		case AIOFSYNC64:
    332 			{
    333 			aiocb64_t *aiocbp =
    334 			    (aiocb64_t *)port_event.portev_object;
    335 			function = aiocbp->aio_sigevent.sigev_notify_function;
    336 			attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
    337 			break;
    338 			}
    339 #endif
    340 		default:
    341 			function = NULL;
    342 			attrp = NULL;
    343 			break;
    344 		}
    345 
    346 		if (function == NULL)
    347 			error = EINVAL;
    348 		else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
    349 			error = sigev_add_work(tcdp, function, argument);
    350 		else {
    351 			/*
    352 			 * The attributes don't match.
    353 			 * Spawn a thread with the non-matching attributes.
    354 			 */
    355 			pthread_attr_t local_attr;
    356 			sigev_thread_data_t *stdp;
    357 
    358 			if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
    359 				error = ENOMEM;
    360 			else
    361 				error = pthread_attr_clone(&local_attr, attrp);
    362 
    363 			if (error == 0) {
    364 				(void) pthread_attr_setdetachstate(
    365 				    &local_attr, PTHREAD_CREATE_DETACHED);
    366 				(void) pthread_attr_setdaemonstate_np(
    367 				    &local_attr, PTHREAD_CREATE_DAEMON_NP);
    368 				stdp->std_func = function;
    369 				stdp->std_arg = argument;
    370 				error = pthread_create(NULL, &local_attr,
    371 				    notify_thread, stdp);
    372 				(void) pthread_attr_destroy(&local_attr);
    373 			}
    374 			if (error && stdp != NULL)
    375 				lfree(stdp, sizeof (*stdp));
    376 		}
    377 
    378 		if (error) {
    379 			dprintf("Cannot add work, error=%d <%s>.\n",
    380 			    error, strerror(error));
    381 			if (error == EAGAIN || error == ENOMEM) {
    382 				/* (Temporary) no resources are available. */
    383 				if (_port_dispatch(tcdp->tcd_port, 0,
    384 				    PORT_SOURCE_AIO, port_event.portev_events,
    385 				    port_event.portev_object,
    386 				    port_event.portev_user) != 0)
    387 					break;
    388 				error = 0;
    389 				delta.tv_sec = 0;
    390 				delta.tv_nsec = NANOSEC / 20;	/* 50 msec */
    391 				(void) nanosleep(&delta, NULL);
    392 			}
    393 		}
    394 	}
    395 out:
    396 	pthread_cleanup_pop(1);
    397 	return (NULL);
    398 }
    399 
    400 /*
    401  * Allocate a thread_communication_data_t block.
    402  */
    403 static thread_communication_data_t *
    404 alloc_sigev_handler(subsystem_t caller)
    405 {
    406 	thread_communication_data_t *tcdp;
    407 
    408 	if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
    409 		tcdp->tcd_subsystem = caller;
    410 		tcdp->tcd_port = -1;
    411 		(void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
    412 		(void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
    413 	}
    414 	return (tcdp);
    415 }
    416 
    417 /*
    418  * Free a thread_communication_data_t block.
    419  */
    420 void
    421 free_sigev_handler(thread_communication_data_t *tcdp)
    422 {
    423 	if (tcdp->tcd_attrp) {
    424 		(void) pthread_attr_destroy(tcdp->tcd_attrp);
    425 		tcdp->tcd_attrp = NULL;
    426 	}
    427 	(void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
    428 
    429 	switch (tcdp->tcd_subsystem) {
    430 	case TIMER:
    431 	case AIO:
    432 		if (tcdp->tcd_port >= 0)
    433 			(void) close(tcdp->tcd_port);
    434 		break;
    435 	case MQ:
    436 		tcdp->tcd_msg_avail = NULL;
    437 		tcdp->tcd_msg_object = NULL;
    438 		tcdp->tcd_msg_userval = NULL;
    439 		tcdp->tcd_msg_enabled = 0;
    440 		break;
    441 	}
    442 
    443 	lfree(tcdp, sizeof (*tcdp));
    444 }
    445 
    446 /*
    447  * Initialize data structure and create the port.
    448  */
    449 thread_communication_data_t *
    450 setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
    451 {
    452 	thread_communication_data_t *tcdp;
    453 	int error;
    454 
    455 	if (sigevp == NULL) {
    456 		errno = EINVAL;
    457 		return (NULL);
    458 	}
    459 
    460 	if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
    461 		errno = ENOMEM;
    462 		return (NULL);
    463 	}
    464 
    465 	if (sigevp->sigev_notify_attributes == NULL)
    466 		tcdp->tcd_attrp = NULL;		/* default attributes */
    467 	else {
    468 		/*
    469 		 * We cannot just copy the sigevp->sigev_notify_attributes
    470 		 * pointer.  We need to initialize a new pthread_attr_t
    471 		 * structure with the values from the user-supplied
    472 		 * pthread_attr_t.
    473 		 */
    474 		tcdp->tcd_attrp = &tcdp->tcd_user_attr;
    475 		error = pthread_attr_clone(tcdp->tcd_attrp,
    476 		    sigevp->sigev_notify_attributes);
    477 		if (error) {
    478 			tcdp->tcd_attrp = NULL;
    479 			free_sigev_handler(tcdp);
    480 			errno = error;
    481 			return (NULL);
    482 		}
    483 	}
    484 	tcdp->tcd_notif = *sigevp;
    485 	tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
    486 
    487 	if (caller == TIMER || caller == AIO) {
    488 		if ((tcdp->tcd_port = port_create()) < 0 ||
    489 		    fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
    490 			free_sigev_handler(tcdp);
    491 			errno = EBADF;
    492 			return (NULL);
    493 		}
    494 	}
    495 	return (tcdp);
    496 }
    497 
    498 /*
    499  * Create a thread pool and launch the spawner.
    500  */
    501 int
    502 launch_spawner(thread_communication_data_t *tcdp)
    503 {
    504 	int ret;
    505 	int maxworkers;
    506 	void *(*spawner)(void *);
    507 	sigset_t set;
    508 	sigset_t oset;
    509 
    510 	switch (tcdp->tcd_subsystem) {
    511 	case TIMER:
    512 		spawner = timer_spawner;
    513 		maxworkers = 1;
    514 		break;
    515 	case MQ:
    516 		spawner = mqueue_spawner;
    517 		maxworkers = 1;
    518 		break;
    519 	case AIO:
    520 		spawner = aio_spawner;
    521 		maxworkers = 100;
    522 		break;
    523 	default:
    524 		return (-1);
    525 	}
    526 	tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
    527 	    tcdp->tcd_notif.sigev_notify_attributes);
    528 	if (tcdp->tcd_poolp == NULL)
    529 		return (-1);
    530 	/* create the spawner with all signals blocked */
    531 	(void) sigfillset(&set);
    532 	(void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
    533 	ret = thr_create(NULL, 0, spawner, tcdp,
    534 	    THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
    535 	(void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
    536 	if (ret != 0) {
    537 		tpool_destroy(tcdp->tcd_poolp);
    538 		tcdp->tcd_poolp = NULL;
    539 		return (-1);
    540 	}
    541 	return (0);
    542 }
    543 
    544 /*
    545  * Delete the data associated with the sigev_thread timer, if timer is
    546  * associated with such a notification option.
    547  * Destroy the timer_spawner thread.
    548  */
    549 int
    550 del_sigev_timer(timer_t timer)
    551 {
    552 	int rc = 0;
    553 	thread_communication_data_t *tcdp;
    554 
    555 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
    556 		sig_mutex_lock(&tcdp->tcd_lock);
    557 		if (tcdp->tcd_port >= 0) {
    558 			if ((rc = port_alert(tcdp->tcd_port,
    559 			    PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
    560 				dprintf("del_sigev_timer(%d) OK.\n", timer);
    561 			}
    562 		}
    563 		timer_tcd[timer] = NULL;
    564 		sig_mutex_unlock(&tcdp->tcd_lock);
    565 	}
    566 	return (rc);
    567 }
    568 
    569 int
    570 sigev_timer_getoverrun(timer_t timer)
    571 {
    572 	thread_communication_data_t *tcdp;
    573 
    574 	if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
    575 		return (tcdp->tcd_overruns);
    576 	return (0);
    577 }
    578 
    579 static void
    580 del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
    581 {
    582 	sig_mutex_unlock(&tcdp->tcd_lock);
    583 	free_sigev_handler(tcdp);
    584 }
    585 
    586 /*
    587  * Delete the data associated with the sigev_thread message queue,
    588  * if the message queue is associated with such a notification option.
    589  * Destroy the mqueue_spawner thread.
    590  */
    591 void
    592 del_sigev_mq(thread_communication_data_t *tcdp)
    593 {
    594 	pthread_t server_id;
    595 	int rc;
    596 
    597 	sig_mutex_lock(&tcdp->tcd_lock);
    598 
    599 	server_id = tcdp->tcd_server_id;
    600 	tcdp->tcd_msg_closing = 1;
    601 	if ((rc = pthread_cancel(server_id)) != 0) {	/* "can't happen" */
    602 		sig_mutex_unlock(&tcdp->tcd_lock);
    603 		dprintf("Fail to cancel %u with error %d <%s>.\n",
    604 		    server_id, rc, strerror(rc));
    605 		return;
    606 	}
    607 
    608 	/*
    609 	 * wait for sigev_destroy_pool() to finish
    610 	 */
    611 	pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
    612 	while (tcdp->tcd_server_id == server_id)
    613 		(void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
    614 	pthread_cleanup_pop(1);
    615 }
    616 
    617 /*
    618  * POSIX aio:
    619  * If the notification type is SIGEV_THREAD, set up
    620  * the port number for notifications.  Create the
    621  * thread pool and launch the spawner if necessary.
    622  * If the notification type is not SIGEV_THREAD, do nothing.
    623  */
    624 int
    625 _aio_sigev_thread_init(struct sigevent *sigevp)
    626 {
    627 	static mutex_t sigev_aio_lock = DEFAULTMUTEX;
    628 	static cond_t sigev_aio_cv = DEFAULTCV;
    629 	static int sigev_aio_busy = 0;
    630 
    631 	thread_communication_data_t *tcdp;
    632 	int port;
    633 	int cancel_state;
    634 	int rc = 0;
    635 
    636 	if (sigevp == NULL ||
    637 	    sigevp->sigev_notify != SIGEV_THREAD ||
    638 	    sigevp->sigev_notify_function == NULL)
    639 		return (0);
    640 
    641 	lmutex_lock(&sigev_aio_lock);
    642 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    643 	while (sigev_aio_busy)
    644 		(void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
    645 	(void) pthread_setcancelstate(cancel_state, NULL);
    646 	if ((tcdp = sigev_aio_tcd) != NULL)
    647 		port = tcdp->tcd_port;
    648 	else {
    649 		sigev_aio_busy = 1;
    650 		lmutex_unlock(&sigev_aio_lock);
    651 
    652 		tcdp = setup_sigev_handler(sigevp, AIO);
    653 		if (tcdp == NULL) {
    654 			port = -1;
    655 			rc = -1;
    656 		} else if (launch_spawner(tcdp) != 0) {
    657 			free_sigev_handler(tcdp);
    658 			tcdp = NULL;
    659 			port = -1;
    660 			rc = -1;
    661 		} else {
    662 			port = tcdp->tcd_port;
    663 		}
    664 
    665 		lmutex_lock(&sigev_aio_lock);
    666 		sigev_aio_tcd = tcdp;
    667 		sigev_aio_busy = 0;
    668 		(void) cond_broadcast(&sigev_aio_cv);
    669 	}
    670 	lmutex_unlock(&sigev_aio_lock);
    671 	sigevp->sigev_signo = port;
    672 	return (rc);
    673 }
    674 
    675 int
    676 _aio_sigev_thread(aiocb_t *aiocbp)
    677 {
    678 	if (aiocbp == NULL)
    679 		return (0);
    680 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
    681 }
    682 
    683 #if !defined(_LP64)
    684 int
    685 _aio_sigev_thread64(aiocb64_t *aiocbp)
    686 {
    687 	if (aiocbp == NULL)
    688 		return (0);
    689 	return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
    690 }
    691 #endif
    692 
    693 /*
    694  * Cleanup POSIX aio after fork1() in the child process.
    695  */
    696 void
    697 postfork1_child_sigev_aio(void)
    698 {
    699 	thread_communication_data_t *tcdp;
    700 
    701 	if ((tcdp = sigev_aio_tcd) != NULL) {
    702 		sigev_aio_tcd = NULL;
    703 		tcd_teardown(tcdp);
    704 	}
    705 }
    706 
    707 /*
    708  * Utility function for the various postfork1_child_sigev_*() functions.
    709  * Clean up the tcdp data structure and close the port.
    710  */
    711 void
    712 tcd_teardown(thread_communication_data_t *tcdp)
    713 {
    714 	if (tcdp->tcd_poolp != NULL)
    715 		tpool_abandon(tcdp->tcd_poolp);
    716 	tcdp->tcd_poolp = NULL;
    717 	tcdp->tcd_server_id = 0;
    718 	free_sigev_handler(tcdp);
    719 }
    720