Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     27 
     28 #include <sys/zfs_context.h>
     29 #include <sys/dnode.h>
     30 #include <sys/dmu_objset.h>
     31 #include <sys/dmu_zfetch.h>
     32 #include <sys/dmu.h>
     33 #include <sys/dbuf.h>
     34 
     35 /*
     36  * I'm against tune-ables, but these should probably exist as tweakable globals
     37  * until we can get this working the way we want it to.
     38  */
     39 
     40 int zfs_prefetch_disable = 0;
     41 
     42 /* max # of streams per zfetch */
     43 uint32_t	zfetch_max_streams = 8;
     44 /* min time before stream reclaim */
     45 uint32_t	zfetch_min_sec_reap = 2;
     46 /* max number of blocks to fetch at a time */
     47 uint32_t	zfetch_block_cap = 256;
     48 /* number of bytes in a array_read at which we stop prefetching (1Mb) */
     49 uint64_t	zfetch_array_rd_sz = 1024 * 1024;
     50 
     51 /* forward decls for static routines */
     52 static int		dmu_zfetch_colinear(zfetch_t *, zstream_t *);
     53 static void		dmu_zfetch_dofetch(zfetch_t *, zstream_t *);
     54 static uint64_t		dmu_zfetch_fetch(dnode_t *, uint64_t, uint64_t);
     55 static uint64_t		dmu_zfetch_fetchsz(dnode_t *, uint64_t, uint64_t);
     56 static int		dmu_zfetch_find(zfetch_t *, zstream_t *, int);
     57 static int		dmu_zfetch_stream_insert(zfetch_t *, zstream_t *);
     58 static zstream_t	*dmu_zfetch_stream_reclaim(zfetch_t *);
     59 static void		dmu_zfetch_stream_remove(zfetch_t *, zstream_t *);
     60 static int		dmu_zfetch_streams_equal(zstream_t *, zstream_t *);
     61 
     62 /*
     63  * Given a zfetch structure and a zstream structure, determine whether the
     64  * blocks to be read are part of a co-linear pair of existing prefetch
     65  * streams.  If a set is found, coalesce the streams, removing one, and
     66  * configure the prefetch so it looks for a strided access pattern.
     67  *
     68  * In other words: if we find two sequential access streams that are
     69  * the same length and distance N appart, and this read is N from the
     70  * last stream, then we are probably in a strided access pattern.  So
     71  * combine the two sequential streams into a single strided stream.
     72  *
     73  * If no co-linear streams are found, return NULL.
     74  */
     75 static int
     76 dmu_zfetch_colinear(zfetch_t *zf, zstream_t *zh)
     77 {
     78 	zstream_t	*z_walk;
     79 	zstream_t	*z_comp;
     80 
     81 	if (! rw_tryenter(&zf->zf_rwlock, RW_WRITER))
     82 		return (0);
     83 
     84 	if (zh == NULL) {
     85 		rw_exit(&zf->zf_rwlock);
     86 		return (0);
     87 	}
     88 
     89 	for (z_walk = list_head(&zf->zf_stream); z_walk;
     90 	    z_walk = list_next(&zf->zf_stream, z_walk)) {
     91 		for (z_comp = list_next(&zf->zf_stream, z_walk); z_comp;
     92 		    z_comp = list_next(&zf->zf_stream, z_comp)) {
     93 			int64_t		diff;
     94 
     95 			if (z_walk->zst_len != z_walk->zst_stride ||
     96 			    z_comp->zst_len != z_comp->zst_stride) {
     97 				continue;
     98 			}
     99 
    100 			diff = z_comp->zst_offset - z_walk->zst_offset;
    101 			if (z_comp->zst_offset + diff == zh->zst_offset) {
    102 				z_walk->zst_offset = zh->zst_offset;
    103 				z_walk->zst_direction = diff < 0 ? -1 : 1;
    104 				z_walk->zst_stride =
    105 				    diff * z_walk->zst_direction;
    106 				z_walk->zst_ph_offset =
    107 				    zh->zst_offset + z_walk->zst_stride;
    108 				dmu_zfetch_stream_remove(zf, z_comp);
    109 				mutex_destroy(&z_comp->zst_lock);
    110 				kmem_free(z_comp, sizeof (zstream_t));
    111 
    112 				dmu_zfetch_dofetch(zf, z_walk);
    113 
    114 				rw_exit(&zf->zf_rwlock);
    115 				return (1);
    116 			}
    117 
    118 			diff = z_walk->zst_offset - z_comp->zst_offset;
    119 			if (z_walk->zst_offset + diff == zh->zst_offset) {
    120 				z_walk->zst_offset = zh->zst_offset;
    121 				z_walk->zst_direction = diff < 0 ? -1 : 1;
    122 				z_walk->zst_stride =
    123 				    diff * z_walk->zst_direction;
    124 				z_walk->zst_ph_offset =
    125 				    zh->zst_offset + z_walk->zst_stride;
    126 				dmu_zfetch_stream_remove(zf, z_comp);
    127 				mutex_destroy(&z_comp->zst_lock);
    128 				kmem_free(z_comp, sizeof (zstream_t));
    129 
    130 				dmu_zfetch_dofetch(zf, z_walk);
    131 
    132 				rw_exit(&zf->zf_rwlock);
    133 				return (1);
    134 			}
    135 		}
    136 	}
    137 
    138 	rw_exit(&zf->zf_rwlock);
    139 	return (0);
    140 }
    141 
    142 /*
    143  * Given a zstream_t, determine the bounds of the prefetch.  Then call the
    144  * routine that actually prefetches the individual blocks.
    145  */
    146 static void
    147 dmu_zfetch_dofetch(zfetch_t *zf, zstream_t *zs)
    148 {
    149 	uint64_t	prefetch_tail;
    150 	uint64_t	prefetch_limit;
    151 	uint64_t	prefetch_ofst;
    152 	uint64_t	prefetch_len;
    153 	uint64_t	blocks_fetched;
    154 
    155 	zs->zst_stride = MAX((int64_t)zs->zst_stride, zs->zst_len);
    156 	zs->zst_cap = MIN(zfetch_block_cap, 2 * zs->zst_cap);
    157 
    158 	prefetch_tail = MAX((int64_t)zs->zst_ph_offset,
    159 	    (int64_t)(zs->zst_offset + zs->zst_stride));
    160 	/*
    161 	 * XXX: use a faster division method?
    162 	 */
    163 	prefetch_limit = zs->zst_offset + zs->zst_len +
    164 	    (zs->zst_cap * zs->zst_stride) / zs->zst_len;
    165 
    166 	while (prefetch_tail < prefetch_limit) {
    167 		prefetch_ofst = zs->zst_offset + zs->zst_direction *
    168 		    (prefetch_tail - zs->zst_offset);
    169 
    170 		prefetch_len = zs->zst_len;
    171 
    172 		/*
    173 		 * Don't prefetch beyond the end of the file, if working
    174 		 * backwards.
    175 		 */
    176 		if ((zs->zst_direction == ZFETCH_BACKWARD) &&
    177 		    (prefetch_ofst > prefetch_tail)) {
    178 			prefetch_len += prefetch_ofst;
    179 			prefetch_ofst = 0;
    180 		}
    181 
    182 		/* don't prefetch more than we're supposed to */
    183 		if (prefetch_len > zs->zst_len)
    184 			break;
    185 
    186 		blocks_fetched = dmu_zfetch_fetch(zf->zf_dnode,
    187 		    prefetch_ofst, zs->zst_len);
    188 
    189 		prefetch_tail += zs->zst_stride;
    190 		/* stop if we've run out of stuff to prefetch */
    191 		if (blocks_fetched < zs->zst_len)
    192 			break;
    193 	}
    194 	zs->zst_ph_offset = prefetch_tail;
    195 	zs->zst_last = lbolt;
    196 }
    197 
    198 /*
    199  * This takes a pointer to a zfetch structure and a dnode.  It performs the
    200  * necessary setup for the zfetch structure, grokking data from the
    201  * associated dnode.
    202  */
    203 void
    204 dmu_zfetch_init(zfetch_t *zf, dnode_t *dno)
    205 {
    206 	if (zf == NULL) {
    207 		return;
    208 	}
    209 
    210 	zf->zf_dnode = dno;
    211 	zf->zf_stream_cnt = 0;
    212 	zf->zf_alloc_fail = 0;
    213 
    214 	list_create(&zf->zf_stream, sizeof (zstream_t),
    215 	    offsetof(zstream_t, zst_node));
    216 
    217 	rw_init(&zf->zf_rwlock, NULL, RW_DEFAULT, NULL);
    218 }
    219 
    220 /*
    221  * This function computes the actual size, in blocks, that can be prefetched,
    222  * and fetches it.
    223  */
    224 static uint64_t
    225 dmu_zfetch_fetch(dnode_t *dn, uint64_t blkid, uint64_t nblks)
    226 {
    227 	uint64_t	fetchsz;
    228 	uint64_t	i;
    229 
    230 	fetchsz = dmu_zfetch_fetchsz(dn, blkid, nblks);
    231 
    232 	for (i = 0; i < fetchsz; i++) {
    233 		dbuf_prefetch(dn, blkid + i);
    234 	}
    235 
    236 	return (fetchsz);
    237 }
    238 
    239 /*
    240  * this function returns the number of blocks that would be prefetched, based
    241  * upon the supplied dnode, blockid, and nblks.  This is used so that we can
    242  * update streams in place, and then prefetch with their old value after the
    243  * fact.  This way, we can delay the prefetch, but subsequent accesses to the
    244  * stream won't result in the same data being prefetched multiple times.
    245  */
    246 static uint64_t
    247 dmu_zfetch_fetchsz(dnode_t *dn, uint64_t blkid, uint64_t nblks)
    248 {
    249 	uint64_t	fetchsz;
    250 
    251 	if (blkid > dn->dn_maxblkid) {
    252 		return (0);
    253 	}
    254 
    255 	/* compute fetch size */
    256 	if (blkid + nblks + 1 > dn->dn_maxblkid) {
    257 		fetchsz = (dn->dn_maxblkid - blkid) + 1;
    258 		ASSERT(blkid + fetchsz - 1 <= dn->dn_maxblkid);
    259 	} else {
    260 		fetchsz = nblks;
    261 	}
    262 
    263 
    264 	return (fetchsz);
    265 }
    266 
    267 /*
    268  * given a zfetch and a zsearch structure, see if there is an associated zstream
    269  * for this block read.  If so, it starts a prefetch for the stream it
    270  * located and returns true, otherwise it returns false
    271  */
    272 static int
    273 dmu_zfetch_find(zfetch_t *zf, zstream_t *zh, int prefetched)
    274 {
    275 	zstream_t	*zs;
    276 	int64_t		diff;
    277 	int		reset = !prefetched;
    278 	int		rc = 0;
    279 
    280 	if (zh == NULL)
    281 		return (0);
    282 
    283 	/*
    284 	 * XXX: This locking strategy is a bit coarse; however, it's impact has
    285 	 * yet to be tested.  If this turns out to be an issue, it can be
    286 	 * modified in a number of different ways.
    287 	 */
    288 
    289 	rw_enter(&zf->zf_rwlock, RW_READER);
    290 top:
    291 
    292 	for (zs = list_head(&zf->zf_stream); zs;
    293 	    zs = list_next(&zf->zf_stream, zs)) {
    294 
    295 		/*
    296 		 * XXX - should this be an assert?
    297 		 */
    298 		if (zs->zst_len == 0) {
    299 			/* bogus stream */
    300 			continue;
    301 		}
    302 
    303 		/*
    304 		 * We hit this case when we are in a strided prefetch stream:
    305 		 * we will read "len" blocks before "striding".
    306 		 */
    307 		if (zh->zst_offset >= zs->zst_offset &&
    308 		    zh->zst_offset < zs->zst_offset + zs->zst_len) {
    309 			/* already fetched */
    310 			rc = 1;
    311 			goto out;
    312 		}
    313 
    314 		/*
    315 		 * This is the forward sequential read case: we increment
    316 		 * len by one each time we hit here, so we will enter this
    317 		 * case on every read.
    318 		 */
    319 		if (zh->zst_offset == zs->zst_offset + zs->zst_len) {
    320 
    321 			reset = !prefetched && zs->zst_len > 1;
    322 
    323 			mutex_enter(&zs->zst_lock);
    324 
    325 			if (zh->zst_offset != zs->zst_offset + zs->zst_len) {
    326 				mutex_exit(&zs->zst_lock);
    327 				goto top;
    328 			}
    329 			zs->zst_len += zh->zst_len;
    330 			diff = zs->zst_len - zfetch_block_cap;
    331 			if (diff > 0) {
    332 				zs->zst_offset += diff;
    333 				zs->zst_len = zs->zst_len > diff ?
    334 				    zs->zst_len - diff : 0;
    335 			}
    336 			zs->zst_direction = ZFETCH_FORWARD;
    337 
    338 			break;
    339 
    340 		/*
    341 		 * Same as above, but reading backwards through the file.
    342 		 */
    343 		} else if (zh->zst_offset == zs->zst_offset - zh->zst_len) {
    344 			/* backwards sequential access */
    345 
    346 			reset = !prefetched && zs->zst_len > 1;
    347 
    348 			mutex_enter(&zs->zst_lock);
    349 
    350 			if (zh->zst_offset != zs->zst_offset - zh->zst_len) {
    351 				mutex_exit(&zs->zst_lock);
    352 				goto top;
    353 			}
    354 
    355 			zs->zst_offset = zs->zst_offset > zh->zst_len ?
    356 			    zs->zst_offset - zh->zst_len : 0;
    357 			zs->zst_ph_offset = zs->zst_ph_offset > zh->zst_len ?
    358 			    zs->zst_ph_offset - zh->zst_len : 0;
    359 			zs->zst_len += zh->zst_len;
    360 
    361 			diff = zs->zst_len - zfetch_block_cap;
    362 			if (diff > 0) {
    363 				zs->zst_ph_offset = zs->zst_ph_offset > diff ?
    364 				    zs->zst_ph_offset - diff : 0;
    365 				zs->zst_len = zs->zst_len > diff ?
    366 				    zs->zst_len - diff : zs->zst_len;
    367 			}
    368 			zs->zst_direction = ZFETCH_BACKWARD;
    369 
    370 			break;
    371 
    372 		} else if ((zh->zst_offset - zs->zst_offset - zs->zst_stride <
    373 		    zs->zst_len) && (zs->zst_len != zs->zst_stride)) {
    374 			/* strided forward access */
    375 
    376 			mutex_enter(&zs->zst_lock);
    377 
    378 			if ((zh->zst_offset - zs->zst_offset - zs->zst_stride >=
    379 			    zs->zst_len) || (zs->zst_len == zs->zst_stride)) {
    380 				mutex_exit(&zs->zst_lock);
    381 				goto top;
    382 			}
    383 
    384 			zs->zst_offset += zs->zst_stride;
    385 			zs->zst_direction = ZFETCH_FORWARD;
    386 
    387 			break;
    388 
    389 		} else if ((zh->zst_offset - zs->zst_offset + zs->zst_stride <
    390 		    zs->zst_len) && (zs->zst_len != zs->zst_stride)) {
    391 			/* strided reverse access */
    392 
    393 			mutex_enter(&zs->zst_lock);
    394 
    395 			if ((zh->zst_offset - zs->zst_offset + zs->zst_stride >=
    396 			    zs->zst_len) || (zs->zst_len == zs->zst_stride)) {
    397 				mutex_exit(&zs->zst_lock);
    398 				goto top;
    399 			}
    400 
    401 			zs->zst_offset = zs->zst_offset > zs->zst_stride ?
    402 			    zs->zst_offset - zs->zst_stride : 0;
    403 			zs->zst_ph_offset = (zs->zst_ph_offset >
    404 			    (2 * zs->zst_stride)) ?
    405 			    (zs->zst_ph_offset - (2 * zs->zst_stride)) : 0;
    406 			zs->zst_direction = ZFETCH_BACKWARD;
    407 
    408 			break;
    409 		}
    410 	}
    411 
    412 	if (zs) {
    413 		if (reset) {
    414 			zstream_t *remove = zs;
    415 
    416 			rc = 0;
    417 			mutex_exit(&zs->zst_lock);
    418 			rw_exit(&zf->zf_rwlock);
    419 			rw_enter(&zf->zf_rwlock, RW_WRITER);
    420 			/*
    421 			 * Relocate the stream, in case someone removes
    422 			 * it while we were acquiring the WRITER lock.
    423 			 */
    424 			for (zs = list_head(&zf->zf_stream); zs;
    425 			    zs = list_next(&zf->zf_stream, zs)) {
    426 				if (zs == remove) {
    427 					dmu_zfetch_stream_remove(zf, zs);
    428 					mutex_destroy(&zs->zst_lock);
    429 					kmem_free(zs, sizeof (zstream_t));
    430 					break;
    431 				}
    432 			}
    433 		} else {
    434 			rc = 1;
    435 			dmu_zfetch_dofetch(zf, zs);
    436 			mutex_exit(&zs->zst_lock);
    437 		}
    438 	}
    439 out:
    440 	rw_exit(&zf->zf_rwlock);
    441 	return (rc);
    442 }
    443 
    444 /*
    445  * Clean-up state associated with a zfetch structure.  This frees allocated
    446  * structure members, empties the zf_stream tree, and generally makes things
    447  * nice.  This doesn't free the zfetch_t itself, that's left to the caller.
    448  */
    449 void
    450 dmu_zfetch_rele(zfetch_t *zf)
    451 {
    452 	zstream_t	*zs;
    453 	zstream_t	*zs_next;
    454 
    455 	ASSERT(!RW_LOCK_HELD(&zf->zf_rwlock));
    456 
    457 	for (zs = list_head(&zf->zf_stream); zs; zs = zs_next) {
    458 		zs_next = list_next(&zf->zf_stream, zs);
    459 
    460 		list_remove(&zf->zf_stream, zs);
    461 		mutex_destroy(&zs->zst_lock);
    462 		kmem_free(zs, sizeof (zstream_t));
    463 	}
    464 	list_destroy(&zf->zf_stream);
    465 	rw_destroy(&zf->zf_rwlock);
    466 
    467 	zf->zf_dnode = NULL;
    468 }
    469 
    470 /*
    471  * Given a zfetch and zstream structure, insert the zstream structure into the
    472  * AVL tree contained within the zfetch structure.  Peform the appropriate
    473  * book-keeping.  It is possible that another thread has inserted a stream which
    474  * matches one that we are about to insert, so we must be sure to check for this
    475  * case.  If one is found, return failure, and let the caller cleanup the
    476  * duplicates.
    477  */
    478 static int
    479 dmu_zfetch_stream_insert(zfetch_t *zf, zstream_t *zs)
    480 {
    481 	zstream_t	*zs_walk;
    482 	zstream_t	*zs_next;
    483 
    484 	ASSERT(RW_WRITE_HELD(&zf->zf_rwlock));
    485 
    486 	for (zs_walk = list_head(&zf->zf_stream); zs_walk; zs_walk = zs_next) {
    487 		zs_next = list_next(&zf->zf_stream, zs_walk);
    488 
    489 		if (dmu_zfetch_streams_equal(zs_walk, zs)) {
    490 		    return (0);
    491 		}
    492 	}
    493 
    494 	list_insert_head(&zf->zf_stream, zs);
    495 	zf->zf_stream_cnt++;
    496 
    497 	return (1);
    498 }
    499 
    500 
    501 /*
    502  * Walk the list of zstreams in the given zfetch, find an old one (by time), and
    503  * reclaim it for use by the caller.
    504  */
    505 static zstream_t *
    506 dmu_zfetch_stream_reclaim(zfetch_t *zf)
    507 {
    508 	zstream_t	*zs;
    509 
    510 	if (! rw_tryenter(&zf->zf_rwlock, RW_WRITER))
    511 		return (0);
    512 
    513 	for (zs = list_head(&zf->zf_stream); zs;
    514 	    zs = list_next(&zf->zf_stream, zs)) {
    515 
    516 		if (((lbolt - zs->zst_last) / hz) > zfetch_min_sec_reap)
    517 			break;
    518 	}
    519 
    520 	if (zs) {
    521 		dmu_zfetch_stream_remove(zf, zs);
    522 		mutex_destroy(&zs->zst_lock);
    523 		bzero(zs, sizeof (zstream_t));
    524 	} else {
    525 		zf->zf_alloc_fail++;
    526 	}
    527 	rw_exit(&zf->zf_rwlock);
    528 
    529 	return (zs);
    530 }
    531 
    532 /*
    533  * Given a zfetch and zstream structure, remove the zstream structure from its
    534  * container in the zfetch structure.  Perform the appropriate book-keeping.
    535  */
    536 static void
    537 dmu_zfetch_stream_remove(zfetch_t *zf, zstream_t *zs)
    538 {
    539 	ASSERT(RW_WRITE_HELD(&zf->zf_rwlock));
    540 
    541 	list_remove(&zf->zf_stream, zs);
    542 	zf->zf_stream_cnt--;
    543 }
    544 
    545 static int
    546 dmu_zfetch_streams_equal(zstream_t *zs1, zstream_t *zs2)
    547 {
    548 	if (zs1->zst_offset != zs2->zst_offset)
    549 		return (0);
    550 
    551 	if (zs1->zst_len != zs2->zst_len)
    552 		return (0);
    553 
    554 	if (zs1->zst_stride != zs2->zst_stride)
    555 		return (0);
    556 
    557 	if (zs1->zst_ph_offset != zs2->zst_ph_offset)
    558 		return (0);
    559 
    560 	if (zs1->zst_cap != zs2->zst_cap)
    561 		return (0);
    562 
    563 	if (zs1->zst_direction != zs2->zst_direction)
    564 		return (0);
    565 
    566 	return (1);
    567 }
    568 
    569 /*
    570  * This is the prefetch entry point.  It calls all of the other dmu_zfetch
    571  * routines to create, delete, find, or operate upon prefetch streams.
    572  */
    573 void
    574 dmu_zfetch(zfetch_t *zf, uint64_t offset, uint64_t size, int prefetched)
    575 {
    576 	zstream_t	zst;
    577 	zstream_t	*newstream;
    578 	int		fetched;
    579 	int		inserted;
    580 	unsigned int	blkshft;
    581 	uint64_t	blksz;
    582 
    583 	if (zfs_prefetch_disable)
    584 		return;
    585 
    586 	/* files that aren't ln2 blocksz are only one block -- nothing to do */
    587 	if (!zf->zf_dnode->dn_datablkshift)
    588 		return;
    589 
    590 	/* convert offset and size, into blockid and nblocks */
    591 	blkshft = zf->zf_dnode->dn_datablkshift;
    592 	blksz = (1 << blkshft);
    593 
    594 	bzero(&zst, sizeof (zstream_t));
    595 	zst.zst_offset = offset >> blkshft;
    596 	zst.zst_len = (P2ROUNDUP(offset + size, blksz) -
    597 	    P2ALIGN(offset, blksz)) >> blkshft;
    598 
    599 	fetched = dmu_zfetch_find(zf, &zst, prefetched);
    600 	if (!fetched) {
    601 		fetched = dmu_zfetch_colinear(zf, &zst);
    602 	}
    603 
    604 	if (!fetched) {
    605 		newstream = dmu_zfetch_stream_reclaim(zf);
    606 
    607 		/*
    608 		 * we still couldn't find a stream, drop the lock, and allocate
    609 		 * one if possible.  Otherwise, give up and go home.
    610 		 */
    611 		if (newstream == NULL) {
    612 			uint64_t	maxblocks;
    613 			uint32_t	max_streams;
    614 			uint32_t	cur_streams;
    615 
    616 			cur_streams = zf->zf_stream_cnt;
    617 			maxblocks = zf->zf_dnode->dn_maxblkid;
    618 
    619 			max_streams = MIN(zfetch_max_streams,
    620 			    (maxblocks / zfetch_block_cap));
    621 			if (max_streams == 0) {
    622 				max_streams++;
    623 			}
    624 
    625 			if (cur_streams >= max_streams) {
    626 				return;
    627 			}
    628 
    629 			newstream = kmem_zalloc(sizeof (zstream_t), KM_SLEEP);
    630 		}
    631 
    632 		newstream->zst_offset = zst.zst_offset;
    633 		newstream->zst_len = zst.zst_len;
    634 		newstream->zst_stride = zst.zst_len;
    635 		newstream->zst_ph_offset = zst.zst_len + zst.zst_offset;
    636 		newstream->zst_cap = zst.zst_len;
    637 		newstream->zst_direction = ZFETCH_FORWARD;
    638 		newstream->zst_last = lbolt;
    639 
    640 		mutex_init(&newstream->zst_lock, NULL, MUTEX_DEFAULT, NULL);
    641 
    642 		rw_enter(&zf->zf_rwlock, RW_WRITER);
    643 		inserted = dmu_zfetch_stream_insert(zf, newstream);
    644 		rw_exit(&zf->zf_rwlock);
    645 
    646 		if (!inserted) {
    647 			mutex_destroy(&newstream->zst_lock);
    648 			kmem_free(newstream, sizeof (zstream_t));
    649 		}
    650 	}
    651 }
    652