Home | History | Annotate | Download | only in common
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include "config.h"
     27 
     28 #ifdef HAVE_LWPS
     29 #include <sys/lwp.h>
     30 #endif
     31 #include <fcntl.h>
     32 #include "filebench.h"
     33 #include "flowop.h"
     34 #include "stats.h"
     35 
     36 #ifdef LINUX_PORT
     37 #include <sys/types.h>
     38 #include <linux/unistd.h>
     39 #endif
     40 
     41 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
     42     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
     43 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
     44 static int flowop_composite_init(flowop_t *flowop);
     45 static void flowop_composite_destruct(flowop_t *flowop);
     46 
     47 /*
     48  * A collection of flowop support functions. The actual code that
     49  * implements the various flowops is in flowop_library.c.
     50  *
     51  * Routines for defining, creating, initializing and destroying
     52  * flowops, cyclically invoking the flowops on each threadflow's flowop
     53  * list, collecting statistics about flowop execution, and other
     54  * housekeeping duties are included in this file.
     55  *
     56  * User Defined Composite Flowops
     57  *    The ability to define new flowops as lists of built-in or previously
     58  * defined flowops has been added to Filebench. In a sense they are like
     59  * in-line subroutines, which can have default attributes set at definition
     60  * time and passed arguments at invocation time. Like other flowops (and
     61  * unlike conventional subroutines) you can invoke them with an iteration
     62  * count (the "iter" attribute), and they will loop through their associated
     63  * list of flowops for "iter" number of times each time they are encountered
     64  * in the thread or outer composite flowop which invokes them.
     65  *
     66  * Composite flowops are created with a "define" command, are given a name,
     67  * optional default attributes, and local variable definitions on the
     68  * "define" command line, followed by a brace enclosed list of flowops
     69  * to execute. The enclosed flowops may include attributes that reference
     70  * the local variables, as well as constants and global variables.
     71  *
     72  * Composite flowops are used pretty much like regular flowops, but you can
     73  * also set local variables to constants or global variables ($local_var =
     74  * [$var | $random_var | string | boolean | integer | double]) as part of
     75  * the invocation. Thus each invocation can pass customized values to its
     76  * inner flowops, greatly increasing their generality.
     77  *
     78  * All flowops are placed on a global, singly linked list, with fo_next
     79  * being the link pointer for this list. They are also placed on a private
     80  * list for the thread or composite flowop they are part of. The tf_thrd_fops
     81  * pointer in the thread will point to the list of top level flowops in the
     82  * thread, which are linked together by fo_exec_next. If any of these flowops
     83  * are composite flowops, they will have a list of second level flowops rooted
     84  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
     85  * of all flowops, and an n-ary tree of threads, composite flowops, and
     86  * flowops, with composite flowops being the branch nodes in the tree.
     87  *
     88  * To illustrate, if we have three first level flowops, the first of which is
     89  * a composite flowop consisting of two other flowops, we get:
     90  *
     91  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
     92  *			   flowop->fo_comp_fops		    |
     93  *				    |			    V
     94  *				    |			flowop->fo_exec_next
     95  *				    |
     96  *				    V
     97  *				flowop->fo_exec_next -> flowop->fo_exec_next
     98  *
     99  * And all five flowops (plus others from any other threads) are on a global
    100  * list linked with fo_next.
    101  */
    102 
    103 /*
    104  * Prints the name and instance number of each flowop in
    105  * the supplied list to the filebench log.
    106  */
    107 int
    108 flowop_printlist(flowop_t *list)
    109 {
    110 	flowop_t *flowop = list;
    111 
    112 	while (flowop) {
    113 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
    114 		    flowop->fo_name, flowop->fo_instance);
    115 		flowop = flowop->fo_exec_next;
    116 	}
    117 	return (0);
    118 }
    119 
    120 /*
    121  * Prints the name and instance number of all flowops on
    122  * the master flowop list to the console and the filebench log.
    123  */
    124 void
    125 flowop_printall(void)
    126 {
    127 	flowop_t *flowop = filebench_shm->shm_flowoplist;
    128 
    129 	while (flowop) {
    130 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
    131 		    flowop->fo_name, flowop->fo_instance);
    132 		flowop = flowop->fo_next;
    133 	}
    134 }
    135 
    136 #define	TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
    137 					(e.tv_nsec - s.tv_nsec))
    138 /*
    139  * Puts current high resolution time in start time entry
    140  * for threadflow and may also calculate running filebench
    141  * overhead statistics.
    142  */
    143 void
    144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
    145 {
    146 #ifdef HAVE_PROCFS
    147 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
    148 		if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
    149 			char procname[128];
    150 
    151 			(void) snprintf(procname, sizeof (procname),
    152 			    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
    153 			threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
    154 		}
    155 
    156 		(void) pread(threadflow->tf_lwpusagefd,
    157 		    &threadflow->tf_susage,
    158 		    sizeof (struct prusage), 0);
    159 
    160 		/* Compute overhead time in this thread around op */
    161 		if (threadflow->tf_eusage.pr_stime.tv_nsec) {
    162 			flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
    163 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
    164 			    threadflow->tf_susage.pr_utime) +
    165 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
    166 			    threadflow->tf_susage.pr_ttime) +
    167 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
    168 			    threadflow->tf_susage.pr_stime);
    169 		}
    170 	}
    171 #endif
    172 
    173 	/* Start of op for this thread */
    174 	threadflow->tf_stime = gethrtime();
    175 }
    176 
    177 flowstat_t controlstats;
    178 pthread_mutex_t controlstats_lock;
    179 static int controlstats_zeroed = 0;
    180 
    181 /*
    182  * Updates flowop's latency statistics, using saved start
    183  * time and current high resolution time. Updates flowop's
    184  * io count and transferred bytes statistics. Also updates
    185  * threadflow's and flowop's cumulative read or write byte
    186  * and io count statistics.
    187  */
    188 void
    189 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
    190 {
    191 	hrtime_t t;
    192 
    193 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
    194 	    (gethrtime() - threadflow->tf_stime);
    195 #ifdef HAVE_PROCFS
    196 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
    197 		if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
    198 		    sizeof (struct prusage), 0)) != sizeof (struct prusage))
    199 			filebench_log(LOG_ERROR, "cannot read /proc");
    200 
    201 		t =
    202 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
    203 		    threadflow->tf_eusage.pr_utime) +
    204 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
    205 		    threadflow->tf_eusage.pr_ttime) +
    206 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
    207 		    threadflow->tf_eusage.pr_stime);
    208 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
    209 
    210 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
    211 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
    212 		    threadflow->tf_eusage.pr_tftime) +
    213 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
    214 		    threadflow->tf_eusage.pr_dftime) +
    215 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    216 		    threadflow->tf_eusage.pr_kftime) +
    217 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    218 		    threadflow->tf_eusage.pr_kftime) +
    219 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
    220 		    threadflow->tf_eusage.pr_slptime);
    221 	}
    222 #endif
    223 
    224 	flowop->fo_stats.fs_count++;
    225 	flowop->fo_stats.fs_bytes += bytes;
    226 	(void) ipc_mutex_lock(&controlstats_lock);
    227 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
    228 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
    229 		controlstats.fs_count++;
    230 		controlstats.fs_bytes += bytes;
    231 	}
    232 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
    233 		threadflow->tf_stats.fs_rbytes += bytes;
    234 		threadflow->tf_stats.fs_rcount++;
    235 		flowop->fo_stats.fs_rcount++;
    236 		controlstats.fs_rbytes += bytes;
    237 		controlstats.fs_rcount++;
    238 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
    239 		threadflow->tf_stats.fs_wbytes += bytes;
    240 		threadflow->tf_stats.fs_wcount++;
    241 		flowop->fo_stats.fs_wcount++;
    242 		controlstats.fs_wbytes += bytes;
    243 		controlstats.fs_wcount++;
    244 	}
    245 	(void) ipc_mutex_unlock(&controlstats_lock);
    246 }
    247 
    248 /*
    249  * Calls the flowop's initialization function, pointed to by
    250  * flowop->fo_init.
    251  */
    252 static int
    253 flowop_initflow(flowop_t *flowop)
    254 {
    255 	/*
    256 	 * save static copies of two items, in case they are supplied
    257 	 * from random variables
    258 	 */
    259 	if (!AVD_IS_STRING(flowop->fo_value))
    260 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
    261 
    262 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
    263 
    264 	if ((*flowop->fo_init)(flowop) < 0) {
    265 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
    266 		    flowop->fo_name, flowop->fo_instance);
    267 		return (-1);
    268 	}
    269 	return (0);
    270 }
    271 
    272 static int
    273 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
    274 {
    275 	flowop_t *flowop = *ops_list_ptr;
    276 
    277 	while (flowop) {
    278 		flowop_t *newflowop;
    279 
    280 		if (flowop == *ops_list_ptr)
    281 			*ops_list_ptr = NULL;
    282 
    283 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
    284 		    flowop, ops_list_ptr, 1, 0);
    285 		if (newflowop == NULL)
    286 			return (FILEBENCH_ERROR);
    287 
    288 		/* check for fo_filename attribute, and resolve if present */
    289 		if (flowop->fo_filename) {
    290 			char *name;
    291 
    292 			name = avd_get_str(flowop->fo_filename);
    293 			newflowop->fo_fileset = fileset_find(name);
    294 
    295 			if (newflowop->fo_fileset == NULL) {
    296 				filebench_log(LOG_ERROR,
    297 				    "flowop %s: file %s not found",
    298 				    newflowop->fo_name, name);
    299 				filebench_shutdown(1);
    300 			}
    301 		}
    302 
    303 		if (flowop_initflow(newflowop) < 0) {
    304 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
    305 			    newflowop->fo_name);
    306 		}
    307 
    308 		flowop = flowop->fo_exec_next;
    309 	}
    310 	return (FILEBENCH_OK);
    311 }
    312 
    313 /*
    314  * Calls the flowop's destruct function, pointed to by
    315  * flowop->fo_destruct.
    316  */
    317 static void
    318 flowop_destructflow(flowop_t *flowop)
    319 {
    320 	(*flowop->fo_destruct)(flowop);
    321 }
    322 
    323 /*
    324  * call the destruct funtions of all the threadflow's flowops,
    325  * if it is still flagged as "running".
    326  */
    327 void
    328 flowop_destruct_all_flows(threadflow_t *threadflow)
    329 {
    330 	flowop_t *flowop;
    331 
    332 	/* wait a moment to give other threads a chance to stop too */
    333 	(void) sleep(1);
    334 
    335 	(void) ipc_mutex_lock(&threadflow->tf_lock);
    336 
    337 	/* prepare to call destruct flow routines, if necessary */
    338 	if (threadflow->tf_running == 0) {
    339 
    340 		/* allready destroyed */
    341 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
    342 		return;
    343 	}
    344 
    345 	flowop = threadflow->tf_thrd_fops;
    346 	threadflow->tf_running = 0;
    347 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
    348 
    349 	while (flowop) {
    350 		flowop_destructflow(flowop);
    351 		flowop = flowop->fo_exec_next;
    352 	}
    353 }
    354 
    355 /*
    356  * The final initialization and main execution loop for the
    357  * worker threads. Sets threadflow and flowop start times,
    358  * waits for all process to start, then creates the runtime
    359  * flowops from those defined by the F language workload
    360  * script. It does some more initialization, then enters a
    361  * loop to repeatedly execute the flowops on the flowop list
    362  * until an abort condition is detected, at which time it exits.
    363  * This is the starting routine for the new worker thread
    364  * created by threadflow_createthread(), and is not currently
    365  * called from anywhere else.
    366  */
    367 void
    368 flowop_start(threadflow_t *threadflow)
    369 {
    370 	flowop_t *flowop;
    371 	size_t memsize;
    372 	int ret = FILEBENCH_OK;
    373 
    374 #ifdef HAVE_PROCFS
    375 	if (noproc == 0) {
    376 		char procname[128];
    377 		long ctl[2] = {PCSET, PR_MSACCT};
    378 		int pfd;
    379 
    380 		(void) snprintf(procname, sizeof (procname),
    381 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
    382 		pfd = open(procname, O_WRONLY);
    383 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
    384 		(void) close(pfd);
    385 	}
    386 #endif
    387 
    388 	(void) ipc_mutex_lock(&controlstats_lock);
    389 	if (!controlstats_zeroed) {
    390 		(void) memset(&controlstats, 0, sizeof (controlstats));
    391 		controlstats_zeroed = 1;
    392 	}
    393 	(void) ipc_mutex_unlock(&controlstats_lock);
    394 
    395 	flowop = threadflow->tf_thrd_fops;
    396 	threadflow->tf_stats.fs_stime = gethrtime();
    397 	flowop->fo_stats.fs_stime = gethrtime();
    398 
    399 	/* Hold the flowop find lock as reader to prevent lookups */
    400 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
    401 
    402 	/*
    403 	 * Block until all processes have started, acting like
    404 	 * a barrier. The original filebench process initially
    405 	 * holds the run_lock as a reader, preventing any of the
    406 	 * threads from obtaining the writer lock, and hence
    407 	 * passing this point. Once all processes and threads
    408 	 * have been created, the original process unlocks
    409 	 * run_lock, allowing each waiting thread to lock
    410 	 * and then immediately unlock it, then begin running.
    411 	 */
    412 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
    413 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
    414 
    415 	/* Create the runtime flowops from those defined by the script */
    416 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    417 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
    418 	    != FILEBENCH_OK) {
    419 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    420 		filebench_shutdown(1);
    421 		return;
    422 	}
    423 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    424 
    425 	/* Release the find lock as reader to allow lookups */
    426 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    427 
    428 	/* Set to the start of the new flowop list */
    429 	flowop = threadflow->tf_thrd_fops;
    430 
    431 	threadflow->tf_abort = 0;
    432 	threadflow->tf_running = 1;
    433 
    434 	memsize = (size_t)threadflow->tf_constmemsize;
    435 
    436 	/* If we are going to use ISM, allocate later */
    437 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
    438 		threadflow->tf_mem =
    439 		    ipc_ismmalloc(memsize);
    440 	} else {
    441 		threadflow->tf_mem =
    442 		    malloc(memsize);
    443 	}
    444 
    445 	(void) memset(threadflow->tf_mem, 0, memsize);
    446 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
    447 
    448 #ifdef HAVE_LWPS
    449 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
    450 	    threadflow,
    451 	    _lwp_self());
    452 #endif
    453 
    454 	/* Main filebench worker loop */
    455 	while (ret == FILEBENCH_OK) {
    456 		int i, count;
    457 
    458 		/* Abort if asked */
    459 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
    460 			break;
    461 
    462 		/* Be quiet while stats are gathered */
    463 		if (filebench_shm->shm_bequiet) {
    464 			(void) sleep(1);
    465 			continue;
    466 		}
    467 
    468 		/* Take it easy until everyone is ready to go */
    469 		if (!filebench_shm->shm_procs_running) {
    470 			(void) sleep(1);
    471 			continue;
    472 		}
    473 
    474 		if (flowop == NULL) {
    475 			filebench_log(LOG_ERROR, "flowop_read null flowop");
    476 			return;
    477 		}
    478 
    479 		if (flowop->fo_stats.fs_stime == 0)
    480 			flowop->fo_stats.fs_stime = gethrtime();
    481 
    482 		/* Execute the flowop for fo_iters times */
    483 		count = (int)avd_get_int(flowop->fo_iters);
    484 		for (i = 0; i < count; i++) {
    485 
    486 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
    487 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
    488 			    flowop->fo_instance);
    489 
    490 			ret = (*flowop->fo_func)(threadflow, flowop);
    491 
    492 			/*
    493 			 * Return value FILEBENCH_ERROR means "flowop
    494 			 * failed, stop the filebench run"
    495 			 */
    496 			if (ret == FILEBENCH_ERROR) {
    497 				filebench_log(LOG_ERROR,
    498 				    "%s-%d: flowop %s-%d failed",
    499 				    threadflow->tf_name,
    500 				    threadflow->tf_instance,
    501 				    flowop->fo_name,
    502 				    flowop->fo_instance);
    503 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    504 				threadflow->tf_abort = 1;
    505 				filebench_shm->shm_f_abort =
    506 				    FILEBENCH_ABORT_ERROR;
    507 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    508 				break;
    509 			}
    510 
    511 			/*
    512 			 * Return value of FILEBENCH_NORSC means "stop
    513 			 * the filebench run" if in "end on no work mode",
    514 			 * otherwise it indicates an error
    515 			 */
    516 			if (ret == FILEBENCH_NORSC) {
    517 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    518 				threadflow->tf_abort = FILEBENCH_DONE;
    519 				if (filebench_shm->shm_rmode ==
    520 				    FILEBENCH_MODE_Q1STDONE) {
    521 					filebench_shm->shm_f_abort =
    522 					    FILEBENCH_ABORT_RSRC;
    523 				} else if (filebench_shm->shm_rmode !=
    524 				    FILEBENCH_MODE_QALLDONE) {
    525 					filebench_log(LOG_ERROR1,
    526 					    "WARNING! Run stopped early:\n   "
    527 					    "             flowop %s-%d could "
    528 					    "not obtain a file. Please\n      "
    529 					    "          reduce runtime, "
    530 					    "increase fileset entries "
    531 					    "($nfiles), or switch modes.",
    532 					    flowop->fo_name,
    533 					    flowop->fo_instance);
    534 					filebench_shm->shm_f_abort =
    535 					    FILEBENCH_ABORT_ERROR;
    536 				}
    537 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    538 				break;
    539 			}
    540 
    541 			/*
    542 			 * Return value of FILEBENCH_DONE means "stop
    543 			 * the filebench run without error"
    544 			 */
    545 			if (ret == FILEBENCH_DONE) {
    546 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    547 				threadflow->tf_abort = FILEBENCH_DONE;
    548 				filebench_shm->shm_f_abort =
    549 				    FILEBENCH_ABORT_DONE;
    550 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    551 				break;
    552 			}
    553 
    554 			/*
    555 			 * If we get here and the return is something other
    556 			 * than FILEBENCH_OK, it means a spurious code
    557 			 * was returned, so treat as major error. This
    558 			 * probably indicates a bug in the flowop.
    559 			 */
    560 			if (ret != FILEBENCH_OK) {
    561 				filebench_log(LOG_ERROR,
    562 				    "Flowop %s unexpected return value = %d\n",
    563 				    flowop->fo_name, ret);
    564 				filebench_shm->shm_f_abort =
    565 				    FILEBENCH_ABORT_ERROR;
    566 				break;
    567 			}
    568 		}
    569 
    570 		/* advance to next flowop */
    571 		flowop = flowop->fo_exec_next;
    572 
    573 		/* but if at end of list, start over from the beginning */
    574 		if (flowop == NULL) {
    575 			flowop = threadflow->tf_thrd_fops;
    576 			threadflow->tf_stats.fs_count++;
    577 		}
    578 	}
    579 
    580 #ifdef HAVE_LWPS
    581 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
    582 	    _lwp_self());
    583 #endif
    584 
    585 	/* Tell flowops to destroy locally acquired state */
    586 	flowop_destruct_all_flows(threadflow);
    587 
    588 	pthread_exit(&threadflow->tf_abort);
    589 }
    590 
    591 void flowoplib_flowinit(void);
    592 void fb_lfs_flowinit(void);
    593 
    594 void
    595 flowop_init(void)
    596 {
    597 	(void) pthread_mutex_init(&controlstats_lock,
    598 	    ipc_mutexattr(IPC_MUTEX_NORMAL));
    599 	flowoplib_flowinit();
    600 }
    601 
    602 static int plugin_flowinit_done = FALSE;
    603 
    604 /*
    605  * Initialize any "plug-in" flowops. Called when the first "create fileset"
    606  * command is encountered.
    607  */
    608 void
    609 flowop_plugin_flowinit(void)
    610 {
    611 	if (plugin_flowinit_done)
    612 		return;
    613 
    614 	plugin_flowinit_done = TRUE;
    615 
    616 	switch (filebench_shm->shm_filesys_type) {
    617 	case LOCAL_FS_PLUG:
    618 		fb_lfs_flowinit();
    619 		break;
    620 
    621 	case NFS3_PLUG:
    622 	case NFS4_PLUG:
    623 	case CIFS_PLUG:
    624 		break;
    625 	}
    626 }
    627 
    628 
    629 /*
    630  * Delete the designated flowop from the thread's flowop list.
    631  */
    632 static void
    633 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
    634 {
    635 	flowop_t *entry = *flowoplist;
    636 	int found = 0;
    637 
    638 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    639 	    flowop->fo_name,
    640 	    flowop->fo_instance);
    641 
    642 	/* Delete from thread's flowop list */
    643 	if (flowop == *flowoplist) {
    644 		/* First on list */
    645 		*flowoplist = flowop->fo_exec_next;
    646 		filebench_log(LOG_DEBUG_IMPL,
    647 		    "Delete0 flowop: (%s-%d)",
    648 		    flowop->fo_name,
    649 		    flowop->fo_instance);
    650 	} else {
    651 		while (entry->fo_exec_next) {
    652 			filebench_log(LOG_DEBUG_IMPL,
    653 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
    654 			    entry->fo_exec_next->fo_name,
    655 			    entry->fo_exec_next->fo_instance,
    656 			    flowop->fo_name,
    657 			    flowop->fo_instance);
    658 
    659 			if (flowop == entry->fo_exec_next) {
    660 				/* Delete */
    661 				filebench_log(LOG_DEBUG_IMPL,
    662 				    "Deleted0 flowop: (%s-%d)",
    663 				    entry->fo_exec_next->fo_name,
    664 				    entry->fo_exec_next->fo_instance);
    665 				entry->fo_exec_next =
    666 				    entry->fo_exec_next->fo_exec_next;
    667 				break;
    668 			}
    669 			entry = entry->fo_exec_next;
    670 		}
    671 	}
    672 
    673 #ifdef HAVE_PROCFS
    674 	/* Close /proc stats */
    675 	if (flowop->fo_thread)
    676 		(void) close(flowop->fo_thread->tf_lwpusagefd);
    677 #endif
    678 
    679 	/* Delete from global list */
    680 	entry = filebench_shm->shm_flowoplist;
    681 
    682 	if (flowop == filebench_shm->shm_flowoplist) {
    683 		/* First on list */
    684 		filebench_shm->shm_flowoplist = flowop->fo_next;
    685 		found = 1;
    686 	} else {
    687 		while (entry->fo_next) {
    688 			filebench_log(LOG_DEBUG_IMPL,
    689 			    "Delete flowop: (%s-%d) == (%s-%d)",
    690 			    entry->fo_next->fo_name,
    691 			    entry->fo_next->fo_instance,
    692 			    flowop->fo_name,
    693 			    flowop->fo_instance);
    694 
    695 			if (flowop == entry->fo_next) {
    696 				/* Delete */
    697 				entry->fo_next = entry->fo_next->fo_next;
    698 				found = 1;
    699 				break;
    700 			}
    701 
    702 			entry = entry->fo_next;
    703 		}
    704 	}
    705 	if (found) {
    706 		filebench_log(LOG_DEBUG_IMPL,
    707 		    "Deleted flowop: (%s-%d)",
    708 		    flowop->fo_name,
    709 		    flowop->fo_instance);
    710 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
    711 	} else {
    712 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
    713 		    flowop->fo_name,
    714 		    flowop->fo_instance);
    715 	}
    716 }
    717 
    718 /*
    719  * Deletes all the flowops from a flowop list.
    720  */
    721 void
    722 flowop_delete_all(flowop_t **flowoplist)
    723 {
    724 	flowop_t *flowop = *flowoplist;
    725 	flowop_t *next_flowop;
    726 
    727 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    728 
    729 	while (flowop) {
    730 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    731 		    flowop->fo_name, flowop->fo_instance);
    732 
    733 		if (flowop->fo_instance &&
    734 		    (flowop->fo_instance == FLOW_MASTER)) {
    735 			flowop = flowop->fo_exec_next;
    736 			continue;
    737 		}
    738 		next_flowop = flowop->fo_exec_next;
    739 		flowop_delete(flowoplist, flowop);
    740 		flowop = next_flowop;
    741 	}
    742 
    743 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    744 }
    745 
    746 /*
    747  * Allocates a flowop entity and initializes it with inherited
    748  * contents from the "inherit" flowop, if it is supplied, or
    749  * with zeros otherwise. In either case the fo_next and fo_exec_next
    750  * pointers are set to NULL, and fo_thread is set to point to
    751  * the owning threadflow. The initialized flowop is placed at
    752  * the head of the global flowop list, and also placed on the
    753  * tail of the supplied local flowop list, which will either
    754  * be a threadflow's tf_thrd_fops list or a composite flowop's
    755  * fo_comp_fops list. The routine locks the flowop's fo_lock and
    756  * leaves it held on return. If successful, it returns a pointer
    757  * to the allocated and initialized flowop, otherwise it returns NULL.
    758  *
    759  * filebench_shm->shm_flowop_lock must be held by caller.
    760  */
    761 static flowop_t *
    762 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
    763     flowop_t **flowoplist_hdp, int instance, int type)
    764 {
    765 	flowop_t *flowop;
    766 
    767 	if (name == NULL)
    768 		return (NULL);
    769 
    770 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
    771 		filebench_log(LOG_ERROR,
    772 		    "flowop_define: Can't malloc flowop");
    773 		return (NULL);
    774 	}
    775 
    776 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
    777 	    name, instance, flowop);
    778 
    779 	if (flowop == NULL)
    780 		return (NULL);
    781 
    782 	if (inherit) {
    783 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
    784 		(void) pthread_mutex_init(&flowop->fo_lock,
    785 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    786 		(void) ipc_mutex_lock(&flowop->fo_lock);
    787 		flowop->fo_next = NULL;
    788 		flowop->fo_exec_next = NULL;
    789 		filebench_log(LOG_DEBUG_IMPL,
    790 		    "flowop %s-%d calling init", name, instance);
    791 	} else {
    792 		(void) memset(flowop, 0, sizeof (flowop_t));
    793 		flowop->fo_iters = avd_int_alloc(1);
    794 		flowop->fo_type = type;
    795 		(void) pthread_mutex_init(&flowop->fo_lock,
    796 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    797 		(void) ipc_mutex_lock(&flowop->fo_lock);
    798 	}
    799 
    800 	/* Create backpointer to thread */
    801 	flowop->fo_thread = threadflow;
    802 
    803 	/* Add flowop to global list */
    804 	if (filebench_shm->shm_flowoplist == NULL) {
    805 		filebench_shm->shm_flowoplist = flowop;
    806 		flowop->fo_next = NULL;
    807 	} else {
    808 		flowop->fo_next = filebench_shm->shm_flowoplist;
    809 		filebench_shm->shm_flowoplist = flowop;
    810 	}
    811 
    812 	(void) strcpy(flowop->fo_name, name);
    813 	flowop->fo_instance = instance;
    814 
    815 	if (flowoplist_hdp == NULL)
    816 		return (flowop);
    817 
    818 	/* Add flowop to thread op list */
    819 	if (*flowoplist_hdp == NULL) {
    820 		*flowoplist_hdp = flowop;
    821 		flowop->fo_exec_next = NULL;
    822 	} else {
    823 		flowop_t *flowend;
    824 
    825 		/* Find the end of the thread list */
    826 		flowend = *flowoplist_hdp;
    827 		while (flowend->fo_exec_next != NULL)
    828 			flowend = flowend->fo_exec_next;
    829 		flowend->fo_exec_next = flowop;
    830 		flowop->fo_exec_next = NULL;
    831 	}
    832 
    833 	return (flowop);
    834 }
    835 
    836 /*
    837  * Calls flowop_define_common() to allocate and initialize a
    838  * flowop, and holds the shared flowop_lock during the call.
    839  * It releases the created flowop's fo_lock when done.
    840  */
    841 flowop_t *
    842 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
    843     flowop_t **flowoplist_hdp, int instance, int type)
    844 {
    845 	flowop_t	*flowop;
    846 
    847 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    848 	flowop = flowop_define_common(threadflow, name,
    849 	    inherit, flowoplist_hdp, instance, type);
    850 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    851 
    852 	if (flowop == NULL)
    853 		return (NULL);
    854 
    855 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    856 
    857 	return (flowop);
    858 }
    859 
    860 /*
    861  * Calls flowop_define_common() to allocate and initialize a
    862  * composite flowop, and holds the shared flowop_lock during the call.
    863  * It releases the created flowop's fo_lock when done.
    864  */
    865 flowop_t *
    866 flowop_new_composite_define(char *name)
    867 {
    868 	flowop_t *flowop;
    869 
    870 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    871 	flowop = flowop_define_common(NULL, name,
    872 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
    873 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    874 
    875 	if (flowop == NULL)
    876 		return (NULL);
    877 
    878 	flowop->fo_func = flowop_composite;
    879 	flowop->fo_init = flowop_composite_init;
    880 	flowop->fo_destruct = flowop_composite_destruct;
    881 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    882 
    883 	return (flowop);
    884 }
    885 
    886 /*
    887  * Attempts to take a write lock on the flowop_find_lock that is
    888  * defined in interprocess shared memory. Since each call to
    889  * flowop_start() holds a read lock on flowop_find_lock, this
    890  * routine effectively blocks until all instances of
    891  * flowop_start() have finished. The flowop_find() routine calls
    892  * this routine so that flowops won't be searched for until all
    893  * flowops have been created by flowop_start.
    894  */
    895 static void
    896 flowop_find_barrier(void)
    897 {
    898 	/* Block on wrlock to ensure find waits for all creates */
    899 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
    900 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    901 }
    902 
    903 /*
    904  * Returns a list of flowops named "name" from the master
    905  * flowop list.
    906  */
    907 flowop_t *
    908 flowop_find(char *name)
    909 {
    910 	flowop_t *flowop;
    911 	flowop_t *result = NULL;
    912 
    913 	flowop_find_barrier();
    914 
    915 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    916 
    917 	flowop = filebench_shm->shm_flowoplist;
    918 
    919 	while (flowop) {
    920 		if (strcmp(name, flowop->fo_name) == 0) {
    921 
    922 			/* Add flowop to result list */
    923 			if (result == NULL) {
    924 				result = flowop;
    925 				flowop->fo_resultnext = NULL;
    926 			} else {
    927 				flowop->fo_resultnext = result;
    928 				result = flowop;
    929 			}
    930 		}
    931 		flowop = flowop->fo_next;
    932 	}
    933 
    934 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    935 
    936 
    937 	return (result);
    938 }
    939 
    940 /*
    941  * Returns a pointer to the specified instance of flowop
    942  * "name" from the global list.
    943  */
    944 flowop_t *
    945 flowop_find_one(char *name, int instance)
    946 {
    947 	flowop_t *test_flowop;
    948 
    949 	flowop_find_barrier();
    950 
    951 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    952 
    953 	test_flowop = filebench_shm->shm_flowoplist;
    954 
    955 	while (test_flowop) {
    956 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
    957 		    (instance == test_flowop->fo_instance))
    958 			break;
    959 
    960 		test_flowop = test_flowop->fo_next;
    961 	}
    962 
    963 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    964 
    965 	return (test_flowop);
    966 }
    967 
    968 /*
    969  * recursively searches through lists of flowops on a given thread
    970  * and those on any included composite flowops for the named flowop.
    971  * either returns with a pointer to the named flowop or NULL if it
    972  * cannot be found.
    973  */
    974 static flowop_t *
    975 flowop_recurse_search(char *path, char *name, flowop_t *list)
    976 {
    977 	flowop_t *test_flowop;
    978 	char fullname[MAXPATHLEN];
    979 
    980 	test_flowop = list;
    981 
    982 	/*
    983 	 * when searching a list of inner flowops, "path" is the fullname
    984 	 * of the containing composite flowop. Use it to form the
    985 	 * full name of the inner flowop to search for.
    986 	 */
    987 	if (path) {
    988 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
    989 			filebench_log(LOG_ERROR,
    990 			    "composite flowop path name %s.%s too long",
    991 			    path, name);
    992 			return (NULL);
    993 		}
    994 
    995 		/* create composite_name.name for recursive search */
    996 		(void) strcpy(fullname, path);
    997 		(void) strcat(fullname, ".");
    998 		(void) strcat(fullname, name);
    999 	} else {
   1000 		(void) strcpy(fullname, name);
   1001 	}
   1002 
   1003 	/*
   1004 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
   1005 	 * list or fo_comp_fops (inner flowop) list.
   1006 	 */
   1007 	while (test_flowop) {
   1008 		if (strcmp(fullname, test_flowop->fo_name) == 0)
   1009 			return (test_flowop);
   1010 
   1011 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
   1012 			flowop_t *found_flowop;
   1013 
   1014 			found_flowop = flowop_recurse_search(
   1015 			    test_flowop->fo_name, name,
   1016 			    test_flowop->fo_comp_fops);
   1017 
   1018 			if (found_flowop)
   1019 				return (found_flowop);
   1020 		}
   1021 		test_flowop = test_flowop->fo_exec_next;
   1022 	}
   1023 
   1024 	/* not found here or on any child lists */
   1025 	return (NULL);
   1026 }
   1027 
   1028 /*
   1029  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
   1030  * list of flowops. Returns the named flowop if found, or NULL.
   1031  */
   1032 flowop_t *
   1033 flowop_find_from_list(char *name, flowop_t *list)
   1034 {
   1035 	flowop_t *found_flowop;
   1036 
   1037 	flowop_find_barrier();
   1038 
   1039 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
   1040 
   1041 	found_flowop = flowop_recurse_search(NULL, name, list);
   1042 
   1043 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
   1044 
   1045 	return (found_flowop);
   1046 }
   1047 
   1048 /*
   1049  * Composite flowop method. Does one pass through its list of
   1050  * inner flowops per iteration.
   1051  */
   1052 static int
   1053 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
   1054 {
   1055 	flowop_t	*inner_flowop;
   1056 
   1057 	/* get the first flowop in the list */
   1058 	inner_flowop = flowop->fo_comp_fops;
   1059 
   1060 	/* make a pass through the list of sub flowops */
   1061 	while (inner_flowop) {
   1062 		int	i, count;
   1063 
   1064 		/* Abort if asked */
   1065 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
   1066 			return (FILEBENCH_DONE);
   1067 
   1068 		if (inner_flowop->fo_stats.fs_stime == 0)
   1069 			inner_flowop->fo_stats.fs_stime = gethrtime();
   1070 
   1071 		/* Execute the flowop for fo_iters times */
   1072 		count = (int)avd_get_int(inner_flowop->fo_iters);
   1073 		for (i = 0; i < count; i++) {
   1074 
   1075 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
   1076 			    "%s-%d", threadflow->tf_name,
   1077 			    inner_flowop->fo_name,
   1078 			    inner_flowop->fo_instance);
   1079 
   1080 			switch ((*inner_flowop->fo_func)(threadflow,
   1081 			    inner_flowop)) {
   1082 
   1083 			/* all done */
   1084 			case FILEBENCH_DONE:
   1085 				return (FILEBENCH_DONE);
   1086 
   1087 			/* quit if inner flowop limit reached */
   1088 			case FILEBENCH_NORSC:
   1089 				return (FILEBENCH_NORSC);
   1090 
   1091 			/* quit on inner flowop error */
   1092 			case FILEBENCH_ERROR:
   1093 				filebench_log(LOG_ERROR,
   1094 				    "inner flowop %s failed",
   1095 				    inner_flowop->fo_name);
   1096 				return (FILEBENCH_ERROR);
   1097 
   1098 			/* otherwise keep going */
   1099 			default:
   1100 				break;
   1101 			}
   1102 
   1103 		}
   1104 
   1105 		/* advance to next flowop */
   1106 		inner_flowop = inner_flowop->fo_exec_next;
   1107 	}
   1108 
   1109 	/* finished with this pass */
   1110 	return (FILEBENCH_OK);
   1111 }
   1112 
   1113 /*
   1114  * Composite flowop initialization. Creates runtime inner flowops
   1115  * from prototype inner flowops.
   1116  */
   1117 static int
   1118 flowop_composite_init(flowop_t *flowop)
   1119 {
   1120 	int err;
   1121 
   1122 	err = flowop_create_runtime_flowops(flowop->fo_thread,
   1123 	    &flowop->fo_comp_fops);
   1124 	if (err != FILEBENCH_OK)
   1125 		return (err);
   1126 
   1127 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1128 	return (0);
   1129 }
   1130 
   1131 /*
   1132  * clean up inner flowops
   1133  */
   1134 static void
   1135 flowop_composite_destruct(flowop_t *flowop)
   1136 {
   1137 	flowop_t *inner_flowop = flowop->fo_comp_fops;
   1138 
   1139 	while (inner_flowop) {
   1140 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
   1141 		    inner_flowop->fo_name, inner_flowop->fo_instance);
   1142 
   1143 		if (inner_flowop->fo_instance &&
   1144 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
   1145 			inner_flowop = inner_flowop->fo_exec_next;
   1146 			continue;
   1147 		}
   1148 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
   1149 		inner_flowop = inner_flowop->fo_exec_next;
   1150 	}
   1151 }
   1152 
   1153 /*
   1154  * Support routines for libraries of flowops
   1155  */
   1156 
   1157 int
   1158 flowop_init_generic(flowop_t *flowop)
   1159 {
   1160 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1161 	return (FILEBENCH_OK);
   1162 }
   1163 
   1164 void
   1165 flowop_destruct_generic(flowop_t *flowop)
   1166 {
   1167 	char *buf;
   1168 
   1169 	/* release any local resources held by the flowop */
   1170 	(void) ipc_mutex_lock(&flowop->fo_lock);
   1171 	buf = flowop->fo_buf;
   1172 	flowop->fo_buf = NULL;
   1173 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1174 
   1175 	if (buf)
   1176 		free(buf);
   1177 }
   1178 
   1179 
   1180 /*
   1181  * Loops through the supplied list of flowops and creates and initializes
   1182  * a flowop for each one by calling flowop_define. As a side effect of
   1183  * calling flowop define, the created flowops are placed on the
   1184  * master flowop list. All created flowops are set to instance "0".
   1185  */
   1186 void
   1187 flowop_flow_init(flowop_proto_t *list, int nops)
   1188 {
   1189 	int i;
   1190 
   1191 	for (i = 0; i < nops; i++) {
   1192 		flowop_t *flowop;
   1193 		flowop_proto_t *fl;
   1194 
   1195 		fl = &(list[i]);
   1196 
   1197 		if ((flowop = flowop_define(NULL,
   1198 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
   1199 			filebench_log(LOG_ERROR,
   1200 			    "failed to create flowop %s\n",
   1201 			    fl->fl_name);
   1202 			filebench_shutdown(1);
   1203 		}
   1204 
   1205 		flowop->fo_func = fl->fl_func;
   1206 		flowop->fo_init = fl->fl_init;
   1207 		flowop->fo_destruct = fl->fl_destruct;
   1208 		flowop->fo_attrs = fl->fl_attrs;
   1209 	}
   1210 }
   1211