Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     27 
     28 #include <sys/zfs_context.h>
     29 #include <sys/txg_impl.h>
     30 #include <sys/dmu_impl.h>
     31 #include <sys/dsl_pool.h>
     32 #include <sys/callb.h>
     33 
     34 /*
     35  * Pool-wide transaction groups.
     36  */
     37 
/* Worker thread entry points; both are started by txg_sync_start(). */
static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

/*
 * Both values are in seconds; the sync thread multiplies them by hz to
 * obtain its wait timeout and its target sync duration respectively.
 */
int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
int zfs_txg_synctime = 5;	/* target seconds to sync a txg */

/*
 * The sync thread derives zfs_write_limit_max as
 * (physmem * PAGESIZE) >> zfs_write_limit_shift.
 */
int zfs_write_limit_shift = 3;	/* 1/8th of physical memory */

/* Floor applied to both dp_write_limit and zfs_write_limit_inflated. */
uint64_t zfs_write_limit_min = 32 << 20; /* min write limit is 32MB */
uint64_t zfs_write_limit_max = 0; /* max data payload per txg */
/*
 * zfs_write_limit_max after spa_get_asize() inflation, never below
 * zfs_write_limit_min.  Zero means the sync thread has not computed it yet.
 */
uint64_t zfs_write_limit_inflated = 0;
     49 
     50 /*
     51  * Prepare the txg subsystem.
     52  */
     53 void
     54 txg_init(dsl_pool_t *dp, uint64_t txg)
     55 {
     56 	tx_state_t *tx = &dp->dp_tx;
     57 	int c;
     58 	bzero(tx, sizeof (tx_state_t));
     59 
     60 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     61 
     62 	for (c = 0; c < max_ncpus; c++) {
     63 		int i;
     64 
     65 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     66 		for (i = 0; i < TXG_SIZE; i++) {
     67 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     68 			    NULL);
     69 		}
     70 	}
     71 
     72 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
     73 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     74 
     75 	tx->tx_open_txg = txg;
     76 }
     77 
     78 /*
     79  * Close down the txg subsystem.
     80  */
     81 void
     82 txg_fini(dsl_pool_t *dp)
     83 {
     84 	tx_state_t *tx = &dp->dp_tx;
     85 	int c;
     86 
     87 	ASSERT(tx->tx_threads == 0);
     88 
     89 	rw_destroy(&tx->tx_suspend);
     90 	mutex_destroy(&tx->tx_sync_lock);
     91 
     92 	for (c = 0; c < max_ncpus; c++) {
     93 		int i;
     94 
     95 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
     96 		for (i = 0; i < TXG_SIZE; i++)
     97 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
     98 	}
     99 
    100 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
    101 
    102 	bzero(tx, sizeof (tx_state_t));
    103 }
    104 
    105 /*
    106  * Start syncing transaction groups.
    107  */
    108 void
    109 txg_sync_start(dsl_pool_t *dp)
    110 {
    111 	tx_state_t *tx = &dp->dp_tx;
    112 
    113 	mutex_enter(&tx->tx_sync_lock);
    114 
    115 	dprintf("pool %p\n", dp);
    116 
    117 	ASSERT(tx->tx_threads == 0);
    118 
    119 	tx->tx_threads = 2;
    120 
    121 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
    122 	    dp, 0, &p0, TS_RUN, minclsyspri);
    123 
    124 	tx->tx_sync_thread = thread_create(NULL, 0, txg_sync_thread,
    125 	    dp, 0, &p0, TS_RUN, minclsyspri);
    126 
    127 	mutex_exit(&tx->tx_sync_lock);
    128 }
    129 
    130 static void
    131 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
    132 {
    133 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
    134 	mutex_enter(&tx->tx_sync_lock);
    135 }
    136 
    137 static void
    138 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
    139 {
    140 	ASSERT(*tpp != NULL);
    141 	*tpp = NULL;
    142 	tx->tx_threads--;
    143 	cv_broadcast(&tx->tx_exit_cv);
    144 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
    145 	thread_exit();
    146 }
    147 
    148 static void
    149 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
    150 {
    151 	CALLB_CPR_SAFE_BEGIN(cpr);
    152 
    153 	if (time)
    154 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
    155 	else
    156 		cv_wait(cv, &tx->tx_sync_lock);
    157 
    158 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    159 }
    160 
    161 /*
    162  * Stop syncing transaction groups.
    163  */
    164 void
    165 txg_sync_stop(dsl_pool_t *dp)
    166 {
    167 	tx_state_t *tx = &dp->dp_tx;
    168 
    169 	dprintf("pool %p\n", dp);
    170 	/*
    171 	 * Finish off any work in progress.
    172 	 */
    173 	ASSERT(tx->tx_threads == 2);
    174 	txg_wait_synced(dp, 0);
    175 
    176 	/*
    177 	 * Wake all sync threads and wait for them to die.
    178 	 */
    179 	mutex_enter(&tx->tx_sync_lock);
    180 
    181 	ASSERT(tx->tx_threads == 2);
    182 
    183 	tx->tx_exiting = 1;
    184 
    185 	cv_broadcast(&tx->tx_quiesce_more_cv);
    186 	cv_broadcast(&tx->tx_quiesce_done_cv);
    187 	cv_broadcast(&tx->tx_sync_more_cv);
    188 
    189 	while (tx->tx_threads != 0)
    190 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
    191 
    192 	tx->tx_exiting = 0;
    193 
    194 	mutex_exit(&tx->tx_sync_lock);
    195 }
    196 
    197 uint64_t
    198 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    199 {
    200 	tx_state_t *tx = &dp->dp_tx;
    201 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    202 	uint64_t txg;
    203 
    204 	mutex_enter(&tc->tc_lock);
    205 
    206 	txg = tx->tx_open_txg;
    207 	tc->tc_count[txg & TXG_MASK]++;
    208 
    209 	th->th_cpu = tc;
    210 	th->th_txg = txg;
    211 
    212 	return (txg);
    213 }
    214 
    215 void
    216 txg_rele_to_quiesce(txg_handle_t *th)
    217 {
    218 	tx_cpu_t *tc = th->th_cpu;
    219 
    220 	mutex_exit(&tc->tc_lock);
    221 }
    222 
    223 void
    224 txg_rele_to_sync(txg_handle_t *th)
    225 {
    226 	tx_cpu_t *tc = th->th_cpu;
    227 	int g = th->th_txg & TXG_MASK;
    228 
    229 	mutex_enter(&tc->tc_lock);
    230 	ASSERT(tc->tc_count[g] != 0);
    231 	if (--tc->tc_count[g] == 0)
    232 		cv_broadcast(&tc->tc_cv[g]);
    233 	mutex_exit(&tc->tc_lock);
    234 
    235 	th->th_cpu = NULL;	/* defensive */
    236 }
    237 
    238 static void
    239 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    240 {
    241 	tx_state_t *tx = &dp->dp_tx;
    242 	int g = txg & TXG_MASK;
    243 	int c;
    244 
    245 	/*
    246 	 * Grab all tx_cpu locks so nobody else can get into this txg.
    247 	 */
    248 	for (c = 0; c < max_ncpus; c++)
    249 		mutex_enter(&tx->tx_cpu[c].tc_lock);
    250 
    251 	ASSERT(txg == tx->tx_open_txg);
    252 	tx->tx_open_txg++;
    253 
    254 	/*
    255 	 * Now that we've incremented tx_open_txg, we can let threads
    256 	 * enter the next transaction group.
    257 	 */
    258 	for (c = 0; c < max_ncpus; c++)
    259 		mutex_exit(&tx->tx_cpu[c].tc_lock);
    260 
    261 	/*
    262 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    263 	 */
    264 	for (c = 0; c < max_ncpus; c++) {
    265 		tx_cpu_t *tc = &tx->tx_cpu[c];
    266 		mutex_enter(&tc->tc_lock);
    267 		while (tc->tc_count[g] != 0)
    268 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    269 		mutex_exit(&tc->tc_lock);
    270 	}
    271 }
    272 
/*
 * Body of the per-pool sync thread.
 *
 * Loops forever: waits for a reason to sync (a waiter, a quiesced txg
 * handed over by the quiesce thread, or the timeout), takes ownership of
 * the quiesced txg, calls spa_sync() on it with tx_sync_lock dropped,
 * retunes the pool's write limit from how long the sync took, and then
 * publishes the txg as synced.  Leaves via txg_thread_exit() once
 * tx_exiting is set.
 *
 * tx_sync_lock is held at the top of every loop iteration.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t timeout, start, delta, timer;
	int target;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	/* zfs_txg_timeout is in seconds; convert to ticks. */
	timeout = zfs_txg_timeout * hz;
	for (;;) {
		uint64_t txg, written;

		/*
		 * We sync when there's someone waiting on us, or the
		 * quiesce thread has handed off a txg to us, or we have
		 * reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			/* Recompute how much of the timeout is left. */
			delta = lbolt - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		/*
		 * txg_thread_exit() ends in thread_exit(), so control is
		 * not expected to continue past this call when exiting.
		 */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		rw_enter(&tx->tx_suspend, RW_WRITER);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);
		rw_exit(&tx->tx_suspend);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* The sync itself runs without tx_sync_lock held. */
		mutex_exit(&tx->tx_sync_lock);
		start = lbolt;
		spa_sync(dp->dp_spa, txg);
		/*
		 * Sync duration in ticks.  The + 1 keeps delta nonzero; it
		 * is used as a divisor below.
		 */
		delta = (lbolt - start) + 1;

		/* Fetch and reset the byte count recorded for this txg. */
		written = dp->dp_space_towrite[txg & TXG_MASK];
		dp->dp_space_towrite[txg & TXG_MASK] = 0;
		ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);

		/*
		 * If the write limit max has not been explicitly set, set it
		 * to a fraction of available physical memory (default 1/8th).
		 * Note that we must inflate the limit because the spa
		 * inflates write sizes to account for data replication.
		 * Check this each sync phase to catch changing memory size.
		 */
		if (zfs_write_limit_inflated == 0 ||
		    (zfs_write_limit_shift && zfs_write_limit_max !=
		    physmem * PAGESIZE >> zfs_write_limit_shift)) {
			zfs_write_limit_max =
			    physmem * PAGESIZE >> zfs_write_limit_shift;
			zfs_write_limit_inflated =
			    spa_get_asize(dp->dp_spa, zfs_write_limit_max);
			if (zfs_write_limit_min > zfs_write_limit_inflated)
				zfs_write_limit_inflated = zfs_write_limit_min;
		}

		/*
		 * Attempt to keep the sync time consistent by adjusting the
		 * amount of write traffic allowed into each transaction group.
		 *
		 * Too slow: scale the limit down by target/delta, but not
		 * below zfs_write_limit_min.  Limit reached and the sync
		 * finished under target (compared at 8-tick granularity):
		 * scale the limit up by target/delta, capped at 2x and at
		 * zfs_write_limit_inflated.
		 */
		target = zfs_txg_synctime * hz;
		if (delta > target) {
			uint64_t old = MIN(dp->dp_write_limit, written);

			dp->dp_write_limit = MAX(zfs_write_limit_min,
			    old * target / delta);
		} else if (written >= dp->dp_write_limit &&
		    delta >> 3 < target >> 3) {
			uint64_t rescale =
			    MIN((100 * target) / delta, 200);

			dp->dp_write_limit = MIN(zfs_write_limit_inflated,
			    written * rescale / 100);
		}

		/*
		 * Retake tx_sync_lock (held across the next iteration),
		 * publish the result and wake txg_wait_synced() callers.
		 */
		mutex_enter(&tx->tx_sync_lock);
		rw_enter(&tx->tx_suspend, RW_WRITER);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		rw_exit(&tx->tx_suspend);
		cv_broadcast(&tx->tx_sync_done_cv);
	}
}
    387 
    388 static void
    389 txg_quiesce_thread(dsl_pool_t *dp)
    390 {
    391 	tx_state_t *tx = &dp->dp_tx;
    392 	callb_cpr_t cpr;
    393 
    394 	txg_thread_enter(tx, &cpr);
    395 
    396 	for (;;) {
    397 		uint64_t txg;
    398 
    399 		/*
    400 		 * We quiesce when there's someone waiting on us.
    401 		 * However, we can only have one txg in "quiescing" or
    402 		 * "quiesced, waiting to sync" state.  So we wait until
    403 		 * the "quiesced, waiting to sync" txg has been consumed
    404 		 * by the sync thread.
    405 		 */
    406 		while (!tx->tx_exiting &&
    407 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
    408 		    tx->tx_quiesced_txg != 0))
    409 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
    410 
    411 		if (tx->tx_exiting)
    412 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
    413 
    414 		txg = tx->tx_open_txg;
    415 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    416 		    txg, tx->tx_quiesce_txg_waiting,
    417 		    tx->tx_sync_txg_waiting);
    418 		mutex_exit(&tx->tx_sync_lock);
    419 		txg_quiesce(dp, txg);
    420 		mutex_enter(&tx->tx_sync_lock);
    421 
    422 		/*
    423 		 * Hand this txg off to the sync thread.
    424 		 */
    425 		dprintf("quiesce done, handing off txg %llu\n", txg);
    426 		tx->tx_quiesced_txg = txg;
    427 		cv_broadcast(&tx->tx_sync_more_cv);
    428 		cv_broadcast(&tx->tx_quiesce_done_cv);
    429 	}
    430 }
    431 
    432 /*
    433  * Delay this thread by 'ticks' if we are still in the open transaction
    434  * group and there is already a waiting txg quiesing or quiesced.  Abort
    435  * the delay if this txg stalls or enters the quiesing state.
    436  */
    437 void
    438 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
    439 {
    440 	tx_state_t *tx = &dp->dp_tx;
    441 	int timeout = lbolt + ticks;
    442 
    443 	/* don't delay if this txg could transition to quiesing immediately */
    444 	if (tx->tx_open_txg > txg ||
    445 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    446 		return;
    447 
    448 	mutex_enter(&tx->tx_sync_lock);
    449 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    450 		mutex_exit(&tx->tx_sync_lock);
    451 		return;
    452 	}
    453 
    454 	while (lbolt < timeout &&
    455 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
    456 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
    457 		    timeout);
    458 
    459 	mutex_exit(&tx->tx_sync_lock);
    460 }
    461 
    462 void
    463 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    464 {
    465 	tx_state_t *tx = &dp->dp_tx;
    466 
    467 	mutex_enter(&tx->tx_sync_lock);
    468 	ASSERT(tx->tx_threads == 2);
    469 	if (txg == 0)
    470 		txg = tx->tx_open_txg;
    471 	if (tx->tx_sync_txg_waiting < txg)
    472 		tx->tx_sync_txg_waiting = txg;
    473 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    474 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    475 	while (tx->tx_synced_txg < txg) {
    476 		dprintf("broadcasting sync more "
    477 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    478 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    479 		cv_broadcast(&tx->tx_sync_more_cv);
    480 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    481 	}
    482 	mutex_exit(&tx->tx_sync_lock);
    483 }
    484 
    485 void
    486 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    487 {
    488 	tx_state_t *tx = &dp->dp_tx;
    489 
    490 	mutex_enter(&tx->tx_sync_lock);
    491 	ASSERT(tx->tx_threads == 2);
    492 	if (txg == 0)
    493 		txg = tx->tx_open_txg + 1;
    494 	if (tx->tx_quiesce_txg_waiting < txg)
    495 		tx->tx_quiesce_txg_waiting = txg;
    496 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    497 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    498 	while (tx->tx_open_txg < txg) {
    499 		cv_broadcast(&tx->tx_quiesce_more_cv);
    500 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    501 	}
    502 	mutex_exit(&tx->tx_sync_lock);
    503 }
    504 
    505 int
    506 txg_stalled(dsl_pool_t *dp)
    507 {
    508 	tx_state_t *tx = &dp->dp_tx;
    509 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    510 }
    511 
    512 void
    513 txg_suspend(dsl_pool_t *dp)
    514 {
    515 	tx_state_t *tx = &dp->dp_tx;
    516 	/* XXX some code paths suspend when they are already suspended! */
    517 	rw_enter(&tx->tx_suspend, RW_READER);
    518 }
    519 
    520 void
    521 txg_resume(dsl_pool_t *dp)
    522 {
    523 	tx_state_t *tx = &dp->dp_tx;
    524 	rw_exit(&tx->tx_suspend);
    525 }
    526 
    527 /*
    528  * Per-txg object lists.
    529  */
    530 void
    531 txg_list_create(txg_list_t *tl, size_t offset)
    532 {
    533 	int t;
    534 
    535 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    536 
    537 	tl->tl_offset = offset;
    538 
    539 	for (t = 0; t < TXG_SIZE; t++)
    540 		tl->tl_head[t] = NULL;
    541 }
    542 
    543 void
    544 txg_list_destroy(txg_list_t *tl)
    545 {
    546 	int t;
    547 
    548 	for (t = 0; t < TXG_SIZE; t++)
    549 		ASSERT(txg_list_empty(tl, t));
    550 
    551 	mutex_destroy(&tl->tl_lock);
    552 }
    553 
    554 int
    555 txg_list_empty(txg_list_t *tl, uint64_t txg)
    556 {
    557 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    558 }
    559 
    560 /*
    561  * Add an entry to the list.
    562  * Returns 0 if it's a new entry, 1 if it's already there.
    563  */
    564 int
    565 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    566 {
    567 	int t = txg & TXG_MASK;
    568 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    569 	int already_on_list;
    570 
    571 	mutex_enter(&tl->tl_lock);
    572 	already_on_list = tn->tn_member[t];
    573 	if (!already_on_list) {
    574 		tn->tn_member[t] = 1;
    575 		tn->tn_next[t] = tl->tl_head[t];
    576 		tl->tl_head[t] = tn;
    577 	}
    578 	mutex_exit(&tl->tl_lock);
    579 
    580 	return (already_on_list);
    581 }
    582 
    583 /*
    584  * Remove the head of the list and return it.
    585  */
    586 void *
    587 txg_list_remove(txg_list_t *tl, uint64_t txg)
    588 {
    589 	int t = txg & TXG_MASK;
    590 	txg_node_t *tn;
    591 	void *p = NULL;
    592 
    593 	mutex_enter(&tl->tl_lock);
    594 	if ((tn = tl->tl_head[t]) != NULL) {
    595 		p = (char *)tn - tl->tl_offset;
    596 		tl->tl_head[t] = tn->tn_next[t];
    597 		tn->tn_next[t] = NULL;
    598 		tn->tn_member[t] = 0;
    599 	}
    600 	mutex_exit(&tl->tl_lock);
    601 
    602 	return (p);
    603 }
    604 
    605 /*
    606  * Remove a specific item from the list and return it.
    607  */
    608 void *
    609 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    610 {
    611 	int t = txg & TXG_MASK;
    612 	txg_node_t *tn, **tp;
    613 
    614 	mutex_enter(&tl->tl_lock);
    615 
    616 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    617 		if ((char *)tn - tl->tl_offset == p) {
    618 			*tp = tn->tn_next[t];
    619 			tn->tn_next[t] = NULL;
    620 			tn->tn_member[t] = 0;
    621 			mutex_exit(&tl->tl_lock);
    622 			return (p);
    623 		}
    624 	}
    625 
    626 	mutex_exit(&tl->tl_lock);
    627 
    628 	return (NULL);
    629 }
    630 
    631 int
    632 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    633 {
    634 	int t = txg & TXG_MASK;
    635 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    636 
    637 	return (tn->tn_member[t]);
    638 }
    639 
    640 /*
    641  * Walk a txg list -- only safe if you know it's not changing.
    642  */
    643 void *
    644 txg_list_head(txg_list_t *tl, uint64_t txg)
    645 {
    646 	int t = txg & TXG_MASK;
    647 	txg_node_t *tn = tl->tl_head[t];
    648 
    649 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    650 }
    651 
    652 void *
    653 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    654 {
    655 	int t = txg & TXG_MASK;
    656 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    657 
    658 	tn = tn->tn_next[t];
    659 
    660 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    661 }
    662