Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/zfs_context.h>
     27 #include <sys/txg_impl.h>
     28 #include <sys/dmu_impl.h>
     29 #include <sys/dmu_tx.h>
     30 #include <sys/dsl_pool.h>
     31 #include <sys/callb.h>
     32 
     33 /*
     34  * Pool-wide transaction groups.
     35  */
     36 
/*
 * Entry points of the two per-pool worker threads; both are handed to
 * thread_create() by txg_sync_start().
 */
static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

/*
 * Tunable, in seconds.  txg_sync_thread() multiplies it by hz to get the
 * longest time it will idle before syncing a txg anyway.
 */
int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
     41 
     42 /*
     43  * Prepare the txg subsystem.
     44  */
     45 void
     46 txg_init(dsl_pool_t *dp, uint64_t txg)
     47 {
     48 	tx_state_t *tx = &dp->dp_tx;
     49 	int c;
     50 	bzero(tx, sizeof (tx_state_t));
     51 
     52 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     53 
     54 	for (c = 0; c < max_ncpus; c++) {
     55 		int i;
     56 
     57 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     58 		for (i = 0; i < TXG_SIZE; i++) {
     59 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     60 			    NULL);
     61 			list_create(&tx->tx_cpu[c].tc_callbacks[i],
     62 			    sizeof (dmu_tx_callback_t),
     63 			    offsetof(dmu_tx_callback_t, dcb_node));
     64 		}
     65 	}
     66 
     67 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     68 
     69 	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
     70 	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
     71 	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
     72 	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
     73 	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
     74 
     75 	tx->tx_open_txg = txg;
     76 }
     77 
     78 /*
     79  * Close down the txg subsystem.
     80  */
     81 void
     82 txg_fini(dsl_pool_t *dp)
     83 {
     84 	tx_state_t *tx = &dp->dp_tx;
     85 	int c;
     86 
     87 	ASSERT(tx->tx_threads == 0);
     88 
     89 	mutex_destroy(&tx->tx_sync_lock);
     90 
     91 	cv_destroy(&tx->tx_sync_more_cv);
     92 	cv_destroy(&tx->tx_sync_done_cv);
     93 	cv_destroy(&tx->tx_quiesce_more_cv);
     94 	cv_destroy(&tx->tx_quiesce_done_cv);
     95 	cv_destroy(&tx->tx_exit_cv);
     96 
     97 	for (c = 0; c < max_ncpus; c++) {
     98 		int i;
     99 
    100 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
    101 		for (i = 0; i < TXG_SIZE; i++) {
    102 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
    103 			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
    104 		}
    105 	}
    106 
    107 	if (tx->tx_commit_cb_taskq != NULL)
    108 		taskq_destroy(tx->tx_commit_cb_taskq);
    109 
    110 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
    111 
    112 	bzero(tx, sizeof (tx_state_t));
    113 }
    114 
    115 /*
    116  * Start syncing transaction groups.
    117  */
    118 void
    119 txg_sync_start(dsl_pool_t *dp)
    120 {
    121 	tx_state_t *tx = &dp->dp_tx;
    122 
    123 	mutex_enter(&tx->tx_sync_lock);
    124 
    125 	dprintf("pool %p\n", dp);
    126 
    127 	ASSERT(tx->tx_threads == 0);
    128 
    129 	tx->tx_threads = 2;
    130 
    131 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
    132 	    dp, 0, &p0, TS_RUN, minclsyspri);
    133 
    134 	/*
    135 	 * The sync thread can need a larger-than-default stack size on
    136 	 * 32-bit x86.  This is due in part to nested pools and
    137 	 * scrub_visitbp() recursion.
    138 	 */
    139 	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
    140 	    dp, 0, &p0, TS_RUN, minclsyspri);
    141 
    142 	mutex_exit(&tx->tx_sync_lock);
    143 }
    144 
/*
 * Common prologue for the txg worker threads.
 *
 * Initializes the caller's callb_cpr_t with CALLB_CPR_INIT(), associating
 * it with tx_sync_lock, and then acquires tx_sync_lock.  The lock is
 * still held when this function returns.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}
    151 
/*
 * Common epilogue for the txg worker threads.
 *
 * 'tpp' points at the tx_state_t field that records the calling thread
 * (tx_sync_thread or tx_quiesce_thread); it is cleared here.  tx_threads
 * is decremented and tx_exit_cv is broadcast, which is what
 * txg_sync_stop() waits on.  Both callers in this file invoke this with
 * tx_sync_lock held; CALLB_CPR_EXIT() releases it.  The function ends in
 * thread_exit(), so it is presumably not expected to return.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}
    162 
    163 static void
    164 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
    165 {
    166 	CALLB_CPR_SAFE_BEGIN(cpr);
    167 
    168 	if (time)
    169 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
    170 	else
    171 		cv_wait(cv, &tx->tx_sync_lock);
    172 
    173 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    174 }
    175 
/*
 * Stop syncing transaction groups.
 *
 * First waits for txg (tx_open_txg + TXG_DEFER_SIZE) to be synced, then
 * sets tx_exiting, wakes both worker threads and sleeps on tx_exit_cv
 * until tx_threads has dropped to 0.  tx_exiting is cleared again before
 * returning.  Both worker threads must be running on entry (asserted).
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 * Note that tx_open_txg is read here without tx_sync_lock held.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	/*
	 * Broadcast on every cv a worker thread can be sleeping on so that
	 * each one re-checks tx_exiting.
	 */
	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	/* txg_thread_exit() decrements tx_threads and signals tx_exit_cv. */
	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
    215 
    216 uint64_t
    217 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    218 {
    219 	tx_state_t *tx = &dp->dp_tx;
    220 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    221 	uint64_t txg;
    222 
    223 	mutex_enter(&tc->tc_lock);
    224 
    225 	txg = tx->tx_open_txg;
    226 	tc->tc_count[txg & TXG_MASK]++;
    227 
    228 	th->th_cpu = tc;
    229 	th->th_txg = txg;
    230 
    231 	return (txg);
    232 }
    233 
    234 void
    235 txg_rele_to_quiesce(txg_handle_t *th)
    236 {
    237 	tx_cpu_t *tc = th->th_cpu;
    238 
    239 	mutex_exit(&tc->tc_lock);
    240 }
    241 
    242 void
    243 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
    244 {
    245 	tx_cpu_t *tc = th->th_cpu;
    246 	int g = th->th_txg & TXG_MASK;
    247 
    248 	mutex_enter(&tc->tc_lock);
    249 	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
    250 	mutex_exit(&tc->tc_lock);
    251 }
    252 
    253 void
    254 txg_rele_to_sync(txg_handle_t *th)
    255 {
    256 	tx_cpu_t *tc = th->th_cpu;
    257 	int g = th->th_txg & TXG_MASK;
    258 
    259 	mutex_enter(&tc->tc_lock);
    260 	ASSERT(tc->tc_count[g] != 0);
    261 	if (--tc->tc_count[g] == 0)
    262 		cv_broadcast(&tc->tc_cv[g]);
    263 	mutex_exit(&tc->tc_lock);
    264 
    265 	th->th_cpu = NULL;	/* defensive */
    266 }
    267 
    268 static void
    269 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    270 {
    271 	tx_state_t *tx = &dp->dp_tx;
    272 	int g = txg & TXG_MASK;
    273 	int c;
    274 
    275 	/*
    276 	 * Grab all tx_cpu locks so nobody else can get into this txg.
    277 	 */
    278 	for (c = 0; c < max_ncpus; c++)
    279 		mutex_enter(&tx->tx_cpu[c].tc_lock);
    280 
    281 	ASSERT(txg == tx->tx_open_txg);
    282 	tx->tx_open_txg++;
    283 
    284 	/*
    285 	 * Now that we've incremented tx_open_txg, we can let threads
    286 	 * enter the next transaction group.
    287 	 */
    288 	for (c = 0; c < max_ncpus; c++)
    289 		mutex_exit(&tx->tx_cpu[c].tc_lock);
    290 
    291 	/*
    292 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    293 	 */
    294 	for (c = 0; c < max_ncpus; c++) {
    295 		tx_cpu_t *tc = &tx->tx_cpu[c];
    296 		mutex_enter(&tc->tc_lock);
    297 		while (tc->tc_count[g] != 0)
    298 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    299 		mutex_exit(&tc->tc_lock);
    300 	}
    301 }
    302 
    303 static void
    304 txg_do_callbacks(list_t *cb_list)
    305 {
    306 	dmu_tx_do_callbacks(cb_list, 0);
    307 
    308 	list_destroy(cb_list);
    309 
    310 	kmem_free(cb_list, sizeof (list_t));
    311 }
    312 
    313 /*
    314  * Dispatch the commit callbacks registered on this txg to worker threads.
    315  */
    316 static void
    317 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
    318 {
    319 	int c;
    320 	tx_state_t *tx = &dp->dp_tx;
    321 	list_t *cb_list;
    322 
    323 	for (c = 0; c < max_ncpus; c++) {
    324 		tx_cpu_t *tc = &tx->tx_cpu[c];
    325 		/* No need to lock tx_cpu_t at this point */
    326 
    327 		int g = txg & TXG_MASK;
    328 
    329 		if (list_is_empty(&tc->tc_callbacks[g]))
    330 			continue;
    331 
    332 		if (tx->tx_commit_cb_taskq == NULL) {
    333 			/*
    334 			 * Commit callback taskq hasn't been created yet.
    335 			 */
    336 			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
    337 			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
    338 			    TASKQ_PREPOPULATE);
    339 		}
    340 
    341 		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
    342 		list_create(cb_list, sizeof (dmu_tx_callback_t),
    343 		    offsetof(dmu_tx_callback_t, dcb_node));
    344 
    345 		list_move_tail(&tc->tc_callbacks[g], cb_list);
    346 
    347 		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
    348 		    txg_do_callbacks, cb_list, TQ_SLEEP);
    349 	}
    350 }
    351 
/*
 * Main loop of the pool's sync thread (created by txg_sync_start()).
 *
 * Each pass: idle on tx_sync_more_cv until there is a reason to sync,
 * make sure the quiesce thread hands over a quiesced txg, run spa_sync()
 * on it, publish the result in tx_synced_txg and dispatch the txg's
 * commit callbacks.  tx_sync_lock is taken by txg_thread_enter() and is
 * held throughout, except while blocked in txg_thread_wait() and around
 * the spa_sync() call.  The thread leaves through txg_thread_exit() once
 * tx_exiting is set.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	/*
	 * start: lbolt when the last spa_sync() began (0 before the first).
	 * delta: ticks elapsed since 'start', as last measured.
	 */
	uint64_t start, delta;

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		/* timeout: idle budget in ticks; timer: what is left of it */
		uint64_t timer, timeout = zfs_txg_timeout * hz;
		uint64_t txg;

		/*
		 * We sync when we're scrubbing, there's someone waiting
		 * on us, or the quiesce thread has handed off a txg to
		 * us, or we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
		    spa_shutting_down(dp->dp_spa)) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			/*
			 * Recompute the remaining budget.  Before the first
			 * spa_sync() 'start' is still 0, so delta is simply
			 * the current lbolt.
			 */
			delta = lbolt - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		/* Shutdown requested by txg_sync_stop(). */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		/* spa_sync() runs with tx_sync_lock dropped. */
		mutex_exit(&tx->tx_sync_lock);

		start = lbolt;
		spa_sync(dp->dp_spa, txg);
		delta = lbolt - start;	/* ticks spent in spa_sync() */

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		/* Wake threads blocked in txg_wait_synced(). */
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.  This is
		 * done with tx_sync_lock held.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}
    427 
/*
 * Main loop of the pool's quiesce thread (created by txg_sync_start()).
 *
 * Each pass: sleep on tx_quiesce_more_cv until someone wants a txg beyond
 * the open one and the previously quiesced txg has been consumed, then
 * quiesce the open txg via txg_quiesce() and publish it in
 * tx_quiesced_txg for the sync thread.  tx_sync_lock is taken by
 * txg_thread_enter() and is held throughout, except while blocked in
 * txg_thread_wait() and around the txg_quiesce() call.  The thread leaves
 * through txg_thread_exit() once tx_exiting is set.
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		/* Shutdown requested by txg_sync_stop(). */
		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		/* txg_quiesce() can block, so it runs without tx_sync_lock. */
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.  tx_quiesce_done_cv
		 * is also what txg_wait_open() sleeps on.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}
    471 
    472 /*
    473  * Delay this thread by 'ticks' if we are still in the open transaction
    474  * group and there is already a waiting txg quiesing or quiesced.  Abort
    475  * the delay if this txg stalls or enters the quiesing state.
    476  */
    477 void
    478 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
    479 {
    480 	tx_state_t *tx = &dp->dp_tx;
    481 	int timeout = lbolt + ticks;
    482 
    483 	/* don't delay if this txg could transition to quiesing immediately */
    484 	if (tx->tx_open_txg > txg ||
    485 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    486 		return;
    487 
    488 	mutex_enter(&tx->tx_sync_lock);
    489 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    490 		mutex_exit(&tx->tx_sync_lock);
    491 		return;
    492 	}
    493 
    494 	while (lbolt < timeout &&
    495 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
    496 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
    497 		    timeout);
    498 
    499 	mutex_exit(&tx->tx_sync_lock);
    500 }
    501 
    502 void
    503 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    504 {
    505 	tx_state_t *tx = &dp->dp_tx;
    506 
    507 	mutex_enter(&tx->tx_sync_lock);
    508 	ASSERT(tx->tx_threads == 2);
    509 	if (txg == 0)
    510 		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
    511 	if (tx->tx_sync_txg_waiting < txg)
    512 		tx->tx_sync_txg_waiting = txg;
    513 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    514 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    515 	while (tx->tx_synced_txg < txg) {
    516 		dprintf("broadcasting sync more "
    517 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    518 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    519 		cv_broadcast(&tx->tx_sync_more_cv);
    520 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    521 	}
    522 	mutex_exit(&tx->tx_sync_lock);
    523 }
    524 
    525 void
    526 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    527 {
    528 	tx_state_t *tx = &dp->dp_tx;
    529 
    530 	mutex_enter(&tx->tx_sync_lock);
    531 	ASSERT(tx->tx_threads == 2);
    532 	if (txg == 0)
    533 		txg = tx->tx_open_txg + 1;
    534 	if (tx->tx_quiesce_txg_waiting < txg)
    535 		tx->tx_quiesce_txg_waiting = txg;
    536 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    537 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    538 	while (tx->tx_open_txg < txg) {
    539 		cv_broadcast(&tx->tx_quiesce_more_cv);
    540 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    541 	}
    542 	mutex_exit(&tx->tx_sync_lock);
    543 }
    544 
    545 boolean_t
    546 txg_stalled(dsl_pool_t *dp)
    547 {
    548 	tx_state_t *tx = &dp->dp_tx;
    549 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    550 }
    551 
    552 boolean_t
    553 txg_sync_waiting(dsl_pool_t *dp)
    554 {
    555 	tx_state_t *tx = &dp->dp_tx;
    556 
    557 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
    558 	    tx->tx_quiesced_txg != 0);
    559 }
    560 
    561 /*
    562  * Per-txg object lists.
    563  */
    564 void
    565 txg_list_create(txg_list_t *tl, size_t offset)
    566 {
    567 	int t;
    568 
    569 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    570 
    571 	tl->tl_offset = offset;
    572 
    573 	for (t = 0; t < TXG_SIZE; t++)
    574 		tl->tl_head[t] = NULL;
    575 }
    576 
    577 void
    578 txg_list_destroy(txg_list_t *tl)
    579 {
    580 	int t;
    581 
    582 	for (t = 0; t < TXG_SIZE; t++)
    583 		ASSERT(txg_list_empty(tl, t));
    584 
    585 	mutex_destroy(&tl->tl_lock);
    586 }
    587 
    588 int
    589 txg_list_empty(txg_list_t *tl, uint64_t txg)
    590 {
    591 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    592 }
    593 
    594 /*
    595  * Add an entry to the list.
    596  * Returns 0 if it's a new entry, 1 if it's already there.
    597  */
    598 int
    599 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    600 {
    601 	int t = txg & TXG_MASK;
    602 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    603 	int already_on_list;
    604 
    605 	mutex_enter(&tl->tl_lock);
    606 	already_on_list = tn->tn_member[t];
    607 	if (!already_on_list) {
    608 		tn->tn_member[t] = 1;
    609 		tn->tn_next[t] = tl->tl_head[t];
    610 		tl->tl_head[t] = tn;
    611 	}
    612 	mutex_exit(&tl->tl_lock);
    613 
    614 	return (already_on_list);
    615 }
    616 
    617 /*
    618  * Remove the head of the list and return it.
    619  */
    620 void *
    621 txg_list_remove(txg_list_t *tl, uint64_t txg)
    622 {
    623 	int t = txg & TXG_MASK;
    624 	txg_node_t *tn;
    625 	void *p = NULL;
    626 
    627 	mutex_enter(&tl->tl_lock);
    628 	if ((tn = tl->tl_head[t]) != NULL) {
    629 		p = (char *)tn - tl->tl_offset;
    630 		tl->tl_head[t] = tn->tn_next[t];
    631 		tn->tn_next[t] = NULL;
    632 		tn->tn_member[t] = 0;
    633 	}
    634 	mutex_exit(&tl->tl_lock);
    635 
    636 	return (p);
    637 }
    638 
    639 /*
    640  * Remove a specific item from the list and return it.
    641  */
    642 void *
    643 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    644 {
    645 	int t = txg & TXG_MASK;
    646 	txg_node_t *tn, **tp;
    647 
    648 	mutex_enter(&tl->tl_lock);
    649 
    650 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    651 		if ((char *)tn - tl->tl_offset == p) {
    652 			*tp = tn->tn_next[t];
    653 			tn->tn_next[t] = NULL;
    654 			tn->tn_member[t] = 0;
    655 			mutex_exit(&tl->tl_lock);
    656 			return (p);
    657 		}
    658 	}
    659 
    660 	mutex_exit(&tl->tl_lock);
    661 
    662 	return (NULL);
    663 }
    664 
    665 int
    666 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    667 {
    668 	int t = txg & TXG_MASK;
    669 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    670 
    671 	return (tn->tn_member[t]);
    672 }
    673 
    674 /*
    675  * Walk a txg list -- only safe if you know it's not changing.
    676  */
    677 void *
    678 txg_list_head(txg_list_t *tl, uint64_t txg)
    679 {
    680 	int t = txg & TXG_MASK;
    681 	txg_node_t *tn = tl->tl_head[t];
    682 
    683 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    684 }
    685 
    686 void *
    687 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    688 {
    689 	int t = txg & TXG_MASK;
    690 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    691 
    692 	tn = tn->tn_next[t];
    693 
    694 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    695 }
    696