Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/zfs_context.h>
     27 #include <sys/txg_impl.h>
     28 #include <sys/dmu_impl.h>
     29 #include <sys/dmu_tx.h>
     30 #include <sys/dsl_pool.h>
     31 #include <sys/callb.h>
     32 
     33 /*
     34  * Pool-wide transaction groups.
     35  */
     36 
     37 static void txg_sync_thread(dsl_pool_t *dp);
     38 static void txg_quiesce_thread(dsl_pool_t *dp);
     39 
     40 int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
     41 
     42 /*
     43  * Prepare the txg subsystem.
     44  */
     45 void
     46 txg_init(dsl_pool_t *dp, uint64_t txg)
     47 {
     48 	tx_state_t *tx = &dp->dp_tx;
     49 	int c;
     50 	bzero(tx, sizeof (tx_state_t));
     51 
     52 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     53 
     54 	for (c = 0; c < max_ncpus; c++) {
     55 		int i;
     56 
     57 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     58 		for (i = 0; i < TXG_SIZE; i++) {
     59 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     60 			    NULL);
     61 			list_create(&tx->tx_cpu[c].tc_callbacks[i],
     62 			    sizeof (dmu_tx_callback_t),
     63 			    offsetof(dmu_tx_callback_t, dcb_node));
     64 		}
     65 	}
     66 
     67 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     68 
     69 	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
     70 	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
     71 	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
     72 	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
     73 	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
     74 
     75 	tx->tx_open_txg = txg;
     76 }
     77 
     78 /*
     79  * Close down the txg subsystem.
     80  */
     81 void
     82 txg_fini(dsl_pool_t *dp)
     83 {
     84 	tx_state_t *tx = &dp->dp_tx;
     85 	int c;
     86 
     87 	ASSERT(tx->tx_threads == 0);
     88 
     89 	mutex_destroy(&tx->tx_sync_lock);
     90 
     91 	cv_destroy(&tx->tx_sync_more_cv);
     92 	cv_destroy(&tx->tx_sync_done_cv);
     93 	cv_destroy(&tx->tx_quiesce_more_cv);
     94 	cv_destroy(&tx->tx_quiesce_done_cv);
     95 	cv_destroy(&tx->tx_exit_cv);
     96 
     97 	for (c = 0; c < max_ncpus; c++) {
     98 		int i;
     99 
    100 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
    101 		for (i = 0; i < TXG_SIZE; i++) {
    102 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
    103 			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
    104 		}
    105 	}
    106 
    107 	if (tx->tx_commit_cb_taskq != NULL)
    108 		taskq_destroy(tx->tx_commit_cb_taskq);
    109 
    110 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
    111 
    112 	bzero(tx, sizeof (tx_state_t));
    113 }
    114 
    115 /*
    116  * Start syncing transaction groups.
    117  */
    118 void
    119 txg_sync_start(dsl_pool_t *dp)
    120 {
    121 	tx_state_t *tx = &dp->dp_tx;
    122 
    123 	mutex_enter(&tx->tx_sync_lock);
    124 
    125 	dprintf("pool %p\n", dp);
    126 
    127 	ASSERT(tx->tx_threads == 0);
    128 
    129 	tx->tx_threads = 2;
    130 
    131 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
    132 	    dp, 0, &p0, TS_RUN, minclsyspri);
    133 
    134 	/*
    135 	 * The sync thread can need a larger-than-default stack size on
    136 	 * 32-bit x86.  This is due in part to nested pools and
    137 	 * scrub_visitbp() recursion.
    138 	 */
    139 	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
    140 	    dp, 0, &p0, TS_RUN, minclsyspri);
    141 
    142 	mutex_exit(&tx->tx_sync_lock);
    143 }
    144 
    145 static void
    146 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
    147 {
    148 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
    149 	mutex_enter(&tx->tx_sync_lock);
    150 }
    151 
    152 static void
    153 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
    154 {
    155 	ASSERT(*tpp != NULL);
    156 	*tpp = NULL;
    157 	tx->tx_threads--;
    158 	cv_broadcast(&tx->tx_exit_cv);
    159 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
    160 	thread_exit();
    161 }
    162 
    163 static void
    164 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
    165 {
    166 	CALLB_CPR_SAFE_BEGIN(cpr);
    167 
    168 	if (time)
    169 		(void) cv_timedwait(cv, &tx->tx_sync_lock,
    170 		    ddi_get_lbolt() + time);
    171 	else
    172 		cv_wait(cv, &tx->tx_sync_lock);
    173 
    174 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    175 }
    176 
    177 /*
    178  * Stop syncing transaction groups.
    179  */
    180 void
    181 txg_sync_stop(dsl_pool_t *dp)
    182 {
    183 	tx_state_t *tx = &dp->dp_tx;
    184 
    185 	dprintf("pool %p\n", dp);
    186 	/*
    187 	 * Finish off any work in progress.
    188 	 */
    189 	ASSERT(tx->tx_threads == 2);
    190 
    191 	/*
    192 	 * We need to ensure that we've vacated the deferred space_maps.
    193 	 */
    194 	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
    195 
    196 	/*
    197 	 * Wake all sync threads and wait for them to die.
    198 	 */
    199 	mutex_enter(&tx->tx_sync_lock);
    200 
    201 	ASSERT(tx->tx_threads == 2);
    202 
    203 	tx->tx_exiting = 1;
    204 
    205 	cv_broadcast(&tx->tx_quiesce_more_cv);
    206 	cv_broadcast(&tx->tx_quiesce_done_cv);
    207 	cv_broadcast(&tx->tx_sync_more_cv);
    208 
    209 	while (tx->tx_threads != 0)
    210 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
    211 
    212 	tx->tx_exiting = 0;
    213 
    214 	mutex_exit(&tx->tx_sync_lock);
    215 }
    216 
    217 uint64_t
    218 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    219 {
    220 	tx_state_t *tx = &dp->dp_tx;
    221 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    222 	uint64_t txg;
    223 
    224 	mutex_enter(&tc->tc_lock);
    225 
    226 	txg = tx->tx_open_txg;
    227 	tc->tc_count[txg & TXG_MASK]++;
    228 
    229 	th->th_cpu = tc;
    230 	th->th_txg = txg;
    231 
    232 	return (txg);
    233 }
    234 
    235 void
    236 txg_rele_to_quiesce(txg_handle_t *th)
    237 {
    238 	tx_cpu_t *tc = th->th_cpu;
    239 
    240 	mutex_exit(&tc->tc_lock);
    241 }
    242 
    243 void
    244 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
    245 {
    246 	tx_cpu_t *tc = th->th_cpu;
    247 	int g = th->th_txg & TXG_MASK;
    248 
    249 	mutex_enter(&tc->tc_lock);
    250 	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
    251 	mutex_exit(&tc->tc_lock);
    252 }
    253 
    254 void
    255 txg_rele_to_sync(txg_handle_t *th)
    256 {
    257 	tx_cpu_t *tc = th->th_cpu;
    258 	int g = th->th_txg & TXG_MASK;
    259 
    260 	mutex_enter(&tc->tc_lock);
    261 	ASSERT(tc->tc_count[g] != 0);
    262 	if (--tc->tc_count[g] == 0)
    263 		cv_broadcast(&tc->tc_cv[g]);
    264 	mutex_exit(&tc->tc_lock);
    265 
    266 	th->th_cpu = NULL;	/* defensive */
    267 }
    268 
    269 static void
    270 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    271 {
    272 	tx_state_t *tx = &dp->dp_tx;
    273 	int g = txg & TXG_MASK;
    274 	int c;
    275 
    276 	/*
    277 	 * Grab all tx_cpu locks so nobody else can get into this txg.
    278 	 */
    279 	for (c = 0; c < max_ncpus; c++)
    280 		mutex_enter(&tx->tx_cpu[c].tc_lock);
    281 
    282 	ASSERT(txg == tx->tx_open_txg);
    283 	tx->tx_open_txg++;
    284 
    285 	/*
    286 	 * Now that we've incremented tx_open_txg, we can let threads
    287 	 * enter the next transaction group.
    288 	 */
    289 	for (c = 0; c < max_ncpus; c++)
    290 		mutex_exit(&tx->tx_cpu[c].tc_lock);
    291 
    292 	/*
    293 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    294 	 */
    295 	for (c = 0; c < max_ncpus; c++) {
    296 		tx_cpu_t *tc = &tx->tx_cpu[c];
    297 		mutex_enter(&tc->tc_lock);
    298 		while (tc->tc_count[g] != 0)
    299 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    300 		mutex_exit(&tc->tc_lock);
    301 	}
    302 }
    303 
    304 static void
    305 txg_do_callbacks(list_t *cb_list)
    306 {
    307 	dmu_tx_do_callbacks(cb_list, 0);
    308 
    309 	list_destroy(cb_list);
    310 
    311 	kmem_free(cb_list, sizeof (list_t));
    312 }
    313 
    314 /*
    315  * Dispatch the commit callbacks registered on this txg to worker threads.
    316  */
    317 static void
    318 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
    319 {
    320 	int c;
    321 	tx_state_t *tx = &dp->dp_tx;
    322 	list_t *cb_list;
    323 
    324 	for (c = 0; c < max_ncpus; c++) {
    325 		tx_cpu_t *tc = &tx->tx_cpu[c];
    326 		/* No need to lock tx_cpu_t at this point */
    327 
    328 		int g = txg & TXG_MASK;
    329 
    330 		if (list_is_empty(&tc->tc_callbacks[g]))
    331 			continue;
    332 
    333 		if (tx->tx_commit_cb_taskq == NULL) {
    334 			/*
    335 			 * Commit callback taskq hasn't been created yet.
    336 			 */
    337 			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
    338 			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
    339 			    TASKQ_PREPOPULATE);
    340 		}
    341 
    342 		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
    343 		list_create(cb_list, sizeof (dmu_tx_callback_t),
    344 		    offsetof(dmu_tx_callback_t, dcb_node));
    345 
    346 		list_move_tail(&tc->tc_callbacks[g], cb_list);
    347 
    348 		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
    349 		    txg_do_callbacks, cb_list, TQ_SLEEP);
    350 	}
    351 }
    352 
    353 static void
    354 txg_sync_thread(dsl_pool_t *dp)
    355 {
    356 	tx_state_t *tx = &dp->dp_tx;
    357 	callb_cpr_t cpr;
    358 	uint64_t start, delta;
    359 
    360 	txg_thread_enter(tx, &cpr);
    361 
    362 	start = delta = 0;
    363 	for (;;) {
    364 		uint64_t timer, timeout = zfs_txg_timeout * hz;
    365 		uint64_t txg;
    366 
    367 		/*
    368 		 * We sync when we're scrubbing, there's someone waiting
    369 		 * on us, or the quiesce thread has handed off a txg to
    370 		 * us, or we have reached our timeout.
    371 		 */
    372 		timer = (delta >= timeout ? 0 : timeout - delta);
    373 		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
    374 		    spa_shutting_down(dp->dp_spa)) &&
    375 		    !tx->tx_exiting && timer > 0 &&
    376 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
    377 		    tx->tx_quiesced_txg == 0) {
    378 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
    379 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    380 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
    381 			delta = ddi_get_lbolt() - start;
    382 			timer = (delta > timeout ? 0 : timeout - delta);
    383 		}
    384 
    385 		/*
    386 		 * Wait until the quiesce thread hands off a txg to us,
    387 		 * prompting it to do so if necessary.
    388 		 */
    389 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
    390 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
    391 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
    392 			cv_broadcast(&tx->tx_quiesce_more_cv);
    393 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
    394 		}
    395 
    396 		if (tx->tx_exiting)
    397 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
    398 
    399 		/*
    400 		 * Consume the quiesced txg which has been handed off to
    401 		 * us.  This may cause the quiescing thread to now be
    402 		 * able to quiesce another txg, so we must signal it.
    403 		 */
    404 		txg = tx->tx_quiesced_txg;
    405 		tx->tx_quiesced_txg = 0;
    406 		tx->tx_syncing_txg = txg;
    407 		cv_broadcast(&tx->tx_quiesce_more_cv);
    408 
    409 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    410 		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    411 		mutex_exit(&tx->tx_sync_lock);
    412 
    413 		start = ddi_get_lbolt();
    414 		spa_sync(dp->dp_spa, txg);
    415 		delta = ddi_get_lbolt() - start;
    416 
    417 		mutex_enter(&tx->tx_sync_lock);
    418 		tx->tx_synced_txg = txg;
    419 		tx->tx_syncing_txg = 0;
    420 		cv_broadcast(&tx->tx_sync_done_cv);
    421 
    422 		/*
    423 		 * Dispatch commit callbacks to worker threads.
    424 		 */
    425 		txg_dispatch_callbacks(dp, txg);
    426 	}
    427 }
    428 
    429 static void
    430 txg_quiesce_thread(dsl_pool_t *dp)
    431 {
    432 	tx_state_t *tx = &dp->dp_tx;
    433 	callb_cpr_t cpr;
    434 
    435 	txg_thread_enter(tx, &cpr);
    436 
    437 	for (;;) {
    438 		uint64_t txg;
    439 
    440 		/*
    441 		 * We quiesce when there's someone waiting on us.
    442 		 * However, we can only have one txg in "quiescing" or
    443 		 * "quiesced, waiting to sync" state.  So we wait until
    444 		 * the "quiesced, waiting to sync" txg has been consumed
    445 		 * by the sync thread.
    446 		 */
    447 		while (!tx->tx_exiting &&
    448 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
    449 		    tx->tx_quiesced_txg != 0))
    450 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
    451 
    452 		if (tx->tx_exiting)
    453 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
    454 
    455 		txg = tx->tx_open_txg;
    456 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    457 		    txg, tx->tx_quiesce_txg_waiting,
    458 		    tx->tx_sync_txg_waiting);
    459 		mutex_exit(&tx->tx_sync_lock);
    460 		txg_quiesce(dp, txg);
    461 		mutex_enter(&tx->tx_sync_lock);
    462 
    463 		/*
    464 		 * Hand this txg off to the sync thread.
    465 		 */
    466 		dprintf("quiesce done, handing off txg %llu\n", txg);
    467 		tx->tx_quiesced_txg = txg;
    468 		cv_broadcast(&tx->tx_sync_more_cv);
    469 		cv_broadcast(&tx->tx_quiesce_done_cv);
    470 	}
    471 }
    472 
    473 /*
    474  * Delay this thread by 'ticks' if we are still in the open transaction
    475  * group and there is already a waiting txg quiesing or quiesced.  Abort
    476  * the delay if this txg stalls or enters the quiesing state.
    477  */
    478 void
    479 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
    480 {
    481 	tx_state_t *tx = &dp->dp_tx;
    482 	int timeout = ddi_get_lbolt() + ticks;
    483 
    484 	/* don't delay if this txg could transition to quiesing immediately */
    485 	if (tx->tx_open_txg > txg ||
    486 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    487 		return;
    488 
    489 	mutex_enter(&tx->tx_sync_lock);
    490 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    491 		mutex_exit(&tx->tx_sync_lock);
    492 		return;
    493 	}
    494 
    495 	while (ddi_get_lbolt() < timeout &&
    496 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
    497 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
    498 		    timeout);
    499 
    500 	mutex_exit(&tx->tx_sync_lock);
    501 }
    502 
    503 void
    504 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    505 {
    506 	tx_state_t *tx = &dp->dp_tx;
    507 
    508 	mutex_enter(&tx->tx_sync_lock);
    509 	ASSERT(tx->tx_threads == 2);
    510 	if (txg == 0)
    511 		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
    512 	if (tx->tx_sync_txg_waiting < txg)
    513 		tx->tx_sync_txg_waiting = txg;
    514 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    515 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    516 	while (tx->tx_synced_txg < txg) {
    517 		dprintf("broadcasting sync more "
    518 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    519 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    520 		cv_broadcast(&tx->tx_sync_more_cv);
    521 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    522 	}
    523 	mutex_exit(&tx->tx_sync_lock);
    524 }
    525 
    526 void
    527 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    528 {
    529 	tx_state_t *tx = &dp->dp_tx;
    530 
    531 	mutex_enter(&tx->tx_sync_lock);
    532 	ASSERT(tx->tx_threads == 2);
    533 	if (txg == 0)
    534 		txg = tx->tx_open_txg + 1;
    535 	if (tx->tx_quiesce_txg_waiting < txg)
    536 		tx->tx_quiesce_txg_waiting = txg;
    537 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    538 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    539 	while (tx->tx_open_txg < txg) {
    540 		cv_broadcast(&tx->tx_quiesce_more_cv);
    541 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    542 	}
    543 	mutex_exit(&tx->tx_sync_lock);
    544 }
    545 
    546 boolean_t
    547 txg_stalled(dsl_pool_t *dp)
    548 {
    549 	tx_state_t *tx = &dp->dp_tx;
    550 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    551 }
    552 
    553 boolean_t
    554 txg_sync_waiting(dsl_pool_t *dp)
    555 {
    556 	tx_state_t *tx = &dp->dp_tx;
    557 
    558 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
    559 	    tx->tx_quiesced_txg != 0);
    560 }
    561 
    562 /*
    563  * Per-txg object lists.
    564  */
    565 void
    566 txg_list_create(txg_list_t *tl, size_t offset)
    567 {
    568 	int t;
    569 
    570 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    571 
    572 	tl->tl_offset = offset;
    573 
    574 	for (t = 0; t < TXG_SIZE; t++)
    575 		tl->tl_head[t] = NULL;
    576 }
    577 
    578 void
    579 txg_list_destroy(txg_list_t *tl)
    580 {
    581 	int t;
    582 
    583 	for (t = 0; t < TXG_SIZE; t++)
    584 		ASSERT(txg_list_empty(tl, t));
    585 
    586 	mutex_destroy(&tl->tl_lock);
    587 }
    588 
    589 int
    590 txg_list_empty(txg_list_t *tl, uint64_t txg)
    591 {
    592 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    593 }
    594 
    595 /*
    596  * Add an entry to the list.
    597  * Returns 0 if it's a new entry, 1 if it's already there.
    598  */
    599 int
    600 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    601 {
    602 	int t = txg & TXG_MASK;
    603 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    604 	int already_on_list;
    605 
    606 	mutex_enter(&tl->tl_lock);
    607 	already_on_list = tn->tn_member[t];
    608 	if (!already_on_list) {
    609 		tn->tn_member[t] = 1;
    610 		tn->tn_next[t] = tl->tl_head[t];
    611 		tl->tl_head[t] = tn;
    612 	}
    613 	mutex_exit(&tl->tl_lock);
    614 
    615 	return (already_on_list);
    616 }
    617 
    618 /*
    619  * Remove the head of the list and return it.
    620  */
    621 void *
    622 txg_list_remove(txg_list_t *tl, uint64_t txg)
    623 {
    624 	int t = txg & TXG_MASK;
    625 	txg_node_t *tn;
    626 	void *p = NULL;
    627 
    628 	mutex_enter(&tl->tl_lock);
    629 	if ((tn = tl->tl_head[t]) != NULL) {
    630 		p = (char *)tn - tl->tl_offset;
    631 		tl->tl_head[t] = tn->tn_next[t];
    632 		tn->tn_next[t] = NULL;
    633 		tn->tn_member[t] = 0;
    634 	}
    635 	mutex_exit(&tl->tl_lock);
    636 
    637 	return (p);
    638 }
    639 
    640 /*
    641  * Remove a specific item from the list and return it.
    642  */
    643 void *
    644 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    645 {
    646 	int t = txg & TXG_MASK;
    647 	txg_node_t *tn, **tp;
    648 
    649 	mutex_enter(&tl->tl_lock);
    650 
    651 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    652 		if ((char *)tn - tl->tl_offset == p) {
    653 			*tp = tn->tn_next[t];
    654 			tn->tn_next[t] = NULL;
    655 			tn->tn_member[t] = 0;
    656 			mutex_exit(&tl->tl_lock);
    657 			return (p);
    658 		}
    659 	}
    660 
    661 	mutex_exit(&tl->tl_lock);
    662 
    663 	return (NULL);
    664 }
    665 
    666 int
    667 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    668 {
    669 	int t = txg & TXG_MASK;
    670 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    671 
    672 	return (tn->tn_member[t]);
    673 }
    674 
    675 /*
    676  * Walk a txg list -- only safe if you know it's not changing.
    677  */
    678 void *
    679 txg_list_head(txg_list_t *tl, uint64_t txg)
    680 {
    681 	int t = txg & TXG_MASK;
    682 	txg_node_t *tn = tl->tl_head[t];
    683 
    684 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    685 }
    686 
    687 void *
    688 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    689 {
    690 	int t = txg & TXG_MASK;
    691 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    692 
    693 	tn = tn->tn_next[t];
    694 
    695 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    696 }
    697