Home | History | Annotate | Download | only in common
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  *
     25  * Portions Copyright 2008 Denis Cheng
     26  */
     27 
     28 #include "config.h"
     29 
     30 #include <sys/types.h>
     31 #ifdef HAVE_SYS_ASYNCH_H
     32 #include <sys/asynch.h>
     33 #endif
     34 #include <stddef.h>
     35 #include <sys/ipc.h>
     36 #include <sys/sem.h>
     37 #include <sys/errno.h>
     38 #include <sys/time.h>
     39 #include <inttypes.h>
     40 #include <fcntl.h>
     41 #include <math.h>
     42 #include <dirent.h>
     43 
     44 #ifdef HAVE_UTILITY_H
     45 #include <utility.h>
     46 #endif /* HAVE_UTILITY_H */
     47 
     48 #ifdef HAVE_SYS_ASYNC_H
     49 #include <sys/asynch.h>
     50 #endif /* HAVE_SYS_ASYNC_H */
     51 
     52 #ifndef HAVE_SYSV_SEM
     53 #include <semaphore.h>
     54 #endif /* HAVE_SYSV_SEM */
     55 
     56 #include "filebench.h"
     57 #include "flowop.h"
     58 #include "fileset.h"
     59 #include "fb_random.h"
     60 #include "utils.h"
     61 #include "fsplug.h"
     62 
     63 /*
     64  * These routines implement the flowops from the f language. Each
     65  * flowop has a name such as "read", and a set of function pointers
     66  * to call for initialization, execution and destruction of the flowop.
     67  * The table flowoplib_funcs[] contains a flowoplib struct for each
     68  * implemented flowop. Most flowops use a generic initialization function
     69  * and all currently use a generic destruction function. All flowop
     70  * functions referenced from the table are in this file, though, of
     71  * course, they often call functions from other files.
     72  *
     73  * The flowop_init() routine uses the flowoplib_funcs[] table to
     74  * create an initial set of "instance 0" flowops, one for each type of
     75  * flowop, from which all other flowops are derived. These "instance 0"
     76  * flowops are initialized with information from the table including
     77  * pointers for their fo_init, fo_func and fo_destroy functions. When
     78  * a flowop definition is encountered in an f language script, the
     79  * "type" of flowop, such as "read" is used to search for the
     80  * "instance 0" flowop named "read", then a new flowop is allocated
     81  * which inherits its function pointers and other initial properties
     82  * from the instance 0 flowop, and is given a new name as specified
     83  * by the "name=" attribute.
     84  */
     85 
     86 static void flowoplib_destruct_noop(flowop_t *flowop);
     87 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
     88 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
     89 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
     90 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
     91 static int flowoplib_block_init(flowop_t *flowop);
     92 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
     93 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
     94 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
     95 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
     96 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
     97 static int flowoplib_sempost_init(flowop_t *flowop);
     98 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
     99 static int flowoplib_semblock_init(flowop_t *flowop);
    100 static void flowoplib_semblock_destruct(flowop_t *flowop);
    101 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
    102 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
    103 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
    104 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
    105 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
    106 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
    107 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
    108 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
    109 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
    110 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
    111 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
    112 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
    113 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
    114 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
    115 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
    116 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
    117 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
    118 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
    119 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
    120 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
    121 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
    122 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
    123 static int flowoplib_testrandvar_init(flowop_t *flowop);
    124 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
    125 
    126 static flowop_proto_t flowoplib_funcs[] = {
    127 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
    128 	flowoplib_write, flowop_destruct_generic,
    129 	FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
    130 	flowoplib_read, flowop_destruct_generic,
    131 	FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
    132 	flowoplib_block, flowop_destruct_generic,
    133 	FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
    134 	flowoplib_wakeup, flowop_destruct_generic,
    135 	FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
    136 	flowoplib_semblock, flowoplib_semblock_destruct,
    137 	FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
    138 	flowoplib_sempost, flowoplib_destruct_noop,
    139 	FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
    140 	flowoplib_hog, flowop_destruct_generic,
    141 	FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
    142 	flowoplib_delay, flowop_destruct_generic,
    143 	FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
    144 	flowoplib_eventlimit, flowop_destruct_generic,
    145 	FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
    146 	flowoplib_bwlimit, flowop_destruct_generic,
    147 	FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
    148 	flowoplib_iopslimit, flowop_destruct_generic,
    149 	FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
    150 	flowoplib_opslimit, flowop_destruct_generic,
    151 	FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
    152 	flowoplib_finishoncount, flowop_destruct_generic,
    153 	FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
    154 	flowoplib_finishonbytes, flowop_destruct_generic,
    155 	FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
    156 	flowoplib_openfile, flowop_destruct_generic,
    157 	FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
    158 	flowoplib_createfile, flowop_destruct_generic,
    159 	FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
    160 	flowoplib_closefile, flowop_destruct_generic,
    161 	FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
    162 	flowoplib_makedir, flowop_destruct_generic,
    163 	FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
    164 	flowoplib_removedir, flowop_destruct_generic,
    165 	FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
    166 	flowoplib_listdir, flowop_destruct_generic,
    167 	FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
    168 	flowoplib_fsync, flowop_destruct_generic,
    169 	FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
    170 	flowoplib_fsyncset, flowop_destruct_generic,
    171 	FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
    172 	flowoplib_statfile, flowop_destruct_generic,
    173 	FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
    174 	flowoplib_readwholefile, flowop_destruct_generic,
    175 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
    176 	flowoplib_appendfile, flowop_destruct_generic,
    177 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
    178 	flowoplib_appendfilerand, flowop_destruct_generic,
    179 	FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
    180 	flowoplib_deletefile, flowop_destruct_generic,
    181 	FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
    182 	flowoplib_writewholefile, flowop_destruct_generic,
    183 	FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
    184 	flowoplib_print, flowop_destruct_generic,
    185 	/* routine to calculate mean and stddev for output from a randvar */
    186 	FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
    187 	flowoplib_testrandvar, flowoplib_testrandvar_destruct
    188 };
    189 
    190 /*
    191  * Loops through the list of flowops defined in this
    192  * module, and creates and initializes a flowop for each one
    193  * by calling flowop_flow_init. As a side effect of calling
    194  * flowop_flow_init, the created flowops are placed on the
    195  * master flowop list. All created flowops are set to
    196  * instance "0".
    197  */
    198 void
    199 flowoplib_flowinit()
    200 {
    201 	int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
    202 
    203 	flowop_flow_init(flowoplib_funcs, nops);
    204 }
    205 
    206 /*
    207  * Special total noop destruct
    208  */
    209 /* ARGSUSED */
    210 static void
    211 flowoplib_destruct_noop(flowop_t *flowop)
    212 {
    213 }
    214 
    215 /*
    216  * Generates a file attribute from flags in the supplied flowop.
    217  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
    218  */
    219 static int
    220 flowoplib_fileattrs(flowop_t *flowop)
    221 {
    222 	int attrs = 0;
    223 
    224 	if (avd_get_bool(flowop->fo_directio))
    225 		attrs |= FLOW_ATTR_DIRECTIO;
    226 
    227 	if (avd_get_bool(flowop->fo_dsync))
    228 		attrs |= FLOW_ATTR_DSYNC;
    229 
    230 	return (attrs);
    231 }
    232 
    233 /*
    234  * Obtain a filesetentry for a file. Result placed where filep points.
    235  * Supply with a flowop and a flag to indicate whether an existent or
    236  * non-existent file is required. Returns FILEBENCH_NORSC if all out
    237  * of the appropriate type of directories, FILEBENCH_ERROR if the
    238  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
    239  */
    240 static int
    241 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
    242 {
    243 	fileset_t	*fileset;
    244 	int		fileindex;
    245 
    246 	if ((fileset = flowop->fo_fileset) == NULL) {
    247 		filebench_log(LOG_ERROR, "flowop NO fileset");
    248 		return (FILEBENCH_ERROR);
    249 	}
    250 
    251 	if (flowop->fo_fileindex) {
    252 		fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
    253 		    ((double)(fileset->fs_constentries / 2)));
    254 		fileindex = fileindex % fileset->fs_constentries;
    255 		flags |= FILESET_PICKBYINDEX;
    256 	} else {
    257 		fileindex = 0;
    258 	}
    259 
    260 	if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
    261 	    tid, fileindex)) == NULL) {
    262 		filebench_log(LOG_DEBUG_SCRIPT,
    263 		    "flowop %s failed to pick file from fileset %s",
    264 		    flowop->fo_name,
    265 		    avd_get_str(fileset->fs_name));
    266 		return (FILEBENCH_NORSC);
    267 	}
    268 
    269 	return (FILEBENCH_OK);
    270 }
    271 
    272 /*
    273  * Obtain a filesetentry for a leaf directory. Result placed where dirp
    274  * points. Supply with flowop and a flag to indicate whether an existent
    275  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
    276  * if all out of the appropriate type of directories, FILEBENCH_ERROR
    277  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
    278  */
    279 static int
    280 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
    281 {
    282 	fileset_t	*fileset;
    283 	int		dirindex;
    284 
    285 	if ((fileset = flowop->fo_fileset) == NULL) {
    286 		filebench_log(LOG_ERROR, "flowop NO fileset");
    287 		return (FILEBENCH_ERROR);
    288 	}
    289 
    290 	if (flowop->fo_fileindex) {
    291 		dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
    292 		    ((double)(fileset->fs_constleafdirs / 2)));
    293 		dirindex = dirindex % fileset->fs_constleafdirs;
    294 		flags |= FILESET_PICKBYINDEX;
    295 	} else {
    296 		dirindex = 0;
    297 	}
    298 
    299 	if ((*dirp = fileset_pick(fileset,
    300 	    FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
    301 		filebench_log(LOG_DEBUG_SCRIPT,
    302 		    "flowop %s failed to pick directory from fileset %s",
    303 		    flowop->fo_name,
    304 		    avd_get_str(fileset->fs_name));
    305 		return (FILEBENCH_NORSC);
    306 	}
    307 
    308 	return (FILEBENCH_OK);
    309 }
    310 
    311 /*
    312  * Searches for a file descriptor. Tries the flowop's
    313  * fo_fdnumber first and returns with it if it has been
    314  * explicitly set (greater than 0). It next checks to
    315  * see if a rotating file descriptor policy is in effect,
    316  * and if not returns the fdnumber regardless of what
    317  * it is. (note that if it is 0, it just selects to the
    318  * default file descriptor in the threadflow's tf_fd
    319  * array). If the rotating fd policy is in effect, it
    320  * cycles from the end of the tf_fd array to one location
    321  * beyond the maximum needed by the number of entries in
    322  * the associated fileset on each invocation, then starts
    323  * over from the end.
    324  *
    325  * The routine returns an index into the threadflow's
    326  * tf_fd table where the actual file descriptor will be
    327  * found. Note: the calling routine must not call this
    328  * routine if the flowop does not have a fileset, and the
    329  * flowop's fo_fdnumber is zero and fo_rotatefd is
    330  * asserted, or an addressing fault may occur.
    331  */
    332 static int
    333 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
    334 {
    335 	fbint_t	entries;
    336 	int fdnumber = flowop->fo_fdnumber;
    337 
    338 	/* If the script sets the fd explicitly */
    339 	if (fdnumber > 0)
    340 		return (fdnumber);
    341 
    342 	/* If the flowop defaults to persistent fd */
    343 	if (!avd_get_bool(flowop->fo_rotatefd))
    344 		return (fdnumber);
    345 
    346 	if (flowop->fo_fileset == NULL) {
    347 		filebench_log(LOG_ERROR, "flowop NULL file");
    348 		return (FILEBENCH_ERROR);
    349 	}
    350 
    351 	entries = flowop->fo_fileset->fs_constentries;
    352 
    353 	/* Rotate the fd on each flowop invocation */
    354 	if (entries > (THREADFLOW_MAXFD / 2)) {
    355 		filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
    356 		    " (too many files : %llu",
    357 		    flowop->fo_name, (u_longlong_t)entries);
    358 		return (FILEBENCH_ERROR);
    359 	}
    360 
    361 	/* First time around */
    362 	if (threadflow->tf_fdrotor == 0)
    363 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
    364 
    365 	/* One fd for every file in the set */
    366 	if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
    367 		threadflow->tf_fdrotor = THREADFLOW_MAXFD;
    368 
    369 
    370 	threadflow->tf_fdrotor--;
    371 	filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
    372 	    threadflow->tf_fdrotor);
    373 	return (threadflow->tf_fdrotor);
    374 }
    375 
    376 /*
    377  * Determines the file descriptor to use, and attempts to open
    378  * the file if it is not already open. Also determines the wss
    379  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
    380  * if flowop_openfile_common couldn't obtain an appropriate file
    381  * from a the fileset, and FILEBENCH_OK otherwise.
    382  */
    383 static int
    384 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
    385     fbint_t *wssp, fb_fdesc_t **fdescp)
    386 {
    387 	int fd = flowoplib_fdnum(threadflow, flowop);
    388 
    389 	if (fd == -1)
    390 		return (FILEBENCH_ERROR);
    391 
    392 	/* check for conflicting fdnumber and file name */
    393 	if ((fd > 0) && (threadflow->tf_fse[fd] != NULL)) {
    394 		char *fd_based_name;
    395 
    396 		fd_based_name =
    397 		    avd_get_str(threadflow->tf_fse[fd]->fse_fileset->fs_name);
    398 
    399 		if (flowop->fo_filename != NULL) {
    400 			char *fo_based_name;
    401 
    402 			fo_based_name = avd_get_str(flowop->fo_filename);
    403 			if (strcmp(fd_based_name, fo_based_name) != 0) {
    404 				filebench_log(LOG_ERROR, "Name of fd refer"
    405 				    "enced fileset name (%s) CONFLICTS with"
    406 				    " flowop supplied fileset name (%s)",
    407 				    fd_based_name, fo_based_name);
    408 				filebench_shutdown(1);
    409 				return (FILEBENCH_ERROR);
    410 			}
    411 		}
    412 	}
    413 
    414 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
    415 		int ret;
    416 
    417 		if ((ret = flowoplib_openfile_common(
    418 		    threadflow, flowop, fd)) != FILEBENCH_OK)
    419 			return (ret);
    420 
    421 		if (threadflow->tf_fse[fd]) {
    422 			filebench_log(LOG_DEBUG_IMPL, "opened file %s",
    423 			    threadflow->tf_fse[fd]->fse_path);
    424 		} else {
    425 			filebench_log(LOG_DEBUG_IMPL,
    426 			    "opened device %s/%s",
    427 			    avd_get_str(flowop->fo_fileset->fs_path),
    428 			    avd_get_str(flowop->fo_fileset->fs_name));
    429 		}
    430 	}
    431 
    432 	*fdescp = &(threadflow->tf_fd[fd]);
    433 
    434 	if ((*wssp = flowop->fo_constwss) == 0) {
    435 		if (threadflow->tf_fse[fd])
    436 			*wssp = threadflow->tf_fse[fd]->fse_size;
    437 		else
    438 			*wssp = avd_get_int(flowop->fo_fileset->fs_size);
    439 	}
    440 
    441 	return (FILEBENCH_OK);
    442 }
    443 
    444 /*
    445  * Determines the io buffer or random offset into tf_mem for
    446  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
    447  */
    448 static int
    449 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
    450     caddr_t *iobufp, fbint_t iosize)
    451 {
    452 	long memsize;
    453 	size_t memoffset;
    454 
    455 	if (iosize == 0) {
    456 		filebench_log(LOG_ERROR, "zero iosize for thread %s",
    457 		    flowop->fo_name);
    458 		return (FILEBENCH_ERROR);
    459 	}
    460 
    461 	if ((memsize = threadflow->tf_constmemsize) != 0) {
    462 
    463 		/* use tf_mem for I/O with random offset */
    464 		if (filebench_randomno(&memoffset,
    465 		    memsize, iosize, NULL) == -1) {
    466 			filebench_log(LOG_ERROR,
    467 			    "tf_memsize smaller than IO size for thread %s",
    468 			    flowop->fo_name);
    469 			return (FILEBENCH_ERROR);
    470 		}
    471 		*iobufp = threadflow->tf_mem + memoffset;
    472 
    473 	} else {
    474 		/* use private I/O buffer */
    475 		if ((flowop->fo_buf != NULL) &&
    476 		    (flowop->fo_buf_size < iosize)) {
    477 			/* too small, so free up and re-allocate */
    478 			free(flowop->fo_buf);
    479 			flowop->fo_buf = NULL;
    480 		}
    481 
    482 		/*
    483 		 * Allocate memory for the  buffer. The memory is freed
    484 		 * by flowop_destruct_generic() or by this routine if more
    485 		 * memory is needed for the buffer.
    486 		 */
    487 		if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
    488 		    = (char *)malloc(iosize)) == NULL))
    489 			return (FILEBENCH_ERROR);
    490 
    491 		flowop->fo_buf_size = iosize;
    492 		*iobufp = flowop->fo_buf;
    493 	}
    494 	return (FILEBENCH_OK);
    495 }
    496 
    497 /*
    498  * Determines the file descriptor to use, opens it if necessary, the
    499  * io buffer or random offset into tf_mem for IO operation and the wss
    500  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
    501  */
    502 int
    503 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
    504     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
    505 {
    506 	int ret;
    507 
    508 	if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
    509 	    FILEBENCH_OK)
    510 		return (ret);
    511 
    512 	if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
    513 	    FILEBENCH_OK)
    514 		return (ret);
    515 
    516 	return (FILEBENCH_OK);
    517 }
    518 
    519 /*
    520  * Emulate posix read / pread. If the flowop has a fileset,
    521  * a file descriptor number index is fetched, otherwise a
    522  * supplied fileobj file is used. In either case the specified
    523  * file will be opened if not already open. If the flowop has
    524  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
    525  * returned.
    526  *
    527  * The actual read is done to a random offset in the
    528  * threadflow's thread memory (tf_mem), with a size set by
    529  * fo_iosize and at either a random disk offset within the
    530  * working set size, or at the next sequential location. If
    531  * any errors are encountered, FILEBENCH_ERROR is returned,
    532  * if no appropriate file can be obtained from the fileset then
    533  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
    534  */
    535 static int
    536 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
    537 {
    538 	caddr_t iobuf;
    539 	fbint_t wss;
    540 	fbint_t iosize;
    541 	fb_fdesc_t *fdesc;
    542 	int ret;
    543 
    544 
    545 	iosize = avd_get_int(flowop->fo_iosize);
    546 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
    547 	    &fdesc, iosize)) != FILEBENCH_OK)
    548 		return (ret);
    549 
    550 	if (avd_get_bool(flowop->fo_random)) {
    551 		uint64_t fileoffset;
    552 
    553 		if (filebench_randomno64(&fileoffset,
    554 		    wss, iosize, NULL) == -1) {
    555 			filebench_log(LOG_ERROR,
    556 			    "file size smaller than IO size for thread %s",
    557 			    flowop->fo_name);
    558 			return (FILEBENCH_ERROR);
    559 		}
    560 
    561 		(void) flowop_beginop(threadflow, flowop);
    562 		if ((ret = FB_PREAD(fdesc, iobuf,
    563 		    iosize, (off64_t)fileoffset)) == -1) {
    564 			(void) flowop_endop(threadflow, flowop, 0);
    565 			filebench_log(LOG_ERROR,
    566 			    "read file %s failed, offset %llu "
    567 			    "io buffer %zd: %s",
    568 			    avd_get_str(flowop->fo_fileset->fs_name),
    569 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
    570 			flowop_endop(threadflow, flowop, 0);
    571 			return (FILEBENCH_ERROR);
    572 		}
    573 		(void) flowop_endop(threadflow, flowop, ret);
    574 
    575 		if ((ret == 0))
    576 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
    577 
    578 	} else {
    579 		(void) flowop_beginop(threadflow, flowop);
    580 		if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
    581 			(void) flowop_endop(threadflow, flowop, 0);
    582 			filebench_log(LOG_ERROR,
    583 			    "read file %s failed, io buffer %zd: %s",
    584 			    avd_get_str(flowop->fo_fileset->fs_name),
    585 			    iobuf, strerror(errno));
    586 			(void) flowop_endop(threadflow, flowop, 0);
    587 			return (FILEBENCH_ERROR);
    588 		}
    589 		(void) flowop_endop(threadflow, flowop, ret);
    590 
    591 		if ((ret == 0))
    592 			(void) FB_LSEEK(fdesc, 0, SEEK_SET);
    593 	}
    594 
    595 	return (FILEBENCH_OK);
    596 }
    597 
    598 /*
    599  * Initializes a "flowop_block" flowop. Specifically, it
    600  * initializes the flowop's fo_cv and unlocks the fo_lock.
    601  */
    602 static int
    603 flowoplib_block_init(flowop_t *flowop)
    604 {
    605 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
    606 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
    607 	(void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
    608 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    609 
    610 	return (FILEBENCH_OK);
    611 }
    612 
    613 /*
    614  * Blocks the threadflow until woken up by flowoplib_wakeup.
    615  * The routine blocks on the flowop's fo_cv condition variable.
    616  */
    617 static int
    618 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
    619 {
    620 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
    621 	    flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
    622 	(void) ipc_mutex_lock(&flowop->fo_lock);
    623 
    624 	flowop_beginop(threadflow, flowop);
    625 	(void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
    626 	flowop_endop(threadflow, flowop, 0);
    627 
    628 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
    629 	    flowop->fo_name, flowop->fo_instance);
    630 
    631 	(void) ipc_mutex_unlock(&flowop->fo_lock);
    632 
    633 	return (FILEBENCH_OK);
    634 }
    635 
    636 /*
    637  * Wakes up one or more target blocking flowops.
    638  * Sends broadcasts on the fo_cv condition variables of all
    639  * flowops on the target list, except those that are
    640  * FLOW_MASTER flowops. The target list consists of all
    641  * flowops whose name matches this flowop's "fo_targetname"
    642  * attribute. The target list is generated on the first
    643  * invocation, and the run will be shutdown if no targets
    644  * are found. Otherwise the routine always returns FILEBENCH_OK.
    645  */
    646 static int
    647 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
    648 {
    649 	flowop_t *target;
    650 
    651 	/* if this is the first wakeup, create the wakeup list */
    652 	if (flowop->fo_targets == NULL) {
    653 		flowop_t *result = flowop_find(flowop->fo_targetname);
    654 
    655 		flowop->fo_targets = result;
    656 		if (result == NULL) {
    657 			filebench_log(LOG_ERROR,
    658 			    "wakeup: could not find op %s for thread %s",
    659 			    flowop->fo_targetname,
    660 			    threadflow->tf_name);
    661 			filebench_shutdown(1);
    662 		}
    663 		while (result) {
    664 			result->fo_targetnext =
    665 			    result->fo_resultnext;
    666 			result = result->fo_resultnext;
    667 		}
    668 	}
    669 
    670 	target = flowop->fo_targets;
    671 
    672 	/* wakeup the targets */
    673 	while (target) {
    674 		if (target->fo_instance == FLOW_MASTER) {
    675 			target = target->fo_targetnext;
    676 			continue;
    677 		}
    678 		filebench_log(LOG_DEBUG_IMPL,
    679 		    "wakeup flow %s-%d at address %zx",
    680 		    target->fo_name,
    681 		    target->fo_instance,
    682 		    &target->fo_cv);
    683 
    684 		flowop_beginop(threadflow, flowop);
    685 		(void) ipc_mutex_lock(&target->fo_lock);
    686 		(void) pthread_cond_broadcast(&target->fo_cv);
    687 		(void) ipc_mutex_unlock(&target->fo_lock);
    688 		flowop_endop(threadflow, flowop, 0);
    689 
    690 		target = target->fo_targetnext;
    691 	}
    692 
    693 	return (FILEBENCH_OK);
    694 }
    695 
    696 /*
    697  * "think time" routines. the "hog" routine consumes cpu cycles as
    698  * it "thinks", while the "delay" flowop simply calls sleep() to delay
    699  * for a given number of seconds without consuming cpu cycles.
    700  */
    701 
    702 
    703 /*
    704  * Consumes CPU cycles and memory bandwidth by looping for
    705  * flowop->fo_value times. With each loop sets memory location
    706  * threadflow->tf_mem to 1.
    707  */
    708 static int
    709 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
    710 {
    711 	uint64_t value = avd_get_int(flowop->fo_value);
    712 	int i;
    713 
    714 	filebench_log(LOG_DEBUG_IMPL, "hog enter");
    715 	flowop_beginop(threadflow, flowop);
    716 	if (threadflow->tf_mem != NULL) {
    717 		for (i = 0; i < value; i++)
    718 			*(threadflow->tf_mem) = 1;
    719 	}
    720 	flowop_endop(threadflow, flowop, 0);
    721 	filebench_log(LOG_DEBUG_IMPL, "hog exit");
    722 	return (FILEBENCH_OK);
    723 }
    724 
    725 
    726 /*
    727  * Delays for fo_value seconds.
    728  */
    729 static int
    730 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
    731 {
    732 	int value = avd_get_int(flowop->fo_value);
    733 
    734 	flowop_beginop(threadflow, flowop);
    735 	(void) sleep(value);
    736 	flowop_endop(threadflow, flowop, 0);
    737 	return (FILEBENCH_OK);
    738 }
    739 
    740 /*
    741  * Rate limiting routines. This is the event consuming half of the
    742  * event system. Each of the four following routines will limit the rate
    743  * to one unit of either calls, issued I/O operations, issued filebench
    744  * operations, or I/O bandwidth. Since there is only one event generator,
    745  * the events will be divided among multiple instances of an event
    746  * consumer, and further divided among different consumers if more than
    747  * one has been defined. There is no mechanism to enforce equal sharing
    748  * of events.
    749  */
    750 
    751 /*
    752  * Completes one invocation per posted event. If eventgen_q
    753  * has an event count greater than zero, one will be removed
    754  * (count decremented), otherwise the calling thread will
    755  * block until another event has been posted. Always returns 0
    756  */
    757 static int
    758 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
    759 {
    760 	/* Immediately bail if not set/enabled */
    761 	if (!filebench_shm->shm_eventgen_enabled)
    762 		return (FILEBENCH_OK);
    763 
    764 	if (flowop->fo_initted == 0) {
    765 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
    766 		    flowop, threadflow->tf_name, threadflow->tf_instance);
    767 		flowop->fo_initted = 1;
    768 	}
    769 
    770 	flowop_beginop(threadflow, flowop);
    771 	while (filebench_shm->shm_eventgen_enabled) {
    772 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
    773 		if (filebench_shm->shm_eventgen_q > 0) {
    774 			filebench_shm->shm_eventgen_q--;
    775 			(void) ipc_mutex_unlock(
    776 			    &filebench_shm->shm_eventgen_lock);
    777 			break;
    778 		}
    779 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
    780 		    &filebench_shm->shm_eventgen_lock);
    781 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
    782 	}
    783 	flowop_endop(threadflow, flowop, 0);
    784 	return (FILEBENCH_OK);
    785 }
    786 
    787 static int
    788 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
    789 {
    790 	if (flowop->fo_targetname[0] != '\0') {
    791 
    792 		/* Try to use statistics from specific flowop */
    793 		flowop->fo_targets =
    794 		    flowop_find_from_list(flowop->fo_targetname,
    795 		    threadflow->tf_thrd_fops);
    796 		if (flowop->fo_targets == NULL) {
    797 			filebench_log(LOG_ERROR,
    798 			    "limit target: could not find flowop %s",
    799 			    flowop->fo_targetname);
    800 			filebench_shutdown(1);
    801 			return (FILEBENCH_ERROR);
    802 		}
    803 	} else {
    804 		/* use total workload statistics */
    805 		flowop->fo_targets = NULL;
    806 	}
    807 	return (FILEBENCH_OK);
    808 }
    809 
    810 /*
    811  * Blocks the calling thread if the number of issued I/O
    812  * operations exceeds the number of posted events, thus
    813  * limiting the average I/O operation rate to the rate
    814  * specified by eventgen_hz. Always returns FILEBENCH_OK.
    815  */
    816 static int
    817 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
    818 {
    819 	uint64_t iops;
    820 	uint64_t delta;
    821 	uint64_t events;
    822 
    823 	/* Immediately bail if not set/enabled */
    824 	if (!filebench_shm->shm_eventgen_enabled)
    825 		return (FILEBENCH_OK);
    826 
    827 	if (flowop->fo_initted == 0) {
    828 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
    829 		    flowop, threadflow->tf_name, threadflow->tf_instance);
    830 		flowop->fo_initted = 1;
    831 
    832 		if (flowoplib_event_find_target(threadflow, flowop)
    833 		    == FILEBENCH_ERROR)
    834 			return (FILEBENCH_ERROR);
    835 
    836 		if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
    837 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
    838 			filebench_log(LOG_ERROR,
    839 			    "WARNING: Flowop %s does no IO",
    840 			    flowop->fo_targets->fo_name);
    841 			filebench_shutdown(1);
    842 			return (FILEBENCH_ERROR);
    843 		}
    844 	}
    845 
    846 	if (flowop->fo_targets) {
    847 		/*
    848 		 * Note that fs_count is already the sum of fs_rcount
    849 		 * and fs_wcount if looking at a single flowop.
    850 		 */
    851 		iops = flowop->fo_targets->fo_stats.fs_count;
    852 	} else {
    853 		(void) ipc_mutex_lock(&controlstats_lock);
    854 		iops = (controlstats.fs_rcount +
    855 		    controlstats.fs_wcount);
    856 		(void) ipc_mutex_unlock(&controlstats_lock);
    857 	}
    858 
    859 	/* Is this the first time around */
    860 	if (flowop->fo_tputlast == 0) {
    861 		flowop->fo_tputlast = iops;
    862 		return (FILEBENCH_OK);
    863 	}
    864 
    865 	delta = iops - flowop->fo_tputlast;
    866 	flowop->fo_tputbucket -= delta;
    867 	flowop->fo_tputlast = iops;
    868 
    869 	/* No need to block if the q isn't empty */
    870 	if (flowop->fo_tputbucket >= 0LL) {
    871 		flowop_endop(threadflow, flowop, 0);
    872 		return (FILEBENCH_OK);
    873 	}
    874 
    875 	iops = flowop->fo_tputbucket * -1;
    876 	events = iops;
    877 
    878 	flowop_beginop(threadflow, flowop);
    879 	while (filebench_shm->shm_eventgen_enabled) {
    880 
    881 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
    882 		if (filebench_shm->shm_eventgen_q >= events) {
    883 			filebench_shm->shm_eventgen_q -= events;
    884 			(void) ipc_mutex_unlock(
    885 			    &filebench_shm->shm_eventgen_lock);
    886 			flowop->fo_tputbucket += events;
    887 			break;
    888 		}
    889 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
    890 		    &filebench_shm->shm_eventgen_lock);
    891 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
    892 	}
    893 	flowop_endop(threadflow, flowop, 0);
    894 
    895 	return (FILEBENCH_OK);
    896 }
    897 
    898 /*
    899  * Blocks the calling thread if the number of issued filebench
    900  * operations exceeds the number of posted events, thus limiting
    901  * the average filebench operation rate to the rate specified by
    902  * eventgen_hz. Always returns FILEBENCH_OK.
    903  */
    904 static int
    905 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
    906 {
    907 	uint64_t ops;
    908 	uint64_t delta;
    909 	uint64_t events;
    910 
    911 	/* Immediately bail if not set/enabled */
    912 	if (!filebench_shm->shm_eventgen_enabled)
    913 		return (FILEBENCH_OK);
    914 
    915 	if (flowop->fo_initted == 0) {
    916 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
    917 		    flowop, threadflow->tf_name, threadflow->tf_instance);
    918 		flowop->fo_initted = 1;
    919 
    920 		if (flowoplib_event_find_target(threadflow, flowop)
    921 		    == FILEBENCH_ERROR)
    922 			return (FILEBENCH_ERROR);
    923 	}
    924 
    925 	if (flowop->fo_targets) {
    926 		ops = flowop->fo_targets->fo_stats.fs_count;
    927 	} else {
    928 		(void) ipc_mutex_lock(&controlstats_lock);
    929 		ops = controlstats.fs_count;
    930 		(void) ipc_mutex_unlock(&controlstats_lock);
    931 	}
    932 
    933 	/* Is this the first time around */
    934 	if (flowop->fo_tputlast == 0) {
    935 		flowop->fo_tputlast = ops;
    936 		return (FILEBENCH_OK);
    937 	}
    938 
    939 	delta = ops - flowop->fo_tputlast;
    940 	flowop->fo_tputbucket -= delta;
    941 	flowop->fo_tputlast = ops;
    942 
    943 	/* No need to block if the q isn't empty */
    944 	if (flowop->fo_tputbucket >= 0LL) {
    945 		flowop_endop(threadflow, flowop, 0);
    946 		return (FILEBENCH_OK);
    947 	}
    948 
    949 	ops = flowop->fo_tputbucket * -1;
    950 	events = ops;
    951 
    952 	flowop_beginop(threadflow, flowop);
    953 	while (filebench_shm->shm_eventgen_enabled) {
    954 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
    955 		if (filebench_shm->shm_eventgen_q >= events) {
    956 			filebench_shm->shm_eventgen_q -= events;
    957 			(void) ipc_mutex_unlock(
    958 			    &filebench_shm->shm_eventgen_lock);
    959 			flowop->fo_tputbucket += events;
    960 			break;
    961 		}
    962 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
    963 		    &filebench_shm->shm_eventgen_lock);
    964 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
    965 	}
    966 	flowop_endop(threadflow, flowop, 0);
    967 
    968 	return (FILEBENCH_OK);
    969 }
    970 
    971 
    972 /*
    973  * Blocks the calling thread if the number of bytes of I/O
    974  * issued exceeds one megabyte times the number of posted
    975  * events, thus limiting the average I/O byte rate to one
    976  * megabyte times the event rate as set by eventgen_hz.
    977  * Always retuns FILEBENCH_OK.
    978  */
    979 static int
    980 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
    981 {
    982 	uint64_t bytes;
    983 	uint64_t delta;
    984 	uint64_t events;
    985 
    986 	/* Immediately bail if not set/enabled */
    987 	if (!filebench_shm->shm_eventgen_enabled)
    988 		return (FILEBENCH_OK);
    989 
    990 	if (flowop->fo_initted == 0) {
    991 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
    992 		    flowop, threadflow->tf_name, threadflow->tf_instance);
    993 		flowop->fo_initted = 1;
    994 
    995 		if (flowoplib_event_find_target(threadflow, flowop)
    996 		    == FILEBENCH_ERROR)
    997 			return (FILEBENCH_ERROR);
    998 
    999 		if ((flowop->fo_targets) &&
   1000 		    ((flowop->fo_targets->fo_attrs &
   1001 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
   1002 			filebench_log(LOG_ERROR,
   1003 			    "WARNING: Flowop %s does no Reads or Writes",
   1004 			    flowop->fo_targets->fo_name);
   1005 			filebench_shutdown(1);
   1006 			return (FILEBENCH_ERROR);
   1007 		}
   1008 	}
   1009 
   1010 	if (flowop->fo_targets) {
   1011 		/*
   1012 		 * Note that fs_bytes is already the sum of fs_rbytes
   1013 		 * and fs_wbytes if looking at a single flowop.
   1014 		 */
   1015 		bytes = flowop->fo_targets->fo_stats.fs_bytes;
   1016 	} else {
   1017 		(void) ipc_mutex_lock(&controlstats_lock);
   1018 		bytes = (controlstats.fs_rbytes +
   1019 		    controlstats.fs_wbytes);
   1020 		(void) ipc_mutex_unlock(&controlstats_lock);
   1021 	}
   1022 
   1023 	/* Is this the first time around? */
   1024 	if (flowop->fo_tputlast == 0) {
   1025 		flowop->fo_tputlast = bytes;
   1026 		return (FILEBENCH_OK);
   1027 	}
   1028 
   1029 	delta = bytes - flowop->fo_tputlast;
   1030 	flowop->fo_tputbucket -= delta;
   1031 	flowop->fo_tputlast = bytes;
   1032 
   1033 	/* No need to block if the q isn't empty */
   1034 	if (flowop->fo_tputbucket >= 0LL) {
   1035 		flowop_endop(threadflow, flowop, 0);
   1036 		return (FILEBENCH_OK);
   1037 	}
   1038 
   1039 	bytes = flowop->fo_tputbucket * -1;
   1040 	events = (bytes / MB) + 1;
   1041 
   1042 	filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
   1043 	    (u_longlong_t)bytes, (u_longlong_t)events);
   1044 
   1045 	flowop_beginop(threadflow, flowop);
   1046 	while (filebench_shm->shm_eventgen_enabled) {
   1047 		(void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
   1048 		if (filebench_shm->shm_eventgen_q >= events) {
   1049 			filebench_shm->shm_eventgen_q -= events;
   1050 			(void) ipc_mutex_unlock(
   1051 			    &filebench_shm->shm_eventgen_lock);
   1052 			flowop->fo_tputbucket += (events * MB);
   1053 			break;
   1054 		}
   1055 		(void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
   1056 		    &filebench_shm->shm_eventgen_lock);
   1057 		(void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
   1058 	}
   1059 	flowop_endop(threadflow, flowop, 0);
   1060 
   1061 	return (FILEBENCH_OK);
   1062 }
   1063 
   1064 /*
   1065  * These flowops terminate a benchmark run when either the specified
   1066  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
   1067  * number of I/O operations (flowoplib_finishoncount) have been generated.
   1068  */
   1069 
   1070 
   1071 /*
   1072  * Stop filebench run when specified number of I/O bytes have been
   1073  * transferred. Compares controlstats.fs_bytes with flowop->value,
   1074  * and if greater returns 1, stopping the run, if not, returns 0
   1075  * to continue running.
   1076  */
   1077 static int
   1078 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
   1079 {
   1080 	uint64_t bytes_io;		/* Bytes of I/O delivered so far */
   1081 	uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
   1082 						    /* Uses constant value */
   1083 
   1084 	if (flowop->fo_initted == 0) {
   1085 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
   1086 		    flowop, threadflow->tf_name, threadflow->tf_instance);
   1087 		flowop->fo_initted = 1;
   1088 
   1089 		if (flowoplib_event_find_target(threadflow, flowop)
   1090 		    == FILEBENCH_ERROR)
   1091 			return (FILEBENCH_ERROR);
   1092 
   1093 		if ((flowop->fo_targets) &&
   1094 		    ((flowop->fo_targets->fo_attrs &
   1095 		    (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
   1096 			filebench_log(LOG_ERROR,
   1097 			    "WARNING: Flowop %s does no Reads or Writes",
   1098 			    flowop->fo_targets->fo_name);
   1099 			filebench_shutdown(1);
   1100 			return (FILEBENCH_ERROR);
   1101 		}
   1102 	}
   1103 
   1104 	if (flowop->fo_targets) {
   1105 		bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
   1106 	} else {
   1107 		(void) ipc_mutex_lock(&controlstats_lock);
   1108 		bytes_io = controlstats.fs_bytes;
   1109 		(void) ipc_mutex_unlock(&controlstats_lock);
   1110 	}
   1111 
   1112 	flowop_beginop(threadflow, flowop);
   1113 	if (bytes_io > byte_lim) {
   1114 		flowop_endop(threadflow, flowop, 0);
   1115 		return (FILEBENCH_DONE);
   1116 	}
   1117 	flowop_endop(threadflow, flowop, 0);
   1118 
   1119 	return (FILEBENCH_OK);
   1120 }
   1121 
   1122 /*
   1123  * Stop filebench run when specified number of I/O operations have
   1124  * been performed. Compares controlstats.fs_count with *flowop->value,
   1125  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
   1126  * to continue running.
   1127  */
   1128 static int
   1129 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
   1130 {
   1131 	uint64_t ops;
   1132 	uint64_t count = flowop->fo_constvalue; /* use constant value */
   1133 
   1134 	if (flowop->fo_initted == 0) {
   1135 		filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
   1136 		    flowop, threadflow->tf_name, threadflow->tf_instance);
   1137 		flowop->fo_initted = 1;
   1138 
   1139 		if (flowoplib_event_find_target(threadflow, flowop)
   1140 		    == FILEBENCH_ERROR)
   1141 			return (FILEBENCH_ERROR);
   1142 	}
   1143 
   1144 	if (flowop->fo_targets) {
   1145 		ops = flowop->fo_targets->fo_stats.fs_count;
   1146 	} else {
   1147 		(void) ipc_mutex_lock(&controlstats_lock);
   1148 		ops = controlstats.fs_count;
   1149 		(void) ipc_mutex_unlock(&controlstats_lock);
   1150 	}
   1151 
   1152 	flowop_beginop(threadflow, flowop);
   1153 	if (ops >= count) {
   1154 		flowop_endop(threadflow, flowop, 0);
   1155 		return (FILEBENCH_DONE);
   1156 	}
   1157 	flowop_endop(threadflow, flowop, 0);
   1158 
   1159 	return (FILEBENCH_OK);
   1160 }
   1161 
   1162 /*
   1163  * Semaphore synchronization using either System V semaphores or
   1164  * posix semaphores. If System V semaphores are available, they will be
   1165  * used, otherwise posix semaphores will be used.
   1166  */
   1167 
   1168 
   1169 /*
   1170  * Initializes the filebench "block on semaphore" flowop.
   1171  * If System V semaphores are implemented, the routine
   1172  * initializes the System V semaphore subsystem if it hasn't
   1173  * already been initialized, also allocates a pair of semids
   1174  * and initializes the highwater System V semaphore.
   1175  * If no System V semaphores, then does nothing special.
   1176  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
   1177  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
   1178  * on success.
   1179  */
   1180 static int
   1181 flowoplib_semblock_init(flowop_t *flowop)
   1182 {
   1183 
   1184 #ifdef HAVE_SYSV_SEM
   1185 	int sys_semid;
   1186 	struct sembuf sbuf[2];
   1187 	int highwater;
   1188 
   1189 	ipc_seminit();
   1190 
   1191 	flowop->fo_semid_lw = ipc_semidalloc();
   1192 	flowop->fo_semid_hw = ipc_semidalloc();
   1193 
   1194 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
   1195 	    flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
   1196 
   1197 	sys_semid = filebench_shm->shm_sys_semid;
   1198 
   1199 	if ((highwater = flowop->fo_semid_hw) == 0)
   1200 		highwater = flowop->fo_constvalue; /* use constant value */
   1201 
   1202 	filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
   1203 
   1204 	sbuf[0].sem_num = (short)highwater;
   1205 	sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
   1206 	sbuf[0].sem_flg = 0;
   1207 	if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
   1208 		filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
   1209 		    "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
   1210 		return (FILEBENCH_ERROR);
   1211 	}
   1212 #else
   1213 	filebench_log(LOG_DEBUG_IMPL,
   1214 	    "flow %s-%d semblock init with posix semaphore",
   1215 	    flowop->fo_name, flowop->fo_instance);
   1216 
   1217 	sem_init(&flowop->fo_sem, 1, 0);
   1218 #endif	/* HAVE_SYSV_SEM */
   1219 
   1220 	if (!(avd_get_bool(flowop->fo_blocking)))
   1221 		(void) ipc_mutex_unlock(&flowop->fo_lock);
   1222 
   1223 	return (FILEBENCH_OK);
   1224 }
   1225 
   1226 /*
   1227  * Releases the semids for the System V semaphore allocated
   1228  * to this flowop. If not using System V semaphores, then
   1229  * it is effectively just a no-op.
   1230  */
   1231 static void
   1232 flowoplib_semblock_destruct(flowop_t *flowop)
   1233 {
   1234 #ifdef HAVE_SYSV_SEM
   1235 	ipc_semidfree(flowop->fo_semid_lw);
   1236 	ipc_semidfree(flowop->fo_semid_hw);
   1237 #else
   1238 	sem_destroy(&flowop->fo_sem);
   1239 #endif /* HAVE_SYSV_SEM */
   1240 }
   1241 
   1242 /*
   1243  * Attempts to pass a System V or posix semaphore as appropriate,
   1244  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
   1245  * semphores is not available or cannot be acquired, or if the initial
   1246  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
   1247  */
   1248 static int
   1249 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
   1250 {
   1251 
   1252 #ifdef HAVE_SYSV_SEM
   1253 	struct sembuf sbuf[2];
   1254 	int value = avd_get_int(flowop->fo_value);
   1255 	int sys_semid;
   1256 	struct timespec timeout;
   1257 
   1258 	sys_semid = filebench_shm->shm_sys_semid;
   1259 
   1260 	filebench_log(LOG_DEBUG_IMPL,
   1261 	    "flow %s-%d sem blocking on id %x num %x value %d",
   1262 	    flowop->fo_name, flowop->fo_instance, sys_semid,
   1263 	    flowop->fo_semid_hw, value);
   1264 
   1265 	/* Post, decrement the increment the hw queue */
   1266 	sbuf[0].sem_num = flowop->fo_semid_hw;
   1267 	sbuf[0].sem_op = (short)value;
   1268 	sbuf[0].sem_flg = 0;
   1269 	sbuf[1].sem_num = flowop->fo_semid_lw;
   1270 	sbuf[1].sem_op = value * -1;
   1271 	sbuf[1].sem_flg = 0;
   1272 	timeout.tv_sec = 600;
   1273 	timeout.tv_nsec = 0;
   1274 
   1275 	if (avd_get_bool(flowop->fo_blocking))
   1276 		(void) ipc_mutex_unlock(&flowop->fo_lock);
   1277 
   1278 	flowop_beginop(threadflow, flowop);
   1279 
   1280 #ifdef HAVE_SEMTIMEDOP
   1281 	(void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
   1282 	(void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
   1283 #else
   1284 	(void) semop(sys_semid, &sbuf[0], 1);
   1285 	(void) semop(sys_semid, &sbuf[1], 1);
   1286 #endif /* HAVE_SEMTIMEDOP */
   1287 
   1288 	if (avd_get_bool(flowop->fo_blocking))
   1289 		(void) ipc_mutex_lock(&flowop->fo_lock);
   1290 
   1291 	flowop_endop(threadflow, flowop, 0);
   1292 
   1293 #else
   1294 	int value = avd_get_int(flowop->fo_value);
   1295 	int i;
   1296 
   1297 	filebench_log(LOG_DEBUG_IMPL,
   1298 	    "flow %s-%d sem blocking on posix semaphore",
   1299 	    flowop->fo_name, flowop->fo_instance);
   1300 
   1301 	/* Decrement sem by value */
   1302 	for (i = 0; i < value; i++) {
   1303 		if (sem_wait(&flowop->fo_sem) == -1) {
   1304 			filebench_log(LOG_ERROR, "semop wait failed");
   1305 			return (FILEBENCH_ERROR);
   1306 		}
   1307 	}
   1308 
   1309 	filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
   1310 	    flowop->fo_name, flowop->fo_instance);
   1311 #endif /* HAVE_SYSV_SEM */
   1312 
   1313 	return (FILEBENCH_OK);
   1314 }
   1315 
   1316 /*
   1317  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
   1318  */
   1319 /* ARGSUSED */
   1320 static int
   1321 flowoplib_sempost_init(flowop_t *flowop)
   1322 {
   1323 #ifdef HAVE_SYSV_SEM
   1324 	ipc_seminit();
   1325 #endif /* HAVE_SYSV_SEM */
   1326 	return (FILEBENCH_OK);
   1327 }
   1328 
   1329 /*
   1330  * Post to a System V or posix semaphore as appropriate.
   1331  * On the first call for a given flowop instance, this routine
   1332  * will use the fo_targetname attribute to locate all semblock
   1333  * flowops that are expecting posts from this flowop. All
   1334  * target flowops on this list will have a post operation done
   1335  * to their semaphores on each call.
   1336  */
   1337 static int
   1338 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
   1339 {
   1340 	flowop_t *target;
   1341 
   1342 	filebench_log(LOG_DEBUG_IMPL,
   1343 	    "sempost flow %s-%d",
   1344 	    flowop->fo_name,
   1345 	    flowop->fo_instance);
   1346 
   1347 	/* if this is the first post, create the post list */
   1348 	if (flowop->fo_targets == NULL) {
   1349 		flowop_t *result = flowop_find(flowop->fo_targetname);
   1350 
   1351 		flowop->fo_targets = result;
   1352 
   1353 		if (result == NULL) {
   1354 			filebench_log(LOG_ERROR,
   1355 			    "sempost: could not find op %s for thread %s",
   1356 			    flowop->fo_targetname,
   1357 			    threadflow->tf_name);
   1358 			filebench_shutdown(1);
   1359 		}
   1360 
   1361 		while (result) {
   1362 			result->fo_targetnext =
   1363 			    result->fo_resultnext;
   1364 			result = result->fo_resultnext;
   1365 		}
   1366 	}
   1367 
   1368 	target = flowop->fo_targets;
   1369 
   1370 	flowop_beginop(threadflow, flowop);
   1371 	/* post to the targets */
   1372 	while (target) {
   1373 #ifdef HAVE_SYSV_SEM
   1374 		struct sembuf sbuf[2];
   1375 		int sys_semid;
   1376 		int blocking;
   1377 #else
   1378 		int i;
   1379 #endif /* HAVE_SYSV_SEM */
   1380 		struct timespec timeout;
   1381 		int value = (int)avd_get_int(flowop->fo_value);
   1382 
   1383 		if (target->fo_instance == FLOW_MASTER) {
   1384 			target = target->fo_targetnext;
   1385 			continue;
   1386 		}
   1387 
   1388 #ifdef HAVE_SYSV_SEM
   1389 
   1390 		filebench_log(LOG_DEBUG_IMPL,
   1391 		    "sempost flow %s-%d num %x",
   1392 		    target->fo_name,
   1393 		    target->fo_instance,
   1394 		    target->fo_semid_lw);
   1395 
   1396 		sys_semid = filebench_shm->shm_sys_semid;
   1397 		sbuf[0].sem_num = target->fo_semid_lw;
   1398 		sbuf[0].sem_op = (short)value;
   1399 		sbuf[0].sem_flg = 0;
   1400 		sbuf[1].sem_num = target->fo_semid_hw;
   1401 		sbuf[1].sem_op = value * -1;
   1402 		sbuf[1].sem_flg = 0;
   1403 		timeout.tv_sec = 600;
   1404 		timeout.tv_nsec = 0;
   1405 
   1406 		if (avd_get_bool(flowop->fo_blocking))
   1407 			blocking = 1;
   1408 		else
   1409 			blocking = 0;
   1410 
   1411 #ifdef HAVE_SEMTIMEDOP
   1412 		if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
   1413 		    &timeout) == -1) && (errno && (errno != EAGAIN))) {
   1414 #else
   1415 		if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
   1416 		    (errno && (errno != EAGAIN))) {
   1417 #endif /* HAVE_SEMTIMEDOP */
   1418 			filebench_log(LOG_ERROR, "semop post failed: %s",
   1419 			    strerror(errno));
   1420 			return (FILEBENCH_ERROR);
   1421 		}
   1422 
   1423 		filebench_log(LOG_DEBUG_IMPL,
   1424 		    "flow %s-%d finished posting",
   1425 		    target->fo_name, target->fo_instance);
   1426 #else
   1427 		filebench_log(LOG_DEBUG_IMPL,
   1428 		    "sempost flow %s-%d to posix semaphore",
   1429 		    target->fo_name,
   1430 		    target->fo_instance);
   1431 
   1432 		/* Increment sem by value */
   1433 		for (i = 0; i < value; i++) {
   1434 			if (sem_post(&target->fo_sem) == -1) {
   1435 				filebench_log(LOG_ERROR, "semop post failed");
   1436 				return (FILEBENCH_ERROR);
   1437 			}
   1438 		}
   1439 
   1440 		filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
   1441 		    target->fo_name, target->fo_instance);
   1442 #endif /* HAVE_SYSV_SEM */
   1443 
   1444 		target = target->fo_targetnext;
   1445 	}
   1446 	flowop_endop(threadflow, flowop, 0);
   1447 
   1448 	return (FILEBENCH_OK);
   1449 }
   1450 
   1451 
   1452 /*
   1453  * Section for exercising create / open / close / delete operations
   1454  * on files within a fileset. For proper operation, the flowop attribute
   1455  * "fd", which sets the fo_fdnumber field in the flowop, must be used
   1456  * so that the same file is opened and later closed. "fd" is an index
   1457  * into a pair of arrays maintained by threadflows, one of which
   1458  * contains the operating system assigned file descriptors and the other
   1459  * a pointer to the filesetentry whose file the file descriptor
   1460  * references. An openfile flowop defined without fd being set will use
   1461  * the default (0) fd or, if specified, rotate through fd indices, but
   1462  * createfile and closefile must use the default or a specified fd.
   1463  * Meanwhile deletefile picks an arbitrary file to delete, regardless
   1464  * of fd attribute.
   1465  */
   1466 
   1467 /*
   1468  * Emulates (and actually does) file open. Obtains a file descriptor
   1469  * index, then calls flowoplib_openfile_common() to open. Returns
   1470  * FILEBENCH_ERROR if no file descriptor is found, and returns the
   1471  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
   1472  * FILEBENCH_NORSC, FILEBENCH_OK).
   1473  */
   1474 static int
   1475 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
   1476 {
   1477 	int fd = flowoplib_fdnum(threadflow, flowop);
   1478 
   1479 	if (fd == -1)
   1480 		return (FILEBENCH_ERROR);
   1481 
   1482 	return (flowoplib_openfile_common(threadflow, flowop, fd));
   1483 }
   1484 
   1485 /*
   1486  * Common file opening code for filesets. Uses the supplied
   1487  * file descriptor index to determine the tf_fd entry to use.
   1488  * If the entry is empty (0) and the fileset exists, fileset
   1489  * pick is called to select a fileset entry to use. The file
   1490  * specified in the filesetentry is opened, and the returned
   1491  * operating system file descriptor and a pointer to the
   1492  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
   1493  * respectively. Returns FILEBENCH_ERROR on error,
   1494  * FILEBENCH_NORSC if no suitable filesetentry can be found,
   1495  * and FILEBENCH_OK on success.
   1496  */
   1497 static int
   1498 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
   1499 {
   1500 	filesetentry_t *file;
   1501 	char *fileset_name;
   1502 	int tid = 0;
   1503 	int openflag = 0;
   1504 	int err;
   1505 
   1506 	if (flowop->fo_fileset == NULL) {
   1507 		filebench_log(LOG_ERROR, "flowop NULL file");
   1508 		return (FILEBENCH_ERROR);
   1509 	}
   1510 
   1511 	if ((fileset_name =
   1512 	    avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
   1513 		filebench_log(LOG_ERROR,
   1514 		    "flowop %s: fileset has no name", flowop->fo_name);
   1515 		return (FILEBENCH_ERROR);
   1516 	}
   1517 
   1518 	/*
   1519 	 * set the open flag for read only or read/write, as appropriate.
   1520 	 */
   1521 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
   1522 		openflag = O_RDONLY;
   1523 	else
   1524 		openflag = O_RDWR;
   1525 
   1526 	/*
   1527 	 * If the flowop doesn't default to persistent fd
   1528 	 * then get unique thread ID for use by fileset_pick
   1529 	 */
   1530 	if (avd_get_bool(flowop->fo_rotatefd))
   1531 		tid = threadflow->tf_utid;
   1532 
   1533 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
   1534 		filebench_log(LOG_ERROR,
   1535 		    "flowop %s attempted to open without closing on fd %d",
   1536 		    flowop->fo_name, fd);
   1537 		return (FILEBENCH_ERROR);
   1538 	}
   1539 
   1540 #ifdef HAVE_RAW_SUPPORT
   1541 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
   1542 		int open_attrs = 0;
   1543 		char name[MAXPATHLEN];
   1544 
   1545 		(void) fb_strlcpy(name,
   1546 		    avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
   1547 		(void) fb_strlcat(name, "/", MAXPATHLEN);
   1548 		(void) fb_strlcat(name, fileset_name, MAXPATHLEN);
   1549 
   1550 		if (avd_get_bool(flowop->fo_dsync)) {
   1551 #ifdef sun
   1552 			open_attrs |= O_DSYNC;
   1553 #else
   1554 			open_attrs |= O_FSYNC;
   1555 #endif
   1556 		}
   1557 
   1558 		filebench_log(LOG_DEBUG_SCRIPT,
   1559 		    "open raw device %s flags %d = %d", name, open_attrs, fd);
   1560 
   1561 		if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
   1562 		    openflag | open_attrs, 0666) == FILEBENCH_ERROR) {
   1563 			filebench_log(LOG_ERROR,
   1564 			    "Failed to open raw device %s: %s",
   1565 			    name, strerror(errno));
   1566 			return (FILEBENCH_ERROR);
   1567 		}
   1568 
   1569 		/* if running on Solaris, use un-buffered io */
   1570 #ifdef sun
   1571 		(void) directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
   1572 #endif
   1573 
   1574 		threadflow->tf_fse[fd] = NULL;
   1575 
   1576 		return (FILEBENCH_OK);
   1577 	}
   1578 #endif /* HAVE_RAW_SUPPORT */
   1579 
   1580 	if ((err = flowoplib_pickfile(&file, flowop,
   1581 	    FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
   1582 		filebench_log(LOG_DEBUG_SCRIPT,
   1583 		    "flowop %s failed to pick file from %s on fd %d",
   1584 		    flowop->fo_name, fileset_name, fd);
   1585 		return (err);
   1586 	}
   1587 
   1588 	threadflow->tf_fse[fd] = file;
   1589 
   1590 	flowop_beginop(threadflow, flowop);
   1591 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
   1592 	    file, openflag, 0666, flowoplib_fileattrs(flowop));
   1593 	flowop_endop(threadflow, flowop, 0);
   1594 
   1595 	if (err == FILEBENCH_ERROR) {
   1596 		filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
   1597 		    flowop->fo_name, file->fse_path);
   1598 		return (FILEBENCH_ERROR);
   1599 	}
   1600 
   1601 	filebench_log(LOG_DEBUG_SCRIPT,
   1602 	    "flowop %s: opened %s fd[%d] = %d",
   1603 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
   1604 
   1605 	return (FILEBENCH_OK);
   1606 }
   1607 
   1608 /*
   1609  * Emulate create of a file. Uses the flowop's fdnumber to select
   1610  * tf_fd and tf_fse array locations to put the created file's file
   1611  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
   1612  * to select a specific filesetentry whose file does not currently
   1613  * exist for the file create operation. Then calls
   1614  * fileset_openfile() with the O_CREATE flag set to create the
   1615  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
   1616  * already in use, the flowop has no associated fileset, or
   1617  * the create call fails. Returns 1 if a filesetentry with a
   1618  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
   1619  */
   1620 static int
   1621 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
   1622 {
   1623 	filesetentry_t *file;
   1624 	int fd = flowop->fo_fdnumber;
   1625 	int err;
   1626 
   1627 	if (threadflow->tf_fd[fd].fd_ptr != NULL) {
   1628 		filebench_log(LOG_ERROR,
   1629 		    "flowop %s attempted to create without closing on fd %d",
   1630 		    flowop->fo_name, fd);
   1631 		return (FILEBENCH_ERROR);
   1632 	}
   1633 
   1634 	if (flowop->fo_fileset == NULL) {
   1635 		filebench_log(LOG_ERROR, "flowop NULL file");
   1636 		return (FILEBENCH_ERROR);
   1637 	}
   1638 
   1639 	if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE) {
   1640 		filebench_log(LOG_ERROR, "Can not CREATE the READONLY file %s",
   1641 		    avd_get_str(flowop->fo_fileset->fs_name));
   1642 		return (FILEBENCH_ERROR);
   1643 	}
   1644 
   1645 
   1646 #ifdef HAVE_RAW_SUPPORT
   1647 	/* can't be used with raw devices */
   1648 	if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
   1649 		filebench_log(LOG_ERROR,
   1650 		    "flowop %s attempted to a createfile on RAW device",
   1651 		    flowop->fo_name);
   1652 		return (FILEBENCH_ERROR);
   1653 	}
   1654 #endif /* HAVE_RAW_SUPPORT */
   1655 
   1656 	if ((err = flowoplib_pickfile(&file, flowop,
   1657 	    FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
   1658 		filebench_log(LOG_DEBUG_SCRIPT,
   1659 		    "flowop %s failed to pick file from fileset %s",
   1660 		    flowop->fo_name,
   1661 		    avd_get_str(flowop->fo_fileset->fs_name));
   1662 		return (err);
   1663 	}
   1664 
   1665 	threadflow->tf_fse[fd] = file;
   1666 
   1667 	flowop_beginop(threadflow, flowop);
   1668 	err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
   1669 	    file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
   1670 	flowop_endop(threadflow, flowop, 0);
   1671 
   1672 	if (err == FILEBENCH_ERROR) {
   1673 		filebench_log(LOG_ERROR, "failed to create file %s",
   1674 		    flowop->fo_name);
   1675 		return (FILEBENCH_ERROR);
   1676 	}
   1677 
   1678 	filebench_log(LOG_DEBUG_SCRIPT,
   1679 	    "flowop %s: created %s fd[%d] = %d",
   1680 	    flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
   1681 
   1682 	return (FILEBENCH_OK);
   1683 }
   1684 
   1685 /*
   1686  * Emulates delete of a file. If a valid fd is provided, it uses the
   1687  * filesetentry stored at that fd location to select the file to be
   1688  * deleted, otherwise it picks an arbitrary filesetentry
   1689  * whose file exists. It then uses unlink() to delete it and Clears
   1690  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
   1691  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
   1692  * filesetentry cannot be found, and FILEBENCH_OK on success.
   1693  */
   1694 static int
   1695 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
   1696 {
   1697 	filesetentry_t *file;
   1698 	fileset_t *fileset;
   1699 	char path[MAXPATHLEN];
   1700 	char *pathtmp;
   1701 	int fd = flowop->fo_fdnumber;
   1702 
   1703 	/* if fd specified, use it to access file */
   1704 	if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
   1705 
   1706 		/* indicate that the file will be deleted */
   1707 		threadflow->tf_fse[fd] = NULL;
   1708 
   1709 		/* if here, we still have a valid file pointer */
   1710 		fileset = file->fse_fileset;
   1711 	} else {
   1712 
   1713 		/* Otherwise, pick arbitrary file */
   1714 		file = NULL;
   1715 		fileset = flowop->fo_fileset;
   1716 	}
   1717 
   1718 
   1719 	if (fileset == NULL) {
   1720 		filebench_log(LOG_ERROR, "flowop NULL file");
   1721 		return (FILEBENCH_ERROR);
   1722 	}
   1723 
   1724 #ifdef HAVE_RAW_SUPPORT
   1725 	/* can't be used with raw devices */
   1726 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
   1727 		filebench_log(LOG_ERROR,
   1728 		    "flowop %s attempted a deletefile on RAW device",
   1729 		    flowop->fo_name);
   1730 		return (FILEBENCH_ERROR);
   1731 	}
   1732 #endif /* HAVE_RAW_SUPPORT */
   1733 
   1734 	if (file == NULL) {
   1735 		int err;
   1736 
   1737 		/* pick arbitrary, existing (allocated) file */
   1738 		if ((err = flowoplib_pickfile(&file, flowop,
   1739 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
   1740 			filebench_log(LOG_DEBUG_SCRIPT,
   1741 			    "flowop %s failed to pick file", flowop->fo_name);
   1742 			return (err);
   1743 		}
   1744 	} else {
   1745 		/* delete specific file. wait for it to be non-busy */
   1746 		(void) ipc_mutex_lock(&fileset->fs_pick_lock);
   1747 		while (file->fse_flags & FSE_BUSY) {
   1748 			file->fse_flags |= FSE_THRD_WAITNG;
   1749 			(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
   1750 			    &fileset->fs_pick_lock);
   1751 		}
   1752 
   1753 		/* File now available, grab it for deletion */
   1754 		file->fse_flags |= FSE_BUSY;
   1755 		fileset->fs_idle_files--;
   1756 		(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
   1757 	}
   1758 
   1759 	/* don't delete if anyone (other than me) has file open */
   1760 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
   1761 		if (file->fse_open_cnt > 1) {
   1762 			filebench_log(LOG_DEBUG_SCRIPT,
   1763 			    "flowop %s can't delete file opened by other"
   1764 			    " threads at fd = %d", flowop->fo_name, fd);
   1765 			fileset_unbusy(file, FALSE, FALSE, 0);
   1766 			return (FILEBENCH_OK);
   1767 		} else {
   1768 			filebench_log(LOG_DEBUG_SCRIPT,
   1769 			    "flowop %s deleting still open file at fd = %d",
   1770 			    flowop->fo_name, fd);
   1771 		}
   1772 	} else if (file->fse_open_cnt > 0) {
   1773 		filebench_log(LOG_DEBUG_SCRIPT,
   1774 		    "flowop %s can't delete file opened by other"
   1775 		    " threads at fd = %d, open count = %d",
   1776 		    flowop->fo_name, fd, file->fse_open_cnt);
   1777 		fileset_unbusy(file, FALSE, FALSE, 0);
   1778 		return (FILEBENCH_OK);
   1779 	}
   1780 
   1781 	(void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
   1782 	(void) fb_strlcat(path, "/", MAXPATHLEN);
   1783 	(void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
   1784 	pathtmp = fileset_resolvepath(file);
   1785 	(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
   1786 	free(pathtmp);
   1787 
   1788 	/* delete the selected file */
   1789 	flowop_beginop(threadflow, flowop);
   1790 	(void) FB_UNLINK(path);
   1791 	flowop_endop(threadflow, flowop, 0);
   1792 
   1793 	/* indicate that it is no longer busy and no longer exists */
   1794 	fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
   1795 
   1796 	filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
   1797 
   1798 	return (FILEBENCH_OK);
   1799 }
   1800 
   1801 /*
   1802  * Emulates fsync of a file. Obtains the file descriptor index
   1803  * from the flowop, obtains the actual file descriptor from
   1804  * the threadflow's table, checks to be sure it is still an
   1805  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
   1806  * if the file no longer is open, FILEBENCH_OK otherwise.
   1807  */
   1808 static int
   1809 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
   1810 {
   1811 	filesetentry_t *file;
   1812 	int fd = flowop->fo_fdnumber;
   1813 
   1814 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
   1815 		filebench_log(LOG_ERROR,
   1816 		    "flowop %s attempted to fsync a closed fd %d",
   1817 		    flowop->fo_name, fd);
   1818 		return (FILEBENCH_ERROR);
   1819 	}
   1820 
   1821 	file = threadflow->tf_fse[fd];
   1822 
   1823 	if ((file == NULL) ||
   1824 	    (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
   1825 		filebench_log(LOG_ERROR,
   1826 		    "flowop %s attempted to a fsync a RAW device",
   1827 		    flowop->fo_name);
   1828 		return (FILEBENCH_ERROR);
   1829 	}
   1830 
   1831 	/* Measure time to fsync */
   1832 	flowop_beginop(threadflow, flowop);
   1833 	(void) FB_FSYNC(&threadflow->tf_fd[fd]);
   1834 	flowop_endop(threadflow, flowop, 0);
   1835 
   1836 	filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
   1837 
   1838 	return (FILEBENCH_OK);
   1839 }
   1840 
   1841 /*
   1842  * Emulate fsync of an entire fileset. Search through the
   1843  * threadflow's file descriptor array, doing fsync() on each
   1844  * open file that belongs to the flowop's fileset. Always
   1845  * returns FILEBENCH_OK.
   1846  */
   1847 static int
   1848 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
   1849 {
   1850 	int fd;
   1851 
   1852 	for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
   1853 		filesetentry_t *file;
   1854 
   1855 		/* Match the file set to fsync */
   1856 		if ((threadflow->tf_fse[fd] == NULL) ||
   1857 		    (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
   1858 			continue;
   1859 
   1860 		/* Measure time to fsync */
   1861 		flowop_beginop(threadflow, flowop);
   1862 		(void) FB_FSYNC(&threadflow->tf_fd[fd]);
   1863 		flowop_endop(threadflow, flowop, 0);
   1864 
   1865 		file = threadflow->tf_fse[fd];
   1866 
   1867 		filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
   1868 		    file->fse_path);
   1869 	}
   1870 
   1871 	return (FILEBENCH_OK);
   1872 }
   1873 
   1874 /*
   1875  * Emulate close of a file.  Obtains the file descriptor index
   1876  * from the flowop, obtains the actual file descriptor from the
   1877  * threadflow's table, checks to be sure it is still an open
   1878  * file, then does a close operation on it. Then sets the
   1879  * threadflow file descriptor table entry to 0, and the file set
   1880  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
   1881  * FILEBENCH_OK otherwise.
   1882  */
   1883 static int
   1884 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
   1885 {
   1886 	filesetentry_t *file;
   1887 	fileset_t *fileset;
   1888 	int fd = flowop->fo_fdnumber;
   1889 
   1890 	if (threadflow->tf_fd[fd].fd_ptr == NULL) {
   1891 		filebench_log(LOG_ERROR,
   1892 		    "flowop %s attempted to close an already closed fd %d",
   1893 		    flowop->fo_name, fd);
   1894 		return (FILEBENCH_ERROR);
   1895 	}
   1896 
   1897 	file = threadflow->tf_fse[fd];
   1898 	fileset = file->fse_fileset;
   1899 
   1900 	/* Wait for it to be non-busy */
   1901 	(void) ipc_mutex_lock(&fileset->fs_pick_lock);
   1902 	while (file->fse_flags & FSE_BUSY) {
   1903 		file->fse_flags |= FSE_THRD_WAITNG;
   1904 		(void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
   1905 		    &fileset->fs_pick_lock);
   1906 	}
   1907 
   1908 	/* File now available, grab it for closing */
   1909 	file->fse_flags |= FSE_BUSY;
   1910 
   1911 	/* if last open, set declare idle */
   1912 	if (file->fse_open_cnt == 1)
   1913 		fileset->fs_idle_files--;
   1914 
   1915 	(void) ipc_mutex_unlock(&fileset->fs_pick_lock);
   1916 
   1917 	/* Measure time to close */
   1918 	flowop_beginop(threadflow, flowop);
   1919 	(void) FB_CLOSE(&threadflow->tf_fd[fd]);
   1920 	flowop_endop(threadflow, flowop, 0);
   1921 
   1922 	fileset_unbusy(file, FALSE, FALSE, -1);
   1923 
   1924 	threadflow->tf_fd[fd].fd_ptr = NULL;
   1925 
   1926 	filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
   1927 
   1928 	return (FILEBENCH_OK);
   1929 }
   1930 
   1931 /*
   1932  * Obtain the full pathname of the directory described by the filesetentry
   1933  * indicated by "dir", and copy it into the character array pointed to by
   1934  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
   1935  */
   1936 static int
   1937 flowoplib_getdirpath(filesetentry_t *dir, char *path)
   1938 {
   1939 	char		*fileset_path;
   1940 	char		*fileset_name;
   1941 	char		*part_path;
   1942 
   1943 	if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
   1944 		filebench_log(LOG_ERROR, "Fileset path not set");
   1945 		return (FILEBENCH_ERROR);
   1946 	}
   1947 
   1948 	if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
   1949 		filebench_log(LOG_ERROR, "Fileset name not set");
   1950 		return (FILEBENCH_ERROR);
   1951 	}
   1952 
   1953 	(void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
   1954 	(void) fb_strlcat(path, "/", MAXPATHLEN);
   1955 	(void) fb_strlcat(path, fileset_name, MAXPATHLEN);
   1956 
   1957 	if ((part_path = fileset_resolvepath(dir)) == NULL)
   1958 		return (FILEBENCH_ERROR);
   1959 
   1960 	(void) fb_strlcat(path, part_path, MAXPATHLEN);
   1961 	free(part_path);
   1962 
   1963 	return (FILEBENCH_OK);
   1964 }
   1965 
   1966 /*
   1967  * Use mkdir to create a directory.  Obtains the fileset name from the
   1968  * flowop, selects a non-existent leaf directory and obtains its full
   1969  * path, then uses mkdir to create it on the storage subsystem (make it
   1970  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
   1971  * directories in the fileset, FILEBENCH_ERROR on other errors, and
   1972  * FILEBENCH_OK on success.
   1973  */
   1974 static int
   1975 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
   1976 {
   1977 	filesetentry_t	*dir;
   1978 	int		ret;
   1979 	char		full_path[MAXPATHLEN];
   1980 
   1981 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
   1982 	    FILESET_PICKNOEXIST)) != FILEBENCH_OK)
   1983 		return (ret);
   1984 
   1985 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
   1986 		return (ret);
   1987 
   1988 	flowop_beginop(threadflow, flowop);
   1989 	(void) FB_MKDIR(full_path, 0755);
   1990 	flowop_endop(threadflow, flowop, 0);
   1991 
   1992 	/* indicate that it is no longer busy and now exists */
   1993 	fileset_unbusy(dir, TRUE, TRUE, 0);
   1994 
   1995 	return (FILEBENCH_OK);
   1996 }
   1997 
   1998 /*
   1999  * Use rmdir to delete a directory.  Obtains the fileset name from the
   2000  * flowop, selects an existent leaf directory and obtains its full path,
   2001  * then uses rmdir to remove it from the storage subsystem (make it
   2002  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
   2003  * directories in the fileset, FILEBENCH_ERROR on other errors, and
   2004  * FILEBENCH_OK on success.
   2005  */
   2006 static int
   2007 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
   2008 {
   2009 	filesetentry_t *dir;
   2010 	int		ret;
   2011 	char		full_path[MAXPATHLEN];
   2012 
   2013 	if ((ret = flowoplib_pickleafdir(&dir, flowop,
   2014 	    FILESET_PICKEXISTS)) != FILEBENCH_OK)
   2015 		return (ret);
   2016 
   2017 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
   2018 		return (ret);
   2019 
   2020 	flowop_beginop(threadflow, flowop);
   2021 	(void) FB_RMDIR(full_path);
   2022 	flowop_endop(threadflow, flowop, 0);
   2023 
   2024 	/* indicate that it is no longer busy and no longer exists */
   2025 	fileset_unbusy(dir, TRUE, FALSE, 0);
   2026 
   2027 	return (FILEBENCH_OK);
   2028 }
   2029 
   2030 /*
   2031  * Use opendir(), multiple readdir() calls, and closedir() to list the
   2032  * contents of a directory.  Obtains the fileset name from the
   2033  * flowop, selects a normal subdirectory (which always exist) and obtains
   2034  * its full path, then uses opendir() to get a DIR handle to it from the
   2035  * file system, a readdir() loop to access each directory entry, and
   2036  * finally cleans up with a closedir(). The latency reported is the total
   2037  * for all this activity, and it also reports the total number of bytes
   2038  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
   2039  * and FILEBENCH_OK on success.
   2040  */
   2041 static int
   2042 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
   2043 {
   2044 	fileset_t	*fileset;
   2045 	filesetentry_t	*dir;
   2046 	DIR		*dir_handle;
   2047 	struct dirent	*direntp;
   2048 	int		dir_bytes = 0;
   2049 	int		ret;
   2050 	char		full_path[MAXPATHLEN];
   2051 
   2052 	if ((fileset = flowop->fo_fileset) == NULL) {
   2053 		filebench_log(LOG_ERROR, "flowop NO fileset");
   2054 		return (FILEBENCH_ERROR);
   2055 	}
   2056 
   2057 	if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
   2058 		filebench_log(LOG_DEBUG_SCRIPT,
   2059 		    "flowop %s failed to pick directory from fileset %s",
   2060 		    flowop->fo_name,
   2061 		    avd_get_str(fileset->fs_name));
   2062 		return (FILEBENCH_ERROR);
   2063 	}
   2064 
   2065 	if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
   2066 		return (ret);
   2067 
   2068 	flowop_beginop(threadflow, flowop);
   2069 
   2070 	/* open the directory */
   2071 	if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
   2072 		filebench_log(LOG_ERROR,
   2073 		    "flowop %s failed to open directory in fileset %s\n",
   2074 		    flowop->fo_name, avd_get_str(fileset->fs_name));
   2075 		return (FILEBENCH_ERROR);
   2076 	}
   2077 
   2078 	/* read through the directory entries */
   2079 	while ((direntp = FB_READDIR(dir_handle)) != NULL) {
   2080 		dir_bytes += (strlen(direntp->d_name) +
   2081 		    sizeof (struct dirent) - 1);
   2082 	}
   2083 
   2084 	/* close the directory */
   2085 	(void) FB_CLOSEDIR(dir_handle);
   2086 
   2087 	flowop_endop(threadflow, flowop, dir_bytes);
   2088 
   2089 	/* indicate that it is no longer busy */
   2090 	fileset_unbusy(dir, FALSE, FALSE, 0);
   2091 
   2092 	return (FILEBENCH_OK);
   2093 }
   2094 
   2095 /*
   2096  * Emulate stat of a file. Picks an arbitrary filesetentry with
   2097  * an existing file from the flowop's fileset, then performs a
   2098  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
   2099  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
   2100  * cannot be found, and FILEBENCH_OK on success.
   2101  */
   2102 static int
   2103 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
   2104 {
   2105 	filesetentry_t *file;
   2106 	fileset_t *fileset;
   2107 	struct stat64 statbuf;
   2108 	int fd = flowop->fo_fdnumber;
   2109 
   2110 	/* if fd specified and the file is open, use it to access file */
   2111 	if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
   2112 
   2113 		/* check whether file handle still valid */
   2114 		if ((file = threadflow->tf_fse[fd]) == NULL) {
   2115 			filebench_log(LOG_DEBUG_SCRIPT,
   2116 			    "flowop %s trying to stat NULL file at fd = %d",
   2117 			    flowop->fo_name, fd);
   2118 			return (FILEBENCH_ERROR);
   2119 		}
   2120 
   2121 		/* if here, we still have a valid file pointer */
   2122 		fileset = file->fse_fileset;
   2123 	} else {
   2124 		/* Otherwise, pick arbitrary file */
   2125 		file = NULL;
   2126 		fileset = flowop->fo_fileset;
   2127 	}
   2128 
   2129 	if (fileset == NULL) {
   2130 		filebench_log(LOG_ERROR,
   2131 		    "statfile with no fileset specified");
   2132 		return (FILEBENCH_ERROR);
   2133 	}
   2134 
   2135 #ifdef HAVE_RAW_SUPPORT
   2136 	/* can't be used with raw devices */
   2137 	if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
   2138 		filebench_log(LOG_ERROR,
   2139 		    "flowop %s attempted do a statfile on a RAW device",
   2140 		    flowop->fo_name);
   2141 		return (FILEBENCH_ERROR);
   2142 	}
   2143 #endif /* HAVE_RAW_SUPPORT */
   2144 
   2145 	if (file == NULL) {
   2146 		char path[MAXPATHLEN];
   2147 		char *pathtmp;
   2148 		int err;
   2149 
   2150 		/* pick arbitrary, existing (allocated) file */
   2151 		if ((err = flowoplib_pickfile(&file, flowop,
   2152 		    FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
   2153 			filebench_log(LOG_DEBUG_SCRIPT,
   2154 			    "Statfile flowop %s failed to pick file",
   2155 			    flowop->fo_name);
   2156 			return (err);
   2157 		}
   2158 
   2159 		/* resolve path and do a stat on file */
   2160 		(void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
   2161 		    MAXPATHLEN);
   2162 		(void) fb_strlcat(path, "/", MAXPATHLEN);
   2163 		(void) fb_strlcat(path, avd_get_str(fileset->fs_name),
   2164 		    MAXPATHLEN);
   2165 		pathtmp = fileset_resolvepath(file);
   2166 		(void) fb_strlcat(path, pathtmp, MAXPATHLEN);
   2167 		free(pathtmp);
   2168 
   2169 		/* stat the file */
   2170 		flowop_beginop(threadflow, flowop);
   2171 		if (FB_STAT(path, &statbuf) == -1)
   2172 			filebench_log(LOG_ERROR,
   2173 			    "statfile flowop %s failed", flowop->fo_name);
   2174 		flowop_endop(threadflow, flowop, 0);
   2175 
   2176 		fileset_unbusy(file, FALSE, FALSE, 0);
   2177 	} else {
   2178 		/* stat specific file */
   2179 		flowop_beginop(threadflow, flowop);
   2180 		if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
   2181 			filebench_log(LOG_ERROR,
   2182 			    "statfile flowop %s failed", flowop->fo_name);
   2183 		flowop_endop(threadflow, flowop, 0);
   2184 
   2185 	}
   2186 
   2187 	return (FILEBENCH_OK);
   2188 }
   2189 
   2190 
   2191 /*
   2192  * Additional reads and writes. Read and write whole files, write
   2193  * and append to files. Some of these work with both fileobjs and
   2194  * filesets, others only with filesets. The flowoplib_write routine
   2195  * writes from thread memory, while the others read or write using
   2196  * fo_buf memory. Note that both flowoplib_read() and
   2197  * flowoplib_aiowrite() use thread memory as well.
   2198  */
   2199 
   2200 
   2201 /*
   2202  * Emulate a read of a whole file. The file must be open with
   2203  * file descriptor and filesetentry stored at the locations indexed
   2204  * by the flowop's fdnumber. It then seeks to the beginning of the
   2205  * associated file, and reads fs_iosize bytes at a time until the end
   2206  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
   2207  * out of files, and FILEBENCH_OK on success.
   2208  */
   2209 static int
   2210 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
   2211 {
   2212 	caddr_t iobuf;
   2213 	off64_t bytes = 0;
   2214 	fb_fdesc_t *fdesc;
   2215 	uint64_t wss;
   2216 	fbint_t iosize;
   2217 	int ret;
   2218 	char zerordbuf;
   2219 
   2220 	/* get the file to use */
   2221 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
   2222 	    &fdesc)) != FILEBENCH_OK)
   2223 		return (ret);
   2224 
   2225 	/* an I/O size of zero means read entire working set with one I/O */
   2226 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
   2227 		iosize = wss;
   2228 
   2229 	/*
   2230 	 * The file may actually be 0 bytes long, in which case skip
   2231 	 * the buffer set up call (which would fail) and substitute
   2232 	 * a small buffer, which won't really be used.
   2233 	 */
   2234 	if (iosize == 0) {
   2235 		iobuf = (caddr_t)&zerordbuf;
   2236 		filebench_log(LOG_DEBUG_SCRIPT,
   2237 		    "flowop %s read zero length file", flowop->fo_name);
   2238 	} else {
   2239 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
   2240 		    iosize) != 0)
   2241 			return (FILEBENCH_ERROR);
   2242 	}
   2243 
   2244 	/* Measure time to read bytes */
   2245 	flowop_beginop(threadflow, flowop);
   2246 	(void) FB_LSEEK(fdesc, 0, SEEK_SET);
   2247 	while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
   2248 		bytes += ret;
   2249 
   2250 	flowop_endop(threadflow, flowop, bytes);
   2251 
   2252 	if (ret < 0) {
   2253 		filebench_log(LOG_ERROR,
   2254 		    "readwhole fail Failed to read whole file: %s",
   2255 		    strerror(errno));
   2256 		return (FILEBENCH_ERROR);
   2257 	}
   2258 
   2259 	return (FILEBENCH_OK);
   2260 }
   2261 
   2262 /*
   2263  * Emulate a write to a file of size fo_iosize.  Will write
   2264  * to a file from a fileset if the flowop's fo_fileset field
   2265  * specifies one or its fdnumber is non zero. Otherwise it
   2266  * will write to a fileobj file, if one exists. If the file
   2267  * is not currently open, the routine will attempt to open
   2268  * it. The flowop's fo_wss parameter will be used to set the
   2269  * maximum file size if it is non-zero, otherwise the
   2270  * filesetentry's  fse_size will be used. A random memory
   2271  * buffer offset is calculated, and, if fo_random is TRUE,
   2272  * a random file offset is used for the write. Otherwise the
   2273  * write is to the next sequential location. Returns
   2274  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
   2275  * obtain a file, or FILEBENCH_OK on success.
   2276  */
   2277 static int
   2278 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
   2279 {
   2280 	caddr_t iobuf;
   2281 	fbint_t wss;
   2282 	fbint_t iosize;
   2283 	fb_fdesc_t *fdesc;
   2284 	int ret;
   2285 
   2286 	iosize = avd_get_int(flowop->fo_iosize);
   2287 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
   2288 	    &fdesc, iosize)) != FILEBENCH_OK)
   2289 		return (ret);
   2290 
   2291 	if (avd_get_bool(flowop->fo_random)) {
   2292 		uint64_t fileoffset;
   2293 
   2294 		if (filebench_randomno64(&fileoffset,
   2295 		    wss, iosize, NULL) == -1) {
   2296 			filebench_log(LOG_ERROR,
   2297 			    "file size smaller than IO size for thread %s",
   2298 			    flowop->fo_name);
   2299 			return (FILEBENCH_ERROR);
   2300 		}
   2301 		flowop_beginop(threadflow, flowop);
   2302 		if (FB_PWRITE(fdesc, iobuf,
   2303 		    iosize, (off64_t)fileoffset) == -1) {
   2304 			filebench_log(LOG_ERROR, "write failed, "
   2305 			    "offset %llu io buffer %zd: %s",
   2306 			    (u_longlong_t)fileoffset, iobuf, strerror(errno));
   2307 			flowop_endop(threadflow, flowop, 0);
   2308 			return (FILEBENCH_ERROR);
   2309 		}
   2310 		flowop_endop(threadflow, flowop, iosize);
   2311 	} else {
   2312 		flowop_beginop(threadflow, flowop);
   2313 		if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
   2314 			filebench_log(LOG_ERROR,
   2315 			    "write failed, io buffer %zd: %s",
   2316 			    iobuf, strerror(errno));
   2317 			flowop_endop(threadflow, flowop, 0);
   2318 			return (FILEBENCH_ERROR);
   2319 		}
   2320 		flowop_endop(threadflow, flowop, iosize);
   2321 	}
   2322 
   2323 	return (FILEBENCH_OK);
   2324 }
   2325 
   2326 /*
   2327  * Emulate a write of a whole file.  The size of the file
   2328  * is taken from a filesetentry identified by fo_srcfdnumber or
   2329  * from the working set size, while the file descriptor used is
   2330  * identified by fo_fdnumber. Does multiple writes of fo_iosize
   2331  * length length until full file has been written. Returns FILEBENCH_ERROR on
   2332  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
   2333  */
   2334 static int
   2335 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
   2336 {
   2337 	caddr_t iobuf;
   2338 	filesetentry_t *file;
   2339 	int wsize;
   2340 	off64_t seek;
   2341 	off64_t bytes = 0;
   2342 	uint64_t wss;
   2343 	fbint_t iosize;
   2344 	fb_fdesc_t *fdesc;
   2345 	int srcfd = flowop->fo_srcfdnumber;
   2346 	int ret;
   2347 	char zerowrtbuf;
   2348 
   2349 	/* get the file to use */
   2350 	if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
   2351 	    &fdesc)) != FILEBENCH_OK)
   2352 		return (ret);
   2353 
   2354 	/* an I/O size of zero means write entire working set with one I/O */
   2355 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
   2356 		iosize = wss;
   2357 
   2358 	/*
   2359 	 * The file may actually be 0 bytes long, in which case skip
   2360 	 * the buffer set up call (which would fail) and substitute
   2361 	 * a small buffer, which won't really be used.
   2362 	 */
   2363 	if (iosize == 0) {
   2364 		iobuf = (caddr_t)&zerowrtbuf;
   2365 		filebench_log(LOG_DEBUG_SCRIPT,
   2366 		    "flowop %s wrote zero length file", flowop->fo_name);
   2367 	} else {
   2368 		if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
   2369 		    iosize) != 0)
   2370 			return (FILEBENCH_ERROR);
   2371 	}
   2372 
   2373 	file = threadflow->tf_fse[srcfd];
   2374 	if ((srcfd != 0) && (file == NULL)) {
   2375 		filebench_log(LOG_ERROR, "flowop %s: NULL src file",
   2376 		    flowop->fo_name);
   2377 		return (FILEBENCH_ERROR);
   2378 	}
   2379 
   2380 	if (file)
   2381 		wss = file->fse_size;
   2382 
   2383 	wsize = (int)MIN(wss, iosize);
   2384 
   2385 	/* Measure time to write bytes */
   2386 	flowop_beginop(threadflow, flowop);
   2387 	for (seek = 0; seek < wss; seek += wsize) {
   2388 		ret = FB_WRITE(fdesc, iobuf, wsize);
   2389 		if (ret != wsize) {
   2390 			filebench_log(LOG_ERROR,
   2391 			    "Failed to write %d bytes on fd %d: %s",
   2392 			    wsize, fdesc->fd_num, strerror(errno));
   2393 			flowop_endop(threadflow, flowop, 0);
   2394 			return (FILEBENCH_ERROR);
   2395 		}
   2396 		wsize = (int)MIN(wss - seek, iosize);
   2397 		bytes += ret;
   2398 	}
   2399 	flowop_endop(threadflow, flowop, bytes);
   2400 
   2401 	return (FILEBENCH_OK);
   2402 }
   2403 
   2404 
   2405 /*
   2406  * Emulate a fixed size append to a file. Will append data to
   2407  * a file chosen from a fileset if the flowop's fo_fileset
   2408  * field specifies one or if its fdnumber is non zero.
   2409  * Otherwise it will write to a fileobj file, if one exists.
   2410  * The flowop's fo_wss parameter will be used to set the
   2411  * maximum file size if it is non-zero, otherwise the
   2412  * filesetentry's fse_size will be used. A random memory
   2413  * buffer offset is calculated, then a logical seek to the
   2414  * end of file is done followed by a write of fo_iosize
   2415  * bytes. Writes are actually done from fo_buf, rather than
   2416  * tf_mem as is done with flowoplib_write(), and no check
   2417  * is made to see if fo_iosize exceeds the size of fo_buf.
   2418  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
   2419  * files in the fileset, FILEBENCH_OK on success.
   2420  */
   2421 static int
   2422 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
   2423 {
   2424 	caddr_t iobuf;
   2425 	fb_fdesc_t *fdesc;
   2426 	fbint_t wss;
   2427 	fbint_t iosize;
   2428 	int ret;
   2429 
   2430 	iosize = avd_get_int(flowop->fo_iosize);
   2431 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
   2432 	    &fdesc, iosize)) != FILEBENCH_OK)
   2433 		return (ret);
   2434 
   2435 	/* XXX wss is not being used */
   2436 
   2437 	/* Measure time to write bytes */
   2438 	flowop_beginop(threadflow, flowop);
   2439 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
   2440 	ret = FB_WRITE(fdesc, iobuf, iosize);
   2441 	if (ret != iosize) {
   2442 		filebench_log(LOG_ERROR,
   2443 		    "Failed to write %llu bytes on fd %d: %s",
   2444 		    (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
   2445 		flowop_endop(threadflow, flowop, ret);
   2446 		return (FILEBENCH_ERROR);
   2447 	}
   2448 	flowop_endop(threadflow, flowop, ret);
   2449 
   2450 	return (FILEBENCH_OK);
   2451 }
   2452 
   2453 /*
   2454  * Emulate a random size append to a file. Will append data
   2455  * to a file chosen from a fileset if the flowop's fo_fileset
   2456  * field specifies one or if its fdnumber is non zero. Otherwise
   2457  * it will write to a fileobj file, if one exists. The flowop's
   2458  * fo_wss parameter will be used to set the maximum file size
   2459  * if it is non-zero, otherwise the filesetentry's fse_size
   2460  * will be used.  A random transfer size (but at most fo_iosize
   2461  * bytes) and a random memory offset are calculated. A logical
   2462  * seek to the end of file is done, then writes of up to
   2463  * FILE_ALLOC_BLOCK in size are done until the full transfer
   2464  * size has been written. Writes are actually done from fo_buf,
   2465  * rather than tf_mem as is done with flowoplib_write().
   2466  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
   2467  * files in the fileset, FILEBENCH_OK on success.
   2468  */
   2469 static int
   2470 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
   2471 {
   2472 	caddr_t iobuf;
   2473 	uint64_t appendsize;
   2474 	fb_fdesc_t *fdesc;
   2475 	fbint_t wss;
   2476 	fbint_t iosize;
   2477 	int ret = 0;
   2478 
   2479 	if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
   2480 		filebench_log(LOG_ERROR, "zero iosize for flowop %s",
   2481 		    flowop->fo_name);
   2482 		return (FILEBENCH_ERROR);
   2483 	}
   2484 
   2485 	if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
   2486 		return (FILEBENCH_ERROR);
   2487 
   2488 	/* skip if attempting zero length append */
   2489 	if (appendsize == 0) {
   2490 		flowop_beginop(threadflow, flowop);
   2491 		flowop_endop(threadflow, flowop, 0LL);
   2492 		return (FILEBENCH_OK);
   2493 	}
   2494 
   2495 	if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
   2496 	    &fdesc, appendsize)) != FILEBENCH_OK)
   2497 		return (ret);
   2498 
   2499 	/* XXX wss is not being used */
   2500 
   2501 	/* Measure time to write bytes */
   2502 	flowop_beginop(threadflow, flowop);
   2503 
   2504 	(void) FB_LSEEK(fdesc, 0, SEEK_END);
   2505 	ret = FB_WRITE(fdesc, iobuf, appendsize);
   2506 	if (ret != appendsize) {
   2507 		filebench_log(LOG_ERROR,
   2508 		    "Failed to write %llu bytes on fd %d: %s",
   2509 		    (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
   2510 		flowop_endop(threadflow, flowop, 0);
   2511 		return (FILEBENCH_ERROR);
   2512 	}
   2513 
   2514 	flowop_endop(threadflow, flowop, appendsize);
   2515 
   2516 	return (FILEBENCH_OK);
   2517 }
   2518 
/*
 * Running totals for a testrandvar flowop. An instance is allocated by
 * flowoplib_testrandvar_init(), hung off the flowop's fo_private field,
 * updated by flowoplib_testrandvar() and reported and freed by
 * flowoplib_testrandvar_destruct().
 */
typedef struct testrandvar_priv {
	uint64_t sample_count;	/* number of values sampled so far */
	double val_sum;		/* sum of the sampled values */
	double sqr_sum;		/* sum of the squares of the sampled values */
} testrandvar_priv_t;
   2524 
   2525 /*
   2526  * flowop to calculate various statistics from the number stream
   2527  * produced by a random variable. This allows verification that the
   2528  * random distribution used to define the random variable is producing
   2529  * the expected distribution of random numbers.
   2530  */
   2531 /* ARGSUSED */
   2532 static int
   2533 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
   2534 {
   2535 	testrandvar_priv_t	*mystats;
   2536 	double			value;
   2537 
   2538 	if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
   2539 		filebench_log(LOG_ERROR, "testrandvar not initialized\n");
   2540 		filebench_shutdown(1);
   2541 		return (-1);
   2542 	}
   2543 
   2544 	value = avd_get_dbl(flowop->fo_value);
   2545 
   2546 	mystats->sample_count++;
   2547 	mystats->val_sum += value;
   2548 	mystats->sqr_sum += (value * value);
   2549 
   2550 	return (0);
   2551 }
   2552 
   2553 /*
   2554  * Initialize the private data area used to accumulate the statistics
   2555  */
   2556 static int
   2557 flowoplib_testrandvar_init(flowop_t *flowop)
   2558 {
   2559 	testrandvar_priv_t	*mystats;
   2560 
   2561 	if ((mystats = (testrandvar_priv_t *)
   2562 	    malloc(sizeof (testrandvar_priv_t))) == NULL) {
   2563 		filebench_log(LOG_ERROR, "could not initialize testrandvar");
   2564 		filebench_shutdown(1);
   2565 		return (-1);
   2566 	}
   2567 
   2568 	mystats->sample_count = 0;
   2569 	mystats->val_sum = 0;
   2570 	mystats->sqr_sum = 0;
   2571 	flowop->fo_private = (void *)mystats;
   2572 
   2573 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   2574 	return (0);
   2575 }
   2576 
   2577 /*
   2578  * Print out the accumulated statistics, and free the private storage
   2579  */
   2580 static void
   2581 flowoplib_testrandvar_destruct(flowop_t *flowop)
   2582 {
   2583 	testrandvar_priv_t	*mystats;
   2584 	double mean, std_dev, dbl_count;
   2585 
   2586 	(void) ipc_mutex_lock(&flowop->fo_lock);
   2587 	if ((mystats = (testrandvar_priv_t *)
   2588 	    flowop->fo_private) == NULL) {
   2589 		(void) ipc_mutex_unlock(&flowop->fo_lock);
   2590 		return;
   2591 	}
   2592 
   2593 	flowop->fo_private = NULL;
   2594 	(void) ipc_mutex_unlock(&flowop->fo_lock);
   2595 
   2596 	dbl_count = (double)mystats->sample_count;
   2597 	mean = mystats->val_sum / dbl_count;
   2598 	std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
   2599 
   2600 	filebench_log(LOG_VERBOSE,
   2601 	    "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
   2602 	    (u_longlong_t)mystats->sample_count, mean, std_dev);
   2603 	free(mystats);
   2604 }
   2605 
   2606 /*
   2607  * prints message to the console from within a thread
   2608  */
   2609 static int
   2610 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
   2611 {
   2612 	procflow_t *procflow;
   2613 
   2614 	procflow = threadflow->tf_process;
   2615 	filebench_log(LOG_INFO,
   2616 	    "Message from process (%s,%d), thread (%s,%d): %s",
   2617 	    procflow->pf_name, procflow->pf_instance,
   2618 	    threadflow->tf_name, threadflow->tf_instance,
   2619 	    avd_get_str(flowop->fo_value));
   2620 
   2621 	return (FILEBENCH_OK);
   2622 }
   2623 
   2624 /*
   2625  * Prints usage information for flowop operations.
   2626  */
   2627 void
   2628 flowoplib_usage()
   2629 {
   2630 	(void) fprintf(stderr,
   2631 	    "flowop [openfile|createfile] name=<name>,fileset=<fname>\n");
   2632 	(void) fprintf(stderr,
   2633 	    "                       [,fd=<file desc num>]\n");
   2634 	(void) fprintf(stderr, "\n");
   2635 	(void) fprintf(stderr,
   2636 	    "flowop closefile name=<name>,fd=<file desc num>]\n");
   2637 	(void) fprintf(stderr, "\n");
   2638 	(void) fprintf(stderr, "flowop deletefile name=<name>\n");
   2639 	(void) fprintf(stderr, "                       [,fileset=<fname>]\n");
   2640 	(void) fprintf(stderr,
   2641 	    "                       [,fd=<file desc num>]\n");
   2642 	(void) fprintf(stderr, "\n");
   2643 	(void) fprintf(stderr, "flowop statfile name=<name>\n");
   2644 	(void) fprintf(stderr, "                       [,fileset=<fname>]\n");
   2645 	(void) fprintf(stderr,
   2646 	    "                       [,fd=<file desc num>]\n");
   2647 	(void) fprintf(stderr, "\n");
   2648 	(void) fprintf(stderr,
   2649 	    "flowop fsync name=<name>,fd=<file desc num>]\n");
   2650 	(void) fprintf(stderr, "\n");
   2651 	(void) fprintf(stderr,
   2652 	    "flowop fsyncset name=<name>,fileset=<fname>]\n");
   2653 	(void) fprintf(stderr, "\n");
   2654 	(void) fprintf(stderr, "flowop [write|read|aiowrite] name=<name>, \n");
   2655 	(void) fprintf(stderr,
   2656 	    "                       filename|fileset=<fname>,\n");
   2657 	(void) fprintf(stderr, "                       iosize=<size>\n");
   2658 	(void) fprintf(stderr, "                       [,directio]\n");
   2659 	(void) fprintf(stderr, "                       [,dsync]\n");
   2660 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
   2661 	(void) fprintf(stderr, "                       [,random]\n");
   2662 	(void) fprintf(stderr, "                       [,opennext]\n");
   2663 	(void) fprintf(stderr, "                       [,workingset=<size>]\n");
   2664 	(void) fprintf(stderr,
   2665 	    "flowop [appendfile|appendfilerand] name=<name>, \n");
   2666 	(void) fprintf(stderr,
   2667 	    "                       filename|fileset=<fname>,\n");
   2668 	(void) fprintf(stderr, "                       iosize=<size>\n");
   2669 	(void) fprintf(stderr, "                       [,dsync]\n");
   2670 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
   2671 	(void) fprintf(stderr, "                       [,workingset=<size>]\n");
   2672 	(void) fprintf(stderr,
   2673 	    "flowop [readwholefile|writewholefile] name=<name>, \n");
   2674 	(void) fprintf(stderr,
   2675 	    "                       filename|fileset=<fname>,\n");
   2676 	(void) fprintf(stderr, "                       iosize=<size>\n");
   2677 	(void) fprintf(stderr, "                       [,dsync]\n");
   2678 	(void) fprintf(stderr, "                       [,iters=<count>]\n");
   2679 	(void) fprintf(stderr, "\n");
   2680 	(void) fprintf(stderr, "flowop aiowait name=<name>,target="
   2681 	    "<aiowrite-flowop>\n");
   2682 	(void) fprintf(stderr, "\n");
   2683 	(void) fprintf(stderr, "flowop sempost name=<name>,"
   2684 	    "target=<semblock-flowop>,\n");
   2685 	(void) fprintf(stderr,
   2686 	    "                       value=<increment-to-post>\n");
   2687 	(void) fprintf(stderr, "\n");
   2688 	(void) fprintf(stderr, "flowop semblock name=<name>,value="
   2689 	    "<decrement-to-receive>,\n");
   2690 	(void) fprintf(stderr, "                       highwater="
   2691 	    "<inbound-queue-max>\n");
   2692 	(void) fprintf(stderr, "\n");
   2693 	(void) fprintf(stderr, "flowop block name=<name>\n");
   2694 	(void) fprintf(stderr, "\n");
   2695 	(void) fprintf(stderr,
   2696 	    "flowop wakeup name=<name>,target=<block-flowop>,\n");
   2697 	(void) fprintf(stderr, "\n");
   2698 	(void) fprintf(stderr,
   2699 	    "flowop hog name=<name>,value=<number-of-mem-ops>\n");
   2700 	(void) fprintf(stderr,
   2701 	    "flowop delay name=<name>,value=<number-of-seconds>\n");
   2702 	(void) fprintf(stderr, "\n");
   2703 	(void) fprintf(stderr, "flowop eventlimit name=<name>\n");
   2704 	(void) fprintf(stderr, "flowop bwlimit name=<name>,value=<mb/s>\n");
   2705 	(void) fprintf(stderr, "flowop iopslimit name=<name>,value=<iop/s>\n");
   2706 	(void) fprintf(stderr,
   2707 	    "flowop finishoncount name=<name>,value=<ops/s>\n");
   2708 	(void) fprintf(stderr,
   2709 	    "flowop finishonbytes name=<name>,value=<bytes>\n");
   2710 	(void) fprintf(stderr, "\n");
   2711 	(void) fprintf(stderr, "\n");
   2712 }
   2713