Home | History | Annotate | Download | only in aio
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     28 
     29 #include "synonyms.h"
     30 #include "thr_uberdata.h"
     31 #include "asyncio.h"
     32 #include <atomic.h>
     33 #include <sys/param.h>
     34 #include <sys/file.h>
     35 #include <sys/port.h>
     36 
/*
 * Forward declarations of helpers that are private to this file (static).
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

/* Routines that are defined outside of this file. */
extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

/* More file-private helpers. */
static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
     53 
/*
 * Switch for kernel async I/O.
 * Set by _kaio_init(): 1 on success, -1 when initialization failed.
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data.
 * Worker threads store their own aio_worker_t pointer under this key.
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * __uaio_init() and _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * Workers for read/write requests.
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * Workers for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of notification workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

/*
 * Singly linked list of done requests.  aiocancel_all() walks it
 * starting at _aio_done_tail and following req_next.
 */
aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

/*
 * __aio_initlock, __aio_initcv and __aio_initbusy serialize __uaio_init()
 * and _kaio_init(): whoever sets __aio_initbusy performs the
 * initialization, everybody else waits on __aio_initcv.
 */
mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* allocated in __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;			/* # of requests on the done list */
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
int _aio_flags = 0;			/* flag word; see the defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
    124 
    125 static int
    126 _kaio_supported_init(void)
    127 {
    128 	void *ptr;
    129 	size_t size;
    130 
    131 	if (_kaio_supported != NULL)	/* already initialized */
    132 		return (0);
    133 
    134 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
    135 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
    136 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
    137 	if (ptr == MAP_FAILED)
    138 		return (-1);
    139 	_kaio_supported = ptr;
    140 	return (0);
    141 }
    142 
    143 /*
    144  * The aio subsystem is initialized when an AIO request is made.
    145  * Constants are initialized like the max number of workers that
    146  * the subsystem can create, and the minimum number of workers
    147  * permitted before imposing some restrictions.  Also, some
    148  * workers are created.
    149  */
    150 int
    151 __uaio_init(void)
    152 {
    153 	int ret = -1;
    154 	int i;
    155 
    156 	lmutex_lock(&__aio_initlock);
    157 	while (__aio_initbusy)
    158 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
    159 	if (__uaio_ok) {	/* already initialized */
    160 		lmutex_unlock(&__aio_initlock);
    161 		return (0);
    162 	}
    163 	__aio_initbusy = 1;
    164 	lmutex_unlock(&__aio_initlock);
    165 
    166 	hz = (int)sysconf(_SC_CLK_TCK);
    167 	__pid = getpid();
    168 
    169 	setup_cancelsig(SIGAIOCANCEL);
    170 
    171 	if (_kaio_supported_init() != 0)
    172 		goto out;
    173 
    174 	/*
    175 	 * Allocate and initialize the hash table.
    176 	 */
    177 	/* LINTED pointer cast */
    178 	_aio_hash = (aio_hash_t *)mmap(NULL,
    179 	    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
    180 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
    181 	if ((void *)_aio_hash == MAP_FAILED) {
    182 		_aio_hash = NULL;
    183 		goto out;
    184 	}
    185 	for (i = 0; i < HASHSZ; i++)
    186 		(void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL);
    187 
    188 	/*
    189 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
    190 	 */
    191 	(void) sigfillset(&_worker_set);
    192 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
    193 
    194 	/*
    195 	 * Create the minimum number of read/write workers.
    196 	 */
    197 	for (i = 0; i < _min_workers; i++)
    198 		(void) _aio_create_worker(NULL, AIOREAD);
    199 
    200 	/*
    201 	 * Create one worker to send asynchronous notifications.
    202 	 */
    203 	(void) _aio_create_worker(NULL, AIONOTIFY);
    204 
    205 	ret = 0;
    206 out:
    207 	lmutex_lock(&__aio_initlock);
    208 	if (ret == 0)
    209 		__uaio_ok = 1;
    210 	__aio_initbusy = 0;
    211 	(void) cond_broadcast(&__aio_initcv);
    212 	lmutex_unlock(&__aio_initlock);
    213 	return (ret);
    214 }
    215 
    216 /*
    217  * Called from close() before actually performing the real _close().
    218  */
    219 void
    220 _aio_close(int fd)
    221 {
    222 	if (fd < 0)	/* avoid cancelling everything */
    223 		return;
    224 	/*
    225 	 * Cancel all outstanding aio requests for this file descriptor.
    226 	 */
    227 	if (__uaio_ok)
    228 		(void) aiocancel_all(fd);
    229 	/*
    230 	 * If we have allocated the bit array, clear the bit for this file.
    231 	 * The next open may re-use this file descriptor and the new file
    232 	 * may have different kaio() behaviour.
    233 	 */
    234 	if (_kaio_supported != NULL)
    235 		CLEAR_KAIO_SUPPORTED(fd);
    236 }
    237 
    238 /*
    239  * special kaio cleanup thread sits in a loop in the
    240  * kernel waiting for pending kaio requests to complete.
    241  */
    242 void *
    243 _kaio_cleanup_thread(void *arg)
    244 {
    245 	if (pthread_setspecific(_aio_key, arg) != 0)
    246 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
    247 	(void) _kaio(AIOSTART);
    248 	return (arg);
    249 }
    250 
    251 /*
    252  * initialize kaio.
    253  */
    254 void
    255 _kaio_init()
    256 {
    257 	int error;
    258 	sigset_t oset;
    259 
    260 	lmutex_lock(&__aio_initlock);
    261 	while (__aio_initbusy)
    262 		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
    263 	if (_kaio_ok) {		/* already initialized */
    264 		lmutex_unlock(&__aio_initlock);
    265 		return;
    266 	}
    267 	__aio_initbusy = 1;
    268 	lmutex_unlock(&__aio_initlock);
    269 
    270 	if (_kaio_supported_init() != 0)
    271 		error = ENOMEM;
    272 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
    273 		error = ENOMEM;
    274 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
    275 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    276 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
    277 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
    278 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
    279 	}
    280 	if (error && _kaiowp != NULL) {
    281 		_aio_worker_free(_kaiowp);
    282 		_kaiowp = NULL;
    283 	}
    284 
    285 	lmutex_lock(&__aio_initlock);
    286 	if (error)
    287 		_kaio_ok = -1;
    288 	else
    289 		_kaio_ok = 1;
    290 	__aio_initbusy = 0;
    291 	(void) cond_broadcast(&__aio_initcv);
    292 	lmutex_unlock(&__aio_initlock);
    293 }
    294 
    295 int
    296 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    297     aio_result_t *resultp)
    298 {
    299 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
    300 }
    301 
    302 int
    303 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    304     aio_result_t *resultp)
    305 {
    306 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
    307 }
    308 
    309 #if !defined(_LP64)
    310 int
    311 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    312     aio_result_t *resultp)
    313 {
    314 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
    315 }
    316 
    317 int
    318 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    319     aio_result_t *resultp)
    320 {
    321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
    322 }
    323 #endif	/* !defined(_LP64) */
    324 
    325 int
    326 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    327     aio_result_t *resultp, int mode)
    328 {
    329 	aio_req_t *reqp;
    330 	aio_args_t *ap;
    331 	offset_t loffset;
    332 	struct stat stat;
    333 	int error = 0;
    334 	int kerr;
    335 	int umode;
    336 
    337 	switch (whence) {
    338 
    339 	case SEEK_SET:
    340 		loffset = offset;
    341 		break;
    342 	case SEEK_CUR:
    343 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
    344 			error = -1;
    345 		else
    346 			loffset += offset;
    347 		break;
    348 	case SEEK_END:
    349 		if (fstat(fd, &stat) == -1)
    350 			error = -1;
    351 		else
    352 			loffset = offset + stat.st_size;
    353 		break;
    354 	default:
    355 		errno = EINVAL;
    356 		error = -1;
    357 	}
    358 
    359 	if (error)
    360 		return (error);
    361 
    362 	/* initialize kaio */
    363 	if (!_kaio_ok)
    364 		_kaio_init();
    365 
    366 	/*
    367 	 * _aio_do_request() needs the original request code (mode) to be able
    368 	 * to choose the appropiate 32/64 bit function.  All other functions
    369 	 * only require the difference between READ and WRITE (umode).
    370 	 */
    371 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
    372 		umode = mode - AIOAREAD64;
    373 	else
    374 		umode = mode;
    375 
    376 	/*
    377 	 * Try kernel aio first.
    378 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
    379 	 */
    380 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
    381 		resultp->aio_errno = 0;
    382 		sig_mutex_lock(&__aio_mutex);
    383 		_kaio_outstand_cnt++;
    384 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
    385 		    (umode | AIO_POLL_BIT) : umode),
    386 		    fd, buf, bufsz, loffset, resultp);
    387 		if (kerr == 0) {
    388 			sig_mutex_unlock(&__aio_mutex);
    389 			return (0);
    390 		}
    391 		_kaio_outstand_cnt--;
    392 		sig_mutex_unlock(&__aio_mutex);
    393 		if (errno != ENOTSUP && errno != EBADFD)
    394 			return (-1);
    395 		if (errno == EBADFD)
    396 			SET_KAIO_NOT_SUPPORTED(fd);
    397 	}
    398 
    399 	if (!__uaio_ok && __uaio_init() == -1)
    400 		return (-1);
    401 
    402 	if ((reqp = _aio_req_alloc()) == NULL) {
    403 		errno = EAGAIN;
    404 		return (-1);
    405 	}
    406 
    407 	/*
    408 	 * _aio_do_request() checks reqp->req_op to differentiate
    409 	 * between 32 and 64 bit access.
    410 	 */
    411 	reqp->req_op = mode;
    412 	reqp->req_resultp = resultp;
    413 	ap = &reqp->req_args;
    414 	ap->fd = fd;
    415 	ap->buf = buf;
    416 	ap->bufsz = bufsz;
    417 	ap->offset = loffset;
    418 
    419 	if (_aio_hash_insert(resultp, reqp) != 0) {
    420 		_aio_req_free(reqp);
    421 		errno = EINVAL;
    422 		return (-1);
    423 	}
    424 	/*
    425 	 * _aio_req_add() only needs the difference between READ and
    426 	 * WRITE to choose the right worker queue.
    427 	 */
    428 	_aio_req_add(reqp, &__nextworker_rw, umode);
    429 	return (0);
    430 }
    431 
    432 int
    433 aiocancel(aio_result_t *resultp)
    434 {
    435 	aio_req_t *reqp;
    436 	aio_worker_t *aiowp;
    437 	int ret;
    438 	int done = 0;
    439 	int canceled = 0;
    440 
    441 	if (!__uaio_ok) {
    442 		errno = EINVAL;
    443 		return (-1);
    444 	}
    445 
    446 	sig_mutex_lock(&__aio_mutex);
    447 	reqp = _aio_hash_find(resultp);
    448 	if (reqp == NULL) {
    449 		if (_aio_outstand_cnt == _aio_req_done_cnt)
    450 			errno = EINVAL;
    451 		else
    452 			errno = EACCES;
    453 		ret = -1;
    454 	} else {
    455 		aiowp = reqp->req_worker;
    456 		sig_mutex_lock(&aiowp->work_qlock1);
    457 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
    458 		sig_mutex_unlock(&aiowp->work_qlock1);
    459 
    460 		if (canceled) {
    461 			ret = 0;
    462 		} else {
    463 			if (_aio_outstand_cnt == 0 ||
    464 			    _aio_outstand_cnt == _aio_req_done_cnt)
    465 				errno = EINVAL;
    466 			else
    467 				errno = EACCES;
    468 			ret = -1;
    469 		}
    470 	}
    471 	sig_mutex_unlock(&__aio_mutex);
    472 	return (ret);
    473 }
    474 
/*
 * Wait for an asynchronous request (user-level or kernel) to complete.
 * This must be asynch safe.
 *
 * uwait selects the waiting mode:
 *	NULL		wait without a time limit
 *	{0, 0}		poll: check once and return immediately
 *	otherwise	wait at most the given interval
 *
 * Returns the aio_result_t pointer of a completed request, NULL if
 * nothing completed (poll found nothing, or the timeout elapsed), or
 * (aio_result_t *)-1 with errno set: EINVAL for an invalid timeout or
 * when both the user-level and the kernel side report -1, EINTR when
 * the kernel wait reported EINTR.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result from the user-level side */
	aio_result_t *kresultp;		/* result from the kernel (kaio) side */
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;		/* private, modifiable copy of *uwait */
	struct timeval *wait = NULL;	/* NULL: no time limit */
	hrtime_t hrtend;		/* absolute deadline for a timed wait */
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/* timed wait: remember the deadline in nanoseconds */
			hrtend = gethrtime() +
				(hrtime_t)uwait->tv_sec * NANOSEC +
				(hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				/* no kaio request outstanding; skip kernel */
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL and 1 is the
				 * result pointer of a finished kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			/* both sides answered -1: report EINVAL */
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a user-level request is done; hand it back */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		/* mark that an aiowait() is in progress */
		_aiowait_flag++;
		/*
		 * NOTE(review): a -1 from _aio_req_done() presumably means
		 * there is no user-level request to wait for; in that case
		 * the kernel is asked not to block.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			/* nothing outstanding in the kernel either */
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/* do not hold __aio_mutex across the kernel wait */
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* both the kernel and the user-level side said -1 */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			/* the kernel wait reported EINTR */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
					(hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait with nothing done yet: try again */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
    609 
    610 /*
    611  * _aio_get_timedelta calculates the remaining time and stores the result
    612  * into timespec_t *wait.
    613  */
    614 
    615 int
    616 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
    617 {
    618 	int	ret = 0;
    619 	struct	timeval cur;
    620 	timespec_t curtime;
    621 
    622 	(void) gettimeofday(&cur, NULL);
    623 	curtime.tv_sec = cur.tv_sec;
    624 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
    625 
    626 	if (end->tv_sec >= curtime.tv_sec) {
    627 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
    628 		if (end->tv_nsec >= curtime.tv_nsec) {
    629 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
    630 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
    631 				ret = -1;	/* timer expired */
    632 		} else {
    633 			if (end->tv_sec > curtime.tv_sec) {
    634 				wait->tv_sec -= 1;
    635 				wait->tv_nsec = NANOSEC -
    636 				    (curtime.tv_nsec - end->tv_nsec);
    637 			} else {
    638 				ret = -1;	/* timer expired */
    639 			}
    640 		}
    641 	} else {
    642 		ret = -1;
    643 	}
    644 	return (ret);
    645 }
    646 
    647 /*
    648  * If closing by file descriptor: we will simply cancel all the outstanding
    649  * aio`s and return.  Those aio's in question will have either noticed the
    650  * cancellation notice before, during, or after initiating io.
    651  */
    652 int
    653 aiocancel_all(int fd)
    654 {
    655 	aio_req_t *reqp;
    656 	aio_req_t **reqpp;
    657 	aio_worker_t *first;
    658 	aio_worker_t *next;
    659 	int canceled = 0;
    660 	int done = 0;
    661 	int cancelall = 0;
    662 
    663 	sig_mutex_lock(&__aio_mutex);
    664 
    665 	if (_aio_outstand_cnt == 0) {
    666 		sig_mutex_unlock(&__aio_mutex);
    667 		return (AIO_ALLDONE);
    668 	}
    669 
    670 	/*
    671 	 * Cancel requests from the read/write workers' queues.
    672 	 */
    673 	first = __nextworker_rw;
    674 	next = first;
    675 	do {
    676 		_aio_cancel_work(next, fd, &canceled, &done);
    677 	} while ((next = next->work_forw) != first);
    678 
    679 	/*
    680 	 * finally, check if there are requests on the done queue that
    681 	 * should be canceled.
    682 	 */
    683 	if (fd < 0)
    684 		cancelall = 1;
    685 	reqpp = &_aio_done_tail;
    686 	while ((reqp = *reqpp) != NULL) {
    687 		if (cancelall || reqp->req_args.fd == fd) {
    688 			*reqpp = reqp->req_next;
    689 			_aio_donecnt--;
    690 			(void) _aio_hash_del(reqp->req_resultp);
    691 			_aio_req_free(reqp);
    692 		} else
    693 			reqpp = &reqp->req_next;
    694 	}
    695 	if (cancelall) {
    696 		ASSERT(_aio_donecnt == 0);
    697 		_aio_done_head = NULL;
    698 	}
    699 	sig_mutex_unlock(&__aio_mutex);
    700 
    701 	if (canceled && done == 0)
    702 		return (AIO_CANCELED);
    703 	else if (done && canceled == 0)
    704 		return (AIO_ALLDONE);
    705 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
    706 		return ((int)_kaio(AIOCANCEL, fd, NULL));
    707 	return (AIO_NOTCANCELED);
    708 }
    709 
    710 /*
    711  * Cancel requests from a given work queue.  If the file descriptor
    712  * parameter, fd, is non-negative, then only cancel those requests
    713  * in this queue that are to this file descriptor.  If the fd
    714  * parameter is -1, then cancel all requests.
    715  */
    716 static void
    717 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
    718 {
    719 	aio_req_t *reqp;
    720 
    721 	sig_mutex_lock(&aiowp->work_qlock1);
    722 	/*
    723 	 * cancel queued requests first.
    724 	 */
    725 	reqp = aiowp->work_tail1;
    726 	while (reqp != NULL) {
    727 		if (fd < 0 || reqp->req_args.fd == fd) {
    728 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
    729 				/*
    730 				 * Callers locks were dropped.
    731 				 * reqp is invalid; start traversing
    732 				 * the list from the beginning again.
    733 				 */
    734 				reqp = aiowp->work_tail1;
    735 				continue;
    736 			}
    737 		}
    738 		reqp = reqp->req_next;
    739 	}
    740 	/*
    741 	 * Since the queued requests have been canceled, there can
    742 	 * only be one inprogress request that should be canceled.
    743 	 */
    744 	if ((reqp = aiowp->work_req) != NULL &&
    745 	    (fd < 0 || reqp->req_args.fd == fd))
    746 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
    747 	sig_mutex_unlock(&aiowp->work_qlock1);
    748 }
    749 
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1.
 * *canceled is incremented when the request is canceled by this call;
 * *done is incremented when the request had already completed.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;	/* state before we touch the request */

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	/* already canceled earlier: nothing to do and nothing to count */
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	/* too late to cancel; just report it as done */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* mark the list head once, so it is counted once */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	/* mark the request canceled and take it off the queue and hash */
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker thread that is running the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/* non-POSIX request: finish the bookkeeping right here */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	/*
	 * POSIX request: _aiodone() is called with neither lock held, so
	 * release both locks around the call and take them again, in the
	 * original order, before returning.  Return 1 to tell the caller
	 * that the locks were dropped.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
    806 
    807 int
    808 _aio_create_worker(aio_req_t *reqp, int mode)
    809 {
    810 	aio_worker_t *aiowp, **workers, **nextworker;
    811 	int *aio_workerscnt;
    812 	void *(*func)(void *);
    813 	sigset_t oset;
    814 	int error;
    815 
    816 	/*
    817 	 * Put the new worker thread in the right queue.
    818 	 */
    819 	switch (mode) {
    820 	case AIOREAD:
    821 	case AIOWRITE:
    822 	case AIOAREAD:
    823 	case AIOAWRITE:
    824 #if !defined(_LP64)
    825 	case AIOAREAD64:
    826 	case AIOAWRITE64:
    827 #endif
    828 		workers = &__workers_rw;
    829 		nextworker = &__nextworker_rw;
    830 		aio_workerscnt = &__rw_workerscnt;
    831 		func = _aio_do_request;
    832 		break;
    833 	case AIONOTIFY:
    834 		workers = &__workers_no;
    835 		nextworker = &__nextworker_no;
    836 		func = _aio_do_notify;
    837 		aio_workerscnt = &__no_workerscnt;
    838 		break;
    839 	default:
    840 		aio_panic("_aio_create_worker: invalid mode");
    841 		break;
    842 	}
    843 
    844 	if ((aiowp = _aio_worker_alloc()) == NULL)
    845 		return (-1);
    846 
    847 	if (reqp) {
    848 		reqp->req_state = AIO_REQ_QUEUED;
    849 		reqp->req_worker = aiowp;
    850 		aiowp->work_head1 = reqp;
    851 		aiowp->work_tail1 = reqp;
    852 		aiowp->work_next1 = reqp;
    853 		aiowp->work_count1 = 1;
    854 		aiowp->work_minload1 = 1;
    855 	}
    856 
    857 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    858 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
    859 		THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
    860 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
    861 	if (error) {
    862 		if (reqp) {
    863 			reqp->req_state = 0;
    864 			reqp->req_worker = NULL;
    865 		}
    866 		_aio_worker_free(aiowp);
    867 		return (-1);
    868 	}
    869 
    870 	lmutex_lock(&__aio_mutex);
    871 	(*aio_workerscnt)++;
    872 	if (*workers == NULL) {
    873 		aiowp->work_forw = aiowp;
    874 		aiowp->work_backw = aiowp;
    875 		*nextworker = aiowp;
    876 		*workers = aiowp;
    877 	} else {
    878 		aiowp->work_backw = (*workers)->work_backw;
    879 		aiowp->work_forw = (*workers);
    880 		(*workers)->work_backw->work_forw = aiowp;
    881 		(*workers)->work_backw = aiowp;
    882 	}
    883 	_aio_worker_cnt++;
    884 	lmutex_unlock(&__aio_mutex);
    885 
    886 	(void) thr_continue(aiowp->work_tid);
    887 
    888 	return (0);
    889 }
    890 
    891 /*
    892  * This is the worker's main routine.
    893  * The task of this function is to execute all queued requests;
    894  * once the last pending request is executed this function will block
    895  * in _aio_idle().  A new incoming request must wakeup this thread to
    896  * restart the work.
 * Every worker has its own work queue.  The queue lock is required
    898  * to synchronize the addition of new requests for this worker or
    899  * cancellation of pending/running requests.
    900  *
    901  * Cancellation scenarios:
    902  * The cancellation of a request is being done asynchronously using
    903  * _aio_cancel_req() from another thread context.
    904  * A queued request can be cancelled in different manners :
    905  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
    906  *	- lock the queue -> remove the request -> unlock the queue
    907  *	- this function/thread does not detect this cancellation process
    908  * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
    910  *	  request with the flag "work_cancel_flg=1"
    911  * 		see _aio_req_get() -> _aio_cancel_on()
    912  *	  During this phase, it is allowed to interrupt the worker
    913  *	  thread running the request (this thread) using the SIGAIOCANCEL
    914  *	  signal.
    915  *	  Once this thread returns from the kernel (because the request
    916  *	  is just done), then it must disable a possible cancellation
    917  *	  and proceed to finish the request.  To disable the cancellation
    918  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
    919  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
    920  *	  same procedure as in a)
    921  *
    922  * To b)
    923  *	This thread uses sigsetjmp() to define the position in the code, where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
    925  *	is detected.
    926  *	Normally this thread should get the cancellation signal during the
    927  *	kernel phase (reading or writing).  In that case the signal handler
    928  *	aiosigcancelhndlr() is activated using the worker thread context,
    929  *	which again will use the siglongjmp() function to break the standard
    930  *	code flow and jump to the "sigsetjmp" position, provided that
    931  *	"work_cancel_flg" is set to "1".
    932  *	Because the "work_cancel_flg" is only manipulated by this worker
    933  *	thread and it can only run on one CPU at a given time, it is not
    934  *	necessary to protect that flag with the queue lock.
    935  *	Returning from the kernel (read or write system call) we must
    936  *	first disable the use of the SIGAIOCANCEL signal and accordingly
    937  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
    940  *	- then a second thread cancels the apparently "in progress" request
    941  *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
 *	  from the kernel,
    944  *	- the kernel detects the pending signal and activates the signal
    945  *	  handler instead,
    946  *	- if the "work_cancel_flg" is still set then the signal handler
    947  *	  should use siglongjmp() to cancel the "in progress" request and
    948  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
    949  *	  for a second time => deadlock.
    950  *	To avoid that situation we disable the cancellation of the request
    951  *	in progress BEFORE we try to acquire the work_qlock1.
    952  *	In that case the signal handler will not call siglongjmp() and the
    953  *	worker thread will continue running the standard code flow.
    954  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
    955  *	an eventually required siglongjmp() freeing the work_qlock1 and
    956  *	avoiding a deadlock.
    957  */
    958 void *
    959 _aio_do_request(void *arglist)
    960 {
    961 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
    962 	ulwp_t *self = curthread;
    963 	struct aio_args *arg;
    964 	aio_req_t *reqp;		/* current AIO request */
    965 	ssize_t retval;
    966 	int error;
    967 
    968 	if (pthread_setspecific(_aio_key, aiowp) != 0)
    969 		aio_panic("_aio_do_request, pthread_setspecific()");
    970 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
    971 	ASSERT(aiowp->work_req == NULL);
    972 
    973 	/*
    974 	 * We resume here when an operation is cancelled.
    975 	 * On first entry, aiowp->work_req == NULL, so all
    976 	 * we do is block SIGAIOCANCEL.
    977 	 */
    978 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
    979 	ASSERT(self->ul_sigdefer == 0);
    980 
    981 	sigoff(self);	/* block SIGAIOCANCEL */
    982 	if (aiowp->work_req != NULL)
    983 		_aio_finish_request(aiowp, -1, ECANCELED);
    984 
    985 	for (;;) {
    986 		/*
    987 		 * Put completed requests on aio_done_list.  This has
    988 		 * to be done as part of the main loop to ensure that
    989 		 * we don't artificially starve any aiowait'ers.
    990 		 */
    991 		if (aiowp->work_done1)
    992 			_aio_work_done(aiowp);
    993 
    994 top:
    995 		/* consume any deferred SIGAIOCANCEL signal here */
    996 		sigon(self);
    997 		sigoff(self);
    998 
    999 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
   1000 			if (_aio_idle(aiowp) != 0)
   1001 				goto top;
   1002 		}
   1003 		arg = &reqp->req_args;
   1004 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
   1005 		    reqp->req_state == AIO_REQ_CANCELED);
   1006 		error = 0;
   1007 
   1008 		switch (reqp->req_op) {
   1009 		case AIOREAD:
   1010 		case AIOAREAD:
   1011 			sigon(self);	/* unblock SIGAIOCANCEL */
   1012 			retval = pread(arg->fd, arg->buf,
   1013 			    arg->bufsz, arg->offset);
   1014 			if (retval == -1) {
   1015 				if (errno == ESPIPE) {
   1016 					retval = read(arg->fd,
   1017 					    arg->buf, arg->bufsz);
   1018 					if (retval == -1)
   1019 						error = errno;
   1020 				} else {
   1021 					error = errno;
   1022 				}
   1023 			}
   1024 			sigoff(self);	/* block SIGAIOCANCEL */
   1025 			break;
   1026 		case AIOWRITE:
   1027 		case AIOAWRITE:
   1028 			sigon(self);	/* unblock SIGAIOCANCEL */
   1029 			retval = pwrite(arg->fd, arg->buf,
   1030 			    arg->bufsz, arg->offset);
   1031 			if (retval == -1) {
   1032 				if (errno == ESPIPE) {
   1033 					retval = write(arg->fd,
   1034 					    arg->buf, arg->bufsz);
   1035 					if (retval == -1)
   1036 						error = errno;
   1037 				} else {
   1038 					error = errno;
   1039 				}
   1040 			}
   1041 			sigoff(self);	/* block SIGAIOCANCEL */
   1042 			break;
   1043 #if !defined(_LP64)
   1044 		case AIOAREAD64:
   1045 			sigon(self);	/* unblock SIGAIOCANCEL */
   1046 			retval = pread64(arg->fd, arg->buf,
   1047 			    arg->bufsz, arg->offset);
   1048 			if (retval == -1) {
   1049 				if (errno == ESPIPE) {
   1050 					retval = read(arg->fd,
   1051 					    arg->buf, arg->bufsz);
   1052 					if (retval == -1)
   1053 						error = errno;
   1054 				} else {
   1055 					error = errno;
   1056 				}
   1057 			}
   1058 			sigoff(self);	/* block SIGAIOCANCEL */
   1059 			break;
   1060 		case AIOAWRITE64:
   1061 			sigon(self);	/* unblock SIGAIOCANCEL */
   1062 			retval = pwrite64(arg->fd, arg->buf,
   1063 			    arg->bufsz, arg->offset);
   1064 			if (retval == -1) {
   1065 				if (errno == ESPIPE) {
   1066 					retval = write(arg->fd,
   1067 					    arg->buf, arg->bufsz);
   1068 					if (retval == -1)
   1069 						error = errno;
   1070 				} else {
   1071 					error = errno;
   1072 				}
   1073 			}
   1074 			sigoff(self);	/* block SIGAIOCANCEL */
   1075 			break;
   1076 #endif	/* !defined(_LP64) */
   1077 		case AIOFSYNC:
   1078 			if (_aio_fsync_del(aiowp, reqp))
   1079 				goto top;
   1080 			ASSERT(reqp->req_head == NULL);
   1081 			/*
   1082 			 * All writes for this fsync request are now
   1083 			 * acknowledged.  Now make these writes visible
   1084 			 * and put the final request into the hash table.
   1085 			 */
   1086 			if (reqp->req_state == AIO_REQ_CANCELED) {
   1087 				/* EMPTY */;
   1088 			} else if (arg->offset == O_SYNC) {
   1089 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
   1090 					error = errno;
   1091 			} else {
   1092 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
   1093 					error = errno;
   1094 			}
   1095 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
   1096 				aio_panic("_aio_do_request(): AIOFSYNC: "
   1097 				    "request already in hash table");
   1098 			break;
   1099 		default:
   1100 			aio_panic("_aio_do_request, bad op");
   1101 		}
   1102 
   1103 		_aio_finish_request(aiowp, retval, error);
   1104 	}
   1105 	/* NOTREACHED */
   1106 	return (NULL);
   1107 }
   1108 
   1109 /*
   1110  * Perform the tail processing for _aio_do_request().
   1111  * The in-progress request may or may not have been cancelled.
   1112  */
   1113 static void
   1114 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
   1115 {
   1116 	aio_req_t *reqp;
   1117 
   1118 	sig_mutex_lock(&aiowp->work_qlock1);
   1119 	if ((reqp = aiowp->work_req) == NULL)
   1120 		sig_mutex_unlock(&aiowp->work_qlock1);
   1121 	else {
   1122 		aiowp->work_req = NULL;
   1123 		if (reqp->req_state == AIO_REQ_CANCELED) {
   1124 			retval = -1;
   1125 			error = ECANCELED;
   1126 		}
   1127 		if (!POSIX_AIO(reqp)) {
   1128 			sig_mutex_unlock(&aiowp->work_qlock1);
   1129 			sig_mutex_lock(&__aio_mutex);
   1130 			if (reqp->req_state == AIO_REQ_INPROGRESS)
   1131 				reqp->req_state = AIO_REQ_DONE;
   1132 			_aio_req_done_cnt++;
   1133 			_aio_set_result(reqp, retval, error);
   1134 			if (error == ECANCELED)
   1135 				_aio_outstand_cnt--;
   1136 			sig_mutex_unlock(&__aio_mutex);
   1137 		} else {
   1138 			if (reqp->req_state == AIO_REQ_INPROGRESS)
   1139 				reqp->req_state = AIO_REQ_DONE;
   1140 			sig_mutex_unlock(&aiowp->work_qlock1);
   1141 			_aiodone(reqp, retval, error);
   1142 		}
   1143 	}
   1144 }
   1145 
   1146 void
   1147 _aio_req_mark_done(aio_req_t *reqp)
   1148 {
   1149 #if !defined(_LP64)
   1150 	if (reqp->req_largefile)
   1151 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
   1152 	else
   1153 #endif
   1154 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
   1155 }
   1156 
   1157 /*
   1158  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
   1159  * hopefully to consume one of our queued signals.
   1160  */
   1161 static void
   1162 _aio_delay(int ticks)
   1163 {
   1164 	(void) usleep(ticks * (MICROSEC / hz));
   1165 }
   1166 
   1167 /*
   1168  * Actually send the notifications.
   1169  * We could block indefinitely here if the application
   1170  * is not listening for the signal or port notifications.
   1171  */
   1172 static void
   1173 send_notification(notif_param_t *npp)
   1174 {
   1175 	extern int __sigqueue(pid_t pid, int signo,
   1176 		/* const union sigval */ void *value, int si_code, int block);
   1177 
   1178 	if (npp->np_signo)
   1179 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
   1180 		    SI_ASYNCIO, 1);
   1181 	else if (npp->np_port >= 0)
   1182 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
   1183 		    npp->np_event, npp->np_object, npp->np_user);
   1184 
   1185 	if (npp->np_lio_signo)
   1186 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
   1187 		    SI_ASYNCIO, 1);
   1188 	else if (npp->np_lio_port >= 0)
   1189 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
   1190 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
   1191 }
   1192 
   1193 /*
   1194  * Asynchronous notification worker.
   1195  */
   1196 void *
   1197 _aio_do_notify(void *arg)
   1198 {
   1199 	aio_worker_t *aiowp = (aio_worker_t *)arg;
   1200 	aio_req_t *reqp;
   1201 
   1202 	/*
   1203 	 * This isn't really necessary.  All signals are blocked.
   1204 	 */
   1205 	if (pthread_setspecific(_aio_key, aiowp) != 0)
   1206 		aio_panic("_aio_do_notify, pthread_setspecific()");
   1207 
   1208 	/*
   1209 	 * Notifications are never cancelled.
   1210 	 * All signals remain blocked, forever.
   1211 	 */
   1212 	for (;;) {
   1213 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
   1214 			if (_aio_idle(aiowp) != 0)
   1215 				aio_panic("_aio_do_notify: _aio_idle() failed");
   1216 		}
   1217 		send_notification(&reqp->req_notify);
   1218 		_aio_req_free(reqp);
   1219 	}
   1220 
   1221 	/* NOTREACHED */
   1222 	return (NULL);
   1223 }
   1224 
   1225 /*
   1226  * Do the completion semantics for a request that was either canceled
   1227  * by _aio_cancel_req() or was completed by _aio_do_request().
   1228  */
   1229 static void
   1230 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
   1231 {
   1232 	aio_result_t *resultp = reqp->req_resultp;
   1233 	int notify = 0;
   1234 	aio_lio_t *head;
   1235 	int sigev_none;
   1236 	int sigev_signal;
   1237 	int sigev_thread;
   1238 	int sigev_port;
   1239 	notif_param_t np;
   1240 
   1241 	/*
   1242 	 * We call _aiodone() only for Posix I/O.
   1243 	 */
   1244 	ASSERT(POSIX_AIO(reqp));
   1245 
   1246 	sigev_none = 0;
   1247 	sigev_signal = 0;
   1248 	sigev_thread = 0;
   1249 	sigev_port = 0;
   1250 	np.np_signo = 0;
   1251 	np.np_port = -1;
   1252 	np.np_lio_signo = 0;
   1253 	np.np_lio_port = -1;
   1254 
   1255 	switch (reqp->req_sigevent.sigev_notify) {
   1256 	case SIGEV_NONE:
   1257 		sigev_none = 1;
   1258 		break;
   1259 	case SIGEV_SIGNAL:
   1260 		sigev_signal = 1;
   1261 		break;
   1262 	case SIGEV_THREAD:
   1263 		sigev_thread = 1;
   1264 		break;
   1265 	case SIGEV_PORT:
   1266 		sigev_port = 1;
   1267 		break;
   1268 	default:
   1269 		aio_panic("_aiodone: improper sigev_notify");
   1270 		break;
   1271 	}
   1272 
   1273 	/*
   1274 	 * Figure out the notification parameters while holding __aio_mutex.
   1275 	 * Actually perform the notifications after dropping __aio_mutex.
   1276 	 * This allows us to sleep for a long time (if the notifications
   1277 	 * incur delays) without impeding other async I/O operations.
   1278 	 */
   1279 
   1280 	sig_mutex_lock(&__aio_mutex);
   1281 
   1282 	if (sigev_signal) {
   1283 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
   1284 			notify = 1;
   1285 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
   1286 	} else if (sigev_thread | sigev_port) {
   1287 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
   1288 			notify = 1;
   1289 		np.np_event = reqp->req_op;
   1290 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
   1291 			np.np_event = AIOFSYNC64;
   1292 		np.np_object = (uintptr_t)reqp->req_aiocbp;
   1293 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
   1294 	}
   1295 
   1296 	if (resultp->aio_errno == EINPROGRESS)
   1297 		_aio_set_result(reqp, retval, error);
   1298 
   1299 	_aio_outstand_cnt--;
   1300 
   1301 	head = reqp->req_head;
   1302 	reqp->req_head = NULL;
   1303 
   1304 	if (sigev_none) {
   1305 		_aio_enq_doneq(reqp);
   1306 		reqp = NULL;
   1307 	} else {
   1308 		(void) _aio_hash_del(resultp);
   1309 		_aio_req_mark_done(reqp);
   1310 	}
   1311 
   1312 	_aio_waitn_wakeup();
   1313 
   1314 	/*
   1315 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
   1316 	 * __aio_suspend() increments "_aio_kernel_suspend"
   1317 	 * when they are waiting in the kernel for completed I/Os.
   1318 	 *
   1319 	 * _kaio(AIONOTIFY) awakes the corresponding function
   1320 	 * in the kernel; then the corresponding __aio_waitn() or
   1321 	 * __aio_suspend() function could reap the recently
   1322 	 * completed I/Os (_aiodone()).
   1323 	 */
   1324 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
   1325 		(void) _kaio(AIONOTIFY);
   1326 
   1327 	sig_mutex_unlock(&__aio_mutex);
   1328 
   1329 	if (head != NULL) {
   1330 		/*
   1331 		 * If all the lio requests have completed,
   1332 		 * prepare to notify the waiting thread.
   1333 		 */
   1334 		sig_mutex_lock(&head->lio_mutex);
   1335 		ASSERT(head->lio_refcnt == head->lio_nent);
   1336 		if (head->lio_refcnt == 1) {
   1337 			int waiting = 0;
   1338 			if (head->lio_mode == LIO_WAIT) {
   1339 				if ((waiting = head->lio_waiting) != 0)
   1340 					(void) cond_signal(&head->lio_cond_cv);
   1341 			} else if (head->lio_port < 0) { /* none or signal */
   1342 				if ((np.np_lio_signo = head->lio_signo) != 0)
   1343 					notify = 1;
   1344 				np.np_lio_user = head->lio_sigval.sival_ptr;
   1345 			} else {			/* thread or port */
   1346 				notify = 1;
   1347 				np.np_lio_port = head->lio_port;
   1348 				np.np_lio_event = head->lio_event;
   1349 				np.np_lio_object =
   1350 				    (uintptr_t)head->lio_sigevent;
   1351 				np.np_lio_user = head->lio_sigval.sival_ptr;
   1352 			}
   1353 			head->lio_nent = head->lio_refcnt = 0;
   1354 			sig_mutex_unlock(&head->lio_mutex);
   1355 			if (waiting == 0)
   1356 				_aio_lio_free(head);
   1357 		} else {
   1358 			head->lio_nent--;
   1359 			head->lio_refcnt--;
   1360 			sig_mutex_unlock(&head->lio_mutex);
   1361 		}
   1362 	}
   1363 
   1364 	/*
   1365 	 * The request is completed; now perform the notifications.
   1366 	 */
   1367 	if (notify) {
   1368 		if (reqp != NULL) {
   1369 			/*
   1370 			 * We usually put the request on the notification
   1371 			 * queue because we don't want to block and delay
   1372 			 * other operations behind us in the work queue.
   1373 			 * Also we must never block on a cancel notification
   1374 			 * because we are being called from an application
   1375 			 * thread in this case and that could lead to deadlock
   1376 			 * if no other thread is receiving notificatins.
   1377 			 */
   1378 			reqp->req_notify = np;
   1379 			reqp->req_op = AIONOTIFY;
   1380 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
   1381 			reqp = NULL;
   1382 		} else {
   1383 			/*
   1384 			 * We already put the request on the done queue,
   1385 			 * so we can't queue it to the notification queue.
   1386 			 * Just do the notification directly.
   1387 			 */
   1388 			send_notification(&np);
   1389 		}
   1390 	}
   1391 
   1392 	if (reqp != NULL)
   1393 		_aio_req_free(reqp);
   1394 }
   1395 
   1396 /*
   1397  * Delete fsync requests from list head until there is
   1398  * only one left.  Return 0 when there is only one,
   1399  * otherwise return a non-zero value.
   1400  */
   1401 static int
   1402 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
   1403 {
   1404 	aio_lio_t *head = reqp->req_head;
   1405 	int rval = 0;
   1406 
   1407 	ASSERT(reqp == aiowp->work_req);
   1408 	sig_mutex_lock(&aiowp->work_qlock1);
   1409 	sig_mutex_lock(&head->lio_mutex);
   1410 	if (head->lio_refcnt > 1) {
   1411 		head->lio_refcnt--;
   1412 		head->lio_nent--;
   1413 		aiowp->work_req = NULL;
   1414 		sig_mutex_unlock(&head->lio_mutex);
   1415 		sig_mutex_unlock(&aiowp->work_qlock1);
   1416 		sig_mutex_lock(&__aio_mutex);
   1417 		_aio_outstand_cnt--;
   1418 		_aio_waitn_wakeup();
   1419 		sig_mutex_unlock(&__aio_mutex);
   1420 		_aio_req_free(reqp);
   1421 		return (1);
   1422 	}
   1423 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
   1424 	reqp->req_head = NULL;
   1425 	if (head->lio_canned)
   1426 		reqp->req_state = AIO_REQ_CANCELED;
   1427 	if (head->lio_mode == LIO_DESTROY) {
   1428 		aiowp->work_req = NULL;
   1429 		rval = 1;
   1430 	}
   1431 	sig_mutex_unlock(&head->lio_mutex);
   1432 	sig_mutex_unlock(&aiowp->work_qlock1);
   1433 	head->lio_refcnt--;
   1434 	head->lio_nent--;
   1435 	_aio_lio_free(head);
   1436 	if (rval != 0)
   1437 		_aio_req_free(reqp);
   1438 	return (rval);
   1439 }
   1440 
   1441 /*
   1442  * A worker is set idle when its work queue is empty.
   1443  * The worker checks again that it has no more work
   1444  * and then goes to sleep waiting for more work.
   1445  */
   1446 int
   1447 _aio_idle(aio_worker_t *aiowp)
   1448 {
   1449 	int error = 0;
   1450 
   1451 	sig_mutex_lock(&aiowp->work_qlock1);
   1452 	if (aiowp->work_count1 == 0) {
   1453 		ASSERT(aiowp->work_minload1 == 0);
   1454 		aiowp->work_idleflg = 1;
   1455 		/*
   1456 		 * A cancellation handler is not needed here.
   1457 		 * aio worker threads are never cancelled via pthread_cancel().
   1458 		 */
   1459 		error = sig_cond_wait(&aiowp->work_idle_cv,
   1460 		    &aiowp->work_qlock1);
   1461 		/*
   1462 		 * The idle flag is normally cleared before worker is awakened
   1463 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
   1464 		 */
   1465 		if (error)
   1466 			aiowp->work_idleflg = 0;
   1467 	}
   1468 	sig_mutex_unlock(&aiowp->work_qlock1);
   1469 	return (error);
   1470 }
   1471 
   1472 /*
   1473  * A worker's completed AIO requests are placed onto a global
   1474  * done queue.  The application is only sent a SIGIO signal if
   1475  * the process has a handler enabled and it is not waiting via
   1476  * aiowait().
   1477  */
   1478 static void
   1479 _aio_work_done(aio_worker_t *aiowp)
   1480 {
   1481 	aio_req_t *reqp;
   1482 
   1483 	sig_mutex_lock(&aiowp->work_qlock1);
   1484 	reqp = aiowp->work_prev1;
   1485 	reqp->req_next = NULL;
   1486 	aiowp->work_done1 = 0;
   1487 	aiowp->work_tail1 = aiowp->work_next1;
   1488 	if (aiowp->work_tail1 == NULL)
   1489 		aiowp->work_head1 = NULL;
   1490 	aiowp->work_prev1 = NULL;
   1491 	sig_mutex_unlock(&aiowp->work_qlock1);
   1492 	sig_mutex_lock(&__aio_mutex);
   1493 	_aio_donecnt++;
   1494 	_aio_outstand_cnt--;
   1495 	_aio_req_done_cnt--;
   1496 	ASSERT(_aio_donecnt > 0 &&
   1497 	    _aio_outstand_cnt >= 0 &&
   1498 	    _aio_req_done_cnt >= 0);
   1499 	ASSERT(reqp != NULL);
   1500 
   1501 	if (_aio_done_tail == NULL) {
   1502 		_aio_done_head = _aio_done_tail = reqp;
   1503 	} else {
   1504 		_aio_done_head->req_next = reqp;
   1505 		_aio_done_head = reqp;
   1506 	}
   1507 
   1508 	if (_aiowait_flag) {
   1509 		sig_mutex_unlock(&__aio_mutex);
   1510 		(void) _kaio(AIONOTIFY);
   1511 	} else {
   1512 		sig_mutex_unlock(&__aio_mutex);
   1513 		if (_sigio_enabled)
   1514 			(void) kill(__pid, SIGIO);
   1515 	}
   1516 }
   1517 
   1518 /*
   1519  * The done queue consists of AIO requests that are in either the
   1520  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
   1521  * are discarded.  If the done queue is empty then NULL is returned.
   1522  * Otherwise the address of a done aio_result_t is returned.
   1523  */
   1524 aio_result_t *
   1525 _aio_req_done(void)
   1526 {
   1527 	aio_req_t *reqp;
   1528 	aio_result_t *resultp;
   1529 
   1530 	ASSERT(MUTEX_HELD(&__aio_mutex));
   1531 
   1532 	if ((reqp = _aio_done_tail) != NULL) {
   1533 		if ((_aio_done_tail = reqp->req_next) == NULL)
   1534 			_aio_done_head = NULL;
   1535 		ASSERT(_aio_donecnt > 0);
   1536 		_aio_donecnt--;
   1537 		(void) _aio_hash_del(reqp->req_resultp);
   1538 		resultp = reqp->req_resultp;
   1539 		ASSERT(reqp->req_state == AIO_REQ_DONE);
   1540 		_aio_req_free(reqp);
   1541 		return (resultp);
   1542 	}
   1543 	/* is queue empty? */
   1544 	if (reqp == NULL && _aio_outstand_cnt == 0) {
   1545 		return ((aio_result_t *)-1);
   1546 	}
   1547 	return (NULL);
   1548 }
   1549 
   1550 /*
   1551  * Set the return and errno values for the application's use.
   1552  *
   1553  * For the Posix interfaces, we must set the return value first followed
   1554  * by the errno value because the Posix interfaces allow for a change
   1555  * in the errno value from EINPROGRESS to something else to signal
   1556  * the completion of the asynchronous request.
   1557  *
   1558  * The opposite is true for the Solaris interfaces.  These allow for
   1559  * a change in the return value from AIO_INPROGRESS to something else
   1560  * to signal the completion of the asynchronous request.
   1561  */
   1562 void
   1563 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
   1564 {
   1565 	aio_result_t *resultp = reqp->req_resultp;
   1566 
   1567 	if (POSIX_AIO(reqp)) {
   1568 		resultp->aio_return = retval;
   1569 		membar_producer();
   1570 		resultp->aio_errno = error;
   1571 	} else {
   1572 		resultp->aio_errno = error;
   1573 		membar_producer();
   1574 		resultp->aio_return = retval;
   1575 	}
   1576 }
   1577 
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * reqp		the request to queue
 * nextworker	address of the pointer to the worker at which the search
 *		starts; for read/write modes it is advanced past the worker
 *		finally chosen
 * mode		the kind of request; it selects the queueing policy:
 *		- read/write modes look for an idle worker first, then for
 *		  any worker whose queue lock is free, and may hand the
 *		  request to a newly created worker instead;
 *		- AIOFSYNC and AIONOTIFY always use *nextworker and block
 *		  on its queue lock.
 *
 * Any other mode causes a panic.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;	/* advance *nextworker after queueing */
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	/* notification requests do not count as outstanding I/O */
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					/* leave the loop with the lock held */
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				/* the request is handed to the new worker */
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		/*
		 * The chosen worker already carries at least _minworkload
		 * requests and more workers are allowed: release its lock
		 * and give the request to a new worker instead.
		 */
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* no search and no load balancing: use *nextworker as is */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 * The queue lock of aiowp is held at this point.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		/* everything queued so far has been picked up already */
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		/* start the next search at the following worker */
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}
   1721 
   1722 /*
   1723  * Get an AIO request for a specified worker.
   1724  * If the work queue is empty, return NULL.
   1725  */
   1726 aio_req_t *
   1727 _aio_req_get(aio_worker_t *aiowp)
   1728 {
   1729 	aio_req_t *reqp;
   1730 
   1731 	sig_mutex_lock(&aiowp->work_qlock1);
   1732 	if ((reqp = aiowp->work_next1) != NULL) {
   1733 		/*
   1734 		 * Remove a POSIX request from the queue; the
   1735 		 * request queue is a singularly linked list
   1736 		 * with a previous pointer.  The request is
   1737 		 * removed by updating the previous pointer.
   1738 		 *
   1739 		 * Non-posix requests are left on the queue
   1740 		 * to eventually be placed on the done queue.
   1741 		 */
   1742 
   1743 		if (POSIX_AIO(reqp)) {
   1744 			if (aiowp->work_prev1 == NULL) {
   1745 				aiowp->work_tail1 = reqp->req_next;
   1746 				if (aiowp->work_tail1 == NULL)
   1747 					aiowp->work_head1 = NULL;
   1748 			} else {
   1749 				aiowp->work_prev1->req_next = reqp->req_next;
   1750 				if (aiowp->work_head1 == reqp)
   1751 					aiowp->work_head1 = reqp->req_next;
   1752 			}
   1753 
   1754 		} else {
   1755 			aiowp->work_prev1 = reqp;
   1756 			ASSERT(aiowp->work_done1 >= 0);
   1757 			aiowp->work_done1++;
   1758 		}
   1759 		ASSERT(reqp != reqp->req_next);
   1760 		aiowp->work_next1 = reqp->req_next;
   1761 		ASSERT(aiowp->work_count1 >= 1);
   1762 		aiowp->work_count1--;
   1763 		switch (reqp->req_op) {
   1764 		case AIOREAD:
   1765 		case AIOWRITE:
   1766 		case AIOAREAD:
   1767 		case AIOAWRITE:
   1768 #if !defined(_LP64)
   1769 		case AIOAREAD64:
   1770 		case AIOAWRITE64:
   1771 #endif
   1772 			ASSERT(aiowp->work_minload1 > 0);
   1773 			aiowp->work_minload1--;
   1774 			break;
   1775 		}
   1776 		reqp->req_state = AIO_REQ_INPROGRESS;
   1777 	}
   1778 	aiowp->work_req = reqp;
   1779 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
   1780 	sig_mutex_unlock(&aiowp->work_qlock1);
   1781 	return (reqp);
   1782 }
   1783 
   1784 static void
   1785 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
   1786 {
   1787 	aio_req_t **last;
   1788 	aio_req_t *lastrp;
   1789 	aio_req_t *next;
   1790 
   1791 	ASSERT(aiowp != NULL);
   1792 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
   1793 	if (POSIX_AIO(reqp)) {
   1794 		if (ostate != AIO_REQ_QUEUED)
   1795 			return;
   1796 	}
   1797 	last = &aiowp->work_tail1;
   1798 	lastrp = aiowp->work_tail1;
   1799 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
   1800 	while ((next = *last) != NULL) {
   1801 		if (next == reqp) {
   1802 			*last = next->req_next;
   1803 			if (aiowp->work_next1 == next)
   1804 				aiowp->work_next1 = next->req_next;
   1805 
   1806 			if ((next->req_next != NULL) ||
   1807 			    (aiowp->work_done1 == 0)) {
   1808 				if (aiowp->work_head1 == next)
   1809 					aiowp->work_head1 = next->req_next;
   1810 				if (aiowp->work_prev1 == next)
   1811 					aiowp->work_prev1 = next->req_next;
   1812 			} else {
   1813 				if (aiowp->work_head1 == next)
   1814 					aiowp->work_head1 = lastrp;
   1815 				if (aiowp->work_prev1 == next)
   1816 					aiowp->work_prev1 = lastrp;
   1817 			}
   1818 
   1819 			if (ostate == AIO_REQ_QUEUED) {
   1820 				ASSERT(aiowp->work_count1 >= 1);
   1821 				aiowp->work_count1--;
   1822 				ASSERT(aiowp->work_minload1 >= 1);
   1823 				aiowp->work_minload1--;
   1824 			} else {
   1825 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
   1826 				    !POSIX_AIO(reqp));
   1827 				aiowp->work_done1--;
   1828 			}
   1829 			return;
   1830 		}
   1831 		last = &next->req_next;
   1832 		lastrp = next;
   1833 	}
   1834 	/* NOTREACHED */
   1835 }
   1836 
   1837 static void
   1838 _aio_enq_doneq(aio_req_t *reqp)
   1839 {
   1840 	if (_aio_doneq == NULL) {
   1841 		_aio_doneq = reqp;
   1842 		reqp->req_next = reqp->req_prev = reqp;
   1843 	} else {
   1844 		reqp->req_next = _aio_doneq;
   1845 		reqp->req_prev = _aio_doneq->req_prev;
   1846 		_aio_doneq->req_prev->req_next = reqp;
   1847 		_aio_doneq->req_prev = reqp;
   1848 	}
   1849 	reqp->req_state = AIO_REQ_DONEQ;
   1850 	_aio_doneq_cnt++;
   1851 }
   1852 
   1853 /*
   1854  * caller owns the _aio_mutex
   1855  */
   1856 aio_req_t *
   1857 _aio_req_remove(aio_req_t *reqp)
   1858 {
   1859 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
   1860 		return (NULL);
   1861 
   1862 	if (reqp) {
   1863 		/* request in done queue */
   1864 		if (_aio_doneq == reqp)
   1865 			_aio_doneq = reqp->req_next;
   1866 		if (_aio_doneq == reqp) {
   1867 			/* only one request on queue */
   1868 			_aio_doneq = NULL;
   1869 		} else {
   1870 			aio_req_t *tmp = reqp->req_next;
   1871 			reqp->req_prev->req_next = tmp;
   1872 			tmp->req_prev = reqp->req_prev;
   1873 		}
   1874 	} else if ((reqp = _aio_doneq) != NULL) {
   1875 		if (reqp == reqp->req_next) {
   1876 			/* only one request on queue */
   1877 			_aio_doneq = NULL;
   1878 		} else {
   1879 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
   1880 			_aio_doneq->req_prev = reqp->req_prev;
   1881 		}
   1882 	}
   1883 	if (reqp) {
   1884 		_aio_doneq_cnt--;
   1885 		reqp->req_next = reqp->req_prev = reqp;
   1886 		reqp->req_state = AIO_REQ_DONE;
   1887 	}
   1888 	return (reqp);
   1889 }
   1890 
   1891 /*
   1892  * An AIO request is identified by an aio_result_t pointer.  The library
   1893  * maps this aio_result_t pointer to its internal representation using a
   1894  * hash table.  This function adds an aio_result_t pointer to the hash table.
   1895  */
   1896 static int
   1897 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
   1898 {
   1899 	aio_hash_t *hashp;
   1900 	aio_req_t **prev;
   1901 	aio_req_t *next;
   1902 
   1903 	hashp = _aio_hash + AIOHASH(resultp);
   1904 	lmutex_lock(&hashp->hash_lock);
   1905 	prev = &hashp->hash_ptr;
   1906 	while ((next = *prev) != NULL) {
   1907 		if (resultp == next->req_resultp) {
   1908 			lmutex_unlock(&hashp->hash_lock);
   1909 			return (-1);
   1910 		}
   1911 		prev = &next->req_link;
   1912 	}
   1913 	*prev = reqp;
   1914 	ASSERT(reqp->req_link == NULL);
   1915 	lmutex_unlock(&hashp->hash_lock);
   1916 	return (0);
   1917 }
   1918 
   1919 /*
   1920  * Remove an entry from the hash table.
   1921  */
   1922 aio_req_t *
   1923 _aio_hash_del(aio_result_t *resultp)
   1924 {
   1925 	aio_hash_t *hashp;
   1926 	aio_req_t **prev;
   1927 	aio_req_t *next = NULL;
   1928 
   1929 	if (_aio_hash != NULL) {
   1930 		hashp = _aio_hash + AIOHASH(resultp);
   1931 		lmutex_lock(&hashp->hash_lock);
   1932 		prev = &hashp->hash_ptr;
   1933 		while ((next = *prev) != NULL) {
   1934 			if (resultp == next->req_resultp) {
   1935 				*prev = next->req_link;
   1936 				next->req_link = NULL;
   1937 				break;
   1938 			}
   1939 			prev = &next->req_link;
   1940 		}
   1941 		lmutex_unlock(&hashp->hash_lock);
   1942 	}
   1943 	return (next);
   1944 }
   1945 
   1946 /*
   1947  *  find an entry in the hash table
   1948  */
   1949 aio_req_t *
   1950 _aio_hash_find(aio_result_t *resultp)
   1951 {
   1952 	aio_hash_t *hashp;
   1953 	aio_req_t **prev;
   1954 	aio_req_t *next = NULL;
   1955 
   1956 	if (_aio_hash != NULL) {
   1957 		hashp = _aio_hash + AIOHASH(resultp);
   1958 		lmutex_lock(&hashp->hash_lock);
   1959 		prev = &hashp->hash_ptr;
   1960 		while ((next = *prev) != NULL) {
   1961 			if (resultp == next->req_resultp)
   1962 				break;
   1963 			prev = &next->req_link;
   1964 		}
   1965 		lmutex_unlock(&hashp->hash_lock);
   1966 	}
   1967 	return (next);
   1968 }
   1969 
   1970 /*
   1971  * AIO interface for POSIX
   1972  */
   1973 int
   1974 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
   1975     int mode, int flg)
   1976 {
   1977 	aio_req_t *reqp;
   1978 	aio_args_t *ap;
   1979 	int kerr;
   1980 
   1981 	if (aiocbp == NULL) {
   1982 		errno = EINVAL;
   1983 		return (-1);
   1984 	}
   1985 
   1986 	/* initialize kaio */
   1987 	if (!_kaio_ok)
   1988 		_kaio_init();
   1989 
   1990 	aiocbp->aio_state = NOCHECK;
   1991 
   1992 	/*
   1993 	 * If we have been called because a list I/O
   1994 	 * kaio() failed, we dont want to repeat the
   1995 	 * system call
   1996 	 */
   1997 
   1998 	if (flg & AIO_KAIO) {
   1999 		/*
   2000 		 * Try kernel aio first.
   2001 		 * If errno is ENOTSUP/EBADFD,
   2002 		 * fall back to the thread implementation.
   2003 		 */
   2004 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
   2005 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2006 			aiocbp->aio_state = CHECK;
   2007 			kerr = (int)_kaio(mode, aiocbp);
   2008 			if (kerr == 0)
   2009 				return (0);
   2010 			if (errno != ENOTSUP && errno != EBADFD) {
   2011 				aiocbp->aio_resultp.aio_errno = errno;
   2012 				aiocbp->aio_resultp.aio_return = -1;
   2013 				aiocbp->aio_state = NOCHECK;
   2014 				return (-1);
   2015 			}
   2016 			if (errno == EBADFD)
   2017 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
   2018 		}
   2019 	}
   2020 
   2021 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2022 	aiocbp->aio_state = USERAIO;
   2023 
   2024 	if (!__uaio_ok && __uaio_init() == -1)
   2025 		return (-1);
   2026 
   2027 	if ((reqp = _aio_req_alloc()) == NULL) {
   2028 		errno = EAGAIN;
   2029 		return (-1);
   2030 	}
   2031 
   2032 	/*
   2033 	 * If an LIO request, add the list head to the aio request
   2034 	 */
   2035 	reqp->req_head = lio_head;
   2036 	reqp->req_type = AIO_POSIX_REQ;
   2037 	reqp->req_op = mode;
   2038 	reqp->req_largefile = 0;
   2039 
   2040 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
   2041 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
   2042 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
   2043 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
   2044 		reqp->req_sigevent.sigev_signo =
   2045 		    aiocbp->aio_sigevent.sigev_signo;
   2046 		reqp->req_sigevent.sigev_value.sival_ptr =
   2047 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2048 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
   2049 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2050 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
   2051 		/*
   2052 		 * Reuse the sigevent structure to contain the port number
   2053 		 * and the user value.  Same for SIGEV_THREAD, below.
   2054 		 */
   2055 		reqp->req_sigevent.sigev_signo =
   2056 		    pn->portnfy_port;
   2057 		reqp->req_sigevent.sigev_value.sival_ptr =
   2058 		    pn->portnfy_user;
   2059 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
   2060 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
   2061 		/*
   2062 		 * The sigevent structure contains the port number
   2063 		 * and the user value.  Same for SIGEV_PORT, above.
   2064 		 */
   2065 		reqp->req_sigevent.sigev_signo =
   2066 		    aiocbp->aio_sigevent.sigev_signo;
   2067 		reqp->req_sigevent.sigev_value.sival_ptr =
   2068 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2069 	}
   2070 
   2071 	reqp->req_resultp = &aiocbp->aio_resultp;
   2072 	reqp->req_aiocbp = aiocbp;
   2073 	ap = &reqp->req_args;
   2074 	ap->fd = aiocbp->aio_fildes;
   2075 	ap->buf = (caddr_t)aiocbp->aio_buf;
   2076 	ap->bufsz = aiocbp->aio_nbytes;
   2077 	ap->offset = aiocbp->aio_offset;
   2078 
   2079 	if ((flg & AIO_NO_DUPS) &&
   2080 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
   2081 		aio_panic("_aio_rw(): request already in hash table");
   2082 		_aio_req_free(reqp);
   2083 		errno = EINVAL;
   2084 		return (-1);
   2085 	}
   2086 	_aio_req_add(reqp, nextworker, mode);
   2087 	return (0);
   2088 }
   2089 
   2090 #if !defined(_LP64)
   2091 /*
   2092  * 64-bit AIO interface for POSIX
   2093  */
   2094 int
   2095 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
   2096     int mode, int flg)
   2097 {
   2098 	aio_req_t *reqp;
   2099 	aio_args_t *ap;
   2100 	int kerr;
   2101 
   2102 	if (aiocbp == NULL) {
   2103 		errno = EINVAL;
   2104 		return (-1);
   2105 	}
   2106 
   2107 	/* initialize kaio */
   2108 	if (!_kaio_ok)
   2109 		_kaio_init();
   2110 
   2111 	aiocbp->aio_state = NOCHECK;
   2112 
   2113 	/*
   2114 	 * If we have been called because a list I/O
   2115 	 * kaio() failed, we dont want to repeat the
   2116 	 * system call
   2117 	 */
   2118 
   2119 	if (flg & AIO_KAIO) {
   2120 		/*
   2121 		 * Try kernel aio first.
   2122 		 * If errno is ENOTSUP/EBADFD,
   2123 		 * fall back to the thread implementation.
   2124 		 */
   2125 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
   2126 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2127 			aiocbp->aio_state = CHECK;
   2128 			kerr = (int)_kaio(mode, aiocbp);
   2129 			if (kerr == 0)
   2130 				return (0);
   2131 			if (errno != ENOTSUP && errno != EBADFD) {
   2132 				aiocbp->aio_resultp.aio_errno = errno;
   2133 				aiocbp->aio_resultp.aio_return = -1;
   2134 				aiocbp->aio_state = NOCHECK;
   2135 				return (-1);
   2136 			}
   2137 			if (errno == EBADFD)
   2138 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
   2139 		}
   2140 	}
   2141 
   2142 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2143 	aiocbp->aio_state = USERAIO;
   2144 
   2145 	if (!__uaio_ok && __uaio_init() == -1)
   2146 		return (-1);
   2147 
   2148 	if ((reqp = _aio_req_alloc()) == NULL) {
   2149 		errno = EAGAIN;
   2150 		return (-1);
   2151 	}
   2152 
   2153 	/*
   2154 	 * If an LIO request, add the list head to the aio request
   2155 	 */
   2156 	reqp->req_head = lio_head;
   2157 	reqp->req_type = AIO_POSIX_REQ;
   2158 	reqp->req_op = mode;
   2159 	reqp->req_largefile = 1;
   2160 
   2161 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
   2162 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
   2163 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
   2164 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
   2165 		reqp->req_sigevent.sigev_signo =
   2166 		    aiocbp->aio_sigevent.sigev_signo;
   2167 		reqp->req_sigevent.sigev_value.sival_ptr =
   2168 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2169 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
   2170 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2171 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
   2172 		reqp->req_sigevent.sigev_signo =
   2173 		    pn->portnfy_port;
   2174 		reqp->req_sigevent.sigev_value.sival_ptr =
   2175 		    pn->portnfy_user;
   2176 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
   2177 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
   2178 		reqp->req_sigevent.sigev_signo =
   2179 		    aiocbp->aio_sigevent.sigev_signo;
   2180 		reqp->req_sigevent.sigev_value.sival_ptr =
   2181 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2182 	}
   2183 
   2184 	reqp->req_resultp = &aiocbp->aio_resultp;
   2185 	reqp->req_aiocbp = aiocbp;
   2186 	ap = &reqp->req_args;
   2187 	ap->fd = aiocbp->aio_fildes;
   2188 	ap->buf = (caddr_t)aiocbp->aio_buf;
   2189 	ap->bufsz = aiocbp->aio_nbytes;
   2190 	ap->offset = aiocbp->aio_offset;
   2191 
   2192 	if ((flg & AIO_NO_DUPS) &&
   2193 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
   2194 		aio_panic("_aio_rw64(): request already in hash table");
   2195 		_aio_req_free(reqp);
   2196 		errno = EINVAL;
   2197 		return (-1);
   2198 	}
   2199 	_aio_req_add(reqp, nextworker, mode);
   2200 	return (0);
   2201 }
   2202 #endif	/* !defined(_LP64) */
   2203