Home | History | Annotate | Download | only in zfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/zfs_context.h>
     27 #include <sys/txg_impl.h>
     28 #include <sys/dmu_impl.h>
     29 #include <sys/dsl_pool.h>
     30 #include <sys/callb.h>
     31 
     32 /*
     33  * Pool-wide transaction groups.
     34  */
     35 
     36 static void txg_sync_thread(dsl_pool_t *dp);
     37 static void txg_quiesce_thread(dsl_pool_t *dp);
     38 
     39 int zfs_txg_timeout = 30;	/* max seconds worth of delta per txg */
     40 
     41 /*
     42  * Prepare the txg subsystem.
     43  */
     44 void
     45 txg_init(dsl_pool_t *dp, uint64_t txg)
     46 {
     47 	tx_state_t *tx = &dp->dp_tx;
     48 	int c;
     49 	bzero(tx, sizeof (tx_state_t));
     50 
     51 	tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
     52 
     53 	for (c = 0; c < max_ncpus; c++) {
     54 		int i;
     55 
     56 		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
     57 		for (i = 0; i < TXG_SIZE; i++) {
     58 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
     59 			    NULL);
     60 		}
     61 	}
     62 
     63 	rw_init(&tx->tx_suspend, NULL, RW_DEFAULT, NULL);
     64 	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
     65 
     66 	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
     67 	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
     68 	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
     69 	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
     70 	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
     71 
     72 	tx->tx_open_txg = txg;
     73 }
     74 
     75 /*
     76  * Close down the txg subsystem.
     77  */
     78 void
     79 txg_fini(dsl_pool_t *dp)
     80 {
     81 	tx_state_t *tx = &dp->dp_tx;
     82 	int c;
     83 
     84 	ASSERT(tx->tx_threads == 0);
     85 
     86 	rw_destroy(&tx->tx_suspend);
     87 	mutex_destroy(&tx->tx_sync_lock);
     88 
     89 	cv_destroy(&tx->tx_sync_more_cv);
     90 	cv_destroy(&tx->tx_sync_done_cv);
     91 	cv_destroy(&tx->tx_quiesce_more_cv);
     92 	cv_destroy(&tx->tx_quiesce_done_cv);
     93 	cv_destroy(&tx->tx_exit_cv);
     94 
     95 	for (c = 0; c < max_ncpus; c++) {
     96 		int i;
     97 
     98 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
     99 		for (i = 0; i < TXG_SIZE; i++)
    100 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
    101 	}
    102 
    103 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
    104 
    105 	bzero(tx, sizeof (tx_state_t));
    106 }
    107 
    108 /*
    109  * Start syncing transaction groups.
    110  */
    111 void
    112 txg_sync_start(dsl_pool_t *dp)
    113 {
    114 	tx_state_t *tx = &dp->dp_tx;
    115 
    116 	mutex_enter(&tx->tx_sync_lock);
    117 
    118 	dprintf("pool %p\n", dp);
    119 
    120 	ASSERT(tx->tx_threads == 0);
    121 
    122 	tx->tx_threads = 2;
    123 
    124 	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
    125 	    dp, 0, &p0, TS_RUN, minclsyspri);
    126 
    127 	/*
    128 	 * The sync thread can need a larger-than-default stack size on
    129 	 * 32-bit x86.  This is due in part to nested pools and
    130 	 * scrub_visitbp() recursion.
    131 	 */
    132 	tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread,
    133 	    dp, 0, &p0, TS_RUN, minclsyspri);
    134 
    135 	mutex_exit(&tx->tx_sync_lock);
    136 }
    137 
    138 static void
    139 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
    140 {
    141 	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
    142 	mutex_enter(&tx->tx_sync_lock);
    143 }
    144 
    145 static void
    146 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
    147 {
    148 	ASSERT(*tpp != NULL);
    149 	*tpp = NULL;
    150 	tx->tx_threads--;
    151 	cv_broadcast(&tx->tx_exit_cv);
    152 	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
    153 	thread_exit();
    154 }
    155 
    156 static void
    157 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
    158 {
    159 	CALLB_CPR_SAFE_BEGIN(cpr);
    160 
    161 	if (time)
    162 		(void) cv_timedwait(cv, &tx->tx_sync_lock, lbolt + time);
    163 	else
    164 		cv_wait(cv, &tx->tx_sync_lock);
    165 
    166 	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
    167 }
    168 
    169 /*
    170  * Stop syncing transaction groups.
    171  */
    172 void
    173 txg_sync_stop(dsl_pool_t *dp)
    174 {
    175 	tx_state_t *tx = &dp->dp_tx;
    176 
    177 	dprintf("pool %p\n", dp);
    178 	/*
    179 	 * Finish off any work in progress.
    180 	 */
    181 	ASSERT(tx->tx_threads == 2);
    182 	txg_wait_synced(dp, 0);
    183 
    184 	/*
    185 	 * Wake all sync threads and wait for them to die.
    186 	 */
    187 	mutex_enter(&tx->tx_sync_lock);
    188 
    189 	ASSERT(tx->tx_threads == 2);
    190 
    191 	tx->tx_exiting = 1;
    192 
    193 	cv_broadcast(&tx->tx_quiesce_more_cv);
    194 	cv_broadcast(&tx->tx_quiesce_done_cv);
    195 	cv_broadcast(&tx->tx_sync_more_cv);
    196 
    197 	while (tx->tx_threads != 0)
    198 		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
    199 
    200 	tx->tx_exiting = 0;
    201 
    202 	mutex_exit(&tx->tx_sync_lock);
    203 }
    204 
    205 uint64_t
    206 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
    207 {
    208 	tx_state_t *tx = &dp->dp_tx;
    209 	tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
    210 	uint64_t txg;
    211 
    212 	mutex_enter(&tc->tc_lock);
    213 
    214 	txg = tx->tx_open_txg;
    215 	tc->tc_count[txg & TXG_MASK]++;
    216 
    217 	th->th_cpu = tc;
    218 	th->th_txg = txg;
    219 
    220 	return (txg);
    221 }
    222 
    223 void
    224 txg_rele_to_quiesce(txg_handle_t *th)
    225 {
    226 	tx_cpu_t *tc = th->th_cpu;
    227 
    228 	mutex_exit(&tc->tc_lock);
    229 }
    230 
    231 void
    232 txg_rele_to_sync(txg_handle_t *th)
    233 {
    234 	tx_cpu_t *tc = th->th_cpu;
    235 	int g = th->th_txg & TXG_MASK;
    236 
    237 	mutex_enter(&tc->tc_lock);
    238 	ASSERT(tc->tc_count[g] != 0);
    239 	if (--tc->tc_count[g] == 0)
    240 		cv_broadcast(&tc->tc_cv[g]);
    241 	mutex_exit(&tc->tc_lock);
    242 
    243 	th->th_cpu = NULL;	/* defensive */
    244 }
    245 
    246 static void
    247 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
    248 {
    249 	tx_state_t *tx = &dp->dp_tx;
    250 	int g = txg & TXG_MASK;
    251 	int c;
    252 
    253 	/*
    254 	 * Grab all tx_cpu locks so nobody else can get into this txg.
    255 	 */
    256 	for (c = 0; c < max_ncpus; c++)
    257 		mutex_enter(&tx->tx_cpu[c].tc_lock);
    258 
    259 	ASSERT(txg == tx->tx_open_txg);
    260 	tx->tx_open_txg++;
    261 
    262 	/*
    263 	 * Now that we've incremented tx_open_txg, we can let threads
    264 	 * enter the next transaction group.
    265 	 */
    266 	for (c = 0; c < max_ncpus; c++)
    267 		mutex_exit(&tx->tx_cpu[c].tc_lock);
    268 
    269 	/*
    270 	 * Quiesce the transaction group by waiting for everyone to txg_exit().
    271 	 */
    272 	for (c = 0; c < max_ncpus; c++) {
    273 		tx_cpu_t *tc = &tx->tx_cpu[c];
    274 		mutex_enter(&tc->tc_lock);
    275 		while (tc->tc_count[g] != 0)
    276 			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
    277 		mutex_exit(&tc->tc_lock);
    278 	}
    279 }
    280 
    281 static void
    282 txg_sync_thread(dsl_pool_t *dp)
    283 {
    284 	tx_state_t *tx = &dp->dp_tx;
    285 	callb_cpr_t cpr;
    286 	uint64_t start, delta;
    287 
    288 	txg_thread_enter(tx, &cpr);
    289 
    290 	start = delta = 0;
    291 	for (;;) {
    292 		uint64_t timer, timeout = zfs_txg_timeout * hz;
    293 		uint64_t txg;
    294 
    295 		/*
    296 		 * We sync when we're scrubbing, there's someone waiting
    297 		 * on us, or the quiesce thread has handed off a txg to
    298 		 * us, or we have reached our timeout.
    299 		 */
    300 		timer = (delta >= timeout ? 0 : timeout - delta);
    301 		while ((dp->dp_scrub_func == SCRUB_FUNC_NONE ||
    302 		    spa_shutting_down(dp->dp_spa)) &&
    303 		    !tx->tx_exiting && timer > 0 &&
    304 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
    305 		    tx->tx_quiesced_txg == 0) {
    306 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
    307 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    308 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
    309 			delta = lbolt - start;
    310 			timer = (delta > timeout ? 0 : timeout - delta);
    311 		}
    312 
    313 		/*
    314 		 * Wait until the quiesce thread hands off a txg to us,
    315 		 * prompting it to do so if necessary.
    316 		 */
    317 		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
    318 			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
    319 				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
    320 			cv_broadcast(&tx->tx_quiesce_more_cv);
    321 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
    322 		}
    323 
    324 		if (tx->tx_exiting)
    325 			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
    326 
    327 		rw_enter(&tx->tx_suspend, RW_WRITER);
    328 
    329 		/*
    330 		 * Consume the quiesced txg which has been handed off to
    331 		 * us.  This may cause the quiescing thread to now be
    332 		 * able to quiesce another txg, so we must signal it.
    333 		 */
    334 		txg = tx->tx_quiesced_txg;
    335 		tx->tx_quiesced_txg = 0;
    336 		tx->tx_syncing_txg = txg;
    337 		cv_broadcast(&tx->tx_quiesce_more_cv);
    338 		rw_exit(&tx->tx_suspend);
    339 
    340 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    341 		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    342 		mutex_exit(&tx->tx_sync_lock);
    343 
    344 		start = lbolt;
    345 		spa_sync(dp->dp_spa, txg);
    346 		delta = lbolt - start;
    347 
    348 		mutex_enter(&tx->tx_sync_lock);
    349 		rw_enter(&tx->tx_suspend, RW_WRITER);
    350 		tx->tx_synced_txg = txg;
    351 		tx->tx_syncing_txg = 0;
    352 		rw_exit(&tx->tx_suspend);
    353 		cv_broadcast(&tx->tx_sync_done_cv);
    354 	}
    355 }
    356 
    357 static void
    358 txg_quiesce_thread(dsl_pool_t *dp)
    359 {
    360 	tx_state_t *tx = &dp->dp_tx;
    361 	callb_cpr_t cpr;
    362 
    363 	txg_thread_enter(tx, &cpr);
    364 
    365 	for (;;) {
    366 		uint64_t txg;
    367 
    368 		/*
    369 		 * We quiesce when there's someone waiting on us.
    370 		 * However, we can only have one txg in "quiescing" or
    371 		 * "quiesced, waiting to sync" state.  So we wait until
    372 		 * the "quiesced, waiting to sync" txg has been consumed
    373 		 * by the sync thread.
    374 		 */
    375 		while (!tx->tx_exiting &&
    376 		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
    377 		    tx->tx_quiesced_txg != 0))
    378 			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
    379 
    380 		if (tx->tx_exiting)
    381 			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
    382 
    383 		txg = tx->tx_open_txg;
    384 		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    385 		    txg, tx->tx_quiesce_txg_waiting,
    386 		    tx->tx_sync_txg_waiting);
    387 		mutex_exit(&tx->tx_sync_lock);
    388 		txg_quiesce(dp, txg);
    389 		mutex_enter(&tx->tx_sync_lock);
    390 
    391 		/*
    392 		 * Hand this txg off to the sync thread.
    393 		 */
    394 		dprintf("quiesce done, handing off txg %llu\n", txg);
    395 		tx->tx_quiesced_txg = txg;
    396 		cv_broadcast(&tx->tx_sync_more_cv);
    397 		cv_broadcast(&tx->tx_quiesce_done_cv);
    398 	}
    399 }
    400 
    401 /*
    402  * Delay this thread by 'ticks' if we are still in the open transaction
    403  * group and there is already a waiting txg quiesing or quiesced.  Abort
    404  * the delay if this txg stalls or enters the quiesing state.
    405  */
    406 void
    407 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
    408 {
    409 	tx_state_t *tx = &dp->dp_tx;
    410 	int timeout = lbolt + ticks;
    411 
    412 	/* don't delay if this txg could transition to quiesing immediately */
    413 	if (tx->tx_open_txg > txg ||
    414 	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
    415 		return;
    416 
    417 	mutex_enter(&tx->tx_sync_lock);
    418 	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
    419 		mutex_exit(&tx->tx_sync_lock);
    420 		return;
    421 	}
    422 
    423 	while (lbolt < timeout &&
    424 	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
    425 		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
    426 		    timeout);
    427 
    428 	mutex_exit(&tx->tx_sync_lock);
    429 }
    430 
    431 void
    432 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
    433 {
    434 	tx_state_t *tx = &dp->dp_tx;
    435 
    436 	mutex_enter(&tx->tx_sync_lock);
    437 	ASSERT(tx->tx_threads == 2);
    438 	if (txg == 0)
    439 		txg = tx->tx_open_txg;
    440 	if (tx->tx_sync_txg_waiting < txg)
    441 		tx->tx_sync_txg_waiting = txg;
    442 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    443 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    444 	while (tx->tx_synced_txg < txg) {
    445 		dprintf("broadcasting sync more "
    446 		    "tx_synced=%llu waiting=%llu dp=%p\n",
    447 		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
    448 		cv_broadcast(&tx->tx_sync_more_cv);
    449 		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
    450 	}
    451 	mutex_exit(&tx->tx_sync_lock);
    452 }
    453 
    454 void
    455 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
    456 {
    457 	tx_state_t *tx = &dp->dp_tx;
    458 
    459 	mutex_enter(&tx->tx_sync_lock);
    460 	ASSERT(tx->tx_threads == 2);
    461 	if (txg == 0)
    462 		txg = tx->tx_open_txg + 1;
    463 	if (tx->tx_quiesce_txg_waiting < txg)
    464 		tx->tx_quiesce_txg_waiting = txg;
    465 	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
    466 	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
    467 	while (tx->tx_open_txg < txg) {
    468 		cv_broadcast(&tx->tx_quiesce_more_cv);
    469 		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
    470 	}
    471 	mutex_exit(&tx->tx_sync_lock);
    472 }
    473 
    474 boolean_t
    475 txg_stalled(dsl_pool_t *dp)
    476 {
    477 	tx_state_t *tx = &dp->dp_tx;
    478 	return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
    479 }
    480 
    481 boolean_t
    482 txg_sync_waiting(dsl_pool_t *dp)
    483 {
    484 	tx_state_t *tx = &dp->dp_tx;
    485 
    486 	return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
    487 	    tx->tx_quiesced_txg != 0);
    488 }
    489 
    490 void
    491 txg_suspend(dsl_pool_t *dp)
    492 {
    493 	tx_state_t *tx = &dp->dp_tx;
    494 	/* XXX some code paths suspend when they are already suspended! */
    495 	rw_enter(&tx->tx_suspend, RW_READER);
    496 }
    497 
    498 void
    499 txg_resume(dsl_pool_t *dp)
    500 {
    501 	tx_state_t *tx = &dp->dp_tx;
    502 	rw_exit(&tx->tx_suspend);
    503 }
    504 
    505 /*
    506  * Per-txg object lists.
    507  */
    508 void
    509 txg_list_create(txg_list_t *tl, size_t offset)
    510 {
    511 	int t;
    512 
    513 	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
    514 
    515 	tl->tl_offset = offset;
    516 
    517 	for (t = 0; t < TXG_SIZE; t++)
    518 		tl->tl_head[t] = NULL;
    519 }
    520 
    521 void
    522 txg_list_destroy(txg_list_t *tl)
    523 {
    524 	int t;
    525 
    526 	for (t = 0; t < TXG_SIZE; t++)
    527 		ASSERT(txg_list_empty(tl, t));
    528 
    529 	mutex_destroy(&tl->tl_lock);
    530 }
    531 
    532 int
    533 txg_list_empty(txg_list_t *tl, uint64_t txg)
    534 {
    535 	return (tl->tl_head[txg & TXG_MASK] == NULL);
    536 }
    537 
    538 /*
    539  * Add an entry to the list.
    540  * Returns 0 if it's a new entry, 1 if it's already there.
    541  */
    542 int
    543 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
    544 {
    545 	int t = txg & TXG_MASK;
    546 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    547 	int already_on_list;
    548 
    549 	mutex_enter(&tl->tl_lock);
    550 	already_on_list = tn->tn_member[t];
    551 	if (!already_on_list) {
    552 		tn->tn_member[t] = 1;
    553 		tn->tn_next[t] = tl->tl_head[t];
    554 		tl->tl_head[t] = tn;
    555 	}
    556 	mutex_exit(&tl->tl_lock);
    557 
    558 	return (already_on_list);
    559 }
    560 
    561 /*
    562  * Remove the head of the list and return it.
    563  */
    564 void *
    565 txg_list_remove(txg_list_t *tl, uint64_t txg)
    566 {
    567 	int t = txg & TXG_MASK;
    568 	txg_node_t *tn;
    569 	void *p = NULL;
    570 
    571 	mutex_enter(&tl->tl_lock);
    572 	if ((tn = tl->tl_head[t]) != NULL) {
    573 		p = (char *)tn - tl->tl_offset;
    574 		tl->tl_head[t] = tn->tn_next[t];
    575 		tn->tn_next[t] = NULL;
    576 		tn->tn_member[t] = 0;
    577 	}
    578 	mutex_exit(&tl->tl_lock);
    579 
    580 	return (p);
    581 }
    582 
    583 /*
    584  * Remove a specific item from the list and return it.
    585  */
    586 void *
    587 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
    588 {
    589 	int t = txg & TXG_MASK;
    590 	txg_node_t *tn, **tp;
    591 
    592 	mutex_enter(&tl->tl_lock);
    593 
    594 	for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
    595 		if ((char *)tn - tl->tl_offset == p) {
    596 			*tp = tn->tn_next[t];
    597 			tn->tn_next[t] = NULL;
    598 			tn->tn_member[t] = 0;
    599 			mutex_exit(&tl->tl_lock);
    600 			return (p);
    601 		}
    602 	}
    603 
    604 	mutex_exit(&tl->tl_lock);
    605 
    606 	return (NULL);
    607 }
    608 
    609 int
    610 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
    611 {
    612 	int t = txg & TXG_MASK;
    613 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    614 
    615 	return (tn->tn_member[t]);
    616 }
    617 
    618 /*
    619  * Walk a txg list -- only safe if you know it's not changing.
    620  */
    621 void *
    622 txg_list_head(txg_list_t *tl, uint64_t txg)
    623 {
    624 	int t = txg & TXG_MASK;
    625 	txg_node_t *tn = tl->tl_head[t];
    626 
    627 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    628 }
    629 
    630 void *
    631 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
    632 {
    633 	int t = txg & TXG_MASK;
    634 	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
    635 
    636 	tn = tn->tn_next[t];
    637 
    638 	return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
    639 }
    640