Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/dmu_objset.h>
     27 #include <sys/dsl_dataset.h>
     28 #include <sys/dsl_dir.h>
     29 #include <sys/dsl_prop.h>
     30 #include <sys/dsl_synctask.h>
     31 #include <sys/dmu_traverse.h>
     32 #include <sys/dmu_tx.h>
     33 #include <sys/arc.h>
     34 #include <sys/zio.h>
     35 #include <sys/zap.h>
     36 #include <sys/unique.h>
     37 #include <sys/zfs_context.h>
     38 #include <sys/zfs_ioctl.h>
     39 #include <sys/spa.h>
     40 #include <sys/zfs_znode.h>
     41 #include <sys/sunddi.h>
     42 
/*
 * Sentinel owner value.  DSL_DATASET_IS_DESTROYED() compares ds_owner
 * against this pointer, so only its identity matters to that macro.
 */
static char *dsl_reaper = "the grim reaper";

/*
 * Forward declarations of sync-task check/sync callbacks; their
 * definitions are presumably later in this file (not visible here).
 */
static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
static dsl_checkfunc_t dsl_dataset_rollback_check;
static dsl_syncfunc_t dsl_dataset_rollback_sync;
static dsl_syncfunc_t dsl_dataset_set_reservation_sync;

/*
 * NOTE(review): DS_REF_MAX is not referenced in the visible part of this
 * file; its intended use should be confirmed.
 */
#define	DS_REF_MAX	(1ULL << 62)

/* Block size handed to bplist_create() when a deadlist is created. */
#define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE

/* True when the dataset's owner field holds the dsl_reaper sentinel. */
#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
     56 
     57 
     58 /*
     59  * Figure out how much of this delta should be propogated to the dsl_dir
     60  * layer.  If there's a refreservation, that space has already been
     61  * partially accounted for in our ancestors.
     62  */
     63 static int64_t
     64 parent_delta(dsl_dataset_t *ds, int64_t delta)
     65 {
     66 	uint64_t old_bytes, new_bytes;
     67 
     68 	if (ds->ds_reserved == 0)
     69 		return (delta);
     70 
     71 	old_bytes = MAX(ds->ds_phys->ds_unique_bytes, ds->ds_reserved);
     72 	new_bytes = MAX(ds->ds_phys->ds_unique_bytes + delta, ds->ds_reserved);
     73 
     74 	ASSERT3U(ABS((int64_t)(new_bytes - old_bytes)), <=, ABS(delta));
     75 	return (new_bytes - old_bytes);
     76 }
     77 
/*
 * Account for a newly written ("born") block.
 *
 * Must be called in syncing context.  Holes are ignored.  When 'ds' is
 * NULL the space is charged to the pool's dp_mos_dir; otherwise the
 * dataset's used, compressed, uncompressed and unique byte counts are
 * increased and the change is passed on to the dataset's dsl_dir.
 */
void
dsl_dataset_block_born(dsl_dataset_t *ds, blkptr_t *bp, dmu_tx_t *tx)
{
	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);
	int64_t delta;

	dprintf_bp(bp, "born, ds=%p\n", ds);

	ASSERT(dmu_tx_is_syncing(tx));
	/* It could have been compressed away to nothing */
	if (BP_IS_HOLE(bp))
		return;
	ASSERT(BP_GET_TYPE(bp) != DMU_OT_NONE);
	ASSERT3U(BP_GET_TYPE(bp), <, DMU_OT_NUMTYPES);
	if (ds == NULL) {
		/*
		 * Account for the meta-objset space in its placeholder
		 * dsl_dir.
		 */
		ASSERT3U(compressed, ==, uncompressed); /* it's all metadata */
		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    used, compressed, uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return;
	}
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	/* Lock nesting here: the dsl_dir's dd_lock outside, ds_lock inside. */
	mutex_enter(&ds->ds_dir->dd_lock);
	mutex_enter(&ds->ds_lock);
	/*
	 * parent_delta() reads ds_unique_bytes, so it must be evaluated
	 * before the counters below are updated.
	 */
	delta = parent_delta(ds, used);
	ds->ds_phys->ds_used_bytes += used;
	ds->ds_phys->ds_compressed_bytes += compressed;
	ds->ds_phys->ds_uncompressed_bytes += uncompressed;
	ds->ds_phys->ds_unique_bytes += used;
	mutex_exit(&ds->ds_lock);
	dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD, delta,
	    compressed, uncompressed, tx);
	/*
	 * The part of 'used' that was not reported via 'delta' is shifted
	 * between the DD_USED_REFRSRV and DD_USED_HEAD categories.
	 */
	dsl_dir_transfer_space(ds->ds_dir, used - delta,
	    DD_USED_REFRSRV, DD_USED_HEAD, tx);
	mutex_exit(&ds->ds_dir->dd_lock);
}
    120 
/*
 * Account for a block that is no longer referenced by 'ds'.
 *
 * Must be called in syncing context with a non-NULL parent zio.  Returns
 * 0 for a hole, otherwise the block's size as reported by
 * bp_get_dasize().
 *
 * - ds == NULL: the block is freed and the space is subtracted from the
 *   pool's dp_mos_dir.
 * - Block born after ds_prev_snap_txg: the block is freed and the
 *   dataset's unique bytes and dsl_dir accounting are reduced.
 * - Otherwise: the block is queued on the dataset's deadlist.
 *
 * In both dataset cases the used/compressed/uncompressed counts of 'ds'
 * are reduced at the end.
 */
int
dsl_dataset_block_kill(dsl_dataset_t *ds, blkptr_t *bp, zio_t *pio,
    dmu_tx_t *tx)
{
	int used = bp_get_dasize(tx->tx_pool->dp_spa, bp);
	int compressed = BP_GET_PSIZE(bp);
	int uncompressed = BP_GET_UCSIZE(bp);

	ASSERT(pio != NULL);
	ASSERT(dmu_tx_is_syncing(tx));
	/* No block pointer => nothing to free */
	if (BP_IS_HOLE(bp))
		return (0);

	ASSERT(used > 0);
	if (ds == NULL) {
		int err;
		/*
		 * Account for the meta-objset space in its placeholder
		 * dsl_dir.
		 */
		err = dsl_free(pio, tx->tx_pool,
		    tx->tx_txg, bp, NULL, NULL, ARC_NOWAIT);
		ASSERT(err == 0);

		dsl_dir_diduse_space(tx->tx_pool->dp_mos_dir, DD_USED_HEAD,
		    -used, -compressed, -uncompressed, tx);
		dsl_dir_dirty(tx->tx_pool->dp_mos_dir, tx);
		return (used);
	}
	ASSERT3P(tx->tx_pool, ==, ds->ds_dir->dd_pool);

	ASSERT(!dsl_dataset_is_snapshot(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);

	if (bp->blk_birth > ds->ds_phys->ds_prev_snap_txg) {
		int err;
		int64_t delta;

		/* Born after the most recent snapshot txg: free it now. */
		dprintf_bp(bp, "freeing: %s", "");
		err = dsl_free(pio, tx->tx_pool,
		    tx->tx_txg, bp, NULL, NULL, ARC_NOWAIT);
		ASSERT(err == 0);

		/* Lock nesting here: dd_lock outside, ds_lock inside. */
		mutex_enter(&ds->ds_dir->dd_lock);
		mutex_enter(&ds->ds_lock);
		ASSERT(ds->ds_phys->ds_unique_bytes >= used ||
		    !DS_UNIQUE_IS_ACCURATE(ds));
		/*
		 * parent_delta() reads ds_unique_bytes, so evaluate it
		 * before the counter is decremented.
		 */
		delta = parent_delta(ds, -used);
		ds->ds_phys->ds_unique_bytes -= used;
		mutex_exit(&ds->ds_lock);
		dsl_dir_diduse_space(ds->ds_dir, DD_USED_HEAD,
		    delta, -compressed, -uncompressed, tx);
		dsl_dir_transfer_space(ds->ds_dir, -used - delta,
		    DD_USED_REFRSRV, DD_USED_HEAD, tx);
		mutex_exit(&ds->ds_dir->dd_lock);
	} else {
		/*
		 * Born at or before the previous snapshot's txg: queue the
		 * block on the deadlist instead of freeing it.
		 */
		dprintf_bp(bp, "putting on dead list: %s", "");
		VERIFY(0 == bplist_enqueue(&ds->ds_deadlist, bp, tx));
		ASSERT3U(ds->ds_prev->ds_object, ==,
		    ds->ds_phys->ds_prev_snap_obj);
		ASSERT(ds->ds_prev->ds_phys->ds_num_children > 0);
		/* if (bp->blk_birth > prev prev snap txg) prev unique += bs */
		if (ds->ds_prev->ds_phys->ds_next_snap_obj ==
		    ds->ds_object && bp->blk_birth >
		    ds->ds_prev->ds_phys->ds_prev_snap_txg) {
			dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
			mutex_enter(&ds->ds_prev->ds_lock);
			ds->ds_prev->ds_phys->ds_unique_bytes += used;
			mutex_exit(&ds->ds_prev->ds_lock);
		}
		if (bp->blk_birth > ds->ds_origin_txg) {
			dsl_dir_transfer_space(ds->ds_dir, used,
			    DD_USED_HEAD, DD_USED_SNAP, tx);
		}
	}
	/* Common tail: this dataset no longer references the block. */
	mutex_enter(&ds->ds_lock);
	ASSERT3U(ds->ds_phys->ds_used_bytes, >=, used);
	ds->ds_phys->ds_used_bytes -= used;
	ASSERT3U(ds->ds_phys->ds_compressed_bytes, >=, compressed);
	ds->ds_phys->ds_compressed_bytes -= compressed;
	ASSERT3U(ds->ds_phys->ds_uncompressed_bytes, >=, uncompressed);
	ds->ds_phys->ds_uncompressed_bytes -= uncompressed;
	mutex_exit(&ds->ds_lock);

	return (used);
}
    208 
    209 uint64_t
    210 dsl_dataset_prev_snap_txg(dsl_dataset_t *ds)
    211 {
    212 	uint64_t trysnap = 0;
    213 
    214 	if (ds == NULL)
    215 		return (0);
    216 	/*
    217 	 * The snapshot creation could fail, but that would cause an
    218 	 * incorrect FALSE return, which would only result in an
    219 	 * overestimation of the amount of space that an operation would
    220 	 * consume, which is OK.
    221 	 *
    222 	 * There's also a small window where we could miss a pending
    223 	 * snapshot, because we could set the sync task in the quiescing
    224 	 * phase.  So this should only be used as a guess.
    225 	 */
    226 	if (ds->ds_trysnap_txg >
    227 	    spa_last_synced_txg(ds->ds_dir->dd_pool->dp_spa))
    228 		trysnap = ds->ds_trysnap_txg;
    229 	return (MAX(ds->ds_phys->ds_prev_snap_txg, trysnap));
    230 }
    231 
    232 int
    233 dsl_dataset_block_freeable(dsl_dataset_t *ds, uint64_t blk_birth)
    234 {
    235 	return (blk_birth > dsl_dataset_prev_snap_txg(ds));
    236 }
    237 
/*
 * Tear down and free an in-core dsl_dataset_t.
 *
 * Registered as the eviction callback via dmu_buf_set_user_ie() and also
 * called directly from dsl_dataset_disown().  'db' is unused; 'dsv' is
 * the dsl_dataset_t.  The dataset must be unowned or marked destroyed.
 */
/* ARGSUSED */
static void
dsl_dataset_evict(dmu_buf_t *db, void *dsv)
{
	dsl_dataset_t *ds = dsv;

	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));

	dprintf_ds(ds, "evicting %s\n", "");

	unique_remove(ds->ds_fsid_guid);

	/* Give the attached user data, if any, a chance to clean up. */
	if (ds->ds_user_ptr != NULL)
		ds->ds_user_evict_func(ds, ds->ds_user_ptr);

	/* Drop the reference this dataset held on its previous snapshot. */
	if (ds->ds_prev) {
		dsl_dataset_drop_ref(ds->ds_prev, ds);
		ds->ds_prev = NULL;
	}

	bplist_close(&ds->ds_deadlist);
	if (ds->ds_dir)
		dsl_dir_close(ds->ds_dir, ds);

	ASSERT(!list_link_active(&ds->ds_synced_link));

	/* Destroy the synchronization objects before freeing the memory. */
	mutex_destroy(&ds->ds_lock);
	mutex_destroy(&ds->ds_opening_lock);
	mutex_destroy(&ds->ds_deadlist.bpl_lock);
	rw_destroy(&ds->ds_rwlock);
	cv_destroy(&ds->ds_exclusive_cv);

	kmem_free(ds, sizeof (dsl_dataset_t));
}
    272 
    273 static int
    274 dsl_dataset_get_snapname(dsl_dataset_t *ds)
    275 {
    276 	dsl_dataset_phys_t *headphys;
    277 	int err;
    278 	dmu_buf_t *headdbuf;
    279 	dsl_pool_t *dp = ds->ds_dir->dd_pool;
    280 	objset_t *mos = dp->dp_meta_objset;
    281 
    282 	if (ds->ds_snapname[0])
    283 		return (0);
    284 	if (ds->ds_phys->ds_next_snap_obj == 0)
    285 		return (0);
    286 
    287 	err = dmu_bonus_hold(mos, ds->ds_dir->dd_phys->dd_head_dataset_obj,
    288 	    FTAG, &headdbuf);
    289 	if (err)
    290 		return (err);
    291 	headphys = headdbuf->db_data;
    292 	err = zap_value_search(dp->dp_meta_objset,
    293 	    headphys->ds_snapnames_zapobj, ds->ds_object, 0, ds->ds_snapname);
    294 	dmu_buf_rele(headdbuf, FTAG);
    295 	return (err);
    296 }
    297 
    298 static int
    299 dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
    300 {
    301 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    302 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    303 	matchtype_t mt;
    304 	int err;
    305 
    306 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    307 		mt = MT_FIRST;
    308 	else
    309 		mt = MT_EXACT;
    310 
    311 	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
    312 	    value, mt, NULL, 0, NULL);
    313 	if (err == ENOTSUP && mt == MT_FIRST)
    314 		err = zap_lookup(mos, snapobj, name, 8, 1, value);
    315 	return (err);
    316 }
    317 
    318 static int
    319 dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
    320 {
    321 	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
    322 	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
    323 	matchtype_t mt;
    324 	int err;
    325 
    326 	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
    327 		mt = MT_FIRST;
    328 	else
    329 		mt = MT_EXACT;
    330 
    331 	err = zap_remove_norm(mos, snapobj, name, mt, tx);
    332 	if (err == ENOTSUP && mt == MT_FIRST)
    333 		err = zap_remove(mos, snapobj, name, tx);
    334 	return (err);
    335 }
    336 
/*
 * Take a reference on dataset object 'dsobj' and return its in-core
 * dsl_dataset_t in *dsp.
 *
 * The object's bonus buffer is held with 'tag'.  If no dsl_dataset_t is
 * attached to the buffer yet, one is constructed and attached with
 * dmu_buf_set_user_ie(); if that call returns an existing user
 * ("winner"), the freshly built copy is torn down and the winner is used
 * instead.
 *
 * The caller must hold dp_config_rwlock or be in syncing context.
 *
 * Returns 0 on success.  On any failure the bonus hold is released and
 * an error is returned; ENOENT is returned when, outside syncing
 * context, the dataset is found to be marked destroyed.
 */
static int
dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    dsl_dataset_t **dsp)
{
	objset_t *mos = dp->dp_meta_objset;
	dmu_buf_t *dbuf;
	dsl_dataset_t *ds;
	int err;

	ASSERT(RW_LOCK_HELD(&dp->dp_config_rwlock) ||
	    dsl_pool_sync_context(dp));

	err = dmu_bonus_hold(mos, dsobj, tag, &dbuf);
	if (err)
		return (err);
	ds = dmu_buf_get_user(dbuf);
	if (ds == NULL) {
		/*
		 * 'winner' is only read below when err == 0, i.e. after it
		 * has been assigned by dmu_buf_set_user_ie().
		 */
		dsl_dataset_t *winner;

		ds = kmem_zalloc(sizeof (dsl_dataset_t), KM_SLEEP);
		ds->ds_dbuf = dbuf;
		ds->ds_object = dsobj;
		ds->ds_phys = dbuf->db_data;

		mutex_init(&ds->ds_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
		    NULL);
		rw_init(&ds->ds_rwlock, 0, 0, 0);
		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);

		err = bplist_open(&ds->ds_deadlist,
		    mos, ds->ds_phys->ds_deadlist_obj);
		if (err == 0) {
			err = dsl_dir_open_obj(dp,
			    ds->ds_phys->ds_dir_obj, NULL, ds, &ds->ds_dir);
		}
		if (err) {
			/*
			 * we don't really need to close the bplist if we
			 * just opened it.
			 */
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_opening_lock);
			mutex_destroy(&ds->ds_deadlist.bpl_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			kmem_free(ds, sizeof (dsl_dataset_t));
			dmu_buf_rele(dbuf, tag);
			return (err);
		}

		if (!dsl_dataset_is_snapshot(ds)) {
			ds->ds_snapname[0] = '\0';
			/* Reference the previous snapshot, tagged with ds. */
			if (ds->ds_phys->ds_prev_snap_obj) {
				err = dsl_dataset_get_ref(dp,
				    ds->ds_phys->ds_prev_snap_obj,
				    ds, &ds->ds_prev);
			}

			/* For clones, cache the origin's creation txg. */
			if (err == 0 && dsl_dir_is_clone(ds->ds_dir)) {
				dsl_dataset_t *origin;

				err = dsl_dataset_hold_obj(dp,
				    ds->ds_dir->dd_phys->dd_origin_obj,
				    FTAG, &origin);
				if (err == 0) {
					ds->ds_origin_txg =
					    origin->ds_phys->ds_creation_txg;
					dsl_dataset_rele(origin, FTAG);
				}
			}
		} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
			err = dsl_dataset_get_snapname(ds);
		}

		if (err == 0 && !dsl_dataset_is_snapshot(ds)) {
			/*
			 * In sync context, we're called with either no lock
			 * or with the write lock.  If we're not syncing,
			 * we're always called with the read lock held.
			 */
			boolean_t need_lock =
			    !RW_WRITE_HELD(&dp->dp_config_rwlock) &&
			    dsl_pool_sync_context(dp);

			if (need_lock)
				rw_enter(&dp->dp_config_rwlock, RW_READER);

			err = dsl_prop_get_ds(ds,
			    "refreservation", sizeof (uint64_t), 1,
			    &ds->ds_reserved, NULL);
			if (err == 0) {
				err = dsl_prop_get_ds(ds,
				    "refquota", sizeof (uint64_t), 1,
				    &ds->ds_quota, NULL);
			}

			if (need_lock)
				rw_exit(&dp->dp_config_rwlock);
		} else {
			ds->ds_reserved = ds->ds_quota = 0;
		}

		if (err == 0) {
			winner = dmu_buf_set_user_ie(dbuf, ds, &ds->ds_phys,
			    dsl_dataset_evict);
		}
		if (err || winner) {
			/*
			 * Either construction failed or an existing
			 * dsl_dataset_t was already attached: undo everything
			 * built above.
			 */
			bplist_close(&ds->ds_deadlist);
			if (ds->ds_prev)
				dsl_dataset_drop_ref(ds->ds_prev, ds);
			dsl_dir_close(ds->ds_dir, ds);
			mutex_destroy(&ds->ds_lock);
			mutex_destroy(&ds->ds_opening_lock);
			mutex_destroy(&ds->ds_deadlist.bpl_lock);
			rw_destroy(&ds->ds_rwlock);
			cv_destroy(&ds->ds_exclusive_cv);
			kmem_free(ds, sizeof (dsl_dataset_t));
			if (err) {
				dmu_buf_rele(dbuf, tag);
				return (err);
			}
			ds = winner;
		} else {
			ds->ds_fsid_guid =
			    unique_insert(ds->ds_phys->ds_fsid_guid);
		}
	}
	ASSERT3P(ds->ds_dbuf, ==, dbuf);
	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
	ASSERT(ds->ds_phys->ds_prev_snap_obj != 0 ||
	    spa_version(dp->dp_spa) < SPA_VERSION_ORIGIN ||
	    dp->dp_origin_snap == NULL || ds == dp->dp_origin_snap);
	/* Outside syncing context, refuse datasets already marked destroyed. */
	mutex_enter(&ds->ds_lock);
	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
		mutex_exit(&ds->ds_lock);
		dmu_buf_rele(ds->ds_dbuf, tag);
		return (ENOENT);
	}
	mutex_exit(&ds->ds_lock);
	*dsp = ds;
	return (0);
}
    481 
/*
 * Second half of a hold: given a dataset already referenced with 'tag'
 * (see dsl_dataset_hold_obj()), acquire ds_rwlock as READER.
 *
 * In syncing context no lock is taken and 0 is returned.  Otherwise the
 * function waits on ds_exclusive_cv until the READER lock can be taken,
 * dropping dp_config_rwlock around each wait and re-entering it as
 * READER afterwards.  If the dataset turns out to be destroyed after a
 * wait, the reference is dropped and ENOENT is returned.
 */
static int
dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/*
	 * In syncing context we don't want the rwlock lock: there
	 * may be an existing writer waiting for sync phase to
	 * finish.  We don't need to worry about such writers, since
	 * sync phase is single-threaded, so the writer can't be
	 * doing anything while we are active.
	 */
	if (dsl_pool_sync_context(dp)) {
		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
		return (0);
	}

	/*
	 * Normal users will hold the ds_rwlock as a READER until they
	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
	 * drop their READER lock after they set the ds_owner field.
	 *
	 * If the dataset is being destroyed, the destroy thread will
	 * obtain a WRITER lock for exclusive access after it's done its
	 * open-context work and then change the ds_owner to
	 * dsl_reaper once destruction is assured.  So threads
	 * may block here temporarily, until the "destructability" of
	 * the dataset is determined.
	 */
	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
	mutex_enter(&ds->ds_lock);
	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
		/* Don't sit on the pool config lock while sleeping. */
		rw_exit(&dp->dp_config_rwlock);
		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
		if (DSL_DATASET_IS_DESTROYED(ds)) {
			mutex_exit(&ds->ds_lock);
			dsl_dataset_drop_ref(ds, tag);
			/* Return with the config lock held, as on entry. */
			rw_enter(&dp->dp_config_rwlock, RW_READER);
			return (ENOENT);
		}
		rw_enter(&dp->dp_config_rwlock, RW_READER);
	}
	mutex_exit(&ds->ds_lock);
	return (0);
}
    527 
    528 int
    529 dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
    530     dsl_dataset_t **dsp)
    531 {
    532 	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
    533 
    534 	if (err)
    535 		return (err);
    536 	return (dsl_dataset_hold_ref(*dsp, tag));
    537 }
    538 
    539 int
    540 dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, int flags, void *owner,
    541     dsl_dataset_t **dsp)
    542 {
    543 	int err = dsl_dataset_hold_obj(dp, dsobj, owner, dsp);
    544 
    545 	ASSERT(DS_MODE_TYPE(flags) != DS_MODE_USER);
    546 
    547 	if (err)
    548 		return (err);
    549 	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
    550 		dsl_dataset_rele(*dsp, owner);
    551 		return (EBUSY);
    552 	}
    553 	return (0);
    554 }
    555 
/*
 * Hold a dataset by name.
 *
 * The dsl_dir for 'name' is opened and its head dataset is held with
 * 'tag'.  If 'name' has a snapshot component ("@snap"), the snapshot is
 * looked up in the head dataset, the head is released, and the snapshot
 * is held instead.  dp_config_rwlock is held as READER for the duration.
 *
 * Returns 0 with *dsp set to the held dataset, or an error.  ENOENT is
 * returned when the dsl_dir has no head dataset or when the trailing
 * component does not start with '@'.
 */
int
dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp;
	const char *snapname;
	uint64_t obj;
	int err = 0;

	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
	if (err)
		return (err);

	dp = dd->dd_pool;
	obj = dd->dd_phys->dd_head_dataset_obj;
	rw_enter(&dp->dp_config_rwlock, RW_READER);
	if (obj)
		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
	else
		err = ENOENT;
	if (err)
		goto out;

	err = dsl_dataset_hold_ref(*dsp, tag);

	/* we may be looking for a snapshot */
	if (err == 0 && snapname != NULL) {
		dsl_dataset_t *ds = NULL;

		if (*snapname++ != '@') {
			dsl_dataset_rele(*dsp, tag);
			err = ENOENT;
			goto out;
		}

		dprintf("looking for snapshot '%s'\n", snapname);
		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
		if (err == 0)
			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
		/* The head was only needed for the lookup; let it go. */
		dsl_dataset_rele(*dsp, tag);

		ASSERT3U((err == 0), ==, (ds != NULL));

		if (ds) {
			/* Cache the name we just resolved, if not yet set. */
			mutex_enter(&ds->ds_lock);
			if (ds->ds_snapname[0] == 0)
				(void) strlcpy(ds->ds_snapname, snapname,
				    sizeof (ds->ds_snapname));
			mutex_exit(&ds->ds_lock);
			err = dsl_dataset_hold_ref(ds, tag);
			*dsp = err ? NULL : ds;
		}
	}
out:
	rw_exit(&dp->dp_config_rwlock);
	dsl_dir_close(dd, FTAG);
	return (err);
}
    614 
    615 int
    616 dsl_dataset_own(const char *name, int flags, void *owner, dsl_dataset_t **dsp)
    617 {
    618 	int err = dsl_dataset_hold(name, owner, dsp);
    619 	if (err)
    620 		return (err);
    621 	if ((*dsp)->ds_phys->ds_num_children > 0 &&
    622 	    !DS_MODE_IS_READONLY(flags)) {
    623 		dsl_dataset_rele(*dsp, owner);
    624 		return (EROFS);
    625 	}
    626 	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
    627 		dsl_dataset_rele(*dsp, owner);
    628 		return (EBUSY);
    629 	}
    630 	return (0);
    631 }
    632 
    633 void
    634 dsl_dataset_name(dsl_dataset_t *ds, char *name)
    635 {
    636 	if (ds == NULL) {
    637 		(void) strcpy(name, "mos");
    638 	} else {
    639 		dsl_dir_name(ds->ds_dir, name);
    640 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    641 		if (ds->ds_snapname[0]) {
    642 			(void) strcat(name, "@");
    643 			/*
    644 			 * We use a "recursive" mutex so that we
    645 			 * can call dprintf_ds() with ds_lock held.
    646 			 */
    647 			if (!MUTEX_HELD(&ds->ds_lock)) {
    648 				mutex_enter(&ds->ds_lock);
    649 				(void) strcat(name, ds->ds_snapname);
    650 				mutex_exit(&ds->ds_lock);
    651 			} else {
    652 				(void) strcat(name, ds->ds_snapname);
    653 			}
    654 		}
    655 	}
    656 }
    657 
    658 static int
    659 dsl_dataset_namelen(dsl_dataset_t *ds)
    660 {
    661 	int result;
    662 
    663 	if (ds == NULL) {
    664 		result = 3;	/* "mos" */
    665 	} else {
    666 		result = dsl_dir_namelen(ds->ds_dir);
    667 		VERIFY(0 == dsl_dataset_get_snapname(ds));
    668 		if (ds->ds_snapname[0]) {
    669 			++result;	/* adding one for the @-sign */
    670 			if (!MUTEX_HELD(&ds->ds_lock)) {
    671 				mutex_enter(&ds->ds_lock);
    672 				result += strlen(ds->ds_snapname);
    673 				mutex_exit(&ds->ds_lock);
    674 			} else {
    675 				result += strlen(ds->ds_snapname);
    676 			}
    677 		}
    678 	}
    679 
    680 	return (result);
    681 }
    682 
    683 void
    684 dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
    685 {
    686 	dmu_buf_rele(ds->ds_dbuf, tag);
    687 }
    688 
    689 void
    690 dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
    691 {
    692 	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
    693 		rw_exit(&ds->ds_rwlock);
    694 	}
    695 	dsl_dataset_drop_ref(ds, tag);
    696 }
    697 
/*
 * Give up ownership of 'ds'.
 *
 * The caller must either be the current owner of a dataset that still
 * has a dbuf, or be disowning a destroyed dataset whose dbuf is already
 * NULL.  The owner field is cleared; if this thread holds ds_rwlock as
 * WRITER it is released and waiters on ds_exclusive_cv are woken.  The
 * reference is then dropped, or, when there is no dbuf, the dataset is
 * evicted directly.
 */
void
dsl_dataset_disown(dsl_dataset_t *ds, void *owner)
{
	ASSERT((ds->ds_owner == owner && ds->ds_dbuf) ||
	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));

	mutex_enter(&ds->ds_lock);
	ds->ds_owner = NULL;
	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
		rw_exit(&ds->ds_rwlock);
		/* Wake threads sleeping in dsl_dataset_hold_ref(). */
		cv_broadcast(&ds->ds_exclusive_cv);
	}
	mutex_exit(&ds->ds_lock);
	if (ds->ds_dbuf)
		dsl_dataset_drop_ref(ds, owner);
	else
		dsl_dataset_evict(ds->ds_dbuf, ds);
}
    716 
    717 boolean_t
    718 dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *owner)
    719 {
    720 	boolean_t gotit = FALSE;
    721 
    722 	mutex_enter(&ds->ds_lock);
    723 	if (ds->ds_owner == NULL &&
    724 	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
    725 		ds->ds_owner = owner;
    726 		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
    727 			rw_exit(&ds->ds_rwlock);
    728 		gotit = TRUE;
    729 	}
    730 	mutex_exit(&ds->ds_lock);
    731 	return (gotit);
    732 }
    733 
    734 void
    735 dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
    736 {
    737 	ASSERT3P(owner, ==, ds->ds_owner);
    738 	if (!RW_WRITE_HELD(&ds->ds_rwlock))
    739 		rw_enter(&ds->ds_rwlock, RW_WRITER);
    740 }
    741 
/*
 * Allocate and initialize the on-disk dsl_dataset_phys_t for the head
 * dataset of 'dd' and return its object number.
 *
 * Must be called in syncing context, and 'dd' must not yet have a head
 * dataset.  If 'origin' is NULL, the pool's dp_origin_snap (which may
 * itself be NULL) is used.  When an origin exists, the new dataset
 * starts out with the origin's space counts, block pointer and flags,
 * and the origin's child count is incremented.
 */
uint64_t
dsl_dataset_create_sync_dd(dsl_dir_t *dd, dsl_dataset_t *origin,
    uint64_t flags, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dd->dd_pool;
	dmu_buf_t *dbuf;
	dsl_dataset_phys_t *dsphys;
	uint64_t dsobj;
	objset_t *mos = dp->dp_meta_objset;

	if (origin == NULL)
		origin = dp->dp_origin_snap;

	ASSERT(origin == NULL || origin->ds_dir->dd_pool == dp);
	ASSERT(origin == NULL || origin->ds_phys->ds_num_children > 0);
	ASSERT(dmu_tx_is_syncing(tx));
	ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);

	/* Create the dataset object and start from an all-zero phys. */
	dsobj = dmu_object_alloc(mos, DMU_OT_DSL_DATASET, 0,
	    DMU_OT_DSL_DATASET, sizeof (dsl_dataset_phys_t), tx);
	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
	dmu_buf_will_dirty(dbuf, tx);
	dsphys = dbuf->db_data;
	bzero(dsphys, sizeof (dsl_dataset_phys_t));
	dsphys->ds_dir_obj = dd->dd_object;
	dsphys->ds_flags = flags;
	dsphys->ds_fsid_guid = unique_create();
	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
	    sizeof (dsphys->ds_guid));
	dsphys->ds_snapnames_zapobj =
	    zap_create_norm(mos, U8_TEXTPREP_TOUPPER, DMU_OT_DSL_DS_SNAP_MAP,
	    DMU_OT_NONE, 0, tx);
	dsphys->ds_creation_time = gethrestime_sec();
	dsphys->ds_creation_txg = tx->tx_txg == TXG_INITIAL ? 1 : tx->tx_txg;
	dsphys->ds_deadlist_obj =
	    bplist_create(mos, DSL_DEADLIST_BLOCKSIZE, tx);

	if (origin) {
		/* Inherit the origin snapshot's contents and accounting. */
		dsphys->ds_prev_snap_obj = origin->ds_object;
		dsphys->ds_prev_snap_txg =
		    origin->ds_phys->ds_creation_txg;
		dsphys->ds_used_bytes =
		    origin->ds_phys->ds_used_bytes;
		dsphys->ds_compressed_bytes =
		    origin->ds_phys->ds_compressed_bytes;
		dsphys->ds_uncompressed_bytes =
		    origin->ds_phys->ds_uncompressed_bytes;
		dsphys->ds_bp = origin->ds_phys->ds_bp;
		dsphys->ds_flags |= origin->ds_phys->ds_flags;

		dmu_buf_will_dirty(origin->ds_dbuf, tx);
		origin->ds_phys->ds_num_children++;

		/*
		 * On new enough pools, record the new dataset in the
		 * origin's next-clones ZAP, creating that ZAP on first use.
		 */
		if (spa_version(dp->dp_spa) >= SPA_VERSION_NEXT_CLONES) {
			if (origin->ds_phys->ds_next_clones_obj == 0) {
				origin->ds_phys->ds_next_clones_obj =
				    zap_create(mos,
				    DMU_OT_NEXT_CLONES, DMU_OT_NONE, 0, tx);
			}
			VERIFY(0 == zap_add_int(mos,
			    origin->ds_phys->ds_next_clones_obj,
			    dsobj, tx));
		}

		dmu_buf_will_dirty(dd->dd_dbuf, tx);
		dd->dd_phys->dd_origin_obj = origin->ds_object;
	}

	if (spa_version(dp->dp_spa) >= SPA_VERSION_UNIQUE_ACCURATE)
		dsphys->ds_flags |= DS_FLAG_UNIQUE_ACCURATE;

	dmu_buf_rele(dbuf, FTAG);

	/* Finally, make the new object the dsl_dir's head dataset. */
	dmu_buf_will_dirty(dd->dd_dbuf, tx);
	dd->dd_phys->dd_head_dataset_obj = dsobj;

	return (dsobj);
}
    820 
    821 uint64_t
    822 dsl_dataset_create_sync(dsl_dir_t *pdd, const char *lastname,
    823     dsl_dataset_t *origin, uint64_t flags, cred_t *cr, dmu_tx_t *tx)
    824 {
    825 	dsl_pool_t *dp = pdd->dd_pool;
    826 	uint64_t dsobj, ddobj;
    827 	dsl_dir_t *dd;
    828 
    829 	ASSERT(lastname[0] != '@');
    830 
    831 	ddobj = dsl_dir_create_sync(dp, pdd, lastname, tx);
    832 	VERIFY(0 == dsl_dir_open_obj(dp, ddobj, lastname, FTAG, &dd));
    833 
    834 	dsobj = dsl_dataset_create_sync_dd(dd, origin, flags, tx);
    835 
    836 	dsl_deleg_set_create_perms(dd, tx, cr);
    837 
    838 	dsl_dir_close(dd, FTAG);
    839 
    840 	return (dsobj);
    841 }
    842 
/*
 * Argument block passed to dsl_snapshot_destroy_one() for each dataset
 * visited.
 */
struct destroyarg {
	/* task group the destroy tasks are added to; also the owner tag */
	dsl_sync_task_group_t *dstg;
	/* snapshot name appended after '@' to each dataset name */
	char *snapname;
	/* buffer that receives the dataset name when owning it fails */
	char *failed;
};
    848 
    849 static int
    850 dsl_snapshot_destroy_one(char *name, void *arg)
    851 {
    852 	struct destroyarg *da = arg;
    853 	dsl_dataset_t *ds;
    854 	char *cp;
    855 	int err;
    856 
    857 	(void) strcat(name, "@");
    858 	(void) strcat(name, da->snapname);
    859 	err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
    860 	    da->dstg, &ds);
    861 	cp = strchr(name, '@');
    862 	*cp = '\0';
    863 	if (err == 0) {
    864 		dsl_dataset_make_exclusive(ds, da->dstg);
    865 		if (ds->ds_user_ptr) {
    866 			ds->ds_user_evict_func(ds, ds->ds_user_ptr);
    867 			ds->ds_user_ptr = NULL;
    868 		}
    869 		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
    870 		    dsl_dataset_destroy_sync, ds, da->dstg, 0);
    871 	} else if (err == ENOENT) {
    872 		err = 0;
    873 	} else {
    874 		(void) strcpy(da->failed, name);
    875 	}
    876 	return (err);
    877 }
    878 
    879 /*
    880  * Destroy 'snapname' in all descendants of 'fsname'.
    881  */
    882 #pragma weak dmu_snapshots_destroy = dsl_snapshots_destroy
    883 int
    884 dsl_snapshots_destroy(char *fsname, char *snapname)
    885 {
    886 	int err;
    887 	struct destroyarg da;
    888 	dsl_sync_task_t *dst;
    889 	spa_t *spa;
    890 
    891 	err = spa_open(fsname, &spa, FTAG);
    892 	if (err)
    893 		return (err);
    894 	da.dstg = dsl_sync_task_group_create(spa_get_dsl(spa));
    895 	da.snapname = snapname;
    896 	da.failed = fsname;
    897 
    898 	err = dmu_objset_find(fsname,
    899 	    dsl_snapshot_destroy_one, &da, DS_FIND_CHILDREN);
    900 
    901 	if (err == 0)
    902 		err =