OpenGrok

Cross Reference: zio.c
xref: /onnv/onnv-gate/usr/src/uts/common/fs/zfs/zio.c
Home | History | Annotate | Line # | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
     23  */
     24 
     25 #include <sys/zfs_context.h>
     26 #include <sys/fm/fs/zfs.h>
     27 #include <sys/spa.h>
     28 #include <sys/txg.h>
     29 #include <sys/spa_impl.h>
     30 #include <sys/vdev_impl.h>
     31 #include <sys/zio_impl.h>
     32 #include <sys/zio_compress.h>
     33 #include <sys/zio_checksum.h>
     34 #include <sys/dmu_objset.h>
     35 #include <sys/arc.h>
     36 #include <sys/ddt.h>
     37 
     38 /*
     39  * ==========================================================================
     40  * I/O priority table
     41  * ==========================================================================
     42  */
     43 uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
     44 	0,	/* ZIO_PRIORITY_NOW		*/
     45 	0,	/* ZIO_PRIORITY_SYNC_READ	*/
     46 	0,	/* ZIO_PRIORITY_SYNC_WRITE	*/
     47 	0,	/* ZIO_PRIORITY_LOG_WRITE	*/
     48 	1,	/* ZIO_PRIORITY_CACHE_FILL	*/
     49 	1,	/* ZIO_PRIORITY_AGG		*/
     50 	4,	/* ZIO_PRIORITY_FREE		*/
     51 	4,	/* ZIO_PRIORITY_ASYNC_WRITE	*/
     52 	6,	/* ZIO_PRIORITY_ASYNC_READ	*/
     53 	10,	/* ZIO_PRIORITY_RESILVER	*/
     54 	20,	/* ZIO_PRIORITY_SCRUB		*/
     55 	2,	/* ZIO_PRIORITY_DDT_PREFETCH	*/
     56 };
     57 
     58 /*
     59  * ==========================================================================
     60  * I/O type descriptions
     61  * ==========================================================================
     62  */
     63 char *zio_type_name[ZIO_TYPES] = {
     64 	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
     65 	"zio_ioctl"
     66 };
     67 
     68 /*
     69  * ==========================================================================
     70  * I/O kmem caches
     71  * ==========================================================================
     72  */
     73 kmem_cache_t *zio_cache;
     74 kmem_cache_t *zio_link_cache;
     75 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
     76 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
     77 
     78 #ifdef _KERNEL
     79 extern vmem_t *zio_alloc_arena;
     80 #endif
     81 
     82 /*
     83  * An allocating zio is one that either currently has the DVA allocate
     84  * stage set or will have it later in its lifetime.
     85  */
     86 #define	IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
     87 
     88 boolean_t	zio_requeue_io_start_cut_in_line = B_TRUE;
     89 
     90 #ifdef ZFS_DEBUG
     91 int zio_buf_debug_limit = 16384;
     92 #else
     93 int zio_buf_debug_limit = 0;
     94 #endif
     95 
     96 void
     97 zio_init(void)
     98 {
     99 	size_t c;
    100 	vmem_t *data_alloc_arena = NULL;
    101 
    102 #ifdef _KERNEL
    103 	data_alloc_arena = zio_alloc_arena;
    104 #endif
    105 	zio_cache = kmem_cache_create("zio_cache",
    106 	    sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
    107 	zio_link_cache = kmem_cache_create("zio_link_cache",
    108 	    sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
    109 
    110 	/*
    111 	 * For small buffers, we want a cache for each multiple of
    112 	 * SPA_MINBLOCKSIZE.  For medium-size buffers, we want a cache
    113 	 * for each quarter-power of 2.  For large buffers, we want
    114 	 * a cache for each multiple of PAGESIZE.
    115 	 */
    116 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
    117 		size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
    118 		size_t p2 = size;
    119 		size_t align = 0;
    120 		size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
    121 
    122 		while (p2 & (p2 - 1))
    123 			p2 &= p2 - 1;
    124 
    125 		if (size <= 4 * SPA_MINBLOCKSIZE) {
    126 			align = SPA_MINBLOCKSIZE;
    127 		} else if (P2PHASE(size, PAGESIZE) == 0) {
    128 			align = PAGESIZE;
    129 		} else if (P2PHASE(size, p2 >> 2) == 0) {
    130 			align = p2 >> 2;
    131 		}
    132 
    133 		if (align != 0) {
    134 			char name[36];
    135 			(void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
    136 			zio_buf_cache[c] = kmem_cache_create(name, size,
    137 			    align, NULL, NULL, NULL, NULL, NULL, cflags);
    138 
    139 			/*
    140 			 * Since zio_data bufs do not appear in crash dumps, we
    141 			 * pass KMC_NOTOUCH so that no allocator metadata is
    142 			 * stored with the buffers.
    143 			 */
    144 			(void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
    145 			zio_data_buf_cache[c] = kmem_cache_create(name, size,
    146 			    align, NULL, NULL, NULL, NULL, data_alloc_arena,
    147 			    cflags | KMC_NOTOUCH);
    148 		}
    149 	}
    150 
    151 	while (--c != 0) {
    152 		ASSERT(zio_buf_cache[c] != NULL);
    153 		if (zio_buf_cache[c - 1] == NULL)
    154 			zio_buf_cache[c - 1] = zio_buf_cache[c];
    155 
    156 		ASSERT(zio_data_buf_cache[c] != NULL);
    157 		if (zio_data_buf_cache[c - 1] == NULL)
    158 			zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
    159 	}
    160 
    161 	zio_inject_init();
    162 }
    163 
    164 void
    165 zio_fini(void)
    166 {
    167 	size_t c;
    168 	kmem_cache_t *last_cache = NULL;
    169 	kmem_cache_t *last_data_cache = NULL;
    170 
    171 	for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
    172 		if (zio_buf_cache[c] != last_cache) {
    173 			last_cache = zio_buf_cache[c];
    174 			kmem_cache_destroy(zio_buf_cache[c]);
    175 		}
    176 		zio_buf_cache[c] = NULL;
    177 
    178 		if (zio_data_buf_cache[c] != last_data_cache) {
    179 			last_data_cache = zio_data_buf_cache[c];
    180 			kmem_cache_destroy(zio_data_buf_cache[c]);
    181 		}
    182 		zio_data_buf_cache[c] = NULL;
    183 	}
    184 
    185 	kmem_cache_destroy(zio_link_cache);
    186 	kmem_cache_destroy(zio_cache);
    187 
    188 	zio_inject_fini();
    189 }
    190 
    191 /*
    192  * ==========================================================================
    193  * Allocate and free I/O buffers
    194  * ==========================================================================
    195  */
    196 
    197 /*
    198  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
    199  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
    200  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
    201  * excess / transient data in-core during a crashdump.
    202  */
    203 void *
    204 zio_buf_alloc(size_t size)
    205 {
    206 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    207 
    208 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    209 
    210 	return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
    211 }
    212 
    213 /*
    214  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
    215  * crashdump if the kernel panics.  This exists so that we will limit the amount
    216  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
    217  * of kernel heap dumped to disk when the kernel panics)
    218  */
    219 void *
    220 zio_data_buf_alloc(size_t size)
    221 {
    222 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    223 
    224 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    225 
    226 	return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
    227 }
    228 
    229 void
    230 zio_buf_free(void *buf, size_t size)
    231 {
    232 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    233 
    234 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    235 
    236 	kmem_cache_free(zio_buf_cache[c], buf);
    237 }
    238 
    239 void
    240 zio_data_buf_free(void *buf, size_t size)
    241 {
    242 	size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
    243 
    244 	ASSERT(c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
    245 
    246 	kmem_cache_free(zio_data_buf_cache[c], buf);
    247 }
    248 
    249 /*
    250  * ==========================================================================
    251  * Push and pop I/O transform buffers
    252  * ==========================================================================
    253  */
    254 static void
    255 zio_push_transform(zio_t *zio, void *data, uint64_t size, uint64_t bufsize,
    256 	zio_transform_func_t *transform)
    257 {
    258 	zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
    259 
    260 	zt->zt_orig_data = zio->io_data;
    261 	zt->zt_orig_size = zio->io_size;
    262 	zt->zt_bufsize = bufsize;
    263 	zt->zt_transform = transform;
    264 
    265 	zt->zt_next = zio->io_transform_stack;
    266 	zio->io_transform_stack = zt;
    267 
    268 	zio->io_data = data;
    269 	zio->io_size = size;
    270 }
    271 
    272 static void
    273 zio_pop_transforms(zio_t *zio)
    274 {
    275 	zio_transform_t *zt;
    276 
    277 	while ((zt = zio->io_transform_stack) != NULL) {
    278 		if (zt->zt_transform != NULL)
    279 			zt->zt_transform(zio,
    280 			    zt->zt_orig_data, zt->zt_orig_size);
    281 
    282 		if (zt->zt_bufsize != 0)
    283 			zio_buf_free(zio->io_data, zt->zt_bufsize);
    284 
    285 		zio->io_data = zt->zt_orig_data;
    286 		zio->io_size = zt->zt_orig_size;
    287 		zio->io_transform_stack = zt->zt_next;
    288 
    289 		kmem_free(zt, sizeof (zio_transform_t));
    290 	}
    291 }
    292 
    293 /*
    294  * ==========================================================================
    295  * I/O transform callbacks for subblocks and decompression
    296  * ==========================================================================
    297  */
    298 static void
    299 zio_subblock(zio_t *zio, void *data, uint64_t size)
    300 {
    301 	ASSERT(zio->io_size > size);
    302 
    303 	if (zio->io_type == ZIO_TYPE_READ)
    304 		bcopy(zio->io_data, data, size);
    305 }
    306 
    307 static void
    308 zio_decompress(zio_t *zio, void *data, uint64_t size)
    309 {
    310 	if (zio->io_error == 0 &&
    311 	    zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
    312 	    zio->io_data, data, zio->io_size, size) != 0)
    313 		zio->io_error = EIO;
    314 }
    315 
    316 /*
    317  * ==========================================================================
    318  * I/O parent/child relationships and pipeline interlocks
    319  * ==========================================================================
    320  */
    321 /*
    322  * NOTE - Callers to zio_walk_parents() and zio_walk_children must
    323  *        continue calling these functions until they return NULL.
    324  *        Otherwise, the next caller will pick up the list walk in
    325  *        some indeterminate state.  (Otherwise every caller would
    326  *        have to pass in a cookie to keep the state represented by
    327  *        io_walk_link, which gets annoying.)
    328  */
    329 zio_t *
    330 zio_walk_parents(zio_t *cio)
    331 {
    332 	zio_link_t *zl = cio->io_walk_link;
    333 	list_t *pl = &cio->io_parent_list;
    334 
    335 	zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
    336 	cio->io_walk_link = zl;
    337 
    338 	if (zl == NULL)
    339 		return (NULL);
    340 
    341 	ASSERT(zl->zl_child == cio);
    342 	return (zl->zl_parent);
    343 }
    344 
    345 zio_t *
    346 zio_walk_children(zio_t *pio)
    347 {
    348 	zio_link_t *zl = pio->io_walk_link;
    349 	list_t *cl = &pio->io_child_list;
    350 
    351 	zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
    352 	pio->io_walk_link = zl;
    353 
    354 	if (zl == NULL)
    355 		return (NULL);
    356 
    357 	ASSERT(zl->zl_parent == pio);
    358 	return (zl->zl_child);
    359 }
    360 
    361 zio_t *
    362 zio_unique_parent(zio_t *cio)
    363 {
    364 	zio_t *pio = zio_walk_parents(cio);
    365 
    366 	VERIFY(zio_walk_parents(cio) == NULL);
    367 	return (pio);
    368 }
    369 
    370 void
    371 zio_add_child(zio_t *pio, zio_t *cio)
    372 {
    373 	zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
    374 
    375 	/*
    376 	 * Logical I/Os can have logical, gang, or vdev children.
    377 	 * Gang I/Os can have gang or vdev children.
    378 	 * Vdev I/Os can only have vdev children.
    379 	 * The following ASSERT captures all of these constraints.
    380 	 */
    381 	ASSERT(cio->io_child_type <= pio->io_child_type);
    382 
    383 	zl->zl_parent = pio;
    384 	zl->zl_child = cio;
    385 
    386 	mutex_enter(&cio->io_lock);
    387 	mutex_enter(&pio->io_lock);
    388 
    389 	ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
    390 
    391 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
    392 		pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
    393 
    394 	list_insert_head(&pio->io_child_list, zl);
    395 	list_insert_head(&cio->io_parent_list, zl);
    396 
    397 	pio->io_child_count++;
    398 	cio->io_parent_count++;
    399 
    400 	mutex_exit(&pio->io_lock);
    401 	mutex_exit(&cio->io_lock);
    402 }
    403 
    404 static void
    405 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
    406 {
    407 	ASSERT(zl->zl_parent == pio);
    408 	ASSERT(zl->zl_child == cio);
    409 
    410 	mutex_enter(&cio->io_lock);
    411 	mutex_enter(&pio->io_lock);
    412 
    413 	list_remove(&pio->io_child_list, zl);
    414 	list_remove(&cio->io_parent_list, zl);
    415 
    416 	pio->io_child_count--;
    417 	cio->io_parent_count--;
    418 
    419 	mutex_exit(&pio->io_lock);
    420 	mutex_exit(&cio->io_lock);
    421 
    422 	kmem_cache_free(zio_link_cache, zl);
    423 }
    424 
    425 static boolean_t
    426 zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
    427 {
    428 	uint64_t *countp = &zio->io_children[child][wait];
    429 	boolean_t waiting = B_FALSE;
    430 
    431 	mutex_enter(&zio->io_lock);
    432 	ASSERT(zio->io_stall == NULL);
    433 	if (*countp != 0) {
    434 		zio->io_stage >>= 1;
    435 		zio->io_stall = countp;
    436 		waiting = B_TRUE;
    437 	}
    438 	mutex_exit(&zio->io_lock);
    439 
    440 	return (waiting);
    441 }
    442 
    443 static void
    444 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
    445 {
    446 	uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
    447 	int *errorp = &pio->io_child_error[zio->io_child_type];
    448 
    449 	mutex_enter(&pio->io_lock);
    450 	if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
    451 		*errorp = zio_worst_error(*errorp, zio->io_error);
    452 	pio->io_reexecute |= zio->io_reexecute;
    453 	ASSERT3U(*countp, >, 0);
    454 	if (--*countp == 0 && pio->io_stall == countp) {
    455 		pio->io_stall = NULL;
    456 		mutex_exit(&pio->io_lock);
    457 		zio_execute(pio);
    458 	} else {
    459 		mutex_exit(&pio->io_lock);
    460 	}
    461 }
    462 
    463 static void
    464 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
    465 {
    466 	if (zio->io_child_error[c] != 0 && zio->io_error == 0)
    467 		zio->io_error = zio->io_child_error[c];
    468 }
    469 
    470 /*
    471  * ==========================================================================
    472  * Create the various types of I/O (read, write, free, etc)
    473  * ==========================================================================
    474  */
    475 static zio_t *
    476 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    477     void *data, uint64_t size, zio_done_func_t *done, void *private,
    478     zio_type_t type, int priority, enum zio_flag flags,
    479     vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
    480     enum zio_stage stage, enum zio_stage pipeline)
    481 {
    482 	zio_t *zio;
    483 
    484 	ASSERT3U(size, <=, SPA_MAXBLOCKSIZE);
    485 	ASSERT(P2PHASE(size, SPA_MINBLOCKSIZE) == 0);
    486 	ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
    487 
    488 	ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
    489 	ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
    490 	ASSERT(vd || stage == ZIO_STAGE_OPEN);
    491 
    492 	zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
    493 	bzero(zio, sizeof (zio_t));
    494 
    495 	mutex_init(&zio->io_lock, NULL, MUTEX_DEFAULT, NULL);
    496 	cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
    497 
    498 	list_create(&zio->io_parent_list, sizeof (zio_link_t),
    499 	    offsetof(zio_link_t, zl_parent_node));
    500 	list_create(&zio->io_child_list, sizeof (zio_link_t),
    501 	    offsetof(zio_link_t, zl_child_node));
    502 
    503 	if (vd != NULL)
    504 		zio->io_child_type = ZIO_CHILD_VDEV;
    505 	else if (flags & ZIO_FLAG_GANG_CHILD)
    506 		zio->io_child_type = ZIO_CHILD_GANG;
    507 	else if (flags & ZIO_FLAG_DDT_CHILD)
    508 		zio->io_child_type = ZIO_CHILD_DDT;
    509 	else
    510 		zio->io_child_type = ZIO_CHILD_LOGICAL;
    511 
    512 	if (bp != NULL) {
    513 		zio->io_bp = (blkptr_t *)bp;
    514 		zio->io_bp_copy = *bp;
    515 		zio->io_bp_orig = *bp;
    516 		if (type != ZIO_TYPE_WRITE ||
    517 		    zio->io_child_type == ZIO_CHILD_DDT)
    518 			zio->io_bp = &zio->io_bp_copy;	/* so caller can free */
    519 		if (zio->io_child_type == ZIO_CHILD_LOGICAL)
    520 			zio->io_logical = zio;
    521 		if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
    522 			pipeline |= ZIO_GANG_STAGES;
    523 	}
    524 
    525 	zio->io_spa = spa;
    526 	zio->io_txg = txg;
    527 	zio->io_done = done;
    528 	zio->io_private = private;
    529 	zio->io_type = type;
    530 	zio->io_priority = priority;
    531 	zio->io_vd = vd;
    532 	zio->io_offset = offset;
    533 	zio->io_orig_data = zio->io_data = data;
    534 	zio->io_orig_size = zio->io_size = size;
    535 	zio->io_orig_flags = zio->io_flags = flags;
    536 	zio->io_orig_stage = zio->io_stage = stage;
    537 	zio->io_orig_pipeline = zio->io_pipeline = pipeline;
    538 
    539 	zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
    540 	zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
    541 
    542 	if (zb != NULL)
    543 		zio->io_bookmark = *zb;
    544 
    545 	if (pio != NULL) {
    546 		if (zio->io_logical == NULL)
    547 			zio->io_logical = pio->io_logical;
    548 		if (zio->io_child_type == ZIO_CHILD_GANG)
    549 			zio->io_gang_leader = pio->io_gang_leader;
    550 		zio_add_child(pio, zio);
    551 	}
    552 
    553 	return (zio);
    554 }
    555 
    556 static void
    557 zio_destroy(zio_t *zio)
    558 {
    559 	list_destroy(&zio->io_parent_list);
    560 	list_destroy(&zio->io_child_list);
    561 	mutex_destroy(&zio->io_lock);
    562 	cv_destroy(&zio->io_cv);
    563 	kmem_cache_free(zio_cache, zio);
    564 }
    565 
    566 zio_t *
    567 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
    568     void *private, enum zio_flag flags)
    569 {
    570 	zio_t *zio;
    571 
    572 	zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
    573 	    ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
    574 	    ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
    575 
    576 	return (zio);
    577 }
    578 
    579 zio_t *
    580 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
    581 {
    582 	return (zio_null(NULL, spa, NULL, done, private, flags));
    583 }
    584 
    585 zio_t *
    586 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
    587     void *data, uint64_t size, zio_done_func_t *done, void *private,
    588     int priority, enum zio_flag flags, const zbookmark_t *zb)
    589 {
    590 	zio_t *zio;
    591 
    592 	zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
    593 	    data, size, done, private,
    594 	    ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
    595 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
    596 	    ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
    597 
    598 	return (zio);
    599 }
    600 
    601 zio_t *
    602 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
    603     void *data, uint64_t size, const zio_prop_t *zp,
    604     zio_done_func_t *ready, zio_done_func_t *done, void *private,
    605     int priority, enum zio_flag flags, const zbookmark_t *zb)
    606 {
    607 	zio_t *zio;
    608 
    609 	ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
    610 	    zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
    611 	    zp->zp_compress >= ZIO_COMPRESS_OFF &&
    612 	    zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
    613 	    zp->zp_type < DMU_OT_NUMTYPES &&
    614 	    zp->zp_level < 32 &&
    615 	    zp->zp_copies > 0 &&
    616 	    zp->zp_copies <= spa_max_replication(spa) &&
    617 	    zp->zp_dedup <= 1 &&
    618 	    zp->zp_dedup_verify <= 1);
    619 
    620 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
    621 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
    622 	    ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
    623 	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
    624 
    625 	zio->io_ready = ready;
    626 	zio->io_prop = *zp;
    627 
    628 	return (zio);
    629 }
    630 
    631 zio_t *
    632 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
    633     uint64_t size, zio_done_func_t *done, void *private, int priority,
    634     enum zio_flag flags, zbookmark_t *zb)
    635 {
    636 	zio_t *zio;
    637 
    638 	zio = zio_create(pio, spa, txg, bp, data, size, done, private,
    639 	    ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
    640 	    ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
    641 
    642 	return (zio);
    643 }
    644 
    645 void
    646 zio_write_override(zio_t *zio, blkptr_t *bp, int copies)
    647 {
    648 	ASSERT(zio->io_type == ZIO_TYPE_WRITE);
    649 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
    650 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
    651 	ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
    652 
    653 	zio->io_prop.zp_copies = copies;
    654 	zio->io_bp_override = bp;
    655 }
    656 
    657 void
    658 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
    659 {
    660 	bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
    661 }
    662 
    663 zio_t *
    664 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    665     enum zio_flag flags)
    666 {
    667 	zio_t *zio;
    668 
    669 	dprintf_bp(bp, "freeing in txg %llu, pass %u",
    670 	    (longlong_t)txg, spa->spa_sync_pass);
    671 
    672 	ASSERT(!BP_IS_HOLE(bp));
    673 	ASSERT(spa_syncing_txg(spa) == txg);
    674 	ASSERT(spa_sync_pass(spa) <= SYNC_PASS_DEFERRED_FREE);
    675 
    676 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
    677 	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
    678 	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_FREE_PIPELINE);
    679 
    680 	return (zio);
    681 }
    682 
    683 zio_t *
    684 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
    685     zio_done_func_t *done, void *private, enum zio_flag flags)
    686 {
    687 	zio_t *zio;
    688 
    689 	/*
    690 	 * A claim is an allocation of a specific block.  Claims are needed
    691 	 * to support immediate writes in the intent log.  The issue is that
    692 	 * immediate writes contain committed data, but in a txg that was
    693 	 * *not* committed.  Upon opening the pool after an unclean shutdown,
    694 	 * the intent log claims all blocks that contain immediate write data
    695 	 * so that the SPA knows they're in use.
    696 	 *
    697 	 * All claims *must* be resolved in the first txg -- before the SPA
    698 	 * starts allocating blocks -- so that nothing is allocated twice.
    699 	 * If txg == 0 we just verify that the block is claimable.
    700 	 */
    701 	ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
    702 	ASSERT(txg == spa_first_txg(spa) || txg == 0);
    703 	ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));	/* zdb(1M) */
    704 
    705 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
    706 	    done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW, flags,
    707 	    NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
    708 
    709 	return (zio);
    710 }
    711 
    712 zio_t *
    713 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
    714     zio_done_func_t *done, void *private, int priority, enum zio_flag flags)
    715 {
    716 	zio_t *zio;
    717 	int c;
    718 
    719 	if (vd->vdev_children == 0) {
    720 		zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
    721 		    ZIO_TYPE_IOCTL, priority, flags, vd, 0, NULL,
    722 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
    723 
    724 		zio->io_cmd = cmd;
    725 	} else {
    726 		zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
    727 
    728 		for (c = 0; c < vd->vdev_children; c++)
    729 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
    730 			    done, private, priority, flags));
    731 	}
    732 
    733 	return (zio);
    734 }
    735 
    736 zio_t *
    737 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
    738     void *data, int checksum, zio_done_func_t *done, void *private,
    739     int priority, enum zio_flag flags, boolean_t labels)
    740 {
    741 	zio_t *zio;
    742 
    743 	ASSERT(vd->vdev_children == 0);
    744 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
    745 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
    746 	ASSERT3U(offset + size, <=, vd->vdev_psize);
    747 
    748 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
    749 	    ZIO_TYPE_READ, priority, flags, vd, offset, NULL,
    750 	    ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
    751 
    752 	zio->io_prop.zp_checksum = checksum;
    753 
    754 	return (zio);
    755 }
    756 
    757 zio_t *
    758 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
    759     void *data, int checksum, zio_done_func_t *done, void *private,
    760     int priority, enum zio_flag flags, boolean_t labels)
    761 {
    762 	zio_t *zio;
    763 
    764 	ASSERT(vd->vdev_children == 0);
    765 	ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
    766 	    offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
    767 	ASSERT3U(offset + size, <=, vd->vdev_psize);
    768 
    769 	zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, done, private,
    770 	    ZIO_TYPE_WRITE, priority, flags, vd, offset, NULL,
    771 	    ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
    772 
    773 	zio->io_prop.zp_checksum = checksum;
    774 
    775 	if (zio_checksum_table[checksum].ci_eck) {
    776 		/*
    777 		 * zec checksums are necessarily destructive -- they modify
    778 		 * the end of the write buffer to hold the verifier/checksum.
    779 		 * Therefore, we must make a local copy in case the data is
    780 		 * being written to multiple places in parallel.
    781 		 */
    782 		void *wbuf = zio_buf_alloc(size);
    783 		bcopy(data, wbuf, size);
    784 		zio_push_transform(zio, wbuf, size, size, NULL);
    785 	}
    786 
    787 	return (zio);
    788 }
    789 
    790 /*
    791  * Create a child I/O to do some work for us.
    792  */
    793 zio_t *
    794 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
    795 	void *data, uint64_t size, int type, int priority, enum zio_flag flags,
    796 	zio_done_func_t *done, void *private)
    797 {
    798 	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
    799 	zio_t *zio;
    800 
    801 	ASSERT(vd->vdev_parent ==
    802 	    (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
    803 
    804 	if (type == ZIO_TYPE_READ && bp != NULL) {
    805 		/*
    806 		 * If we have the bp, then the child should perform the
    807 		 * checksum and the parent need not.  This pushes error
    808 		 * detection as close to the leaves as possible and
    809 		 * eliminates redundant checksums in the interior nodes.
    810 		 */
    811 		pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
    812 		pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
    813 	}
    814 
    815 	if (vd->vdev_children == 0)
    816 		offset += VDEV_LABEL_START_SIZE;
    817 
    818 	flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
    819 
    820 	/*
    821 	 * If we've decided to do a repair, the write is not speculative --
    822 	 * even if the original read was.
    823 	 */
    824 	if (flags & ZIO_FLAG_IO_REPAIR)
    825 		flags &= ~ZIO_FLAG_SPECULATIVE;
    826 
    827 	zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size,
    828 	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
    829 	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
    830 
    831 	return (zio);
    832 }
    833 
    834 zio_t *
    835 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
    836 	int type, int priority, enum zio_flag flags,
    837 	zio_done_func_t *done, void *private)
    838 {
    839 	zio_t *zio;
    840 
    841 	ASSERT(vd->vdev_ops->vdev_op_leaf);
    842 
    843 	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
    844 	    data, size, done, private, type, priority,
    845 	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY,
    846 	    vd, offset, NULL,
    847 	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
    848 
    849 	return (zio);
    850 }
    851 
    852 void
    853 zio_flush(zio_t *zio, vdev_t *vd)
    854 {
    855 	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
    856 	    NULL, NULL, ZIO_PRIORITY_NOW,
    857 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
    858 }
    859 
    860 void
    861 zio_shrink(zio_t *zio, uint64_t size)
    862 {
    863 	ASSERT(zio->io_executor == NULL);
    864 	ASSERT(zio->io_orig_size == zio->io_size);
    865 	ASSERT(size <= zio->io_size);
    866 
    867 	/*
    868 	 * We don't shrink for raidz because of problems with the
    869 	 * reconstruction when reading back less than the block size.
    870 	 * Note, BP_IS_RAIDZ() assumes no compression.
    871 	 */
    872 	ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
    873 	if (!BP_IS_RAIDZ(zio->io_bp))
    874 		zio->io_orig_size = zio->io_size = size;
    875 }
    876 
    877 /*
    878  * ==========================================================================
    879  * Prepare to read and write logical blocks
    880  * ==========================================================================
    881  */
    882 
    883 static int
    884 zio_read_bp_init(zio_t *zio)
    885 {
    886 	blkptr_t *bp = zio->io_bp;
    887 
    888 	if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
    889 	    zio->io_child_type == ZIO_CHILD_LOGICAL &&
    890 	    !(zio->io_flags & ZIO_FLAG_RAW)) {
    891 		uint64_t psize = BP_GET_PSIZE(bp);
    892 		void *cbuf = zio_buf_alloc(psize);
    893 
    894 		zio_push_transform(zio, cbuf, psize, psize, zio_decompress);
    895 	}
    896 
    897 	if (!dmu_ot[BP_GET_TYPE(bp)].ot_metadata && BP_GET_LEVEL(bp) == 0)
    898 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
    899 
    900 	if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
    901 		zio->io_flags |= ZIO_FLAG_DONT_CACHE;
    902 
    903 	if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
    904 		zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
    905 
    906 	return (ZIO_PIPELINE_CONTINUE);
    907 }
    908 
    909 static int
    910 zio_write_bp_init(zio_t *zio)
    911 {
    912 	spa_t *spa = zio->io_spa;
    913 	zio_prop_t *zp = &zio->io_prop;
    914 	enum zio_compress compress = zp->zp_compress;
    915 	blkptr_t *bp = zio->io_bp;
    916 	uint64_t lsize = zio->io_size;
    917 	uint64_t psize = lsize;
    918 	int pass = 1;
    919 
    920 	/*
    921 	 * If our children haven't all reached the ready stage,
    922 	 * wait for them and then repeat this pipeline stage.
    923 	 */
    924 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
    925 	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
    926 		return (ZIO_PIPELINE_STOP);
    927 
    928 	if (!IO_IS_ALLOCATING(zio))
    929 		return (ZIO_PIPELINE_CONTINUE);
    930 
    931 	ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
    932 
    933 	if (zio->io_bp_override) {
    934 		ASSERT(bp->blk_birth != zio->io_txg);
    935 		ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
    936 
    937 		*bp = *zio->io_bp_override;
    938 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
    939 
    940 		if (BP_IS_HOLE(bp) || !zp->zp_dedup)
    941 			return (ZIO_PIPELINE_CONTINUE);
    942 
    943 		ASSERT(zio_checksum_table[zp->zp_checksum].ci_dedup ||
    944 		    zp->zp_dedup_verify);
    945 
    946 		if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
    947 			BP_SET_DEDUP(bp, 1);
    948 			zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
    949 			return (ZIO_PIPELINE_CONTINUE);
    950 		}
    951 		zio->io_bp_override = NULL;
    952 		BP_ZERO(bp);
    953 	}
    954 
    955 	if (bp->blk_birth == zio->io_txg) {
    956 		/*
    957 		 * We're rewriting an existing block, which means we're
    958 		 * working on behalf of spa_sync().  For spa_sync() to
    959 		 * converge, it must eventually be the case that we don't
    960 		 * have to allocate new blocks.  But compression changes
    961 		 * the blocksize, which forces a reallocate, and makes
    962 		 * convergence take longer.  Therefore, after the first
    963 		 * few passes, stop compressing to ensure convergence.
    964 		 */
    965 		pass = spa_sync_pass(spa);
    966 
    967 		ASSERT(zio->io_txg == spa_syncing_txg(spa));
    968 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
    969 		ASSERT(!BP_GET_DEDUP(bp));
    970 
    971 		if (pass > SYNC_PASS_DONT_COMPRESS)
    972 			compress = ZIO_COMPRESS_OFF;
    973 
    974 		/* Make sure someone doesn't change their mind on overwrites */
    975 		ASSERT(MIN(zp->zp_copies + BP_IS_GANG(bp),
    976 		    spa_max_replication(spa)) == BP_GET_NDVAS(bp));
    977 	}
    978 
    979 	if (compress != ZIO_COMPRESS_OFF) {
    980 		void *cbuf = zio_buf_alloc(lsize);
    981 		psize = zio_compress_data(compress, zio->io_data, cbuf, lsize);
    982 		if (psize == 0 || psize == lsize) {
    983 			compress = ZIO_COMPRESS_OFF;
    984 			zio_buf_free(cbuf, lsize);
    985 		} else {
    986 			ASSERT(psize < lsize);
    987 			zio_push_transform(zio, cbuf, psize, lsize, NULL);
    988 		}
    989 	}
    990 
    991 	/*
    992 	 * The final pass of spa_sync() must be all rewrites, but the first
    993 	 * few passes offer a trade-off: allocating blocks defers convergence,
    994 	 * but newly allocated blocks are sequential, so they can be written
    995 	 * to disk faster.  Therefore, we allow the first few passes of
    996 	 * spa_sync() to allocate new blocks, but force rewrites after that.
    997 	 * There should only be a handful of blocks after pass 1 in any case.
    998 	 */
    999 	if (bp->blk_birth == zio->io_txg && BP_GET_PSIZE(bp) == psize &&
   1000 	    pass > SYNC_PASS_REWRITE) {
   1001 		ASSERT(psize != 0);
   1002 		enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
   1003 		zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
   1004 		zio->io_flags |= ZIO_FLAG_IO_REWRITE;
   1005 	} else {
   1006 		BP_ZERO(bp);
   1007 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
   1008 	}
   1009 
   1010 	if (psize == 0) {
   1011 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1012 	} else {
   1013 		ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
   1014 		BP_SET_LSIZE(bp, lsize);
   1015 		BP_SET_PSIZE(bp, psize);
   1016 		BP_SET_COMPRESS(bp, compress);
   1017 		BP_SET_CHECKSUM(bp, zp->zp_checksum);
   1018 		BP_SET_TYPE(bp, zp->zp_type);
   1019 		BP_SET_LEVEL(bp, zp->zp_level);
   1020 		BP_SET_DEDUP(bp, zp->zp_dedup);
   1021 		BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
   1022 		if (zp->zp_dedup) {
   1023 			ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1024 			ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
   1025 			zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
   1026 		}
   1027 	}
   1028 
   1029 	return (ZIO_PIPELINE_CONTINUE);
   1030 }
   1031 
   1032 static int
   1033 zio_free_bp_init(zio_t *zio)
   1034 {
   1035 	blkptr_t *bp = zio->io_bp;
   1036 
   1037 	if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
   1038 		if (BP_GET_DEDUP(bp))
   1039 			zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
   1040 	}
   1041 
   1042 	return (ZIO_PIPELINE_CONTINUE);
   1043 }
   1044 
   1045 /*
   1046  * ==========================================================================
   1047  * Execute the I/O pipeline
   1048  * ==========================================================================
   1049  */
   1050 
   1051 static void
   1052 zio_taskq_dispatch(zio_t *zio, enum zio_taskq_type q, boolean_t cutinline)
   1053 {
   1054 	spa_t *spa = zio->io_spa;
   1055 	zio_type_t t = zio->io_type;
   1056 	int flags = TQ_SLEEP | (cutinline ? TQ_FRONT : 0);
   1057 
   1058 	/*
   1059 	 * If we're a config writer or a probe, the normal issue and
   1060 	 * interrupt threads may all be blocked waiting for the config lock.
   1061 	 * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
   1062 	 */
   1063 	if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
   1064 		t = ZIO_TYPE_NULL;
   1065 
   1066 	/*
   1067 	 * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
   1068 	 */
   1069 	if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
   1070 		t = ZIO_TYPE_NULL;
   1071 
   1072 	/*
   1073 	 * If this is a high priority I/O, then use the high priority taskq.
   1074 	 */
   1075 	if (zio->io_priority == ZIO_PRIORITY_NOW &&
   1076 	    spa->spa_zio_taskq[t][q + 1] != NULL)
   1077 		q++;
   1078 
   1079 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
   1080 	(void) taskq_dispatch(spa->spa_zio_taskq[t][q],
   1081 	    (task_func_t *)zio_execute, zio, flags);
   1082 }
   1083 
   1084 static boolean_t
   1085 zio_taskq_member(zio_t *zio, enum zio_taskq_type q)
   1086 {
   1087 	kthread_t *executor = zio->io_executor;
   1088 	spa_t *spa = zio->io_spa;
   1089 
   1090 	for (zio_type_t t = 0; t < ZIO_TYPES; t++)
   1091 		if (taskq_member(spa->spa_zio_taskq[t][q], executor))
   1092 			return (B_TRUE);
   1093 
   1094 	return (B_FALSE);
   1095 }
   1096 
   1097 static int
   1098 zio_issue_async(zio_t *zio)
   1099 {
   1100 	zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
   1101 
   1102 	return (ZIO_PIPELINE_STOP);
   1103 }
   1104 
   1105 void
   1106 zio_interrupt(zio_t *zio)
   1107 {
   1108 	zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
   1109 }
   1110 
   1111 /*
   1112  * Execute the I/O pipeline until one of the following occurs:
   1113  * (1) the I/O completes; (2) the pipeline stalls waiting for
   1114  * dependent child I/Os; (3) the I/O issues, so we're waiting
   1115  * for an I/O completion interrupt; (4) the I/O is delegated by
   1116  * vdev-level caching or aggregation; (5) the I/O is deferred
   1117  * due to vdev-level queueing; (6) the I/O is handed off to
   1118  * another thread.  In all cases, the pipeline stops whenever
   1119  * there's no CPU work; it never burns a thread in cv_wait().
   1120  *
   1121  * There's no locking on io_stage because there's no legitimate way
   1122  * for multiple threads to be attempting to process the same I/O.
   1123  */
   1124 static zio_pipe_stage_t *zio_pipeline[];
   1125 
   1126 void
   1127 zio_execute(zio_t *zio)
   1128 {
   1129 	zio->io_executor = curthread;
   1130 
   1131 	while (zio->io_stage < ZIO_STAGE_DONE) {
   1132 		enum zio_stage pipeline = zio->io_pipeline;
   1133 		enum zio_stage stage = zio->io_stage;
   1134 		int rv;
   1135 
   1136 		ASSERT(!MUTEX_HELD(&zio->io_lock));
   1137 		ASSERT(ISP2(stage));
   1138 		ASSERT(zio->io_stall == NULL);
   1139 
   1140 		do {
   1141 			stage <<= 1;
   1142 		} while ((stage & pipeline) == 0);
   1143 
   1144 		ASSERT(stage <= ZIO_STAGE_DONE);
   1145 
   1146 		/*
   1147 		 * If we are in interrupt context and this pipeline stage
   1148 		 * will grab a config lock that is held across I/O,
   1149 		 * or may wait for an I/O that needs an interrupt thread
   1150 		 * to complete, issue async to avoid deadlock.
   1151 		 *
   1152 		 * For VDEV_IO_START, we cut in line so that the io will
   1153 		 * be sent to disk promptly.
   1154 		 */
   1155 		if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
   1156 		    zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
   1157 			boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
   1158 			    zio_requeue_io_start_cut_in_line : B_FALSE;
   1159 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
   1160 			return;
   1161 		}
   1162 
   1163 		zio->io_stage = stage;
   1164 		rv = zio_pipeline[highbit(stage) - 1](zio);
   1165 
   1166 		if (rv == ZIO_PIPELINE_STOP)
   1167 			return;
   1168 
   1169 		ASSERT(rv == ZIO_PIPELINE_CONTINUE);
   1170 	}
   1171 }
   1172 
   1173 /*
   1174  * ==========================================================================
   1175  * Initiate I/O, either sync or async
   1176  * ==========================================================================
   1177  */
   1178 int
   1179 zio_wait(zio_t *zio)
   1180 {
   1181 	int error;
   1182 
   1183 	ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
   1184 	ASSERT(zio->io_executor == NULL);
   1185 
   1186 	zio->io_waiter = curthread;
   1187 
   1188 	zio_execute(zio);
   1189 
   1190 	mutex_enter(&zio->io_lock);
   1191 	while (zio->io_executor != NULL)
   1192 		cv_wait(&zio->io_cv, &zio->io_lock);
   1193 	mutex_exit(&zio->io_lock);
   1194 
   1195 	error = zio->io_error;
   1196 	zio_destroy(zio);
   1197 
   1198 	return (error);
   1199 }
   1200 
   1201 void
   1202 zio_nowait(zio_t *zio)
   1203 {
   1204 	ASSERT(zio->io_executor == NULL);
   1205 
   1206 	if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
   1207 	    zio_unique_parent(zio) == NULL) {
   1208 		/*
   1209 		 * This is a logical async I/O with no parent to wait for it.
   1210 		 * We add it to the spa_async_root_zio "Godfather" I/O which
   1211 		 * will ensure they complete prior to unloading the pool.
   1212 		 */
   1213 		spa_t *spa = zio->io_spa;
   1214 
   1215 		zio_add_child(spa->spa_async_zio_root, zio);
   1216 	}
   1217 
   1218 	zio_execute(zio);
   1219 }
   1220 
   1221 /*
   1222  * ==========================================================================
   1223  * Reexecute or suspend/resume failed I/O
   1224  * ==========================================================================
   1225  */
   1226 
   1227 static void
   1228 zio_reexecute(zio_t *pio)
   1229 {
   1230 	zio_t *cio, *cio_next;
   1231 
   1232 	ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
   1233 	ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
   1234 	ASSERT(pio->io_gang_leader == NULL);
   1235 	ASSERT(pio->io_gang_tree == NULL);
   1236 
   1237 	pio->io_flags = pio->io_orig_flags;
   1238 	pio->io_stage = pio->io_orig_stage;
   1239 	pio->io_pipeline = pio->io_orig_pipeline;
   1240 	pio->io_reexecute = 0;
   1241 	pio->io_error = 0;
   1242 	for (int w = 0; w < ZIO_WAIT_TYPES; w++)
   1243 		pio->io_state[w] = 0;
   1244 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
   1245 		pio->io_child_error[c] = 0;
   1246 
   1247 	if (IO_IS_ALLOCATING(pio))
   1248 		BP_ZERO(pio->io_bp);
   1249 
   1250 	/*
   1251 	 * As we reexecute pio's children, new children could be created.
   1252 	 * New children go to the head of pio's io_child_list, however,
   1253 	 * so we will (correctly) not reexecute them.  The key is that
   1254 	 * the remainder of pio's io_child_list, from 'cio_next' onward,
   1255 	 * cannot be affected by any side effects of reexecuting 'cio'.
   1256 	 */
   1257 	for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
   1258 		cio_next = zio_walk_children(pio);
   1259 		mutex_enter(&pio->io_lock);
   1260 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
   1261 			pio->io_children[cio->io_child_type][w]++;
   1262 		mutex_exit(&pio->io_lock);
   1263 		zio_reexecute(cio);
   1264 	}
   1265 
   1266 	/*
   1267 	 * Now that all children have been reexecuted, execute the parent.
   1268 	 * We don't reexecute "The Godfather" I/O here as it's the
   1269 	 * responsibility of the caller to wait on him.
   1270 	 */
   1271 	if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
   1272 		zio_execute(pio);
   1273 }
   1274 
   1275 void
   1276 zio_suspend(spa_t *spa, zio_t *zio)
   1277 {
   1278 	if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
   1279 		fm_panic("Pool '%s' has encountered an uncorrectable I/O "
   1280 		    "failure and the failure mode property for this pool "
   1281 		    "is set to panic.", spa_name(spa));
   1282 
   1283 	zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
   1284 
   1285 	mutex_enter(&spa->spa_suspend_lock);
   1286 
   1287 	if (spa->spa_suspend_zio_root == NULL)
   1288 		spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
   1289 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
   1290 		    ZIO_FLAG_GODFATHER);
   1291 
   1292 	spa->spa_suspended = B_TRUE;
   1293 
   1294 	if (zio != NULL) {
   1295 		ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
   1296 		ASSERT(zio != spa->spa_suspend_zio_root);
   1297 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1298 		ASSERT(zio_unique_parent(zio) == NULL);
   1299 		ASSERT(zio->io_stage == ZIO_STAGE_DONE);
   1300 		zio_add_child(spa->spa_suspend_zio_root, zio);
   1301 	}
   1302 
   1303 	mutex_exit(&spa->spa_suspend_lock);
   1304 }
   1305 
   1306 int
   1307 zio_resume(spa_t *spa)
   1308 {
   1309 	zio_t *pio;
   1310 
   1311 	/*
   1312 	 * Reexecute all previously suspended i/o.
   1313 	 */
   1314 	mutex_enter(&spa->spa_suspend_lock);
   1315 	spa->spa_suspended = B_FALSE;
   1316 	cv_broadcast(&spa->spa_suspend_cv);
   1317 	pio = spa->spa_suspend_zio_root;
   1318 	spa->spa_suspend_zio_root = NULL;
   1319 	mutex_exit(&spa->spa_suspend_lock);
   1320 
   1321 	if (pio == NULL)
   1322 		return (0);
   1323 
   1324 	zio_reexecute(pio);
   1325 	return (zio_wait(pio));
   1326 }
   1327 
   1328 void
   1329 zio_resume_wait(spa_t *spa)
   1330 {
   1331 	mutex_enter(&spa->spa_suspend_lock);
   1332 	while (spa_suspended(spa))
   1333 		cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
   1334 	mutex_exit(&spa->spa_suspend_lock);
   1335 }
   1336 
   1337 /*
   1338  * ==========================================================================
   1339  * Gang blocks.
   1340  *
   1341  * A gang block is a collection of small blocks that looks to the DMU
   1342  * like one large block.  When zio_dva_allocate() cannot find a block
   1343  * of the requested size, due to either severe fragmentation or the pool
   1344  * being nearly full, it calls zio_write_gang_block() to construct the
   1345  * block from smaller fragments.
   1346  *
   1347  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
   1348  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
   1349  * an indirect block: it's an array of block pointers.  It consumes
   1350  * only one sector and hence is allocatable regardless of fragmentation.
   1351  * The gang header's bps point to its gang members, which hold the data.
   1352  *
   1353  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
   1354  * as the verifier to ensure uniqueness of the SHA256 checksum.
   1355  * Critically, the gang block bp's blk_cksum is the checksum of the data,
   1356  * not the gang header.  This ensures that data block signatures (needed for
   1357  * deduplication) are independent of how the block is physically stored.
   1358  *
   1359  * Gang blocks can be nested: a gang member may itself be a gang block.
   1360  * Thus every gang block is a tree in which root and all interior nodes are
   1361  * gang headers, and the leaves are normal blocks that contain user data.
   1362  * The root of the gang tree is called the gang leader.
   1363  *
   1364  * To perform any operation (read, rewrite, free, claim) on a gang block,
   1365  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
   1366  * in the io_gang_tree field of the original logical i/o by recursively
   1367  * reading the gang leader and all gang headers below it.  This yields
   1368  * an in-core tree containing the contents of every gang header and the
   1369  * bps for every constituent of the gang block.
   1370  *
   1371  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
   1372  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
   1373  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
   1374  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
   1375  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
   1376  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
   1377  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
   1378  * of the gang header plus zio_checksum_compute() of the data to update the
   1379  * gang header's blk_cksum as described above.
   1380  *
   1381  * The two-phase assemble/issue model solves the problem of partial failure --
   1382  * what if you'd freed part of a gang block but then couldn't read the
   1383  * gang header for another part?  Assembling the entire gang tree first
   1384  * ensures that all the necessary gang header I/O has succeeded before
   1385  * starting the actual work of free, claim, or write.  Once the gang tree
   1386  * is assembled, free and claim are in-memory operations that cannot fail.
   1387  *
   1388  * In the event that a gang write fails, zio_dva_unallocate() walks the
   1389  * gang tree to immediately free (i.e. insert back into the space map)
   1390  * everything we've allocated.  This ensures that we don't get ENOSPC
   1391  * errors during repeated suspend/resume cycles due to a flaky device.
   1392  *
   1393  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
   1394  * the gang tree, we won't modify the block, so we can safely defer the free
   1395  * (knowing that the block is still intact).  If we *can* assemble the gang
   1396  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
   1397  * each constituent bp and we can allocate a new block on the next sync pass.
   1398  *
   1399  * In all cases, the gang tree allows complete recovery from partial failure.
   1400  * ==========================================================================
   1401  */
   1402 
   1403 static zio_t *
   1404 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1405 {
   1406 	if (gn != NULL)
   1407 		return (pio);
   1408 
   1409 	return (zio_read(pio, pio->io_spa, bp, data, BP_GET_PSIZE(bp),
   1410 	    NULL, NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
   1411 	    &pio->io_bookmark));
   1412 }
   1413 
   1414 zio_t *
   1415 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1416 {
   1417 	zio_t *zio;
   1418 
   1419 	if (gn != NULL) {
   1420 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
   1421 		    gn->gn_gbh, SPA_GANGBLOCKSIZE, NULL, NULL, pio->io_priority,
   1422 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   1423 		/*
   1424 		 * As we rewrite each gang header, the pipeline will compute
   1425 		 * a new gang block header checksum for it; but no one will
   1426 		 * compute a new data checksum, so we do that here.  The one
   1427 		 * exception is the gang leader: the pipeline already computed
   1428 		 * its data checksum because that stage precedes gang assembly.
   1429 		 * (Presently, nothing actually uses interior data checksums;
   1430 		 * this is just good hygiene.)
   1431 		 */
   1432 		if (gn != pio->io_gang_leader->io_gang_tree) {
   1433 			zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
   1434 			    data, BP_GET_PSIZE(bp));
   1435 		}
   1436 		/*
   1437 		 * If we are here to damage data for testing purposes,
   1438 		 * leave the GBH alone so that we can detect the damage.
   1439 		 */
   1440 		if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
   1441 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
   1442 	} else {
   1443 		zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
   1444 		    data, BP_GET_PSIZE(bp), NULL, NULL, pio->io_priority,
   1445 		    ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   1446 	}
   1447 
   1448 	return (zio);
   1449 }
   1450 
   1451 /* ARGSUSED */
   1452 zio_t *
   1453 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1454 {
   1455 	return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
   1456 	    ZIO_GANG_CHILD_FLAGS(pio)));
   1457 }
   1458 
   1459 /* ARGSUSED */
   1460 zio_t *
   1461 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, void *data)
   1462 {
   1463 	return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
   1464 	    NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
   1465 }
   1466 
   1467 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
   1468 	NULL,
   1469 	zio_read_gang,
   1470 	zio_rewrite_gang,
   1471 	zio_free_gang,
   1472 	zio_claim_gang,
   1473 	NULL
   1474 };
   1475 
   1476 static void zio_gang_tree_assemble_done(zio_t *zio);
   1477 
   1478 static zio_gang_node_t *
   1479 zio_gang_node_alloc(zio_gang_node_t **gnpp)
   1480 {
   1481 	zio_gang_node_t *gn;
   1482 
   1483 	ASSERT(*gnpp == NULL);
   1484 
   1485 	gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
   1486 	gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
   1487 	*gnpp = gn;
   1488 
   1489 	return (gn);
   1490 }
   1491 
   1492 static void
   1493 zio_gang_node_free(zio_gang_node_t **gnpp)
   1494 {
   1495 	zio_gang_node_t *gn = *gnpp;
   1496 
   1497 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
   1498 		ASSERT(gn->gn_child[g] == NULL);
   1499 
   1500 	zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
   1501 	kmem_free(gn, sizeof (*gn));
   1502 	*gnpp = NULL;
   1503 }
   1504 
   1505 static void
   1506 zio_gang_tree_free(zio_gang_node_t **gnpp)
   1507 {
   1508 	zio_gang_node_t *gn = *gnpp;
   1509 
   1510 	if (gn == NULL)
   1511 		return;
   1512 
   1513 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++)
   1514 		zio_gang_tree_free(&gn->gn_child[g]);
   1515 
   1516 	zio_gang_node_free(gnpp);
   1517 }
   1518 
   1519 static void
   1520 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
   1521 {
   1522 	zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
   1523 
   1524 	ASSERT(gio->io_gang_leader == gio);
   1525 	ASSERT(BP_IS_GANG(bp));
   1526 
   1527 	zio_nowait(zio_read(gio, gio->io_spa, bp, gn->gn_gbh,
   1528 	    SPA_GANGBLOCKSIZE, zio_gang_tree_assemble_done, gn,
   1529 	    gio->io_priority, ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
   1530 }
   1531 
   1532 static void
   1533 zio_gang_tree_assemble_done(zio_t *zio)
   1534 {
   1535 	zio_t *gio = zio->io_gang_leader;
   1536 	zio_gang_node_t *gn = zio->io_private;
   1537 	blkptr_t *bp = zio->io_bp;
   1538 
   1539 	ASSERT(gio == zio_unique_parent(zio));
   1540 	ASSERT(zio->io_child_count == 0);
   1541 
   1542 	if (zio->io_error)
   1543 		return;
   1544 
   1545 	if (BP_SHOULD_BYTESWAP(bp))
   1546 		byteswap_uint64_array(zio->io_data, zio->io_size);
   1547 
   1548 	ASSERT(zio->io_data == gn->gn_gbh);
   1549 	ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
   1550 	ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
   1551 
   1552 	for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   1553 		blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
   1554 		if (!BP_IS_GANG(gbp))
   1555 			continue;
   1556 		zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
   1557 	}
   1558 }
   1559 
   1560 static void
   1561 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, void *data)
   1562 {
   1563 	zio_t *gio = pio->io_gang_leader;
   1564 	zio_t *zio;
   1565 
   1566 	ASSERT(BP_IS_GANG(bp) == !!gn);
   1567 	ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
   1568 	ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
   1569 
   1570 	/*
   1571 	 * If you're a gang header, your data is in gn->gn_gbh.
   1572 	 * If you're a gang member, your data is in 'data' and gn == NULL.
   1573 	 */
   1574 	zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data);
   1575 
   1576 	if (gn != NULL) {
   1577 		ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
   1578 
   1579 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   1580 			blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
   1581 			if (BP_IS_HOLE(gbp))
   1582 				continue;
   1583 			zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data);
   1584 			data = (char *)data + BP_GET_PSIZE(gbp);
   1585 		}
   1586 	}
   1587 
   1588 	if (gn == gio->io_gang_tree)
   1589 		ASSERT3P((char *)gio->io_data + gio->io_size, ==, data);
   1590 
   1591 	if (zio != pio)
   1592 		zio_nowait(zio);
   1593 }
   1594 
   1595 static int
   1596 zio_gang_assemble(zio_t *zio)
   1597 {
   1598 	blkptr_t *bp = zio->io_bp;
   1599 
   1600 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
   1601 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   1602 
   1603 	zio->io_gang_leader = zio;
   1604 
   1605 	zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
   1606 
   1607 	return (ZIO_PIPELINE_CONTINUE);
   1608 }
   1609 
   1610 static int
   1611 zio_gang_issue(zio_t *zio)
   1612 {
   1613 	blkptr_t *bp = zio->io_bp;
   1614 
   1615 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
   1616 		return (ZIO_PIPELINE_STOP);
   1617 
   1618 	ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
   1619 	ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   1620 
   1621 	if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
   1622 		zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_data);
   1623 	else
   1624 		zio_gang_tree_free(&zio->io_gang_tree);
   1625 
   1626 	zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1627 
   1628 	return (ZIO_PIPELINE_CONTINUE);
   1629 }
   1630 
   1631 static void
   1632 zio_write_gang_member_ready(zio_t *zio)
   1633 {
   1634 	zio_t *pio = zio_unique_parent(zio);
   1635 	zio_t *gio = zio->io_gang_leader;
   1636 	dva_t *cdva = zio->io_bp->blk_dva;
   1637 	dva_t *pdva = pio->io_bp->blk_dva;
   1638 	uint64_t asize;
   1639 
   1640 	if (BP_IS_HOLE(zio->io_bp))
   1641 		return;
   1642 
   1643 	ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
   1644 
   1645 	ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
   1646 	ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
   1647 	ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
   1648 	ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
   1649 	ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
   1650 
   1651 	mutex_enter(&pio->io_lock);
   1652 	for (int d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
   1653 		ASSERT(DVA_GET_GANG(&pdva[d]));
   1654 		asize = DVA_GET_ASIZE(&pdva[d]);
   1655 		asize += DVA_GET_ASIZE(&cdva[d]);
   1656 		DVA_SET_ASIZE(&pdva[d], asize);
   1657 	}
   1658 	mutex_exit(&pio->io_lock);
   1659 }
   1660 
   1661 static int
   1662 zio_write_gang_block(zio_t *pio)
   1663 {
   1664 	spa_t *spa = pio->io_spa;
   1665 	blkptr_t *bp = pio->io_bp;
   1666 	zio_t *gio = pio->io_gang_leader;
   1667 	zio_t *zio;
   1668 	zio_gang_node_t *gn, **gnpp;
   1669 	zio_gbh_phys_t *gbh;
   1670 	uint64_t txg = pio->io_txg;
   1671 	uint64_t resid = pio->io_size;
   1672 	uint64_t lsize;
   1673 	int copies = gio->io_prop.zp_copies;
   1674 	int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
   1675 	zio_prop_t zp;
   1676 	int error;
   1677 
   1678 	error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
   1679 	    bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
   1680 	    METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
   1681 	if (error) {
   1682 		pio->io_error = error;
   1683 		return (ZIO_PIPELINE_CONTINUE);
   1684 	}
   1685 
   1686 	if (pio == gio) {
   1687 		gnpp = &gio->io_gang_tree;
   1688 	} else {
   1689 		gnpp = pio->io_private;
   1690 		ASSERT(pio->io_ready == zio_write_gang_member_ready);
   1691 	}
   1692 
   1693 	gn = zio_gang_node_alloc(gnpp);
   1694 	gbh = gn->gn_gbh;
   1695 	bzero(gbh, SPA_GANGBLOCKSIZE);
   1696 
   1697 	/*
   1698 	 * Create the gang header.
   1699 	 */
   1700 	zio = zio_rewrite(pio, spa, txg, bp, gbh, SPA_GANGBLOCKSIZE, NULL, NULL,
   1701 	    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
   1702 
   1703 	/*
   1704 	 * Create and nowait the gang children.
   1705 	 */
   1706 	for (int g = 0; resid != 0; resid -= lsize, g++) {
   1707 		lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
   1708 		    SPA_MINBLOCKSIZE);
   1709 		ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
   1710 
   1711 		zp.zp_checksum = gio->io_prop.zp_checksum;
   1712 		zp.zp_compress = ZIO_COMPRESS_OFF;
   1713 		zp.zp_type = DMU_OT_NONE;
   1714 		zp.zp_level = 0;
   1715 		zp.zp_copies = gio->io_prop.zp_copies;
   1716 		zp.zp_dedup = 0;
   1717 		zp.zp_dedup_verify = 0;
   1718 
   1719 		zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
   1720 		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
   1721 		    zio_write_gang_member_ready, NULL, &gn->gn_child[g],
   1722 		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
   1723 		    &pio->io_bookmark));
   1724 	}
   1725 
   1726 	/*
   1727 	 * Set pio's pipeline to just wait for zio to finish.
   1728 	 */
   1729 	pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   1730 
   1731 	zio_nowait(zio);
   1732 
   1733 	return (ZIO_PIPELINE_CONTINUE);
   1734 }
   1735 
   1736 /*
   1737  * ==========================================================================
   1738  * Dedup
   1739  * ==========================================================================
   1740  */
   1741 static void
   1742 zio_ddt_child_read_done(zio_t *zio)
   1743 {
   1744 	blkptr_t *bp = zio->io_bp;
   1745 	ddt_entry_t *dde = zio->io_private;
   1746 	ddt_phys_t *ddp;
   1747 	zio_t *pio = zio_unique_parent(zio);
   1748 
   1749 	mutex_enter(&pio->io_lock);
   1750 	ddp = ddt_phys_select(dde, bp);
   1751 	if (zio->io_error == 0)
   1752 		ddt_phys_clear(ddp);	/* this ddp doesn't need repair */
   1753 	if (zio->io_error == 0 && dde->dde_repair_data == NULL)
   1754 		dde->dde_repair_data = zio->io_data;
   1755 	else
   1756 		zio_buf_free(zio->io_data, zio->io_size);
   1757 	mutex_exit(&pio->io_lock);
   1758 }
   1759 
   1760 static int
   1761 zio_ddt_read_start(zio_t *zio)
   1762 {
   1763 	blkptr_t *bp = zio->io_bp;
   1764 
   1765 	ASSERT(BP_GET_DEDUP(bp));
   1766 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
   1767 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1768 
   1769 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
   1770 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
   1771 		ddt_entry_t *dde = ddt_repair_start(ddt, bp);
   1772 		ddt_phys_t *ddp = dde->dde_phys;
   1773 		ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
   1774 		blkptr_t blk;
   1775 
   1776 		ASSERT(zio->io_vsd == NULL);
   1777 		zio->io_vsd = dde;
   1778 
   1779 		if (ddp_self == NULL)
   1780 			return (ZIO_PIPELINE_CONTINUE);
   1781 
   1782 		for (int p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
   1783 			if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
   1784 				continue;
   1785 			ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
   1786 			    &blk);
   1787 			zio_nowait(zio_read(zio, zio->io_spa, &blk,
   1788 			    zio_buf_alloc(zio->io_size), zio->io_size,
   1789 			    zio_ddt_child_read_done, dde, zio->io_priority,
   1790 			    ZIO_DDT_CHILD_FLAGS(zio) | ZIO_FLAG_DONT_PROPAGATE,
   1791 			    &zio->io_bookmark));
   1792 		}
   1793 		return (ZIO_PIPELINE_CONTINUE);
   1794 	}
   1795 
   1796 	zio_nowait(zio_read(zio, zio->io_spa, bp,
   1797 	    zio->io_data, zio->io_size, NULL, NULL, zio->io_priority,
   1798 	    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
   1799 
   1800 	return (ZIO_PIPELINE_CONTINUE);
   1801 }
   1802 
   1803 static int
   1804 zio_ddt_read_done(zio_t *zio)
   1805 {
   1806 	blkptr_t *bp = zio->io_bp;
   1807 
   1808 	if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
   1809 		return (ZIO_PIPELINE_STOP);
   1810 
   1811 	ASSERT(BP_GET_DEDUP(bp));
   1812 	ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
   1813 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   1814 
   1815 	if (zio->io_child_error[ZIO_CHILD_DDT]) {
   1816 		ddt_t *ddt = ddt_select(zio->io_spa, bp);
   1817 		ddt_entry_t *dde = zio->io_vsd;
   1818 		if (ddt == NULL) {
   1819 			ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
   1820 			return (ZIO_PIPELINE_CONTINUE);
   1821 		}
   1822 		if (dde == NULL) {
   1823 			zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
   1824 			zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
   1825 			return (ZIO_PIPELINE_STOP);
   1826 		}
   1827 		if (dde->dde_repair_data != NULL) {
   1828 			bcopy(dde->dde_repair_data, zio->io_data, zio->io_size);
   1829 			zio->io_child_error[ZIO_CHILD_DDT] = 0;
   1830 		}
   1831 		ddt_repair_done(ddt, dde);
   1832 		zio->io_vsd = NULL;
   1833 	}
   1834 
   1835 	ASSERT(zio->io_vsd == NULL);
   1836 
   1837 	return (ZIO_PIPELINE_CONTINUE);
   1838 }
   1839 
   1840 static boolean_t
   1841 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
   1842 {
   1843 	spa_t *spa = zio->io_spa;
   1844 
   1845 	/*
   1846 	 * Note: we compare the original data, not the transformed data,
   1847 	 * because when zio->io_bp is an override bp, we will not have
   1848 	 * pushed the I/O transforms.  That's an important optimization
   1849 	 * because otherwise we'd compress/encrypt all dmu_sync() data twice.
   1850 	 */
   1851 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
   1852 		zio_t *lio = dde->dde_lead_zio[p];
   1853 
   1854 		if (lio != NULL) {
   1855 			return (lio->io_orig_size != zio->io_orig_size ||
   1856 			    bcmp(zio->io_orig_data, lio->io_orig_data,
   1857 			    zio->io_orig_size) != 0);
   1858 		}
   1859 	}
   1860 
   1861 	for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
   1862 		ddt_phys_t *ddp = &dde->dde_phys[p];
   1863 
   1864 		if (ddp->ddp_phys_birth != 0) {
   1865 			arc_buf_t *abuf = NULL;
   1866 			uint32_t aflags = ARC_WAIT;
   1867 			blkptr_t blk = *zio->io_bp;
   1868 			int error;
   1869 
   1870 			ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
   1871 
   1872 			ddt_exit(ddt);
   1873 
   1874 			error = arc_read_nolock(NULL, spa, &blk,
   1875 			    arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
   1876 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
   1877 			    &aflags, &zio->io_bookmark);
   1878 
   1879 			if (error == 0) {
   1880 				if (arc_buf_size(abuf) != zio->io_orig_size ||
   1881 				    bcmp(abuf->b_data, zio->io_orig_data,
   1882 				    zio->io_orig_size) != 0)
   1883 					error = EEXIST;
   1884 				VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
   1885 			}
   1886 
   1887 			ddt_enter(ddt);
   1888 			return (error != 0);
   1889 		}
   1890 	}
   1891 
   1892 	return (B_FALSE);
   1893 }
   1894 
   1895 static void
   1896 zio_ddt_child_write_ready(zio_t *zio)
   1897 {
   1898 	int p = zio->io_prop.zp_copies;
   1899 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
   1900 	ddt_entry_t *dde = zio->io_private;
   1901 	ddt_phys_t *ddp = &dde->dde_phys[p];
   1902 	zio_t *pio;
   1903 
   1904 	if (zio->io_error)
   1905 		return;
   1906 
   1907 	ddt_enter(ddt);
   1908 
   1909 	ASSERT(dde->dde_lead_zio[p] == zio);
   1910 
   1911 	ddt_phys_fill(ddp, zio->io_bp);
   1912 
   1913 	while ((pio = zio_walk_parents(zio)) != NULL)
   1914 		ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
   1915 
   1916 	ddt_exit(ddt);
   1917 }
   1918 
   1919 static void
   1920 zio_ddt_child_write_done(zio_t *zio)
   1921 {
   1922 	int p = zio->io_prop.zp_copies;
   1923 	ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
   1924 	ddt_entry_t *dde = zio->io_private;
   1925 	ddt_phys_t *ddp = &dde->dde_phys[p];
   1926 
   1927 	ddt_enter(ddt);
   1928 
   1929 	ASSERT(ddp->ddp_refcnt == 0);
   1930 	ASSERT(dde->dde_lead_zio[p] == zio);
   1931 	dde->dde_lead_zio[p] = NULL;
   1932 
   1933 	if (zio->io_error == 0) {
   1934 		while (zio_walk_parents(zio) != NULL)
   1935 			ddt_phys_addref(ddp);
   1936 	} else {
   1937 		ddt_phys_clear(ddp);
   1938 	}
   1939 
   1940 	ddt_exit(ddt);
   1941 }
   1942 
   1943 static void
   1944 zio_ddt_ditto_write_done(zio_t *zio)
   1945 {
   1946 	int p = DDT_PHYS_DITTO;
   1947 	zio_prop_t *zp = &zio->io_prop;
   1948 	blkptr_t *bp = zio->io_bp;
   1949 	ddt_t *ddt = ddt_select(zio->io_spa, bp);
   1950 	ddt_entry_t *dde = zio->io_private;
   1951 	ddt_phys_t *ddp = &dde->dde_phys[p];
   1952 	ddt_key_t *ddk = &dde->dde_key;
   1953 
   1954 	ddt_enter(ddt);
   1955 
   1956 	ASSERT(ddp->ddp_refcnt == 0);
   1957 	ASSERT(dde->dde_lead_zio[p] == zio);
   1958 	dde->dde_lead_zio[p] = NULL;
   1959 
   1960 	if (zio->io_error == 0) {
   1961 		ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
   1962 		ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
   1963 		ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
   1964 		if (ddp->ddp_phys_birth != 0)
   1965 			ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
   1966 		ddt_phys_fill(ddp, bp);
   1967 	}
   1968 
   1969 	ddt_exit(ddt);
   1970 }
   1971 
   1972 static int
   1973 zio_ddt_write(zio_t *zio)
   1974 {
   1975 	spa_t *spa = zio->io_spa;
   1976 	blkptr_t *bp = zio->io_bp;
   1977 	uint64_t txg = zio->io_txg;
   1978 	zio_prop_t *zp = &zio->io_prop;
   1979 	int p = zp->zp_copies;
   1980 	int ditto_copies;
   1981 	zio_t *cio = NULL;
   1982 	zio_t *dio = NULL;
   1983 	ddt_t *ddt = ddt_select(spa, bp);
   1984 	ddt_entry_t *dde;
   1985 	ddt_phys_t *ddp;
   1986 
   1987 	ASSERT(BP_GET_DEDUP(bp));
   1988 	ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
   1989 	ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
   1990 
   1991 	ddt_enter(ddt);
   1992 	dde = ddt_lookup(ddt, bp, B_TRUE);
   1993 	ddp = &dde->dde_phys[p];
   1994 
   1995 	if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
   1996 		/*
   1997 		 * If we're using a weak checksum, upgrade to a strong checksum
   1998 		 * and try again.  If we're already using a strong checksum,
   1999 		 * we can't resolve it, so just convert to an ordinary write.
   2000 		 * (And automatically e-mail a paper to Nature?)
   2001 		 */
   2002 		if (!zio_checksum_table[zp->zp_checksum].ci_dedup) {
   2003 			zp->zp_checksum = spa_dedup_checksum(spa);
   2004 			zio_pop_transforms(zio);
   2005 			zio->io_stage = ZIO_STAGE_OPEN;
   2006 			BP_ZERO(bp);
   2007 		} else {
   2008 			zp->zp_dedup = 0;
   2009 		}
   2010 		zio->io_pipeline = ZIO_WRITE_PIPELINE;
   2011 		ddt_exit(ddt);
   2012 		return (ZIO_PIPELINE_CONTINUE);
   2013 	}
   2014 
   2015 	ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
   2016 	ASSERT(ditto_copies < SPA_DVAS_PER_BP);
   2017 
   2018 	if (ditto_copies > ddt_ditto_copies_present(dde) &&
   2019 	    dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
   2020 		zio_prop_t czp = *zp;
   2021 
   2022 		czp.zp_copies = ditto_copies;
   2023 
   2024 		/*
   2025 		 * If we arrived here with an override bp, we won't have run
   2026 		 * the transform stack, so we won't have the data we need to
   2027 		 * generate a child i/o.  So, toss the override bp and restart.
   2028 		 * This is safe, because using the override bp is just an
   2029 		 * optimization; and it's rare, so the cost doesn't matter.
   2030 		 */
   2031 		if (zio->io_bp_override) {
   2032 			zio_pop_transforms(zio);
   2033 			zio->io_stage = ZIO_STAGE_OPEN;
   2034 			zio->io_pipeline = ZIO_WRITE_PIPELINE;
   2035 			zio->io_bp_override = NULL;
   2036 			BP_ZERO(bp);
   2037 			ddt_exit(ddt);
   2038 			return (ZIO_PIPELINE_CONTINUE);
   2039 		}
   2040 
   2041 		dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
   2042 		    zio->io_orig_size, &czp, NULL,
   2043 		    zio_ddt_ditto_write_done, dde, zio->io_priority,
   2044 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
   2045 
   2046 		zio_push_transform(dio, zio->io_data, zio->io_size, 0, NULL);
   2047 		dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
   2048 	}
   2049 
   2050 	if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
   2051 		if (ddp->ddp_phys_birth != 0)
   2052 			ddt_bp_fill(ddp, bp, txg);
   2053 		if (dde->dde_lead_zio[p] != NULL)
   2054 			zio_add_child(zio, dde->dde_lead_zio[p]);
   2055 		else
   2056 			ddt_phys_addref(ddp);
   2057 	} else if (zio->io_bp_override) {
   2058 		ASSERT(bp->blk_birth == txg);
   2059 		ASSERT(BP_EQUAL(bp, zio->io_bp_override));
   2060 		ddt_phys_fill(ddp, bp);
   2061 		ddt_phys_addref(ddp);
   2062 	} else {
   2063 		cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
   2064 		    zio->io_orig_size, zp, zio_ddt_child_write_ready,
   2065 		    zio_ddt_child_write_done, dde, zio->io_priority,
   2066 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
   2067 
   2068 		zio_push_transform(cio, zio->io_data, zio->io_size, 0, NULL);
   2069 		dde->dde_lead_zio[p] = cio;
   2070 	}
   2071 
   2072 	ddt_exit(ddt);
   2073 
   2074 	if (cio)
   2075 		zio_nowait(cio);
   2076 	if (dio)
   2077 		zio_nowait(dio);
   2078 
   2079 	return (ZIO_PIPELINE_CONTINUE);
   2080 }
   2081 
   2082 ddt_entry_t *freedde; /* for debugging */
   2083 
   2084 static int
   2085 zio_ddt_free(zio_t *zio)
   2086 {
   2087 	spa_t *spa = zio->io_spa;
   2088 	blkptr_t *bp = zio->io_bp;
   2089 	ddt_t *ddt = ddt_select(spa, bp);
   2090 	ddt_entry_t *dde;
   2091 	ddt_phys_t *ddp;
   2092 
   2093 	ASSERT(BP_GET_DEDUP(bp));
   2094 	ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2095 
   2096 	ddt_enter(ddt);
   2097 	freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
   2098 	ddp = ddt_phys_select(dde, bp);
   2099 	ddt_phys_decref(ddp);
   2100 	ddt_exit(ddt);
   2101 
   2102 	return (ZIO_PIPELINE_CONTINUE);
   2103 }
   2104 
   2105 /*
   2106  * ==========================================================================
   2107  * Allocate and free blocks
   2108  * ==========================================================================
   2109  */
   2110 static int
   2111 zio_dva_allocate(zio_t *zio)
   2112 {
   2113 	spa_t *spa = zio->io_spa;
   2114 	metaslab_class_t *mc = spa_normal_class(spa);
   2115 	blkptr_t *bp = zio->io_bp;
   2116 	int error;
   2117 
   2118 	if (zio->io_gang_leader == NULL) {
   2119 		ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
   2120 		zio->io_gang_leader = zio;
   2121 	}
   2122 
   2123 	ASSERT(BP_IS_HOLE(bp));
   2124 	ASSERT3U(BP_GET_NDVAS(bp), ==, 0);
   2125 	ASSERT3U(zio->io_prop.zp_copies, >, 0);
   2126 	ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
   2127 	ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
   2128 
   2129 	error = metaslab_alloc(spa, mc, zio->io_size, bp,
   2130 	    zio->io_prop.zp_copies, zio->io_txg, NULL, 0);
   2131 
   2132 	if (error) {
   2133 		if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
   2134 			return (zio_write_gang_block(zio));
   2135 		zio->io_error = error;
   2136 	}
   2137 
   2138 	return (ZIO_PIPELINE_CONTINUE);
   2139 }
   2140 
   2141 static int
   2142 zio_dva_free(zio_t *zio)
   2143 {
   2144 	metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
   2145 
   2146 	return (ZIO_PIPELINE_CONTINUE);
   2147 }
   2148 
   2149 static int
   2150 zio_dva_claim(zio_t *zio)
   2151 {
   2152 	int error;
   2153 
   2154 	error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
   2155 	if (error)
   2156 		zio->io_error = error;
   2157 
   2158 	return (ZIO_PIPELINE_CONTINUE);
   2159 }
   2160 
   2161 /*
   2162  * Undo an allocation.  This is used by zio_done() when an I/O fails
   2163  * and we want to give back the block we just allocated.
   2164  * This handles both normal blocks and gang blocks.
   2165  */
   2166 static void
   2167 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
   2168 {
   2169 	ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
   2170 	ASSERT(zio->io_bp_override == NULL);
   2171 
   2172 	if (!BP_IS_HOLE(bp))
   2173 		metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
   2174 
   2175 	if (gn != NULL) {
   2176 		for (int g = 0; g < SPA_GBH_NBLKPTRS; g++) {
   2177 			zio_dva_unallocate(zio, gn->gn_child[g],
   2178 			    &gn->gn_gbh->zg_blkptr[g]);
   2179 		}
   2180 	}
   2181 }
   2182 
   2183 /*
   2184  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
   2185  */
   2186 int
   2187 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, blkptr_t *old_bp,
   2188     uint64_t size, boolean_t use_slog)
   2189 {
   2190 	int error = 1;
   2191 
   2192 	ASSERT(txg > spa_syncing_txg(spa));
   2193 
   2194 	if (use_slog)
   2195 		error = metaslab_alloc(spa, spa_log_class(spa), size,
   2196 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
   2197 
   2198 	if (error)
   2199 		error = metaslab_alloc(spa, spa_normal_class(spa), size,
   2200 		    new_bp, 1, txg, old_bp, METASLAB_HINTBP_AVOID);
   2201 
   2202 	if (error == 0) {
   2203 		BP_SET_LSIZE(new_bp, size);
   2204 		BP_SET_PSIZE(new_bp, size);
   2205 		BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
   2206 		BP_SET_CHECKSUM(new_bp,
   2207 		    spa_version(spa) >= SPA_VERSION_SLIM_ZIL
   2208 		    ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
   2209 		BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
   2210 		BP_SET_LEVEL(new_bp, 0);
   2211 		BP_SET_DEDUP(new_bp, 0);
   2212 		BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
   2213 	}
   2214 
   2215 	return (error);
   2216 }
   2217 
   2218 /*
   2219  * Free an intent log block.
   2220  */
   2221 void
   2222 zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
   2223 {
   2224 	ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
   2225 	ASSERT(!BP_IS_GANG(bp));
   2226 
   2227 	zio_free(spa, txg, bp);
   2228 }
   2229 
   2230 /*
   2231  * ==========================================================================
   2232  * Read and write to physical devices
   2233  * ==========================================================================
   2234  */
   2235 static int
   2236 zio_vdev_io_start(zio_t *zio)
   2237 {
   2238 	vdev_t *vd = zio->io_vd;
   2239 	uint64_t align;
   2240 	spa_t *spa = zio->io_spa;
   2241 
   2242 	ASSERT(zio->io_error == 0);
   2243 	ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
   2244 
   2245 	if (vd == NULL) {
   2246 		if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
   2247 			spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
   2248 
   2249 		/*
   2250 		 * The mirror_ops handle multiple DVAs in a single BP.
   2251 		 */
   2252 		return (vdev_mirror_ops.vdev_op_io_start(zio));
   2253 	}
   2254 
   2255 	/*
   2256 	 * We keep track of time-sensitive I/Os so that the scan thread
   2257 	 * can quickly react to certain workloads.  In particular, we care
   2258 	 * about non-scrubbing, top-level reads and writes with the following
   2259 	 * characteristics:
   2260 	 * 	- synchronous writes of user data to non-slog devices
   2261 	 *	- any reads of user data
   2262 	 * When these conditions are met, adjust the timestamp of spa_last_io
   2263 	 * which allows the scan thread to adjust its workload accordingly.
   2264 	 */
   2265 	if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
   2266 	    vd == vd->vdev_top && !vd->vdev_islog &&
   2267 	    zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
   2268 	    zio->io_txg != spa_syncing_txg(spa)) {
   2269 		uint64_t old = spa->spa_last_io;
   2270 		uint64_t new = ddi_get_lbolt64();
   2271 		if (old != new)
   2272 			(void) atomic_cas_64(&spa->spa_last_io, old, new);
   2273 	}
   2274 
   2275 	align = 1ULL << vd->vdev_top->vdev_ashift;
   2276 
   2277 	if (P2PHASE(zio->io_size, align) != 0) {
   2278 		uint64_t asize = P2ROUNDUP(zio->io_size, align);
   2279 		char *abuf = zio_buf_alloc(asize);
   2280 		ASSERT(vd == vd->vdev_top);
   2281 		if (zio->io_type == ZIO_TYPE_WRITE) {
   2282 			bcopy(zio->io_data, abuf, zio->io_size);
   2283 			bzero(abuf + zio->io_size, asize - zio->io_size);
   2284 		}
   2285 		zio_push_transform(zio, abuf, asize, asize, zio_subblock);
   2286 	}
   2287 
   2288 	ASSERT(P2PHASE(zio->io_offset, align) == 0);
   2289 	ASSERT(P2PHASE(zio->io_size, align) == 0);
   2290 	VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
   2291 
   2292 	/*
   2293 	 * If this is a repair I/O, and there's no self-healing involved --
   2294 	 * that is, we're just resilvering what we expect to resilver --
   2295 	 * then don't do the I/O unless zio's txg is actually in vd's DTL.
   2296 	 * This prevents spurious resilvering with nested replication.
   2297 	 * For example, given a mirror of mirrors, (A+B)+(C+D), if only
   2298 	 * A is out of date, we'll read from C+D, then use the data to
   2299 	 * resilver A+B -- but we don't actually want to resilver B, just A.
   2300 	 * The top-level mirror has no way to know this, so instead we just
   2301 	 * discard unnecessary repairs as we work our way down the vdev tree.
   2302 	 * The same logic applies to any form of nested replication:
   2303 	 * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
   2304 	 */
   2305 	if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
   2306 	    !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
   2307 	    zio->io_txg != 0 &&	/* not a delegated i/o */
   2308 	    !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
   2309 		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
   2310 		zio_vdev_io_bypass(zio);
   2311 		return (ZIO_PIPELINE_CONTINUE);
   2312 	}
   2313 
   2314 	if (vd->vdev_ops->vdev_op_leaf &&
   2315 	    (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
   2316 
   2317 		if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio) == 0)
   2318 			return (ZIO_PIPELINE_CONTINUE);
   2319 
   2320 		if ((zio = vdev_queue_io(zio)) == NULL)
   2321 			return (ZIO_PIPELINE_STOP);
   2322 
   2323 		if (!vdev_accessible(vd, zio)) {
   2324 			zio->io_error = ENXIO;
   2325 			zio_interrupt(zio);
   2326 			return (ZIO_PIPELINE_STOP);
   2327 		}
   2328 	}
   2329 
   2330 	return (vd->vdev_ops->vdev_op_io_start(zio));
   2331 }
   2332 
   2333 static int
   2334 zio_vdev_io_done(zio_t *zio)
   2335 {
   2336 	vdev_t *vd = zio->io_vd;
   2337 	vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
   2338 	boolean_t unexpected_error = B_FALSE;
   2339 
   2340 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
   2341 		return (ZIO_PIPELINE_STOP);
   2342 
   2343 	ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
   2344 
   2345 	if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
   2346 
   2347 		vdev_queue_io_done(zio);
   2348 
   2349 		if (zio->io_type == ZIO_TYPE_WRITE)
   2350 			vdev_cache_write(zio);
   2351 
   2352 		if (zio_injection_enabled && zio->io_error == 0)
   2353 			zio->io_error = zio_handle_device_injection(vd,
   2354 			    zio, EIO);
   2355 
   2356 		if (zio_injection_enabled && zio->io_error == 0)
   2357 			zio->io_error = zio_handle_label_injection(zio, EIO);
   2358 
   2359 		if (zio->io_error) {
   2360 			if (!vdev_accessible(vd, zio)) {
   2361 				zio->io_error = ENXIO;
   2362 			} else {
   2363 				unexpected_error = B_TRUE;
   2364 			}
   2365 		}
   2366 	}
   2367 
   2368 	ops->vdev_op_io_done(zio);
   2369 
   2370 	if (unexpected_error)
   2371 		VERIFY(vdev_probe(vd, zio) == NULL);
   2372 
   2373 	return (ZIO_PIPELINE_CONTINUE);
   2374 }
   2375 
   2376 /*
   2377  * For non-raidz ZIOs, we can just copy aside the bad data read from the
   2378  * disk, and use that to finish the checksum ereport later.
   2379  */
   2380 static void
   2381 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
   2382     const void *good_buf)
   2383 {
   2384 	/* no processing needed */
   2385 	zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
   2386 }
   2387 
   2388 /*ARGSUSED*/
   2389 void
   2390 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
   2391 {
   2392 	void *buf = zio_buf_alloc(zio->io_size);
   2393 
   2394 	bcopy(zio->io_data, buf, zio->io_size);
   2395 
   2396 	zcr->zcr_cbinfo = zio->io_size;
   2397 	zcr->zcr_cbdata = buf;
   2398 	zcr->zcr_finish = zio_vsd_default_cksum_finish;
   2399 	zcr->zcr_free = zio_buf_free;
   2400 }
   2401 
   2402 static int
   2403 zio_vdev_io_assess(zio_t *zio)
   2404 {
   2405 	vdev_t *vd = zio->io_vd;
   2406 
   2407 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
   2408 		return (ZIO_PIPELINE_STOP);
   2409 
   2410 	if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
   2411 		spa_config_exit(zio->io_spa, SCL_ZIO, zio);
   2412 
   2413 	if (zio->io_vsd != NULL) {
   2414 		zio->io_vsd_ops->vsd_free(zio);
   2415 		zio->io_vsd = NULL;
   2416 	}
   2417 
   2418 	if (zio_injection_enabled && zio->io_error == 0)
   2419 		zio->io_error = zio_handle_fault_injection(zio, EIO);
   2420 
   2421 	/*
   2422 	 * If the I/O failed, determine whether we should attempt to retry it.
   2423 	 *
   2424 	 * On retry, we cut in line in the issue queue, since we don't want
   2425 	 * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
   2426 	 */
   2427 	if (zio->io_error && vd == NULL &&
   2428 	    !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
   2429 		ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE));	/* not a leaf */
   2430 		ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));	/* not a leaf */
   2431 		zio->io_error = 0;
   2432 		zio->io_flags |= ZIO_FLAG_IO_RETRY |
   2433 		    ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
   2434 		zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
   2435 		zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
   2436 		    zio_requeue_io_start_cut_in_line);
   2437 		return (ZIO_PIPELINE_STOP);
   2438 	}
   2439 
   2440 	/*
   2441 	 * If we got an error on a leaf device, convert it to ENXIO
   2442 	 * if the device is not accessible at all.
   2443 	 */
   2444 	if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
   2445 	    !vdev_accessible(vd, zio))
   2446 		zio->io_error = ENXIO;
   2447 
   2448 	/*
   2449 	 * If we can't write to an interior vdev (mirror or RAID-Z),
   2450 	 * set vdev_cant_write so that we stop trying to allocate from it.
   2451 	 */
   2452 	if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
   2453 	    vd != NULL && !vd->vdev_ops->vdev_op_leaf)
   2454 		vd->vdev_cant_write = B_TRUE;
   2455 
   2456 	if (zio->io_error)
   2457 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   2458 
   2459 	return (ZIO_PIPELINE_CONTINUE);
   2460 }
   2461 
   2462 void
   2463 zio_vdev_io_reissue(zio_t *zio)
   2464 {
   2465 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
   2466 	ASSERT(zio->io_error == 0);
   2467 
   2468 	zio->io_stage >>= 1;
   2469 }
   2470 
   2471 void
   2472 zio_vdev_io_redone(zio_t *zio)
   2473 {
   2474 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
   2475 
   2476 	zio->io_stage >>= 1;
   2477 }
   2478 
   2479 void
   2480 zio_vdev_io_bypass(zio_t *zio)
   2481 {
   2482 	ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
   2483 	ASSERT(zio->io_error == 0);
   2484 
   2485 	zio->io_flags |= ZIO_FLAG_IO_BYPASS;
   2486 	zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
   2487 }
   2488 
   2489 /*
   2490  * ==========================================================================
   2491  * Generate and verify checksums
   2492  * ==========================================================================
   2493  */
   2494 static int
   2495 zio_checksum_generate(zio_t *zio)
   2496 {
   2497 	blkptr_t *bp = zio->io_bp;
   2498 	enum zio_checksum checksum;
   2499 
   2500 	if (bp == NULL) {
   2501 		/*
   2502 		 * This is zio_write_phys().
   2503 		 * We're either generating a label checksum, or none at all.
   2504 		 */
   2505 		checksum = zio->io_prop.zp_checksum;
   2506 
   2507 		if (checksum == ZIO_CHECKSUM_OFF)
   2508 			return (ZIO_PIPELINE_CONTINUE);
   2509 
   2510 		ASSERT(checksum == ZIO_CHECKSUM_LABEL);
   2511 	} else {
   2512 		if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
   2513 			ASSERT(!IO_IS_ALLOCATING(zio));
   2514 			checksum = ZIO_CHECKSUM_GANG_HEADER;
   2515 		} else {
   2516 			checksum = BP_GET_CHECKSUM(bp);
   2517 		}
   2518 	}
   2519 
   2520 	zio_checksum_compute(zio, checksum, zio->io_data, zio->io_size);
   2521 
   2522 	return (ZIO_PIPELINE_CONTINUE);
   2523 }
   2524 
   2525 static int
   2526 zio_checksum_verify(zio_t *zio)
   2527 {
   2528 	zio_bad_cksum_t info;
   2529 	blkptr_t *bp = zio->io_bp;
   2530 	int error;
   2531 
   2532 	ASSERT(zio->io_vd != NULL);
   2533 
   2534 	if (bp == NULL) {
   2535 		/*
   2536 		 * This is zio_read_phys().
   2537 		 * We're either verifying a label checksum, or nothing at all.
   2538 		 */
   2539 		if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
   2540 			return (ZIO_PIPELINE_CONTINUE);
   2541 
   2542 		ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
   2543 	}
   2544 
   2545 	if ((error = zio_checksum_error(zio, &info)) != 0) {
   2546 		zio->io_error = error;
   2547 		if (!(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
   2548 			zfs_ereport_start_checksum(zio->io_spa,
   2549 			    zio->io_vd, zio, zio->io_offset,
   2550 			    zio->io_size, NULL, &info);
   2551 		}
   2552 	}
   2553 
   2554 	return (ZIO_PIPELINE_CONTINUE);
   2555 }
   2556 
   2557 /*
   2558  * Called by RAID-Z to ensure we don't compute the checksum twice.
   2559  */
   2560 void
   2561 zio_checksum_verified(zio_t *zio)
   2562 {
   2563 	zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
   2564 }
   2565 
   2566 /*
   2567  * ==========================================================================
   2568  * Error rank.  Error are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
   2569  * An error of 0 indictes success.  ENXIO indicates whole-device failure,
   2570  * which may be transient (e.g. unplugged) or permament.  ECKSUM and EIO
   2571  * indicate errors that are specific to one I/O, and most likely permanent.
   2572  * Any other error is presumed to be worse because we weren't expecting it.
   2573  * ==========================================================================
   2574  */
   2575 int
   2576 zio_worst_error(int e1, int e2)
   2577 {
   2578 	static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
   2579 	int r1, r2;
   2580 
   2581 	for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
   2582 		if (e1 == zio_error_rank[r1])
   2583 			break;
   2584 
   2585 	for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
   2586 		if (e2 == zio_error_rank[r2])
   2587 			break;
   2588 
   2589 	return (r1 > r2 ? e1 : e2);
   2590 }
   2591 
   2592 /*
   2593  * ==========================================================================
   2594  * I/O completion
   2595  * ==========================================================================
   2596  */
   2597 static int
   2598 zio_ready(zio_t *zio)
   2599 {
   2600 	blkptr_t *bp = zio->io_bp;
   2601 	zio_t *pio, *pio_next;
   2602 
   2603 	if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
   2604 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
   2605 		return (ZIO_PIPELINE_STOP);
   2606 
   2607 	if (zio->io_ready) {
   2608 		ASSERT(IO_IS_ALLOCATING(zio));
   2609 		ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
   2610 		ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
   2611 
   2612 		zio->io_ready(zio);
   2613 	}
   2614 
   2615 	if (bp != NULL && bp != &zio->io_bp_copy)
   2616 		zio->io_bp_copy = *bp;
   2617 
   2618 	if (zio->io_error)
   2619 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
   2620 
   2621 	mutex_enter(&zio->io_lock);
   2622 	zio->io_state[ZIO_WAIT_READY] = 1;
   2623 	pio = zio_walk_parents(zio);
   2624 	mutex_exit(&zio->io_lock);
   2625 
   2626 	/*
   2627 	 * As we notify zio's parents, new parents could be added.
   2628 	 * New parents go to the head of zio's io_parent_list, however,
   2629 	 * so we will (correctly) not notify them.  The remainder of zio's
   2630 	 * io_parent_list, from 'pio_next' onward, cannot change because
   2631 	 * all parents must wait for us to be done before they can be done.
   2632 	 */
   2633 	for (; pio != NULL; pio = pio_next) {
   2634 		pio_next = zio_walk_parents(zio);
   2635 		zio_notify_parent(pio, zio, ZIO_WAIT_READY);
   2636 	}
   2637 
   2638 	if (zio->io_flags & ZIO_FLAG_NODATA) {
   2639 		if (BP_IS_GANG(bp)) {
   2640 			zio->io_flags &= ~ZIO_FLAG_NODATA;
   2641 		} else {
   2642 			ASSERT((uintptr_t)zio->io_data < SPA_MAXBLOCKSIZE);
   2643 			zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
   2644 		}
   2645 	}
   2646 
   2647 	if (zio_injection_enabled &&
   2648 	    zio->io_spa->spa_syncing_txg == zio->io_txg)
   2649 		zio_handle_ignored_writes(zio);
   2650 
   2651 	return (ZIO_PIPELINE_CONTINUE);
   2652 }
   2653 
   2654 static int
   2655 zio_done(zio_t *zio)
   2656 {
   2657 	spa_t *spa = zio->io_spa;
   2658 	zio_t *lio = zio->io_logical;
   2659 	blkptr_t *bp = zio->io_bp;
   2660 	vdev_t *vd = zio->io_vd;
   2661 	uint64_t psize = zio->io_size;
   2662 	zio_t *pio, *pio_next;
   2663 
   2664 	/*
   2665 	 * If our children haven't all completed,
   2666 	 * wait for them and then repeat this pipeline stage.
   2667 	 */
   2668 	if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
   2669 	    zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
   2670 	    zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
   2671 	    zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
   2672 		return (ZIO_PIPELINE_STOP);
   2673 
   2674 	for (int c = 0; c < ZIO_CHILD_TYPES; c++)
   2675 		for (int w = 0; w < ZIO_WAIT_TYPES; w++)
   2676 			ASSERT(zio->io_children[c][w] == 0);
   2677 
   2678 	if (bp != NULL) {
   2679 		ASSERT(bp->blk_pad[0] == 0);
   2680 		ASSERT(bp->blk_pad[1] == 0);
   2681 		ASSERT(bcmp(bp, &zio->io_bp_copy, sizeof (blkptr_t)) == 0 ||
   2682 		    (bp == zio_unique_parent(zio)->io_bp));
   2683 		if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(bp) &&
   2684 		    zio->io_bp_override == NULL &&
   2685 		    !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
   2686 			ASSERT(!BP_SHOULD_BYTESWAP(bp));
   2687 			ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(bp));
   2688 			ASSERT(BP_COUNT_GANG(bp) == 0 ||
   2689 			    (BP_COUNT_GANG(bp) == BP_GET_NDVAS(bp)));
   2690 		}
   2691 	}
   2692 
   2693 	/*
   2694 	 * If there were child vdev/gang/ddt errors, they apply to us now.
   2695 	 */
   2696 	zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
   2697 	zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
   2698 	zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
   2699 
   2700 	/*
   2701 	 * If the I/O on the transformed data was successful, generate any
   2702 	 * checksum reports now while we still have the transformed data.
   2703 	 */
   2704 	if (zio->io_error == 0) {
   2705 		while (zio->io_cksum_report != NULL) {
   2706 			zio_cksum_report_t *zcr = zio->io_cksum_report;
   2707 			uint64_t align = zcr->zcr_align;
   2708 			uint64_t asize = P2ROUNDUP(psize, align);
   2709 			char *abuf = zio->io_data;
   2710 
   2711 			if (asize != psize) {
   2712 				abuf = zio_buf_alloc(asize);
   2713 				bcopy(zio->io_data, abuf, psize);
   2714 				bzero(abuf + psize, asize - psize);
   2715 			}
   2716 
   2717 			zio->io_cksum_report = zcr->zcr_next;
   2718 			zcr->zcr_next = NULL;
   2719 			zcr->zcr_finish(zcr, abuf);
   2720 			zfs_ereport_free_checksum(zcr);
   2721 
   2722 			if (asize != psize)
   2723 				zio_buf_free(abuf, asize);
   2724 		}
   2725 	}
   2726 
   2727 	zio_pop_transforms(zio);	/* note: may set zio->io_error */
   2728 
   2729 	vdev_stat_update(zio, psize);
   2730 
   2731 	if (zio->io_error) {
   2732 		/*
   2733 		 * If this I/O is attached to a particular vdev,
   2734 		 * generate an error message describing the I/O failure
   2735 		 * at the block level.  We ignore these errors if the
   2736 		 * device is currently unavailable.
   2737 		 */
   2738 		if (zio->io_error != ECKSUM && vd != NULL && !vdev_is_dead(vd))
   2739 			zfs_ereport_post(FM_EREPORT_ZFS_IO, spa, vd, zio, 0, 0);
   2740 
   2741 		if ((zio->io_error == EIO || !(zio->io_flags &
   2742 		    (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
   2743 		    zio == lio) {
   2744 			/*
   2745 			 * For logical I/O requests, tell the SPA to log the
   2746 			 * error and generate a logical data ereport.
   2747 			 */
   2748 			spa_log_error(spa, zio);
   2749 			zfs_ereport_post(FM_EREPORT_ZFS_DATA, spa, NULL, zio,
   2750 			    0, 0);
   2751 		}
   2752 	}
   2753 
   2754 	if (zio->io_error && zio == lio) {
   2755 		/*
   2756 		 * Determine whether zio should be reexecuted.  This will
   2757 		 * propagate all the way to the root via zio_notify_parent().
   2758 		 */
   2759 		ASSERT(vd == NULL && bp != NULL);
   2760 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2761 
   2762 		if (IO_IS_ALLOCATING(zio) &&
   2763 		    !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
   2764 			if (zio->io_error != ENOSPC)
   2765 				zio->io_reexecute |= ZIO_REEXECUTE_NOW;
   2766 			else
   2767 				zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
   2768 		}
   2769 
   2770 		if ((zio->io_type == ZIO_TYPE_READ ||
   2771 		    zio->io_type == ZIO_TYPE_FREE) &&
   2772 		    !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
   2773 		    zio->io_error == ENXIO &&
   2774 		    spa_load_state(spa) == SPA_LOAD_NONE &&
   2775 		    spa_get_failmode(spa) != ZIO_FAILURE_MODE_CONTINUE)
   2776 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
   2777 
   2778 		if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
   2779 			zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
   2780 
   2781 		/*
   2782 		 * Here is a possibly good place to attempt to do
   2783 		 * either combinatorial reconstruction or error correction
   2784 		 * based on checksums.  It also might be a good place
   2785 		 * to send out preliminary ereports before we suspend
   2786 		 * processing.
   2787 		 */
   2788 	}
   2789 
   2790 	/*
   2791 	 * If there were logical child errors, they apply to us now.
   2792 	 * We defer this until now to avoid conflating logical child
   2793 	 * errors with errors that happened to the zio itself when
   2794 	 * updating vdev stats and reporting FMA events above.
   2795 	 */
   2796 	zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
   2797 
   2798 	if ((zio->io_error || zio->io_reexecute) &&
   2799 	    IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
   2800 	    !(zio->io_flags & ZIO_FLAG_IO_REWRITE))
   2801 		zio_dva_unallocate(zio, zio->io_gang_tree, bp);
   2802 
   2803 	zio_gang_tree_free(&zio->io_gang_tree);
   2804 
   2805 	/*
   2806 	 * Godfather I/Os should never suspend.
   2807 	 */
   2808 	if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
   2809 	    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
   2810 		zio->io_reexecute = 0;
   2811 
   2812 	if (zio->io_reexecute) {
   2813 		/*
   2814 		 * This is a logical I/O that wants to reexecute.
   2815 		 *
   2816 		 * Reexecute is top-down.  When an i/o fails, if it's not
   2817 		 * the root, it simply notifies its parent and sticks around.
   2818 		 * The parent, seeing that it still has children in zio_done(),
   2819 		 * does the same.  This percolates all the way up to the root.
   2820 		 * The root i/o will reexecute or suspend the entire tree.
   2821 		 *
   2822 		 * This approach ensures that zio_reexecute() honors
   2823 		 * all the original i/o dependency relationships, e.g.
   2824 		 * parents not executing until children are ready.
   2825 		 */
   2826 		ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
   2827 
   2828 		zio->io_gang_leader = NULL;
   2829 
   2830 		mutex_enter(&zio->io_lock);
   2831 		zio->io_state[ZIO_WAIT_DONE] = 1;
   2832 		mutex_exit(&zio->io_lock);
   2833 
   2834 		/*
   2835 		 * "The Godfather" I/O monitors its children but is
   2836 		 * not a true parent to them. It will track them through
   2837 		 * the pipeline but severs its ties whenever they get into
   2838 		 * trouble (e.g. suspended). This allows "The Godfather"
   2839 		 * I/O to return status without blocking.
   2840 		 */
   2841 		for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
   2842 			zio_link_t *zl = zio->io_walk_link;
   2843 			pio_next = zio_walk_parents(zio);
   2844 
   2845 			if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
   2846 			    (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
   2847 				zio_remove_child(pio, zio, zl);
   2848 				zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
   2849 			}
   2850 		}
   2851 
   2852 		if ((pio = zio_unique_parent(zio)) != NULL) {
   2853 			/*
   2854 			 * We're not a root i/o, so there's nothing to do
   2855 			 * but notify our parent.  Don't propagate errors
   2856 			 * upward since we haven't permanently failed yet.
   2857 			 */
   2858 			ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
   2859 			zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
   2860 			zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
   2861 		} else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
   2862 			/*
   2863 			 * We'd fail again if we reexecuted now, so suspend
   2864 			 * until conditions improve (e.g. device comes online).
   2865 			 */
   2866 			zio_suspend(spa, zio);
   2867 		} else {
   2868 			/*
   2869 			 * Reexecution is potentially a huge amount of work.
   2870 			 * Hand it off to the otherwise-unused claim taskq.
   2871 			 */
   2872 			(void) taskq_dispatch(
   2873 			    spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
   2874 			    (task_func_t *)zio_reexecute, zio, TQ_SLEEP);
   2875 		}
   2876 		return (ZIO_PIPELINE_STOP);
   2877 	}
   2878 
   2879 	ASSERT(zio->io_child_count == 0);
   2880 	ASSERT(zio->io_reexecute == 0);
   2881 	ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
   2882 
   2883 	/*
   2884 	 * Report any checksum errors, since the I/O is complete.
   2885 	 */
   2886 	while (zio->io_cksum_report != NULL) {
   2887 		zio_cksum_report_t *zcr = zio->io_cksum_report;
   2888 		zio->io_cksum_report = zcr->zcr_next;
   2889 		zcr->zcr_next = NULL;
   2890 		zcr->zcr_finish(zcr, NULL);
   2891 		zfs_ereport_free_checksum(zcr);
   2892 	}
   2893 
   2894 	/*
   2895 	 * It is the responsibility of the done callback to ensure that this
   2896 	 * particular zio is no longer discoverable for adoption, and as
   2897 	 * such, cannot acquire any new parents.
   2898 	 */
   2899 	if (zio->io_done)
   2900 		zio->io_done(zio);
   2901 
   2902 	mutex_enter(&zio->io_lock);
   2903 	zio->io_state[ZIO_WAIT_DONE] = 1;
   2904 	mutex_exit(&zio->io_lock);
   2905 
   2906 	for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
   2907 		zio_link_t *zl = zio->io_walk_link;
   2908 		pio_next = zio_walk_parents(zio);
   2909 		zio_remove_child(pio, zio, zl);
   2910 		zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
   2911 	}
   2912 
   2913 	if (zio->io_waiter != NULL) {
   2914 		mutex_enter(&zio->io_lock);
   2915 		zio->io_executor = NULL;
   2916 		cv_broadcast(&zio->io_cv);
   2917 		mutex_exit(&zio->io_lock);
   2918 	} else {
   2919 		zio_destroy(zio);
   2920 	}
   2921 
   2922 	return (ZIO_PIPELINE_STOP);
   2923 }
   2924 
   2925 /*
   2926  * ==========================================================================
   2927  * I/O pipeline definition
   2928  * ==========================================================================
   2929  */
   2930 static zio_pipe_stage_t *zio_pipeline[] = {
   2931 	NULL,
   2932 	zio_read_bp_init,
   2933 	zio_free_bp_init,
   2934 	zio_issue_async,
   2935 	zio_write_bp_init,
   2936 	zio_checksum_generate,
   2937 	zio_ddt_read_start,
   2938 	zio_ddt_read_done,
   2939 	zio_ddt_write,
   2940 	zio_ddt_free,
   2941 	zio_gang_assemble,
   2942 	zio_gang_issue,
   2943 	zio_dva_allocate,
   2944 	zio_dva_free,
   2945 	zio_dva_claim,
   2946 	zio_ready,
   2947 	zio_vdev_io_start,
   2948 	zio_vdev_io_done,
   2949 	zio_vdev_io_assess,
   2950 	zio_checksum_verify,
   2951 	zio_done
   2952 };
   2953