Home | History | Annotate | Download | only in common
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  *
     25  * Portions Copyright 2008 Denis Cheng
     26  */
     27 
     28 #include "config.h"
     29 #include <pthread.h>
     30 #ifdef HAVE_LWPS
     31 #include <sys/lwp.h>
     32 #endif
     33 #include <signal.h>
     34 
     35 #include "filebench.h"
     36 #include "threadflow.h"
     37 #include "flowop.h"
     38 #include "ipc.h"
     39 
     40 static threadflow_t *threadflow_define_common(procflow_t *procflow,
     41     char *name, threadflow_t *inherit, int instance);
     42 
     43 /*
     44  * Threadflows are filebench entities which manage operating system
     45  * threads. Each worker threadflow spawns a separate filebench thread,
     46  * with attributes inherited from a FLOW_MASTER threadflow created during
     47  * f model language parsing. This section contains routines to define,
     48  * create, control, and delete threadflows.
     49  *
     50  * Each thread defined in the f model creates a FLOW_MASTER
     51  * threadflow which encapsulates the defined attributes and flowops of
     52  * the f language thread, including the number of instances to create.
     53  * At runtime, a worker threadflow instance with an associated filebench
     54  * thread is created, which runs until told to quit or is specifically
     55  * deleted.
     56  */
     57 
     58 
     59 /*
     60  * Prints information about threadflow syntax.
     61  */
     62 void
     63 threadflow_usage(void)
     64 {
     65 	(void) fprintf(stderr, "  thread  name=<name>[,instances=<count>]\n");
     66 	(void) fprintf(stderr, "\n");
     67 	(void) fprintf(stderr, "  {\n");
     68 	(void) fprintf(stderr, "    flowop ...\n");
     69 	(void) fprintf(stderr, "    flowop ...\n");
     70 	(void) fprintf(stderr, "    flowop ...\n");
     71 	(void) fprintf(stderr, "  }\n");
     72 	(void) fprintf(stderr, "\n");
     73 }
     74 
     75 /*
     76  * Creates a thread for the supplied threadflow. If interprocess
     77  * shared memory is desired, then increments the amount of shared
     78  * memory needed by the amount specified in the threadflow's
     79  * tf_memsize parameter. The thread starts in routine
     80  * flowop_start() with a poineter to the threadflow supplied
     81  * as the argument.
     82  */
     83 static int
     84 threadflow_createthread(threadflow_t *threadflow)
     85 {
     86 	fbint_t memsize;
     87 	memsize = avd_get_int(threadflow->tf_memsize);
     88 	threadflow->tf_constmemsize = memsize;
     89 
     90 	filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld",
     91 	    threadflow->tf_name, memsize);
     92 
     93 	if (threadflow->tf_attrs & THREADFLOW_USEISM)
     94 		filebench_shm->shm_required += memsize;
     95 
     96 	if (pthread_create(&threadflow->tf_tid, NULL,
     97 	    (void *(*)(void*))flowop_start, threadflow) != 0) {
     98 		filebench_log(LOG_ERROR, "thread create failed");
     99 		filebench_shutdown(1);
    100 		return (FILEBENCH_ERROR);
    101 	}
    102 
    103 	return (FILEBENCH_OK);
    104 }
    105 
    106 /*
    107  * Creates threads for the threadflows associated with a procflow.
    108  * The routine iterates through the list of threadflows in the
    109  * supplied procflow's pf_threads list. For each threadflow on
    110  * the list, it defines tf_instances number of cloned
    111  * threadflows, and then calls threadflow_createthread() for
    112  * each to create and start the actual operating system thread.
    113  * Note that each of the newly defined threadflows will be linked
    114  * into the procflows threadflow list, but at the head of the
    115  * list, so they will not become part of the supplied set. After
    116  * all the threads have been created, threadflow_init enters
    117  * a join loop for all the threads in the newly defined
    118  * threadflows. Once all the created threads have exited,
    119  * threadflow_init will return 0. If errors are encountered, it
    120  * will return a non zero value.
    121  */
    122 int
    123 threadflow_init(procflow_t *procflow)
    124 {
    125 	threadflow_t *threadflow = procflow->pf_threads;
    126 	int ret = 0;
    127 
    128 	(void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
    129 
    130 	while (threadflow) {
    131 		threadflow_t *newthread;
    132 		int instances;
    133 		int i;
    134 
    135 		instances = avd_get_int(threadflow->tf_instances);
    136 		filebench_log(LOG_VERBOSE,
    137 		    "Starting %d %s threads",
    138 		    instances, threadflow->tf_name);
    139 
    140 		for (i = 1; i < instances; i++) {
    141 			/* Create threads */
    142 			newthread =
    143 			    threadflow_define_common(procflow,
    144 			    threadflow->tf_name, threadflow, i + 1);
    145 			if (newthread == NULL)
    146 				return (-1);
    147 			ret |= threadflow_createthread(newthread);
    148 		}
    149 
    150 		newthread = threadflow_define_common(procflow,
    151 		    threadflow->tf_name,
    152 		    threadflow, 1);
    153 
    154 		if (newthread == NULL)
    155 			return (-1);
    156 
    157 		/* Create each thread */
    158 		ret |= threadflow_createthread(newthread);
    159 
    160 		threadflow = threadflow->tf_next;
    161 	}
    162 
    163 	threadflow = procflow->pf_threads;
    164 
    165 	(void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
    166 
    167 	while (threadflow) {
    168 		/* wait for all threads to finish */
    169 		if (threadflow->tf_tid) {
    170 			void *status;
    171 
    172 			if (pthread_join(threadflow->tf_tid, &status) == 0)
    173 				ret += *(int *)status;
    174 		}
    175 		threadflow = threadflow->tf_next;
    176 	}
    177 
    178 	procflow->pf_running = 0;
    179 
    180 	return (ret);
    181 }
    182 
    183 /*
    184  * Tells the threadflow's thread to stop and optionally signals
    185  * its associated process to end the thread.
    186  */
    187 static void
    188 threadflow_kill(threadflow_t *threadflow)
    189 {
    190 	int wait_cnt = 2;
    191 
    192 	/* Tell thread to finish */
    193 	threadflow->tf_abort = 1;
    194 
    195 	/* wait a bit for threadflow to stop */
    196 	while (wait_cnt && threadflow->tf_running) {
    197 		(void) sleep(1);
    198 		wait_cnt--;
    199 	}
    200 
    201 	if (threadflow->tf_running) {
    202 		threadflow->tf_running = FALSE;
    203 		(void) pthread_kill(threadflow->tf_tid, SIGKILL);
    204 	}
    205 }
    206 
    207 /*
    208  * Deletes the specified threadflow from the specified threadflow
    209  * list after first terminating the threadflow's thread, deleting
    210  * the threadflow's flowops, and finally freeing the threadflow
    211  * entity. It also subtracts the threadflow's shared memory
    212  * requirements from the total amount required, shm_required. If
    213  * the specified threadflow is found, returns 0, otherwise
    214  * returns -1.
    215  */
    216 static int
    217 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow)
    218 {
    219 	threadflow_t *entry = *threadlist;
    220 
    221 	filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)",
    222 	    threadflow->tf_name,
    223 	    threadflow->tf_instance);
    224 
    225 	if (threadflow->tf_attrs & THREADFLOW_USEISM)
    226 		filebench_shm->shm_required -= threadflow->tf_constmemsize;
    227 
    228 	if (threadflow == *threadlist) {
    229 		/* First on list */
    230 		filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)",
    231 		    threadflow->tf_name,
    232 		    threadflow->tf_instance);
    233 
    234 		threadflow_kill(threadflow);
    235 		flowop_delete_all(&threadflow->tf_thrd_fops);
    236 		*threadlist = threadflow->tf_next;
    237 		(void) pthread_mutex_destroy(&threadflow->tf_lock);
    238 		ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
    239 		return (0);
    240 	}
    241 
    242 	while (entry->tf_next) {
    243 		filebench_log(LOG_DEBUG_IMPL,
    244 		    "Delete thread: (%s-%d) == (%s-%d)",
    245 		    entry->tf_next->tf_name,
    246 		    entry->tf_next->tf_instance,
    247 		    threadflow->tf_name,
    248 		    threadflow->tf_instance);
    249 
    250 		if (threadflow == entry->tf_next) {
    251 			/* Delete */
    252 			filebench_log(LOG_DEBUG_IMPL,
    253 			    "Deleted thread: (%s-%d)",
    254 			    entry->tf_next->tf_name,
    255 			    entry->tf_next->tf_instance);
    256 			threadflow_kill(entry->tf_next);
    257 			flowop_delete_all(&entry->tf_next->tf_thrd_fops);
    258 			(void) pthread_mutex_destroy(&threadflow->tf_lock);
    259 			ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow);
    260 			entry->tf_next = entry->tf_next->tf_next;
    261 			return (0);
    262 		}
    263 		entry = entry->tf_next;
    264 	}
    265 
    266 	return (-1);
    267 }
    268 
    269 /*
    270  * Given a pointer to the thread list of a procflow, cycles
    271  * through all the threadflows on the list, deleting each one
    272  * except the FLOW_MASTER.
    273  */
    274 void
    275 threadflow_delete_all(threadflow_t **threadlist)
    276 {
    277 	threadflow_t *threadflow;
    278 
    279 	(void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
    280 
    281 	threadflow = *threadlist;
    282 	filebench_log(LOG_DEBUG_IMPL, "Deleting all threads");
    283 
    284 	while (threadflow) {
    285 		if (threadflow->tf_instance &&
    286 		    (threadflow->tf_instance == FLOW_MASTER)) {
    287 			threadflow = threadflow->tf_next;
    288 			continue;
    289 		}
    290 		(void) threadflow_delete(threadlist, threadflow);
    291 		threadflow = threadflow->tf_next;
    292 	}
    293 
    294 	(void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
    295 }
    296 
    297 /*
    298  * Waits till all threadflows are started, or a timeout occurs.
    299  * Checks through the list of threadflows, waiting up to 10
    300  * seconds for each one to set its tf_running flag to 1. If not
    301  * set after 10 seconds, continues on to the next threadflow
    302  * anyway.
    303  */
    304 void
    305 threadflow_allstarted(pid_t pid, threadflow_t *threadflow)
    306 {
    307 	(void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
    308 
    309 	while (threadflow) {
    310 		int waits;
    311 
    312 		if ((threadflow->tf_instance == 0) ||
    313 		    (threadflow->tf_instance == FLOW_MASTER)) {
    314 			threadflow = threadflow->tf_next;
    315 			continue;
    316 		}
    317 
    318 		filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d",
    319 		    pid,
    320 		    threadflow->tf_name,
    321 		    threadflow->tf_instance);
    322 
    323 		waits = 10;
    324 		while (waits && (threadflow->tf_running == 0) &&
    325 		    (filebench_shm->shm_f_abort == 0)) {
    326 			(void) ipc_mutex_unlock(
    327 			    &filebench_shm->shm_threadflow_lock);
    328 			if (waits < 3)
    329 				filebench_log(LOG_INFO,
    330 				    "Waiting for pid %d thread %s-%d",
    331 				    pid,
    332 				    threadflow->tf_name,
    333 				    threadflow->tf_instance);
    334 
    335 			(void) sleep(1);
    336 			(void) ipc_mutex_lock(
    337 			    &filebench_shm->shm_threadflow_lock);
    338 			waits--;
    339 		}
    340 
    341 		threadflow = threadflow->tf_next;
    342 	}
    343 
    344 	(void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
    345 }
    346 
    347 /*
    348  * Create an in-memory thread object linked to a parent procflow.
    349  * A threadflow entity is allocated from shared memory and
    350  * initialized from the "inherit" threadflow if supplied,
    351  * otherwise to zeros. The threadflow is assigned a unique
    352  * thread id, the supplied instance number, the supplied name
    353  * and added to the procflow's pf_thread list. If no name is
    354  * supplied or the threadflow can't be allocated, NULL is
    355  * returned Otherwise a pointer to the newly allocated threadflow
    356  * is returned.
    357  *
    358  * The filebench_shm->shm_threadflow_lock must be held by the caller.
    359  */
    360 static threadflow_t *
    361 threadflow_define_common(procflow_t *procflow, char *name,
    362     threadflow_t *inherit, int instance)
    363 {
    364 	threadflow_t *threadflow;
    365 	threadflow_t **threadlistp = &procflow->pf_threads;
    366 
    367 	if (name == NULL)
    368 		return (NULL);
    369 
    370 	threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW);
    371 
    372 	if (threadflow == NULL)
    373 		return (NULL);
    374 
    375 	if (inherit)
    376 		(void) memcpy(threadflow, inherit, sizeof (threadflow_t));
    377 	else
    378 		(void) memset(threadflow, 0, sizeof (threadflow_t));
    379 
    380 	threadflow->tf_utid = ++filebench_shm->shm_utid;
    381 
    382 	threadflow->tf_instance = instance;
    383 	(void) strcpy(threadflow->tf_name, name);
    384 	threadflow->tf_process = procflow;
    385 	(void) pthread_mutex_init(&threadflow->tf_lock,
    386 	    ipc_mutexattr(IPC_MUTEX_NORMAL));
    387 
    388 	filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d",
    389 	    name, instance);
    390 
    391 	/* Add threadflow to list */
    392 	if (*threadlistp == NULL) {
    393 		*threadlistp = threadflow;
    394 		threadflow->tf_next = NULL;
    395 	} else {
    396 		threadflow->tf_next = *threadlistp;
    397 		*threadlistp = threadflow;
    398 	}
    399 
    400 	return (threadflow);
    401 }
    402 
    403 /*
    404  * Create an in memory FLOW_MASTER thread object as described
    405  * by the syntax. Acquire the  filebench_shm->shm_threadflow_lock and
    406  * call threadflow_define_common() to create a threadflow entity.
    407  * Set the number of instances to create at runtime,
    408  * tf_instances, to "instances". Return the threadflow pointer
    409  * returned by the threadflow_define_common call.
    410  */
    411 threadflow_t *
    412 threadflow_define(procflow_t *procflow, char *name,
    413     threadflow_t *inherit, avd_t instances)
    414 {
    415 	threadflow_t *threadflow;
    416 
    417 	(void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
    418 
    419 	if ((threadflow = threadflow_define_common(procflow, name,
    420 	    inherit, FLOW_MASTER)) == NULL)
    421 		return (NULL);
    422 
    423 	threadflow->tf_instances = instances;
    424 
    425 	(void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
    426 
    427 	return (threadflow);
    428 }
    429 
    430 
    431 /*
    432  * Searches the provided threadflow list for the named threadflow.
    433  * A pointer to the threadflow is returned, or NULL if threadflow
    434  * is not found.
    435  */
    436 threadflow_t *
    437 threadflow_find(threadflow_t *threadlist, char *name)
    438 {
    439 	threadflow_t *threadflow = threadlist;
    440 
    441 	(void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock);
    442 
    443 	while (threadflow) {
    444 		if (strcmp(name, threadflow->tf_name) == 0) {
    445 
    446 			(void) ipc_mutex_unlock(
    447 			    &filebench_shm->shm_threadflow_lock);
    448 
    449 			return (threadflow);
    450 		}
    451 		threadflow = threadflow->tf_next;
    452 	}
    453 
    454 	(void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock);
    455 
    456 
    457 	return (NULL);
    458 }
    459