1 5184 ek110237 /* 2 5184 ek110237 * CDDL HEADER START 3 5184 ek110237 * 4 5184 ek110237 * The contents of this file are subject to the terms of the 5 5184 ek110237 * Common Development and Distribution License (the "License"). 6 5184 ek110237 * You may not use this file except in compliance with the License. 7 5184 ek110237 * 8 5184 ek110237 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 5184 ek110237 * or http://www.opensolaris.org/os/licensing. 10 5184 ek110237 * See the License for the specific language governing permissions 11 5184 ek110237 * and limitations under the License. 12 5184 ek110237 * 13 5184 ek110237 * When distributing Covered Code, include this CDDL HEADER in each 14 5184 ek110237 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 5184 ek110237 * If applicable, add the following below this CDDL HEADER, with the 16 5184 ek110237 * fields enclosed by brackets "[]" replaced with your own identifying 17 5184 ek110237 * information: Portions Copyright [yyyy] [name of copyright owner] 18 5184 ek110237 * 19 5184 ek110237 * CDDL HEADER END 20 5184 ek110237 */ 21 5184 ek110237 /* 22 8615 Andrew * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 5184 ek110237 * Use is subject to license terms. 24 6613 ek110237 * 25 6613 ek110237 * Portions Copyright 2008 Denis Cheng 26 5184 ek110237 */ 27 5184 ek110237 28 5184 ek110237 #include "config.h" 29 5184 ek110237 #include <pthread.h> 30 5184 ek110237 #ifdef HAVE_LWPS 31 5184 ek110237 #include <sys/lwp.h> 32 5184 ek110237 #endif 33 5184 ek110237 #include <signal.h> 34 6613 ek110237 35 6613 ek110237 #include "filebench.h" 36 5184 ek110237 #include "threadflow.h" 37 5184 ek110237 #include "flowop.h" 38 5184 ek110237 #include "ipc.h" 39 5184 ek110237 40 5184 ek110237 static threadflow_t *threadflow_define_common(procflow_t *procflow, 41 5184 ek110237 char *name, threadflow_t *inherit, int instance); 42 5184 ek110237 43 5184 ek110237 /* 44 5184 ek110237 * Threadflows are filebench entities which manage operating system 45 5184 ek110237 * threads. Each worker threadflow spawns a separate filebench thread, 46 5184 ek110237 * with attributes inherited from a FLOW_MASTER threadflow created during 47 5184 ek110237 * f model language parsing. This section contains routines to define, 48 5184 ek110237 * create, control, and delete threadflows. 49 5184 ek110237 * 50 5184 ek110237 * Each thread defined in the f model creates a FLOW_MASTER 51 5184 ek110237 * threadflow which encapsulates the defined attributes and flowops of 52 5184 ek110237 * the f language thread, including the number of instances to create. 53 5184 ek110237 * At runtime, a worker threadflow instance with an associated filebench 54 5184 ek110237 * thread is created, which runs until told to quit or is specifically 55 5184 ek110237 * deleted. 56 5184 ek110237 */ 57 5184 ek110237 58 5184 ek110237 59 5184 ek110237 /* 60 5184 ek110237 * Prints information about threadflow syntax. 61 5184 ek110237 */ 62 5184 ek110237 void 63 5184 ek110237 threadflow_usage(void) 64 5184 ek110237 { 65 5184 ek110237 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n"); 66 5184 ek110237 (void) fprintf(stderr, "\n"); 67 5184 ek110237 (void) fprintf(stderr, " {\n"); 68 5184 ek110237 (void) fprintf(stderr, " flowop ...\n"); 69 5184 ek110237 (void) fprintf(stderr, " flowop ...\n"); 70 5184 ek110237 (void) fprintf(stderr, " flowop ...\n"); 71 5184 ek110237 (void) fprintf(stderr, " }\n"); 72 5184 ek110237 (void) fprintf(stderr, "\n"); 73 5184 ek110237 } 74 5184 ek110237 75 5184 ek110237 /* 76 5184 ek110237 * Creates a thread for the supplied threadflow. If interprocess 77 5184 ek110237 * shared memory is desired, then increments the amount of shared 78 5184 ek110237 * memory needed by the amount specified in the threadflow's 79 5184 ek110237 * tf_memsize parameter. The thread starts in routine 80 5184 ek110237 * flowop_start() with a poineter to the threadflow supplied 81 5184 ek110237 * as the argument. 82 5184 ek110237 */ 83 5184 ek110237 static int 84 5184 ek110237 threadflow_createthread(threadflow_t *threadflow) 85 5184 ek110237 { 86 6212 aw148015 fbint_t memsize; 87 6212 aw148015 memsize = avd_get_int(threadflow->tf_memsize); 88 6212 aw148015 threadflow->tf_constmemsize = memsize; 89 6212 aw148015 90 5184 ek110237 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", 91 6212 aw148015 threadflow->tf_name, memsize); 92 5184 ek110237 93 5184 ek110237 if (threadflow->tf_attrs & THREADFLOW_USEISM) 94 6212 aw148015 filebench_shm->shm_required += memsize; 95 5184 ek110237 96 5184 ek110237 if (pthread_create(&threadflow->tf_tid, NULL, 97 5184 ek110237 (void *(*)(void*))flowop_start, threadflow) != 0) { 98 5184 ek110237 filebench_log(LOG_ERROR, "thread create failed"); 99 5184 ek110237 filebench_shutdown(1); 100 6084 aw148015 return (FILEBENCH_ERROR); 101 5184 ek110237 } 102 5184 ek110237 103 6084 aw148015 return (FILEBENCH_OK); 104 5184 ek110237 } 105 5184 ek110237 106 5184 ek110237 /* 107 5184 ek110237 * Creates threads for the threadflows associated with a procflow. 108 5184 ek110237 * The routine iterates through the list of threadflows in the 109 5184 ek110237 * supplied procflow's pf_threads list. For each threadflow on 110 5184 ek110237 * the list, it defines tf_instances number of cloned 111 5184 ek110237 * threadflows, and then calls threadflow_createthread() for 112 5184 ek110237 * each to create and start the actual operating system thread. 113 5184 ek110237 * Note that each of the newly defined threadflows will be linked 114 5184 ek110237 * into the procflows threadflow list, but at the head of the 115 5184 ek110237 * list, so they will not become part of the supplied set. After 116 5184 ek110237 * all the threads have been created, threadflow_init enters 117 5184 ek110237 * a join loop for all the threads in the newly defined 118 5184 ek110237 * threadflows. Once all the created threads have exited, 119 5184 ek110237 * threadflow_init will return 0. If errors are encountered, it 120 5184 ek110237 * will return a non zero value. 121 5184 ek110237 */ 122 5184 ek110237 int 123 5184 ek110237 threadflow_init(procflow_t *procflow) 124 5184 ek110237 { 125 5184 ek110237 threadflow_t *threadflow = procflow->pf_threads; 126 5184 ek110237 int ret = 0; 127 5184 ek110237 128 6391 aw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 129 6084 aw148015 130 5184 ek110237 while (threadflow) { 131 5184 ek110237 threadflow_t *newthread; 132 6212 aw148015 int instances; 133 5184 ek110237 int i; 134 5184 ek110237 135 6212 aw148015 instances = avd_get_int(threadflow->tf_instances); 136 5184 ek110237 filebench_log(LOG_VERBOSE, 137 6286 aw148015 "Starting %d %s threads", 138 6212 aw148015 instances, threadflow->tf_name); 139 5184 ek110237 140 6212 aw148015 for (i = 1; i < instances; i++) { 141 5184 ek110237 /* Create threads */ 142 5184 ek110237 newthread = 143 5184 ek110237 threadflow_define_common(procflow, 144 5184 ek110237 threadflow->tf_name, threadflow, i + 1); 145 5184 ek110237 if (newthread == NULL) 146 5184 ek110237 return (-1); 147 6084 aw148015 ret |= threadflow_createthread(newthread); 148 5184 ek110237 } 149 5184 ek110237 150 5184 ek110237 newthread = threadflow_define_common(procflow, 151 5184 ek110237 threadflow->tf_name, 152 5184 ek110237 threadflow, 1); 153 5184 ek110237 154 5184 ek110237 if (newthread == NULL) 155 5184 ek110237 return (-1); 156 5184 ek110237 157 6084 aw148015 /* Create each thread */ 158 6084 aw148015 ret |= threadflow_createthread(newthread); 159 5184 ek110237 160 5184 ek110237 threadflow = threadflow->tf_next; 161 5184 ek110237 } 162 5184 ek110237 163 5184 ek110237 threadflow = procflow->pf_threads; 164 5184 ek110237 165 6391 aw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 166 5184 ek110237 167 5184 ek110237 while (threadflow) { 168 6333 ek110237 /* wait for all threads to finish */ 169 6333 ek110237 if (threadflow->tf_tid) { 170 6333 ek110237 void *status; 171 5184 ek110237 172 6333 ek110237 if (pthread_join(threadflow->tf_tid, &status) == 0) 173 6333 ek110237 ret += *(int *)status; 174 6333 ek110237 } 175 5184 ek110237 threadflow = threadflow->tf_next; 176 5184 ek110237 } 177 5184 ek110237 178 5184 ek110237 procflow->pf_running = 0; 179 5184 ek110237 180 5184 ek110237 return (ret); 181 5184 ek110237 } 182 5184 ek110237 183 5184 ek110237 /* 184 5184 ek110237 * Tells the threadflow's thread to stop and optionally signals 185 5184 ek110237 * its associated process to end the thread. 186 5184 ek110237 */ 187 5184 ek110237 static void 188 8762 Andrew threadflow_kill(threadflow_t *threadflow) 189 5184 ek110237 { 190 8762 Andrew int wait_cnt = 2; 191 8762 Andrew 192 5184 ek110237 /* Tell thread to finish */ 193 5184 ek110237 threadflow->tf_abort = 1; 194 6084 aw148015 195 6084 aw148015 /* wait a bit for threadflow to stop */ 196 6084 aw148015 while (wait_cnt && threadflow->tf_running) { 197 6084 aw148015 (void) sleep(1); 198 6084 aw148015 wait_cnt--; 199 6084 aw148015 } 200 5184 ek110237 201 8762 Andrew if (threadflow->tf_running) { 202 8762 Andrew threadflow->tf_running = FALSE; 203 8769 Andrew (void) pthread_kill(threadflow->tf_tid, SIGKILL); 204 8762 Andrew } 205 5184 ek110237 } 206 5184 ek110237 207 5184 ek110237 /* 208 5184 ek110237 * Deletes the specified threadflow from the specified threadflow 209 5184 ek110237 * list after first terminating the threadflow's thread, deleting 210 5184 ek110237 * the threadflow's flowops, and finally freeing the threadflow 211 5184 ek110237 * entity. It also subtracts the threadflow's shared memory 212 5184 ek110237 * requirements from the total amount required, shm_required. If 213 5184 ek110237 * the specified threadflow is found, returns 0, otherwise 214 5184 ek110237 * returns -1. 215 5184 ek110237 */ 216 5184 ek110237 static int 217 8762 Andrew threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow) 218 5184 ek110237 { 219 5184 ek110237 threadflow_t *entry = *threadlist; 220 5184 ek110237 221 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", 222 5184 ek110237 threadflow->tf_name, 223 5184 ek110237 threadflow->tf_instance); 224 5184 ek110237 225 6212 aw148015 if (threadflow->tf_attrs & THREADFLOW_USEISM) 226 6212 aw148015 filebench_shm->shm_required -= threadflow->tf_constmemsize; 227 5184 ek110237 228 5184 ek110237 if (threadflow == *threadlist) { 229 5184 ek110237 /* First on list */ 230 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", 231 5184 ek110237 threadflow->tf_name, 232 5184 ek110237 threadflow->tf_instance); 233 5184 ek110237 234 8762 Andrew threadflow_kill(threadflow); 235 6550 aw148015 flowop_delete_all(&threadflow->tf_thrd_fops); 236 5184 ek110237 *threadlist = threadflow->tf_next; 237 6613 ek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock); 238 5184 ek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 239 5184 ek110237 return (0); 240 5184 ek110237 } 241 5184 ek110237 242 5184 ek110237 while (entry->tf_next) { 243 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, 244 5184 ek110237 "Delete thread: (%s-%d) == (%s-%d)", 245 5184 ek110237 entry->tf_next->tf_name, 246 5184 ek110237 entry->tf_next->tf_instance, 247 5184 ek110237 threadflow->tf_name, 248 5184 ek110237 threadflow->tf_instance); 249 5184 ek110237 250 5184 ek110237 if (threadflow == entry->tf_next) { 251 5184 ek110237 /* Delete */ 252 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, 253 5184 ek110237 "Deleted thread: (%s-%d)", 254 5184 ek110237 entry->tf_next->tf_name, 255 5184 ek110237 entry->tf_next->tf_instance); 256 8762 Andrew threadflow_kill(entry->tf_next); 257 6550 aw148015 flowop_delete_all(&entry->tf_next->tf_thrd_fops); 258 6613 ek110237 (void) pthread_mutex_destroy(&threadflow->tf_lock); 259 5184 ek110237 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 260 5184 ek110237 entry->tf_next = entry->tf_next->tf_next; 261 5184 ek110237 return (0); 262 5184 ek110237 } 263 5184 ek110237 entry = entry->tf_next; 264 5184 ek110237 } 265 5184 ek110237 266 5184 ek110237 return (-1); 267 5184 ek110237 } 268 5184 ek110237 269 5184 ek110237 /* 270 5184 ek110237 * Given a pointer to the thread list of a procflow, cycles 271 5184 ek110237 * through all the threadflows on the list, deleting each one 272 5184 ek110237 * except the FLOW_MASTER. 273 5184 ek110237 */ 274 5184 ek110237 void 275 8762 Andrew threadflow_delete_all(threadflow_t **threadlist) 276 5184 ek110237 { 277 8615 Andrew threadflow_t *threadflow; 278 5184 ek110237 279 6391 aw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 280 5184 ek110237 281 8615 Andrew threadflow = *threadlist; 282 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); 283 5184 ek110237 284 5184 ek110237 while (threadflow) { 285 5184 ek110237 if (threadflow->tf_instance && 286 5184 ek110237 (threadflow->tf_instance == FLOW_MASTER)) { 287 5184 ek110237 threadflow = threadflow->tf_next; 288 5184 ek110237 continue; 289 5184 ek110237 } 290 8762 Andrew (void) threadflow_delete(threadlist, threadflow); 291 5184 ek110237 threadflow = threadflow->tf_next; 292 5184 ek110237 } 293 5184 ek110237 294 6391 aw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 295 5184 ek110237 } 296 5184 ek110237 297 5184 ek110237 /* 298 5184 ek110237 * Waits till all threadflows are started, or a timeout occurs. 299 5184 ek110237 * Checks through the list of threadflows, waiting up to 10 300 5184 ek110237 * seconds for each one to set its tf_running flag to 1. If not 301 5184 ek110237 * set after 10 seconds, continues on to the next threadflow 302 5184 ek110237 * anyway. 303 5184 ek110237 */ 304 5184 ek110237 void 305 5184 ek110237 threadflow_allstarted(pid_t pid, threadflow_t *threadflow) 306 5184 ek110237 { 307 6391 aw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 308 5184 ek110237 309 5184 ek110237 while (threadflow) { 310 5184 ek110237 int waits; 311 5184 ek110237 312 5184 ek110237 if ((threadflow->tf_instance == 0) || 313 5184 ek110237 (threadflow->tf_instance == FLOW_MASTER)) { 314 5184 ek110237 threadflow = threadflow->tf_next; 315 5184 ek110237 continue; 316 5184 ek110237 } 317 5184 ek110237 318 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", 319 5184 ek110237 pid, 320 5184 ek110237 threadflow->tf_name, 321 5184 ek110237 threadflow->tf_instance); 322 5184 ek110237 323 5184 ek110237 waits = 10; 324 6391 aw148015 while (waits && (threadflow->tf_running == 0) && 325 6391 aw148015 (filebench_shm->shm_f_abort == 0)) { 326 5184 ek110237 (void) ipc_mutex_unlock( 327 6391 aw148015 &filebench_shm->shm_threadflow_lock); 328 5184 ek110237 if (waits < 3) 329 5184 ek110237 filebench_log(LOG_INFO, 330 5184 ek110237 "Waiting for pid %d thread %s-%d", 331 5184 ek110237 pid, 332 5184 ek110237 threadflow->tf_name, 333 5184 ek110237 threadflow->tf_instance); 334 5184 ek110237 335 5184 ek110237 (void) sleep(1); 336 6391 aw148015 (void) ipc_mutex_lock( 337 6391 aw148015 &filebench_shm->shm_threadflow_lock); 338 5184 ek110237 waits--; 339 5184 ek110237 } 340 5184 ek110237 341 5184 ek110237 threadflow = threadflow->tf_next; 342 5184 ek110237 } 343 5184 ek110237 344 6391 aw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 345 5184 ek110237 } 346 5184 ek110237 347 5184 ek110237 /* 348 5184 ek110237 * Create an in-memory thread object linked to a parent procflow. 349 5184 ek110237 * A threadflow entity is allocated from shared memory and 350 5184 ek110237 * initialized from the "inherit" threadflow if supplied, 351 5184 ek110237 * otherwise to zeros. The threadflow is assigned a unique 352 5184 ek110237 * thread id, the supplied instance number, the supplied name 353 5184 ek110237 * and added to the procflow's pf_thread list. If no name is 354 5184 ek110237 * supplied or the threadflow can't be allocated, NULL is 355 5184 ek110237 * returned Otherwise a pointer to the newly allocated threadflow 356 5184 ek110237 * is returned. 357 5184 ek110237 * 358 6391 aw148015 * The filebench_shm->shm_threadflow_lock must be held by the caller. 359 5184 ek110237 */ 360 5184 ek110237 static threadflow_t * 361 5184 ek110237 threadflow_define_common(procflow_t *procflow, char *name, 362 5184 ek110237 threadflow_t *inherit, int instance) 363 5184 ek110237 { 364 5184 ek110237 threadflow_t *threadflow; 365 5184 ek110237 threadflow_t **threadlistp = &procflow->pf_threads; 366 5184 ek110237 367 5184 ek110237 if (name == NULL) 368 5184 ek110237 return (NULL); 369 5184 ek110237 370 5184 ek110237 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); 371 5184 ek110237 372 5184 ek110237 if (threadflow == NULL) 373 5184 ek110237 return (NULL); 374 5184 ek110237 375 5184 ek110237 if (inherit) 376 5184 ek110237 (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); 377 5184 ek110237 else 378 5184 ek110237 (void) memset(threadflow, 0, sizeof (threadflow_t)); 379 5184 ek110237 380 6391 aw148015 threadflow->tf_utid = ++filebench_shm->shm_utid; 381 5184 ek110237 382 5184 ek110237 threadflow->tf_instance = instance; 383 5184 ek110237 (void) strcpy(threadflow->tf_name, name); 384 5184 ek110237 threadflow->tf_process = procflow; 385 7556 Andrew (void) pthread_mutex_init(&threadflow->tf_lock, 386 7556 Andrew ipc_mutexattr(IPC_MUTEX_NORMAL)); 387 5184 ek110237 388 5184 ek110237 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", 389 5184 ek110237 name, instance); 390 5184 ek110237 391 5184 ek110237 /* Add threadflow to list */ 392 5184 ek110237 if (*threadlistp == NULL) { 393 5184 ek110237 *threadlistp = threadflow; 394 5184 ek110237 threadflow->tf_next = NULL; 395 5184 ek110237 } else { 396 5184 ek110237 threadflow->tf_next = *threadlistp; 397 5184 ek110237 *threadlistp = threadflow; 398 5184 ek110237 } 399 5184 ek110237 400 5184 ek110237 return (threadflow); 401 5184 ek110237 } 402 5184 ek110237 403 5184 ek110237 /* 404 5184 ek110237 * Create an in memory FLOW_MASTER thread object as described 405 6391 aw148015 * by the syntax. Acquire the filebench_shm->shm_threadflow_lock and 406 5184 ek110237 * call threadflow_define_common() to create a threadflow entity. 407 5184 ek110237 * Set the number of instances to create at runtime, 408 5184 ek110237 * tf_instances, to "instances". Return the threadflow pointer 409 5184 ek110237 * returned by the threadflow_define_common call. 410 5184 ek110237 */ 411 5184 ek110237 threadflow_t * 412 5184 ek110237 threadflow_define(procflow_t *procflow, char *name, 413 6212 aw148015 threadflow_t *inherit, avd_t instances) 414 5184 ek110237 { 415 5184 ek110237 threadflow_t *threadflow; 416 5184 ek110237 417 6391 aw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 418 5184 ek110237 419 5184 ek110237 if ((threadflow = threadflow_define_common(procflow, name, 420 5184 ek110237 inherit, FLOW_MASTER)) == NULL) 421 5184 ek110237 return (NULL); 422 5184 ek110237 423 5184 ek110237 threadflow->tf_instances = instances; 424 5184 ek110237 425 6391 aw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 426 5184 ek110237 427 5184 ek110237 return (threadflow); 428 5184 ek110237 } 429 5184 ek110237 430 5184 ek110237 431 5184 ek110237 /* 432 5184 ek110237 * Searches the provided threadflow list for the named threadflow. 433 5184 ek110237 * A pointer to the threadflow is returned, or NULL if threadflow 434 5184 ek110237 * is not found. 435 5184 ek110237 */ 436 5184 ek110237 threadflow_t * 437 5184 ek110237 threadflow_find(threadflow_t *threadlist, char *name) 438 5184 ek110237 { 439 5184 ek110237 threadflow_t *threadflow = threadlist; 440 5184 ek110237 441 6391 aw148015 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 442 5184 ek110237 443 5184 ek110237 while (threadflow) { 444 5184 ek110237 if (strcmp(name, threadflow->tf_name) == 0) { 445 5184 ek110237 446 5184 ek110237 (void) ipc_mutex_unlock( 447 6391 aw148015 &filebench_shm->shm_threadflow_lock); 448 5184 ek110237 449 5184 ek110237 return (threadflow); 450 5184 ek110237 } 451 5184 ek110237 threadflow = threadflow->tf_next; 452 5184 ek110237 } 453 5184 ek110237 454 6391 aw148015 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 455 5184 ek110237 456 5184 ek110237 457 5184 ek110237 return (NULL); 458 5184 ek110237 } 459