Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/dmu.h>
     27 #include <sys/dmu_impl.h>
     28 #include <sys/dmu_tx.h>
     29 #include <sys/dbuf.h>
     30 #include <sys/dnode.h>
     31 #include <sys/zfs_context.h>
     32 #include <sys/dmu_objset.h>
     33 #include <sys/dmu_traverse.h>
     34 #include <sys/dsl_dataset.h>
     35 #include <sys/dsl_dir.h>
     36 #include <sys/dsl_pool.h>
     37 #include <sys/dsl_synctask.h>
     38 #include <sys/dsl_prop.h>
     39 #include <sys/dmu_zfetch.h>
     40 #include <sys/zfs_ioctl.h>
     41 #include <sys/zap.h>
     42 #include <sys/zio_checksum.h>
     43 #ifdef _KERNEL
     44 #include <sys/vmsystm.h>
     45 #include <sys/zfs_znode.h>
     46 #endif
     47 
/*
 * Per-object-type information table with DMU_OT_NUMTYPES slots.  Each
 * initializer row gives, in order, a byteswap routine, a boolean flag and
 * a human-readable name for the type.
 *
 * NOTE(review): the field definitions live in dmu_object_type_info_t, which
 * is declared outside this file.  The boolean is presumably the "is
 * metadata" flag, and the row order presumably has to track the
 * dmu_object_type_t enumeration -- confirm both against the header before
 * adding or reordering rows.
 */
const dmu_object_type_info_t dmu_ot[DMU_OT_NUMTYPES] = {
	{	byteswap_uint8_array,	TRUE,	"unallocated"		},
	{	zap_byteswap,		TRUE,	"object directory"	},
	{	byteswap_uint64_array,	TRUE,	"object array"		},
	{	byteswap_uint8_array,	TRUE,	"packed nvlist"		},
	{	byteswap_uint64_array,	TRUE,	"packed nvlist size"	},
	{	byteswap_uint64_array,	TRUE,	"bplist"		},
	{	byteswap_uint64_array,	TRUE,	"bplist header"		},
	{	byteswap_uint64_array,	TRUE,	"SPA space map header"	},
	{	byteswap_uint64_array,	TRUE,	"SPA space map"		},
	{	byteswap_uint64_array,	TRUE,	"ZIL intent log"	},
	{	dnode_buf_byteswap,	TRUE,	"DMU dnode"		},
	{	dmu_objset_byteswap,	TRUE,	"DMU objset"		},
	{	byteswap_uint64_array,	TRUE,	"DSL directory"		},
	{	zap_byteswap,		TRUE,	"DSL directory child map"},
	{	zap_byteswap,		TRUE,	"DSL dataset snap map"	},
	{	zap_byteswap,		TRUE,	"DSL props"		},
	{	byteswap_uint64_array,	TRUE,	"DSL dataset"		},
	{	zfs_znode_byteswap,	TRUE,	"ZFS znode"		},
	{	zfs_oldacl_byteswap,	TRUE,	"ZFS V0 ACL"		},
	{	byteswap_uint8_array,	FALSE,	"ZFS plain file"	},
	{	zap_byteswap,		TRUE,	"ZFS directory"		},
	{	zap_byteswap,		TRUE,	"ZFS master node"	},
	{	zap_byteswap,		TRUE,	"ZFS delete queue"	},
	{	byteswap_uint8_array,	FALSE,	"zvol object"		},
	{	zap_byteswap,		TRUE,	"zvol prop"		},
	{	byteswap_uint8_array,	FALSE,	"other uint8[]"		},
	{	byteswap_uint64_array,	FALSE,	"other uint64[]"	},
	{	zap_byteswap,		TRUE,	"other ZAP"		},
	{	zap_byteswap,		TRUE,	"persistent error log"	},
	{	byteswap_uint8_array,	TRUE,	"SPA history"		},
	{	byteswap_uint64_array,	TRUE,	"SPA history offsets"	},
	{	zap_byteswap,		TRUE,	"Pool properties"	},
	{	zap_byteswap,		TRUE,	"DSL permissions"	},
	{	zfs_acl_byteswap,	TRUE,	"ZFS ACL"		},
	{	byteswap_uint8_array,	TRUE,	"ZFS SYSACL"		},
	{	byteswap_uint8_array,	TRUE,	"FUID table"		},
	{	byteswap_uint64_array,	TRUE,	"FUID table size"	},
	{	zap_byteswap,		TRUE,	"DSL dataset next clones"},
	{	zap_byteswap,		TRUE,	"scrub work queue"	},
};
     89 
     90 int
     91 dmu_buf_hold(objset_t *os, uint64_t object, uint64_t offset,
     92     void *tag, dmu_buf_t **dbp)
     93 {
     94 	dnode_t *dn;
     95 	uint64_t blkid;
     96 	dmu_buf_impl_t *db;
     97 	int err;
     98 
     99 	err = dnode_hold(os->os, object, FTAG, &dn);
    100 	if (err)
    101 		return (err);
    102 	blkid = dbuf_whichblock(dn, offset);
    103 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
    104 	db = dbuf_hold(dn, blkid, tag);
    105 	rw_exit(&dn->dn_struct_rwlock);
    106 	if (db == NULL) {
    107 		err = EIO;
    108 	} else {
    109 		err = dbuf_read(db, NULL, DB_RF_CANFAIL);
    110 		if (err) {
    111 			dbuf_rele(db, tag);
    112 			db = NULL;
    113 		}
    114 	}
    115 
    116 	dnode_rele(dn, FTAG);
    117 	*dbp = &db->db;
    118 	return (err);
    119 }
    120 
    121 int
    122 dmu_bonus_max(void)
    123 {
    124 	return (DN_MAX_BONUSLEN);
    125 }
    126 
    127 int
    128 dmu_set_bonus(dmu_buf_t *db, int newsize, dmu_tx_t *tx)
    129 {
    130 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
    131 
    132 	if (dn->dn_bonus != (dmu_buf_impl_t *)db)
    133 		return (EINVAL);
    134 	if (newsize < 0 || newsize > db->db_size)
    135 		return (EINVAL);
    136 	dnode_setbonuslen(dn, newsize, tx);
    137 	return (0);
    138 }
    139 
    140 /*
    141  * returns ENOENT, EIO, or 0.
    142  */
    143 int
    144 dmu_bonus_hold(objset_t *os, uint64_t object, void *tag, dmu_buf_t **dbp)
    145 {
    146 	dnode_t *dn;
    147 	dmu_buf_impl_t *db;
    148 	int error;
    149 
    150 	error = dnode_hold(os->os, object, FTAG, &dn);
    151 	if (error)
    152 		return (error);
    153 
    154 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
    155 	if (dn->dn_bonus == NULL) {
    156 		rw_exit(&dn->dn_struct_rwlock);
    157 		rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
    158 		if (dn->dn_bonus == NULL)
    159 			dbuf_create_bonus(dn);
    160 	}
    161 	db = dn->dn_bonus;
    162 	rw_exit(&dn->dn_struct_rwlock);
    163 
    164 	/* as long as the bonus buf is held, the dnode will be held */
    165 	if (refcount_add(&db->db_holds, tag) == 1)
    166 		VERIFY(dnode_add_ref(dn, db));
    167 
    168 	dnode_rele(dn, FTAG);
    169 
    170 	VERIFY(0 == dbuf_read(db, NULL, DB_RF_MUST_SUCCEED));
    171 
    172 	*dbp = &db->db;
    173 	return (0);
    174 }
    175 
    176 /*
    177  * Note: longer-term, we should modify all of the dmu_buf_*() interfaces
    178  * to take a held dnode rather than <os, object> -- the lookup is wasteful,
    179  * and can induce severe lock contention when writing to several files
    180  * whose dnodes are in the same block.
    181  */
    182 static int
    183 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset,
    184     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
    185 {
    186 	dsl_pool_t *dp = NULL;
    187 	dmu_buf_t **dbp;
    188 	uint64_t blkid, nblks, i;
    189 	uint32_t flags;
    190 	int err;
    191 	zio_t *zio;
    192 	hrtime_t start;
    193 
    194 	ASSERT(length <= DMU_MAX_ACCESS);
    195 
    196 	flags = DB_RF_CANFAIL | DB_RF_NEVERWAIT;
    197 	if (length > zfetch_array_rd_sz)
    198 		flags |= DB_RF_NOPREFETCH;
    199 
    200 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
    201 	if (dn->dn_datablkshift) {
    202 		int blkshift = dn->dn_datablkshift;
    203 		nblks = (P2ROUNDUP(offset+length, 1ULL<<blkshift) -
    204 		    P2ALIGN(offset, 1ULL<<blkshift)) >> blkshift;
    205 	} else {
    206 		if (offset + length > dn->dn_datablksz) {
    207 			zfs_panic_recover("zfs: accessing past end of object "
    208 			    "%llx/%llx (size=%u access=%llu+%llu)",
    209 			    (longlong_t)dn->dn_objset->
    210 			    os_dsl_dataset->ds_object,
    211 			    (longlong_t)dn->dn_object, dn->dn_datablksz,
    212 			    (longlong_t)offset, (longlong_t)length);
    213 			return (EIO);
    214 		}
    215 		nblks = 1;
    216 	}
    217 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
    218 
    219 	if (dn->dn_objset->os_dsl_dataset)
    220 		dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
    221 	if (dp && dsl_pool_sync_context(dp))
    222 		start = gethrtime();
    223 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
    224 	blkid = dbuf_whichblock(dn, offset);
    225 	for (i = 0; i < nblks; i++) {
    226 		dmu_buf_impl_t *db = dbuf_hold(dn, blkid+i, tag);
    227 		if (db == NULL) {
    228 			rw_exit(&dn->dn_struct_rwlock);
    229 			dmu_buf_rele_array(dbp, nblks, tag);
    230 			zio_nowait(zio);
    231 			return (EIO);
    232 		}
    233 		/* initiate async i/o */
    234 		if (read) {
    235 			rw_exit(&dn->dn_struct_rwlock);
    236 			(void) dbuf_read(db, zio, flags);
    237 			rw_enter(&dn->dn_struct_rwlock, RW_READER);
    238 		}
    239 		dbp[i] = &db->db;
    240 	}
    241 	rw_exit(&dn->dn_struct_rwlock);
    242 
    243 	/* wait for async i/o */
    244 	err = zio_wait(zio);
    245 	/* track read overhead when we are in sync context */
    246 	if (dp && dsl_pool_sync_context(dp))
    247 		dp->dp_read_overhead += gethrtime() - start;
    248 	if (err) {
    249 		dmu_buf_rele_array(dbp, nblks, tag);
    250 		return (err);
    251 	}
    252 
    253 	/* wait for other io to complete */
    254 	if (read) {
    255 		for (i = 0; i < nblks; i++) {
    256 			dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbp[i];
    257 			mutex_enter(&db->db_mtx);
    258 			while (db->db_state == DB_READ ||
    259 			    db->db_state == DB_FILL)
    260 				cv_wait(&db->db_changed, &db->db_mtx);
    261 			if (db->db_state == DB_UNCACHED)
    262 				err = EIO;
    263 			mutex_exit(&db->db_mtx);
    264 			if (err) {
    265 				dmu_buf_rele_array(dbp, nblks, tag);
    266 				return (err);
    267 			}
    268 		}
    269 	}
    270 
    271 	*numbufsp = nblks;
    272 	*dbpp = dbp;
    273 	return (0);
    274 }
    275 
    276 static int
    277 dmu_buf_hold_array(objset_t *os, uint64_t object, uint64_t offset,
    278     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
    279 {
    280 	dnode_t *dn;
    281 	int err;
    282 
    283 	err = dnode_hold(os->os, object, FTAG, &dn);
    284 	if (err)
    285 		return (err);
    286 
    287 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
    288 	    numbufsp, dbpp);
    289 
    290 	dnode_rele(dn, FTAG);
    291 
    292 	return (err);
    293 }
    294 
    295 int
    296 dmu_buf_hold_array_by_bonus(dmu_buf_t *db, uint64_t offset,
    297     uint64_t length, int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp)
    298 {
    299 	dnode_t *dn = ((dmu_buf_impl_t *)db)->db_dnode;
    300 	int err;
    301 
    302 	err = dmu_buf_hold_array_by_dnode(dn, offset, length, read, tag,
    303 	    numbufsp, dbpp);
    304 
    305 	return (err);
    306 }
    307 
    308 void
    309 dmu_buf_rele_array(dmu_buf_t **dbp_fake, int numbufs, void *tag)
    310 {
    311 	int i;
    312 	dmu_buf_impl_t **dbp = (dmu_buf_impl_t **)dbp_fake;
    313 
    314 	if (numbufs == 0)
    315 		return;
    316 
    317 	for (i = 0; i < numbufs; i++) {
    318 		if (dbp[i])
    319 			dbuf_rele(dbp[i], tag);
    320 	}
    321 
    322 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
    323 }
    324 
    325 void
    326 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
    327 {
    328 	dnode_t *dn;
    329 	uint64_t blkid;
    330 	int nblks, i, err;
    331 
    332 	if (zfs_prefetch_disable)
    333 		return;
    334 
    335 	if (len == 0) {  /* they're interested in the bonus buffer */
    336 		dn = os->os->os_meta_dnode;
    337 
    338 		if (object == 0 || object >= DN_MAX_OBJECT)
    339 			return;
    340 
    341 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
    342 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
    343 		dbuf_prefetch(dn, blkid);
    344 		rw_exit(&dn->dn_struct_rwlock);
    345 		return;
    346 	}
    347 
    348 	/*
    349 	 * XXX - Note, if the dnode for the requested object is not
    350 	 * already cached, we will do a *synchronous* read in the
    351 	 * dnode_hold() call.  The same is true for any indirects.
    352 	 */
    353 	err = dnode_hold(os->os, object, FTAG, &dn);
    354 	if (err != 0)
    355 		return;
    356 
    357 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
    358 	if (dn->dn_datablkshift) {
    359 		int blkshift = dn->dn_datablkshift;
    360 		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
    361 		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
    362 	} else {
    363 		nblks = (offset < dn->dn_datablksz);
    364 	}
    365 
    366 	if (nblks != 0) {
    367 		blkid = dbuf_whichblock(dn, offset);
    368 		for (i = 0; i < nblks; i++)
    369 			dbuf_prefetch(dn, blkid+i);
    370 	}
    371 
    372 	rw_exit(&dn->dn_struct_rwlock);
    373 
    374 	dnode_rele(dn, FTAG);
    375 }
    376 
    377 static int
    378 get_next_chunk(dnode_t *dn, uint64_t *offset, uint64_t limit)
    379 {
    380 	uint64_t len = *offset - limit;
    381 	uint64_t chunk_len = dn->dn_datablksz * DMU_MAX_DELETEBLKCNT;
    382 	uint64_t subchunk =
    383 	    dn->dn_datablksz * EPB(dn->dn_indblkshift, SPA_BLKPTRSHIFT);
    384 
    385 	ASSERT(limit <= *offset);
    386 
    387 	if (len <= chunk_len) {
    388 		*offset = limit;
    389 		return (0);
    390 	}
    391 
    392 	ASSERT(ISP2(subchunk));
    393 
    394 	while (*offset > limit) {
    395 		uint64_t initial_offset = P2ROUNDUP(*offset, subchunk);
    396 		uint64_t delta;
    397 		int err;
    398 
    399 		/* skip over allocated data */
    400 		err = dnode_next_offset(dn,
    401 		    DNODE_FIND_HOLE|DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
    402 		if (err == ESRCH)
    403 			*offset = limit;
    404 		else if (err)
    405 			return (err);
    406 
    407 		ASSERT3U(*offset, <=, initial_offset);
    408 		*offset = P2ALIGN(*offset, subchunk);
    409 		delta = initial_offset - *offset;
    410 		if (delta >= chunk_len) {
    411 			*offset += delta - chunk_len;
    412 			return (0);
    413 		}
    414 		chunk_len -= delta;
    415 
    416 		/* skip over unallocated data */
    417 		err = dnode_next_offset(dn,
    418 		    DNODE_FIND_BACKWARDS, offset, 1, 1, 0);
    419 		if (err == ESRCH)
    420 			*offset = limit;
    421 		else if (err)
    422 			return (err);
    423 
    424 		if (*offset < limit)
    425 			*offset = limit;
    426 		ASSERT3U(*offset, <, initial_offset);
    427 	}
    428 	return (0);
    429 }
    430 
    431 static int
    432 dmu_free_long_range_impl(objset_t *os, dnode_t *dn, uint64_t offset,
    433     uint64_t length, boolean_t free_dnode)
    434 {
    435 	dmu_tx_t *tx;
    436 	uint64_t object_size, start, end, len;
    437 	boolean_t trunc = (length == DMU_OBJECT_END);
    438 	int align, err;
    439 
    440 	align = 1 << dn->dn_datablkshift;
    441 	ASSERT(align > 0);
    442 	object_size = align == 1 ? dn->dn_datablksz :
    443 	    (dn->dn_maxblkid + 1) << dn->dn_datablkshift;
    444 
    445 	if (trunc || (end = offset + length) > object_size)
    446 		end = object_size;
    447 	if (end <= offset)
    448 		return (0);
    449 	length = end - offset;
    450 
    451 	while (length) {
    452 		start = end;
    453 		err = get_next_chunk(dn, &start, offset);
    454 		if (err)
    455 			return (err);
    456 		len = trunc ? DMU_OBJECT_END : end - start;
    457 
    458 		tx = dmu_tx_create(os);
    459 		dmu_tx_hold_free(tx, dn->dn_object, start, len);
    460 		err = dmu_tx_assign(tx, TXG_WAIT);
    461 		if (err) {
    462 			dmu_tx_abort(tx);
    463 			return (err);
    464 		}
    465 
    466 		dnode_free_range(dn, start, trunc ? -1 : len, tx);
    467 
    468 		if (start == 0 && free_dnode) {
    469 			ASSERT(trunc);
    470 			dnode_free(dn, tx);
    471 		}
    472 
    473 		length -= end - start;
    474 
    475 		dmu_tx_commit(tx);
    476 		end = start;
    477 	}
    478 	return (0);
    479 }
    480 
    481 int
    482 dmu_free_long_range(objset_t *os, uint64_t object,
    483     uint64_t offset, uint64_t length)
    484 {
    485 	dnode_t *dn;
    486 	int err;
    487 
    488 	err = dnode_hold(os->os, object, FTAG, &dn);
    489 	if (err != 0)
    490 		return (err);
    491 	err = dmu_free_long_range_impl(os, dn, offset, length, FALSE);
    492 	dnode_rele(dn, FTAG);
    493 	return (err);
    494 }
    495 
    496 int
    497 dmu_free_object(objset_t *os, uint64_t object)
    498 {
    499 	dnode_t *dn;
    500 	dmu_tx_t *tx;
    501 	int err;
    502 
    503 	err = dnode_hold_impl(os->os, object, DNODE_MUST_BE_ALLOCATED,
    504 	    FTAG, &dn);
    505 	if (err != 0)
    506 		return (err);
    507 	if (dn->dn_nlevels == 1) {
    508 		tx = dmu_tx_create(os);
    509 		dmu_tx_hold_bonus(tx, object);
    510 		dmu_tx_hold_free(tx, dn->dn_object, 0, DMU_OBJECT_END);
    511 		err = dmu_tx_assign(tx, TXG_WAIT);
    512 		if (err == 0) {
    513 			dnode_free_range(dn, 0, DMU_OBJECT_END, tx);
    514 			dnode_free(dn, tx);
    515 			dmu_tx_commit(tx);
    516 		} else {
    517 			dmu_tx_abort(tx);
    518 		}
    519 	} else {
    520 		err = dmu_free_long_range_impl(os, dn, 0, DMU_OBJECT_END, TRUE);
    521 	}
    522 	dnode_rele(dn, FTAG);
    523 	return (err);
    524 }
    525 
    526 int
    527 dmu_free_range(objset_t *os, uint64_t object, uint64_t offset,
    528     uint64_t size, dmu_tx_t *tx)
    529 {
    530 	dnode_t *dn;
    531 	int err = dnode_hold(os->os, object, FTAG, &dn);
    532 	if (err)
    533 		return (err);
    534 	ASSERT(offset < UINT64_MAX);
    535 	ASSERT(size == -1ULL || size <= UINT64_MAX - offset);
    536 	dnode_free_range(dn, offset, size, tx);
    537 	dnode_rele(dn, FTAG);
    538 	return (0);
    539 }
    540 
    541 int
    542 dmu_read(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    543     void *buf)
    544 {
    545 	dnode_t *dn;
    546 	dmu_buf_t **dbp;
    547 	int numbufs, i, err;
    548 
    549 	err = dnode_hold(os->os, object, FTAG, &dn);
    550 	if (err)
    551 		return (err);
    552 
    553 	/*
    554 	 * Deal with odd block sizes, where there can't be data past the first
    555 	 * block.  If we ever do the tail block optimization, we will need to
    556 	 * handle that here as well.
    557 	 */
    558 	if (dn->dn_datablkshift == 0) {
    559 		int newsz = offset > dn->dn_datablksz ? 0 :
    560 		    MIN(size, dn->dn_datablksz - offset);
    561 		bzero((char *)buf + newsz, size - newsz);
    562 		size = newsz;
    563 	}
    564 
    565 	while (size > 0) {
    566 		uint64_t mylen = MIN(size, DMU_MAX_ACCESS / 2);
    567 
    568 		/*
    569 		 * NB: we could do this block-at-a-time, but it's nice
    570 		 * to be reading in parallel.
    571 		 */
    572 		err = dmu_buf_hold_array_by_dnode(dn, offset, mylen,
    573 		    TRUE, FTAG, &numbufs, &dbp);
    574 		if (err)
    575 			break;
    576 
    577 		for (i = 0; i < numbufs; i++) {
    578 			int tocpy;
    579 			int bufoff;
    580 			dmu_buf_t *db = dbp[i];
    581 
    582 			ASSERT(size > 0);
    583 
    584 			bufoff = offset - db->db_offset;
    585 			tocpy = (int)MIN(db->db_size - bufoff, size);
    586 
    587 			bcopy((char *)db->db_data + bufoff, buf, tocpy);
    588 
    589 			offset += tocpy;
    590 			size -= tocpy;
    591 			buf = (char *)buf + tocpy;
    592 		}
    593 		dmu_buf_rele_array(dbp, numbufs, FTAG);
    594 	}
    595 	dnode_rele(dn, FTAG);
    596 	return (err);
    597 }
    598 
    599 void
    600 dmu_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    601     const void *buf, dmu_tx_t *tx)
    602 {
    603 	dmu_buf_t **dbp;
    604 	int numbufs, i;
    605 
    606 	if (size == 0)
    607 		return;
    608 
    609 	VERIFY(0 == dmu_buf_hold_array(os, object, offset, size,
    610 	    FALSE, FTAG, &numbufs, &dbp));
    611 
    612 	for (i = 0; i < numbufs; i++) {
    613 		int tocpy;
    614 		int bufoff;
    615 		dmu_buf_t *db = dbp[i];
    616 
    617 		ASSERT(size > 0);
    618 
    619 		bufoff = offset - db->db_offset;
    620 		tocpy = (int)MIN(db->db_size - bufoff, size);
    621 
    622 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
    623 
    624 		if (tocpy == db->db_size)
    625 			dmu_buf_will_fill(db, tx);
    626 		else
    627 			dmu_buf_will_dirty(db, tx);
    628 
    629 		bcopy(buf, (char *)db->db_data + bufoff, tocpy);
    630 
    631 		if (tocpy == db->db_size)
    632 			dmu_buf_fill_done(db, tx);
    633 
    634 		offset += tocpy;
    635 		size -= tocpy;
    636 		buf = (char *)buf + tocpy;
    637 	}
    638 	dmu_buf_rele_array(dbp, numbufs, FTAG);
    639 }
    640 
    641 #ifdef _KERNEL
    642 int
    643 dmu_read_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size)
    644 {
    645 	dmu_buf_t **dbp;
    646 	int numbufs, i, err;
    647 
    648 	/*
    649 	 * NB: we could do this block-at-a-time, but it's nice
    650 	 * to be reading in parallel.
    651 	 */
    652 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size, TRUE, FTAG,
    653 	    &numbufs, &dbp);
    654 	if (err)
    655 		return (err);
    656 
    657 	for (i = 0; i < numbufs; i++) {
    658 		int tocpy;
    659 		int bufoff;
    660 		dmu_buf_t *db = dbp[i];
    661 
    662 		ASSERT(size > 0);
    663 
    664 		bufoff = uio->uio_loffset - db->db_offset;
    665 		tocpy = (int)MIN(db->db_size - bufoff, size);
    666 
    667 		err = uiomove((char *)db->db_data + bufoff, tocpy,
    668 		    UIO_READ, uio);
    669 		if (err)
    670 			break;
    671 
    672 		size -= tocpy;
    673 	}
    674 	dmu_buf_rele_array(dbp, numbufs, FTAG);
    675 
    676 	return (err);
    677 }
    678 
    679 int
    680 dmu_write_uio(objset_t *os, uint64_t object, uio_t *uio, uint64_t size,
    681     dmu_tx_t *tx)
    682 {
    683 	dmu_buf_t **dbp;
    684 	int numbufs, i;
    685 	int err = 0;
    686 
    687 	if (size == 0)
    688 		return (0);
    689 
    690 	err = dmu_buf_hold_array(os, object, uio->uio_loffset, size,
    691 	    FALSE, FTAG, &numbufs, &dbp);
    692 	if (err)
    693 		return (err);
    694 
    695 	for (i = 0; i < numbufs; i++) {
    696 		int tocpy;
    697 		int bufoff;
    698 		dmu_buf_t *db = dbp[i];
    699 
    700 		ASSERT(size > 0);
    701 
    702 		bufoff = uio->uio_loffset - db->db_offset;
    703 		tocpy = (int)MIN(db->db_size - bufoff, size);
    704 
    705 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
    706 
    707 		if (tocpy == db->db_size)
    708 			dmu_buf_will_fill(db, tx);
    709 		else
    710 			dmu_buf_will_dirty(db, tx);
    711 
    712 		/*
    713 		 * XXX uiomove could block forever (eg. nfs-backed
    714 		 * pages).  There needs to be a uiolockdown() function
    715 		 * to lock the pages in memory, so that uiomove won't
    716 		 * block.
    717 		 */
    718 		err = uiomove((char *)db->db_data + bufoff, tocpy,
    719 		    UIO_WRITE, uio);
    720 
    721 		if (tocpy == db->db_size)
    722 			dmu_buf_fill_done(db, tx);
    723 
    724 		if (err)
    725 			break;
    726 
    727 		size -= tocpy;
    728 	}
    729 	dmu_buf_rele_array(dbp, numbufs, FTAG);
    730 	return (err);
    731 }
    732 
    733 int
    734 dmu_write_pages(objset_t *os, uint64_t object, uint64_t offset, uint64_t size,
    735     page_t *pp, dmu_tx_t *tx)
    736 {
    737 	dmu_buf_t **dbp;
    738 	int numbufs, i;
    739 	int err;
    740 
    741 	if (size == 0)
    742 		return (0);
    743 
    744 	err = dmu_buf_hold_array(os, object, offset, size,
    745 	    FALSE, FTAG, &numbufs, &dbp);
    746 	if (err)
    747 		return (err);
    748 
    749 	for (i = 0; i < numbufs; i++) {
    750 		int tocpy, copied, thiscpy;
    751 		int bufoff;
    752 		dmu_buf_t *db = dbp[i];
    753 		caddr_t va;
    754 
    755 		ASSERT(size > 0);
    756 		ASSERT3U(db->db_size, >=, PAGESIZE);
    757 
    758 		bufoff = offset - db->db_offset;
    759 		tocpy = (int)MIN(db->db_size - bufoff, size);
    760 
    761 		ASSERT(i == 0 || i == numbufs-1 || tocpy == db->db_size);
    762 
    763 		if (tocpy == db->db_size)
    764 			dmu_buf_will_fill(db, tx);
    765 		else
    766 			dmu_buf_will_dirty(db, tx);
    767 
    768 		for (copied = 0; copied < tocpy; copied += PAGESIZE) {
    769 			ASSERT3U(pp->p_offset, ==, db->db_offset + bufoff);
    770 			thiscpy = MIN(PAGESIZE, tocpy - copied);
    771 			va = zfs_map_page(pp, S_READ);
    772 			bcopy(va, (char *)db->db_data + bufoff, thiscpy);
    773 			zfs_unmap_page(pp, va);
    774 			pp = pp->p_next;
    775 			bufoff += PAGESIZE;
    776 		}
    777 
    778 		if (tocpy == db->db_size)
    779 			dmu_buf_fill_done(db, tx);
    780 
    781 		if (err)
    782 			break;
    783 
    784 		offset += tocpy;
    785 		size -= tocpy;
    786 	}
    787 	dmu_buf_rele_array(dbp, numbufs, FTAG);
    788 	return (err);
    789 }
    790 #endif
    791 
/*
 * Argument block filled in by dmu_sync() and received by dmu_sync_ready()
 * and dmu_sync_done() through their `varg' parameter.  dmu_sync_done()
 * frees it.
 */
typedef struct {
	dbuf_dirty_record_t	*dr;	/* dirty record being synced */
	dmu_sync_cb_t		*done;	/* caller's callback; may be NULL */
	void			*arg;	/* passed to done with the dbuf */
} dmu_sync_arg_t;
    797 
    798 /* ARGSUSED */
    799 static void
    800 dmu_sync_ready(zio_t *zio, arc_buf_t *buf, void *varg)
    801 {
    802 	blkptr_t *bp = zio->io_bp;
    803 
    804 	if (!BP_IS_HOLE(bp)) {
    805 		dmu_sync_arg_t *in = varg;
    806 		dbuf_dirty_record_t *dr = in->dr;
    807 		dmu_buf_impl_t *db = dr->dr_dbuf;
    808 		ASSERT(BP_GET_TYPE(bp) == db->db_dnode->dn_type);
    809 		ASSERT(BP_GET_LEVEL(bp) == 0);
    810 		bp->blk_fill = 1;
    811 	}
    812 }
    813 
    814 /* ARGSUSED */
    815 static void
    816 dmu_sync_done(zio_t *zio, arc_buf_t *buf, void *varg)
    817 {
    818 	dmu_sync_arg_t *in = varg;
    819 	dbuf_dirty_record_t *dr = in->dr;
    820 	dmu_buf_impl_t *db = dr->dr_dbuf;
    821 	dmu_sync_cb_t *done = in->done;
    822 
    823 	mutex_enter(&db->db_mtx);
    824 	ASSERT(dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC);
    825 	dr->dt.dl.dr_overridden_by = *zio->io_bp; /* structure assignment */
    826 	dr->dt.dl.dr_override_state = DR_OVERRIDDEN;
    827 	cv_broadcast(&db->db_changed);
    828 	mutex_exit(&db->db_mtx);
    829 
    830 	if (done)
    831 		done(&(db->db), in->arg);
    832 
    833 	kmem_free(in, sizeof (dmu_sync_arg_t));
    834 }
    835 
    836 /*
    837  * Intent log support: sync the block associated with db to disk.
    838  * N.B. and XXX: the caller is responsible for making sure that the
    839  * data isn't changing while dmu_sync() is writing it.
    840  *
    841  * Return values:
    842  *
     843  *	EEXIST: this txg has already been synced, so there's nothing to do.
    844  *		The caller should not log the write.
    845  *
    846  *	ENOENT: the block was dbuf_free_range()'d, so there's nothing to do.
    847  *		The caller should not log the write.
    848  *
    849  *	EALREADY: this block is already in the process of being synced.
    850  *		The caller should track its progress (somehow).
    851  *
    852  *	EINPROGRESS: the IO has been initiated.
    853  *		The caller should log this blkptr in the callback.
    854  *
    855  *	0: completed.  Sets *bp to the blkptr just written.
    856  *		The caller should log this blkptr immediately.
    857  */
    858 int
    859 dmu_sync(zio_t *pio, dmu_buf_t *db_fake,
    860     blkptr_t *bp, uint64_t txg, dmu_sync_cb_t *done, void *arg)
    861 {
    862 	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
    863 	objset_impl_t *os = db->db_objset;
    864 	dsl_pool_t *dp = os->os_dsl_dataset->ds_dir->dd_pool;
    865 	tx_state_t *tx = &dp->dp_tx;
    866 	dbuf_dirty_record_t *dr;
    867 	dmu_sync_arg_t *in;
    868 	zbookmark_t zb;
    869 	writeprops_t wp = { 0 };
    870 	zio_t *zio;
    871 	int err;
    872 
    873 	ASSERT(BP_IS_HOLE(bp));
    874 	ASSERT(txg != 0);
    875 
    876 	dprintf("dmu_sync txg=%llu, s,o,q %llu %llu %llu\n",
    877 	    txg, tx->tx_synced_txg, tx->tx_open_txg, tx->tx_quiesced_txg);
    878 
    879 	/*
    880 	 * XXX - would be nice if we could do this without suspending...
    881 	 */
    882 	txg_suspend(dp);
    883 
    884 	/*
    885 	 * If this txg already synced, there's nothing to do.
    886 	 */
    887 	if (txg <= tx->tx_synced_txg) {
    888 		txg_resume(dp);
    889 		/*
    890 		 * If we're running ziltest, we need the blkptr regardless.
    891 		 */
    892 		if (txg > spa_freeze_txg(dp->dp_spa)) {
    893 			/* if db_blkptr == NULL, this was an empty write */
    894 			if (db->db_blkptr)
    895 				*bp = *db->db_blkptr; /* structure assignment */
    896 			return (0);
    897 		}
    898 		return (EEXIST);
    899 	}
    900 
    901 	mutex_enter(&db->db_mtx);
    902 
    903 	if (txg == tx->tx_syncing_txg) {
    904 		while (db->db_data_pending) {
    905 			/*
    906 			 * IO is in-progress.  Wait for it to finish.
    907 			 * XXX - would be nice to be able to somehow "attach"
    908 			 * this zio to the parent zio passed in.
    909 			 */
    910 			cv_wait(&db->db_changed, &db->db_mtx);
    911 			if (!db->db_data_pending &&
    912 			    db->db_blkptr && BP_IS_HOLE(db->db_blkptr)) {
    913 				/*
    914 				 * IO was compressed away
    915 				 */
    916 				*bp = *db->db_blkptr; /* structure assignment */
    917 				mutex_exit(&db->db_mtx);
    918 				txg_resume(dp);
    919 				return (0);
    920 			}
    921 			ASSERT(db->db_data_pending ||
    922 			    (db->db_blkptr && db->db_blkptr->blk_birth == txg));
    923 		}
    924 
    925 		if (db->db_blkptr && db->db_blkptr->blk_birth == txg) {
    926 			/*
    927 			 * IO is already completed.
    928 			 */
    929 			*bp = *db->db_blkptr; /* structure assignment */
    930 			mutex_exit(&db->db_mtx);
    931 			txg_resume(dp);
    932 			return (0);
    933 		}
    934 	}
    935 
    936 	dr = db->db_last_dirty;
    937 	while (dr && dr->dr_txg > txg)
    938 		dr = dr->dr_next;
    939 	if (dr == NULL || dr->dr_txg < txg) {
    940 		/*
    941 		 * This dbuf isn't dirty, must have been free_range'd.
    942 		 * There's no need to log writes to freed blocks, so we're done.
    943 		 */
    944 		mutex_exit(&db->db_mtx);
    945 		txg_resume(dp);
    946 		return (ENOENT);
    947 	}
    948 
    949 	ASSERT(dr->dr_txg == txg);
    950 	if (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
    951 		/*
    952 		 * We have already issued a sync write for this buffer.
    953 		 */
    954 		mutex_exit(&db->db_mtx);
    955 		txg_resume(dp);
    956 		return (EALREADY);
    957 	} else if (dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
    958 		/*
    959 		 * This buffer has already been synced.  It could not
    960 		 * have been dirtied since, or we would have cleared the state.
    961 		 */
    962 		*bp = dr->dt.dl.dr_overridden_by; /* structure assignment */
    963 		mutex_exit(&db->db_mtx);
    964 		txg_resume(dp);
    965 		return (0);
    966 	}
    967 
    968 	dr->dt.dl.dr_override_state = DR_IN_DMU_SYNC;
    969 	in = kmem_alloc(sizeof (dmu_sync_arg_t), KM_SLEEP);
    970 	in->dr = dr;
    971 	in->done = done;
    972 	in->arg = arg;
    973 	mutex_exit(&db->db_mtx);
    974 	txg_resume(dp);
    975 
    976 	zb.zb_objset =