Home | History | Annotate | Download | only in aio
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     28 
     29 #include "lint.h"
     30 #include "thr_uberdata.h"
     31 #include "asyncio.h"
     32 #include <atomic.h>
     33 #include <sys/param.h>
     34 #include <sys/file.h>
     35 #include <sys/port.h>
     36 
     37 static int _aio_hash_insert(aio_result_t *, aio_req_t *);
     38 static aio_req_t *_aio_req_get(aio_worker_t *);
     39 static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
     40 static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
     41 static void _aio_work_done(aio_worker_t *);
     42 static void _aio_enq_doneq(aio_req_t *);
     43 
     44 extern void _aio_lio_free(aio_lio_t *);
     45 
     46 extern int __fdsync(int, int);
     47 extern int __fcntl(int, int, ...);
     48 extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
     49 
     50 static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
     51 static void _aiodone(aio_req_t *, ssize_t, int);
     52 static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
     53 static void _aio_finish_request(aio_worker_t *, ssize_t, int);
     54 
/*
 * switch for kernel async I/O
 * (set by _kaio_init(); _aiorw() calls _kaio_init() while this is 0
 * and only tries kaio when it is > 0)
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 * (the kaio cleanup thread stores its worker pointer under this key)
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Allocated by _kaio_supported_init(), which is called from both
 * __uaio_init() and _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * workers for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;		/* list of done requests */
aio_req_t *_aio_done_head;

/*
 * __aio_initlock, __aio_initcv and __aio_initbusy serialize
 * __uaio_init() and _kaio_init(): a caller waits on the cv while
 * __aio_initbusy is set, then sets it for the duration of its own
 * initialization work.
 */
mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
int _sigio_enabled = 0;			/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;			/* HASHSZ buckets, see __uaio_init() */

aio_req_t *_aio_doneq;			/* double linked done queue list */

int _aio_donecnt = 0;			/* # of requests on the done list */
int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;		/* # of outstanding requests */
int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */

int _max_workers = 256;			/* max number of workers permitted */
int _min_workers = 4;			/* min number of workers */
int _minworkload = 2;			/* min number of request in q */
int _aio_worker_cnt = 0;		/* number of workers to do requests */
int __uaio_ok = 0;			/* AIO has been enabled */
sigset_t _worker_set;			/* worker's signal mask */

int _aiowait_flag = 0;			/* nonzero while aiowait() is in progress */
int _aio_flags = 0;			/* flag bits; see the defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */

int hz;					/* clock ticks per second */
    125 
    126 static int
    127 _kaio_supported_init(void)
    128 {
    129 	void *ptr;
    130 	size_t size;
    131 
    132 	if (_kaio_supported != NULL)	/* already initialized */
    133 		return (0);
    134 
    135 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
    136 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
    137 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
    138 	if (ptr == MAP_FAILED)
    139 		return (-1);
    140 	_kaio_supported = ptr;
    141 	return (0);
    142 }
    143 
    144 /*
    145  * The aio subsystem is initialized when an AIO request is made.
    146  * Constants are initialized like the max number of workers that
    147  * the subsystem can create, and the minimum number of workers
    148  * permitted before imposing some restrictions.  Also, some
    149  * workers are created.
    150  */
    151 int
    152 __uaio_init(void)
    153 {
    154 	int ret = -1;
    155 	int i;
    156 	int cancel_state;
    157 
    158 	lmutex_lock(&__aio_initlock);
    159 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    160 	while (__aio_initbusy)
    161 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
    162 	(void) pthread_setcancelstate(cancel_state, NULL);
    163 	if (__uaio_ok) {	/* already initialized */
    164 		lmutex_unlock(&__aio_initlock);
    165 		return (0);
    166 	}
    167 	__aio_initbusy = 1;
    168 	lmutex_unlock(&__aio_initlock);
    169 
    170 	hz = (int)sysconf(_SC_CLK_TCK);
    171 	__pid = getpid();
    172 
    173 	setup_cancelsig(SIGAIOCANCEL);
    174 
    175 	if (_kaio_supported_init() != 0)
    176 		goto out;
    177 
    178 	/*
    179 	 * Allocate and initialize the hash table.
    180 	 * Do this only once, even if __uaio_init() is called twice.
    181 	 */
    182 	if (_aio_hash == NULL) {
    183 		/* LINTED pointer cast */
    184 		_aio_hash = (aio_hash_t *)mmap(NULL,
    185 		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
    186 		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
    187 		if ((void *)_aio_hash == MAP_FAILED) {
    188 			_aio_hash = NULL;
    189 			goto out;
    190 		}
    191 		for (i = 0; i < HASHSZ; i++)
    192 			(void) mutex_init(&_aio_hash[i].hash_lock,
    193 			    USYNC_THREAD, NULL);
    194 	}
    195 
    196 	/*
    197 	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
    198 	 */
    199 	(void) sigfillset(&_worker_set);
    200 	(void) sigdelset(&_worker_set, SIGAIOCANCEL);
    201 
    202 	/*
    203 	 * Create one worker to send asynchronous notifications.
    204 	 * Do this only once, even if __uaio_init() is called twice.
    205 	 */
    206 	if (__no_workerscnt == 0 &&
    207 	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
    208 		errno = EAGAIN;
    209 		goto out;
    210 	}
    211 
    212 	/*
    213 	 * Create the minimum number of read/write workers.
    214 	 * And later check whether atleast one worker is created;
    215 	 * lwp_create() calls could fail because of segkp exhaustion.
    216 	 */
    217 	for (i = 0; i < _min_workers; i++)
    218 		(void) _aio_create_worker(NULL, AIOREAD);
    219 	if (__rw_workerscnt == 0) {
    220 		errno = EAGAIN;
    221 		goto out;
    222 	}
    223 
    224 	ret = 0;
    225 out:
    226 	lmutex_lock(&__aio_initlock);
    227 	if (ret == 0)
    228 		__uaio_ok = 1;
    229 	__aio_initbusy = 0;
    230 	(void) cond_broadcast(&__aio_initcv);
    231 	lmutex_unlock(&__aio_initlock);
    232 	return (ret);
    233 }
    234 
    235 /*
    236  * Called from close() before actually performing the real _close().
    237  */
    238 void
    239 _aio_close(int fd)
    240 {
    241 	if (fd < 0)	/* avoid cancelling everything */
    242 		return;
    243 	/*
    244 	 * Cancel all outstanding aio requests for this file descriptor.
    245 	 */
    246 	if (__uaio_ok)
    247 		(void) aiocancel_all(fd);
    248 	/*
    249 	 * If we have allocated the bit array, clear the bit for this file.
    250 	 * The next open may re-use this file descriptor and the new file
    251 	 * may have different kaio() behaviour.
    252 	 */
    253 	if (_kaio_supported != NULL)
    254 		CLEAR_KAIO_SUPPORTED(fd);
    255 }
    256 
    257 /*
    258  * special kaio cleanup thread sits in a loop in the
    259  * kernel waiting for pending kaio requests to complete.
    260  */
    261 void *
    262 _kaio_cleanup_thread(void *arg)
    263 {
    264 	if (pthread_setspecific(_aio_key, arg) != 0)
    265 		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
    266 	(void) _kaio(AIOSTART);
    267 	return (arg);
    268 }
    269 
    270 /*
    271  * initialize kaio.
    272  */
    273 void
    274 _kaio_init()
    275 {
    276 	int error;
    277 	sigset_t oset;
    278 	int cancel_state;
    279 
    280 	lmutex_lock(&__aio_initlock);
    281 	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    282 	while (__aio_initbusy)
    283 		(void) cond_wait(&__aio_initcv, &__aio_initlock);
    284 	(void) pthread_setcancelstate(cancel_state, NULL);
    285 	if (_kaio_ok) {		/* already initialized */
    286 		lmutex_unlock(&__aio_initlock);
    287 		return;
    288 	}
    289 	__aio_initbusy = 1;
    290 	lmutex_unlock(&__aio_initlock);
    291 
    292 	if (_kaio_supported_init() != 0)
    293 		error = ENOMEM;
    294 	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
    295 		error = ENOMEM;
    296 	else if ((error = (int)_kaio(AIOINIT)) == 0) {
    297 		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    298 		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
    299 		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
    300 		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
    301 	}
    302 	if (error && _kaiowp != NULL) {
    303 		_aio_worker_free(_kaiowp);
    304 		_kaiowp = NULL;
    305 	}
    306 
    307 	lmutex_lock(&__aio_initlock);
    308 	if (error)
    309 		_kaio_ok = -1;
    310 	else
    311 		_kaio_ok = 1;
    312 	__aio_initbusy = 0;
    313 	(void) cond_broadcast(&__aio_initcv);
    314 	lmutex_unlock(&__aio_initlock);
    315 }
    316 
    317 int
    318 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    319     aio_result_t *resultp)
    320 {
    321 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
    322 }
    323 
    324 int
    325 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    326     aio_result_t *resultp)
    327 {
    328 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
    329 }
    330 
    331 #if !defined(_LP64)
    332 int
    333 aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    334     aio_result_t *resultp)
    335 {
    336 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
    337 }
    338 
    339 int
    340 aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    341     aio_result_t *resultp)
    342 {
    343 	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
    344 }
    345 #endif	/* !defined(_LP64) */
    346 
    347 int
    348 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    349     aio_result_t *resultp, int mode)
    350 {
    351 	aio_req_t *reqp;
    352 	aio_args_t *ap;
    353 	offset_t loffset;
    354 	struct stat64 stat64;
    355 	int error = 0;
    356 	int kerr;
    357 	int umode;
    358 
    359 	switch (whence) {
    360 
    361 	case SEEK_SET:
    362 		loffset = offset;
    363 		break;
    364 	case SEEK_CUR:
    365 		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
    366 			error = -1;
    367 		else
    368 			loffset += offset;
    369 		break;
    370 	case SEEK_END:
    371 		if (fstat64(fd, &stat64) == -1)
    372 			error = -1;
    373 		else
    374 			loffset = offset + stat64.st_size;
    375 		break;
    376 	default:
    377 		errno = EINVAL;
    378 		error = -1;
    379 	}
    380 
    381 	if (error)
    382 		return (error);
    383 
    384 	/* initialize kaio */
    385 	if (!_kaio_ok)
    386 		_kaio_init();
    387 
    388 	/*
    389 	 * _aio_do_request() needs the original request code (mode) to be able
    390 	 * to choose the appropiate 32/64 bit function.  All other functions
    391 	 * only require the difference between READ and WRITE (umode).
    392 	 */
    393 	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
    394 		umode = mode - AIOAREAD64;
    395 	else
    396 		umode = mode;
    397 
    398 	/*
    399 	 * Try kernel aio first.
    400 	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
    401 	 */
    402 	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
    403 		resultp->aio_errno = 0;
    404 		sig_mutex_lock(&__aio_mutex);
    405 		_kaio_outstand_cnt++;
    406 		sig_mutex_unlock(&__aio_mutex);
    407 		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
    408 		    (umode | AIO_POLL_BIT) : umode),
    409 		    fd, buf, bufsz, loffset, resultp);
    410 		if (kerr == 0) {
    411 			return (0);
    412 		}
    413 		sig_mutex_lock(&__aio_mutex);
    414 		_kaio_outstand_cnt--;
    415 		sig_mutex_unlock(&__aio_mutex);
    416 		if (errno != ENOTSUP && errno != EBADFD)
    417 			return (-1);
    418 		if (errno == EBADFD)
    419 			SET_KAIO_NOT_SUPPORTED(fd);
    420 	}
    421 
    422 	if (!__uaio_ok && __uaio_init() == -1)
    423 		return (-1);
    424 
    425 	if ((reqp = _aio_req_alloc()) == NULL) {
    426 		errno = EAGAIN;
    427 		return (-1);
    428 	}
    429 
    430 	/*
    431 	 * _aio_do_request() checks reqp->req_op to differentiate
    432 	 * between 32 and 64 bit access.
    433 	 */
    434 	reqp->req_op = mode;
    435 	reqp->req_resultp = resultp;
    436 	ap = &reqp->req_args;
    437 	ap->fd = fd;
    438 	ap->buf = buf;
    439 	ap->bufsz = bufsz;
    440 	ap->offset = loffset;
    441 
    442 	if (_aio_hash_insert(resultp, reqp) != 0) {
    443 		_aio_req_free(reqp);
    444 		errno = EINVAL;
    445 		return (-1);
    446 	}
    447 	/*
    448 	 * _aio_req_add() only needs the difference between READ and
    449 	 * WRITE to choose the right worker queue.
    450 	 */
    451 	_aio_req_add(reqp, &__nextworker_rw, umode);
    452 	return (0);
    453 }
    454 
    455 int
    456 aiocancel(aio_result_t *resultp)
    457 {
    458 	aio_req_t *reqp;
    459 	aio_worker_t *aiowp;
    460 	int ret;
    461 	int done = 0;
    462 	int canceled = 0;
    463 
    464 	if (!__uaio_ok) {
    465 		errno = EINVAL;
    466 		return (-1);
    467 	}
    468 
    469 	sig_mutex_lock(&__aio_mutex);
    470 	reqp = _aio_hash_find(resultp);
    471 	if (reqp == NULL) {
    472 		if (_aio_outstand_cnt == _aio_req_done_cnt)
    473 			errno = EINVAL;
    474 		else
    475 			errno = EACCES;
    476 		ret = -1;
    477 	} else {
    478 		aiowp = reqp->req_worker;
    479 		sig_mutex_lock(&aiowp->work_qlock1);
    480 		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
    481 		sig_mutex_unlock(&aiowp->work_qlock1);
    482 
    483 		if (canceled) {
    484 			ret = 0;
    485 		} else {
    486 			if (_aio_outstand_cnt == 0 ||
    487 			    _aio_outstand_cnt == _aio_req_done_cnt)
    488 				errno = EINVAL;
    489 			else
    490 				errno = EACCES;
    491 			ret = -1;
    492 		}
    493 	}
    494 	sig_mutex_unlock(&__aio_mutex);
    495 	return (ret);
    496 }
    497 
    498 /* ARGSUSED */
    499 static void
    500 _aiowait_cleanup(void *arg)
    501 {
    502 	sig_mutex_lock(&__aio_mutex);
    503 	_aiowait_flag--;
    504 	sig_mutex_unlock(&__aio_mutex);
    505 }
    506 
/*
 * Wait for an asynchronous I/O request (kaio or user-level) to complete.
 *
 * uwait selects the waiting mode:
 *	NULL			block until a request completes
 *	{0, 0}			poll once without blocking
 *	any other valid value	wait at most that long
 *
 * Returns the aio_result_t pointer of a completed request; NULL if the
 * poll found nothing or the timeout expired; or (aio_result_t *)-1 with
 * errno set to EINVAL (invalid timeout, or both the user-level and the
 * kaio side reported -1) or EINTR (the kaio wait failed with EINTR).
 *
 * This must be asynch safe and cancel safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* result from the user-level side */
	aio_result_t *kresultp;		/* result from _kaio(AIOWAIT) */
	aio_result_t *resultp;		/* value handed back to the caller */
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;		/* private copy of the timeout */
	struct timeval *wait = NULL;	/* NULL means wait without limit */
	hrtime_t hrtend;		/* absolute deadline (timed wait only) */
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/* timed wait: compute the deadline in nanoseconds */
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				/* nothing pending in the kernel; skip the call */
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL or 1 is the
				 * result pointer of a completed kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			/* no kaio completion; check the user-level side */
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			/* -1 from both sides is reported as EINVAL */
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a user-level request has completed */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/*
		 * NOTE(review): -1 from _aio_req_done() presumably means
		 * there are no user-level requests to wait for -- confirm
		 * against _aio_req_done().  In that case the kaio call
		 * below must not block.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/*
			 * __aio_mutex is dropped across the kaio call.
			 * The call is bracketed by _cancel_prologue() and
			 * _cancel_epilogue(), with _aiowait_cleanup()
			 * pushed to undo the _aiowait_flag increment
			 * should the thread be canceled.
			 */
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request has completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* both sides reported -1: fail with EINVAL */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			/* the kaio wait was interrupted */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait with nothing to report yet: retry */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
    645 
    646 /*
    647  * _aio_get_timedelta calculates the remaining time and stores the result
    648  * into timespec_t *wait.
    649  */
    650 
    651 int
    652 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
    653 {
    654 	int	ret = 0;
    655 	struct	timeval cur;
    656 	timespec_t curtime;
    657 
    658 	(void) gettimeofday(&cur, NULL);
    659 	curtime.tv_sec = cur.tv_sec;
    660 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
    661 
    662 	if (end->tv_sec >= curtime.tv_sec) {
    663 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
    664 		if (end->tv_nsec >= curtime.tv_nsec) {
    665 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
    666 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
    667 				ret = -1;	/* timer expired */
    668 		} else {
    669 			if (end->tv_sec > curtime.tv_sec) {
    670 				wait->tv_sec -= 1;
    671 				wait->tv_nsec = NANOSEC -
    672 				    (curtime.tv_nsec - end->tv_nsec);
    673 			} else {
    674 				ret = -1;	/* timer expired */
    675 			}
    676 		}
    677 	} else {
    678 		ret = -1;
    679 	}
    680 	return (ret);
    681 }
    682 
    683 /*
    684  * If closing by file descriptor: we will simply cancel all the outstanding
    685  * aio`s and return.  Those aio's in question will have either noticed the
    686  * cancellation notice before, during, or after initiating io.
    687  */
    688 int
    689 aiocancel_all(int fd)
    690 {
    691 	aio_req_t *reqp;
    692 	aio_req_t **reqpp, *last;
    693 	aio_worker_t *first;
    694 	aio_worker_t *next;
    695 	int canceled = 0;
    696 	int done = 0;
    697 	int cancelall = 0;
    698 
    699 	sig_mutex_lock(&__aio_mutex);
    700 
    701 	if (_aio_outstand_cnt == 0) {
    702 		sig_mutex_unlock(&__aio_mutex);
    703 		return (AIO_ALLDONE);
    704 	}
    705 
    706 	/*
    707 	 * Cancel requests from the read/write workers' queues.
    708 	 */
    709 	first = __nextworker_rw;
    710 	next = first;
    711 	do {
    712 		_aio_cancel_work(next, fd, &canceled, &done);
    713 	} while ((next = next->work_forw) != first);
    714 
    715 	/*
    716 	 * finally, check if there are requests on the done queue that
    717 	 * should be canceled.
    718 	 */
    719 	if (fd < 0)
    720 		cancelall = 1;
    721 	reqpp = &_aio_done_tail;
    722 	last = _aio_done_tail;
    723 	while ((reqp = *reqpp) != NULL) {
    724 		if (cancelall || reqp->req_args.fd == fd) {
    725 			*reqpp = reqp->req_next;
    726 			if (last == reqp) {
    727 				last = reqp->req_next;
    728 			}
    729 			if (_aio_done_head == reqp) {
    730 				/* this should be the last req in list */
    731 				_aio_done_head = last;
    732 			}
    733 			_aio_donecnt--;
    734 			_aio_set_result(reqp, -1, ECANCELED);
    735 			(void) _aio_hash_del(reqp->req_resultp);
    736 			_aio_req_free(reqp);
    737 		} else {
    738 			reqpp = &reqp->req_next;
    739 			last = reqp;
    740 		}
    741 	}
    742 
    743 	if (cancelall) {
    744 		ASSERT(_aio_donecnt == 0);
    745 		_aio_done_head = NULL;
    746 	}
    747 	sig_mutex_unlock(&__aio_mutex);
    748 
    749 	if (canceled && done == 0)
    750 		return (AIO_CANCELED);
    751 	else if (done && canceled == 0)
    752 		return (AIO_ALLDONE);
    753 	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
    754 		return ((int)_kaio(AIOCANCEL, fd, NULL));
    755 	return (AIO_NOTCANCELED);
    756 }
    757 
    758 /*
    759  * Cancel requests from a given work queue.  If the file descriptor
    760  * parameter, fd, is non-negative, then only cancel those requests
    761  * in this queue that are to this file descriptor.  If the fd
    762  * parameter is -1, then cancel all requests.
    763  */
    764 static void
    765 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
    766 {
    767 	aio_req_t *reqp;
    768 
    769 	sig_mutex_lock(&aiowp->work_qlock1);
    770 	/*
    771 	 * cancel queued requests first.
    772 	 */
    773 	reqp = aiowp->work_tail1;
    774 	while (reqp != NULL) {
    775 		if (fd < 0 || reqp->req_args.fd == fd) {
    776 			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
    777 				/*
    778 				 * Callers locks were dropped.
    779 				 * reqp is invalid; start traversing
    780 				 * the list from the beginning again.
    781 				 */
    782 				reqp = aiowp->work_tail1;
    783 				continue;
    784 			}
    785 		}
    786 		reqp = reqp->req_next;
    787 	}
    788 	/*
    789 	 * Since the queued requests have been canceled, there can
    790 	 * only be one inprogress request that should be canceled.
    791 	 */
    792 	if ((reqp = aiowp->work_req) != NULL &&
    793 	    (fd < 0 || reqp->req_args.fd == fd))
    794 		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
    795 	sig_mutex_unlock(&aiowp->work_qlock1);
    796 }
    797 
    798 /*
    799  * Cancel a request.  Return 1 if the callers locks were temporarily
    800  * dropped, otherwise return 0.
    801  */
    802 int
    803 _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
    804 {
    805 	int ostate = reqp->req_state;
    806 
    807 	ASSERT(MUTEX_HELD(&__aio_mutex));
    808 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
    809 	if (ostate == AIO_REQ_CANCELED)
    810 		return (0);
    811 	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
    812 	    aiowp->work_prev1 == reqp) {
    813 		ASSERT(aiowp->work_done1 != 0);
    814 		/*
    815 		 * If not on the done queue yet, just mark it CANCELED,
    816 		 * _aio_work_done() will do the necessary clean up.
    817 		 * This is required to ensure that aiocancel_all() cancels
    818 		 * all the outstanding requests, including this one which
    819 		 * is not yet on done queue but has been marked done.
    820 		 */
    821 		_aio_set_result(reqp, -1, ECANCELED);
    822 		(void) _aio_hash_del(reqp->req_resultp);
    823 		reqp->req_state = AIO_REQ_CANCELED;
    824 		(*canceled)++;
    825 		return (0);
    826 	}
    827 
    828 	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
    829 		(*done)++;
    830 		return (0);
    831 	}
    832 	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
    833 		ASSERT(POSIX_AIO(reqp));
    834 		/* Cancel the queued aio_fsync() request */
    835 		if (!reqp->req_head->lio_canned) {
    836 			reqp->req_head->lio_canned = 1;
    837 			_aio_outstand_cnt--;
    838 			(*canceled)++;
    839 		}
    840 		return (0);
    841 	}
    842 	reqp->req_state = AIO_REQ_CANCELED;
    843 	_aio_req_del(aiowp, reqp, ostate);
    844 	(void) _aio_hash_del(reqp->req_resultp);
    845 	(*canceled)++;
    846 	if (reqp == aiowp->work_req) {
    847 		ASSERT(ostate == AIO_REQ_INPROGRESS);
    848 		/*
    849 		 * Set the result values now, before _aiodone() is called.
    850 		 * We do this because the application can expect aio_return
    851 		 * and aio_errno to be set to -1 and ECANCELED, respectively,
    852 		 * immediately after a successful return from aiocancel()
    853 		 * or aio_cancel().
    854 		 */
    855 		_aio_set_result(reqp, -1, ECANCELED);
    856 		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
    857 		return (0);
    858 	}
    859 	if (!POSIX_AIO(reqp)) {
    860 		_aio_outstand_cnt--;
    861 		_aio_set_result(reqp, -1, ECANCELED);
    862 		_aio_req_free(reqp);
    863 		return (0);
    864 	}
    865 	sig_mutex_unlock(&aiowp->work_qlock1);
    866 	sig_mutex_unlock(&__aio_mutex);
    867 	_aiodone(reqp, -1, ECANCELED);
    868 	sig_mutex_lock(&__aio_mutex);
    869 	sig_mutex_lock(&aiowp->work_qlock1);
    870 	return (1);
    871 }
    872 
    873 int
    874 _aio_create_worker(aio_req_t *reqp, int mode)
    875 {
    876 	aio_worker_t *aiowp, **workers, **nextworker;
    877 	int *aio_workerscnt;
    878 	void *(*func)(void *);
    879 	sigset_t oset;
    880 	int error;
    881 
    882 	/*
    883 	 * Put the new worker thread in the right queue.
    884 	 */
    885 	switch (mode) {
    886 	case AIOREAD:
    887 	case AIOWRITE:
    888 	case AIOAREAD:
    889 	case AIOAWRITE:
    890 #if !defined(_LP64)
    891 	case AIOAREAD64:
    892 	case AIOAWRITE64:
    893 #endif
    894 		workers = &__workers_rw;
    895 		nextworker = &__nextworker_rw;
    896 		aio_workerscnt = &__rw_workerscnt;
    897 		func = _aio_do_request;
    898 		break;
    899 	case AIONOTIFY:
    900 		workers = &__workers_no;
    901 		nextworker = &__nextworker_no;
    902 		func = _aio_do_notify;
    903 		aio_workerscnt = &__no_workerscnt;
    904 		break;
    905 	default:
    906 		aio_panic("_aio_create_worker: invalid mode");
    907 		break;
    908 	}
    909 
    910 	if ((aiowp = _aio_worker_alloc()) == NULL)
    911 		return (-1);
    912 
    913 	if (reqp) {
    914 		reqp->req_state = AIO_REQ_QUEUED;
    915 		reqp->req_worker = aiowp;
    916 		aiowp->work_head1 = reqp;
    917 		aiowp->work_tail1 = reqp;
    918 		aiowp->work_next1 = reqp;
    919 		aiowp->work_count1 = 1;
    920 		aiowp->work_minload1 = 1;
    921 	}
    922 
    923 	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    924 	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
    925 	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
    926 	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
    927 	if (error) {
    928 		if (reqp) {
    929 			reqp->req_state = 0;
    930 			reqp->req_worker = NULL;
    931 		}
    932 		_aio_worker_free(aiowp);
    933 		return (-1);
    934 	}
    935 
    936 	lmutex_lock(&__aio_mutex);
    937 	(*aio_workerscnt)++;
    938 	if (*workers == NULL) {
    939 		aiowp->work_forw = aiowp;
    940 		aiowp->work_backw = aiowp;
    941 		*nextworker = aiowp;
    942 		*workers = aiowp;
    943 	} else {
    944 		aiowp->work_backw = (*workers)->work_backw;
    945 		aiowp->work_forw = (*workers);
    946 		(*workers)->work_backw->work_forw = aiowp;
    947 		(*workers)->work_backw = aiowp;
    948 	}
    949 	_aio_worker_cnt++;
    950 	lmutex_unlock(&__aio_mutex);
    951 
    952 	(void) thr_continue(aiowp->work_tid);
    953 
    954 	return (0);
    955 }
    956 
    957 /*
    958  * This is the worker's main routine.
    959  * The task of this function is to execute all queued requests;
    960  * once the last pending request is executed this function will block
    961  * in _aio_idle().  A new incoming request must wakeup this thread to
    962  * restart the work.
 * Every worker has its own work queue.  The queue lock is required
    964  * to synchronize the addition of new requests for this worker or
    965  * cancellation of pending/running requests.
    966  *
    967  * Cancellation scenarios:
    968  * The cancellation of a request is being done asynchronously using
    969  * _aio_cancel_req() from another thread context.
    970  * A queued request can be cancelled in different manners :
    971  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
    972  *	- lock the queue -> remove the request -> unlock the queue
    973  *	- this function/thread does not detect this cancellation process
    974  * b) request is in progress (AIO_REQ_INPROGRESS) :
    975  *	- this function first allow the cancellation of the running
    976  *	  request with the flag "work_cancel_flg=1"
    977  * 		see _aio_req_get() -> _aio_cancel_on()
    978  *	  During this phase, it is allowed to interrupt the worker
    979  *	  thread running the request (this thread) using the SIGAIOCANCEL
    980  *	  signal.
    981  *	  Once this thread returns from the kernel (because the request
    982  *	  is just done), then it must disable a possible cancellation
    983  *	  and proceed to finish the request.  To disable the cancellation
    984  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
    985  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
    986  *	  same procedure as in a)
    987  *
    988  * To b)
    989  *	This thread uses sigsetjmp() to define the position in the code, where
    990  *	it wish to continue working in the case that a SIGAIOCANCEL signal
    991  *	is detected.
    992  *	Normally this thread should get the cancellation signal during the
    993  *	kernel phase (reading or writing).  In that case the signal handler
    994  *	aiosigcancelhndlr() is activated using the worker thread context,
    995  *	which again will use the siglongjmp() function to break the standard
    996  *	code flow and jump to the "sigsetjmp" position, provided that
    997  *	"work_cancel_flg" is set to "1".
    998  *	Because the "work_cancel_flg" is only manipulated by this worker
    999  *	thread and it can only run on one CPU at a given time, it is not
   1000  *	necessary to protect that flag with the queue lock.
   1001  *	Returning from the kernel (read or write system call) we must
   1002  *	first disable the use of the SIGAIOCANCEL signal and accordingly
   1003  *	the use of the siglongjmp() function to prevent a possible deadlock:
   1004  *	- It can happens that this worker thread returns from the kernel and
   1005  *	  blocks in "work_qlock1",
   1006  *	- then a second thread cancels the apparently "in progress" request
   1007  *	  and sends the SIGAIOCANCEL signal to the worker thread,
   1008  *	- the worker thread gets assigned the "work_qlock1" and will returns
   1009  *	  from the kernel,
   1010  *	- the kernel detects the pending signal and activates the signal
   1011  *	  handler instead,
   1012  *	- if the "work_cancel_flg" is still set then the signal handler
   1013  *	  should use siglongjmp() to cancel the "in progress" request and
   1014  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
   1015  *	  for a second time => deadlock.
   1016  *	To avoid that situation we disable the cancellation of the request
   1017  *	in progress BEFORE we try to acquire the work_qlock1.
   1018  *	In that case the signal handler will not call siglongjmp() and the
   1019  *	worker thread will continue running the standard code flow.
   1020  *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
   1021  *	an eventually required siglongjmp() freeing the work_qlock1 and
   1022  *	avoiding a deadlock.
   1023  */
   1024 void *
   1025 _aio_do_request(void *arglist)
   1026 {
   1027 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
   1028 	ulwp_t *self = curthread;
   1029 	struct aio_args *arg;
   1030 	aio_req_t *reqp;		/* current AIO request */
   1031 	ssize_t retval;
   1032 	int append;
   1033 	int error;
   1034 
   1035 	if (pthread_setspecific(_aio_key, aiowp) != 0)
   1036 		aio_panic("_aio_do_request, pthread_setspecific()");
   1037 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
   1038 	ASSERT(aiowp->work_req == NULL);
   1039 
   1040 	/*
   1041 	 * We resume here when an operation is cancelled.
   1042 	 * On first entry, aiowp->work_req == NULL, so all
   1043 	 * we do is block SIGAIOCANCEL.
   1044 	 */
   1045 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
   1046 	ASSERT(self->ul_sigdefer == 0);
   1047 
   1048 	sigoff(self);	/* block SIGAIOCANCEL */
   1049 	if (aiowp->work_req != NULL)
   1050 		_aio_finish_request(aiowp, -1, ECANCELED);
   1051 
   1052 	for (;;) {
   1053 		/*
   1054 		 * Put completed requests on aio_done_list.  This has
   1055 		 * to be done as part of the main loop to ensure that
   1056 		 * we don't artificially starve any aiowait'ers.
   1057 		 */
   1058 		if (aiowp->work_done1)
   1059 			_aio_work_done(aiowp);
   1060 
   1061 top:
   1062 		/* consume any deferred SIGAIOCANCEL signal here */
   1063 		sigon(self);
   1064 		sigoff(self);
   1065 
   1066 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
   1067 			if (_aio_idle(aiowp) != 0)
   1068 				goto top;
   1069 		}
   1070 		arg = &reqp->req_args;
   1071 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
   1072 		    reqp->req_state == AIO_REQ_CANCELED);
   1073 		error = 0;
   1074 
   1075 		switch (reqp->req_op) {
   1076 		case AIOREAD:
   1077 		case AIOAREAD:
   1078 			sigon(self);	/* unblock SIGAIOCANCEL */
   1079 			retval = pread(arg->fd, arg->buf,
   1080 			    arg->bufsz, arg->offset);
   1081 			if (retval == -1) {
   1082 				if (errno == ESPIPE) {
   1083 					retval = read(arg->fd,
   1084 					    arg->buf, arg->bufsz);
   1085 					if (retval == -1)
   1086 						error = errno;
   1087 				} else {
   1088 					error = errno;
   1089 				}
   1090 			}
   1091 			sigoff(self);	/* block SIGAIOCANCEL */
   1092 			break;
   1093 		case AIOWRITE:
   1094 		case AIOAWRITE:
   1095 			/*
   1096 			 * The SUSv3 POSIX spec for aio_write() states:
   1097 			 *	If O_APPEND is set for the file descriptor,
   1098 			 *	write operations append to the file in the
   1099 			 *	same order as the calls were made.
   1100 			 * but, somewhat inconsistently, it requires pwrite()
   1101 			 * to ignore the O_APPEND setting.  So we have to use
   1102 			 * fcntl() to get the open modes and call write() for
   1103 			 * the O_APPEND case.
   1104 			 */
   1105 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
   1106 			sigon(self);	/* unblock SIGAIOCANCEL */
   1107 			retval = append?
   1108 			    write(arg->fd, arg->buf, arg->bufsz) :
   1109 			    pwrite(arg->fd, arg->buf, arg->bufsz,
   1110 			    arg->offset);
   1111 			if (retval == -1) {
   1112 				if (errno == ESPIPE) {
   1113 					retval = write(arg->fd,
   1114 					    arg->buf, arg->bufsz);
   1115 					if (retval == -1)
   1116 						error = errno;
   1117 				} else {
   1118 					error = errno;
   1119 				}
   1120 			}
   1121 			sigoff(self);	/* block SIGAIOCANCEL */
   1122 			break;
   1123 #if !defined(_LP64)
   1124 		case AIOAREAD64:
   1125 			sigon(self);	/* unblock SIGAIOCANCEL */
   1126 			retval = pread64(arg->fd, arg->buf,
   1127 			    arg->bufsz, arg->offset);
   1128 			if (retval == -1) {
   1129 				if (errno == ESPIPE) {
   1130 					retval = read(arg->fd,
   1131 					    arg->buf, arg->bufsz);
   1132 					if (retval == -1)
   1133 						error = errno;
   1134 				} else {
   1135 					error = errno;
   1136 				}
   1137 			}
   1138 			sigoff(self);	/* block SIGAIOCANCEL */
   1139 			break;
   1140 		case AIOAWRITE64:
   1141 			/*
   1142 			 * The SUSv3 POSIX spec for aio_write() states:
   1143 			 *	If O_APPEND is set for the file descriptor,
   1144 			 *	write operations append to the file in the
   1145 			 *	same order as the calls were made.
   1146 			 * but, somewhat inconsistently, it requires pwrite()
   1147 			 * to ignore the O_APPEND setting.  So we have to use
   1148 			 * fcntl() to get the open modes and call write() for
   1149 			 * the O_APPEND case.
   1150 			 */
   1151 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
   1152 			sigon(self);	/* unblock SIGAIOCANCEL */
   1153 			retval = append?
   1154 			    write(arg->fd, arg->buf, arg->bufsz) :
   1155 			    pwrite64(arg->fd, arg->buf, arg->bufsz,
   1156 			    arg->offset);
   1157 			if (retval == -1) {
   1158 				if (errno == ESPIPE) {
   1159 					retval = write(arg->fd,
   1160 					    arg->buf, arg->bufsz);
   1161 					if (retval == -1)
   1162 						error = errno;
   1163 				} else {
   1164 					error = errno;
   1165 				}
   1166 			}
   1167 			sigoff(self);	/* block SIGAIOCANCEL */
   1168 			break;
   1169 #endif	/* !defined(_LP64) */
   1170 		case AIOFSYNC:
   1171 			if (_aio_fsync_del(aiowp, reqp))
   1172 				goto top;
   1173 			ASSERT(reqp->req_head == NULL);
   1174 			/*
   1175 			 * All writes for this fsync request are now
   1176 			 * acknowledged.  Now make these writes visible
   1177 			 * and put the final request into the hash table.
   1178 			 */
   1179 			if (reqp->req_state == AIO_REQ_CANCELED) {
   1180 				/* EMPTY */;
   1181 			} else if (arg->offset == O_SYNC) {
   1182 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
   1183 					error = errno;
   1184 			} else {
   1185 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
   1186 					error = errno;
   1187 			}
   1188 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
   1189 				aio_panic("_aio_do_request(): AIOFSYNC: "
   1190 				    "request already in hash table");
   1191 			break;
   1192 		default:
   1193 			aio_panic("_aio_do_request, bad op");
   1194 		}
   1195 
   1196 		_aio_finish_request(aiowp, retval, error);
   1197 	}
   1198 	/* NOTREACHED */
   1199 	return (NULL);
   1200 }
   1201 
   1202 /*
   1203  * Perform the tail processing for _aio_do_request().
   1204  * The in-progress request may or may not have been cancelled.
   1205  */
   1206 static void
   1207 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
   1208 {
   1209 	aio_req_t *reqp;
   1210 
   1211 	sig_mutex_lock(&aiowp->work_qlock1);
   1212 	if ((reqp = aiowp->work_req) == NULL)
   1213 		sig_mutex_unlock(&aiowp->work_qlock1);
   1214 	else {
   1215 		aiowp->work_req = NULL;
   1216 		if (reqp->req_state == AIO_REQ_CANCELED) {
   1217 			retval = -1;
   1218 			error = ECANCELED;
   1219 		}
   1220 		if (!POSIX_AIO(reqp)) {
   1221 			int notify;
   1222 			if (reqp->req_state == AIO_REQ_INPROGRESS) {
   1223 				reqp->req_state = AIO_REQ_DONE;
   1224 				_aio_set_result(reqp, retval, error);
   1225 			}
   1226 			sig_mutex_unlock(&aiowp->work_qlock1);
   1227 			sig_mutex_lock(&__aio_mutex);
   1228 			/*
   1229 			 * If it was canceled, this request will not be
   1230 			 * added to done list. Just free it.
   1231 			 */
   1232 			if (error == ECANCELED) {
   1233 				_aio_outstand_cnt--;
   1234 				_aio_req_free(reqp);
   1235 			} else {
   1236 				_aio_req_done_cnt++;
   1237 			}
   1238 			/*
   1239 			 * Notify any thread that may have blocked
   1240 			 * because it saw an outstanding request.
   1241 			 */
   1242 			notify = 0;
   1243 			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
   1244 				notify = 1;
   1245 			}
   1246 			sig_mutex_unlock(&__aio_mutex);
   1247 			if (notify) {
   1248 				(void) _kaio(AIONOTIFY);
   1249 			}
   1250 		} else {
   1251 			if (reqp->req_state == AIO_REQ_INPROGRESS)
   1252 				reqp->req_state = AIO_REQ_DONE;
   1253 			sig_mutex_unlock(&aiowp->work_qlock1);
   1254 			_aiodone(reqp, retval, error);
   1255 		}
   1256 	}
   1257 }
   1258 
   1259 void
   1260 _aio_req_mark_done(aio_req_t *reqp)
   1261 {
   1262 #if !defined(_LP64)
   1263 	if (reqp->req_largefile)
   1264 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
   1265 	else
   1266 #endif
   1267 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
   1268 }
   1269 
   1270 /*
   1271  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
   1272  * hopefully to consume one of our queued signals.
   1273  */
   1274 static void
   1275 _aio_delay(int ticks)
   1276 {
   1277 	(void) usleep(ticks * (MICROSEC / hz));
   1278 }
   1279 
   1280 /*
   1281  * Actually send the notifications.
   1282  * We could block indefinitely here if the application
   1283  * is not listening for the signal or port notifications.
   1284  */
   1285 static void
   1286 send_notification(notif_param_t *npp)
   1287 {
   1288 	extern int __sigqueue(pid_t pid, int signo,
   1289 	    /* const union sigval */ void *value, int si_code, int block);
   1290 
   1291 	if (npp->np_signo)
   1292 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
   1293 		    SI_ASYNCIO, 1);
   1294 	else if (npp->np_port >= 0)
   1295 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
   1296 		    npp->np_event, npp->np_object, npp->np_user);
   1297 
   1298 	if (npp->np_lio_signo)
   1299 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
   1300 		    SI_ASYNCIO, 1);
   1301 	else if (npp->np_lio_port >= 0)
   1302 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
   1303 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
   1304 }
   1305 
   1306 /*
   1307  * Asynchronous notification worker.
   1308  */
   1309 void *
   1310 _aio_do_notify(void *arg)
   1311 {
   1312 	aio_worker_t *aiowp = (aio_worker_t *)arg;
   1313 	aio_req_t *reqp;
   1314 
   1315 	/*
   1316 	 * This isn't really necessary.  All signals are blocked.
   1317 	 */
   1318 	if (pthread_setspecific(_aio_key, aiowp) != 0)
   1319 		aio_panic("_aio_do_notify, pthread_setspecific()");
   1320 
   1321 	/*
   1322 	 * Notifications are never cancelled.
   1323 	 * All signals remain blocked, forever.
   1324 	 */
   1325 	for (;;) {
   1326 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
   1327 			if (_aio_idle(aiowp) != 0)
   1328 				aio_panic("_aio_do_notify: _aio_idle() failed");
   1329 		}
   1330 		send_notification(&reqp->req_notify);
   1331 		_aio_req_free(reqp);
   1332 	}
   1333 
   1334 	/* NOTREACHED */
   1335 	return (NULL);
   1336 }
   1337 
   1338 /*
   1339  * Do the completion semantics for a request that was either canceled
   1340  * by _aio_cancel_req() or was completed by _aio_do_request().
   1341  */
   1342 static void
   1343 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
   1344 {
   1345 	aio_result_t *resultp = reqp->req_resultp;
   1346 	int notify = 0;
   1347 	aio_lio_t *head;
   1348 	int sigev_none;
   1349 	int sigev_signal;
   1350 	int sigev_thread;
   1351 	int sigev_port;
   1352 	notif_param_t np;
   1353 
   1354 	/*
   1355 	 * We call _aiodone() only for Posix I/O.
   1356 	 */
   1357 	ASSERT(POSIX_AIO(reqp));
   1358 
   1359 	sigev_none = 0;
   1360 	sigev_signal = 0;
   1361 	sigev_thread = 0;
   1362 	sigev_port = 0;
   1363 	np.np_signo = 0;
   1364 	np.np_port = -1;
   1365 	np.np_lio_signo = 0;
   1366 	np.np_lio_port = -1;
   1367 
   1368 	switch (reqp->req_sigevent.sigev_notify) {
   1369 	case SIGEV_NONE:
   1370 		sigev_none = 1;
   1371 		break;
   1372 	case SIGEV_SIGNAL:
   1373 		sigev_signal = 1;
   1374 		break;
   1375 	case SIGEV_THREAD:
   1376 		sigev_thread = 1;
   1377 		break;
   1378 	case SIGEV_PORT:
   1379 		sigev_port = 1;
   1380 		break;
   1381 	default:
   1382 		aio_panic("_aiodone: improper sigev_notify");
   1383 		break;
   1384 	}
   1385 
   1386 	/*
   1387 	 * Figure out the notification parameters while holding __aio_mutex.
   1388 	 * Actually perform the notifications after dropping __aio_mutex.
   1389 	 * This allows us to sleep for a long time (if the notifications
   1390 	 * incur delays) without impeding other async I/O operations.
   1391 	 */
   1392 
   1393 	sig_mutex_lock(&__aio_mutex);
   1394 
   1395 	if (sigev_signal) {
   1396 		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
   1397 			notify = 1;
   1398 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
   1399 	} else if (sigev_thread | sigev_port) {
   1400 		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
   1401 			notify = 1;
   1402 		np.np_event = reqp->req_op;
   1403 		if (np.np_event == AIOFSYNC && reqp->req_largefile)
   1404 			np.np_event = AIOFSYNC64;
   1405 		np.np_object = (uintptr_t)reqp->req_aiocbp;
   1406 		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
   1407 	}
   1408 
   1409 	if (resultp->aio_errno == EINPROGRESS)
   1410 		_aio_set_result(reqp, retval, error);
   1411 
   1412 	_aio_outstand_cnt--;
   1413 
   1414 	head = reqp->req_head;
   1415 	reqp->req_head = NULL;
   1416 
   1417 	if (sigev_none) {
   1418 		_aio_enq_doneq(reqp);
   1419 		reqp = NULL;
   1420 	} else {
   1421 		(void) _aio_hash_del(resultp);
   1422 		_aio_req_mark_done(reqp);
   1423 	}
   1424 
   1425 	_aio_waitn_wakeup();
   1426 
   1427 	/*
   1428 	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
   1429 	 * __aio_suspend() increments "_aio_kernel_suspend"
   1430 	 * when they are waiting in the kernel for completed I/Os.
   1431 	 *
   1432 	 * _kaio(AIONOTIFY) awakes the corresponding function
   1433 	 * in the kernel; then the corresponding __aio_waitn() or
   1434 	 * __aio_suspend() function could reap the recently
   1435 	 * completed I/Os (_aiodone()).
   1436 	 */
   1437 	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
   1438 		(void) _kaio(AIONOTIFY);
   1439 
   1440 	sig_mutex_unlock(&__aio_mutex);
   1441 
   1442 	if (head != NULL) {
   1443 		/*
   1444 		 * If all the lio requests have completed,
   1445 		 * prepare to notify the waiting thread.
   1446 		 */
   1447 		sig_mutex_lock(&head->lio_mutex);
   1448 		ASSERT(head->lio_refcnt == head->lio_nent);
   1449 		if (head->lio_refcnt == 1) {
   1450 			int waiting = 0;
   1451 			if (head->lio_mode == LIO_WAIT) {
   1452 				if ((waiting = head->lio_waiting) != 0)
   1453 					(void) cond_signal(&head->lio_cond_cv);
   1454 			} else if (head->lio_port < 0) { /* none or signal */
   1455 				if ((np.np_lio_signo = head->lio_signo) != 0)
   1456 					notify = 1;
   1457 				np.np_lio_user = head->lio_sigval.sival_ptr;
   1458 			} else {			/* thread or port */
   1459 				notify = 1;
   1460 				np.np_lio_port = head->lio_port;
   1461 				np.np_lio_event = head->lio_event;
   1462 				np.np_lio_object =
   1463 				    (uintptr_t)head->lio_sigevent;
   1464 				np.np_lio_user = head->lio_sigval.sival_ptr;
   1465 			}
   1466 			head->lio_nent = head->lio_refcnt = 0;
   1467 			sig_mutex_unlock(&head->lio_mutex);
   1468 			if (waiting == 0)
   1469 				_aio_lio_free(head);
   1470 		} else {
   1471 			head->lio_nent--;
   1472 			head->lio_refcnt--;
   1473 			sig_mutex_unlock(&head->lio_mutex);
   1474 		}
   1475 	}
   1476 
   1477 	/*
   1478 	 * The request is completed; now perform the notifications.
   1479 	 */
   1480 	if (notify) {
   1481 		if (reqp != NULL) {
   1482 			/*
   1483 			 * We usually put the request on the notification
   1484 			 * queue because we don't want to block and delay
   1485 			 * other operations behind us in the work queue.
   1486 			 * Also we must never block on a cancel notification
   1487 			 * because we are being called from an application
   1488 			 * thread in this case and that could lead to deadlock
   1489 			 * if no other thread is receiving notificatins.
   1490 			 */
   1491 			reqp->req_notify = np;
   1492 			reqp->req_op = AIONOTIFY;
   1493 			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
   1494 			reqp = NULL;
   1495 		} else {
   1496 			/*
   1497 			 * We already put the request on the done queue,
   1498 			 * so we can't queue it to the notification queue.
   1499 			 * Just do the notification directly.
   1500 			 */
   1501 			send_notification(&np);
   1502 		}
   1503 	}
   1504 
   1505 	if (reqp != NULL)
   1506 		_aio_req_free(reqp);
   1507 }
   1508 
   1509 /*
   1510  * Delete fsync requests from list head until there is
   1511  * only one left.  Return 0 when there is only one,
   1512  * otherwise return a non-zero value.
   1513  */
   1514 static int
   1515 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
   1516 {
   1517 	aio_lio_t *head = reqp->req_head;
   1518 	int rval = 0;
   1519 
   1520 	ASSERT(reqp == aiowp->work_req);
   1521 	sig_mutex_lock(&aiowp->work_qlock1);
   1522 	sig_mutex_lock(&head->lio_mutex);
   1523 	if (head->lio_refcnt > 1) {
   1524 		head->lio_refcnt--;
   1525 		head->lio_nent--;
   1526 		aiowp->work_req = NULL;
   1527 		sig_mutex_unlock(&head->lio_mutex);
   1528 		sig_mutex_unlock(&aiowp->work_qlock1);
   1529 		sig_mutex_lock(&__aio_mutex);
   1530 		_aio_outstand_cnt--;
   1531 		_aio_waitn_wakeup();
   1532 		sig_mutex_unlock(&__aio_mutex);
   1533 		_aio_req_free(reqp);
   1534 		return (1);
   1535 	}
   1536 	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
   1537 	reqp->req_head = NULL;
   1538 	if (head->lio_canned)
   1539 		reqp->req_state = AIO_REQ_CANCELED;
   1540 	if (head->lio_mode == LIO_DESTROY) {
   1541 		aiowp->work_req = NULL;
   1542 		rval = 1;
   1543 	}
   1544 	sig_mutex_unlock(&head->lio_mutex);
   1545 	sig_mutex_unlock(&aiowp->work_qlock1);
   1546 	head->lio_refcnt--;
   1547 	head->lio_nent--;
   1548 	_aio_lio_free(head);
   1549 	if (rval != 0)
   1550 		_aio_req_free(reqp);
   1551 	return (rval);
   1552 }
   1553 
   1554 /*
   1555  * A worker is set idle when its work queue is empty.
   1556  * The worker checks again that it has no more work
   1557  * and then goes to sleep waiting for more work.
   1558  */
   1559 int
   1560 _aio_idle(aio_worker_t *aiowp)
   1561 {
   1562 	int error = 0;
   1563 
   1564 	sig_mutex_lock(&aiowp->work_qlock1);
   1565 	if (aiowp->work_count1 == 0) {
   1566 		ASSERT(aiowp->work_minload1 == 0);
   1567 		aiowp->work_idleflg = 1;
   1568 		/*
   1569 		 * A cancellation handler is not needed here.
   1570 		 * aio worker threads are never cancelled via pthread_cancel().
   1571 		 */
   1572 		error = sig_cond_wait(&aiowp->work_idle_cv,
   1573 		    &aiowp->work_qlock1);
   1574 		/*
   1575 		 * The idle flag is normally cleared before worker is awakened
   1576 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
   1577 		 */
   1578 		if (error)
   1579 			aiowp->work_idleflg = 0;
   1580 	}
   1581 	sig_mutex_unlock(&aiowp->work_qlock1);
   1582 	return (error);
   1583 }
   1584 
   1585 /*
   1586  * A worker's completed AIO requests are placed onto a global
   1587  * done queue.  The application is only sent a SIGIO signal if
   1588  * the process has a handler enabled and it is not waiting via
   1589  * aiowait().
   1590  */
   1591 static void
   1592 _aio_work_done(aio_worker_t *aiowp)
   1593 {
   1594 	aio_req_t *reqp;
   1595 
   1596 	sig_mutex_lock(&__aio_mutex);
   1597 	sig_mutex_lock(&aiowp->work_qlock1);
   1598 	reqp = aiowp->work_prev1;
   1599 	reqp->req_next = NULL;
   1600 	aiowp->work_done1 = 0;
   1601 	aiowp->work_tail1 = aiowp->work_next1;
   1602 	if (aiowp->work_tail1 == NULL)
   1603 		aiowp->work_head1 = NULL;
   1604 	aiowp->work_prev1 = NULL;
   1605 	_aio_outstand_cnt--;
   1606 	_aio_req_done_cnt--;
   1607 	if (reqp->req_state == AIO_REQ_CANCELED) {
   1608 		/*
   1609 		 * Request got cancelled after it was marked done. This can
   1610 		 * happen because _aio_finish_request() marks it AIO_REQ_DONE
   1611 		 * and drops all locks. Don't add the request to the done
   1612 		 * queue and just discard it.
   1613 		 */
   1614 		sig_mutex_unlock(&aiowp->work_qlock1);
   1615 		_aio_req_free(reqp);
   1616 		if (_aio_outstand_cnt == 0 && _aiowait_flag) {
   1617 			sig_mutex_unlock(&__aio_mutex);
   1618 			(void) _kaio(AIONOTIFY);
   1619 		} else {
   1620 			sig_mutex_unlock(&__aio_mutex);
   1621 		}
   1622 		return;
   1623 	}
   1624 	sig_mutex_unlock(&aiowp->work_qlock1);
   1625 	_aio_donecnt++;
   1626 	ASSERT(_aio_donecnt > 0 &&
   1627 	    _aio_outstand_cnt >= 0 &&
   1628 	    _aio_req_done_cnt >= 0);
   1629 	ASSERT(reqp != NULL);
   1630 
   1631 	if (_aio_done_tail == NULL) {
   1632 		_aio_done_head = _aio_done_tail = reqp;
   1633 	} else {
   1634 		_aio_done_head->req_next = reqp;
   1635 		_aio_done_head = reqp;
   1636 	}
   1637 
   1638 	if (_aiowait_flag) {
   1639 		sig_mutex_unlock(&__aio_mutex);
   1640 		(void) _kaio(AIONOTIFY);
   1641 	} else {
   1642 		sig_mutex_unlock(&__aio_mutex);
   1643 		if (_sigio_enabled)
   1644 			(void) kill(__pid, SIGIO);
   1645 	}
   1646 }
   1647 
   1648 /*
   1649  * The done queue consists of AIO requests that are in either the
   1650  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
   1651  * are discarded.  If the done queue is empty then NULL is returned.
   1652  * Otherwise the address of a done aio_result_t is returned.
   1653  */
   1654 aio_result_t *
   1655 _aio_req_done(void)
   1656 {
   1657 	aio_req_t *reqp;
   1658 	aio_result_t *resultp;
   1659 
   1660 	ASSERT(MUTEX_HELD(&__aio_mutex));
   1661 
   1662 	if ((reqp = _aio_done_tail) != NULL) {
   1663 		if ((_aio_done_tail = reqp->req_next) == NULL)
   1664 			_aio_done_head = NULL;
   1665 		ASSERT(_aio_donecnt > 0);
   1666 		_aio_donecnt--;
   1667 		(void) _aio_hash_del(reqp->req_resultp);
   1668 		resultp = reqp->req_resultp;
   1669 		ASSERT(reqp->req_state == AIO_REQ_DONE);
   1670 		_aio_req_free(reqp);
   1671 		return (resultp);
   1672 	}
   1673 	/* is queue empty? */
   1674 	if (reqp == NULL && _aio_outstand_cnt == 0) {
   1675 		return ((aio_result_t *)-1);
   1676 	}
   1677 	return (NULL);
   1678 }
   1679 
   1680 /*
   1681  * Set the return and errno values for the application's use.
   1682  *
   1683  * For the Posix interfaces, we must set the return value first followed
   1684  * by the errno value because the Posix interfaces allow for a change
   1685  * in the errno value from EINPROGRESS to something else to signal
   1686  * the completion of the asynchronous request.
   1687  *
   1688  * The opposite is true for the Solaris interfaces.  These allow for
   1689  * a change in the return value from AIO_INPROGRESS to something else
   1690  * to signal the completion of the asynchronous request.
   1691  */
   1692 void
   1693 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
   1694 {
   1695 	aio_result_t *resultp = reqp->req_resultp;
   1696 
   1697 	if (POSIX_AIO(reqp)) {
   1698 		resultp->aio_return = retval;
   1699 		membar_producer();
   1700 		resultp->aio_errno = error;
   1701 	} else {
   1702 		resultp->aio_errno = error;
   1703 		membar_producer();
   1704 		resultp->aio_return = retval;
   1705 	}
   1706 }
   1707 
   1708 /*
   1709  * Add an AIO request onto the next work queue.
   1710  * A circular list of workers is used to choose the next worker.
   1711  */
   1712 void
   1713 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
   1714 {
   1715 	ulwp_t *self = curthread;
   1716 	aio_worker_t *aiowp;
   1717 	aio_worker_t *first;
   1718 	int load_bal_flg = 1;
   1719 	int found;
   1720 
   1721 	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
   1722 	reqp->req_next = NULL;
   1723 	/*
   1724 	 * Try to acquire the next worker's work queue.  If it is locked,
   1725 	 * then search the list of workers until a queue is found unlocked,
   1726 	 * or until the list is completely traversed at which point another
   1727 	 * worker will be created.
   1728 	 */
   1729 	sigoff(self);		/* defer SIGIO */
   1730 	sig_mutex_lock(&__aio_mutex);
   1731 	first = aiowp = *nextworker;
   1732 	if (mode != AIONOTIFY)
   1733 		_aio_outstand_cnt++;
   1734 	sig_mutex_unlock(&__aio_mutex);
   1735 
   1736 	switch (mode) {
   1737 	case AIOREAD:
   1738 	case AIOWRITE:
   1739 	case AIOAREAD:
   1740 	case AIOAWRITE:
   1741 #if !defined(_LP64)
   1742 	case AIOAREAD64:
   1743 	case AIOAWRITE64:
   1744 #endif
   1745 		/* try to find an idle worker */
   1746 		found = 0;
   1747 		do {
   1748 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
   1749 				if (aiowp->work_idleflg) {
   1750 					found = 1;
   1751 					break;
   1752 				}
   1753 				sig_mutex_unlock(&aiowp->work_qlock1);
   1754 			}
   1755 		} while ((aiowp = aiowp->work_forw) != first);
   1756 
   1757 		if (found) {
   1758 			aiowp->work_minload1++;
   1759 			break;
   1760 		}
   1761 
   1762 		/* try to acquire some worker's queue lock */
   1763 		do {
   1764 			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
   1765 				found = 1;
   1766 				break;
   1767 			}
   1768 		} while ((aiowp = aiowp->work_forw) != first);
   1769 
   1770 		/*
   1771 		 * Create more workers when the workers appear overloaded.
   1772 		 * Either all the workers are busy draining their queues
   1773 		 * or no worker's queue lock could be acquired.
   1774 		 */
   1775 		if (!found) {
   1776 			if (_aio_worker_cnt < _max_workers) {
   1777 				if (_aio_create_worker(reqp, mode))
   1778 					aio_panic("_aio_req_add: add worker");
   1779 				sigon(self);	/* reenable SIGIO */
   1780 				return;
   1781 			}
   1782 
   1783 			/*
   1784 			 * No worker available and we have created
   1785 			 * _max_workers, keep going through the
   1786 			 * list slowly until we get a lock
   1787 			 */
   1788 			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
   1789 				/*
   1790 				 * give someone else a chance
   1791 				 */
   1792 				_aio_delay(1);
   1793 				aiowp = aiowp->work_forw;
   1794 			}
   1795 		}
   1796 
   1797 		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
   1798 		if (_aio_worker_cnt < _max_workers &&
   1799 		    aiowp->work_minload1 >= _minworkload) {
   1800 			sig_mutex_unlock(&aiowp->work_qlock1);
   1801 			sig_mutex_lock(&__aio_mutex);
   1802 			*nextworker = aiowp->work_forw;
   1803 			sig_mutex_unlock(&__aio_mutex);
   1804 			if (_aio_create_worker(reqp, mode))
   1805 				aio_panic("aio_req_add: add worker");
   1806 			sigon(self);	/* reenable SIGIO */
   1807 			return;
   1808 		}
   1809 		aiowp->work_minload1++;
   1810 		break;
   1811 	case AIOFSYNC:
   1812 	case AIONOTIFY:
   1813 		load_bal_flg = 0;
   1814 		sig_mutex_lock(&aiowp->work_qlock1);
   1815 		break;
   1816 	default:
   1817 		aio_panic("_aio_req_add: invalid mode");
   1818 		break;
   1819 	}
   1820 	/*
   1821 	 * Put request onto worker's work queue.
   1822 	 */
   1823 	if (aiowp->work_tail1 == NULL) {
   1824 		ASSERT(aiowp->work_count1 == 0);
   1825 		aiowp->work_tail1 = reqp;
   1826 		aiowp->work_next1 = reqp;
   1827 	} else {
   1828 		aiowp->work_head1->req_next = reqp;
   1829 		if (aiowp->work_next1 == NULL)
   1830 			aiowp->work_next1 = reqp;
   1831 	}
   1832 	reqp->req_state = AIO_REQ_QUEUED;
   1833 	reqp->req_worker = aiowp;
   1834 	aiowp->work_head1 = reqp;
   1835 	/*
   1836 	 * Awaken worker if it is not currently active.
   1837 	 */
   1838 	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
   1839 		aiowp->work_idleflg = 0;
   1840 		(void) cond_signal(&aiowp->work_idle_cv);
   1841 	}
   1842 	sig_mutex_unlock(&aiowp->work_qlock1);
   1843 
   1844 	if (load_bal_flg) {
   1845 		sig_mutex_lock(&__aio_mutex);
   1846 		*nextworker = aiowp->work_forw;
   1847 		sig_mutex_unlock(&__aio_mutex);
   1848 	}
   1849 	sigon(self);	/* reenable SIGIO */
   1850 }
   1851 
   1852 /*
   1853  * Get an AIO request for a specified worker.
   1854  * If the work queue is empty, return NULL.
   1855  */
   1856 aio_req_t *
   1857 _aio_req_get(aio_worker_t *aiowp)
   1858 {
   1859 	aio_req_t *reqp;
   1860 
   1861 	sig_mutex_lock(&aiowp->work_qlock1);
   1862 	if ((reqp = aiowp->work_next1) != NULL) {
   1863 		/*
   1864 		 * Remove a POSIX request from the queue; the
   1865 		 * request queue is a singularly linked list
   1866 		 * with a previous pointer.  The request is
   1867 		 * removed by updating the previous pointer.
   1868 		 *
   1869 		 * Non-posix requests are left on the queue
   1870 		 * to eventually be placed on the done queue.
   1871 		 */
   1872 
   1873 		if (POSIX_AIO(reqp)) {
   1874 			if (aiowp->work_prev1 == NULL) {
   1875 				aiowp->work_tail1 = reqp->req_next;
   1876 				if (aiowp->work_tail1 == NULL)
   1877 					aiowp->work_head1 = NULL;
   1878 			} else {
   1879 				aiowp->work_prev1->req_next = reqp->req_next;
   1880 				if (aiowp->work_head1 == reqp)
   1881 					aiowp->work_head1 = reqp->req_next;
   1882 			}
   1883 
   1884 		} else {
   1885 			aiowp->work_prev1 = reqp;
   1886 			ASSERT(aiowp->work_done1 >= 0);
   1887 			aiowp->work_done1++;
   1888 		}
   1889 		ASSERT(reqp != reqp->req_next);
   1890 		aiowp->work_next1 = reqp->req_next;
   1891 		ASSERT(aiowp->work_count1 >= 1);
   1892 		aiowp->work_count1--;
   1893 		switch (reqp->req_op) {
   1894 		case AIOREAD:
   1895 		case AIOWRITE:
   1896 		case AIOAREAD:
   1897 		case AIOAWRITE:
   1898 #if !defined(_LP64)
   1899 		case AIOAREAD64:
   1900 		case AIOAWRITE64:
   1901 #endif
   1902 			ASSERT(aiowp->work_minload1 > 0);
   1903 			aiowp->work_minload1--;
   1904 			break;
   1905 		}
   1906 		reqp->req_state = AIO_REQ_INPROGRESS;
   1907 	}
   1908 	aiowp->work_req = reqp;
   1909 	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
   1910 	sig_mutex_unlock(&aiowp->work_qlock1);
   1911 	return (reqp);
   1912 }
   1913 
   1914 static void
   1915 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
   1916 {
   1917 	aio_req_t **last;
   1918 	aio_req_t *lastrp;
   1919 	aio_req_t *next;
   1920 
   1921 	ASSERT(aiowp != NULL);
   1922 	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
   1923 	if (POSIX_AIO(reqp)) {
   1924 		if (ostate != AIO_REQ_QUEUED)
   1925 			return;
   1926 	}
   1927 	last = &aiowp->work_tail1;
   1928 	lastrp = aiowp->work_tail1;
   1929 	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
   1930 	while ((next = *last) != NULL) {
   1931 		if (next == reqp) {
   1932 			*last = next->req_next;
   1933 			if (aiowp->work_next1 == next)
   1934 				aiowp->work_next1 = next->req_next;
   1935 
   1936 			/*
   1937 			 * if this is the first request on the queue, move
   1938 			 * the lastrp pointer forward.
   1939 			 */
   1940 			if (lastrp == next)
   1941 				lastrp = next->req_next;
   1942 
   1943 			/*
   1944 			 * if this request is pointed by work_head1, then
   1945 			 * make work_head1 point to the last request that is
   1946 			 * present on the queue.
   1947 			 */
   1948 			if (aiowp->work_head1 == next)
   1949 				aiowp->work_head1 = lastrp;
   1950 
   1951 			/*
   1952 			 * work_prev1 is used only in non posix case and it
   1953 			 * points to the current AIO_REQ_INPROGRESS request.
   1954 			 * If work_prev1 points to this request which is being
   1955 			 * deleted, make work_prev1 NULL and set  work_done1
   1956 			 * to 0.
   1957 			 *
   1958 			 * A worker thread can be processing only one request
   1959 			 * at a time.
   1960 			 */
   1961 			if (aiowp->work_prev1 == next) {
   1962 				ASSERT(ostate == AIO_REQ_INPROGRESS &&
   1963 				    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
   1964 					aiowp->work_prev1 = NULL;
   1965 					aiowp->work_done1--;
   1966 			}
   1967 
   1968 			if (ostate == AIO_REQ_QUEUED) {
   1969 				ASSERT(aiowp->work_count1 >= 1);
   1970 				aiowp->work_count1--;
   1971 				ASSERT(aiowp->work_minload1 >= 1);
   1972 				aiowp->work_minload1--;
   1973 			}
   1974 			return;
   1975 		}
   1976 		last = &next->req_next;
   1977 		lastrp = next;
   1978 	}
   1979 	/* NOTREACHED */
   1980 }
   1981 
   1982 static void
   1983 _aio_enq_doneq(aio_req_t *reqp)
   1984 {
   1985 	if (_aio_doneq == NULL) {
   1986 		_aio_doneq = reqp;
   1987 		reqp->req_next = reqp->req_prev = reqp;
   1988 	} else {
   1989 		reqp->req_next = _aio_doneq;
   1990 		reqp->req_prev = _aio_doneq->req_prev;
   1991 		_aio_doneq->req_prev->req_next = reqp;
   1992 		_aio_doneq->req_prev = reqp;
   1993 	}
   1994 	reqp->req_state = AIO_REQ_DONEQ;
   1995 	_aio_doneq_cnt++;
   1996 }
   1997 
   1998 /*
   1999  * caller owns the _aio_mutex
   2000  */
   2001 aio_req_t *
   2002 _aio_req_remove(aio_req_t *reqp)
   2003 {
   2004 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
   2005 		return (NULL);
   2006 
   2007 	if (reqp) {
   2008 		/* request in done queue */
   2009 		if (_aio_doneq == reqp)
   2010 			_aio_doneq = reqp->req_next;
   2011 		if (_aio_doneq == reqp) {
   2012 			/* only one request on queue */
   2013 			_aio_doneq = NULL;
   2014 		} else {
   2015 			aio_req_t *tmp = reqp->req_next;
   2016 			reqp->req_prev->req_next = tmp;
   2017 			tmp->req_prev = reqp->req_prev;
   2018 		}
   2019 	} else if ((reqp = _aio_doneq) != NULL) {
   2020 		if (reqp == reqp->req_next) {
   2021 			/* only one request on queue */
   2022 			_aio_doneq = NULL;
   2023 		} else {
   2024 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
   2025 			_aio_doneq->req_prev = reqp->req_prev;
   2026 		}
   2027 	}
   2028 	if (reqp) {
   2029 		_aio_doneq_cnt--;
   2030 		reqp->req_next = reqp->req_prev = reqp;
   2031 		reqp->req_state = AIO_REQ_DONE;
   2032 	}
   2033 	return (reqp);
   2034 }
   2035 
   2036 /*
   2037  * An AIO request is identified by an aio_result_t pointer.  The library
   2038  * maps this aio_result_t pointer to its internal representation using a
   2039  * hash table.  This function adds an aio_result_t pointer to the hash table.
   2040  */
   2041 static int
   2042 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
   2043 {
   2044 	aio_hash_t *hashp;
   2045 	aio_req_t **prev;
   2046 	aio_req_t *next;
   2047 
   2048 	hashp = _aio_hash + AIOHASH(resultp);
   2049 	lmutex_lock(&hashp->hash_lock);
   2050 	prev = &hashp->hash_ptr;
   2051 	while ((next = *prev) != NULL) {
   2052 		if (resultp == next->req_resultp) {
   2053 			lmutex_unlock(&hashp->hash_lock);
   2054 			return (-1);
   2055 		}
   2056 		prev = &next->req_link;
   2057 	}
   2058 	*prev = reqp;
   2059 	ASSERT(reqp->req_link == NULL);
   2060 	lmutex_unlock(&hashp->hash_lock);
   2061 	return (0);
   2062 }
   2063 
   2064 /*
   2065  * Remove an entry from the hash table.
   2066  */
   2067 aio_req_t *
   2068 _aio_hash_del(aio_result_t *resultp)
   2069 {
   2070 	aio_hash_t *hashp;
   2071 	aio_req_t **prev;
   2072 	aio_req_t *next = NULL;
   2073 
   2074 	if (_aio_hash != NULL) {
   2075 		hashp = _aio_hash + AIOHASH(resultp);
   2076 		lmutex_lock(&hashp->hash_lock);
   2077 		prev = &hashp->hash_ptr;
   2078 		while ((next = *prev) != NULL) {
   2079 			if (resultp == next->req_resultp) {
   2080 				*prev = next->req_link;
   2081 				next->req_link = NULL;
   2082 				break;
   2083 			}
   2084 			prev = &next->req_link;
   2085 		}
   2086 		lmutex_unlock(&hashp->hash_lock);
   2087 	}
   2088 	return (next);
   2089 }
   2090 
   2091 /*
   2092  *  find an entry in the hash table
   2093  */
   2094 aio_req_t *
   2095 _aio_hash_find(aio_result_t *resultp)
   2096 {
   2097 	aio_hash_t *hashp;
   2098 	aio_req_t **prev;
   2099 	aio_req_t *next = NULL;
   2100 
   2101 	if (_aio_hash != NULL) {
   2102 		hashp = _aio_hash + AIOHASH(resultp);
   2103 		lmutex_lock(&hashp->hash_lock);
   2104 		prev = &hashp->hash_ptr;
   2105 		while ((next = *prev) != NULL) {
   2106 			if (resultp == next->req_resultp)
   2107 				break;
   2108 			prev = &next->req_link;
   2109 		}
   2110 		lmutex_unlock(&hashp->hash_lock);
   2111 	}
   2112 	return (next);
   2113 }
   2114 
   2115 /*
   2116  * AIO interface for POSIX
   2117  */
   2118 int
   2119 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
   2120     int mode, int flg)
   2121 {
   2122 	aio_req_t *reqp;
   2123 	aio_args_t *ap;
   2124 	int kerr;
   2125 
   2126 	if (aiocbp == NULL) {
   2127 		errno = EINVAL;
   2128 		return (-1);
   2129 	}
   2130 
   2131 	/* initialize kaio */
   2132 	if (!_kaio_ok)
   2133 		_kaio_init();
   2134 
   2135 	aiocbp->aio_state = NOCHECK;
   2136 
   2137 	/*
   2138 	 * If we have been called because a list I/O
   2139 	 * kaio() failed, we dont want to repeat the
   2140 	 * system call
   2141 	 */
   2142 
   2143 	if (flg & AIO_KAIO) {
   2144 		/*
   2145 		 * Try kernel aio first.
   2146 		 * If errno is ENOTSUP/EBADFD,
   2147 		 * fall back to the thread implementation.
   2148 		 */
   2149 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
   2150 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2151 			aiocbp->aio_state = CHECK;
   2152 			kerr = (int)_kaio(mode, aiocbp);
   2153 			if (kerr == 0)
   2154 				return (0);
   2155 			if (errno != ENOTSUP && errno != EBADFD) {
   2156 				aiocbp->aio_resultp.aio_errno = errno;
   2157 				aiocbp->aio_resultp.aio_return = -1;
   2158 				aiocbp->aio_state = NOCHECK;
   2159 				return (-1);
   2160 			}
   2161 			if (errno == EBADFD)
   2162 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
   2163 		}
   2164 	}
   2165 
   2166 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2167 	aiocbp->aio_state = USERAIO;
   2168 
   2169 	if (!__uaio_ok && __uaio_init() == -1)
   2170 		return (-1);
   2171 
   2172 	if ((reqp = _aio_req_alloc()) == NULL) {
   2173 		errno = EAGAIN;
   2174 		return (-1);
   2175 	}
   2176 
   2177 	/*
   2178 	 * If an LIO request, add the list head to the aio request
   2179 	 */
   2180 	reqp->req_head = lio_head;
   2181 	reqp->req_type = AIO_POSIX_REQ;
   2182 	reqp->req_op = mode;
   2183 	reqp->req_largefile = 0;
   2184 
   2185 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
   2186 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
   2187 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
   2188 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
   2189 		reqp->req_sigevent.sigev_signo =
   2190 		    aiocbp->aio_sigevent.sigev_signo;
   2191 		reqp->req_sigevent.sigev_value.sival_ptr =
   2192 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2193 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
   2194 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2195 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
   2196 		/*
   2197 		 * Reuse the sigevent structure to contain the port number
   2198 		 * and the user value.  Same for SIGEV_THREAD, below.
   2199 		 */
   2200 		reqp->req_sigevent.sigev_signo =
   2201 		    pn->portnfy_port;
   2202 		reqp->req_sigevent.sigev_value.sival_ptr =
   2203 		    pn->portnfy_user;
   2204 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
   2205 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
   2206 		/*
   2207 		 * The sigevent structure contains the port number
   2208 		 * and the user value.  Same for SIGEV_PORT, above.
   2209 		 */
   2210 		reqp->req_sigevent.sigev_signo =
   2211 		    aiocbp->aio_sigevent.sigev_signo;
   2212 		reqp->req_sigevent.sigev_value.sival_ptr =
   2213 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2214 	}
   2215 
   2216 	reqp->req_resultp = &aiocbp->aio_resultp;
   2217 	reqp->req_aiocbp = aiocbp;
   2218 	ap = &reqp->req_args;
   2219 	ap->fd = aiocbp->aio_fildes;
   2220 	ap->buf = (caddr_t)aiocbp->aio_buf;
   2221 	ap->bufsz = aiocbp->aio_nbytes;
   2222 	ap->offset = aiocbp->aio_offset;
   2223 
   2224 	if ((flg & AIO_NO_DUPS) &&
   2225 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
   2226 		aio_panic("_aio_rw(): request already in hash table");
   2227 		_aio_req_free(reqp);
   2228 		errno = EINVAL;
   2229 		return (-1);
   2230 	}
   2231 	_aio_req_add(reqp, nextworker, mode);
   2232 	return (0);
   2233 }
   2234 
   2235 #if !defined(_LP64)
   2236 /*
   2237  * 64-bit AIO interface for POSIX
   2238  */
   2239 int
   2240 _aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
   2241     int mode, int flg)
   2242 {
   2243 	aio_req_t *reqp;
   2244 	aio_args_t *ap;
   2245 	int kerr;
   2246 
   2247 	if (aiocbp == NULL) {
   2248 		errno = EINVAL;
   2249 		return (-1);
   2250 	}
   2251 
   2252 	/* initialize kaio */
   2253 	if (!_kaio_ok)
   2254 		_kaio_init();
   2255 
   2256 	aiocbp->aio_state = NOCHECK;
   2257 
   2258 	/*
   2259 	 * If we have been called because a list I/O
   2260 	 * kaio() failed, we dont want to repeat the
   2261 	 * system call
   2262 	 */
   2263 
   2264 	if (flg & AIO_KAIO) {
   2265 		/*
   2266 		 * Try kernel aio first.
   2267 		 * If errno is ENOTSUP/EBADFD,
   2268 		 * fall back to the thread implementation.
   2269 		 */
   2270 		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
   2271 			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2272 			aiocbp->aio_state = CHECK;
   2273 			kerr = (int)_kaio(mode, aiocbp);
   2274 			if (kerr == 0)
   2275 				return (0);
   2276 			if (errno != ENOTSUP && errno != EBADFD) {
   2277 				aiocbp->aio_resultp.aio_errno = errno;
   2278 				aiocbp->aio_resultp.aio_return = -1;
   2279 				aiocbp->aio_state = NOCHECK;
   2280 				return (-1);
   2281 			}
   2282 			if (errno == EBADFD)
   2283 				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
   2284 		}
   2285 	}
   2286 
   2287 	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
   2288 	aiocbp->aio_state = USERAIO;
   2289 
   2290 	if (!__uaio_ok && __uaio_init() == -1)
   2291 		return (-1);
   2292 
   2293 	if ((reqp = _aio_req_alloc()) == NULL) {
   2294 		errno = EAGAIN;
   2295 		return (-1);
   2296 	}
   2297 
   2298 	/*
   2299 	 * If an LIO request, add the list head to the aio request
   2300 	 */
   2301 	reqp->req_head = lio_head;
   2302 	reqp->req_type = AIO_POSIX_REQ;
   2303 	reqp->req_op = mode;
   2304 	reqp->req_largefile = 1;
   2305 
   2306 	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
   2307 		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
   2308 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
   2309 		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
   2310 		reqp->req_sigevent.sigev_signo =
   2311 		    aiocbp->aio_sigevent.sigev_signo;
   2312 		reqp->req_sigevent.sigev_value.sival_ptr =
   2313 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2314 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
   2315 		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2316 		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
   2317 		reqp->req_sigevent.sigev_signo =
   2318 		    pn->portnfy_port;
   2319 		reqp->req_sigevent.sigev_value.sival_ptr =
   2320 		    pn->portnfy_user;
   2321 	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
   2322 		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
   2323 		reqp->req_sigevent.sigev_signo =
   2324 		    aiocbp->aio_sigevent.sigev_signo;
   2325 		reqp->req_sigevent.sigev_value.sival_ptr =
   2326 		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
   2327 	}
   2328 
   2329 	reqp->req_resultp = &aiocbp->aio_resultp;
   2330 	reqp->req_aiocbp = aiocbp;
   2331 	ap = &reqp->req_args;
   2332 	ap->fd = aiocbp->aio_fildes;
   2333 	ap->buf = (caddr_t)aiocbp->aio_buf;
   2334 	ap->bufsz = aiocbp->aio_nbytes;
   2335 	ap->offset = aiocbp->aio_offset;
   2336 
   2337 	if ((flg & AIO_NO_DUPS) &&
   2338 	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
   2339 		aio_panic("_aio_rw64(): request already in hash table");
   2340 		_aio_req_free(reqp);
   2341 		errno = EINVAL;
   2342 		return (-1);
   2343 	}
   2344 	_aio_req_add(reqp, nextworker, mode);
   2345 	return (0);
   2346 }
   2347 #endif	/* !defined(_LP64) */
   2348