Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/dmu_objset.h>
     27 #include <sys/dsl_dataset.h>
     28 #include <sys/dsl_dir.h>
     29 #include <sys/dsl_prop.h>
     30 #include <sys/dsl_synctask.h>
     31 #include <sys/dmu_traverse.h>
     32 #include <sys/dmu_tx.h>
     33 #include <sys/arc.h>
     34 #include <sys/zio.h>
     35 #include <sys/zap.h>
     36 #include <sys/unique.h>
     37 #include <sys/zfs_context.h>
     38 #include <sys/zfs_ioctl.h>
     39 #include <sys/spa.h>
     40 #include <sys/zfs_znode.h>
     41 #include <sys/zvol.h>
     42 
/*
 * Sentinel owner tag.  A dataset whose ds_owner points at this string is
 * treated as destroyed (see DSL_DATASET_IS_DESTROYED below); only the
 * pointer identity is compared, never the string contents.
 */
static char *dsl_reaper = "the grim reaper";

/* Forward declarations; the definitions are not in this part of the file. */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

#define	DS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() when a dataset's deadlist is made. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/* True when the dataset's owner is the dsl_reaper sentinel. */
#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
     54 
     55 
     56 /*
     57  * Figure out how much of this delta should be propogated to the dsl_dir
     58  * layer.  If there's a refreservation, that space has already been
     59  * partially accounted for in our ancestors.
     60  */
     61 static int64_t
     62 parent_delta(dsl_dataset_t *ds, int64_t delta)
     63 {
     64 	uint64_t old_bytes, new_bytes;
     65 
     66 	if (ds->ds_reserved == 0)
     67 		return (delta);
     68 
     69 	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
     70 	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
     71 
     72 	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
     73 	return (new_bytes - old_bytes);
     74 }
     75 
     76 void
     77 dsl_dataset_block_born(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx)
     78 {
     79 	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
     80 	int compressed = BP_GET_PSIZE(bp);
     81 	int uncompressed = BP_GET_UCSIZE(bp);
     82 	int64_t delta;
     83 
     84 	dprintf_bp(bp, "born, ds=%p\n", ds);
     85 
     86 	ASSERT(dmu_tx_is_syncing(tx));
     87 	/* It could have been compressed away to nothing */
     88 	if (BP_IS_HOLE(bp))
     89 		return;
     90 	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
     91 	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
     92 	if (ds == NULL) {
     93 		/*
     94 		 * Account for the meta-objset space in its placeholder
     95 		 * dsl_dir.
     96 		 */
     97 		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
     98 		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
     99 		    used, compressed, uncompressed, tx);
    100 		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
    101 		return;
    102 	}
    103 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
    104 	mutex_enter(&ds->ds_dir->dd_lock);
    105 	mutex_enter(&ds->ds_lock);
    106 	delta = parent_delta(ds, used);
    107 	ds->ds_phys->ds_used_bytes += used;
    108 	ds->ds_phys->ds_compressed_bytes += compressed;
    109 	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
    110 	ds->ds_phys->ds_unique_bytes += used;
    111 	mutex_exit(&ds->ds_lock);
    112 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
    113 	    compressed, uncompressed, tx);
    114 	dsl_dir_transfer_space(ds->ds_dir, used - delta,
    115 	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
    116 	mutex_exit(&ds->ds_dir->dd_lock);
    117 }
    118 
/*
 * Account for block 'bp' no longer being referenced by dataset 'ds'.
 * Must be called from syncing context.
 *
 *	ds	dataset the block belonged to, or NULL for the meta-objset
 *	bp	block being killed; holes are ignored
 *	tx	syncing transaction
 *	async	when set, a block headed for the deadlist is queued with
 *		bplist_enqueue_deferred() rather than bplist_enqueue()
 *
 * A block born after the dataset's previous snapshot is freed right
 * away; any other block is placed on the dataset's deadlist.
 *
 * Returns the block's size as reported by bp_get_dsize_sync(), or 0 for
 * a hole.
 */
int
dsl_dataset_block_kill(dsl_dataset_t *ds, const blkptr_t *bp, dmu_tx_t *tx,
    boolean_t async)
{
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(bp->blk_birth <= tx->tx_txg);

	int used = bp_get_dsize_sync(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(used > 0);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dataset.
		 */
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    -used, -compressed, -uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		/* Born after the previous snapshot: free it now. */
		int64_t delta;

		dprintf_bp(bp, "freeing: %s", "");
		dsl_free(tx->tx_pool, tx->tx_txg, bp);

		/* dd_lock is taken before ds_lock and released after it. */
		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		/* parent_delta() must see ds_unique_bytes before the update */
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		dprintf_bp(bp, "putting on dead list: %s", "");
		if (async) {
			/*
			 * We are here as part of zio's write done callback,
			 * which means we're a zio interrupt thread.  We can't
			 * call bplist_enqueue() now because it may block
			 * waiting for I/O.  Instead, put bp on the deferred
			 * queue and let dsl_pool_sync() finish the job.
			 */
			bplist_enqueue_deferred(&ds->ds_deadlist, bp);
		} else {
			VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
		}
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		/*
		 * For blocks born after ds_origin_txg, move the space from
		 * the HEAD bucket to the SNAP bucket of the dsl_dir.
		 */
		if (bp->blk_birth > ds->ds_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	/* On either path the block leaves this dataset's own totals. */
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
	ds->ds_phys->ds_used_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}
    211 
    212 uint64_t
    213 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
    214 {
    215 	uint64_t trysnap = 0;
    216 
    217 	if (ds == NULL)
    218 		return (0);
    219 	/*
    220 	 * The snapshot creation could fail, but that would cause an
    221 	 * incorrect FALSE return, which would only result in an
    222 	 * overestimation of the amount of space that an operation would
    223 	 * consume, which is OK.
    224 	 *
    225 	 * There's also a small window where we could miss a pending
    226 	 * snapshot, because we could set the sync task in the quiescing
    227 	 * phase.  So this should only be used as a guess.
    228 	 */
    229 	if (ds->ds_trysnap_txg >
    230 	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
    231 		trysnap = ds->ds_trysnap_txg;
    232 	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
    233 }
    234 
    235 boolean_t
    236 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
    237 {
    238 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
    239 }
    240 
/*
 * Tear down and free an in-core dsl_dataset_t.  Registered as the dbuf
 * user-eviction callback by dsl_dataset_get_ref() and also called
 * directly by dsl_dataset_disown() for a dataset that has no dbuf.
 *
 *	db	unused
 *	dsv	the dsl_dataset_t to destroy
 *
 * The dataset must be unowned or marked destroyed.
 */
/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
	dsl_dataset_t *ds = dsv;

	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

	unique_remove(ds->ds_fsid_guid);

	if (ds->ds_objset != NULL)
		dmu_objset_evict(ds->ds_objset);

	/* Drop the reference this dataset took on its previous snapshot. */
	if (ds->ds_prev) {
		dsl_dataset_drop_ref(ds->ds_prev, ds);
		ds->ds_prev = NULL;
	}

	bplist_close(&ds->ds_deadlist);
	if (ds->ds_dir)
		dsl_dir_close(ds->ds_dir, ds);

	ASSERT(!list_link_active(&ds->ds_synced_link));

	/* Mirror of the initialisation done in dsl_dataset_get_ref(). */
	mutex_destroy(&ds->ds_lock);
	mutex_destroy(&ds->ds_recvlock);
	mutex_destroy(&ds->ds_opening_lock);
	rw_destroy(&ds->ds_rwlock);
	cv_destroy(&ds->ds_exclusive_cv);
	bplist_fini(&ds->ds_deadlist);

	kmem_free(ds, sizeof (dsl_dataset_t));
}
    274 
    275 static int
    276 dsl_dataset_get_snapname(dsl_dataset_t *ds)
    277 {
    278 	dsl_dataset_phys_t *headphys;
    279 	int err;
    280 	dmu_buf_t *headdbuf;
    281 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
    282 	objset_t *mos = dp->dp_meta_objset;
    283 
    284 	if (ds->ds_snapname[0])
    285 		return (0);
    286 	if (ds->ds_phys->ds_next_snap_obj == 0)
    287 		return (0);
    288 
    289 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
    290 	    FTAG, &headdbuf);
    291 	if (err)
    292 		return (err);
    293 	headphys = headdbuf->db_data;
    294 	err = zap_value_search(dp->dp_meta_objset,
    295 	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
    296 	dmu_buf_rele(headdbuf, FTAG);
    297 	return (err);
    298 }
    299 
    300 static int
    301 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
    302 {
    303 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    304 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    305 	matchtype_t mt;
    306 	int err;
    307 
    308 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    309 		mt = MT_FIRST;
    310 	else
    311 		mt = MT_EXACT;
    312 
    313 	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
    314 	    value, mt, NULL, 0, NULL);
    315 	if (err == ENOTSUP && mt == MT_FIRST)
    316 		err = zap_lookup(mos, snapobj, name, 8, 1, value);
    317 	return (err);
    318 }
    319 
    320 static int
    321 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
    322 {
    323 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    324 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    325 	matchtype_t mt;
    326 	int err;
    327 
    328 	dsl_dir_snap_cmtime_update(ds->ds_dir);
    329 
    330 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    331 		mt = MT_FIRST;
    332 	else
    333 		mt = MT_EXACT;
    334 
    335 	err = zap_remove_norm(mos, snapobj, name, mt, tx);
    336 	if (err == ENOTSUP && mt == MT_FIRST)
    337 		err = zap_remove(mos, snapobj, name, tx);
    338 	return (err);
    339 }
    340 
/*
 * Find the in-core dsl_dataset_t for dataset object 'dsobj', building it
 * if the bonus buffer has no user attached yet, and hold the bonus
 * buffer with 'tag'.
 *
 *	dp	pool containing the dataset
 *	dsobj	object number of the dataset in the meta-objset
 *	tag	hold tag for the bonus buffer
 *	dsp	out: the dataset, set only on success
 *
 * The caller must hold dp_config_rwlock or be in syncing context.  This
 * routine only takes the dbuf hold; ds_rwlock is not acquired here (see
 * dsl_dataset_hold_ref()).
 *
 * Returns 0 on success, an errno from the lookups performed while
 * building the dataset, or ENOENT if the dataset is marked destroyed
 * and we are not in syncing context.  On every error return the dbuf
 * hold has been released.
 */
static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);
	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		/* No in-core dataset attached to this dbuf yet; build one. */
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_recvlock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
		bplist_init(&ds->ds_deadlist);

		err = bplist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);
		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			/*
			 * we don't really need to close the bplist if we
			 * just opened it.
			 */
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			/* Hold the previous snapshot, tagged with ds itself. */
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}

			/* For a clone, remember the origin's creation txg. */
			if (err == 0 && dsl_dir_is_clone(ds->ds_dir)) {
				dsl_dataset_t *origin;

				err = dsl_dataset_hold_obj(dp,
				    ds->ds_dir->dd_phys->dd_origin_obj,
				    FTAG, &origin);
				if (err == 0) {
					ds->ds_origin_txg =
					    origin->ds_phys->ds_creation_txg;
					dsl_dataset_rele(origin, FTAG);
				}
			}
		} else {
			if (zfs_flags & ZFS_DEBUG_SNAPNAMES)
				err = dsl_dataset_get_snapname(ds);
			if (err == 0 && ds->ds_phys->ds_userrefs_obj != 0) {
				err = zap_count(
				    ds->ds_dir->dd_pool->dp_meta_objset,
				    ds->ds_phys->ds_userrefs_obj,
				    &ds->ds_userrefs);
			}
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		/*
		 * Try to attach the new dataset to the dbuf.  A non-NULL
		 * return is an already-attached dataset, which is used in
		 * place of ours below.
		 */
		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		/* 'winner' is only read when err == 0, i.e. after it is set. */
		if (err || winner) {
			/* Discard the dataset we built. */
			bplist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_recvlock);
			mutex_destroy(&ds->ds_opening_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			bplist_fini(&ds->ds_deadlist);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	/* Outside syncing context a destroyed dataset is reported as gone. */
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}
    494 
/*
 * Second half of a hold: given a dataset already referenced with 'tag'
 * (see dsl_dataset_get_ref()), acquire ds_rwlock as READER.
 *
 * In syncing context no lock is taken and 0 is returned immediately.
 * Otherwise the code releases and re-takes dp_config_rwlock as READER
 * around each wait, so the caller is expected to hold it as READER on
 * entry; it is held again on every return path.
 *
 * Returns 0 with ds_rwlock held as READER, or ENOENT if the dataset was
 * destroyed while we waited, in which case the reference taken with
 * 'tag' has been dropped.
 */
static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/*
	 * In syncing context we don't want the rwlock lock: there
	 * may be an existing writer waiting for sync phase to
	 * finish.  We don't need to worry about such writers, since
	 * sync phase is single-threaded, so the writer can't be
	 * doing anything while we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructability" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		/* Give up the config lock while we sleep on the cv. */
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		/*
		 * The dp_config_rwlock lives above the ds_lock. And
		 * we need to check DSL_DATASET_IS_DESTROYED() while
		 * holding the ds_lock, so we have to drop and reacquire
		 * the ds_lock here.
		 */
		mutex_exit(&ds->ds_lock);
		rw_enter(&dp->dp_config_rwlock, RW_READER);
		mutex_enter(&ds->ds_lock);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}
    548 
    549 int
    550 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    551     dsl_dataset_t **dsp)
    552 {
    553 	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
    554 
    555 	if (err)
    556 		return (err);
    557 	return (dsl_dataset_hold_ref(*dsp, tag));
    558 }
    559 
    560 int
    561 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, boolean_t inconsistentok,
    562     void *tag, dsl_dataset_t **dsp)
    563 {
    564 	int err = dsl_dataset_hold_obj(dp, dsobj, tag, dsp);
    565 	if (err)
    566 		return (err);
    567 	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
    568 		dsl_dataset_rele(*dsp, tag);
    569 		*dsp = NULL;
    570 		return (EBUSY);
    571 	}
    572 	return (0);
    573 }
    574 
/*
 * Hold a dataset by name.
 *
 *	name	dataset name; may carry an "@snapshot" suffix
 *	tag	hold tag
 *	dsp	out: the held dataset
 *
 * The head dataset of the named dsl_dir is held first.  If the name has
 * a snapshot component, the snapshot is looked up in the head's
 * snapshot-name ZAP and held instead, and the head is released.
 * dp_config_rwlock is taken as READER for the duration of the call.
 *
 * Returns 0 on success or an errno (ENOENT if the dsl_dir has no head
 * dataset or the trailing component does not start with '@').  On
 * failure *dsp must not be used: depending on where the failure
 * occurred it may be NULL, unset, or still point at a dataset whose
 * hold has already been released.
 */
int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		/* The head is no longer needed whether or not we found it. */
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			/* Cache the name we just resolved, if not yet set. */
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}
    633 
    634 int
    635 dsl_dataset_own(const char *name, boolean_t inconsistentok,
    636     void *tag, dsl_dataset_t **dsp)
    637 {
    638 	int err = dsl_dataset_hold(name, tag, dsp);
    639 	if (err)
    640 		return (err);
    641 	if (!dsl_dataset_tryown(*dsp, inconsistentok, tag)) {
    642 		dsl_dataset_rele(*dsp, tag);
    643 		return (EBUSY);
    644 	}
    645 	return (0);
    646 }
    647 
    648 void
    649 dsl_dataset_name(dsl_dataset_t *ds, char *name)
    650 {
    651 	if (ds == NULL) {
    652 		(void) strcpy(name, "mos");
    653 	} else {
    654 		dsl_dir_name(ds->ds_dir, name);
    655 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    656 		if (ds->ds_snapname[0]) {
    657 			(void) strcat(name, "@");
    658 			/*
    659 			 * We use a "recursive" mutex so that we
    660 			 * can call dprintf_ds() with ds_lock held.
    661 			 */
    662 			if (!MUTEX_HELD(&ds->ds_lock)) {
    663 				mutex_enter(&ds->ds_lock);
    664 				(void) strcat(name, ds->ds_snapname);
    665 				mutex_exit(&ds->ds_lock);
    666 			} else {
    667 				(void) strcat(name, ds->ds_snapname);
    668 			}
    669 		}
    670 	}
    671 }
    672 
    673 static int
    674 dsl_dataset_namelen(dsl_dataset_t *ds)
    675 {
    676 	int result;
    677 
    678 	if (ds == NULL) {
    679 		result = 3;	/* "mos" */
    680 	} else {
    681 		result = dsl_dir_namelen(ds->ds_dir);
    682 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    683 		if (ds->ds_snapname[0]) {
    684 			++result;	/* adding one for the @-sign */
    685 			if (!MUTEX_HELD(&ds->ds_lock)) {
    686 				mutex_enter(&ds->ds_lock);
    687 				result += strlen(ds->ds_snapname);
    688 				mutex_exit(&ds->ds_lock);
    689 			} else {
    690 				result += strlen(ds->ds_snapname);
    691 			}
    692 		}
    693 	}
    694 
    695 	return (result);
    696 }
    697 
    698 void
    699 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
    700 {
    701 	dmu_buf_rele(ds->ds_dbuf, tag);
    702 }
    703 
    704 void
    705 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
    706 {
    707 	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
    708 		rw_exit(&ds->ds_rwlock);
    709 	}
    710 	dsl_dataset_drop_ref(ds, tag);
    711 }
    712 
/*
 * Give up ownership of a dataset.
 *
 *	ds	dataset owned by 'tag', or a destroyed dataset whose
 *		ds_dbuf is NULL
 *	tag	the owner tag, which is also the tag of the reference
 *		that is dropped
 *
 * Clears ds_owner and, if this thread holds ds_rwlock as writer,
 * releases it and wakes any threads waiting on ds_exclusive_cv.  The
 * reference is then dropped; a dataset with no dbuf is instead torn
 * down directly with dsl_dataset_evict(), which frees 'ds'.
 */
void
dsl_dataset_disown(dsl_dataset_t *ds, void *tag)
{
	ASSERT((ds->ds_owner == tag && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, tag);
	else
		dsl_dataset_evict(ds->ds_dbuf, ds);
}
    731 
    732 boolean_t
    733 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *tag)
    734 {
    735 	boolean_t gotit = FALSE;
    736 
    737 	mutex_enter(&ds->ds_lock);
    738 	if (ds->ds_owner == NULL &&
    739 	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
    740 		ds->ds_owner = tag;
    741 		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
    742 			rw_exit(&ds->ds_rwlock);
    743 		gotit = TRUE;
    744 	}
    745 	mutex_exit(&ds->ds_lock);
    746 	return (gotit);
    747 }
    748 
    749 void
    750 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
    751 {
    752 	ASSERT3P(owner, ==, ds->ds_owner);
    753 	if (!RW_WRITE_HELD(&ds->ds_rwlock))
    754 		rw_enter(&ds->ds_rwlock, RW_WRITER);
    755 }
    756 
/*
 * Create the head dataset object for dsl_dir 'dd' in syncing context.
 *
 *	dd	directory that must not yet have a head dataset
 *	origin	snapshot to clone from, or NULL to use the pool's
 *		dp_origin_snap (which may itself be NULL)
 *	flags	initial ds_flags for the new dataset
 *	tx	syncing transaction
 *
 * Allocates a DMU_OT_DSL_DATASET object in the meta-objset, initialises
 * its dsl_dataset_phys_t, links it to the origin when there is one, and
 * records it as the directory's head dataset.
 *
 * Returns the object number of the new dataset.
 */
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	/* A dataset created in the initial txg is recorded as txg 1. */
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);

	if (origin) {
		/* Start from the origin's contents and space totals. */
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		/* Record the new dataset in the origin's next-clones ZAP. */
		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}
    835 
    836 uint64_t
    837 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    838     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
    839 {
    840 	dsl_pool_t *dp = pdd->dd_pool;
    841 	uint64_t dsobj, ddobj;
    842 	dsl_dir_t *dd;
    843 
    844 	ASSERT(lastname[0] != '@');
    845 
    846 	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
    847 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
    848 
    849 	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
    850 
    851 	dsl_deleg_set_create_perms(dd, tx, cr);
    852 
    853 	dsl_dir_close(dd, FTAG);
    854 
    855 	return (dsobj);
    856 }
    857 
/*
 * Argument bundle handed through dmu_objset_find() to
 * dsl_snapshot_destroy_one() by dsl_snapshots_destroy().
 */
struct destroyarg {
	/* sync task group that collects one destroy task per snapshot */
	dsl_sync_task_group_t *dstg;
	/* snapshot name, i.e. the part that follows '@' */
	char *snapname;
	/* out: buffer that receives the name of a dataset that failed */
	char *failed;
	/* copied into each task's dsl_ds_destroyarg */
	boolean_t defer;
};
    864 
    865 static int
    866 dsl_snapshot_destroy_one(const char *name, void *arg)
    867 {
    868 	struct destroyarg *da = arg;
    869 	dsl_dataset_t *ds;
    870 	int err;
    871 	char *dsname;
    872 
    873 	dsname = kmem_asprintf("%s@%s", name, da->snapname);
    874 	err = dsl_dataset_own(dsname, B_TRUE, da->dstg, &ds);
    875 	strfree(dsname);
    876 	if (err == 0) {
    877 		struct dsl_ds_destroyarg *dsda;
    878 
    879 		dsl_dataset_make_exclusive(ds, da->dstg);
    880 		if (ds->ds_objset != NULL) {
    881 			dmu_objset_evict(ds->ds_objset);
    882 			ds->ds_objset = NULL;
    883 		}
    884 		dsda = kmem_zalloc(sizeof (struct dsl_ds_destroyarg), KM_SLEEP);
    885 		dsda->ds = ds;
    886 		dsda->defer = da->defer;
    887 		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
    888 		    dsl_dataset_destroy_sync, dsda, da->dstg, 0);
    889 	} else if (err == ENOENT) {
    890 		err = 0;
    891 	} else {
    892 		(void) strcpy(da->failed, name);
    893 	}
    894 	return (err);
    895 }
    896 
/*
 * Destroy 'snapname' in all descendants of 'fsname'.
 *
 *	fsname		file system at the top of the subtree.  This buffer
 *			is also written to: when an error is attributed
 *			to a particular file system, its name is stored
 *			here.
 *	snapname	name of the snapshot (without the '@')
 *	defer		passed to each destroy task
 *
 * One destroy task per existing snapshot is queued on a single sync
 * task group, which is then waited on.  Afterwards every queued
 * snapshot is disowned and its task argument freed, whether or not the
 * group succeeded.
 *
 * Returns 0 on success or an errno.
 */
#pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
int
dsl_snapshots_destroy(char *fsname, char *snapname, boolean_t defer)
{
	int err;
	struct destroyarg da;
	dsl_sync_task_t *dst;
	spa_t *spa;

	err = spa_open(fsname, &spa, FTAG);
	if (err)
		return (err);
	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	da.snapname = snapname;
	da.failed = fsname;
	da.defer = defer;

	err = dmu_objset_find(fsname,
	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);

	/* Only run the queued tasks if every snapshot could be owned. */
	if (err == 0)
		err = dsl_sync_task_group_wait(da.dstg);

	for (dst = list_head(&da.dstg->dstg_tasks); dst;
	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
		struct dsl_ds_destroyarg *dsda = dst->dst_arg1;
		dsl_dataset_t *ds = dsda->ds;

		/*
		 * Return the file system name that triggered the error
		 */
		if (dst->dst_err) {
			dsl_dataset_name(ds, fsname);
			/* Strip the "@snapname" suffix from the name. */
			*strchr(fsname, '@') = '\0';
		}
		ASSERT3P(dsda->rm_origin, ==, NULL);
		dsl_dataset_disown(ds, da.dstg);
		kmem_free(dsda, sizeof (struct dsl_ds_destroyarg));
	}

	dsl_sync_task_group_destroy(da.dstg);
	spa_close(spa, FTAG);
	return (err);
}
    944 
    945 static boolean_t
    946 dsl_dataset_might_destroy_origin(dsl_dataset_t *ds)
    947 {
    948 	boolean_t might_destroy = B_FALSE;
    949 
    950 	mutex_enter(&ds->ds_lock);
    951 	if (ds->ds_phys->ds_num_children == 2 && ds->ds_userrefs == 0 &&
    952 	    DS_IS_DEFER_DESTROY(ds))
    953 		might_destroy = B_TRUE;
    954 	mutex_exit(&ds->ds_lock);
    955 
    956 	return (might_destroy);
    957 }
    958 
    959 /*
    960  * If we're removing a clone, and these three conditions are true:
    961  *	1) the clone's origin has no other children
    962  *	2) the clone's origin has no user references
    963  *	3) the clone's origin has been marked for deferred destruction
    964  * Then, prepare to remove the origin as part of this sync task group.
    965  */
    966 static int
    967 dsl_dataset_origin_rm_prep(struct dsl_ds_destroyarg *dsda, void *tag)
    968 {
    969 	dsl_dataset_t *ds = dsda->ds;
    970 	dsl_dataset_t *origin = ds->ds_prev;
    971 
    972 	if (dsl_dataset_might_destroy_origin(origin)) {
    973 		char *name;
    974 		int namelen;
    975 		int error;
    976 
    977 		namelen = dsl_dataset_namelen(origin) + 1;
    978 		name = kmem_alloc(namelen, KM_SLEEP);
    979 		dsl_dataset_name(origin, name);
    980 #ifdef _KERNEL
    981 		error = zfs_unmount_snap(name, NULL);
    982 		if (error) {
    983 			kmem_free(name, namelen);
    984 			return (error);
    985 		}
    986 #endif
    987 		error = dsl_dataset_own(name, B_TRUE, tag, &origin);
    988 		kmem_free(name, namelen);
    989 		if (error)
    990 			return (error);
    991 		dsda->rm_origin = origin;
    992 		dsl_dataset_make_exclusive(origin, tag);
    993 
    994 		if (origin->ds_objset != NULL) {
    995 			dmu_objset_evict(origin->ds_objset);
    996 			origin->ds_objset = NULL;
    997 		}
    998 	}
    999 
   1000 	return (0);
   1001 }
   1002 
/*
 * Destroy the dataset 'ds', starting from open context.
 *
 * ds must be opened as OWNER.  On return (whether successful or not),
 * ds will be closed and caller can no longer dereference it.
 *
 * A snapshot is handled by a single destroy check/sync task; with 'defer'
 * set, that task may merely mark the snapshot for deferred destroy.  Any
 * other dataset is first marked inconsistent, has its objects freed in
 * open context, and is then removed together with its dsl_dir by a
 * two-task sync group.  'defer' is only accepted for snapshots; EINVAL is
 * returned otherwise.
 *
 * 'tag' is the tag under which the caller owns ds; it is also used to own
 * the clone's origin when that has to be destroyed as well.
 *
 * Returns 0 on success or an errno value.
 */
int
dsl_dataset_destroy(dsl_dataset_t *ds, void *tag, boolean_t defer)
{
	int err;
	dsl_sync_task_group_t *dstg;
	objset_t *os;
	dsl_dir_t *dd;
	uint64_t obj;
	struct dsl_ds_destroyarg dsda = { 0 };
	dsl_dataset_t dummy_ds = { 0 };

	dsda.ds = ds;

	if (dsl_dataset_is_snapshot(ds)) {
		/* Destroying a snapshot is simpler */
		dsl_dataset_make_exclusive(ds, tag);

		if (ds->ds_objset != NULL) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		dsda.defer = defer;
		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
		    &dsda, tag, 0);
		ASSERT3P(dsda.rm_origin, ==, NULL);
		goto out;
	} else if (defer) {
		/* deferred destroy is only meaningful for snapshots */
		err = EINVAL;
		goto out;
	}

	/*
	 * The dsl_dir destroy task below is handed this stack dataset
	 * instead of ds; only ds_dir and ds_object are filled in.
	 * NOTE(review): presumably needed because dsl_dataset_destroy_sync()
	 * clears ds->ds_dir before the dir task runs -- confirm.
	 */
	dd = ds->ds_dir;
	dummy_ds.ds_dir = dd;
	dummy_ds.ds_object = ds->ds_object;

	/*
	 * Check for errors and mark this ds as inconsistent, in
	 * case we crash while freeing the objects.
	 */
	err = dsl_sync_task_do(dd->dd_pool, dsl_dataset_destroy_begin_check,
	    dsl_dataset_destroy_begin_sync, ds, NULL, 0);
	if (err)
		goto out;

	err = dmu_objset_from_ds(ds, &os);
	if (err)
		goto out;

	/*
	 * remove the objects in open context, so that we won't
	 * have too much to do in syncing context.
	 *
	 * The loop ends on the first non-zero return of dmu_object_next();
	 * that value is examined after the txg sync below.
	 */
	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE,
	    ds->ds_phys->ds_prev_snap_txg)) {
		/*
		 * Ignore errors, if there is not enough disk space
		 * we will deal with it in dsl_dataset_destroy_sync().
		 */
		(void) dmu_free_object(os, obj);
	}

	/*
	 * We need to sync out all in-flight IO before we try to evict
	 * (the dataset evict func is trying to clear the cached entries
	 * for this dataset in the ARC).
	 */
	txg_wait_synced(dd->dd_pool, 0);

	/*
	 * If we managed to free all the objects in open
	 * context, the user space accounting should be zero.
	 */
	if (ds->ds_phys->ds_bp.blk_fill == 0 &&
	    dmu_objset_userused_enabled(os)) {
		uint64_t count;

		ASSERT(zap_count(os, DMU_USERUSED_OBJECT, &count) != 0 ||
		    count == 0);
		ASSERT(zap_count(os, DMU_GROUPUSED_OBJECT, &count) != 0 ||
		    count == 0);
	}

	/*
	 * Only ESRCH from the object walk lets the destroy proceed
	 * (presumably "no more objects"); anything else is returned.
	 */
	if (err != ESRCH)
		goto out;

	/*
	 * Open the dsl_dir under FTAG.  It is closed below on error, or
	 * by dsl_dir_destroy_sync on success.
	 */
	rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
	err = dsl_dir_open_obj(dd->dd_pool, dd->dd_object, NULL, FTAG, &dd);
	rw_exit(&dd->dd_pool->dp_config_rwlock);

	if (err)
		goto out;

	if (ds->ds_objset) {
		/*
		 * We need to sync out all in-flight IO before we try
		 * to evict (the dataset evict func is trying to clear
		 * the cached entries for this dataset in the ARC).
		 */
		txg_wait_synced(dd->dd_pool, 0);
	}

	/*
	 * Blow away the dsl_dir + head dataset.
	 */
	dsl_dataset_make_exclusive(ds, tag);
	if (ds->ds_objset) {
		dmu_objset_evict(ds->ds_objset);
		ds->ds_objset = NULL;
	}

	/*
	 * If we're removing a clone, we might also need to remove its
	 * origin.
	 *
	 * The sync group is retried for as long as the check function
	 * reports (via dsda.need_prep) that the origin must be destroyed
	 * but had not been prepared in open context.
	 */
	do {
		dsda.need_prep = B_FALSE;
		if (dsl_dir_is_clone(dd)) {
			err = dsl_dataset_origin_rm_prep(&dsda, tag);
			if (err) {
				dsl_dir_close(dd, FTAG);
				goto out;
			}
		}

		dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
		dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
		    dsl_dataset_destroy_sync, &dsda, tag, 0);
		dsl_sync_task_create(dstg, dsl_dir_destroy_check,
		    dsl_dir_destroy_sync, &dummy_ds, FTAG, 0);
		err = dsl_sync_task_group_wait(dstg);
		dsl_sync_task_group_destroy(dstg);

		/*
		 * We could be racing against 'zfs release' or 'zfs destroy -d'
		 * on the origin snap, in which case we can get EBUSY if we
		 * needed to destroy the origin snap but were not ready to
		 * do so.
		 */
		if (dsda.need_prep) {
			ASSERT(err == EBUSY);
			ASSERT(dsl_dir_is_clone(dd));
			ASSERT(dsda.rm_origin == NULL);
		}
	} while (dsda.need_prep);

	/* drop the ownership taken by dsl_dataset_origin_rm_prep() */
	if (dsda.rm_origin != NULL)
		dsl_dataset_disown(dsda.rm_origin, tag);

	/* if it is successful, dsl_dir_destroy_sync will close the dd */
	if (err)
		dsl_dir_close(dd, FTAG);
out:
	dsl_dataset_disown(ds, tag);
	return (err);
}
   1163 
   1164 blkptr_t *
   1165 dsl_dataset_get_blkptr(dsl_dataset_t *ds)
   1166 {
   1167 	return (&ds->ds_phys->ds_bp);
   1168 }
   1169 
   1170 void
   1171 dsl_dataset_set_blkptr(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
   1172 {
   1173 	ASSERT(dmu_tx_is_syncing(tx));
   1174 	/* If it's the meta-objset, set dp_meta_rootbp */
   1175 	if (ds == NULL) {
   1176 		tx->tx_pool->dp_meta_rootbp = *bp;
   1177 	} else {
   1178 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
   1179 		ds->ds_phys->ds_bp = *bp;
   1180 	}
   1181 }
   1182 
   1183 spa_t *
   1184 dsl_dataset_get_spa(dsl_dataset_t *ds)
   1185 {
   1186 	return (ds->ds_dir->dd_pool->dp_spa);
   1187 }
   1188 
   1189 void
   1190 dsl_dataset_dirty(dsl_dataset_t *ds, dmu_tx_t *tx)
   1191 {
   1192 	dsl_pool_t *dp;
   1193 
   1194 	if (ds == NULL) /* this is the meta-objset */
   1195 		return;
   1196 
   1197 	ASSERT(ds->ds_objset != NULL);
   1198 
   1199 	if (ds->ds_phys->ds_next_snap_obj != 0)
   1200 		panic("dirtying snapshot!");
   1201 
   1202 	dp = ds->ds_dir->dd_pool;
   1203 
   1204 	if (txg_list_add(&dp->dp_dirty_datasets, ds, tx->tx_txg) == 0) {
   1205 		/* up the hold count until we can be written out */
   1206 		dmu_buf_add_ref(ds->ds_dbuf, ds);
   1207 	}
   1208 }
   1209 
   1210 /*
   1211  * The unique space in the head dataset can be calculated by subtracting
   1212  * the space used in the most recent snapshot, that is still being used
   1213  * in this file system, from the space currently in use.  To figure out
   1214  * the space in the most recent snapshot still in use, we need to take
   1215  * the total space used in the snapshot and subtract out the space that
   1216  * has been freed up since the snapshot was taken.
   1217  */
   1218 static void
   1219 dsl_dataset_recalc_head_uniq(dsl_dataset_t *ds)
   1220 {
   1221 	uint64_t mrs_used;
   1222 	uint64_t dlused, dlcomp, dluncomp;
   1223 
   1224 	ASSERT(ds->ds_object == ds->ds_dir->dd_phys->dd_head_dataset_obj);
   1225 
   1226 	if (ds->ds_phys->ds_prev_snap_obj != 0)
   1227 		mrs_used = ds->ds_prev->ds_phys->ds_used_bytes;
   1228 	else
   1229 		mrs_used = 0;
   1230 
   1231 	VERIFY(0 == bplist_space(&ds->ds_deadlist, &dlused, &dlcomp,
   1232 	    &dluncomp));
   1233 
   1234 	ASSERT3U(dlused, <=, mrs_used);
   1235 	ds->ds_phys->ds_unique_bytes =
   1236 	    ds->ds_phys->ds_used_bytes - (mrs_used - dlused);
   1237 
   1238 	if (!DS_UNIQUE_IS_ACCURATE(ds) &&
   1239 	    spa_version(ds->ds_dir->dd_pool->dp_spa) >=
   1240 	    SPA_VERSION_UNIQUE_ACCURATE)
   1241 		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
   1242 }
   1243 
   1244 static uint64_t
   1245 dsl_dataset_unique(dsl_dataset_t *ds)
   1246 {
   1247 	if (!DS_UNIQUE_IS_ACCURATE(ds) && !dsl_dataset_is_snapshot(ds))
   1248 		dsl_dataset_recalc_head_uniq(ds);
   1249 
   1250 	return (ds->ds_phys->ds_unique_bytes);
   1251 }
   1252 
/*
 * Callback argument for kill_blkptr(), used by dsl_dataset_destroy_sync()
 * when traversing a head dataset that is being destroyed.
 */
struct killarg {
	dsl_dataset_t *ds;	/* dataset whose blocks are being killed */
	dmu_tx_t *tx;		/* syncing transaction doing the frees */
};
   1257 
   1258 /* ARGSUSED */
   1259 static int
   1260 kill_blkptr(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
   1261     const zbookmark_t *zb, const dnode_phys_t *dnp, void *arg)
   1262 {
   1263 	struct killarg *ka = arg;
   1264 	dmu_tx_t *tx = ka->tx;
   1265 
   1266 	if (bp == NULL)
   1267 		return (0);
   1268 
   1269 	if (zb->zb_level == ZB_ZIL_LEVEL) {
   1270 		ASSERT(zilog != NULL);
   1271 		/*
   1272 		 * It's a block in the intent log.  It has no
   1273 		 * accounting, so just free it.
   1274 		 */
   1275 		dsl_free(ka->tx->tx_pool, ka->tx->tx_txg, bp);
   1276 	} else {
   1277 		ASSERT(zilog == NULL);
   1278 		ASSERT3U(bp->blk_birth, >, ka->ds->ds_phys->ds_prev_snap_txg);
   1279 		(void) dsl_dataset_block_kill(ka->ds, bp, tx, B_FALSE);
   1280 	}
   1281 
   1282 	return (0);
   1283 }
   1284 
   1285 /* ARGSUSED */
   1286 static int
   1287 dsl_dataset_destroy_begin_check(void *arg1, void *arg2, dmu_tx_t *tx)
   1288 {
   1289 	dsl_dataset_t *ds = arg1;
   1290 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
   1291 	uint64_t count;
   1292 	int err;
   1293 
   1294 	/*
   1295 	 * Can't delete a head dataset if there are snapshots of it.
   1296 	 * (Except if the only snapshots are from the branch we cloned
   1297 	 * from.)
   1298 	 */
   1299 	if (ds->ds_prev != NULL &&
   1300 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
   1301 		return (EBUSY);
   1302 
   1303 	/*
   1304 	 * This is really a dsl_dir thing, but check it here so that
   1305 	 * we'll be less likely to leave this dataset inconsistent &
   1306 	 * nearly destroyed.
   1307 	 */
   1308 	err = zap_count(mos, ds->ds_dir->dd_phys->dd_child_dir_zapobj, &count);
   1309 	if (err)
   1310 		return (err);
   1311 	if (count != 0)
   1312 		return (EEXIST);
   1313 
   1314 	return (0);
   1315 }
   1316 
   1317 /* ARGSUSED */
   1318 static void
   1319 dsl_dataset_destroy_begin_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
   1320 {
   1321 	dsl_dataset_t *ds = arg1;
   1322 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
   1323 
   1324 	/* Mark it as inconsistent on-disk, in case we crash */
   1325 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
   1326 	ds->ds_phys->ds_flags |= DS_FLAG_INCONSISTENT;
   1327 
   1328 	spa_history_internal_log(LOG_DS_DESTROY_BEGIN, dp->dp_spa, tx,
   1329 	    cr, "dataset = %llu", ds->ds_object);
   1330 }
   1331 
   1332 static int
   1333 dsl_dataset_origin_check(struct dsl_ds_destroyarg *dsda, void *tag,
   1334     dmu_tx_t *tx)
   1335 {
   1336 	dsl_dataset_t *ds = dsda->ds;
   1337 	dsl_dataset_t *ds_prev = ds->ds_prev;
   1338 
   1339 	if (dsl_dataset_might_destroy_origin(ds_prev)) {
   1340 		struct dsl_ds_destroyarg ndsda = {0};
   1341 
   1342 		/*
   1343 		 * If we're not prepared to remove the origin, don't remove
   1344 		 * the clone either.
   1345 		 */
   1346 		if (dsda->rm_origin == NULL) {
   1347 			dsda->need_prep = B_TRUE;
   1348 			return (EBUSY);
   1349 		}
   1350 
   1351 		ndsda.ds = ds_prev;
   1352 		ndsda.is_origin_rm = B_TRUE;
   1353 		return (dsl_dataset_destroy_check(&ndsda, tag, tx));
   1354 	}
   1355 
   1356 	/*
   1357 	 * If we're not going to remove the origin after all,
   1358 	 * undo the open context setup.
   1359 	 */
   1360 	if (dsda->rm_origin != NULL) {
   1361 		dsl_dataset_disown(dsda->rm_origin, tag);
   1362 		dsda->rm_origin = NULL;
   1363 	}
   1364 
   1365 	return (0);
   1366 }
   1367 
   1368 /* ARGSUSED */
   1369 int
   1370 dsl_dataset_destroy_check(void *arg1, void *arg2, dmu_tx_t *tx)
   1371 {
   1372 	struct dsl_ds_destroyarg *dsda = arg1;
   1373 	dsl_dataset_t *ds = dsda->ds;
   1374 
   1375 	/* we have an owner hold, so noone else can destroy us */
   1376 	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
   1377 
   1378 	/*
   1379 	 * Only allow deferred destroy on pools that support it.
   1380 	 * NOTE: deferred destroy is only supported on snapshots.
   1381 	 */
   1382 	if (dsda->defer) {
   1383 		if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
   1384 		    SPA_VERSION_USERREFS)
   1385 			return (ENOTSUP);
   1386 		ASSERT(dsl_dataset_is_snapshot(ds));
   1387 		return (0);
   1388 	}
   1389 
   1390 	/*
   1391 	 * Can't delete a head dataset if there are snapshots of it.
   1392 	 * (Except if the only snapshots are from the branch we cloned
   1393 	 * from.)
   1394 	 */
   1395 	if (ds->ds_prev != NULL &&
   1396 	    ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object)
   1397 		return (EBUSY);
   1398 
   1399 	/*
   1400 	 * If we made changes this txg, traverse_dsl_dataset won't find
   1401 	 * them.  Try again.
   1402 	 */
   1403 	if (ds->ds_phys->ds_bp.blk_birth >= tx->tx_txg)
   1404 		return (EAGAIN);
   1405 
   1406 	if (dsl_dataset_is_snapshot(ds)) {
   1407 		/*
   1408 		 * If this snapshot has an elevated user reference count,
   1409 		 * we can't destroy it yet.
   1410 		 */
   1411 		if (ds->ds_userrefs > 0 && !dsda->releasing)
   1412 			return (EBUSY);
   1413 
   1414 		mutex_enter(&ds->ds_lock);
   1415 		/*
   1416 		 * Can't delete a branch point. However, if we're destroying
   1417 		 * a clone and removing its origin due to it having a user
   1418 		 * hold count of 0 and having been marked for deferred destroy,
   1419 		 * it's OK for the origin to have a single clone.
   1420 		 */
   1421 		if (ds->ds_phys->ds_num_children >
   1422 		    (dsda->is_origin_rm ? 2 : 1)) {
   1423 			mutex_exit(&ds->ds_lock);
   1424 			return (EEXIST);
   1425 		}
   1426 		mutex_exit(&ds->ds_lock);
   1427 	} else if (dsl_dir_is_clone(ds->ds_dir)) {
   1428 		return (dsl_dataset_origin_check(dsda, arg2, tx));
   1429 	}
   1430 
   1431 	/* XXX we should do some i/o error checking... */
   1432 	return (0);
   1433 }
   1434 
/*
 * State shared between dsl_dataset_drain_refs(), which waits, and
 * dsl_dataset_refs_gone(), which signals.
 */
struct refsarg {
	kmutex_t lock;		/* protects 'gone' and pairs with 'cv' */
	boolean_t gone;		/* set by dsl_dataset_refs_gone() */
	kcondvar_t cv;		/* signalled once 'gone' has been set */
};
   1440 
   1441 /* ARGSUSED */
   1442 static void
   1443 dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
   1444 {
   1445 	struct refsarg *arg = argv;
   1446 
   1447 	mutex_enter(&arg->lock);
   1448 	arg->gone = TRUE;
   1449 	cv_signal(&arg->cv);
   1450 	mutex_exit(&arg->lock);
   1451 }
   1452 
   1453 static void
   1454 dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
   1455 {
   1456 	struct refsarg arg;
   1457 
   1458 	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
   1459 	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
   1460 	arg.gone = FALSE;
   1461 	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
   1462 	    dsl_dataset_refs_gone);
   1463 	dmu_buf_rele(ds->ds_dbuf, tag);
   1464 	mutex_enter(&arg.lock);
   1465 	while (!arg.gone)
   1466 		cv_wait(&arg.cv, &arg.lock);
   1467 	ASSERT(arg.gone);
   1468 	mutex_exit(&arg.lock);
   1469 	ds->ds_dbuf = NULL;
   1470 	ds->ds_phys = NULL;
   1471 	mutex_destroy(&arg.lock);
   1472 	cv_destroy(&arg.cv);
   1473 }
   1474 
   1475 static void
   1476 remove_from_next_clones(dsl_dataset_t *ds, uint64_t obj, dmu_tx_t *tx)
   1477 {
   1478 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
   1479 	uint64_t count;
   1480 	int err;
   1481 
   1482 	ASSERT(ds->ds_phys->ds_num_children >= 2);
   1483 	err = zap_remove_int(mos, ds->ds_phys->ds_next_clones_obj, obj, tx);
   1484 	/*
   1485 	 * The err should not be ENOENT, but a bug in a previous version
   1486 	 * of the code could cause upgrade_clones_cb() to not set
   1487 	 * ds_next_snap_obj when it should, leading to a missing entry.
   1488 	 * If we knew that the pool was created after
   1489 	 * SPA_VERSION_NEXT_CLONES, we could assert that it isn't
   1490 	 * ENOENT.  However, at least we can check that we don't have
   1491 	 * too many entries in the next_clones_obj even after failing to
   1492 	 * remove this one.
   1493 	 */
   1494 	if (err != ENOENT) {
   1495 		VERIFY3U(err, ==, 0);
   1496 	}
   1497 	ASSERT3U(0, ==, zap_count(mos, ds->ds_phys->ds_next_clones_obj,
   1498 	    &count));
   1499 	ASSERT3U(count, <=, ds->ds_phys->ds_num_children - 2);
   1500 }
   1501 
/*
 * Syncing-context half of dataset destruction.
 *
 * arg1 is a struct dsl_ds_destroyarg describing the dataset; 'tag' is
 * handed to dsl_dataset_drain_refs() when the dataset's dbuf is released.
 *
 * For a deferred destroy of a dataset that still has user references or
 * more than one child, only DS_FLAG_DEFER_DESTROY is set and nothing is
 * removed.  Otherwise the dataset is unlinked from its previous and next
 * snapshots (or from its origin's clone list), its deadlist is either
 * handed over to the next snapshot or destroyed, its blocks are freed
 * (head dataset only), its name or dir link is removed, and its MOS
 * objects are freed.  If dsda->rm_origin is set, the origin is then
 * destroyed by a recursive call.
 */
void
dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
{
	struct dsl_ds_destroyarg *dsda = arg1;
	dsl_dataset_t *ds = dsda->ds;
	int err;
	int after_branch_point = FALSE;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	dsl_dataset_t *ds_prev = NULL;
	uint64_t obj;

	ASSERT(ds->ds_owner);
	ASSERT(dsda->defer || ds->ds_phys->ds_num_children <= 1);
	ASSERT(ds->ds_prev == NULL ||
	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);

	/*
	 * A deferred destroy that cannot happen yet just records the
	 * request on disk and leaves the dataset in place.
	 */
	if (dsda->defer) {
		ASSERT(spa_version(dp->dp_spa) >= SPA_VERSION_USERREFS);
		if (ds->ds_userrefs > 0 || ds->ds_phys->ds_num_children > 1) {
			dmu_buf_will_dirty(ds->ds_dbuf, tx);
			ds->ds_phys->ds_flags |= DS_FLAG_DEFER_DESTROY;
			return;
		}
	}

	/* signal any waiters that this dataset is going away */
	mutex_enter(&ds->ds_lock);
	ds->ds_owner = dsl_reaper;
	cv_broadcast(&ds->ds_exclusive_cv);
	mutex_exit(&ds->ds_lock);

	/* Remove our reservation */
	if (ds->ds_reserved != 0) {
		dsl_prop_setarg_t psa;
		uint64_t value = 0;

		dsl_prop_setarg_init_uint64(&psa, "refreservation",
		    (ZPROP_SRC_NONE | ZPROP_SRC_LOCAL | ZPROP_SRC_RECEIVED),
		    &value);
		psa.psa_effective_value = 0;	/* predict default value */

		dsl_dataset_set_reservation_sync(ds, &psa, cr, tx);
		ASSERT3U(ds->ds_reserved, ==, 0);
	}

	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));

	dsl_pool_ds_destroyed(ds, tx);

	obj = ds->ds_object;

	/*
	 * Unlink ourselves from the previous snapshot.  If prev's
	 * ds_next_snap_obj is not us, we hang off a branch point, i.e.
	 * prev is the origin of the clone we belong to.
	 */
	if (ds->ds_phys->ds_prev_snap_obj != 0) {
		if (ds->ds_prev) {
			ds_prev = ds->ds_prev;
		} else {
			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
		}
		after_branch_point =
		    (ds_prev->ds_phys->ds_next_snap_obj != obj);

		dmu_buf_will_dirty(ds_prev->ds_dbuf, tx);
		if (after_branch_point &&
		    ds_prev->ds_phys->ds_next_clones_obj != 0) {
			remove_from_next_clones(ds_prev, obj, tx);
			if (ds->ds_phys->ds_next_snap_obj != 0) {
				VERIFY(0 == zap_add_int(mos,
				    ds_prev->ds_phys->ds_next_clones_obj,
				    ds->ds_phys->ds_next_snap_obj, tx));
			}
		}
		if (after_branch_point &&
		    ds->ds_phys->ds_next_snap_obj == 0) {
			/* This clone is toast. */
			ASSERT(ds_prev->ds_phys->ds_num_children > 1);
			ds_prev->ds_phys->ds_num_children--;

			/*
			 * If the clone's origin has no other clones, no
			 * user holds, and has been marked for deferred
			 * deletion, then we should have done the necessary
			 * destroy setup for it.
			 */
			if (ds_prev->ds_phys->ds_num_children == 1 &&
			    ds_prev->ds_userrefs == 0 &&
			    DS_IS_DEFER_DESTROY(ds_prev)) {
				ASSERT3P(dsda->rm_origin, !=, NULL);
			} else {
				ASSERT3P(dsda->rm_origin, ==, NULL);
			}
		} else if (!after_branch_point) {
			ds_prev->ds_phys->ds_next_snap_obj =
			    ds->ds_phys->ds_next_snap_obj;
		}
	}

	if (ds->ds_phys->ds_next_snap_obj != 0) {
		/*
		 * We are a snapshot with a successor: splice ourselves
		 * out of the chain and fold our deadlist into next's.
		 */
		blkptr_t bp;
		dsl_dataset_t *ds_next;
		uint64_t itor = 0;
		uint64_t old_unique;
		int64_t used = 0, compressed = 0, uncompressed = 0;

		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);

		old_unique = dsl_dataset_unique(ds_next);

		dmu_buf_will_dirty(ds_next->ds_dbuf, tx);
		ds_next->ds_phys->ds_prev_snap_obj =
		    ds->ds_phys->ds_prev_snap_obj;
		ds_next->ds_phys->ds_prev_snap_txg =
		    ds->ds_phys->ds_prev_snap_txg;
		ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
		    ds_prev ? ds_prev->ds_phys->ds_creation_txg : 0);

		/*
		 * Transfer to our deadlist (which will become next's
		 * new deadlist) any entries from next's current
		 * deadlist which were born before prev, and free the
		 * other entries.
		 *
		 * XXX we're doing this long task with the config lock held
		 */
		while (bplist_iterate(&ds_next->ds_deadlist, &itor, &bp) == 0) {
			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
				    &bp, tx));
				if (ds_prev && !after_branch_point &&
				    bp.blk_birth >
				    ds_prev->ds_phys->ds_prev_snap_txg) {
					ds_prev->ds_phys->ds_unique_bytes +=
					    bp_get_dsize_sync(dp->dp_spa, &bp);
				}
			} else {
				used += bp_get_dsize_sync(dp->dp_spa, &bp);
				compressed += BP_GET_PSIZE(&bp);
				uncompressed += BP_GET_UCSIZE(&bp);
				dsl_free(dp, tx->tx_txg, &bp);
			}
		}

		/* what we just freed is exactly what was unique to us */
		ASSERT3U(used, ==, ds->ds_phys->ds_unique_bytes);

		/* change snapused */
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_SNAP,
		    -used, -compressed, -uncompressed, tx);

		/* free next's deadlist */
		bplist_close(&ds_next->ds_deadlist);
		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);

		/* set next's deadlist to our deadlist */
		bplist_close(&ds->ds_deadlist);
		ds_next->ds_phys->ds_deadlist_obj =
		    ds->ds_phys->ds_deadlist_obj;
		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
		    ds_next->ds_phys->ds_deadlist_obj));
		ds->ds_phys->ds_deadlist_obj = 0;

		if (ds_next->ds_phys->ds_next_snap_obj != 0) {
			/*
			 * Update next's unique to include blocks which
			 * were previously shared by only this snapshot
			 * and it.  Those blocks will be born after the
			 * prev snap and before this snap, and will have
			 * died after the next snap and before the one
			 * after that (ie. be on the snap after next's
			 * deadlist).
			 *
			 * XXX we're doing this long task with the
			 * config lock held
			 */
			dsl_dataset_t *ds_after_next;
			uint64_t space;

			VERIFY(0 == dsl_dataset_hold_obj(dp,
			    ds_next->ds_phys->ds_next_snap_obj,
			    FTAG, &ds_after_next));

			VERIFY(0 ==
			    bplist_space_birthrange(&ds_after_next->ds_deadlist,
			    ds->ds_phys->ds_prev_snap_txg,
			    ds->ds_phys->ds_creation_txg, &space));
			ds_next->ds_phys->ds_unique_bytes += space;

			dsl_dataset_rele(ds_after_next, FTAG);
			ASSERT3P(ds_next->ds_prev, ==, NULL);
		} else {
			/*
			 * next has no successor of its own: repoint its
			 * ds_prev from us to our predecessor (if any) and
			 * recompute its unique space.
			 */
			ASSERT3P(ds_next->ds_prev, ==, ds);
			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
			ds_next->ds_prev = NULL;
			if (ds_prev) {
				VERIFY(0 == dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds_next, &ds_next->ds_prev));
			}

			dsl_dataset_recalc_head_uniq(ds_next);

			/*
			 * Reduce the amount of our unconsumed refreservation
			 * being charged to our parent by the amount of
			 * new unique data we have gained.
			 */
			if (old_unique < ds_next->ds_reserved) {
				int64_t mrsdelta;
				uint64_t new_unique =
				    ds_next->ds_phys->ds_unique_bytes;

				ASSERT(old_unique <= new_unique);
				mrsdelta = MIN(new_unique - old_unique,
				    ds_next->ds_reserved - old_unique);
				dsl_dir_diduse_space(ds->ds_dir,
				    DD_USED_REFRSRV, -mrsdelta, 0, 0, tx);
			}
		}
		dsl_dataset_rele(ds_next, FTAG);
	} else {
		/*
		 * There's no next snapshot, so this is a head dataset.
		 * Destroy the deadlist.  Unless it's a clone, the
		 * deadlist should be empty.  (If it's a clone, it's
		 * safe to ignore the deadlist contents.)
		 */
		struct killarg ka;

		ASSERT(after_branch_point || bplist_empty(&ds->ds_deadlist));
		bplist_close(&ds->ds_deadlist);
		bplist_destroy(mos, ds->ds_phys->ds_deadlist_obj, tx);
		ds->ds_phys->ds_deadlist_obj = 0;

		/*
		 * Free everything that we point to (that's born after
		 * the previous snapshot, if we are a clone)
		 *
		 * NB: this should be very quick, because we already
		 * freed all the objects in open context.
		 */
		ka.ds = ds;
		ka.tx = tx;
		err = traverse_dataset(ds, ds->ds_phys->ds_prev_snap_txg,
		    TRAVERSE_POST, kill_blkptr, &ka);
		ASSERT3U(err, ==, 0);
		ASSERT(!DS_UNIQUE_IS_ACCURATE(ds) ||
		    ds->ds_phys->ds_unique_bytes == 0);

		/*
		 * Release ds_prev here and clear the local ds_prev too, so
		 * the conditional release further down is skipped.
		 */
		if (ds->ds_prev != NULL) {
			dsl_dataset_rele(ds->ds_prev, ds);
			ds->ds_prev = ds_prev = NULL;
		}
	}

	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
		/* Erase the link in the dir */
		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
		ASSERT(err == 0);
	} else {
		/* remove from snapshot namespace */
		dsl_dataset_t *ds_head;
		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
		VERIFY(0 == dsl_dataset_hold_obj(dp,
		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
		VERIFY(0 == dsl_dataset_get_snapname(ds));
#ifdef ZFS_DEBUG
		{
			uint64_t val;

			err = dsl_dataset_snap_lookup(ds_head,
			    ds->ds_snapname, &val);
			ASSERT3U(err, ==, 0);
			ASSERT3U(val, ==, obj);
		}
#endif
		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
		ASSERT(err == 0);
		dsl_dataset_rele(ds_head, FTAG);
	}

	/* drop the FTAG hold taken above if ds_prev was not ds->ds_prev */
	if (ds_prev && ds->ds_prev != ds_prev)
		dsl_dataset_rele(ds_prev, FTAG);

	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
	    cr, "dataset = %llu", ds->ds_object);

	/* free the auxiliary MOS objects that belong to this dataset */
	if (ds->ds_phys->ds_next_clones_obj != 0) {
		uint64_t count;
		ASSERT(0 == zap_count(mos,
		    ds->ds_phys->ds_next_clones_obj, &count) && count == 0);
		VERIFY(0 == dmu_object_free(mos,
		    ds->ds_phys->ds_next_clones_obj, tx));
	}
	if (ds->ds_phys->ds_props_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_props_obj, tx));
	if (ds->ds_phys->ds_userrefs_obj != 0)
		VERIFY(0 == zap_destroy(mos, ds->ds_phys->ds_userrefs_obj, tx));
	dsl_dir_close(ds->ds_dir, ds);
	ds->ds_dir = NULL;
	/* ds_phys is NULL after this call; 'obj' was saved earlier */
	dsl_dataset_drain_refs(ds, tag);
	VERIFY(0 == dmu_object_free(mos, obj, tx));

	if (dsda->rm_origin) {
		/*
		 * Remove the origin of the clone we just destroyed.
		 */
		struct dsl_ds_destroyarg ndsda = {0};

		ndsda.ds = dsda->rm_origin;
		dsl_dataset_destroy_sync(&ndsda, tag, cr, tx);
	}
}
   1820 
   1821 static int
   1822 dsl_dataset_snapshot_reserve_space(dsl_dataset_t *ds, dmu_tx_t *tx)
   1823 {
   1824 	uint64_t asize;
   1825 
   1826 	if (!dmu_tx_is_syncing(tx))
   1827 		return (0);
   1828 
   1829 	/*
   1830 	 * If there's an fs-only reservation, any blocks that might become
   1831 	 * owned by the snapshot dataset must be accommodated by space
   1832 	 * outside of the reservation.
   1833 	 */
   1834 	asize = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
   1835 	if (asize > dsl_dir_space_available(ds->ds_dir, NULL, 0, FALSE))
   1836 		return (ENOSPC);
   1837 
   1838 	/*
   1839 	 * Propogate any reserved space for this snapshot to other
   1840 	 * snapshot checks in this sync group.
   1841 	 */
   1842 	if (asize > 0)
   1843 		dsl_dir_willuse_space(ds->ds_dir, asize, tx);
   1844 
   1845 	return (0);
   1846 }
   1847 
   1848 /* ARGSUSED */
   1849 int
   1850 dsl_dataset_snapshot_check(void *arg1, void *arg2, dmu_tx_t *tx)
   1851 {
   1852 	dsl_dataset_t *ds = arg1;
   1853 	const char *snapname = arg2;
   1854 	int err;
   1855 	uint64_t value;
   1856 
   1857 	/*
   1858 	 * We don't allow multiple snapshots of the same txg.  If there
   1859 	 * is already one, try again.
   1860 	 */
   1861 	if (ds->ds_phys->ds_prev_snap_txg >= tx->tx_txg)
   1862 		return (EAGAIN);
   1863 
   1864 	/*
   1865 	 * Check for conflicting name snapshot name.
   1866 	 */
   1867 	err = dsl_dataset_snap_lookup(ds, snapname, &value);
   1868 	if (err == 0)
   1869 		return (EEXIST);
   1870 	if (err != ENOENT)
   1871 		return (err);
   1872 
   1873 	/*
   1874 	 * Check that the dataset's name is not too long.  Name consists
   1875 	 * of the dataset's length + 1 for the @-sign + snapshot name's length
   1876 	 */
   1877 	if (dsl_dataset_namelen(ds) + 1 + strlen(snapname) >= MAXNAMELEN)
   1878 		return (ENAMETOOLONG);
   1879 
   1880 	err = dsl_dataset_snapshot_reserve_space(ds, tx);
   1881 	if (err)
   1882 		return (err);
   1883 
   1884 	ds->ds_trysnap_txg = tx->tx_txg;
   1885 	return (0);
   1886 }
   1887 
   1888 void
   1889 dsl_dataset_snapshot_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
   1890 {
   1891 	dsl_dataset_t *ds = arg1;
   1892 	const char *snapname = arg2;
   1893 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
   1894 	dmu_buf_t *dbuf;
   1895 	dsl_dataset_phys_t *dsphys;
   1896 	uint64_t dsobj, crtxg;
   1897 	objset_t *mos = dp->dp_meta_objset;
   1898 	int err;
   1899 
   1900 	ASSERT(RW_WRITE_HELD(&dp->dp_config_rwlock));
   1901 
   1902 	/*
   1903 	 * The origin's ds_creation_txg has to be < TXG_INITIAL
   1904 	 */
   1905 	if (strcmp(snapname, ORIGIN_DIR_NAME) == 0)
   1906 		crtxg = 1;
   1907 	else
   1908 		crtxg = tx->tx_txg;
   1909 
   1910 	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
   1911 	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
   1912 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
   1913 	dmu_buf_will_dirty(dbuf, tx);
   1914 	dsphys = dbuf->db_data;
   1915 	bzero(dsphys, sizeof (dsl_dataset_phys_t));
   1916 	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
   1917 	dsphys->ds_fsid_guid = unique_create();
   1918 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
   1919 	    sizeof (dsphys->ds_guid));
   1920 	dsphys->ds_prev_snap_obj = ds->ds_phys->ds_prev_snap_obj;
   1921 	dsphys->ds_prev_snap_txg = ds->ds_phys->ds_prev_snap_txg;
   1922 	dsphys->ds_next_snap_obj = ds->ds_object;
   1923 	dsphys->ds_num_children = 1;
   1924 	dsphys->ds_creation_time = gethrestime_sec();
   1925 	dsphys->ds_creation_txg = crtxg;
   1926 	dsphys->ds_deadlist_obj = ds->ds_phys->ds_deadlist_obj;
   1927 	dsphys->ds_used_bytes = ds->ds_phys->ds_used_bytes;
   1928 	dsphys->ds_compressed_bytes = ds->ds_phys->ds_compressed_bytes;
   1929 	dsphys->ds_uncompressed_bytes = ds->ds_phys->ds_uncompressed_bytes;
   1930 	dsphys->ds_flags = ds->ds_phys->ds_flags;
   1931 	dsphys->ds_bp = ds->ds_phys->ds_bp;
   1932 	dmu_buf_rele(dbuf, FTAG);
   1933 
   1934 	ASSERT3U(ds->ds_prev != 0, ==, ds->ds_phys->ds_prev_snap_obj != 0);
   1935 	if (ds->ds_prev) {
   1936 		uint64_t next_clones_obj =
   1937 		    ds->ds_prev->ds_phys->ds_next_clones_obj;
   1938 		ASSERT(ds->ds_prev->ds_phys->ds_next_snap_obj ==
   1939 		    ds->ds_object ||
   1940 		    ds->ds_prev->ds_phys->ds_num_children > 1);
   1941 		if (ds->ds_prev->ds_phys->ds_next_snap_obj == ds->ds_object) {
   1942 			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
   1943 			ASSERT3U(ds->ds_phys->ds_prev_snap_txg, ==,
   1944 			    ds->ds_prev->ds_phys->ds_creation_txg);
   1945 			ds->ds_prev->ds_phys->ds_next_snap_obj = dsobj;
   1946 		} else if (next_clones_obj != 0) {
   1947 			remove_from_next_clones(ds->ds_prev,
   1948 			    dsphys->ds_next_snap_obj, tx);
   1949 			VERIFY3U(0, ==, zap_add_int(mos,
   1950 			    next_clones_obj, dsobj, tx));
   1951 		}
   1952 	}
   1953 
   1954 	/*
   1955 	 * If we have a reference-reservation on this dataset, we will
   1956 	 * need to increase the amount of refreservation being charged
   1957 	 * since our unique space is going to zero.
   1958 	 */
   1959 	if (ds->ds_reserved) {
   1960 		int64_t add = MIN(dsl_dataset_unique(ds), ds->ds_reserved);
   1961 		dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV,
   1962 		    add, 0, 0, tx);
   1963 	}
   1964 
   1965 	bplist_close(&ds->ds_deadlist);
   1966 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
   1967 	ASSERT3U(ds->ds_phys->ds_prev_snap_txg, <, tx->tx_txg);
   1968 	ds->ds_phys->ds_prev_snap_obj = dsobj;
   1969 	ds->ds_phys->ds_prev_snap_txg = crtxg;
   1970 	ds->ds_phys->ds_unique_bytes = 0;
   1971 	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
   1972 		ds->ds_phys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;
   1973 	ds->ds_phys->ds_deadlist_obj =
   1974 	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);
   1975 	VERIFY(0 == bplist_open(&ds->ds_deadlist, mos,
   1976 	    ds->ds_phys->ds_deadlist_obj));
   1977 
   1978 	dprintf("snap '%s' -> obj %llu\n", snapname, dsobj);
   1979 	err = zap_add(mos, ds->ds_phys->ds_snapnames_zapobj,
   1980 	    snapname, 8, 1, &dsobj, tx);
   1981 	ASSERT(err == 0);
   1982 
   1983 	if (ds->ds_prev)
   1984 		dsl_dataset_drop_ref(ds->ds_prev, ds);
   1985 	VERIFY(0 == dsl_dataset_get_ref(dp,
   1986 	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
   1987 
   1988 	dsl_pool_ds_snapshotted(ds, tx);
   1989 
   1990 	dsl_dir_snap_cmtime_update(ds->ds_dir);
   1991 
   1992 	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
   1993 	    "dataset = %llu", dsobj);
   1994 }
   1995 
   1996 void
   1997 dsl_dataset_sync(dsl_dataset_t *ds, zio_t *zio, dmu_tx_t *tx)
   1998 {
   1999 	ASSERT(dmu_tx_is_syncing(tx));
   2000 	ASSERT(ds->ds_objset != NULL);
   2001 	ASSERT(ds->ds_phys->ds_next_snap_obj == 0);
   2002 
   2003 	/*
   2004 	 * in case we had to change ds_fsid_guid when we opened it,
   2005 	 * sync it out now.
   2006 	 */
   2007 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
   2008 	ds->ds_phys->ds_fsid_guid = ds->ds_fsid_guid;
   2009 
   2010 	dsl_dir_dirty(ds->ds_dir, tx);
   2011 	dmu_objset_sync(ds->ds_objset, zio, tx);
   2012 }
   2013 
   2014 void
   2015 dsl_dataset_stats(dsl_dataset_t *ds, nvlist_t *nv)
   2016 {
   2017 	uint64_t refd, avail, uobjs, aobjs;
   2018 
   2019 	dsl_dir_stats(ds->ds_dir, nv);
   2020 
   2021 	dsl_dataset_space(ds, &refd, &avail, &uobjs, &aobjs);
   2022 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_AVAILABLE, avail);
   2023 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFERENCED, refd);
   2024 
   2025 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATION,
   2026 	    ds->ds_phys->ds_creation_time);
   2027 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_CREATETXG,
   2028 	    ds->ds_phys->ds_creation_txg);
   2029 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFQUOTA,
   2030 	    ds->ds_quota);
   2031 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_REFRESERVATION,
   2032 	    ds->ds_reserved);
   2033 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_GUID,
   2034 	    ds->ds_phys->ds_guid);
   2035 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_UNIQUE,
   2036 	    dsl_dataset_unique(ds));
   2037 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_OBJSETID,
   2038 	    ds->ds_object);
   2039 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERREFS,
   2040 	    ds->ds_userrefs);
   2041 	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_DEFER_DESTROY,
   2042 	    DS_IS_DEFER_DESTROY(ds) ? 1 : 0);
   2043 
   2044 	if (ds->ds_phys->ds_next_snap_obj) {
   2045 		/*
   2046 		 * This is a snapshot; override the dd's space used with
   2047 		 * our unique space and compression ratio.
   2048 		 */
   2049 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USED,
   2050 		    ds->ds_phys->ds_unique_bytes);
   2051 		dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_COMPRESSRATIO,
   2052 		    ds->ds_phys->ds_compressed_bytes == 0 ? 100 :
   2053 		    (ds->ds_phys->ds_uncompressed_bytes * 100 /
   2054 		    ds->ds_phys->ds_compressed_bytes));
   2055 	}
   2056 }
   2057 
   2058 void
   2059 dsl_dataset_fast_stat(dsl_dataset_t *ds, dmu_objset_stats_t *stat)
   2060 {
   2061 	stat->dds_creation_txg = ds->ds_phys->ds_creation_txg;
   2062 	stat->dds_inconsistent = ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT;
   2063 	stat->dds_guid = ds->ds_phys->ds_guid;
   2064 	if (ds->ds_phys->ds_next_snap_obj) {
   2065 		stat->dds_is_snapshot = B_TRUE;
   2066 		stat->dds_num_clones = ds->ds_phys->ds_num_children - 1;
   2067 	} else {
   2068 		stat->dds_is_snapshot = B_FALSE;
   2069 		stat->dds_num_clones = 0;
   2070 	}
   2071 
   2072 	/* clone origin is really a dsl_dir thing... */
   2073 	rw_enter(&ds->ds_dir->dd_pool->dp_config_rwlock, RW_READER);
   2074 	if (dsl_dir_is_clone(ds->ds_dir)) {
   2075 		dsl_dataset_t *ods;
   2076 
   2077 		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
   2078 		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
   2079 		dsl_dataset_name(ods, stat->dds_origin);
   2080 		dsl_dataset_drop_ref(ods, FTAG);
   2081 	} else {
   2082 		stat->dds_origin[0] = '\0';
   2083 	}
   2084 	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
   2085 }
   2086 
   2087 uint64_t
   2088 dsl_dataset_fsid_guid(dsl_dataset_t *ds)
   2089 {
   2090 	return (ds->ds_fsid_guid);
   2091 }
   2092 
   2093 void
   2094 dsl_dataset_space(dsl_dataset_t *ds,
   2095     uint64_t *refdbytesp, uint64_t *availbytesp,
   2096     uint64_t *usedobjsp, uint64_t *availobjsp)
   2097 {
   2098 	*refdbytesp = ds->ds_phys->ds_used_bytes;
   2099 	*availbytesp = dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE);
   2100 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes)
   2101 		*availbytesp += ds->ds_reserved - ds->ds_phys->ds_unique_bytes;
   2102 	if (ds->ds_quota != 0) {
   2103 		/*
   2104 		 * Adjust available bytes according to refquota
   2105 		 */
   2106 		if (*refdbytesp < ds->ds_quota)
   2107 			*availbytesp = MIN(*availbytesp,
   2108 			    ds->ds_quota - *refdbytesp);
   2109 		else
   2110 			*availbytesp = 0;
   2111 	}
   2112 	*usedobjsp = ds->ds_phys->ds_bp.blk_fill;
   2113 	*availobjsp = DN_MAX_OBJECT - *usedobjsp;
   2114 }
   2115 
   2116 boolean_t
   2117 dsl_dataset_modified_since_lastsnap(dsl_dataset_t *ds)
   2118 {
   2119 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
   2120 
   2121 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
   2122 	    dsl_pool_sync_context(dp));
   2123 	if (ds->ds_prev == NULL)
   2124 		return (B_FALSE);
   2125 	if (ds->ds_phys->ds_bp.blk_birth >
   2126 	    ds->ds_prev->ds_phys->ds_creation_txg)
   2127 		return (B_TRUE);
   2128 	return (B_FALSE);
   2129 }
   2130 
   2131 /* ARGSUSED */
   2132 static int
   2133 dsl_dataset_snapshot_rename_check(void *arg1, void *arg2, dmu_tx_t *tx)
   2134 {
   2135 	dsl_dataset_t *ds = arg1;
   2136 	char *newsnapname = arg2;
   2137 	dsl_dir_t *dd = ds->ds_dir;
   2138 	dsl_dataset_t *hds;
   2139 	uint64_t val;
   2140 	int err;
   2141 
   2142 	err = dsl_dataset_hold_obj(dd->dd_pool,
   2143 	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
   2144 	if (err)
   2145 		return (err);
   2146 
   2147 	/* new name better not be in use */
   2148 	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
   2149 	dsl_dataset_rele(hds, FTAG);
   2150 
   2151 	if (err == 0)
   2152 		err = EEXIST;
   2153 	else if (err == ENOENT)
   2154 		err = 0;
   2155 
   2156 	/* dataset name + 1 for the "@" + the new snapshot name must fit */
   2157 	if (dsl_dir_namelen(ds->ds_dir) + 1 + strlen(newsnapname) >= MAXNAMELEN)
   2158 		err = ENAMETOOLONG;
   2159 
   2160 	return (err);
   2161 }
   2162 
   2163 static void
   2164 dsl_dataset_snapshot_rename_sync(void *arg1, void *arg2,
   2165     cred_t *cr, dmu_tx_t *tx)
   2166 {
   2167 	dsl_dataset_t *ds = arg1;
   2168 	const char *newsnapname = arg2;
   2169 	dsl_dir_t *dd = ds->ds_dir;
   2170 	objset_t *mos = dd->dd_pool->dp_meta_objset;
   2171 	dsl_dataset_t *hds;
   2172 	int err;
   2173 
   2174 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
   2175 
   2176 	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
   2177 	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
   2178 
   2179 	VERIFY(0 == dsl_dataset_get_snapname(ds));
   2180 	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
   2181 	ASSERT3U(err, ==, 0);
   2182 	mutex_enter(&ds->ds_lock);
   2183 	(void) strcpy(ds->ds_snapname, newsnapname);
   2184 	mutex_exit(&ds->ds_lock);
   2185 	err = zap_add(mos, hds->ds_phys->ds_snapnames_zapobj,
   2186 	    ds->ds_snapname, 8, 1, &ds->ds_object, tx);
   2187 	ASSERT3U(err, ==, 0);
   2188 
   2189 	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
   2190 	    cr, "dataset = %llu", ds->ds_object);
   2191 	dsl_dataset_rele(hds, FTAG);
   2192 }
   2193 
/*
 * State shared between dsl_recursive_rename() and its per-filesystem
 * callback dsl_snapshot_rename_one().
 */
struct renamesnaparg {
	/* sync task group collecting one rename task per snapshot found */
	dsl_sync_task_group_t *dstg;
	/* name of the snapshot being processed, or of one that failed */
	char failed[MAXPATHLEN];
	/* old snapshot name: the text following '@' in the old full name */
	char *oldsnap;
	/* new snapshot name: the text following '@' in the new full name */
	char *newsnap;
};
   2200 
   2201 static int
   2202 dsl_snapshot_rename_one(const char *name, void *arg)
   2203 {
   2204 	struct renamesnaparg *ra = arg;
   2205 	dsl_dataset_t *ds = NULL;
   2206 	char *snapname;
   2207 	int err;
   2208 
   2209 	snapname = kmem_asprintf("%s@%s", name, ra->oldsnap);
   2210 	(void) strlcpy(ra->failed, snapname, sizeof (ra->failed));
   2211 
   2212 	/*
   2213 	 * For recursive snapshot renames the parent won't be changing
   2214 	 * so we just pass name for both the to/from argument.
   2215 	 */
   2216 	err = zfs_secpolicy_rename_perms(snapname, snapname, CRED());
   2217 	if (err != 0) {
   2218 		strfree(snapname);
   2219 		return (err == ENOENT ? 0 : err);
   2220 	}
   2221 
   2222 #ifdef _KERNEL
   2223 	/*
   2224 	 * For all filesystems undergoing rename, we'll need to unmount it.
   2225 	 */
   2226 	(void) zfs_unmount_snap(snapname, NULL);
   2227 #endif
   2228 	err = dsl_dataset_hold(snapname, ra->dstg, &ds);
   2229 	strfree(snapname);
   2230 	if (err != 0)
   2231 		return (err == ENOENT ? 0 : err);
   2232 
   2233 	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
   2234 	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
   2235 
   2236 	return (0);
   2237 }
   2238 
   2239 static int
   2240 dsl_recursive_rename(char *oldname, const char *newname)
   2241 {
   2242 	int err;
   2243 	struct renamesnaparg *ra;
   2244 	dsl_sync_task_t *dst;
   2245 	spa_t *spa;
   2246 	char *cp, *fsname = spa_strdup(oldname);
   2247 	int len = strlen(oldname) + 1;
   2248 
   2249 	/* truncate the snapshot name to get the fsname */
   2250 	cp = strchr(fsname, '@');
   2251 	*cp = '\0';
   2252 
   2253 	err = spa_open(fsname, &spa, FTAG);
   2254 	if (err) {
   2255 		kmem_free(fsname, len);
   2256 		return (err);
   2257 	}
   2258 	ra = kmem_alloc(sizeof (struct renamesnaparg), KM_SLEEP);
   2259 	ra->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
   2260 
   2261 	ra->oldsnap = strchr(oldname, '@') + 1;
   2262 	ra->newsnap = strchr(newname, '@') + 1;
   2263 	*ra->failed = '\0';
   2264 
   2265 	err = dmu_objset_find(fsname, dsl_snapshot_rename_one, ra,
   2266 	    DS_FIND_CHILDREN);
   2267 	kmem_free(fsname, len);
   2268 
   2269 	if (err == 0) {
   2270 		err = dsl_sync_task_group_wait(ra->dstg);
   2271 	}
   2272 
   2273 	for (dst = list_head(&ra->dstg->dstg_tasks); dst;
   2274 	    dst = list_next(&ra->dstg->dstg_tasks, dst)) {
   2275 		dsl_dataset_t *ds = dst->dst_arg1;
   2276 		if (dst->dst_err) {
   2277 			dsl_dir_name(ds->ds_dir, ra->failed);
   2278 			(void) strlcat(ra->failed, "@", sizeof (ra->failed));
   2279 			(void) strlcat(ra->failed, ra->newsnap,
   2280 			    sizeof (ra->failed));
   2281 		}
   2282 		dsl_dataset_rele(ds, ra->dstg);
   2283 	}
   2284 
   2285 	if (err)
   2286 		(void) strlcpy(oldname, ra->failed, sizeof (ra->failed));
   2287 
   2288 	dsl_sync_task_group_destroy(ra->dstg);
   2289 	kmem_free(ra, sizeof (struct renamesnaparg));
   2290 	spa_close(spa, FTAG);
   2291 	return (err);
   2292 }
   2293 
   2294 static int
   2295 dsl_valid_rename(const char *oldname, void *arg)
   2296 {
   2297 	int delta = *(int *)arg;
   2298 
   2299 	if (strlen(oldname) + delta >= MAXNAMELEN)
   2300 		return (ENAMETOOLONG);
   2301 
   2302 	return (0);
   2303 }
   2304 
   2305 #pragma weak dmu_objset_rename = dsl_dataset_rename
   2306 int
   2307 dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
   2308 {
   2309 	dsl_dir_t *dd;
   2310 	dsl_dataset_t *ds;
   2311 	const char *tail;
   2312 	int err;
   2313 
   2314 	err = dsl_dir_open(oldname, FTAG, &dd, &tail);
   2315 	if (err)
   2316 		return (err);
   2317 	/*
   2318 	 * If there are more than 2 references there may be holds
   2319 	 * hanging around that haven't been cleared out yet.
   2320 	 */
   2321 	if (dmu_buf_refcount(dd->dd_dbuf) > 2)
   2322 		txg_wait_synced(dd->dd_pool, 0);
   2323 	if (tail == NULL) {
   2324 		int delta = strlen(newname) - strlen(oldname);
   2325 
   2326 		/* if we're growing, validate child name lengths */
   2327 		if (delta > 0)
   2328 			err = dmu_objset_find(oldname, dsl_valid_rename,
   2329 			    &delta, DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
   2330 
   2331 		if (!err)
   2332 			err = dsl_dir_rename(dd, newname);
   2333 		dsl_dir_close(dd, FTAG);
   2334 		return (err);
   2335 	}
   2336 	if (tail[0] != '@') {
   2337 		/* the name ended in a nonexistent component */
   2338 		dsl_dir_close(dd, FTAG);
   2339 		return (ENOENT);
   2340 	}
   2341 
   2342 	dsl_dir_close(dd, FTAG);
   2343 
   2344 	/* new name must be snapshot in same filesystem */
   2345 	tail = strchr(newname, '@');
   2346 	if (tail == NULL)
   2347 		return (EINVAL);
   2348 	tail++;
   2349 	if (strncmp(oldname, newname, tail - newname) != 0)
   2350 		return (EXDEV);
   2351 
   2352 	if (recursive) {
   2353 		err = dsl_recursive_rename(oldname, newname);
   2354 	} else {
   2355 		err = dsl_dataset_hold(oldname, FTAG, &ds);
   2356 		if (err)
   2357 			return (err);
   2358 
   2359 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
   2360 		    dsl_dataset_snapshot_rename_check,
   2361 		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
   2362 
   2363 		dsl_dataset_rele(ds, FTAG);
   2364 	}
   2365 
   2366 	return (err);
   2367 }
   2368 
/*
 * One element of a snapshot list built by snaplist_make(): a list
 * linkage plus the dataset that the list entry holds or owns.
 */
struct promotenode {
	list_node_t link;
	dsl_dataset_t *ds;
};
   2373 
   2374 struct promotearg {
   2375 	list_t shared_snaps, origin_snaps, clone_snaps;
   2376 	dsl_dataset_t *origin_origin, *origin_head;
   2377 	uint64_t used, comp, uncomp, unique, cloneusedsnap, originusedsnap;
   2378 	char *err_ds;
   2379 };
   2380 
   2381 static int snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep);
   2382 
   2383 /* ARGSUSED */
   2384 static int
   2385 dsl_dataset_promote_check(void *arg1, void *arg2, dmu_tx_t *tx)
   2386 {
   2387 	dsl_dataset_t *hds = arg1;
   2388 	struct promotearg *pa = arg2;
   2389 	struct promotenode *snap = list_head(&pa->shared_snaps);
   2390 	dsl_dataset_t *origin_ds = snap->ds;
   2391 	int err;
   2392 
   2393 	/* Check that it is a real clone */
   2394 	if (!dsl_dir_is_clone(hds->ds_dir))
   2395 		return (EINVAL);
   2396 
   2397 	/* Since this is so expensive, don't do the preliminary check */
   2398 	if (!dmu_tx_is_syncing(tx))
   2399 		return (0);
   2400 
   2401 	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
   2402 		return (EXDEV);
   2403 
   2404 	/* compute origin's new unique space */
   2405 	snap = list_tail(&pa->clone_snaps);
   2406 	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
   2407 	err = bplist_space_birthrange(&snap->ds->ds_deadlist,
   2408 	    origin_ds->ds_phys->ds_prev_snap_txg, UINT64_MAX, &pa->unique);
   2409 	if (err)
   2410 		return (err);
   2411 
   2412 	/*
   2413 	 * Walk the snapshots that we are moving
   2414 	 *
   2415 	 * Compute space to transfer.  Consider the incremental changes
   2416 	 * to used for each snapshot:
   2417 	 * (my used) = (prev's used) + (blocks born) - (blocks killed)
   2418 	 * So each snapshot gave birth to:
   2419 	 * (blocks born) = (my used) - (prev's used) + (blocks killed)
   2420 	 * So a sequence would look like:
   2421 	 * (uN - u(N-1) + kN) + ... + (u1 - u0 + k1) + (u0 - 0 + k0)
   2422 	 * Which simplifies to:
   2423 	 * uN + kN + kN-1 + ... + k1 + k0
   2424 	 * Note however, if we stop before we reach the ORIGIN we get:
   2425 	 * uN + kN + kN-1 + ... + kM - uM-1
   2426 	 */
   2427 	pa->used = origin_ds->ds_phys->ds_used_bytes;
   2428 	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
   2429 	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
   2430 	for (snap = list_head(&pa->shared_snaps); snap;
   2431 	    snap = list_next(&pa->shared_snaps, snap)) {
   2432 		uint64_t val, dlused, dlcomp, dluncomp;
   2433 		dsl_dataset_t *ds = snap->ds;
   2434 
   2435 		/* Check that the snapshot name does not conflict */
   2436 		VERIFY(0 == dsl_dataset_get_snapname(ds));
   2437 		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
   2438 		if (err == 0) {
   2439 			err = EEXIST;
   2440 			goto out;
   2441 		}
   2442 		if (err != ENOENT)
   2443 			goto out;
   2444 
   2445 		/* The very first snapshot does not have a deadlist */
   2446 		if (ds->ds_phys->ds_prev_snap_obj == 0)
   2447 			continue;
   2448 
   2449 		if (err = bplist_space(&ds->ds_deadlist,
   2450 		    &dlused, &dlcomp, &dluncomp))
   2451 			goto out;
   2452 		pa->used += dlused;
   2453 		pa->comp += dlcomp;
   2454 		pa->uncomp += dluncomp;
   2455 	}
   2456 
   2457 	/*
   2458 	 * If we are a clone of a clone then we never reached ORIGIN,
   2459 	 * so we need to subtract out the clone origin's used space.
   2460 	 */
   2461 	if (pa->origin_origin) {
   2462 		pa->used -= pa->origin_origin->ds_phys->ds_used_bytes;
   2463 		pa->comp -= pa->origin_origin->ds_phys->ds_compressed_bytes;
   2464 		pa->uncomp -= pa->origin_origin->ds_phys->ds_uncompressed_bytes;
   2465 	}
   2466 
   2467 	/* Check that there is enough space here */
   2468 	err = dsl_dir_transfer_possible(origin_ds->ds_dir, hds->ds_dir,
   2469 	    pa->used);
   2470 	if (err)
   2471 		return (err);
   2472 
   2473 	/*
   2474 	 * Compute the amounts of space that will be used by snapshots
   2475 	 * after the promotion (for both origin and clone).  For each,
   2476 	 * it is the amount of space that will be on all of their
   2477 	 * deadlists (that was not born before their new origin).
   2478 	 */
   2479 	if (hds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
   2480 		uint64_t space;
   2481 
   2482 		/*
   2483 		 * Note, typically this will not be a clone of a clone,
   2484 		 * so snap->ds->ds_origin_txg will be < TXG_INITIAL, so
   2485 		 * these snaplist_space() -> bplist_space_birthrange()
   2486 		 * calls will be fast because they do not have to
   2487 		 * iterate over all bps.
   2488 		 */
   2489 		snap = list_head(&pa->origin_snaps);
   2490 		err = snaplist_space(&pa->shared_snaps,
   2491 		    snap->ds->ds_origin_txg, &pa->cloneusedsnap);
   2492 		if (err)
   2493 			return (err);
   2494 
   2495 		err = snaplist_space(&pa->clone_snaps,
   2496 		    snap->ds->ds_origin_txg, &space);
   2497 		if (err)
   2498 			return (err);
   2499 		pa->cloneusedsnap += space;
   2500 	}
   2501 	if (origin_ds->ds_dir->dd_phys->dd_flags & DD_FLAG_USED_BREAKDOWN) {
   2502 		err = snaplist_space(&pa->origin_snaps,
   2503 		    origin_ds->ds_phys->ds_creation_txg, &pa->originusedsnap);
   2504 		if (err)
   2505 			return (err);
   2506 	}
   2507 
   2508 	return (0);
   2509 out:
   2510 	pa->err_ds =  snap->ds->ds_snapname;
   2511 	return (err);
   2512 }
   2513 
/*
 * Sync function for promoting a clone.
 *
 * arg1 is the clone's head dataset and arg2 the struct promotearg whose
 * space figures were computed by dsl_dataset_promote_check().  The
 * steps, in order:
 *   - relink the origin snapshot's next-snapshot / next-clones pointers,
 *   - swap the origin relationship between the two dsl_dirs,
 *   - move every shared snapshot (name entry and containing dsl_dir)
 *     from the origin's filesystem to the clone's,
 *   - transfer the space accounting between the two dsl_dirs,
 *   - record the origin snapshot's new unique bytes and log the event.
 */
static void
dsl_dataset_promote_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	dsl_dataset_t *hds = arg1;
	struct promotearg *pa = arg2;
	/* head of shared_snaps is the clone's current origin snapshot */
	struct promotenode *snap = list_head(&pa->shared_snaps);
	dsl_dataset_t *origin_ds = snap->ds;
	dsl_dataset_t *origin_head;
	dsl_dir_t *dd = hds->ds_dir;
	dsl_pool_t *dp = hds->ds_dir->dd_pool;
	dsl_dir_t *odd = NULL;
	uint64_t oldnext_obj;
	int64_t delta;

	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));

	/* head of origin_snaps is the origin filesystem's head dataset */
	snap = list_head(&pa->origin_snaps);
	origin_head = snap->ds;

	/*
	 * We need to explicitly open odd, since origin_ds's dd will be
	 * changing.
	 */
	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
	    NULL, FTAG, &odd));

	/* change origin's next snap */
	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
	oldnext_obj = origin_ds->ds_phys->ds_next_snap_obj;
	/* tail of clone_snaps is the clone-side dataset right after origin */
	snap = list_tail(&pa->clone_snaps);
	ASSERT3U(snap->ds->ds_phys->ds_prev_snap_obj, ==, origin_ds->ds_object);
	origin_ds->ds_phys->ds_next_snap_obj = snap->ds->ds_object;

	/* change the origin's next clone */
	if (origin_ds->ds_phys->ds_next_clones_obj) {
		remove_from_next_clones(origin_ds, snap->ds->ds_object, tx);
		VERIFY3U(0, ==, zap_add_int(dp->dp_meta_objset,
		    origin_ds->ds_phys->ds_next_clones_obj,
		    oldnext_obj, tx));
	}

	/* change origin */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
	/* the clone inherits whatever origin the old origin's dir had */
	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
	hds->ds_origin_txg = origin_head->ds_origin_txg;
	dmu_buf_will_dirty(odd->dd_dbuf, tx);
	/* and the old origin's dir now originates from origin_ds */
	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
	origin_head->ds_origin_txg = origin_ds->ds_phys->ds_creation_txg;

	/* move snapshots to this dir */
	for (snap = list_head(&pa->shared_snaps); snap;
	    snap = list_next(&pa->shared_snaps, snap)) {
		dsl_dataset_t *ds = snap->ds;

		/* unregister props as dsl_dir is changing */
		if (ds->ds_objset) {
			dmu_objset_evict(ds->ds_objset);
			ds->ds_objset = NULL;
		}
		/* move snap name entry */
		VERIFY(0 == dsl_dataset_get_snapname(ds));
		VERIFY(0 == dsl_dataset_snap_remove(origin_head,
		    ds->ds_snapname, tx));
		VERIFY(0 == zap_add(dp->dp_meta_objset,
		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
		    8, 1, &ds->ds_object, tx));
		/* change containing dsl_dir */
		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		ASSERT3U(ds->ds_phys->ds_dir_obj, ==, odd->dd_object);
		ds->ds_phys->ds_dir_obj = dd->dd_object;
		ASSERT3P(ds->ds_dir, ==, odd);
		dsl_dir_close(ds->ds_dir, ds);
		VERIFY(0 == dsl_dir_open_obj(dp, dd->dd_object,
		    NULL, ds, &ds->ds_dir));

		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
	}

	/*
	 * Change space accounting.
	 * Note, pa->*usedsnap and dd_used_breakdown[SNAP] will either
	 * both be valid, or both be 0 (resulting in delta == 0).  This
	 * is true for each of {clone,origin} independently.
	 */

	/* clone side: snapshot space grows, the remainder is charged to head */
	delta = pa->cloneusedsnap -
	    dd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
	ASSERT3S(delta, >=, 0);
	ASSERT3U(pa->used, >=, delta);
	dsl_dir_diduse_space(dd, DD_USED_SNAP, delta, 0, 0, tx);
	dsl_dir_diduse_space(dd, DD_USED_HEAD,
	    pa->used - delta, pa->comp, pa->uncomp, tx);

	/* origin side: the mirror image, with the same totals negated */
	delta = pa->originusedsnap -
	    odd->dd_phys->dd_used_breakdown[DD_USED_SNAP];
	ASSERT3S(delta, <=, 0);
	ASSERT3U(pa->used, >=, -delta);
	dsl_dir_diduse_space(odd, DD_USED_SNAP, delta, 0, 0, tx);
	dsl_dir_diduse_space(odd, DD_USED_HEAD,
	    -pa->used - delta, -pa->comp, -pa->uncomp, tx);

	origin_ds->ds_phys->ds_unique_bytes = pa->unique;

	/* log history record */
	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
	    cr, "dataset = %llu", hds->ds_object);

	dsl_dir_close(odd, FTAG);
}
   2624 
   2625 static char *snaplist_tag = "snaplist";
   2626 /*
   2627  * Make a list of dsl_dataset_t's for the snapshots between first_obj
   2628  * (exclusive) and last_obj (inclusive).  The list will be in reverse
   2629  * order (last_obj will be the list_head()).  If first_obj == 0, do all
   2630  * snapshots back to this dataset's origin.
   2631  */
   2632 static int
   2633 snaplist_make(dsl_pool_t *dp, boolean_t own,
   2634     uint64_t first_obj, uint64_t last_obj, list_t *l)
   2635 {
   2636 	uint64_t obj = last_obj;
   2637 
   2638 	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock));
   2639 
   2640 	list_create(l, sizeof (struct promotenode),
   2641 	    offsetof(struct promotenode, link));
   2642 
   2643 	while (obj != first_obj) {
   2644 		dsl_dataset_t *ds;
   2645 		struct promotenode *snap;
   2646 		int err;
   2647 
   2648 		if (own) {
   2649 			err = dsl_dataset_own_obj(dp, obj,
   2650 			    0, snaplist_tag, &ds);
   2651 			if (err == 0)
   2652 				dsl_dataset_make_exclusive(ds, snaplist_tag);
   2653 		} else {
   2654 			err = dsl_dataset_hold_obj(dp, obj, snaplist_tag, &ds);
   2655 		}
   2656 		if (err == ENOENT) {
   2657 			/* lost race with snapshot destroy */
   2658 			struct promotenode *last = list_tail(l);
   2659 			ASSERT(obj != last->ds->ds_phys->ds_prev_snap_obj);
   2660 			obj = last->ds->ds_phys->ds_prev_snap_obj;
   2661 			continue;
   2662 		} else if (err) {
   2663 			return (err);
   2664 		}
   2665 
   2666 		if (first_obj == 0)
   2667 			first_obj = ds->ds_dir->dd_phys->dd_origin_obj;
   2668 
   2669 		snap = kmem_alloc(sizeof (struct promotenode), KM_SLEEP);
   2670 		snap->ds = ds;
   2671 		list_insert_tail(l, snap);
   2672 		obj = ds->ds_phys->ds_prev_snap_obj;
   2673 	}
   2674 
   2675 	return (0);
   2676 }
   2677 
   2678 static int
   2679 snaplist_space(list_t *l, uint64_t mintxg, uint64_t *spacep)
   2680 {
   2681 	struct promotenode *snap;
   2682 
   2683 	*spacep = 0;
   2684 	for (snap = list_head(l); snap; snap = list_next(l, snap)) {
   2685 		uint64_t used;
   2686 		int err = bplist_space_birthrange(&snap->ds->ds_deadlist,
   2687 		    mintxg, UINT64_MAX, &used);
   2688 		if (err)
   2689 			return (err);
   2690 		*spacep += used;
   2691 	}
   2692 	return (0);
   2693 }
   2694 
   2695 static void
   2696 snaplist_destroy(list_t *l, boolean_t own)
   2697 {
   2698 	struct promotenode *snap;
   2699 
   2700 	if (!l || !list_link_active(&l->list_head))
   2701 		return;
   2702 
   2703 	while ((snap = list_tail(l)) != NULL) {
   2704 		list_remove(l, snap);
   2705 		if (own)
   2706 			dsl_dataset_disown(snap->ds, snaplist_tag);
   2707 		else
   2708 			dsl_dataset_rele(snap->ds, snaplist_tag);
   2709 		kmem_free(snap, sizeof (struct promotenode));
   2710 	}
   2711 	list_destroy(l);
   2712 }
   2713 
   2714 /*
   2715  * Promote a clone.  Nomenclature note:
   2716  * "clone" or "cds": the original clone which is being promoted
   2717  * "origin" or "ods": the snapshot which is originally clone's origin
   2718  * "origin head" or "ohds": the dataset which is the head
   2719  * (filesystem/volume) for the origin
   2720  * "origin origin": the origin of the origin's filesystem (typically
   2721  * NULL, indicating that the clone is not a clone of a clone).
   2722  */
   2723 int
   2724 dsl_dataset_promote(const char *name, char *conflsnap)
   2725 {
   2726 	dsl_dataset_t *ds;
   2727 	dsl_dir_t *dd;
   2728 	dsl_pool_t *dp;
   2729 	dmu_object_info_t doi;
   2730 	struct promotearg pa = { 0 };
   2731 	struct promotenode *snap;
   2732 	int err;
   2733 
   2734 	err = dsl_dataset_hold(name, FTAG, &ds);
   2735 	if (err)
   2736 		return (err);
   2737 	dd = ds->ds_dir;
   2738 	dp = dd->dd_pool;
   2739 
   2740 	err = dmu_object_info(dp->dp_meta_objset,
   2741 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
   2742 	if (err) {
   2743 		dsl_dataset_rele(ds, FTAG);
   2744 		return (err);
   2745 	}
   2746 
   2747 	if (dsl_dataset_is_snapshot(ds) || dd->dd_phys->dd_origin_obj == 0) {
   2748 		dsl_dataset_rele(ds, FTAG);
   2749 		return (EINVAL);
   2750 	}
   2751 
   2752 	/*
   2753 	 * We are going to inherit all the snapshots taken before our
   2754 	 * origin (i.e., our new origin will be our parent's origin).
   2755 	 * Take ownership of them so that we can rename them into our
   2756 	 * namespace.
   2757 	 */
   2758 	rw_enter(&dp->dp_config_rwlock, RW_READER);
   2759 
   2760 	err = snaplist_make(dp, B_TRUE, 0, dd->dd_phys->dd_origin_obj,
   2761 	    &pa.shared_snaps);
   2762 	if (err != 0)
   2763 		goto out;
   2764 
   2765 	err = snaplist_make(dp, B_FALSE, 0, ds->ds_object, &pa.clone_snaps);
   2766 	if (err != 0)
   2767 		goto out;
   2768 
   2769 	snap = list_head(&pa.shared_snaps);
   2770 	ASSERT3U(snap->ds->ds_object, ==, dd->dd_phys->dd_origin_obj);
   2771 	err = snaplist_make(dp, B_FALSE, dd->dd_phys->dd_origin_obj,
   2772 	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, &pa.origin_snaps);
   2773 	if (err != 0)
   2774 		goto out;
   2775 
   2776 	if (dsl_dir_is_clone(snap->ds->ds_dir)) {
   2777 		err = dsl_dataset_own_obj(dp,
   2778 		    snap->ds->ds_dir->dd_phys->dd_origin_obj,
   2779 		    0, FTAG, &pa.origin_origin);
   2780 		if (err != 0)
   2781 			goto out;
   2782 	}
   2783 
   2784 out:
   2785 	rw_exit(&dp->dp_config_rwlock);
   2786 
   2787 	/*
   2788 	 * Add in 128x the snapnames zapobj size, since we will be moving
   2789 	 * a bunch of snapnames to the promoted ds, and dirtying their
   2790 	 * bonus buffers.
   2791 	 */
   2792 	if (err == 0) {
   2793 		err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
   2794 		    dsl_dataset_promote_sync, ds, &pa,
   2795 		    2 + 2 * doi.doi_physical_blocks_512);
   2796 		if (err && pa.err_ds && conflsnap)
   2797 			(void) strncpy(conflsnap, pa.err_ds, MAXNAMELEN);
   2798 	}
   2799 
   2800 	snaplist_destroy(&pa.shared_snaps, B_TRUE);
   2801 	snaplist_destroy(&pa.clone_snaps, B_FALSE);
   2802 	snaplist_destroy(&pa.origin_snaps, B_FALSE);
   2803 	if (pa.origin_origin)
   2804 		dsl_dataset_disown(pa.origin_origin, FTAG);
   2805 	dsl_dataset_rele(ds, FTAG);
   2806 	return (err);
   2807 }
   2808 
/*
 * Argument bundle passed as arg1 to dsl_dataset_clone_swap_check() and
 * dsl_dataset_clone_swap_sync().  cds, ohds and force are filled in by
 * dsl_dataset_clone_swap(); unused_refres_delta is computed by the
 * check function and applied by the sync function.
 */
struct cloneswaparg {
	dsl_dataset_t *cds; /* clone dataset */
	dsl_dataset_t *ohds; /* origin's head dataset */
	/* when set, skip the "ohds modified since last snapshot" check */
	boolean_t force;
	int64_t unused_refres_delta; /* change in unconsumed refreservation */
};
   2815 
   2816 /* ARGSUSED */
   2817 static int
   2818 dsl_dataset_clone_swap_check(void *arg1, void *arg2, dmu_tx_t *tx)
   2819 {
   2820 	struct cloneswaparg *csa = arg1;
   2821 
   2822 	/* they should both be heads */
   2823 	if (dsl_dataset_is_snapshot(csa->cds) ||
   2824 	    dsl_dataset_is_snapshot(csa->ohds))
   2825 		return (EINVAL);
   2826 
   2827 	/* the branch point should be just before them */
   2828 	if (csa->cds->ds_prev != csa->ohds->ds_prev)
   2829 		return (EINVAL);
   2830 
   2831 	/* cds should be the clone (unless they are unrelated) */
   2832 	if (csa->cds->ds_prev != NULL &&
   2833 	    csa->cds->ds_prev != csa->cds->ds_dir->dd_pool->dp_origin_snap &&
   2834 	    csa->ohds->ds_object !=
   2835 	    csa->cds->ds_prev->ds_phys->ds_next_snap_obj)
   2836 		return (EINVAL);
   2837 
   2838 	/* the clone should be a child of the origin */
   2839 	if (csa->cds->ds_dir->dd_parent != csa->ohds->ds_dir)
   2840 		return (EINVAL);
   2841 
   2842 	/* ohds shouldn't be modified unless 'force' */
   2843 	if (!csa->force && dsl_dataset_modified_since_lastsnap(csa->ohds))
   2844 		return (ETXTBSY);
   2845 
   2846 	/* adjust amount of any unconsumed refreservation */
   2847 	csa->unused_refres_delta =
   2848 	    (int64_t)MIN(csa->ohds->ds_reserved,
   2849 	    csa->ohds->ds_phys->ds_unique_bytes) -
   2850 	    (int64_t)MIN(csa->ohds->ds_reserved,
   2851 	    csa->cds->ds_phys->ds_unique_bytes);
   2852 
   2853 	if (csa->unused_refres_delta > 0 &&
   2854 	    csa->unused_refres_delta >
   2855 	    dsl_dir_space_available(csa->ohds->ds_dir, NULL, 0, TRUE))
   2856 		return (ENOSPC);
   2857 
   2858 	if (csa->ohds->ds_quota != 0 &&
   2859 	    csa->cds->ds_phys->ds_unique_bytes > csa->ohds->ds_quota)
   2860 		return (EDQUOT);
   2861 
   2862 	return (0);
   2863 }
   2864 
/*
 * Sync function for the clone-swap sync task.  arg1 is the
 * struct cloneswaparg; arg2 and cr are unused.
 *
 * Exchanges the contents of the clone (csa->cds) and the origin head
 * (csa->ohds): their root block pointers, their ds_*_bytes counters and
 * their deadlists are swapped, and the space accounting of both dsl
 * directories is adjusted to match.  The statements below read several
 * fields before they are swapped, so their order matters.
 */
/* ARGSUSED */
static void
dsl_dataset_clone_swap_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
{
	struct cloneswaparg *csa = arg1;
	dsl_pool_t *dp = csa->cds->ds_dir->dd_pool;

	ASSERT(csa->cds->ds_reserved == 0);
	ASSERT(csa->ohds->ds_quota == 0 ||
	    csa->cds->ds_phys->ds_unique_bytes <= csa->ohds->ds_quota);

	dmu_buf_will_dirty(csa->cds->ds_dbuf, tx);
	dmu_buf_will_dirty(csa->ohds->ds_dbuf, tx);

	/* Drop any cached objsets; the block pointers are swapped below. */
	if (csa->cds->ds_objset != NULL) {
		dmu_objset_evict(csa->cds->ds_objset);
		csa->cds->ds_objset = NULL;
	}

	if (csa->ohds->ds_objset != NULL) {
		dmu_objset_evict(csa->ohds->ds_objset);
		csa->ohds->ds_objset = NULL;
	}

	/*
	 * Reset origin's unique bytes, if it exists.
	 * The new value is computed from the clone's deadlist, which is
	 * read here before the deadlists are swapped further down.
	 */
	if (csa->cds->ds_prev) {
		dsl_dataset_t *origin = csa->cds->ds_prev;
		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
		    origin->ds_phys->ds_prev_snap_txg, UINT64_MAX,
		    &origin->ds_phys->ds_unique_bytes));
	}

	/* swap blkptrs */
	{
		blkptr_t tmp;
		tmp = csa->ohds->ds_phys->ds_bp;
		csa->ohds->ds_phys->ds_bp = csa->cds->ds_phys->ds_bp;
		csa->cds->ds_phys->ds_bp = tmp;
	}

	/* set dd_*_bytes */
	{
		int64_t dused, dcomp, duncomp;
		uint64_t cdl_used, cdl_comp, cdl_uncomp;
		uint64_t odl_used, odl_comp, odl_uncomp;

		ASSERT3U(csa->cds->ds_dir->dd_phys->
		    dd_used_breakdown[DD_USED_SNAP], ==, 0);

		VERIFY(0 == bplist_space(&csa->cds->ds_deadlist, &cdl_used,
		    &cdl_comp, &cdl_uncomp));
		VERIFY(0 == bplist_space(&csa->ohds->ds_deadlist, &odl_used,
		    &odl_comp, &odl_uncomp));

		/*
		 * Deltas are (clone's bytes + clone's deadlist) minus
		 * (origin head's bytes + origin head's deadlist), using
		 * the not-yet-swapped ds_*_bytes values.
		 */
		dused = csa->cds->ds_phys->ds_used_bytes + cdl_used -
		    (csa->ohds->ds_phys->ds_used_bytes + odl_used);
		dcomp = csa->cds->ds_phys->ds_compressed_bytes + cdl_comp -
		    (csa->ohds->ds_phys->ds_compressed_bytes + odl_comp);
		duncomp = csa->cds->ds_phys->ds_uncompressed_bytes +
		    cdl_uncomp -
		    (csa->ohds->ds_phys->ds_uncompressed_bytes + odl_uncomp);

		/* Charge the delta to ohds' dir and credit it to cds' dir. */
		dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_HEAD,
		    dused, dcomp, duncomp, tx);
		dsl_dir_diduse_space(csa->cds->ds_dir, DD_USED_HEAD,
		    -dused, -dcomp, -duncomp, tx);

		/*
		 * The difference in the space used by snapshots is the
		 * difference in snapshot space due to the head's
		 * deadlist (since that's the only thing that's
		 * changing that affects the snapused).
		 */
		VERIFY(0 == bplist_space_birthrange(&csa->cds->ds_deadlist,
		    csa->ohds->ds_origin_txg, UINT64_MAX, &cdl_used));
		VERIFY(0 == bplist_space_birthrange(&csa->ohds->ds_deadlist,
		    csa->ohds->ds_origin_txg, UINT64_MAX, &odl_used));
		dsl_dir_transfer_space(csa->ohds->ds_dir, cdl_used - odl_used,
		    DD_USED_HEAD, DD_USED_SNAP, tx);
	}

/* Exchange two uint64_t lvalues; each argument is evaluated twice. */
#define	SWITCH64(x, y) \
	{ \
		uint64_t __tmp = (x); \
		(x) = (y); \
		(y) = __tmp; \
	}

	/* swap ds_*_bytes */
	SWITCH64(csa->ohds->ds_phys->ds_used_bytes,
	    csa->cds->ds_phys->ds_used_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_compressed_bytes,
	    csa->cds->ds_phys->ds_compressed_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_uncompressed_bytes,
	    csa->cds->ds_phys->ds_uncompressed_bytes);
	SWITCH64(csa->ohds->ds_phys->ds_unique_bytes,
	    csa->cds->ds_phys->ds_unique_bytes);

	/* apply any parent delta for change in unconsumed refreservation */
	dsl_dir_diduse_space(csa->ohds->ds_dir, DD_USED_REFRSRV,
	    csa->unused_refres_delta, 0, 0, tx);

	/*
	 * swap deadlists
	 * Both in-core bplists are closed, the on-disk object numbers
	 * are exchanged, and the bplists are reopened on the new objects.
	 */
	bplist_close(&csa->cds->ds_deadlist);
	bplist_close(&csa->ohds->ds_deadlist);
	SWITCH64(csa->ohds->ds_phys->ds_deadlist_obj,
	    csa->cds->ds_phys->ds_deadlist_obj);
	VERIFY(0 == bplist_open(&csa->cds->ds_deadlist, dp->dp_meta_objset,
	    csa->cds->ds_phys->ds_deadlist_obj));
	VERIFY(0 == bplist_open(&csa->ohds->ds_deadlist, dp->dp_meta_objset,
	    csa->ohds->ds_phys->ds_deadlist_obj));

	dsl_pool_ds_clone_swapped(csa->ohds, csa->cds, tx);
}
   2982 
/*
 * Swap 'clone' with its origin head datasets.  Used at the end of "zfs
 * recv" into an existing fs to swizzle the file system to the new
 * version, and by "zfs rollback".  Can also be used to swap two
 * independent head datasets if neither has any snapshots.
 *
 * Both datasets must already have an owner (asserted below).  'force'
 * is passed through to the check function, where it suppresses the
 * "origin head modified since last snapshot" test.
 *
 * Returns the result of the sync task: 0 on success or an errno from
 * dsl_dataset_clone_swap_check().
 *
 * NOTE(review): the ds_rwlock of both datasets is taken as writer here
 * and is still held when this function returns, on success and on
 * failure alike.  Presumably the caller's later disown of each dataset
 * drops them - confirm against dsl_dataset_disown().
 */
int
dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
    boolean_t force)
{
	struct cloneswaparg csa;
	int error;

	ASSERT(clone->ds_owner);
	ASSERT(origin_head->ds_owner);
retry:
	/*
	 * Need exclusive access for the swap
	 *
	 * Never block on one lock while holding the other: block on
	 * clone's lock, then only *try* origin_head's.  If that fails,
	 * drop clone's lock, block on origin_head's, and only *try*
	 * clone's; if that fails too, drop everything and start over.
	 */
	rw_enter(&clone->ds_rwlock, RW_WRITER);
	if (!rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
		rw_exit(&clone->ds_rwlock);
		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
			rw_exit(&origin_head->ds_rwlock);
			goto retry;
		}
	}
	/* csa.unused_refres_delta is filled in by the check function. */
	csa.cds = clone;
	csa.ohds = origin_head;
	csa.force = force;
	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
	    dsl_dataset_clone_swap_check,
	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
	return (error);
}
   3017 
   3018 /*
   3019  * Given a pool name and a dataset object number in that pool,
   3020  * return the name of that dataset.
   3021  */
   3022 int
   3023 dsl_dsobj_to_dsname(char *pname, uint64_t obj, char *buf)
   3024 {
   3025 	spa_t *spa;
   3026 	dsl_pool_t *dp;
   3027 	dsl_dataset_t *ds;
   3028 	int error;
   3029 
   3030 	if ((error = spa_open(pname, &spa, FTAG)) != 0)
   3031 		return (error);
   3032 	dp = spa_get_dsl(spa);
   3033 	rw_enter(&dp->dp_config_rwlock, RW_READER);
   3034 	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
   3035 		dsl_dataset_name(ds, buf);
   3036 		dsl_dataset_rele(ds, FTAG);
   3037 	}
   3038 	rw_exit(&dp->dp_config_rwlock);
   3039 	spa_close(spa, FTAG);
   3040 
   3041 	return (error);
   3042 }
   3043 
   3044 int
   3045 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
   3046     uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
   3047 {
   3048 	int error = 0;
   3049 
   3050 	ASSERT3S(asize, >, 0);
   3051 
   3052 	/*
   3053 	 * *ref_rsrv is the portion of asize that will come from any
   3054 	 * unconsumed refreservation space.
   3055 	 */
   3056 	*ref_rsrv = 0;
   3057 
   3058 	mutex_enter(&ds->ds_lock);
   3059 	/*
   3060 	 * Make a space adjustment for reserved bytes.
   3061 	 */
   3062 	if (ds->ds_reserved > ds->ds_phys->ds_unique_bytes) {
   3063 		ASSERT3U(*used, >=,
   3064 		    ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
   3065 		*used -= (ds->ds_reserved - ds->ds_phys->ds_unique_bytes);
   3066 		*ref_rsrv =
   3067 		    asize - MIN(asize, parent_delta(ds, asize + inflight));
   3068 	}
   3069 
   3070 	if (!check_quota || ds->ds_quota == 0) {
   3071 		mutex_exit(&ds->ds_lock);
   3072 		return (0);
   3073 	}
   3074 	/*
   3075 	 * If they are requesting more space, and our current estimate
   3076 	 * is over quota, they get to try again unless the actual
   3077 	 * on-disk is over quota and there are no pending changes (which
   3078 	 * may free up space for us).
   3079 	 */
   3080 	if (ds->ds_phys->ds_used_bytes + inflight >= ds->ds_quota) {
   3081 		if (inflight > 0 || ds->ds_phys->ds_used_bytes < ds->ds_quota)
   3082 			error = ERESTART;
   3083 		else
   3084 			error = EDQUOT;
   3085 	}
   3086 	mutex_exit(&ds->ds_lock);
   3087 
   3088 	return (error);
   3089 }
   3090 
   3091 /* ARGSUSED */
   3092 static int
   3093 dsl_dataset_set_quota_check(void *arg1, void *arg2, dmu_tx_t *tx)
   3094 {
   3095 	dsl_dataset_t *ds = arg1;
   3096 	dsl_prop_setarg_t *psa = arg2;
   3097 	int err;
   3098 
   3099 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_REFQUOTA)
   3100 		return (ENOTSUP);
   3101 
   3102 	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
   3103 		return (err);
   3104 
   3105 	if (psa->psa_effective_value == 0)
   3106 		return (0);
   3107 
   3108 	if (psa->psa_effective_value < ds->ds_phys->ds_used_bytes ||
   3109 	    psa->psa_effective_value < ds->ds_reserved)
   3110 		return (ENOSPC);
   3111 
   3112 	return (0);
   3113 }
   3114 
   3115 extern void dsl_prop_set_sync(void *, void *, cred_t *, dmu_tx_t *);
   3116 
   3117 void
   3118 dsl_dataset_set_quota_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
   3119 {
   3120 	dsl_dataset_t *ds = arg1;
   3121 	dsl_prop_setarg_t *psa = arg2;
   3122 	uint64_t effective_value = psa->psa_effective_value;
   3123 
   3124 	dsl_prop_set_sync(ds, psa, cr, tx);
   3125 	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
   3126 
   3127 	if (ds->ds_quota != effective_value) {
   3128 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
   3129 		ds->ds_quota = effective_value;
   3130 
   3131 		spa_history_internal_log(LOG_DS_REFQUOTA,
   3132 		    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu ",
   3133 		    (longlong_t)ds->ds_quota, ds->ds_object);
   3134 	}
   3135 }
   3136 
   3137 int
   3138 dsl_dataset_set_quota(const char *dsname, zprop_source_t source, uint64_t quota)
   3139 {
   3140 	dsl_dataset_t *ds;
   3141 	dsl_prop_setarg_t psa;
   3142 	int err;
   3143 
   3144 	dsl_prop_setarg_init_uint64(&psa, "refquota", source, &quota);
   3145 
   3146 	err = dsl_dataset_hold(dsname, FTAG, &ds);
   3147 	if (err)
   3148 		return (err);
   3149 
   3150 	/*
   3151 	 * If someone removes a file, then tries to set the quota, we
   3152 	 * want to make sure the file freeing takes effect.
   3153 	 */
   3154 	txg_wait_open(ds->ds_dir->dd_pool, 0);
   3155 
   3156 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
   3157 	    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
   3158 	    ds, &psa, 0);
   3159 
   3160 	dsl_dataset_rele(ds, FTAG);
   3161 	return (err);
   3162 }
   3163 
   3164 static int
   3165 dsl_dataset_set_reservation_check(void *arg1, void *arg2, dmu_tx_t *tx)
   3166 {
   3167 	dsl_dataset_t *ds = arg1;
   3168 	dsl_prop_setarg_t *psa = arg2;
   3169 	uint64_t effective_value;
   3170 	uint64_t unique;
   3171 	int err;
   3172 
   3173 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) <
   3174 	    SPA_VERSION_REFRESERVATION)
   3175 		return (ENOTSUP);
   3176 
   3177 	if (dsl_dataset_is_snapshot(ds))
   3178 		return (EINVAL);
   3179 
   3180 	if ((err = dsl_prop_predict_sync(ds->ds_dir, psa)) != 0)
   3181 		return (err);
   3182 
   3183 	effective_value = psa->psa_effective_value;
   3184 
   3185 	/*
   3186 	 * If we are doing the preliminary check in open context, the
   3187 	 * space estimates may be inaccurate.
   3188 	 */
   3189 	if (!dmu_tx_is_syncing(tx))
   3190 		return (0);
   3191 
   3192 	mutex_enter(&ds->ds_lock);
   3193 	unique = dsl_dataset_unique(ds);
   3194 	mutex_exit(&ds->ds_lock);
   3195 
   3196 	if (MAX(unique, effective_value) > MAX(unique, ds->ds_reserved)) {
   3197 		uint64_t delta = MAX(unique, effective_value) -
   3198 		    MAX(unique, ds->ds_reserved);
   3199 
   3200 		if (delta > dsl_dir_space_available(ds->ds_dir, NULL, 0, TRUE))
   3201 			return (ENOSPC);
   3202 		if (ds->ds_quota > 0 &&
   3203 		    effective_value > ds->ds_quota)
   3204 			return (ENOSPC);
   3205 	}
   3206 
   3207 	return (0);
   3208 }
   3209 
   3210 /* ARGSUSED */
   3211 static void
   3212 dsl_dataset_set_reservation_sync(void *arg1, void *arg2, cred_t *cr,
   3213     dmu_tx_t *tx)
   3214 {
   3215 	dsl_dataset_t *ds = arg1;
   3216 	dsl_prop_setarg_t *psa = arg2;
   3217 	uint64_t effective_value = psa->psa_effective_value;
   3218 	uint64_t unique;
   3219 	int64_t delta;
   3220 
   3221 	dsl_prop_set_sync(ds, psa, cr, tx);
   3222 	DSL_PROP_CHECK_PREDICTION(ds->ds_dir, psa);
   3223 
   3224 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
   3225 
   3226 	mutex_enter(&ds->ds_dir->dd_lock);
   3227 	mutex_enter(&ds->ds_lock);
   3228 	unique = dsl_dataset_unique(ds);
   3229 	delta = MAX(0, (int64_t)(effective_value - unique)) -
   3230 	    MAX(0, (int64_t)(ds->ds_reserved - unique));
   3231 	ds->ds_reserved = effective_value;
   3232 	mutex_exit(&ds->ds_lock);
   3233 
   3234 	dsl_dir_diduse_space(ds->ds_dir, DD_USED_REFRSRV, delta, 0, 0, tx);
   3235 	mutex_exit(&ds->ds_dir->dd_lock);
   3236 
   3237 	spa_history_internal_log(LOG_DS_REFRESERV,
   3238 	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "%lld dataset = %llu",
   3239 	    (longlong_t)effective_value, ds->ds_object);
   3240 }
   3241 
   3242 int
   3243 dsl_dataset_set_reservation(const char *dsname, zprop_source_t source,
   3244     uint64_t reservation)
   3245 {
   3246 	dsl_dataset_t *ds;
   3247 	dsl_prop_setarg_t psa;
   3248 	int err;
   3249 
   3250 	dsl_prop_setarg_init_uint64(&psa, "refreservation", source,
   3251 	    &reservation);
   3252 
   3253 	err = dsl_dataset_hold(dsname, FTAG, &ds);
   3254 	if (err)
   3255 		return (err);
   3256 
   3257 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
   3258 	    dsl_dataset_set_reservation_check,
   3259 	    dsl_dataset_set_reservation_sync, ds, &psa, 0);
   3260 
   3261 	dsl_dataset_rele(ds, FTAG);
   3262 	return (err);
   3263 }
   3264 
/*
 * State shared by the user hold and user release paths while they
 * visit one or more snapshots.
 */
struct dsl_ds_holdarg {
	/* task group collecting per-snapshot tasks; also the hold tag */
	dsl_sync_task_group_t *dstg;
	char *htag;		/* user hold tag being added or released */
	char *snapname;		/* snapshot component, appended after '@' */
	boolean_t recursive;	/* visit children; tolerate ENOENT */
	boolean_t gotone;	/* set once any snapshot has been held */
	boolean_t temphold;	/* hold is also recorded with the pool */
	char failed[MAXPATHLEN]; /* dataset name to report on failure */
};
   3274 
   3275 /*
   3276  * The max length of a temporary tag prefix is the number of hex digits
   3277  * required to express UINT64_MAX plus one for the hyphen.
   3278  */
   3279 #define	MAX_TAG_PREFIX_LEN	17
   3280 
   3281 static int
   3282 dsl_dataset_user_hold_check(void *arg1, void *arg2, dmu_tx_t *tx)
   3283 {
   3284 	dsl_dataset_t *ds = arg1;
   3285 	struct dsl_ds_holdarg *ha = arg2;
   3286 	char *htag = ha->htag;
   3287 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
   3288 	int error = 0;
   3289 
   3290 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
   3291 		return (ENOTSUP);
   3292 
   3293 	if (!dsl_dataset_is_snapshot(ds))
   3294 		return (EINVAL);
   3295 
   3296 	/* tags must be unique */
   3297 	mutex_enter(&ds->ds_lock);
   3298 	if (ds->ds_phys->ds_userrefs_obj) {
   3299 		error = zap_lookup(mos, ds->ds_phys->ds_userrefs_obj, htag,
   3300 		    8, 1, tx);
   3301 		if (error == 0)
   3302 			error = EEXIST;
   3303 		else if (error == ENOENT)
   3304 			error = 0;
   3305 	}
   3306 	mutex_exit(&ds->ds_lock);
   3307 
   3308 	if (error == 0 && ha->temphold &&
   3309 	    strlen(htag) + MAX_TAG_PREFIX_LEN >= MAXNAMELEN)
   3310 		error = E2BIG;
   3311 
   3312 	return (error);
   3313 }
   3314 
   3315 static void
   3316 dsl_dataset_user_hold_sync(void *arg1, void *arg2, cred_t *cr, dmu_tx_t *tx)
   3317 {
   3318 	dsl_dataset_t *ds = arg1;
   3319 	struct dsl_ds_holdarg *ha = arg2;
   3320 	char *htag = ha->htag;
   3321 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
   3322 	objset_t *mos = dp->dp_meta_objset;
   3323 	uint64_t now = gethrestime_sec();
   3324 	uint64_t zapobj;
   3325 
   3326 	mutex_enter(&ds->ds_lock);
   3327 	if (ds->ds_phys->ds_userrefs_obj == 0) {
   3328 		/*
   3329 		 * This is the first user hold for this dataset.  Create
   3330 		 * the userrefs zap object.
   3331 		 */
   3332 		dmu_buf_will_dirty(ds->ds_dbuf, tx);
   3333 		zapobj = ds->ds_phys->ds_userrefs_obj =
   3334 		    zap_create(mos, DMU_OT_USERREFS, DMU_OT_NONE, 0, tx);
   3335 	} else {
   3336 		zapobj = ds->ds_phys->ds_userrefs_obj;
   3337 	}
   3338 	ds->ds_userrefs++;
   3339 	mutex_exit(&ds->ds_lock);
   3340 
   3341 	VERIFY(0 == zap_add(mos, zapobj, htag, 8, 1, &now, tx));
   3342 
   3343 	if (ha->temphold) {
   3344 		VERIFY(0 == dsl_pool_user_hold(dp, ds->ds_object,
   3345 		    htag, &now, tx));
   3346 	}
   3347 
   3348 	spa_history_internal_log(LOG_DS_USER_HOLD,
   3349 	    dp->dp_spa, tx, cr, "<%s> temp = %d dataset = %llu", htag,
   3350 	    (int)ha->temphold, ds->ds_object);
   3351 }
   3352 
   3353 static int
   3354 dsl_dataset_user_hold_one(const char *dsname, void *arg)
   3355 {
   3356 	struct dsl_ds_holdarg *ha = arg;
   3357 	dsl_dataset_t *ds;
   3358 	int error;
   3359 	char *name;
   3360 
   3361 	/* alloc a buffer to hold dsname@snapname plus terminating NULL */
   3362 	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
   3363 	error = dsl_dataset_hold(name, ha->dstg, &ds);
   3364 	strfree(name);
   3365 	if (error == 0) {
   3366 		ha->gotone = B_TRUE;
   3367 		dsl_sync_task_create(ha->dstg, dsl_dataset_user_hold_check,
   3368 		    dsl_dataset_user_hold_sync, ds, ha, 0);
   3369 	} else if (error == ENOENT && ha->recursive) {
   3370 		error = 0;
   3371 	} else {
   3372 		(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
   3373 	}
   3374 	return (error);
   3375 }
   3376 
   3377 int
   3378 dsl_dataset_user_hold(char *dsname, char *snapname, char *htag,
   3379     boolean_t recursive, boolean_t temphold)
   3380 {
   3381 	struct dsl_ds_holdarg *ha;
   3382 	dsl_sync_task_t *dst;
   3383 	spa_t *spa;
   3384 	int error;
   3385 
   3386 	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);
   3387 
   3388 	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
   3389 
   3390 	error = spa_open(dsname, &spa, FTAG);
   3391 	if (error) {
   3392 		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
   3393 		return (error);
   3394 	}
   3395 
   3396 	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
   3397 	ha->htag = htag;
   3398 	ha->snapname = snapname;
   3399 	ha->recursive = recursive;
   3400 	ha->temphold = temphold;
   3401 	if (recursive) {
   3402 		error = dmu_objset_find(dsname, dsl_dataset_user_hold_one,
   3403 		    ha, DS_FIND_CHILDREN);
   3404 	} else {
   3405 		error = dsl_dataset_user_hold_one(dsname, ha);
   3406 	}
   3407 	if (error == 0)
   3408 		error = dsl_sync_task_group_wait(ha->dstg);
   3409 
   3410 	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
   3411 	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
   3412 		dsl_dataset_t *ds = dst->dst_arg1;
   3413 
   3414 		if (dst->dst_err) {
   3415 			dsl_dataset_name(ds, ha->failed);
   3416 			*strchr(ha->failed, '@') = '\0';
   3417 		}
   3418 		dsl_dataset_rele(ds, ha->dstg);
   3419 	}
   3420 
   3421 	if (error == 0 && recursive && !ha->gotone)
   3422 		error = ENOENT;
   3423 
   3424 	if (error)
   3425 		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));
   3426 
   3427 	dsl_sync_task_group_destroy(ha->dstg);
   3428 	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
   3429 	spa_close(spa, FTAG);
   3430 	return (error);
   3431 }
   3432 
/*
 * Per-snapshot argument for the user release sync task; allocated in
 * dsl_dataset_user_release_one() and freed by
 * dsl_dataset_user_release() once the task group has run.
 */
struct dsl_ds_releasearg {
	dsl_dataset_t *ds;	/* snapshot the hold is being released from */
	const char *htag;	/* user hold tag to remove */
	boolean_t own;		/* do we own or just hold ds? */
};
   3438 
   3439 static int
   3440 dsl_dataset_release_might_destroy(dsl_dataset_t *ds, const char *htag,
   3441     boolean_t *might_destroy)
   3442 {
   3443 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
   3444 	uint64_t zapobj;
   3445 	uint64_t tmp;
   3446 	int error;
   3447 
   3448 	*might_destroy = B_FALSE;
   3449 
   3450 	mutex_enter(&ds->ds_lock);
   3451 	zapobj = ds->ds_phys->ds_userrefs_obj;
   3452 	if (zapobj == 0) {
   3453 		/* The tag can't possibly exist */
   3454 		mutex_exit(&ds->ds_lock);
   3455 		return (ESRCH);
   3456 	}
   3457 
   3458 	/* Make sure the tag exists */
   3459 	error = zap_lookup(mos, zapobj, htag, 8, 1, &tmp);
   3460 	if (error) {
   3461 		mutex_exit(&ds->ds_lock);
   3462 		if (error == ENOENT)
   3463 			error = ESRCH;
   3464 		return (error);
   3465 	}
   3466 
   3467 	if (ds->ds_userrefs == 1 && ds->ds_phys->ds_num_children == 1 &&
   3468 	    DS_IS_DEFER_DESTROY(ds))
   3469 		*might_destroy = B_TRUE;
   3470 
   3471 	mutex_exit(&ds->ds_lock);
   3472 	return (0);
   3473 }
   3474 
   3475 static int
   3476 dsl_dataset_user_release_check(void *arg1, void *tag, dmu_tx_t *tx)
   3477 {
   3478 	struct dsl_ds_releasearg *ra = arg1;
   3479 	dsl_dataset_t *ds = ra->ds;
   3480 	boolean_t might_destroy;
   3481 	int error;
   3482 
   3483 	if (spa_version(ds->ds_dir->dd_pool->dp_spa) < SPA_VERSION_USERREFS)
   3484 		return (ENOTSUP);
   3485 
   3486 	error = dsl_dataset_release_might_destroy(ds, ra->htag, &might_destroy);
   3487 	if (error)
   3488 		return (error);
   3489 
   3490 	if (might_destroy) {
   3491 		struct dsl_ds_destroyarg dsda = {0};
   3492 
   3493 		if (dmu_tx_is_syncing(tx)) {
   3494 			/*
   3495 			 * If we're not prepared to remove the snapshot,
   3496 			 * we can't allow the release to happen right now.
   3497 			 */
   3498 			if (!ra->own)
   3499 				return (EBUSY);
   3500 			if (ds->ds_objset) {
   3501 				dmu_objset_evict(ds->ds_objset);
   3502 				ds->ds_objset = NULL;
   3503 			}
   3504 		}
   3505 		dsda.ds = ds;
   3506 		dsda.releasing = B_TRUE;
   3507 		return (dsl_dataset_destroy_check(&dsda, tag, tx));
   3508 	}
   3509 
   3510 	return (0);
   3511 }
   3512 
/*
 * Sync function for releasing a user hold.  arg1 is the
 * struct dsl_ds_releasearg; 'tag' is passed through to the destroy
 * sync function.
 *
 * Decrements ds_userrefs, removes the tag from the pool's record of
 * temporary holds (if present) and from the snapshot's userrefs zap,
 * and destroys the snapshot if this was the last reference on a
 * deferred-destroy snapshot.  The release is then logged to the pool
 * history.
 */
static void
dsl_dataset_user_release_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
{
	struct dsl_ds_releasearg *ra = arg1;
	dsl_dataset_t *ds = ra->ds;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	objset_t *mos = dp->dp_meta_objset;
	uint64_t zapobj;
	/*
	 * dsobj and refs are captured up front and used for the history
	 * log at the end, after the dataset may have been destroyed.
	 */
	uint64_t dsobj = ds->ds_object;
	uint64_t refs;
	int error;

	mutex_enter(&ds->ds_lock);
	ds->ds_userrefs--;
	refs = ds->ds_userrefs;
	mutex_exit(&ds->ds_lock);
	/* ENOENT is tolerated: the tag need not be known to the pool. */
	error = dsl_pool_user_release(dp, ds->ds_object, ra->htag, tx);
	VERIFY(error == 0 || error == ENOENT);
	zapobj = ds->ds_phys->ds_userrefs_obj;
	VERIFY(0 == zap_remove(mos, zapobj, ra->htag, tx));
	if (ds->ds_userrefs == 0 && ds->ds_phys->ds_num_children == 1 &&
	    DS_IS_DEFER_DESTROY(ds)) {
		struct dsl_ds_destroyarg dsda = {0};

		/* Destroying requires ownership, arranged in open context. */
		ASSERT(ra->own);
		dsda.ds = ds;
		dsda.releasing = B_TRUE;
		/* We already did the destroy_check */
		dsl_dataset_destroy_sync(&dsda, tag, cr, tx);
	}

	spa_history_internal_log(LOG_DS_USER_RELEASE,
	    dp->dp_spa, tx, cr, "<%s> %lld dataset = %llu",
	    ra->htag, (longlong_t)refs, dsobj);
}
   3548 
   3549 static int
   3550 dsl_dataset_user_release_one(const char *dsname, void *arg)
   3551 {
   3552 	struct dsl_ds_holdarg *ha = arg;
   3553 	struct dsl_ds_releasearg *ra;
   3554 	dsl_dataset_t *ds;
   3555 	int error;
   3556 	void *dtag = ha->dstg;
   3557 	char *name;
   3558 	boolean_t own = B_FALSE;
   3559 	boolean_t might_destroy;
   3560 
   3561 	/* alloc a buffer to hold dsname@snapname, plus the terminating NULL */
   3562 	name = kmem_asprintf("%s@%s", dsname, ha->snapname);
   3563 	error = dsl_dataset_hold(name, dtag, &ds);
   3564 	strfree(name);
   3565 	if (error == ENOENT && ha->recursive)
   3566 		return (0);
   3567 	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));
   3568 	if (error)
   3569 		return (error);
   3570 
   3571 	ha->gotone = B_TRUE;
   3572 
   3573 	ASSERT(dsl_dataset_is_snapshot(ds));
   3574 
   3575 	error = dsl_dataset_release_might_destroy(ds, ha->htag, &might_destroy);
   3576 	if (error) {
   3577 		dsl_dataset_rele(ds, dtag);
   3578 		return (error);
   3579 	}
   3580 
   3581 	if (might_destroy) {
   3582 #ifdef _KERNEL
   3583 		error = zfs_unmount_snap(name, NULL);
   3584 		if (error) {
   3585 			dsl_dataset_rele(ds, dtag);
   3586 			return (error);
   3587 		}
   3588 #endif
   3589 		if (!dsl_dataset_tryown(ds, B_TRUE, dtag)) {
   3590 			dsl_dataset_rele(ds, dtag);
   3591 			return (EBUSY);
   3592 		} else {
   3593 			own = B_TRUE;
   3594 			dsl_dataset_make_exclusive(ds, dtag);
   3595 		}
   3596 	}
   3597 
   3598 	ra = kmem_alloc(sizeof (struct dsl_ds_releasearg), KM_SLEEP);
   3599 	ra->ds = ds;
   3600 	ra->htag = ha->htag;
   3601 	ra->own = own;
   3602 	dsl_sync_task_create(ha->dstg, dsl_dataset_user_release_check,
   3603 	    dsl_dataset_user_release_sync, ra, dtag, 0);
   3604 
   3605 	return (0);
   3606 }
   3607 
/*
 * Release user hold 'htag' from snapshot 'snapname' of dataset
 * 'dsname', and from the same-named snapshot of every descendant when
 * 'recursive' is set.
 *
 * Returns 0 on success or an errno value; a recursive request that
 * found no matching snapshot at all returns ENOENT.  EBUSY is never
 * returned: the whole operation is retried from scratch instead.  On
 * any other failure the name of the offending dataset is copied back
 * into 'dsname'.
 *
 * NOTE(review): that copy-back is bounded by sizeof (ha->failed)
 * (MAXPATHLEN), not by the size of the caller's buffer - confirm that
 * every caller's buffer is large enough for the name written.
 */
int
dsl_dataset_user_release(char *dsname, char *snapname, char *htag,
    boolean_t recursive)
{
	struct dsl_ds_holdarg *ha;
	dsl_sync_task_t *dst;
	spa_t *spa;
	int error;

top:
	/* Every attempt (including retries) starts with fresh state. */
	ha = kmem_zalloc(sizeof (struct dsl_ds_holdarg), KM_SLEEP);

	(void) strlcpy(ha->failed, dsname, sizeof (ha->failed));

	error = spa_open(dsname, &spa, FTAG);
	if (error) {
		kmem_free(ha, sizeof (struct dsl_ds_holdarg));
		return (error);
	}

	ha->dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
	ha->htag = htag;
	ha->snapname = snapname;
	ha->recursive = recursive;
	if (recursive) {
		error = dmu_objset_find(dsname, dsl_dataset_user_release_one,
		    ha, DS_FIND_CHILDREN);
	} else {
		error = dsl_dataset_user_release_one(dsname, ha);
	}
	if (error == 0)
		error = dsl_sync_task_group_wait(ha->dstg);

	/*
	 * For every queued task: note the failing dataset's name, drop
	 * the ownership or hold taken in dsl_dataset_user_release_one(),
	 * and free the per-task argument.
	 */
	for (dst = list_head(&ha->dstg->dstg_tasks); dst;
	    dst = list_next(&ha->dstg->dstg_tasks, dst)) {
		struct dsl_ds_releasearg *ra = dst->dst_arg1;
		dsl_dataset_t *ds = ra->ds;

		if (dst->dst_err)
			dsl_dataset_name(ds, ha->failed);

		if (ra->own)
			dsl_dataset_disown(ds, ha->dstg);
		else
			dsl_dataset_rele(ds, ha->dstg);

		kmem_free(ra, sizeof (struct dsl_ds_releasearg));
	}

	if (error == 0 && recursive && !ha->gotone)
		error = ENOENT;

	/* dsname is left intact on EBUSY because it is reused on retry. */
	if (error && error != EBUSY)
		(void) strlcpy(dsname, ha->failed, sizeof (ha->failed));

	dsl_sync_task_group_destroy(ha->dstg);
	kmem_free(ha, sizeof (struct dsl_ds_holdarg));
	spa_close(spa, FTAG);

	/*
	 * We can get EBUSY if we were racing with deferred destroy and
	 * dsl_dataset_user_release_check() hadn't done the necessary
	 * open context setup.  We can also get EBUSY if we're racing
	 * with destroy and that thread is the ds_owner.  Either way
	 * the busy condition should be transient, and we should retry
	 * the release operation.
	 */
	if (error == EBUSY)
		goto top;

	return (error);
}
   3680 
   3681 /*
   3682  * Called at spa_load time to release a stale temporary user hold.
   3683  */
   3684 int
   3685 dsl_dataset_user_release_tmp(dsl_pool_t *dp, uint64_t dsobj, char *htag)
   3686 {
   3687 	dsl_dataset_t *ds;
   3688 	char *snap;
   3689 	char *name;
   3690 	int namelen;
   3691 	int error;
   3692 
   3693 	rw_enter(&dp->dp_config_rwlock, RW_READER);
   3694 	error = dsl_dataset_hold_obj(dp, dsobj, FTAG, &ds);
   3695 	rw_exit(&dp->dp_config_rwlock);
   3696 	if (error)
   3697 		return (error);
   3698 	namelen = dsl_dataset_namelen(ds)+1;
   3699 	name = kmem_alloc(namelen, KM_SLEEP);
   3700 	dsl_dataset_name(ds, name);
   3701 	dsl_dataset_rele(ds, FTAG);
   3702 
   3703 	snap = strchr(name, '@');
   3704 	*snap = '\0';
   3705 	++snap;
   3706 	return (dsl_dataset_user_release(name, snap, htag, B_FALSE));
   3707 }
   3708 
   3709 int
   3710 dsl_dataset_get_holds(const char *dsname, nvlist_t **nvp)
   3711 {
   3712 	dsl_dataset_t *ds;
   3713 	int err;
   3714 
   3715 	err = dsl_dataset_hold(dsname, FTAG, &ds);
   3716 	if (err)
   3717 		return (err);
   3718 
   3719 	VERIFY(0 == nvlist_alloc(nvp, NV_UNIQUE_NAME, KM_SLEEP));
   3720 	if (ds->ds_phys->ds_userrefs_obj != 0) {
   3721 		zap_attribute_t *za;
   3722 		zap_cursor_t zc;
   3723 
   3724 		za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
   3725 		for (zap_cursor_init(&zc, ds->ds_dir->dd_pool->dp_meta_objset,
   3726 		    ds->ds_phys->ds_userrefs_obj);
   3727 		    zap_cursor_retrieve(&zc, za) == 0;
   3728 		    zap_cursor_advance(&zc)) {
   3729 			VERIFY(0 == nvlist_add_uint64(*nvp, za->za_name,
   3730 			    za->za_first_integer));
   3731 		}
   3732 		zap_cursor_fini(&zc);
   3733 		kmem_free(za, sizeof (zap_attribute_t));
   3734 	}
   3735 	dsl_dataset_rele(ds, FTAG);
   3736 	return (0);
   3737 }
   3738 
   3739 /*
   3740  * Note, this fuction is used as the callback for dmu_objset_find().  We
   3741  * always return 0 so that we will continue to find and process
   3742  * inconsistent datasets, even if we encounter an error trying to
   3743  * process one of them.
   3744  */
   3745 /* ARGSUSED */
   3746 int
   3747 dsl_destroy_inconsistent(const char *dsname, void *arg)
   3748 {
   3749 	dsl_dataset_t *ds;
   3750 
   3751 	if (dsl_dataset_own(dsname, B_TRUE, FTAG, &ds) == 0) {
   3752 		if (DS_IS_INCONSISTENT(ds))
   3753 			(void) dsl_dataset_destroy(ds, FTAG, B_FALSE);
   3754 		else
   3755 			dsl_dataset_disown(ds, FTAG);
   3756 	}
   3757 	return (0);
   3758 }
   3759