Home | History | Annotate | Download | only in common
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include "config.h"
     27 
     28 #ifdef HAVE_LWPS
     29 #include <sys/lwp.h>
     30 #endif
     31 #include <fcntl.h>
     32 #include "filebench.h"
     33 #include "flowop.h"
     34 #include "stats.h"
     35 
     36 #ifdef LINUX_PORT
     37 #include <sys/types.h>
     38 #include <linux/unistd.h>
     39 #endif
     40 
     41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
     42     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
     43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
     44 static int flowop_composite_init(flowop_t *flowop);
     45 static void flowop_composite_destruct(flowop_t *flowop);
     46 
     47 /*
     48  * A collection of flowop support functions. The actual code that
     49  * implements the various flowops is in flowop_library.c.
     50  *
     51  * Routines for defining, creating, initializing and destroying
     52  * flowops, cyclically invoking the flowops on each threadflow's flowop
     53  * list, collecting statistics about flowop execution, and other
     54  * housekeeping duties are included in this file.
     55  *
     56  * User Defined Composite Flowops
     57  *    The ability to define new flowops as lists of built-in or previously
     58  * defined flowops has been added to Filebench. In a sense they are like
     59  * in-line subroutines, which can have default attributes set at definition
     60  * time and passed arguments at invocation time. Like other flowops (and
     61  * unlike conventional subroutines) you can invoke them with an iteration
     62  * count (the "iter" attribute), and they will loop through their associated
     63  * list of flowops for "iter" number of times each time they are encountered
     64  * in the thread or outer composite flowop which invokes them.
     65  *
     66  * Composite flowops are created with a "define" command, are given a name,
     67  * optional default attributes, and local variable definitions on the
     68  * "define" command line, followed by a brace enclosed list of flowops
     69  * to execute. The enclosed flowops may include attributes that reference
     70  * the local variables, as well as constants and global variables.
     71  *
     72  * Composite flowops are used pretty much like regular flowops, but you can
     73  * also set local variables to constants or global variables ($local_var =
     74  * [$var | $random_var | string | boolean | integer | double]) as part of
     75  * the invocation. Thus each invocation can pass customized values to its
     76  * inner flowops, greatly increasing their generality.
     77  *
     78  * All flowops are placed on a global, singly linked list, with fo_next
     79  * being the link pointer for this list. They are also placed on a private
     80  * list for the thread or composite flowop they are part of. The tf_thrd_fops
     81  * pointer in the thread will point to the list of top level flowops in the
     82  * thread, which are linked together by fo_exec_next. If any of these flowops
     83  * are composite flowops, they will have a list of second level flowops rooted
     84  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
     85  * of all flowops, and an n-ary tree of threads, composite flowops, and
     86  * flowops, with composite flowops being the branch nodes in the tree.
     87  *
     88  * To illustrate, if we have three first level flowops, the first of which is
     89  * a composite flowop consisting of two other flowops, we get:
     90  *
     91  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
     92  *			   flowop->fo_comp_fops		    |
     93  *				    |			    V
     94  *				    |			flowop->fo_exec_next
     95  *				    |
     96  *				    V
     97  *				flowop->fo_exec_next -> flowop->fo_exec_next
     98  *
     99  * And all five flowops (plus others from any other threads) are on a global
    100  * list linked with fo_next.
    101  */
    102 
    103 /*
    104  * Prints the name and instance number of each flowop in
    105  * the supplied list to the filebench log.
    106  */
    107 int
    108 flowop_printlist(flowop_t *list)
    109 {
    110 	flowop_t *flowop = list;
    111 
    112 	while (flowop) {
    113 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
    114 		    flowop->fo_name, flowop->fo_instance);
    115 		flowop = flowop->fo_exec_next;
    116 	}
    117 	return (0);
    118 }
    119 
    120 /*
    121  * Prints the name and instance number of all flowops on
    122  * the master flowop list to the console and the filebench log.
    123  */
    124 void
    125 flowop_printall(void)
    126 {
    127 	flowop_t *flowop = filebench_shm->shm_flowoplist;
    128 
    129 	while (flowop) {
    130 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
    131 		    flowop->fo_name, flowop->fo_instance);
    132 		flowop = flowop->fo_next;
    133 	}
    134 }
    135 
    136 #define	TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
    137 					(e.tv_nsec - s.tv_nsec))
    138 /*
    139  * Puts current high resolution time in start time entry
    140  * for threadflow and may also calculate running filebench
    141  * overhead statistics.
    142  */
    143 void
    144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
    145 {
    146 #ifdef HAVE_PROCFS
    147 	if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
    148 		char procname[128];
    149 
    150 		(void) snprintf(procname, sizeof (procname),
    151 		    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
    152 		threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
    153 	}
    154 
    155 	(void) pread(threadflow->tf_lwpusagefd,
    156 	    &threadflow->tf_susage,
    157 	    sizeof (struct prusage), 0);
    158 
    159 	/* Compute overhead time in this thread around op */
    160 	if (threadflow->tf_eusage.pr_stime.tv_nsec) {
    161 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
    162 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
    163 		    threadflow->tf_susage.pr_utime) +
    164 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
    165 		    threadflow->tf_susage.pr_ttime) +
    166 		    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
    167 		    threadflow->tf_susage.pr_stime);
    168 	}
    169 #endif
    170 	/* Start of op for this thread */
    171 	threadflow->tf_stime = gethrtime();
    172 }
    173 
/*
 * Aggregate statistics that flowop_endop() updates on every completed
 * operation, in addition to the per-flowop and per-threadflow counters.
 */
flowstat_t controlstats;
/* Held while controlstats is updated or zeroed; set up by flowop_init() */
pthread_mutex_t controlstats_lock;
/* Non-zero once flowop_start() has cleared controlstats */
static int controlstats_zeroed = 0;
    177 
    178 /*
    179  * Updates flowop's latency statistics, using saved start
    180  * time and current high resolution time. Updates flowop's
    181  * io count and transferred bytes statistics. Also updates
    182  * threadflow's and flowop's cumulative read or write byte
    183  * and io count statistics.
    184  */
    185 void
    186 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
    187 {
    188 	hrtime_t t;
    189 
    190 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
    191 	    (gethrtime() - threadflow->tf_stime);
    192 #ifdef HAVE_PROCFS
    193 	if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
    194 	    sizeof (struct prusage), 0)) != sizeof (struct prusage))
    195 		filebench_log(LOG_ERROR, "cannot read /proc");
    196 
    197 	t =
    198 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
    199 	    threadflow->tf_eusage.pr_utime) +
    200 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
    201 	    threadflow->tf_eusage.pr_ttime) +
    202 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
    203 	    threadflow->tf_eusage.pr_stime);
    204 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
    205 
    206 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
    207 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
    208 	    threadflow->tf_eusage.pr_tftime) +
    209 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
    210 	    threadflow->tf_eusage.pr_dftime) +
    211 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    212 	    threadflow->tf_eusage.pr_kftime) +
    213 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    214 	    threadflow->tf_eusage.pr_kftime) +
    215 	    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
    216 	    threadflow->tf_eusage.pr_slptime);
    217 #endif
    218 
    219 	flowop->fo_stats.fs_count++;
    220 	flowop->fo_stats.fs_bytes += bytes;
    221 	(void) ipc_mutex_lock(&controlstats_lock);
    222 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
    223 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
    224 		controlstats.fs_count++;
    225 		controlstats.fs_bytes += bytes;
    226 	}
    227 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
    228 		threadflow->tf_stats.fs_rbytes += bytes;
    229 		threadflow->tf_stats.fs_rcount++;
    230 		flowop->fo_stats.fs_rcount++;
    231 		controlstats.fs_rbytes += bytes;
    232 		controlstats.fs_rcount++;
    233 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
    234 		threadflow->tf_stats.fs_wbytes += bytes;
    235 		threadflow->tf_stats.fs_wcount++;
    236 		flowop->fo_stats.fs_wcount++;
    237 		controlstats.fs_wbytes += bytes;
    238 		controlstats.fs_wcount++;
    239 	}
    240 	(void) ipc_mutex_unlock(&controlstats_lock);
    241 }
    242 
    243 /*
    244  * Calls the flowop's initialization function, pointed to by
    245  * flowop->fo_init.
    246  */
    247 static int
    248 flowop_initflow(flowop_t *flowop)
    249 {
    250 	/*
    251 	 * save static copies of two items, in case they are supplied
    252 	 * from random variables
    253 	 */
    254 	if (!AVD_IS_STRING(flowop->fo_value))
    255 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
    256 
    257 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
    258 
    259 	if ((*flowop->fo_init)(flowop) < 0) {
    260 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
    261 		    flowop->fo_name, flowop->fo_instance);
    262 		return (-1);
    263 	}
    264 	return (0);
    265 }
    266 
    267 static int
    268 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
    269 {
    270 	flowop_t *flowop = *ops_list_ptr;
    271 
    272 	while (flowop) {
    273 		flowop_t *newflowop;
    274 
    275 		if (flowop == *ops_list_ptr)
    276 			*ops_list_ptr = NULL;
    277 
    278 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
    279 		    flowop, ops_list_ptr, 1, 0);
    280 		if (newflowop == NULL)
    281 			return (FILEBENCH_ERROR);
    282 
    283 		/* check for fo_filename attribute, and resolve if present */
    284 		if (flowop->fo_filename) {
    285 			char *name;
    286 
    287 			name = avd_get_str(flowop->fo_filename);
    288 			newflowop->fo_fileset = fileset_find(name);
    289 
    290 			if (newflowop->fo_fileset == NULL) {
    291 				filebench_log(LOG_ERROR,
    292 				    "flowop %s: file %s not found",
    293 				    newflowop->fo_name, name);
    294 				filebench_shutdown(1);
    295 			}
    296 		}
    297 
    298 		if (flowop_initflow(newflowop) < 0) {
    299 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
    300 			    newflowop->fo_name);
    301 		}
    302 
    303 		flowop = flowop->fo_exec_next;
    304 	}
    305 	return (FILEBENCH_OK);
    306 }
    307 
    308 /*
    309  * Calls the flowop's destruct function, pointed to by
    310  * flowop->fo_destruct.
    311  */
    312 static void
    313 flowop_destructflow(flowop_t *flowop)
    314 {
    315 	(*flowop->fo_destruct)(flowop);
    316 }
    317 
    318 /*
    319  * call the destruct funtions of all the threadflow's flowops,
    320  * if it is still flagged as "running".
    321  */
    322 void
    323 flowop_destruct_all_flows(threadflow_t *threadflow)
    324 {
    325 	flowop_t *flowop;
    326 
    327 	/* wait a moment to give other threads a chance to stop too */
    328 	(void) sleep(1);
    329 
    330 	(void) ipc_mutex_lock(&threadflow->tf_lock);
    331 
    332 	/* prepare to call destruct flow routines, if necessary */
    333 	if (threadflow->tf_running == 0) {
    334 
    335 		/* allready destroyed */
    336 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
    337 		return;
    338 	}
    339 
    340 	flowop = threadflow->tf_thrd_fops;
    341 	threadflow->tf_running = 0;
    342 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
    343 
    344 	while (flowop) {
    345 		flowop_destructflow(flowop);
    346 		flowop = flowop->fo_exec_next;
    347 	}
    348 }
    349 
    350 /*
    351  * The final initialization and main execution loop for the
    352  * worker threads. Sets threadflow and flowop start times,
    353  * waits for all process to start, then creates the runtime
    354  * flowops from those defined by the F language workload
    355  * script. It does some more initialization, then enters a
    356  * loop to repeatedly execute the flowops on the flowop list
    357  * until an abort condition is detected, at which time it exits.
    358  * This is the starting routine for the new worker thread
    359  * created by threadflow_createthread(), and is not currently
    360  * called from anywhere else.
    361  */
    362 void
    363 flowop_start(threadflow_t *threadflow)
    364 {
    365 	flowop_t *flowop;
    366 	size_t memsize;
    367 	int ret = 0;
    368 
    369 #ifdef HAVE_PROCFS
    370 	if (noproc == 0) {
    371 		char procname[128];
    372 		long ctl[2] = {PCSET, PR_MSACCT};
    373 		int pfd;
    374 
    375 		(void) snprintf(procname, sizeof (procname),
    376 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
    377 		pfd = open(procname, O_WRONLY);
    378 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
    379 		(void) close(pfd);
    380 	}
    381 #endif
    382 
    383 	(void) ipc_mutex_lock(&controlstats_lock);
    384 	if (!controlstats_zeroed) {
    385 		(void) memset(&controlstats, 0, sizeof (controlstats));
    386 		controlstats_zeroed = 1;
    387 	}
    388 	(void) ipc_mutex_unlock(&controlstats_lock);
    389 
    390 	flowop = threadflow->tf_thrd_fops;
    391 	threadflow->tf_stats.fs_stime = gethrtime();
    392 	flowop->fo_stats.fs_stime = gethrtime();
    393 
    394 	/* Hold the flowop find lock as reader to prevent lookups */
    395 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
    396 
    397 	/*
    398 	 * Block until all processes have started, acting like
    399 	 * a barrier. The original filebench process initially
    400 	 * holds the run_lock as a reader, preventing any of the
    401 	 * threads from obtaining the writer lock, and hence
    402 	 * passing this point. Once all processes and threads
    403 	 * have been created, the original process unlocks
    404 	 * run_lock, allowing each waiting thread to lock
    405 	 * and then immediately unlock it, then begin running.
    406 	 */
    407 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
    408 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
    409 
    410 	/* Create the runtime flowops from those defined by the script */
    411 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    412 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
    413 	    != FILEBENCH_OK) {
    414 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    415 		filebench_shutdown(1);
    416 		return;
    417 	}
    418 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    419 
    420 	/* Release the find lock as reader to allow lookups */
    421 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    422 
    423 	/* Set to the start of the new flowop list */
    424 	flowop = threadflow->tf_thrd_fops;
    425 
    426 	threadflow->tf_abort = 0;
    427 	threadflow->tf_running = 1;
    428 
    429 	memsize = (size_t)threadflow->tf_constmemsize;
    430 
    431 	/* If we are going to use ISM, allocate later */
    432 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
    433 		threadflow->tf_mem =
    434 		    ipc_ismmalloc(memsize);
    435 	} else {
    436 		threadflow->tf_mem =
    437 		    malloc(memsize);
    438 	}
    439 
    440 	(void) memset(threadflow->tf_mem, 0, memsize);
    441 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
    442 
    443 #ifdef HAVE_LWPS
    444 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
    445 	    threadflow,
    446 	    _lwp_self());
    447 #endif
    448 
    449 	/* Main filebench worker loop */
    450 	/* CONSTCOND */
    451 	while (1) {
    452 		int i, count;
    453 
    454 		/* Abort if asked */
    455 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
    456 			break;
    457 
    458 		/* Be quiet while stats are gathered */
    459 		if (filebench_shm->shm_bequiet) {
    460 			(void) sleep(1);
    461 			continue;
    462 		}
    463 
    464 		/* Take it easy until everyone is ready to go */
    465 		if (!filebench_shm->shm_procs_running) {
    466 			(void) sleep(1);
    467 			continue;
    468 		}
    469 
    470 		if (flowop == NULL) {
    471 			filebench_log(LOG_ERROR, "flowop_read null flowop");
    472 			return;
    473 		}
    474 
    475 		if (flowop->fo_stats.fs_stime == 0)
    476 			flowop->fo_stats.fs_stime = gethrtime();
    477 
    478 		/* Execute the flowop for fo_iters times */
    479 		count = (int)avd_get_int(flowop->fo_iters);
    480 		for (i = 0; i < count; i++) {
    481 
    482 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
    483 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
    484 			    flowop->fo_instance);
    485 
    486 			ret = (*flowop->fo_func)(threadflow, flowop);
    487 
    488 			/*
    489 			 * Return value FILEBENCH_ERROR means "flowop
    490 			 * failed, stop the filebench run"
    491 			 */
    492 			if (ret == FILEBENCH_ERROR) {
    493 				filebench_log(LOG_ERROR,
    494 				    "%s-%d: flowop %s-%d failed",
    495 				    threadflow->tf_name,
    496 				    threadflow->tf_instance,
    497 				    flowop->fo_name,
    498 				    flowop->fo_instance);
    499 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    500 				threadflow->tf_abort = 1;
    501 				filebench_shm->shm_f_abort =
    502 				    FILEBENCH_ABORT_ERROR;
    503 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    504 				break;
    505 			}
    506 
    507 			/*
    508 			 * Return value of FILEBENCH_NORSC means "stop
    509 			 * the filebench run" if in "end on no work mode",
    510 			 * otherwise it indicates an error
    511 			 */
    512 			if (ret == FILEBENCH_NORSC) {
    513 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    514 				threadflow->tf_abort = FILEBENCH_DONE;
    515 				if (filebench_shm->shm_rmode ==
    516 				    FILEBENCH_MODE_Q1STDONE) {
    517 					filebench_shm->shm_f_abort =
    518 					    FILEBENCH_ABORT_RSRC;
    519 				} else if (filebench_shm->shm_rmode !=
    520 				    FILEBENCH_MODE_QALLDONE) {
    521 					filebench_log(LOG_ERROR1,
    522 					    "WARNING! Run stopped early:\n   "
    523 					    "             flowop %s-%d could "
    524 					    "not obtain a file. Please\n      "
    525 					    "          reduce runtime, "
    526 					    "increase fileset entries "
    527 					    "($nfiles), or switch modes.",
    528 					    flowop->fo_name,
    529 					    flowop->fo_instance);
    530 					filebench_shm->shm_f_abort =
    531 					    FILEBENCH_ABORT_ERROR;
    532 				}
    533 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    534 				break;
    535 			}
    536 
    537 			/*
    538 			 * Return value of FILEBENCH_DONE means "stop
    539 			 * the filebench run without error"
    540 			 */
    541 			if (ret == FILEBENCH_DONE) {
    542 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    543 				threadflow->tf_abort = FILEBENCH_DONE;
    544 				filebench_shm->shm_f_abort =
    545 				    FILEBENCH_ABORT_DONE;
    546 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    547 				break;
    548 			}
    549 
    550 			/*
    551 			 * If we get here and the return is something other
    552 			 * than FILEBENCH_OK, it means a spurious code
    553 			 * was returned, so treat as major error. This
    554 			 * probably indicates a bug in the flowop.
    555 			 */
    556 			if (ret != FILEBENCH_OK) {
    557 				filebench_log(LOG_ERROR,
    558 				    "Flowop %s unexpected return value = %d\n",
    559 				    flowop->fo_name, ret);
    560 				filebench_shm->shm_f_abort =
    561 				    FILEBENCH_ABORT_ERROR;
    562 				break;
    563 			}
    564 		}
    565 
    566 		/* advance to next flowop */
    567 		flowop = flowop->fo_exec_next;
    568 
    569 		/* but if at end of list, start over from the beginning */
    570 		if (flowop == NULL) {
    571 			flowop = threadflow->tf_thrd_fops;
    572 			threadflow->tf_stats.fs_count++;
    573 		}
    574 	}
    575 
    576 #ifdef HAVE_LWPS
    577 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
    578 	    _lwp_self());
    579 #endif
    580 
    581 	/* Tell flowops to destroy locally acquired state */
    582 	flowop_destruct_all_flows(threadflow);
    583 
    584 	pthread_exit(&threadflow->tf_abort);
    585 }
    586 
/*
 * One time set up for the flowop module: initializes the mutex that
 * guards controlstats, using the attributes ipc_mutexattr() supplies
 * for IPC_MUTEX_NORMAL, and then calls flowoplib_init().
 */
void
flowop_init(void)
{
	(void) pthread_mutex_init(&controlstats_lock,
	    ipc_mutexattr(IPC_MUTEX_NORMAL));
	flowoplib_init();
}
    594 
    595 /*
    596  * Delete the designated flowop from the thread's flowop list.
    597  */
    598 static void
    599 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
    600 {
    601 	flowop_t *entry = *flowoplist;
    602 	int found = 0;
    603 
    604 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    605 	    flowop->fo_name,
    606 	    flowop->fo_instance);
    607 
    608 	/* Delete from thread's flowop list */
    609 	if (flowop == *flowoplist) {
    610 		/* First on list */
    611 		*flowoplist = flowop->fo_exec_next;
    612 		filebench_log(LOG_DEBUG_IMPL,
    613 		    "Delete0 flowop: (%s-%d)",
    614 		    flowop->fo_name,
    615 		    flowop->fo_instance);
    616 	} else {
    617 		while (entry->fo_exec_next) {
    618 			filebench_log(LOG_DEBUG_IMPL,
    619 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
    620 			    entry->fo_exec_next->fo_name,
    621 			    entry->fo_exec_next->fo_instance,
    622 			    flowop->fo_name,
    623 			    flowop->fo_instance);
    624 
    625 			if (flowop == entry->fo_exec_next) {
    626 				/* Delete */
    627 				filebench_log(LOG_DEBUG_IMPL,
    628 				    "Deleted0 flowop: (%s-%d)",
    629 				    entry->fo_exec_next->fo_name,
    630 				    entry->fo_exec_next->fo_instance);
    631 				entry->fo_exec_next =
    632 				    entry->fo_exec_next->fo_exec_next;
    633 				break;
    634 			}
    635 			entry = entry->fo_exec_next;
    636 		}
    637 	}
    638 
    639 #ifdef HAVE_PROCFS
    640 	/* Close /proc stats */
    641 	if (flowop->fo_thread)
    642 		(void) close(flowop->fo_thread->tf_lwpusagefd);
    643 #endif
    644 
    645 	/* Delete from global list */
    646 	entry = filebench_shm->shm_flowoplist;
    647 
    648 	if (flowop == filebench_shm->shm_flowoplist) {
    649 		/* First on list */
    650 		filebench_shm->shm_flowoplist = flowop->fo_next;
    651 		found = 1;
    652 	} else {
    653 		while (entry->fo_next) {
    654 			filebench_log(LOG_DEBUG_IMPL,
    655 			    "Delete flowop: (%s-%d) == (%s-%d)",
    656 			    entry->fo_next->fo_name,
    657 			    entry->fo_next->fo_instance,
    658 			    flowop->fo_name,
    659 			    flowop->fo_instance);
    660 
    661 			if (flowop == entry->fo_next) {
    662 				/* Delete */
    663 				entry->fo_next = entry->fo_next->fo_next;
    664 				found = 1;
    665 				break;
    666 			}
    667 
    668 			entry = entry->fo_next;
    669 		}
    670 	}
    671 	if (found) {
    672 		filebench_log(LOG_DEBUG_IMPL,
    673 		    "Deleted flowop: (%s-%d)",
    674 		    flowop->fo_name,
    675 		    flowop->fo_instance);
    676 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
    677 	} else {
    678 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
    679 		    flowop->fo_name,
    680 		    flowop->fo_instance);
    681 	}
    682 }
    683 
    684 /*
    685  * Deletes all the flowops from a flowop list.
    686  */
    687 void
    688 flowop_delete_all(flowop_t **flowoplist)
    689 {
    690 	flowop_t *flowop = *flowoplist;
    691 
    692 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    693 
    694 	while (flowop) {
    695 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    696 		    flowop->fo_name, flowop->fo_instance);
    697 
    698 		if (flowop->fo_instance &&
    699 		    (flowop->fo_instance == FLOW_MASTER)) {
    700 			flowop = flowop->fo_exec_next;
    701 			continue;
    702 		}
    703 		flowop_delete(flowoplist, flowop);
    704 		flowop = flowop->fo_exec_next;
    705 	}
    706 
    707 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    708 }
    709 
    710 /*
    711  * Allocates a flowop entity and initializes it with inherited
    712  * contents from the "inherit" flowop, if it is supplied, or
    713  * with zeros otherwise. In either case the fo_next and fo_exec_next
    714  * pointers are set to NULL, and fo_thread is set to point to
    715  * the owning threadflow. The initialized flowop is placed at
    716  * the head of the global flowop list, and also placed on the
    717  * tail of the supplied local flowop list, which will either
    718  * be a threadflow's tf_thrd_fops list or a composite flowop's
    719  * fo_comp_fops list. The routine locks the flowop's fo_lock and
    720  * leaves it held on return. If successful, it returns a pointer
    721  * to the allocated and initialized flowop, otherwise it returns NULL.
    722  *
    723  * filebench_shm->shm_flowop_lock must be held by caller.
    724  */
    725 static flowop_t *
    726 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
    727     flowop_t **flowoplist_hdp, int instance, int type)
    728 {
    729 	flowop_t *flowop;
    730 
    731 	if (name == NULL)
    732 		return (NULL);
    733 
    734 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
    735 		filebench_log(LOG_ERROR,
    736 		    "flowop_define: Can't malloc flowop");
    737 		return (NULL);
    738 	}
    739 
    740 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
    741 	    name, instance, flowop);
    742 
    743 	if (flowop == NULL)
    744 		return (NULL);
    745 
    746 	if (inherit) {
    747 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
    748 		(void) pthread_mutex_init(&flowop->fo_lock,
    749 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    750 		(void) ipc_mutex_lock(&flowop->fo_lock);
    751 		flowop->fo_next = NULL;
    752 		flowop->fo_exec_next = NULL;
    753 		filebench_log(LOG_DEBUG_IMPL,
    754 		    "flowop %s-%d calling init", name, instance);
    755 	} else {
    756 		(void) memset(flowop, 0, sizeof (flowop_t));
    757 		flowop->fo_iters = avd_int_alloc(1);
    758 		flowop->fo_type = type;
    759 		(void) pthread_mutex_init(&flowop->fo_lock,
    760 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    761 		(void) ipc_mutex_lock(&flowop->fo_lock);
    762 	}
    763 
    764 	/* Create backpointer to thread */
    765 	flowop->fo_thread = threadflow;
    766 
    767 	/* Add flowop to global list */
    768 	if (filebench_shm->shm_flowoplist == NULL) {
    769 		filebench_shm->shm_flowoplist = flowop;
    770 		flowop->fo_next = NULL;
    771 	} else {
    772 		flowop->fo_next = filebench_shm->shm_flowoplist;
    773 		filebench_shm->shm_flowoplist = flowop;
    774 	}
    775 
    776 	(void) strcpy(flowop->fo_name, name);
    777 	flowop->fo_instance = instance;
    778 
    779 	if (flowoplist_hdp == NULL)
    780 		return (flowop);
    781 
    782 	/* Add flowop to thread op list */
    783 	if (*flowoplist_hdp == NULL) {
    784 		*flowoplist_hdp = flowop;
    785 		flowop->fo_exec_next = NULL;
    786 	} else {
    787 		flowop_t *flowend;
    788 
    789 		/* Find the end of the thread list */
    790 		flowend = *flowoplist_hdp;
    791 		while (flowend->fo_exec_next != NULL)
    792 			flowend = flowend->fo_exec_next;
    793 		flowend->fo_exec_next = flowop;
    794 		flowop->fo_exec_next = NULL;
    795 	}
    796 
    797 	return (flowop);
    798 }
    799 
    800 /*
    801  * Calls flowop_define_common() to allocate and initialize a
    802  * flowop, and holds the shared flowop_lock during the call.
    803  * It releases the created flowop's fo_lock when done.
    804  */
    805 flowop_t *
    806 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
    807     flowop_t **flowoplist_hdp, int instance, int type)
    808 {
    809 	flowop_t	*flowop;
    810 
    811 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    812 	flowop = flowop_define_common(threadflow, name,
    813 	    inherit, flowoplist_hdp, instance, type);
    814 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    815 
    816 	if (flowop == NULL)
    817 		return (NULL);
    818 
    819 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    820 
    821 	return (flowop);
    822 }
    823 
    824 /*
    825  * Calls flowop_define_common() to allocate and initialize a
    826  * composite flowop, and holds the shared flowop_lock during the call.
    827  * It releases the created flowop's fo_lock when done.
    828  */
    829 flowop_t *
    830 flowop_new_composite_define(char *name)
    831 {
    832 	flowop_t *flowop;
    833 
    834 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    835 	flowop = flowop_define_common(NULL, name,
    836 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
    837 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    838 
    839 	if (flowop == NULL)
    840 		return (NULL);
    841 
    842 	flowop->fo_func = flowop_composite;
    843 	flowop->fo_init = flowop_composite_init;
    844 	flowop->fo_destruct = flowop_composite_destruct;
    845 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    846 
    847 	return (flowop);
    848 }
    849 
    850 /*
    851  * Attempts to take a write lock on the flowop_find_lock that is
    852  * defined in interprocess shared memory. Since each call to
    853  * flowop_start() holds a read lock on flowop_find_lock, this
    854  * routine effectively blocks until all instances of
    855  * flowop_start() have finished. The flowop_find() routine calls
    856  * this routine so that flowops won't be searched for until all
    857  * flowops have been created by flowop_start.
    858  */
    859 static void
    860 flowop_find_barrier(void)
    861 {
    862 	/* Block on wrlock to ensure find waits for all creates */
    863 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
    864 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    865 }
    866 
    867 /*
    868  * Returns a list of flowops named "name" from the master
    869  * flowop list.
    870  */
    871 flowop_t *
    872 flowop_find(char *name)
    873 {
    874 	flowop_t *flowop;
    875 	flowop_t *result = NULL;
    876 
    877 	flowop_find_barrier();
    878 
    879 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    880 
    881 	flowop = filebench_shm->shm_flowoplist;
    882 
    883 	while (flowop) {
    884 		if (strcmp(name, flowop->fo_name) == 0) {
    885 
    886 			/* Add flowop to result list */
    887 			if (result == NULL) {
    888 				result = flowop;
    889 				flowop->fo_resultnext = NULL;
    890 			} else {
    891 				flowop->fo_resultnext = result;
    892 				result = flowop;
    893 			}
    894 		}
    895 		flowop = flowop->fo_next;
    896 	}
    897 
    898 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    899 
    900 
    901 	return (result);
    902 }
    903 
    904 /*
    905  * Returns a pointer to the specified instance of flowop
    906  * "name" from the global list.
    907  */
    908 flowop_t *
    909 flowop_find_one(char *name, int instance)
    910 {
    911 	flowop_t *test_flowop;
    912 
    913 	flowop_find_barrier();
    914 
    915 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    916 
    917 	test_flowop = filebench_shm->shm_flowoplist;
    918 
    919 	while (test_flowop) {
    920 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
    921 		    (instance == test_flowop->fo_instance))
    922 			break;
    923 
    924 		test_flowop = test_flowop->fo_next;
    925 	}
    926 
    927 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    928 
    929 	return (test_flowop);
    930 }
    931 
    932 /*
    933  * recursively searches through lists of flowops on a given thread
    934  * and those on any included composite flowops for the named flowop.
    935  * either returns with a pointer to the named flowop or NULL if it
    936  * cannot be found.
    937  */
    938 static flowop_t *
    939 flowop_recurse_search(char *path, char *name, flowop_t *list)
    940 {
    941 	flowop_t *test_flowop;
    942 	char fullname[MAXPATHLEN];
    943 
    944 	test_flowop = list;
    945 
    946 	/*
    947 	 * when searching a list of inner flowops, "path" is the fullname
    948 	 * of the containing composite flowop. Use it to form the
    949 	 * full name of the inner flowop to search for.
    950 	 */
    951 	if (path) {
    952 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
    953 			filebench_log(LOG_ERROR,
    954 			    "composite flowop path name %s.%s too long",
    955 			    path, name);
    956 			return (NULL);
    957 		}
    958 
    959 		/* create composite_name.name for recursive search */
    960 		(void) strcpy(fullname, path);
    961 		(void) strcat(fullname, ".");
    962 		(void) strcat(fullname, name);
    963 	} else {
    964 		(void) strcpy(fullname, name);
    965 	}
    966 
    967 	/*
    968 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
    969 	 * list or fo_comp_fops (inner flowop) list.
    970 	 */
    971 	while (test_flowop) {
    972 		if (strcmp(fullname, test_flowop->fo_name) == 0)
    973 			return (test_flowop);
    974 
    975 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
    976 			flowop_t *found_flowop;
    977 
    978 			found_flowop = flowop_recurse_search(
    979 			    test_flowop->fo_name, name,
    980 			    test_flowop->fo_comp_fops);
    981 
    982 			if (found_flowop)
    983 				return (found_flowop);
    984 		}
    985 		test_flowop = test_flowop->fo_exec_next;
    986 	}
    987 
    988 	/* not found here or on any child lists */
    989 	return (NULL);
    990 }
    991 
    992 /*
    993  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
    994  * list of flowops. Returns the named flowop if found, or NULL.
    995  */
    996 flowop_t *
    997 flowop_find_from_list(char *name, flowop_t *list)
    998 {
    999 	flowop_t *found_flowop;
   1000 
   1001 	flowop_find_barrier();
   1002 
   1003 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
   1004 
   1005 	found_flowop = flowop_recurse_search(NULL, name, list);
   1006 
   1007 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
   1008 
   1009 	return (found_flowop);
   1010 }
   1011 
   1012 /*
   1013  * Composite flowop method. Does one pass through its list of
   1014  * inner flowops per iteration.
   1015  */
   1016 static int
   1017 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
   1018 {
   1019 	flowop_t	*inner_flowop;
   1020 
   1021 	/* get the first flowop in the list */
   1022 	inner_flowop = flowop->fo_comp_fops;
   1023 
   1024 	/* make a pass through the list of sub flowops */
   1025 	while (inner_flowop) {
   1026 		int	i, count;
   1027 
   1028 		/* Abort if asked */
   1029 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
   1030 			return (FILEBENCH_DONE);
   1031 
   1032 		if (inner_flowop->fo_stats.fs_stime == 0)
   1033 			inner_flowop->fo_stats.fs_stime = gethrtime();
   1034 
   1035 		/* Execute the flowop for fo_iters times */
   1036 		count = (int)avd_get_int(inner_flowop->fo_iters);
   1037 		for (i = 0; i < count; i++) {
   1038 
   1039 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
   1040 			    "%s-%d", threadflow->tf_name,
   1041 			    inner_flowop->fo_name,
   1042 			    inner_flowop->fo_instance);
   1043 
   1044 			switch ((*inner_flowop->fo_func)(threadflow,
   1045 			    inner_flowop)) {
   1046 
   1047 			/* all done */
   1048 			case FILEBENCH_DONE:
   1049 				return (FILEBENCH_DONE);
   1050 
   1051 			/* quit if inner flowop limit reached */
   1052 			case FILEBENCH_NORSC:
   1053 				return (FILEBENCH_NORSC);
   1054 
   1055 			/* quit on inner flowop error */
   1056 			case FILEBENCH_ERROR:
   1057 				filebench_log(LOG_ERROR,
   1058 				    "inner flowop %s failed",
   1059 				    inner_flowop->fo_name);
   1060 				return (FILEBENCH_ERROR);
   1061 
   1062 			/* otherwise keep going */
   1063 			default:
   1064 				break;
   1065 			}
   1066 
   1067 		}
   1068 
   1069 		/* advance to next flowop */
   1070 		inner_flowop = inner_flowop->fo_exec_next;
   1071 	}
   1072 
   1073 	/* finished with this pass */
   1074 	return (FILEBENCH_OK);
   1075 }
   1076 
   1077 /*
   1078  * Composite flowop initialization. Creates runtime inner flowops
   1079  * from prototype inner flowops.
   1080  */
   1081 static int
   1082 flowop_composite_init(flowop_t *flowop)
   1083 {
   1084 	int err;
   1085 
   1086 	err = flowop_create_runtime_flowops(flowop->fo_thread,
   1087 	    &flowop->fo_comp_fops);
   1088 	if (err != FILEBENCH_OK)
   1089 		return (err);
   1090 
   1091 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1092 	return (0);
   1093 }
   1094 
   1095 /*
   1096  * clean up inner flowops
   1097  */
   1098 static void
   1099 flowop_composite_destruct(flowop_t *flowop)
   1100 {
   1101 	flowop_t *inner_flowop = flowop->fo_comp_fops;
   1102 
   1103 	while (inner_flowop) {
   1104 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
   1105 		    inner_flowop->fo_name, inner_flowop->fo_instance);
   1106 
   1107 		if (inner_flowop->fo_instance &&
   1108 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
   1109 			inner_flowop = inner_flowop->fo_exec_next;
   1110 			continue;
   1111 		}
   1112 		flowop_delete(&flowop->fo_comp_f