Home | History | Annotate | Download | only in diskomizer
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 
     22 /*
     23  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     24  * Use is subject to license terms.
     25  */
     26 
     27 #pragma ident	"@(#)async.c	1.8	09/05/26 SMI"
     28 
     29 #include <stdlib.h>
     30 #include <unistd.h>
     31 #include <errno.h>
     32 #include <sys/types.h>
     33 #include <sys/stat.h>
     34 #include <fcntl.h>
     35 #include <sys/mman.h>
     36 #include <signal.h>
     37 #include <pthread.h>
     38 #include <diskomizer/log.h>
     39 
     40 #include "async.h"
     41 #include "signal_catch.h"
     42 
     43 /*
     44  * General routines for providing async operations.
     45  *
     46  * Async operations get passed in as async_work structures. A job
     47  * object is found or created that contains a thread to run the job and that
     48  * thread then runs the job and puts the result on the done_queue from
     49  * whence it can be reaped by async_queue_wait(); meanwhile the job
     50  * puts itself on the queue of free jobs so that it can run the next
     51  * job.
     52  */
     53 /*
     54  * Still to do:
     55  * 	cancellation is not implemented.
     56  * 	unemployed jobs should exit after a while
     57  */
     58 
     59 /*
     60  * madvise flags.  These are not defined on 5.6 and 5.7, so they have to be
     61  * added here.  On older releases the use of the flags should just give an error
     62  */
        /* Fallback value, used only when the system headers lack MADV_FREE. */
     63 #ifndef MADV_FREE
     64 #define	MADV_FREE 0x5
     65 #endif
     66 
        /* Fallback value, used only when the headers lack MADV_ACCESS_LWP. */
     67 #ifndef MADV_ACCESS_LWP
     68 #define	MADV_ACCESS_LWP 0x7
     69 #endif
     70 
        /*
         * roundup(x, y): x rounded up to a multiple of y using integer
         * arithmetic.  y appears more than once in the expansion, so it must
         * not have side effects.
         */
     71 #define	roundup(x, y) ((((x)+((y)-1))/(y))*(y))
        /*
         * Expands to a sysconf() call, so the value is looked up at run time on
         * every use.
         * NOTE(review): this is defined unconditionally; if a system header
         * already supplies PTHREAD_STACK_MIN this is a redefinition - confirm.
         */
     72 #define	PTHREAD_STACK_MIN sysconf(_SC_THREAD_STACK_MIN)
     73 
        /*
         * Hand-off state of a job; stored in the state member of struct jobs
         * and examined by worker() with the job's lock held.
         */
     74 enum state {
     75 	READY_TO_RUN,	/* set by async_queue_request() once work is attached */
     76 	COMPLETED	/* set by create_job() and by worker() after each run */
     77 };
        /*
         * A worker thread together with the fields used to hand it one
         * request at a time.
         */
     78 struct jobs {
     79 	pthread_mutex_t mlock;	/* held while state and work are changed */
     80 	pthread_cond_t cv;	/* signalled by async_queue_request() */
     81 	pthread_t thread;	/* filled in by pthread_create() in create_job() */
     82 	enum state state;	/* worker() sleeps until this is READY_TO_RUN */
     83 	struct jobs *next;	/* link used by the job queue routines */
     84 	struct async_work *work; /* request to run; set to NULL after the run */
     85 };
     86 
        /*
         * Singly linked queue of jobs, chained through the next member of
         * struct jobs.
         */
     87 struct queue {
     88 	pthread_mutex_t mlock;	/* taken by the add/get routines for each update */
     89 	pthread_cond_t cv;	/* not referenced by the queue routines in this file */
     90 	struct jobs *head;	/* get_job() removes from here; NULL when empty */
     91 	struct jobs *tail;	/* last job; get_job() clears it with the last job */
     92 };
        /*
         * Queue of completed requests, chained through u.next of
         * struct async_work.
         */
     93 struct work_done {
     94 	pthread_mutex_t mlock;	/* held while the list is examined or changed */
     95 	pthread_cond_t cv;	/* add_to_done() signals, async_queue_wait() waits */
     96 	struct async_work *head; /* next request async_queue_wait() will return */
     97 	struct async_work *tail; /* add_to_done() appends after this entry */
     98 };
     99 
        /*
         * NOTE(review): nothing in this file declares an object of this type or
         * touches its members; it looks like unused thread accounting - confirm
         * before relying on it.
         */
    100 struct thr_info {
    101 	pthread_mutex_t mlock;
    102 	int thr_free;
    103 	int thr_running;
    104 };
        /*
         * unemployed: jobs whose threads are idle; worker() puts its job back
         * here after each run and get_free_job() takes jobs from it.
         * work_done: completed requests waiting for async_queue_wait().
         *
         * NOTE(review): the mutexes and condition variables inside these two
         * objects are never explicitly initialised in this file, so the code
         * relies on zero-filled static storage being a valid initial state -
         * confirm for the target platform.
         */
    105 static struct queue unemployed;
    106 static struct work_done work_done;
    107 
    108 /*
    109  * Job queues.
    110  *
    111  * There are three routines for managing the job queues, 2 for adding and
    112  * one for taking off the queue.
    113  *
    114  * Jobs are always removed from the head of the queue.
    115  *
    116  * For a FIFO queue jobs are added to the tail of the queue by add_to_job_fifo
    117  * and for LIFO they are added to the head of the queue by add_to_job_lifo.
    118  */
    119 #ifdef FIFO_NEEDED
    120 static void
    121 add_to_job_fifo(struct queue *q, struct jobs *job)
    122 {
    123 	pthread_mutex_lock(&q->mlock);
    124 	if (q->head == NULL) {
    125 		q->head = q->tail = job;
    126 	} else {
    127 		q->tail->next = job;
    128 		q->tail = job;
    129 	}
    130 	job->next = NULL;
    131 	pthread_mutex_unlock(&q->mlock);
    132 }
    133 #endif
    134 static void
    135 add_to_job_lifo(struct queue *q, struct jobs *job)
    136 {
    137 	pthread_mutex_lock(&q->mlock);
    138 	if (q->head == NULL) {
    139 		q->head = q->tail = job;
    140 		job->next = NULL;
    141 	} else {
    142 		job->next = q->head;
    143 		q->head = job;
    144 	}
    145 	pthread_mutex_unlock(&q->mlock);
    146 }
    147 
    148 static struct jobs *
    149 get_job(struct queue *q)
    150 {
    151 	struct jobs *job;
    152 	pthread_mutex_lock(&q->mlock);
    153 	job = q->head;
    154 	if (job != NULL) {
    155 		q->head = job->next;
    156 		if (job == q->tail)
    157 			q->tail = NULL;
    158 		job->next = NULL;
    159 	}
    160 	pthread_mutex_unlock(&q->mlock);
    161 	return (job);
    162 }
    163 static void
    164 add_to_done(struct async_work *work)
    165 {
    166 	pthread_mutex_lock(&work_done.mlock);
    167 	if (work_done.head == NULL) {
    168 		work_done.head = work_done.tail = work;
    169 		pthread_cond_signal(&work_done.cv);
    170 	} else {
    171 		work_done.tail->u.next = work;
    172 		work_done.tail = work;
    173 	}
    174 	work->flags |= ASYNCWORK_COMPLETE;
    175 	work->u.next = NULL;
    176 	pthread_mutex_unlock(&work_done.mlock);
    177 }
    178 static void
    179 do_work(struct async_work *work)
    180 {
    181 	work->result = work->u.funcs.func(work->arg);
    182 	add_to_done(work);
    183 }
        /*
         * worker.  Thread start routine; arg is the struct jobs this thread
         * serves.  Loops forever: sleeps until the job is READY_TO_RUN, runs
         * the attached request, then puts the job back on the unemployed
         * queue.  Never returns.
         */
    184 static void *
    185 worker(void *arg)
    186 {
    187 	struct jobs *job = arg;
    188 #ifdef __lint
    189 	int err = 0;
    190 #endif
    191 
    192 	for (;;) {
        		/*
        		 * Retry until the lock is obtained.  The counter exists
        		 * only under lint, where it gives the loop a non-empty body.
        		 */
    193 		while (pthread_mutex_lock(&job->mlock) != 0)
    194 #ifdef __lint
    195 			err++;
    196 #else
    197 			;
    198 #endif
        		/* Re-test after every wake-up; async_queue_request() sets it. */
    199 		while (job->state != READY_TO_RUN) {
    200 			pthread_cond_wait(&job->cv, &job->mlock);
    201 		}
        		/* The job lock stays held while the request runs. */
    202 		do_work(job->work);
    203 		job->work = NULL;
    204 		job->state = COMPLETED;
    205 		pthread_mutex_unlock(&job->mlock);
        		/* LIFO: this job is the first one get_job() hands out next. */
    206 		add_to_job_lifo(&unemployed, job);
    207 	}
    208 	/*NOTREACHED*/
    209 }
    210 
    211 void *
    212 create_stack(size_t len)
    213 {
    214 	int fd;
    215 	int redzone_len = getpagesize();
    216 	char *addr;
    217 
    218 	if ((fd = open("/dev/zero", O_RDWR)) == -1)
    219 		return (NULL);
    220 
    221 	addr = mmap(NULL, len + redzone_len, PROT_READ|PROT_WRITE,
    222 	    MAP_PRIVATE, fd, 0);
    223 	close(fd);
    224 	if (addr == MAP_FAILED)
    225 		return (NULL);
    226 
    227 	madvise(addr, redzone_len, MADV_DONTNEED);
    228 	madvise(addr, redzone_len, MADV_FREE);
    229 	mprotect(addr, redzone_len, PROT_NONE);
    230 
    231 	madvise(addr + redzone_len, len, MADV_ACCESS_LWP);
    232 
    233 	expect_signal(SIGSEGV, "Thread Buffer Overflow\n", addr, redzone_len);
    234 
    235 	plog(LOG_DEBUG, "Stack %p\n", addr + redzone_len);
    236 	return ((void *)(addr + redzone_len));
    237 }
    238 
    239 /*
    240  * create_job.  Creates a new job structure and its thread.
    241  */
    242 static struct jobs *
    243 create_job(void)
    244 {
    245 	void *stack;
    246 	struct jobs *job;
    247 	pthread_attr_t attr;
    248 	int err;
    249 	size_t stack_size = roundup(getpagesize() + PTHREAD_STACK_MIN,
    250 	    getpagesize());
    251 
    252 	if ((job = malloc(sizeof (struct jobs))) == NULL)
    253 		return (NULL);
    254 
    255 	if ((stack = create_stack(stack_size)) == NULL) {
    256 		free(job);
    257 		return (NULL);
    258 	}
    259 	pthread_mutex_init(&job->mlock, NULL);
    260 	pthread_cond_init(&job->cv, NULL);
    261 	job->next = NULL;
    262 	job->state = COMPLETED;
    263 	pthread_attr_init(&attr);
    264 	pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    265 	pthread_attr_setstacksize(&attr, stack_size);
    266 	pthread_attr_setstackaddr(&attr, stack);
    267 	err = pthread_create(&job->thread, &attr, worker, job);
    268 	if (err != 0) {
    269 		pthread_cond_destroy(&job->cv);
    270 		pthread_mutex_destroy(&job->mlock);
    271 		free(job);
    272 		errno = err;
    273 		return (NULL);
    274 	}
    275 	return (job);
    276 }
    277 
    278 static struct jobs *
    279 get_free_job(void)
    280 {
    281 	struct jobs *job;
    282 
    283 	if ((job = get_job(&unemployed)) == NULL) {
    284 		job = create_job();
    285 	}
    286 	return (job);
    287 }
    288 
    289 void *
    290 async_queue_request(struct async_work *work)
    291 {
    292 	struct jobs *job;
    293 
    294 	if ((job = get_free_job()) == NULL)
    295 		return (NULL);
    296 	pthread_mutex_lock(&job->mlock);
    297 	work->flags &= (~ASYNCWORK_COMPLETE);
    298 	job->work = work;
    299 	job->state = READY_TO_RUN;
    300 	pthread_cond_signal(&job->cv);
    301 	pthread_mutex_unlock(&job->mlock);
    302 	return (job);
    303 }
    304 /*ARGSUSED*/
    305 struct async_work *
    306 async_queue_wait(void *handle, const struct timespec *abstime, int flags)
    307 {
    308 	struct async_work *work;
    309 	pthread_mutex_lock(&work_done.mlock);
    310 	while (work_done.head == NULL) {
    311 		if (abstime == NULL) {
    312 			pthread_cond_wait(&work_done.cv, &work_done.mlock);
    313 		} else {
    314 			int x;
    315 			x = pthread_cond_timedwait(&work_done.cv,
    316 			    &work_done.mlock, abstime);
    317 			if (x == ETIMEDOUT) {
    318 				pthread_mutex_unlock(&work_done.mlock);
    319 				return (NULL);
    320 			}
    321 		}
    322 	}
    323 	work = work_done.head;
    324 	work_done.head = work->u.next;
    325 	if (work_done.tail == work) {
    326 		work_done.tail = NULL;
    327 	}
    328 	work->u.next = NULL;
    329 	pthread_mutex_unlock(&work_done.mlock);
    330 	return (work);
    331 }
    332 
    333 void
    334 find_work(struct async_work *work)
    335 {
    336 	struct async_work *w;
    337 
    338 	for (w = work_done.head; w != NULL; w = w->u.next) {
    339 		if (w == work)
    340 			abort();
    341 	}
    342 }
    343 int
    344 async_cancel_request(struct async_work *work, int flags)
    345 {
    346 	int ret;
    347 	pthread_mutex_lock(&work_done.mlock);
    348 	if (work->flags & ASYNCWORK_COMPLETE) {
    349 		ret = -1;
    350 	} else {
    351 		if (work->u.funcs.cancel) {
    352 			ret = work->u.funcs.cancel(work->arg, flags);
    353 		} else {
    354 			find_work(work);
    355 			abort();
    356 		}
    357 	}
    358 	pthread_mutex_unlock(&work_done.mlock);
    359 	return (ret);
    360 }
    361