Home | History | Annotate | Download | only in common
      1  5184  ek110237 /*
      2  5184  ek110237  * CDDL HEADER START
      3  5184  ek110237  *
      4  5184  ek110237  * The contents of this file are subject to the terms of the
      5  5184  ek110237  * Common Development and Distribution License (the "License").
      6  5184  ek110237  * You may not use this file except in compliance with the License.
      7  5184  ek110237  *
      8  5184  ek110237  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  5184  ek110237  * or http://www.opensolaris.org/os/licensing.
     10  5184  ek110237  * See the License for the specific language governing permissions
     11  5184  ek110237  * and limitations under the License.
     12  5184  ek110237  *
     13  5184  ek110237  * When distributing Covered Code, include this CDDL HEADER in each
     14  5184  ek110237  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  5184  ek110237  * If applicable, add the following below this CDDL HEADER, with the
     16  5184  ek110237  * fields enclosed by brackets "[]" replaced with your own identifying
     17  5184  ek110237  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  5184  ek110237  *
     19  5184  ek110237  * CDDL HEADER END
     20  5184  ek110237  */
     21  5184  ek110237 /*
     22  8615    Andrew  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  5184  ek110237  * Use is subject to license terms.
     24  5184  ek110237  */
     25  5184  ek110237 
     26  5184  ek110237 #include "config.h"
     27  5184  ek110237 
     28  5184  ek110237 #ifdef HAVE_LWPS
     29  5184  ek110237 #include <sys/lwp.h>
     30  5184  ek110237 #endif
     31  5184  ek110237 #include <fcntl.h>
     32  5184  ek110237 #include "filebench.h"
     33  5184  ek110237 #include "flowop.h"
     34  5184  ek110237 #include "stats.h"
     35  5184  ek110237 
     36  5184  ek110237 #ifdef LINUX_PORT
     37  5184  ek110237 #include <sys/types.h>
     38  5184  ek110237 #include <linux/unistd.h>
     39  5184  ek110237 #endif
     40  5184  ek110237 
     41  5184  ek110237 static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
     42  6550  aw148015     flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
     43  6550  aw148015 static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
     44  6550  aw148015 static int flowop_composite_init(flowop_t *flowop);
     45  6550  aw148015 static void flowop_composite_destruct(flowop_t *flowop);
     46  5184  ek110237 
     47  5184  ek110237 /*
     48  5184  ek110237  * A collection of flowop support functions. The actual code that
     49  5184  ek110237  * implements the various flowops is in flowop_library.c.
     50  5184  ek110237  *
     51  5184  ek110237  * Routines for defining, creating, initializing and destroying
     52  5184  ek110237  * flowops, cyclically invoking the flowops on each threadflow's flowop
     53  5184  ek110237  * list, collecting statistics about flowop execution, and other
     54  5184  ek110237  * housekeeping duties are included in this file.
     55  6550  aw148015  *
     56  6550  aw148015  * User Defined Composite Flowops
     57  6550  aw148015  *    The ability to define new flowops as lists of built-in or previously
     58  6550  aw148015  * defined flowops has been added to Filebench. In a sense they are like
     59  6550  aw148015  * in-line subroutines, which can have default attributes set at definition
     60  6550  aw148015  * time and passed arguments at invocation time. Like other flowops (and
     61  6550  aw148015  * unlike conventional subroutines) you can invoke them with an iteration
     62  6550  aw148015  * count (the "iter" attribute), and they will loop through their associated
     63  6550  aw148015  * list of flowops for "iter" number of times each time they are encountered
     64  6550  aw148015  * in the thread or outer composite flowop which invokes them.
     65  6550  aw148015  *
     66  6550  aw148015  * Composite flowops are created with a "define" command, are given a name,
     67  6550  aw148015  * optional default attributes, and local variable definitions on the
     68  6550  aw148015  * "define" command line, followed by a brace enclosed list of flowops
     69  6550  aw148015  * to execute. The enclosed flowops may include attributes that reference
     70  6550  aw148015  * the local variables, as well as constants and global variables.
     71  6550  aw148015  *
     72  6550  aw148015  * Composite flowops are used pretty much like regular flowops, but you can
     73  6550  aw148015  * also set local variables to constants or global variables ($local_var =
     74  6550  aw148015  * [$var | $random_var | string | boolean | integer | double]) as part of
     75  6550  aw148015  * the invocation. Thus each invocation can pass customized values to its
     76  6550  aw148015  * inner flowops, greatly increasing their generality.
     77  6550  aw148015  *
     78  6550  aw148015  * All flowops are placed on a global, singly linked list, with fo_next
     79  6550  aw148015  * being the link pointer for this list. The are also placed on a private
     80  6550  aw148015  * list for the thread or composite flowop they are part of. The tf_thrd_fops
     81  6550  aw148015  * pointer in the thread will point to the list of top level flowops in the
     82  6550  aw148015  * thread, which are linked together by fo_exec_next. If any of these flowops
     83  6550  aw148015  * are composite flowops, they will have a list of second level flowops rooted
     84  6550  aw148015  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
     85  6550  aw148015  * of all flowops, and an n-arry tree of threads, composite flowops, and
     86  6550  aw148015  * flowops, with composite flowops being the branch nodes in the tree.
     87  6550  aw148015  *
     88  6550  aw148015  * To illustrate, if we have three first level flowops, the first of which is
     89  6550  aw148015  * a composite flowop consisting of two other flowops, we get:
     90  6550  aw148015  *
     91  6550  aw148015  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
     92  6550  aw148015  *			   flowop->fo_comp_fops		    |
     93  6550  aw148015  *				    |			    V
     94  6550  aw148015  *				    |			flowop->fo_exec_next
     95  6550  aw148015  *				    |
     96  6550  aw148015  *				    V
     97  6550  aw148015  *				flowop->fo_exec_next -> flowop->fo_exec_next
     98  6550  aw148015  *
     99  6550  aw148015  * And all five flowops (plus others from any other threads) are on a global
    100  6550  aw148015  * list linked with fo_next.
    101  5184  ek110237  */
    102  5184  ek110237 
    103  5184  ek110237 /*
    104  5184  ek110237  * Prints the name and instance number of each flowop in
    105  5184  ek110237  * the supplied list to the filebench log.
    106  5184  ek110237  */
    107  5184  ek110237 int
    108  5184  ek110237 flowop_printlist(flowop_t *list)
    109  5184  ek110237 {
    110  5184  ek110237 	flowop_t *flowop = list;
    111  5184  ek110237 
    112  5184  ek110237 	while (flowop) {
    113  5184  ek110237 		filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
    114  5184  ek110237 		    flowop->fo_name, flowop->fo_instance);
    115  6550  aw148015 		flowop = flowop->fo_exec_next;
    116  5184  ek110237 	}
    117  5184  ek110237 	return (0);
    118  6212  aw148015 }
    119  6212  aw148015 
    120  6212  aw148015 /*
    121  6212  aw148015  * Prints the name and instance number of all flowops on
    122  6212  aw148015  * the master flowop list to the console and the filebench log.
    123  6212  aw148015  */
    124  6212  aw148015 void
    125  6212  aw148015 flowop_printall(void)
    126  6212  aw148015 {
    127  6391  aw148015 	flowop_t *flowop = filebench_shm->shm_flowoplist;
    128  6212  aw148015 
    129  6212  aw148015 	while (flowop) {
    130  6212  aw148015 		filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
    131  6212  aw148015 		    flowop->fo_name, flowop->fo_instance);
    132  6212  aw148015 		flowop = flowop->fo_next;
    133  6212  aw148015 	}
    134  5184  ek110237 }
    135  5184  ek110237 
    136  5184  ek110237 #define	TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
    137  5184  ek110237 					(e.tv_nsec - s.tv_nsec))
    138  5184  ek110237 /*
    139  5184  ek110237  * Puts current high resolution time in start time entry
    140  5184  ek110237  * for threadflow and may also calculate running filebench
    141  5184  ek110237  * overhead statistics.
    142  5184  ek110237  */
    143  5184  ek110237 void
    144  5184  ek110237 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
    145  5184  ek110237 {
    146  5184  ek110237 #ifdef HAVE_PROCFS
    147  9356    Andrew 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
    148  9356    Andrew 		if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
    149  9356    Andrew 			char procname[128];
    150  5184  ek110237 
    151  9356    Andrew 			(void) snprintf(procname, sizeof (procname),
    152  9356    Andrew 			    "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
    153  9356    Andrew 			threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
    154  9356    Andrew 		}
    155  5184  ek110237 
    156  9356    Andrew 		(void) pread(threadflow->tf_lwpusagefd,
    157  9356    Andrew 		    &threadflow->tf_susage,
    158  9356    Andrew 		    sizeof (struct prusage), 0);
    159  5184  ek110237 
    160  9356    Andrew 		/* Compute overhead time in this thread around op */
    161  9356    Andrew 		if (threadflow->tf_eusage.pr_stime.tv_nsec) {
    162  9356    Andrew 			flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
    163  9356    Andrew 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
    164  9356    Andrew 			    threadflow->tf_susage.pr_utime) +
    165  9356    Andrew 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
    166  9356    Andrew 			    threadflow->tf_susage.pr_ttime) +
    167  9356    Andrew 			    TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
    168  9356    Andrew 			    threadflow->tf_susage.pr_stime);
    169  9356    Andrew 		}
    170  5184  ek110237 	}
    171  5184  ek110237 #endif
    172  9356    Andrew 
    173  5184  ek110237 	/* Start of op for this thread */
    174  5184  ek110237 	threadflow->tf_stime = gethrtime();
    175  5184  ek110237 }
    176  5184  ek110237 
    177  5184  ek110237 flowstat_t controlstats;
    178  6212  aw148015 pthread_mutex_t controlstats_lock;
    179  5184  ek110237 static int controlstats_zeroed = 0;
    180  5184  ek110237 
    181  5184  ek110237 /*
    182  5184  ek110237  * Updates flowop's latency statistics, using saved start
    183  5184  ek110237  * time and current high resolution time. Updates flowop's
    184  5184  ek110237  * io count and transferred bytes statistics. Also updates
    185  5184  ek110237  * threadflow's and flowop's cumulative read or write byte
    186  5184  ek110237  * and io count statistics.
    187  5184  ek110237  */
    188  5184  ek110237 void
    189  5673  aw148015 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
    190  5184  ek110237 {
    191  5184  ek110237 	hrtime_t t;
    192  5184  ek110237 
    193  5184  ek110237 	flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
    194  5184  ek110237 	    (gethrtime() - threadflow->tf_stime);
    195  5184  ek110237 #ifdef HAVE_PROCFS
    196  9356    Andrew 	if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
    197  9356    Andrew 		if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
    198  9356    Andrew 		    sizeof (struct prusage), 0)) != sizeof (struct prusage))
    199  9356    Andrew 			filebench_log(LOG_ERROR, "cannot read /proc");
    200  5184  ek110237 
    201  9356    Andrew 		t =
    202  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
    203  9356    Andrew 		    threadflow->tf_eusage.pr_utime) +
    204  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
    205  9356    Andrew 		    threadflow->tf_eusage.pr_ttime) +
    206  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
    207  9356    Andrew 		    threadflow->tf_eusage.pr_stime);
    208  9356    Andrew 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
    209  5184  ek110237 
    210  9356    Andrew 		flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
    211  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
    212  9356    Andrew 		    threadflow->tf_eusage.pr_tftime) +
    213  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
    214  9356    Andrew 		    threadflow->tf_eusage.pr_dftime) +
    215  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    216  9356    Andrew 		    threadflow->tf_eusage.pr_kftime) +
    217  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
    218  9356    Andrew 		    threadflow->tf_eusage.pr_kftime) +
    219  9356    Andrew 		    TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
    220  9356    Andrew 		    threadflow->tf_eusage.pr_slptime);
    221  9356    Andrew 	}
    222  5184  ek110237 #endif
    223  5184  ek110237 
    224  5184  ek110237 	flowop->fo_stats.fs_count++;
    225  5673  aw148015 	flowop->fo_stats.fs_bytes += bytes;
    226  6212  aw148015 	(void) ipc_mutex_lock(&controlstats_lock);
    227  5184  ek110237 	if ((flowop->fo_type & FLOW_TYPE_IO) ||
    228  5184  ek110237 	    (flowop->fo_type & FLOW_TYPE_AIO)) {
    229  5184  ek110237 		controlstats.fs_count++;
    230  5673  aw148015 		controlstats.fs_bytes += bytes;
    231  5184  ek110237 	}
    232  5184  ek110237 	if (flowop->fo_attrs & FLOW_ATTR_READ) {
    233  5673  aw148015 		threadflow->tf_stats.fs_rbytes += bytes;
    234  5184  ek110237 		threadflow->tf_stats.fs_rcount++;
    235  5184  ek110237 		flowop->fo_stats.fs_rcount++;
    236  5673  aw148015 		controlstats.fs_rbytes += bytes;
    237  5184  ek110237 		controlstats.fs_rcount++;
    238  5184  ek110237 	} else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
    239  5673  aw148015 		threadflow->tf_stats.fs_wbytes += bytes;
    240  5184  ek110237 		threadflow->tf_stats.fs_wcount++;
    241  5184  ek110237 		flowop->fo_stats.fs_wcount++;
    242  5673  aw148015 		controlstats.fs_wbytes += bytes;
    243  5184  ek110237 		controlstats.fs_wcount++;
    244  5184  ek110237 	}
    245  6212  aw148015 	(void) ipc_mutex_unlock(&controlstats_lock);
    246  5184  ek110237 }
    247  5184  ek110237 
    248  5184  ek110237 /*
    249  5184  ek110237  * Calls the flowop's initialization function, pointed to by
    250  5184  ek110237  * flowop->fo_init.
    251  5184  ek110237  */
    252  5184  ek110237 static int
    253  5184  ek110237 flowop_initflow(flowop_t *flowop)
    254  5184  ek110237 {
    255  6212  aw148015 	/*
    256  6212  aw148015 	 * save static copies of two items, in case they are supplied
    257  6212  aw148015 	 * from random variables
    258  6212  aw148015 	 */
    259  7556    Andrew 	if (!AVD_IS_STRING(flowop->fo_value))
    260  7556    Andrew 		flowop->fo_constvalue = avd_get_int(flowop->fo_value);
    261  7556    Andrew 
    262  6212  aw148015 	flowop->fo_constwss = avd_get_int(flowop->fo_wss);
    263  6212  aw148015 
    264  5184  ek110237 	if ((*flowop->fo_init)(flowop) < 0) {
    265  5184  ek110237 		filebench_log(LOG_ERROR, "flowop %s-%d init failed",
    266  5184  ek110237 		    flowop->fo_name, flowop->fo_instance);
    267  5184  ek110237 		return (-1);
    268  5184  ek110237 	}
    269  5184  ek110237 	return (0);
    270  5184  ek110237 }
    271  5184  ek110237 
    272  6550  aw148015 static int
    273  6550  aw148015 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
    274  6550  aw148015 {
    275  6550  aw148015 	flowop_t *flowop = *ops_list_ptr;
    276  6550  aw148015 
    277  6550  aw148015 	while (flowop) {
    278  6550  aw148015 		flowop_t *newflowop;
    279  6550  aw148015 
    280  6550  aw148015 		if (flowop == *ops_list_ptr)
    281  6550  aw148015 			*ops_list_ptr = NULL;
    282  6550  aw148015 
    283  6550  aw148015 		newflowop = flowop_define_common(threadflow, flowop->fo_name,
    284  6550  aw148015 		    flowop, ops_list_ptr, 1, 0);
    285  6550  aw148015 		if (newflowop == NULL)
    286  6550  aw148015 			return (FILEBENCH_ERROR);
    287  6550  aw148015 
    288  6550  aw148015 		/* check for fo_filename attribute, and resolve if present */
    289  6550  aw148015 		if (flowop->fo_filename) {
    290  6550  aw148015 			char *name;
    291  6550  aw148015 
    292  6550  aw148015 			name = avd_get_str(flowop->fo_filename);
    293  6550  aw148015 			newflowop->fo_fileset = fileset_find(name);
    294  6550  aw148015 
    295  6550  aw148015 			if (newflowop->fo_fileset == NULL) {
    296  6550  aw148015 				filebench_log(LOG_ERROR,
    297  6550  aw148015 				    "flowop %s: file %s not found",
    298  6550  aw148015 				    newflowop->fo_name, name);
    299  6550  aw148015 				filebench_shutdown(1);
    300  6550  aw148015 			}
    301  6550  aw148015 		}
    302  6550  aw148015 
    303  6550  aw148015 		if (flowop_initflow(newflowop) < 0) {
    304  6550  aw148015 			filebench_log(LOG_ERROR, "Flowop init of %s failed",
    305  6550  aw148015 			    newflowop->fo_name);
    306  6550  aw148015 		}
    307  6550  aw148015 
    308  6550  aw148015 		flowop = flowop->fo_exec_next;
    309  6550  aw148015 	}
    310  6550  aw148015 	return (FILEBENCH_OK);
    311  6550  aw148015 }
    312  6550  aw148015 
    313  5184  ek110237 /*
    314  6084  aw148015  * Calls the flowop's destruct function, pointed to by
    315  6084  aw148015  * flowop->fo_destruct.
    316  6084  aw148015  */
    317  6084  aw148015 static void
    318  6084  aw148015 flowop_destructflow(flowop_t *flowop)
    319  6084  aw148015 {
    320  6084  aw148015 	(*flowop->fo_destruct)(flowop);
    321  6084  aw148015 }
    322  6084  aw148015 
    323  6084  aw148015 /*
    324  6084  aw148015  * call the destruct funtions of all the threadflow's flowops,
    325  6084  aw148015  * if it is still flagged as "running".
    326  6084  aw148015  */
    327  6084  aw148015 void
    328  6084  aw148015 flowop_destruct_all_flows(threadflow_t *threadflow)
    329  6084  aw148015 {
    330  6084  aw148015 	flowop_t *flowop;
    331  6084  aw148015 
    332  6701  aw148015 	/* wait a moment to give other threads a chance to stop too */
    333  6701  aw148015 	(void) sleep(1);
    334  6701  aw148015 
    335  6084  aw148015 	(void) ipc_mutex_lock(&threadflow->tf_lock);
    336  6084  aw148015 
    337  6084  aw148015 	/* prepare to call destruct flow routines, if necessary */
    338  6084  aw148015 	if (threadflow->tf_running == 0) {
    339  6084  aw148015 
    340  6084  aw148015 		/* allready destroyed */
    341  6084  aw148015 		(void) ipc_mutex_unlock(&threadflow->tf_lock);
    342  6084  aw148015 		return;
    343  6084  aw148015 	}
    344  6084  aw148015 
    345  6550  aw148015 	flowop = threadflow->tf_thrd_fops;
    346  6084  aw148015 	threadflow->tf_running = 0;
    347  6084  aw148015 	(void) ipc_mutex_unlock(&threadflow->tf_lock);
    348  6084  aw148015 
    349  6084  aw148015 	while (flowop) {
    350  6084  aw148015 		flowop_destructflow(flowop);
    351  6550  aw148015 		flowop = flowop->fo_exec_next;
    352  6084  aw148015 	}
    353  6084  aw148015 }
    354  6084  aw148015 
    355  6084  aw148015 /*
    356  5184  ek110237  * The final initialization and main execution loop for the
    357  5184  ek110237  * worker threads. Sets threadflow and flowop start times,
    358  5184  ek110237  * waits for all process to start, then creates the runtime
    359  5184  ek110237  * flowops from those defined by the F language workload
    360  5184  ek110237  * script. It does some more initialization, then enters a
    361  5184  ek110237  * loop to repeatedly execute the flowops on the flowop list
    362  5184  ek110237  * until an abort condition is detected, at which time it exits.
    363  5184  ek110237  * This is the starting routine for the new worker thread
    364  5184  ek110237  * created by threadflow_createthread(), and is not currently
    365  5184  ek110237  * called from anywhere else.
    366  5184  ek110237  */
    367  5184  ek110237 void
    368  5184  ek110237 flowop_start(threadflow_t *threadflow)
    369  5184  ek110237 {
    370  5184  ek110237 	flowop_t *flowop;
    371  5184  ek110237 	size_t memsize;
    372  9326    Andrew 	int ret = FILEBENCH_OK;
    373  5184  ek110237 
    374  5184  ek110237 #ifdef HAVE_PROCFS
    375  5184  ek110237 	if (noproc == 0) {
    376  5184  ek110237 		char procname[128];
    377  5184  ek110237 		long ctl[2] = {PCSET, PR_MSACCT};
    378  5184  ek110237 		int pfd;
    379  5184  ek110237 
    380  5184  ek110237 		(void) snprintf(procname, sizeof (procname),
    381  6084  aw148015 		    "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
    382  5184  ek110237 		pfd = open(procname, O_WRONLY);
    383  5184  ek110237 		(void) pwrite(pfd, &ctl, sizeof (ctl), 0);
    384  5184  ek110237 		(void) close(pfd);
    385  5184  ek110237 	}
    386  5184  ek110237 #endif
    387  5184  ek110237 
    388  6212  aw148015 	(void) ipc_mutex_lock(&controlstats_lock);
    389  5184  ek110237 	if (!controlstats_zeroed) {
    390  5184  ek110237 		(void) memset(&controlstats, 0, sizeof (controlstats));
    391  5184  ek110237 		controlstats_zeroed = 1;
    392  5184  ek110237 	}
    393  6212  aw148015 	(void) ipc_mutex_unlock(&controlstats_lock);
    394  5184  ek110237 
    395  6550  aw148015 	flowop = threadflow->tf_thrd_fops;
    396  5184  ek110237 	threadflow->tf_stats.fs_stime = gethrtime();
    397  5184  ek110237 	flowop->fo_stats.fs_stime = gethrtime();
    398  5184  ek110237 
    399  5184  ek110237 	/* Hold the flowop find lock as reader to prevent lookups */
    400  6391  aw148015 	(void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
    401  5184  ek110237 
    402  5184  ek110237 	/*
    403  5184  ek110237 	 * Block until all processes have started, acting like
    404  5184  ek110237 	 * a barrier. The original filebench process initially
    405  5184  ek110237 	 * holds the run_lock as a reader, preventing any of the
    406  5184  ek110237 	 * threads from obtaining the writer lock, and hence
    407  5184  ek110237 	 * passing this point. Once all processes and threads
    408  5184  ek110237 	 * have been created, the original process unlocks
    409  5184  ek110237 	 * run_lock, allowing each waiting thread to lock
    410  5184  ek110237 	 * and then immediately unlock it, then begin running.
    411  5184  ek110237 	 */
    412  6391  aw148015 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
    413  6391  aw148015 	(void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
    414  5184  ek110237 
    415  5184  ek110237 	/* Create the runtime flowops from those defined by the script */
    416  6391  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    417  6550  aw148015 	if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
    418  6550  aw148015 	    != FILEBENCH_OK) {
    419  6550  aw148015 		(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    420  6550  aw148015 		filebench_shutdown(1);
    421  6550  aw148015 		return;
    422  5184  ek110237 	}
    423  6391  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    424  5184  ek110237 
    425  5184  ek110237 	/* Release the find lock as reader to allow lookups */
    426  6391  aw148015 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    427  5184  ek110237 
    428  5184  ek110237 	/* Set to the start of the new flowop list */
    429  6550  aw148015 	flowop = threadflow->tf_thrd_fops;
    430  5184  ek110237 
    431  5184  ek110237 	threadflow->tf_abort = 0;
    432  5184  ek110237 	threadflow->tf_running = 1;
    433  5184  ek110237 
    434  6212  aw148015 	memsize = (size_t)threadflow->tf_constmemsize;
    435  6212  aw148015 
    436  5184  ek110237 	/* If we are going to use ISM, allocate later */
    437  5184  ek110237 	if (threadflow->tf_attrs & THREADFLOW_USEISM) {
    438  5184  ek110237 		threadflow->tf_mem =
    439  6212  aw148015 		    ipc_ismmalloc(memsize);
    440  5184  ek110237 	} else {
    441  6212  aw148015 		threadflow->tf_mem =
    442  6212  aw148015 		    malloc(memsize);
    443  5184  ek110237 	}
    444  5184  ek110237 
    445  5184  ek110237 	(void) memset(threadflow->tf_mem, 0, memsize);
    446  5184  ek110237 	filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
    447  5184  ek110237 
    448  5184  ek110237 #ifdef HAVE_LWPS
    449  5184  ek110237 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
    450  5184  ek110237 	    threadflow,
    451  5184  ek110237 	    _lwp_self());
    452  5184  ek110237 #endif
    453  5184  ek110237 
    454  5184  ek110237 	/* Main filebench worker loop */
    455  9326    Andrew 	while (ret == FILEBENCH_OK) {
    456  6212  aw148015 		int i, count;
    457  5184  ek110237 
    458  5184  ek110237 		/* Abort if asked */
    459  6391  aw148015 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
    460  5184  ek110237 			break;
    461  5184  ek110237 
    462  5184  ek110237 		/* Be quiet while stats are gathered */
    463  6391  aw148015 		if (filebench_shm->shm_bequiet) {
    464  5184  ek110237 			(void) sleep(1);
    465  5184  ek110237 			continue;
    466  5184  ek110237 		}
    467  5184  ek110237 
    468  5184  ek110237 		/* Take it easy until everyone is ready to go */
    469  6701  aw148015 		if (!filebench_shm->shm_procs_running) {
    470  5184  ek110237 			(void) sleep(1);
    471  6391  aw148015 			continue;
    472  6391  aw148015 		}
    473  5184  ek110237 
    474  5184  ek110237 		if (flowop == NULL) {
    475  5184  ek110237 			filebench_log(LOG_ERROR, "flowop_read null flowop");
    476  5184  ek110237 			return;
    477  5184  ek110237 		}
    478  5184  ek110237 
    479  6212  aw148015 		if (flowop->fo_stats.fs_stime == 0)
    480  6212  aw148015 			flowop->fo_stats.fs_stime = gethrtime();
    481  5184  ek110237 
    482  5184  ek110237 		/* Execute the flowop for fo_iters times */
    483  6550  aw148015 		count = (int)avd_get_int(flowop->fo_iters);
    484  6212  aw148015 		for (i = 0; i < count; i++) {
    485  6212  aw148015 
    486  5184  ek110237 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
    487  5184  ek110237 			    "%s-%d", threadflow->tf_name, flowop->fo_name,
    488  5184  ek110237 			    flowop->fo_instance);
    489  6212  aw148015 
    490  5184  ek110237 			ret = (*flowop->fo_func)(threadflow, flowop);
    491  5184  ek110237 
    492  6084  aw148015 			/*
    493  6084  aw148015 			 * Return value FILEBENCH_ERROR means "flowop
    494  6084  aw148015 			 * failed, stop the filebench run"
    495  6084  aw148015 			 */
    496  6084  aw148015 			if (ret == FILEBENCH_ERROR) {
    497  6084  aw148015 				filebench_log(LOG_ERROR,
    498  6084  aw148015 				    "%s-%d: flowop %s-%d failed",
    499  6084  aw148015 				    threadflow->tf_name,
    500  6084  aw148015 				    threadflow->tf_instance,
    501  6084  aw148015 				    flowop->fo_name,
    502  5184  ek110237 				    flowop->fo_instance);
    503  5184  ek110237 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    504  5184  ek110237 				threadflow->tf_abort = 1;
    505  6391  aw148015 				filebench_shm->shm_f_abort =
    506  6391  aw148015 				    FILEBENCH_ABORT_ERROR;
    507  5184  ek110237 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    508  5184  ek110237 				break;
    509  5184  ek110237 			}
    510  6084  aw148015 
    511  5184  ek110237 			/*
    512  6084  aw148015 			 * Return value of FILEBENCH_NORSC means "stop
    513  6084  aw148015 			 * the filebench run" if in "end on no work mode",
    514  6084  aw148015 			 * otherwise it indicates an error
    515  5184  ek110237 			 */
    516  6084  aw148015 			if (ret == FILEBENCH_NORSC) {
    517  5184  ek110237 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    518  6084  aw148015 				threadflow->tf_abort = FILEBENCH_DONE;
    519  6084  aw148015 				if (filebench_shm->shm_rmode ==
    520  6084  aw148015 				    FILEBENCH_MODE_Q1STDONE) {
    521  6391  aw148015 					filebench_shm->shm_f_abort =
    522  6084  aw148015 					    FILEBENCH_ABORT_RSRC;
    523  6084  aw148015 				} else if (filebench_shm->shm_rmode !=
    524  6084  aw148015 				    FILEBENCH_MODE_QALLDONE) {
    525  6084  aw148015 					filebench_log(LOG_ERROR1,
    526  6084  aw148015 					    "WARNING! Run stopped early:\n   "
    527  6084  aw148015 					    "             flowop %s-%d could "
    528  6084  aw148015 					    "not obtain a file. Please\n      "
    529  6084  aw148015 					    "          reduce runtime, "
    530  6084  aw148015 					    "increase fileset entries "
    531  6084  aw148015 					    "($nfiles), or switch modes.",
    532  6084  aw148015 					    flowop->fo_name,
    533  6084  aw148015 					    flowop->fo_instance);
    534  6391  aw148015 					filebench_shm->shm_f_abort =
    535  6084  aw148015 					    FILEBENCH_ABORT_ERROR;
    536  6084  aw148015 				}
    537  6084  aw148015 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    538  6084  aw148015 				break;
    539  6084  aw148015 			}
    540  6084  aw148015 
    541  6084  aw148015 			/*
    542  6084  aw148015 			 * Return value of FILEBENCH_DONE means "stop
    543  6084  aw148015 			 * the filebench run without error"
    544  6084  aw148015 			 */
    545  6084  aw148015 			if (ret == FILEBENCH_DONE) {
    546  6084  aw148015 				(void) ipc_mutex_lock(&threadflow->tf_lock);
    547  6084  aw148015 				threadflow->tf_abort = FILEBENCH_DONE;
    548  6391  aw148015 				filebench_shm->shm_f_abort =
    549  6391  aw148015 				    FILEBENCH_ABORT_DONE;
    550  5184  ek110237 				(void) ipc_mutex_unlock(&threadflow->tf_lock);
    551  6391  aw148015 				break;
    552  6391  aw148015 			}
    553  6391  aw148015 
    554  6391  aw148015 			/*
    555  6391  aw148015 			 * If we get here and the return is something other
    556  6391  aw148015 			 * than FILEBENCH_OK, it means a spurious code
    557  6391  aw148015 			 * was returned, so treat as major error. This
    558  6391  aw148015 			 * probably indicates a bug in the flowop.
    559  6391  aw148015 			 */
    560  6391  aw148015 			if (ret != FILEBENCH_OK) {
    561  6391  aw148015 				filebench_log(LOG_ERROR,
    562  6391  aw148015 				    "Flowop %s unexpected return value = %d\n",
    563  6391  aw148015 				    flowop->fo_name, ret);
    564  6391  aw148015 				filebench_shm->shm_f_abort =
    565  6391  aw148015 				    FILEBENCH_ABORT_ERROR;
    566  5184  ek110237 				break;
    567  5184  ek110237 			}
    568  5184  ek110237 		}
    569  5184  ek110237 
    570  5184  ek110237 		/* advance to next flowop */
    571  6550  aw148015 		flowop = flowop->fo_exec_next;
    572  5184  ek110237 
    573  5184  ek110237 		/* but if at end of list, start over from the beginning */
    574  5184  ek110237 		if (flowop == NULL) {
    575  6550  aw148015 			flowop = threadflow->tf_thrd_fops;
    576  5184  ek110237 			threadflow->tf_stats.fs_count++;
    577  5184  ek110237 		}
    578  5184  ek110237 	}
    579  5184  ek110237 
    580  5184  ek110237 #ifdef HAVE_LWPS
    581  5184  ek110237 	filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
    582  5184  ek110237 	    _lwp_self());
    583  5184  ek110237 #endif
    584  5184  ek110237 
    585  6084  aw148015 	/* Tell flowops to destroy locally acquired state */
    586  6084  aw148015 	flowop_destruct_all_flows(threadflow);
    587  6084  aw148015 
    588  6084  aw148015 	pthread_exit(&threadflow->tf_abort);
    589  5184  ek110237 }
    590  5184  ek110237 
    591  8615    Andrew void flowoplib_flowinit(void);
    592  8615    Andrew void fb_lfs_flowinit(void);
    593  8615    Andrew 
    594  5184  ek110237 void
    595  5184  ek110237 flowop_init(void)
    596  5184  ek110237 {
    597  7556    Andrew 	(void) pthread_mutex_init(&controlstats_lock,
    598  7556    Andrew 	    ipc_mutexattr(IPC_MUTEX_NORMAL));
    599  8615    Andrew 	flowoplib_flowinit();
    600  5184  ek110237 }
    601  8615    Andrew 
    602  8615    Andrew static int plugin_flowinit_done = FALSE;
    603  8615    Andrew 
    604  8615    Andrew /*
    605  8615    Andrew  * Initialize any "plug-in" flowops. Called when the first "create fileset"
    606  8615    Andrew  * command is encountered.
    607  8615    Andrew  */
    608  8615    Andrew void
    609  8615    Andrew flowop_plugin_flowinit(void)
    610  8615    Andrew {
    611  8615    Andrew 	if (plugin_flowinit_done)
    612  8615    Andrew 		return;
    613  8615    Andrew 
    614  8615    Andrew 	plugin_flowinit_done = TRUE;
    615  8615    Andrew 
    616  8615    Andrew 	switch (filebench_shm->shm_filesys_type) {
    617  8615    Andrew 	case LOCAL_FS_PLUG:
    618  8615    Andrew 		fb_lfs_flowinit();
    619  8615    Andrew 		break;
    620  8615    Andrew 
    621  8615    Andrew 	case NFS3_PLUG:
    622  8615    Andrew 	case NFS4_PLUG:
    623  8615    Andrew 	case CIFS_PLUG:
    624  8615    Andrew 		break;
    625  8615    Andrew 	}
    626  8615    Andrew }
    627  8615    Andrew 
    628  5184  ek110237 
    629  5184  ek110237 /*
    630  5184  ek110237  * Delete the designated flowop from the thread's flowop list.
    631  5184  ek110237  */
    632  5184  ek110237 static void
    633  5184  ek110237 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
    634  5184  ek110237 {
    635  5184  ek110237 	flowop_t *entry = *flowoplist;
    636  5184  ek110237 	int found = 0;
    637  5184  ek110237 
    638  5184  ek110237 	filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    639  5184  ek110237 	    flowop->fo_name,
    640  5184  ek110237 	    flowop->fo_instance);
    641  5184  ek110237 
    642  5184  ek110237 	/* Delete from thread's flowop list */
    643  5184  ek110237 	if (flowop == *flowoplist) {
    644  5184  ek110237 		/* First on list */
    645  6550  aw148015 		*flowoplist = flowop->fo_exec_next;
    646  5184  ek110237 		filebench_log(LOG_DEBUG_IMPL,
    647  5184  ek110237 		    "Delete0 flowop: (%s-%d)",
    648  5184  ek110237 		    flowop->fo_name,
    649  5184  ek110237 		    flowop->fo_instance);
    650  5184  ek110237 	} else {
    651  6550  aw148015 		while (entry->fo_exec_next) {
    652  5184  ek110237 			filebench_log(LOG_DEBUG_IMPL,
    653  5184  ek110237 			    "Delete0 flowop: (%s-%d) == (%s-%d)",
    654  6550  aw148015 			    entry->fo_exec_next->fo_name,
    655  6550  aw148015 			    entry->fo_exec_next->fo_instance,
    656  5184  ek110237 			    flowop->fo_name,
    657  5184  ek110237 			    flowop->fo_instance);
    658  5184  ek110237 
    659  6550  aw148015 			if (flowop == entry->fo_exec_next) {
    660  5184  ek110237 				/* Delete */
    661  5184  ek110237 				filebench_log(LOG_DEBUG_IMPL,
    662  5184  ek110237 				    "Deleted0 flowop: (%s-%d)",
    663  6550  aw148015 				    entry->fo_exec_next->fo_name,
    664  6550  aw148015 				    entry->fo_exec_next->fo_instance);
    665  6550  aw148015 				entry->fo_exec_next =
    666  6550  aw148015 				    entry->fo_exec_next->fo_exec_next;
    667  5184  ek110237 				break;
    668  5184  ek110237 			}
    669  6550  aw148015 			entry = entry->fo_exec_next;
    670  5184  ek110237 		}
    671  5184  ek110237 	}
    672  5184  ek110237 
    673  5184  ek110237 #ifdef HAVE_PROCFS
    674  5184  ek110237 	/* Close /proc stats */
    675  5184  ek110237 	if (flowop->fo_thread)
    676  5184  ek110237 		(void) close(flowop->fo_thread->tf_lwpusagefd);
    677  5184  ek110237 #endif
    678  5184  ek110237 
    679  5184  ek110237 	/* Delete from global list */
    680  6391  aw148015 	entry = filebench_shm->shm_flowoplist;
    681  5184  ek110237 
    682  6391  aw148015 	if (flowop == filebench_shm->shm_flowoplist) {
    683  5184  ek110237 		/* First on list */
    684  6391  aw148015 		filebench_shm->shm_flowoplist = flowop->fo_next;
    685  5184  ek110237 		found = 1;
    686  5184  ek110237 	} else {
    687  5184  ek110237 		while (entry->fo_next) {
    688  5184  ek110237 			filebench_log(LOG_DEBUG_IMPL,
    689  5184  ek110237 			    "Delete flowop: (%s-%d) == (%s-%d)",
    690  5184  ek110237 			    entry->fo_next->fo_name,
    691  5184  ek110237 			    entry->fo_next->fo_instance,
    692  5184  ek110237 			    flowop->fo_name,
    693  5184  ek110237 			    flowop->fo_instance);
    694  5184  ek110237 
    695  5184  ek110237 			if (flowop == entry->fo_next) {
    696  5184  ek110237 				/* Delete */
    697  5184  ek110237 				entry->fo_next = entry->fo_next->fo_next;
    698  5184  ek110237 				found = 1;
    699  5184  ek110237 				break;
    700  5184  ek110237 			}
    701  5184  ek110237 
    702  5184  ek110237 			entry = entry->fo_next;
    703  5184  ek110237 		}
    704  5184  ek110237 	}
    705  5184  ek110237 	if (found) {
    706  5184  ek110237 		filebench_log(LOG_DEBUG_IMPL,
    707  5184  ek110237 		    "Deleted flowop: (%s-%d)",
    708  5184  ek110237 		    flowop->fo_name,
    709  5184  ek110237 		    flowop->fo_instance);
    710  5184  ek110237 		ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
    711  5184  ek110237 	} else {
    712  5184  ek110237 		filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
    713  5184  ek110237 		    flowop->fo_name,
    714  5184  ek110237 		    flowop->fo_instance);
    715  5184  ek110237 	}
    716  5184  ek110237 }
    717  5184  ek110237 
    718  5184  ek110237 /*
    719  5184  ek110237  * Deletes all the flowops from a flowop list.
    720  5184  ek110237  */
    721  5184  ek110237 void
    722  5184  ek110237 flowop_delete_all(flowop_t **flowoplist)
    723  5184  ek110237 {
    724  5184  ek110237 	flowop_t *flowop = *flowoplist;
    725  8762    Andrew 	flowop_t *next_flowop;
    726  5184  ek110237 
    727  6550  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    728  6550  aw148015 
    729  5184  ek110237 	while (flowop) {
    730  6212  aw148015 		filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
    731  5184  ek110237 		    flowop->fo_name, flowop->fo_instance);
    732  5184  ek110237 
    733  5184  ek110237 		if (flowop->fo_instance &&
    734  5184  ek110237 		    (flowop->fo_instance == FLOW_MASTER)) {
    735  6550  aw148015 			flowop = flowop->fo_exec_next;
    736  5184  ek110237 			continue;
    737  5184  ek110237 		}
    738  8762    Andrew 		next_flowop = flowop->fo_exec_next;
    739  5184  ek110237 		flowop_delete(flowoplist, flowop);
    740  8762    Andrew 		flowop = next_flowop;
    741  5184  ek110237 	}
    742  5184  ek110237 
    743  6391  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    744  5184  ek110237 }
    745  5184  ek110237 
    746  5184  ek110237 /*
    747  5184  ek110237  * Allocates a flowop entity and initializes it with inherited
    748  5184  ek110237  * contents from the "inherit" flowop, if it is supplied, or
    749  6550  aw148015  * with zeros otherwise. In either case the fo_next and fo_exec_next
    750  5184  ek110237  * pointers are set to NULL, and fo_thread is set to point to
    751  5184  ek110237  * the owning threadflow. The initialized flowop is placed at
    752  5184  ek110237  * the head of the global flowop list, and also placed on the
    753  6550  aw148015  * tail of the supplied local flowop list, which will either
    754  6550  aw148015  * be a threadflow's tf_thrd_fops list or a composite flowop's
    755  6550  aw148015  * fo_comp_fops list. The routine locks the flowop's fo_lock and
    756  6550  aw148015  * leaves it held on return. If successful, it returns a pointer
    757  6550  aw148015  * to the allocated and initialized flowop, otherwise it returns NULL.
    758  5184  ek110237  *
    759  6391  aw148015  * filebench_shm->shm_flowop_lock must be held by caller.
    760  5184  ek110237  */
    761  5184  ek110237 static flowop_t *
    762  5184  ek110237 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
    763  6550  aw148015     flowop_t **flowoplist_hdp, int instance, int type)
    764  5184  ek110237 {
    765  5184  ek110237 	flowop_t *flowop;
    766  5184  ek110237 
    767  5184  ek110237 	if (name == NULL)
    768  5184  ek110237 		return (NULL);
    769  5184  ek110237 
    770  5184  ek110237 	if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
    771  5184  ek110237 		filebench_log(LOG_ERROR,
    772  5184  ek110237 		    "flowop_define: Can't malloc flowop");
    773  5184  ek110237 		return (NULL);
    774  5184  ek110237 	}
    775  5184  ek110237 
    776  5184  ek110237 	filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
    777  5184  ek110237 	    name, instance, flowop);
    778  5184  ek110237 
    779  5184  ek110237 	if (flowop == NULL)
    780  5184  ek110237 		return (NULL);
    781  5184  ek110237 
    782  5184  ek110237 	if (inherit) {
    783  5184  ek110237 		(void) memcpy(flowop, inherit, sizeof (flowop_t));
    784  7556    Andrew 		(void) pthread_mutex_init(&flowop->fo_lock,
    785  7556    Andrew 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    786  5184  ek110237 		(void) ipc_mutex_lock(&flowop->fo_lock);
    787  5184  ek110237 		flowop->fo_next = NULL;
    788  6550  aw148015 		flowop->fo_exec_next = NULL;
    789  5184  ek110237 		filebench_log(LOG_DEBUG_IMPL,
    790  5184  ek110237 		    "flowop %s-%d calling init", name, instance);
    791  5184  ek110237 	} else {
    792  5184  ek110237 		(void) memset(flowop, 0, sizeof (flowop_t));
    793  6212  aw148015 		flowop->fo_iters = avd_int_alloc(1);
    794  5184  ek110237 		flowop->fo_type = type;
    795  7556    Andrew 		(void) pthread_mutex_init(&flowop->fo_lock,
    796  7556    Andrew 		    ipc_mutexattr(IPC_MUTEX_PRI_ROB));
    797  5184  ek110237 		(void) ipc_mutex_lock(&flowop->fo_lock);
    798  5184  ek110237 	}
    799  5184  ek110237 
    800  5184  ek110237 	/* Create backpointer to thread */
    801  5184  ek110237 	flowop->fo_thread = threadflow;
    802  5184  ek110237 
    803  5184  ek110237 	/* Add flowop to global list */
    804  6391  aw148015 	if (filebench_shm->shm_flowoplist == NULL) {
    805  6391  aw148015 		filebench_shm->shm_flowoplist = flowop;
    806  5184  ek110237 		flowop->fo_next = NULL;
    807  5184  ek110237 	} else {
    808  6391  aw148015 		flowop->fo_next = filebench_shm->shm_flowoplist;
    809  6391  aw148015 		filebench_shm->shm_flowoplist = flowop;
    810  5184  ek110237 	}
    811  5184  ek110237 
    812  5184  ek110237 	(void) strcpy(flowop->fo_name, name);
    813  5184  ek110237 	flowop->fo_instance = instance;
    814  5184  ek110237 
    815  6550  aw148015 	if (flowoplist_hdp == NULL)
    816  5184  ek110237 		return (flowop);
    817  5184  ek110237 
    818  5184  ek110237 	/* Add flowop to thread op list */
    819  6550  aw148015 	if (*flowoplist_hdp == NULL) {
    820  6550  aw148015 		*flowoplist_hdp = flowop;
    821  6550  aw148015 		flowop->fo_exec_next = NULL;
    822  5184  ek110237 	} else {
    823  5184  ek110237 		flowop_t *flowend;
    824  5184  ek110237 
    825  5184  ek110237 		/* Find the end of the thread list */
    826  6550  aw148015 		flowend = *flowoplist_hdp;
    827  6550  aw148015 		while (flowend->fo_exec_next != NULL)
    828  6550  aw148015 			flowend = flowend->fo_exec_next;
    829  6550  aw148015 		flowend->fo_exec_next = flowop;
    830  6550  aw148015 		flowop->fo_exec_next = NULL;
    831  5184  ek110237 	}
    832  5184  ek110237 
    833  5184  ek110237 	return (flowop);
    834  5184  ek110237 }
    835  5184  ek110237 
    836  5184  ek110237 /*
    837  5184  ek110237  * Calls flowop_define_common() to allocate and initialize a
    838  5184  ek110237  * flowop, and holds the shared flowop_lock during the call.
    839  5184  ek110237  * It releases the created flowop's fo_lock when done.
    840  5184  ek110237  */
    841  5184  ek110237 flowop_t *
    842  5184  ek110237 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
    843  6550  aw148015     flowop_t **flowoplist_hdp, int instance, int type)
    844  5184  ek110237 {
    845  6212  aw148015 	flowop_t	*flowop;
    846  5184  ek110237 
    847  6391  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    848  5184  ek110237 	flowop = flowop_define_common(threadflow, name,
    849  6550  aw148015 	    inherit, flowoplist_hdp, instance, type);
    850  6391  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    851  5184  ek110237 
    852  5184  ek110237 	if (flowop == NULL)
    853  5184  ek110237 		return (NULL);
    854  5184  ek110237 
    855  6550  aw148015 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    856  7556    Andrew 
    857  6550  aw148015 	return (flowop);
    858  6550  aw148015 }
    859  6550  aw148015 
    860  6550  aw148015 /*
    861  6550  aw148015  * Calls flowop_define_common() to allocate and initialize a
    862  6550  aw148015  * composite flowop, and holds the shared flowop_lock during the call.
    863  6550  aw148015  * It releases the created flowop's fo_lock when done.
    864  6550  aw148015  */
    865  6550  aw148015 flowop_t *
    866  6550  aw148015 flowop_new_composite_define(char *name)
    867  6550  aw148015 {
    868  6550  aw148015 	flowop_t *flowop;
    869  6550  aw148015 
    870  6550  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    871  6550  aw148015 	flowop = flowop_define_common(NULL, name,
    872  6550  aw148015 	    NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
    873  6550  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    874  6550  aw148015 
    875  6550  aw148015 	if (flowop == NULL)
    876  6550  aw148015 		return (NULL);
    877  6550  aw148015 
    878  6550  aw148015 	flowop->fo_func = flowop_composite;
    879  6550  aw148015 	flowop->fo_init = flowop_composite_init;
    880  6550  aw148015 	flowop->fo_destruct = flowop_composite_destruct;
    881  5184  ek110237 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    882  5184  ek110237 
    883  5184  ek110237 	return (flowop);
    884  5184  ek110237 }
    885  5184  ek110237 
    886  5184  ek110237 /*
    887  5184  ek110237  * Attempts to take a write lock on the flowop_find_lock that is
    888  5184  ek110237  * defined in interprocess shared memory. Since each call to
    889  5184  ek110237  * flowop_start() holds a read lock on flowop_find_lock, this
    890  5184  ek110237  * routine effectively blocks until all instances of
    891  5184  ek110237  * flowop_start() have finished. The flowop_find() routine calls
    892  5184  ek110237  * this routine so that flowops won't be searched for until all
    893  5184  ek110237  * flowops have been created by flowop_start.
    894  5184  ek110237  */
    895  5184  ek110237 static void
    896  5184  ek110237 flowop_find_barrier(void)
    897  5184  ek110237 {
    898  5184  ek110237 	/* Block on wrlock to ensure find waits for all creates */
    899  6391  aw148015 	(void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
    900  6391  aw148015 	(void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
    901  5184  ek110237 }
    902  5184  ek110237 
    903  5184  ek110237 /*
    904  5184  ek110237  * Returns a list of flowops named "name" from the master
    905  5184  ek110237  * flowop list.
    906  5184  ek110237  */
    907  5184  ek110237 flowop_t *
    908  5184  ek110237 flowop_find(char *name)
    909  5184  ek110237 {
    910  6212  aw148015 	flowop_t *flowop;
    911  5184  ek110237 	flowop_t *result = NULL;
    912  5184  ek110237 
    913  5184  ek110237 	flowop_find_barrier();
    914  5184  ek110237 
    915  6391  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    916  6212  aw148015 
    917  6391  aw148015 	flowop = filebench_shm->shm_flowoplist;
    918  5184  ek110237 
    919  5184  ek110237 	while (flowop) {
    920  5184  ek110237 		if (strcmp(name, flowop->fo_name) == 0) {
    921  5184  ek110237 
    922  5184  ek110237 			/* Add flowop to result list */
    923  5184  ek110237 			if (result == NULL) {
    924  5184  ek110237 				result = flowop;
    925  5184  ek110237 				flowop->fo_resultnext = NULL;
    926  5184  ek110237 			} else {
    927  5184  ek110237 				flowop->fo_resultnext = result;
    928  5184  ek110237 				result = flowop;
    929  5184  ek110237 			}
    930  5184  ek110237 		}
    931  5184  ek110237 		flowop = flowop->fo_next;
    932  5184  ek110237 	}
    933  5184  ek110237 
    934  6391  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    935  5184  ek110237 
    936  5184  ek110237 
    937  5184  ek110237 	return (result);
    938  5184  ek110237 }
    939  5184  ek110237 
    940  5184  ek110237 /*
    941  5184  ek110237  * Returns a pointer to the specified instance of flowop
    942  6701  aw148015  * "name" from the global list.
    943  5184  ek110237  */
    944  5184  ek110237 flowop_t *
    945  5184  ek110237 flowop_find_one(char *name, int instance)
    946  5184  ek110237 {
    947  6212  aw148015 	flowop_t *test_flowop;
    948  5184  ek110237 
    949  6212  aw148015 	flowop_find_barrier();
    950  5184  ek110237 
    951  6391  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
    952  6212  aw148015 
    953  6391  aw148015 	test_flowop = filebench_shm->shm_flowoplist;
    954  6212  aw148015 
    955  6212  aw148015 	while (test_flowop) {
    956  6212  aw148015 		if ((strcmp(name, test_flowop->fo_name) == 0) &&
    957  6212  aw148015 		    (instance == test_flowop->fo_instance))
    958  5184  ek110237 			break;
    959  6212  aw148015 
    960  6212  aw148015 		test_flowop = test_flowop->fo_next;
    961  5184  ek110237 	}
    962  5184  ek110237 
    963  6391  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
    964  6212  aw148015 
    965  6212  aw148015 	return (test_flowop);
    966  6701  aw148015 }
    967  6701  aw148015 
    968  6701  aw148015 /*
    969  6701  aw148015  * recursively searches through lists of flowops on a given thread
    970  6701  aw148015  * and those on any included composite flowops for the named flowop.
    971  6701  aw148015  * either returns with a pointer to the named flowop or NULL if it
    972  6701  aw148015  * cannot be found.
    973  6701  aw148015  */
    974  6701  aw148015 static flowop_t *
    975  6701  aw148015 flowop_recurse_search(char *path, char *name, flowop_t *list)
    976  6701  aw148015 {
    977  6701  aw148015 	flowop_t *test_flowop;
    978  6701  aw148015 	char fullname[MAXPATHLEN];
    979  6701  aw148015 
    980  6701  aw148015 	test_flowop = list;
    981  6701  aw148015 
    982  6701  aw148015 	/*
    983  6701  aw148015 	 * when searching a list of inner flowops, "path" is the fullname
    984  6701  aw148015 	 * of the containing composite flowop. Use it to form the
    985  6701  aw148015 	 * full name of the inner flowop to search for.
    986  6701  aw148015 	 */
    987  6701  aw148015 	if (path) {
    988  6701  aw148015 		if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
    989  6701  aw148015 			filebench_log(LOG_ERROR,
    990  6701  aw148015 			    "composite flowop path name %s.%s too long",
    991  6701  aw148015 			    path, name);
    992  6701  aw148015 			return (NULL);
    993  6701  aw148015 		}
    994  6701  aw148015 
    995  6701  aw148015 		/* create composite_name.name for recursive search */
    996  6701  aw148015 		(void) strcpy(fullname, path);
    997  6701  aw148015 		(void) strcat(fullname, ".");
    998  6701  aw148015 		(void) strcat(fullname, name);
    999  6701  aw148015 	} else {
   1000  6701  aw148015 		(void) strcpy(fullname, name);
   1001  6701  aw148015 	}
   1002  6701  aw148015 
   1003  6701  aw148015 	/*
   1004  6701  aw148015 	 * loop through all flowops on the supplied tf_thrd_fops (flowop)
   1005  6701  aw148015 	 * list or fo_comp_fops (inner flowop) list.
   1006  6701  aw148015 	 */
   1007  6701  aw148015 	while (test_flowop) {
   1008  6701  aw148015 		if (strcmp(fullname, test_flowop->fo_name) == 0)
   1009  6701  aw148015 			return (test_flowop);
   1010  6701  aw148015 
   1011  6701  aw148015 		if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
   1012  6701  aw148015 			flowop_t *found_flowop;
   1013  6701  aw148015 
   1014  6701  aw148015 			found_flowop = flowop_recurse_search(
   1015  6701  aw148015 			    test_flowop->fo_name, name,
   1016  6701  aw148015 			    test_flowop->fo_comp_fops);
   1017  6701  aw148015 
   1018  6701  aw148015 			if (found_flowop)
   1019  6701  aw148015 				return (found_flowop);
   1020  6701  aw148015 		}
   1021  6701  aw148015 		test_flowop = test_flowop->fo_exec_next;
   1022  6701  aw148015 	}
   1023  6701  aw148015 
   1024  6701  aw148015 	/* not found here or on any child lists */
   1025  6701  aw148015 	return (NULL);
   1026  6701  aw148015 }
   1027  6701  aw148015 
   1028  6701  aw148015 /*
   1029  6701  aw148015  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
   1030  6701  aw148015  * list of flowops. Returns the named flowop if found, or NULL.
   1031  6701  aw148015  */
   1032  6701  aw148015 flowop_t *
   1033  6701  aw148015 flowop_find_from_list(char *name, flowop_t *list)
   1034  6701  aw148015 {
   1035  6701  aw148015 	flowop_t *found_flowop;
   1036  6701  aw148015 
   1037  6701  aw148015 	flowop_find_barrier();
   1038  6701  aw148015 
   1039  6701  aw148015 	(void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
   1040  6701  aw148015 
   1041  6701  aw148015 	found_flowop = flowop_recurse_search(NULL, name, list);
   1042  6701  aw148015 
   1043  6701  aw148015 	(void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
   1044  6701  aw148015 
   1045  6701  aw148015 	return (found_flowop);
   1046  5184  ek110237 }
   1047  6550  aw148015 
   1048  6550  aw148015 /*
   1049  6550  aw148015  * Composite flowop method. Does one pass through its list of
   1050  6550  aw148015  * inner flowops per iteration.
   1051  6550  aw148015  */
   1052  6550  aw148015 static int
   1053  6550  aw148015 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
   1054  6550  aw148015 {
   1055  6550  aw148015 	flowop_t	*inner_flowop;
   1056  6550  aw148015 
   1057  6550  aw148015 	/* get the first flowop in the list */
   1058  6550  aw148015 	inner_flowop = flowop->fo_comp_fops;
   1059  6550  aw148015 
   1060  6550  aw148015 	/* make a pass through the list of sub flowops */
   1061  6550  aw148015 	while (inner_flowop) {
   1062  6550  aw148015 		int	i, count;
   1063  6550  aw148015 
   1064  6550  aw148015 		/* Abort if asked */
   1065  6550  aw148015 		if (threadflow->tf_abort || filebench_shm->shm_f_abort)
   1066  6550  aw148015 			return (FILEBENCH_DONE);
   1067  6550  aw148015 
   1068  6550  aw148015 		if (inner_flowop->fo_stats.fs_stime == 0)
   1069  6550  aw148015 			inner_flowop->fo_stats.fs_stime = gethrtime();
   1070  6550  aw148015 
   1071  6550  aw148015 		/* Execute the flowop for fo_iters times */
   1072  6550  aw148015 		count = (int)avd_get_int(inner_flowop->fo_iters);
   1073  6550  aw148015 		for (i = 0; i < count; i++) {
   1074  6550  aw148015 
   1075  6550  aw148015 			filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
   1076  6550  aw148015 			    "%s-%d", threadflow->tf_name,
   1077  6550  aw148015 			    inner_flowop->fo_name,
   1078  6550  aw148015 			    inner_flowop->fo_instance);
   1079  6550  aw148015 
   1080  6550  aw148015 			switch ((*inner_flowop->fo_func)(threadflow,
   1081  6550  aw148015 			    inner_flowop)) {
   1082  6550  aw148015 
   1083  6550  aw148015 			/* all done */
   1084  6550  aw148015 			case FILEBENCH_DONE:
   1085  6550  aw148015 				return (FILEBENCH_DONE);
   1086  6550  aw148015 
   1087  6550  aw148015 			/* quit if inner flowop limit reached */
   1088  6550  aw148015 			case FILEBENCH_NORSC:
   1089  6550  aw148015 				return (FILEBENCH_NORSC);
   1090  6550  aw148015 
   1091  6550  aw148015 			/* quit on inner flowop error */
   1092  6550  aw148015 			case FILEBENCH_ERROR:
   1093  6550  aw148015 				filebench_log(LOG_ERROR,
   1094  6550  aw148015 				    "inner flowop %s failed",
   1095  6550  aw148015 				    inner_flowop->fo_name);
   1096  6550  aw148015 				return (FILEBENCH_ERROR);
   1097  6550  aw148015 
   1098  6550  aw148015 			/* otherwise keep going */
   1099  6550  aw148015 			default:
   1100  6550  aw148015 				break;
   1101  6550  aw148015 			}
   1102  6550  aw148015 
   1103  6550  aw148015 		}
   1104  6550  aw148015 
   1105  6550  aw148015 		/* advance to next flowop */
   1106  6550  aw148015 		inner_flowop = inner_flowop->fo_exec_next;
   1107  6550  aw148015 	}
   1108  6550  aw148015 
   1109  6550  aw148015 	/* finished with this pass */
   1110  6550  aw148015 	return (FILEBENCH_OK);
   1111  6550  aw148015 }
   1112  6550  aw148015 
   1113  6550  aw148015 /*
   1114  6550  aw148015  * Composite flowop initialization. Creates runtime inner flowops
   1115  6550  aw148015  * from prototype inner flowops.
   1116  6550  aw148015  */
   1117  6550  aw148015 static int
   1118  6550  aw148015 flowop_composite_init(flowop_t *flowop)
   1119  6550  aw148015 {
   1120  6550  aw148015 	int err;
   1121  6550  aw148015 
   1122  6550  aw148015 	err = flowop_create_runtime_flowops(flowop->fo_thread,
   1123  6550  aw148015 	    &flowop->fo_comp_fops);
   1124  6550  aw148015 	if (err != FILEBENCH_OK)
   1125  6550  aw148015 		return (err);
   1126  6550  aw148015 
   1127  6550  aw148015 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1128  6550  aw148015 	return (0);
   1129  6550  aw148015 }
   1130  6550  aw148015 
   1131  6550  aw148015 /*
   1132  6550  aw148015  * clean up inner flowops
   1133  6550  aw148015  */
   1134  6550  aw148015 static void
   1135  6550  aw148015 flowop_composite_destruct(flowop_t *flowop)
   1136  6550  aw148015 {
   1137  6550  aw148015 	flowop_t *inner_flowop = flowop->fo_comp_fops;
   1138  6550  aw148015 
   1139  6550  aw148015 	while (inner_flowop) {
   1140  6550  aw148015 		filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
   1141  6550  aw148015 		    inner_flowop->fo_name, inner_flowop->fo_instance);
   1142  6550  aw148015 
   1143  6550  aw148015 		if (inner_flowop->fo_instance &&
   1144  6550  aw148015 		    (inner_flowop->fo_instance == FLOW_MASTER)) {
   1145  6550  aw148015 			inner_flowop = inner_flowop->fo_exec_next;
   1146  6550  aw148015 			continue;
   1147  6550  aw148015 		}
   1148  6550  aw148015 		flowop_delete(&flowop->fo_comp_fops, inner_flowop);
   1149  6550  aw148015 		inner_flowop = inner_flowop->fo_exec_next;
   1150  6550  aw148015 	}
   1151  6550  aw148015 }
   1152  8615    Andrew 
   1153  8615    Andrew /*
   1154  8615    Andrew  * Support routines for libraries of flowops
   1155  8615    Andrew  */
   1156  8615    Andrew 
   1157  8615    Andrew int
   1158  8615    Andrew flowop_init_generic(flowop_t *flowop)
   1159  8615    Andrew {
   1160  8615    Andrew 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1161  8615    Andrew 	return (FILEBENCH_OK);
   1162  8615    Andrew }
   1163  8615    Andrew 
   1164  8615    Andrew void
   1165  8615    Andrew flowop_destruct_generic(flowop_t *flowop)
   1166  8615    Andrew {
   1167  8615    Andrew 	char *buf;
   1168  8615    Andrew 
   1169  8615    Andrew 	/* release any local resources held by the flowop */
   1170  8615    Andrew 	(void) ipc_mutex_lock(&flowop->fo_lock);
   1171  8615    Andrew 	buf = flowop->fo_buf;
   1172  8615    Andrew 	flowop->fo_buf = NULL;
   1173  8615    Andrew 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   1174  8615    Andrew 
   1175  8615    Andrew 	if (buf)
   1176  8615    Andrew 		free(buf);
   1177  8615    Andrew }
   1178  8615    Andrew 
   1179  8615    Andrew 
   1180  8615    Andrew /*
   1181  8615    Andrew  * Loops through the supplied list of flowops and creates and initializes
   1182  8615    Andrew  * a flowop for each one by calling flowop_define. As a side effect of
   1183  8615    Andrew  * calling flowop define, the created flowops are placed on the
   1184  8615    Andrew  * master flowop list. All created flowops are set to instance "0".
   1185  8615    Andrew  */
   1186  8615    Andrew void
   1187  8615    Andrew flowop_flow_init(flowop_proto_t *list, int nops)
   1188  8615    Andrew {
   1189  8615    Andrew 	int i;
   1190  8615    Andrew 
   1191  8615    Andrew 	for (i = 0; i < nops; i++) {
   1192  8615    Andrew 		flowop_t *flowop;
   1193  8615    Andrew 		flowop_proto_t *fl;
   1194  8615    Andrew 
   1195  8615    Andrew 		fl = &(list[i]);
   1196  8615    Andrew 
   1197  8615    Andrew 		if ((flowop = flowop_define(NULL,
   1198  8615    Andrew 		    fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
   1199  8615    Andrew 			filebench_log(LOG_ERROR,
   1200  8615    Andrew 			    "failed to create flowop %s\n",
   1201  8615    Andrew 			    fl->fl_name);
   1202  8615    Andrew 			filebench_shutdown(1);
   1203  8615    Andrew 		}
   1204  8615    Andrew 
   1205  8615    Andrew 		flowop->fo_func = fl->fl_func;
   1206  8615    Andrew 		flowop->fo_init = fl->fl_init;
   1207  8615    Andrew 		flowop->fo_destruct = fl->fl_destruct;
   1208  8615    Andrew 		flowop->fo_attrs = fl->fl_attrs;
   1209  8615    Andrew 	}
   1210  8615    Andrew }
   1211