1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22 /* 23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "@(#)async.c 1.8 09/05/26 SMI" 28 29 #include <stdlib.h> 30 #include <unistd.h> 31 #include <errno.h> 32 #include <sys/types.h> 33 #include <sys/stat.h> 34 #include <fcntl.h> 35 #include <sys/mman.h> 36 #include <signal.h> 37 #include <pthread.h> 38 #include <diskomizer/log.h> 39 40 #include "async.h" 41 #include "signal_catch.h" 42 43 /* 44 * General routines for providing aync operations. 45 * 46 * Async operations get passed in as async_work structures. A job 47 * object is found or created that contains a thread to run the job and that 48 * thread then runs the job and puts the result on the done_queue from 49 * whence it can be reaped by async_queue_wait(); meanwhile the job 50 * puts itself on the queue of free jobs so that it can run the next 51 * job. 52 */ 53 /* 54 * Still to do: 55 * cancellation is not implemented. 56 * unemployed jobs should exit after a while 57 */ 58 59 /* 60 * madvise flags. Not defined on 5.6 and 5.7 which so have to be added 61 * here. On older releases the use of the flags should just give an error 62 */ 63 #ifndef MADV_FREE 64 #define MADV_FREE 0x5 65 #endif 66 67 #ifndef MADV_ACCESS_LWP 68 #define MADV_ACCESS_LWP 0x7 69 #endif 70 71 #define roundup(x, y) ((((x)+((y)-1))/(y))*(y)) 72 #define PTHREAD_STACK_MIN sysconf(_SC_THREAD_STACK_MIN) 73 74 enum state { 75 READY_TO_RUN, 76 COMPLETED 77 }; 78 struct jobs { 79 pthread_mutex_t mlock; 80 pthread_cond_t cv; 81 pthread_t thread; 82 enum state state; 83 struct jobs *next; 84 struct async_work *work; 85 }; 86 87 struct queue { 88 pthread_mutex_t mlock; 89 pthread_cond_t cv; 90 struct jobs *head; 91 struct jobs *tail; 92 }; 93 struct work_done { 94 pthread_mutex_t mlock; 95 pthread_cond_t cv; 96 struct async_work *head; 97 struct async_work *tail; 98 }; 99 100 struct thr_info { 101 pthread_mutex_t mlock; 102 int thr_free; 103 int thr_running; 104 }; 105 static struct queue unemployed; 106 static struct work_done work_done; 107 108 /* 109 * Job queues. 110 * 111 * There are three routines for managing the job queues, 2 for adding and 112 * one for taking off the queue. 113 * 114 * Jobs are always removed from the head of the queue. 115 * 116 * For a FIFO queue jobs are added to the tail of the queue by add_to_job_fifo 117 * and for LIFO they are added to the head of the queue by add_to_job_lifo. 118 */ 119 #ifdef FIFO_NEEDED 120 static void 121 add_to_job_fifo(struct queue *q, struct jobs *job) 122 { 123 pthread_mutex_lock(&q->mlock); 124 if (q->head == NULL) { 125 q->head = q->tail = job; 126 } else { 127 q->tail->next = job; 128 q->tail = job; 129 } 130 job->next = NULL; 131 pthread_mutex_unlock(&q->mlock); 132 } 133 #endif 134 static void 135 add_to_job_lifo(struct queue *q, struct jobs *job) 136 { 137 pthread_mutex_lock(&q->mlock); 138 if (q->head == NULL) { 139 q->head = q->tail = job; 140 job->next = NULL; 141 } else { 142 job->next = q->head; 143 q->head = job; 144 } 145 pthread_mutex_unlock(&q->mlock); 146 } 147 148 static struct jobs * 149 get_job(struct queue *q) 150 { 151 struct jobs *job; 152 pthread_mutex_lock(&q->mlock); 153 job = q->head; 154 if (job != NULL) { 155 q->head = job->next; 156 if (job == q->tail) 157 q->tail = NULL; 158 job->next = NULL; 159 } 160 pthread_mutex_unlock(&q->mlock); 161 return (job); 162 } 163 static void 164 add_to_done(struct async_work *work) 165 { 166 pthread_mutex_lock(&work_done.mlock); 167 if (work_done.head == NULL) { 168 work_done.head = work_done.tail = work; 169 pthread_cond_signal(&work_done.cv); 170 } else { 171 work_done.tail->u.next = work; 172 work_done.tail = work; 173 } 174 work->flags |= ASYNCWORK_COMPLETE; 175 work->u.next = NULL; 176 pthread_mutex_unlock(&work_done.mlock); 177 } 178 static void 179 do_work(struct async_work *work) 180 { 181 work->result = work->u.funcs.func(work->arg); 182 add_to_done(work); 183 } 184 static void * 185 worker(void *arg) 186 { 187 struct jobs *job = arg; 188 #ifdef __lint 189 int err = 0; 190 #endif 191 192 for (;;) { 193 while (pthread_mutex_lock(&job->mlock) != 0) 194 #ifdef __lint 195 err++; 196 #else 197 ; 198 #endif 199 while (job->state != READY_TO_RUN) { 200 pthread_cond_wait(&job->cv, &job->mlock); 201 } 202 do_work(job->work); 203 job->work = NULL; 204 job->state = COMPLETED; 205 pthread_mutex_unlock(&job->mlock); 206 add_to_job_lifo(&unemployed, job); 207 } 208 /*NOTREACHED*/ 209 } 210 211 void * 212 create_stack(size_t len) 213 { 214 int fd; 215 int redzone_len = getpagesize(); 216 char *addr; 217 218 if ((fd = open("/dev/zero", O_RDWR)) == -1) 219 return (NULL); 220 221 addr = mmap(NULL, len + redzone_len, PROT_READ|PROT_WRITE, 222 MAP_PRIVATE, fd, 0); 223 close(fd); 224 if (addr == MAP_FAILED) 225 return (NULL); 226 227 madvise(addr, redzone_len, MADV_DONTNEED); 228 madvise(addr, redzone_len, MADV_FREE); 229 mprotect(addr, redzone_len, PROT_NONE); 230 231 madvise(addr + redzone_len, len, MADV_ACCESS_LWP); 232 233 expect_signal(SIGSEGV, "Thread Buffer Overflow\n", addr, redzone_len); 234 235 plog(LOG_DEBUG, "Stack %p\n", addr + redzone_len); 236 return ((void *)(addr + redzone_len)); 237 } 238 239 /* 240 * create_job. Creates a new job structure and its thread. 241 */ 242 static struct jobs * 243 create_job(void) 244 { 245 void *stack; 246 struct jobs *job; 247 pthread_attr_t attr; 248 int err; 249 size_t stack_size = roundup(getpagesize() + PTHREAD_STACK_MIN, 250 getpagesize()); 251 252 if ((job = malloc(sizeof (struct jobs))) == NULL) 253 return (NULL); 254 255 if ((stack = create_stack(stack_size)) == NULL) { 256 free(job); 257 return (NULL); 258 } 259 pthread_mutex_init(&job->mlock, NULL); 260 pthread_cond_init(&job->cv, NULL); 261 job->next = NULL; 262 job->state = COMPLETED; 263 pthread_attr_init(&attr); 264 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); 265 pthread_attr_setstacksize(&attr, stack_size); 266 pthread_attr_setstackaddr(&attr, stack); 267 err = pthread_create(&job->thread, &attr, worker, job); 268 if (err != 0) { 269 pthread_cond_destroy(&job->cv); 270 pthread_mutex_destroy(&job->mlock); 271 free(job); 272 errno = err; 273 return (NULL); 274 } 275 return (job); 276 } 277 278 static struct jobs * 279 get_free_job(void) 280 { 281 struct jobs *job; 282 283 if ((job = get_job(&unemployed)) == NULL) { 284 job = create_job(); 285 } 286 return (job); 287 } 288 289 void * 290 async_queue_request(struct async_work *work) 291 { 292 struct jobs *job; 293 294 if ((job = get_free_job()) == NULL) 295 return (NULL); 296 pthread_mutex_lock(&job->mlock); 297 work->flags &= (~ASYNCWORK_COMPLETE); 298 job->work = work; 299 job->state = READY_TO_RUN; 300 pthread_cond_signal(&job->cv); 301 pthread_mutex_unlock(&job->mlock); 302 return (job); 303 } 304 /*ARGSUSED*/ 305 struct async_work * 306 async_queue_wait(void *handle, const struct timespec *abstime, int flags) 307 { 308 struct async_work *work; 309 pthread_mutex_lock(&work_done.mlock); 310 while (work_done.head == NULL) { 311 if (abstime == NULL) { 312 pthread_cond_wait(&work_done.cv, &work_done.mlock); 313 } else { 314 int x; 315 x = pthread_cond_timedwait(&work_done.cv, 316 &work_done.mlock, abstime); 317 if (x == ETIMEDOUT) { 318 pthread_mutex_unlock(&work_done.mlock); 319 return (NULL); 320 } 321 } 322 } 323 work = work_done.head; 324 work_done.head = work->u.next; 325 if (work_done.tail == work) { 326 work_done.tail = NULL; 327 } 328 work->u.next = NULL; 329 pthread_mutex_unlock(&work_done.mlock); 330 return (work); 331 } 332 333 void 334 find_work(struct async_work *work) 335 { 336 struct async_work *w; 337 338 for (w = work_done.head; w != NULL; w = w->u.next) { 339 if (w == work) 340 abort(); 341 } 342 } 343 int 344 async_cancel_request(struct async_work *work, int flags) 345 { 346 int ret; 347 pthread_mutex_lock(&work_done.mlock); 348 if (work->flags & ASYNCWORK_COMPLETE) { 349 ret = -1; 350 } else { 351 if (work->u.funcs.cancel) { 352 ret = work->u.funcs.cancel(work->arg, flags); 353 } else { 354 find_work(work); 355 abort(); 356 } 357 } 358 pthread_mutex_unlock(&work_done.mlock); 359 return (ret); 360 } 361