1 789 ahrens /* 2 789 ahrens * CDDL HEADER START 3 789 ahrens * 4 789 ahrens * The contents of this file are subject to the terms of the 5 2856 nd150628 * Common Development and Distribution License (the "License"). 6 2856 nd150628 * You may not use this file except in compliance with the License. 7 789 ahrens * 8 789 ahrens * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 789 ahrens * or http://www.opensolaris.org/os/licensing. 10 789 ahrens * See the License for the specific language governing permissions 11 789 ahrens * and limitations under the License. 12 789 ahrens * 13 789 ahrens * When distributing Covered Code, include this CDDL HEADER in each 14 789 ahrens * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 789 ahrens * If applicable, add the following below this CDDL HEADER, with the 16 789 ahrens * fields enclosed by brackets "[]" replaced with your own identifying 17 789 ahrens * information: Portions Copyright [yyyy] [name of copyright owner] 18 789 ahrens * 19 789 ahrens * CDDL HEADER END 20 789 ahrens */ 21 789 ahrens /* 22 10612 Ricardo * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 789 ahrens * Use is subject to license terms. 24 789 ahrens */ 25 789 ahrens 26 789 ahrens #include <sys/zfs_context.h> 27 789 ahrens #include <sys/txg_impl.h> 28 789 ahrens #include <sys/dmu_impl.h> 29 10612 Ricardo #include <sys/dmu_tx.h> 30 789 ahrens #include <sys/dsl_pool.h> 31 789 ahrens #include <sys/callb.h> 32 789 ahrens 33 789 ahrens /* 34 789 ahrens * Pool-wide transaction groups. 35 789 ahrens */ 36 789 ahrens 37 789 ahrens static void txg_sync_thread(dsl_pool_t *dp); 38 789 ahrens static void txg_quiesce_thread(dsl_pool_t *dp); 39 789 ahrens 40 6245 maybee int zfs_txg_timeout = 30; /* max seconds worth of delta per txg */ 41 789 ahrens 42 789 ahrens /* 43 789 ahrens * Prepare the txg subsystem. 44 789 ahrens */ 45 789 ahrens void 46 789 ahrens txg_init(dsl_pool_t *dp, uint64_t txg) 47 789 ahrens { 48 789 ahrens tx_state_t *tx = &dp->dp_tx; 49 2856 nd150628 int c; 50 789 ahrens bzero(tx, sizeof (tx_state_t)); 51 789 ahrens 52 789 ahrens tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP); 53 789 ahrens 54 5765 ek110237 for (c = 0; c < max_ncpus; c++) { 55 5765 ek110237 int i; 56 5765 ek110237 57 2856 nd150628 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL); 58 5765 ek110237 for (i = 0; i < TXG_SIZE; i++) { 59 5765 ek110237 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT, 60 5765 ek110237 NULL); 61 10612 Ricardo list_create(&tx->tx_cpu[c].tc_callbacks[i], 62 10612 Ricardo sizeof (dmu_tx_callback_t), 63 10612 Ricardo offsetof(dmu_tx_callback_t, dcb_node)); 64 5765 ek110237 } 65 5765 ek110237 } 66 2856 nd150628 67 2856 nd150628 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL); 68 789 ahrens 69 8214 Ricardo cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL); 70 8214 Ricardo cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL); 71 8214 Ricardo cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL); 72 8214 Ricardo cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL); 73 8214 Ricardo cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL); 74 8214 Ricardo 75 789 ahrens tx->tx_open_txg = txg; 76 789 ahrens } 77 789 ahrens 78 789 ahrens /* 79 789 ahrens * Close down the txg subsystem. 80 789 ahrens */ 81 789 ahrens void 82 789 ahrens txg_fini(dsl_pool_t *dp) 83 789 ahrens { 84 789 ahrens tx_state_t *tx = &dp->dp_tx; 85 2856 nd150628 int c; 86 789 ahrens 87 789 ahrens ASSERT(tx->tx_threads == 0); 88 789 ahrens 89 2856 nd150628 mutex_destroy(&tx->tx_sync_lock); 90 8214 Ricardo 91 8214 Ricardo cv_destroy(&tx->tx_sync_more_cv); 92 8214 Ricardo cv_destroy(&tx->tx_sync_done_cv); 93 8214 Ricardo cv_destroy(&tx->tx_quiesce_more_cv); 94 8214 Ricardo cv_destroy(&tx->tx_quiesce_done_cv); 95 8214 Ricardo cv_destroy(&tx->tx_exit_cv); 96 2856 nd150628 97 5765 ek110237 for (c = 0; c < max_ncpus; c++) { 98 5765 ek110237 int i; 99 5765 ek110237 100 2856 nd150628 mutex_destroy(&tx->tx_cpu[c].tc_lock); 101 10612 Ricardo for (i = 0; i < TXG_SIZE; i++) { 102 5765 ek110237 cv_destroy(&tx->tx_cpu[c].tc_cv[i]); 103 10612 Ricardo list_destroy(&tx->tx_cpu[c].tc_callbacks[i]); 104 10612 Ricardo } 105 5765 ek110237 } 106 10612 Ricardo 107 10612 Ricardo if (tx->tx_commit_cb_taskq != NULL) 108 10612 Ricardo taskq_destroy(tx->tx_commit_cb_taskq); 109 789 ahrens 110 789 ahrens kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t)); 111 789 ahrens 112 789 ahrens bzero(tx, sizeof (tx_state_t)); 113 789 ahrens } 114 789 ahrens 115 789 ahrens /* 116 789 ahrens * Start syncing transaction groups. 117 789 ahrens */ 118 789 ahrens void 119 789 ahrens txg_sync_start(dsl_pool_t *dp) 120 789 ahrens { 121 789 ahrens tx_state_t *tx = &dp->dp_tx; 122 789 ahrens 123 789 ahrens mutex_enter(&tx->tx_sync_lock); 124 789 ahrens 125 789 ahrens dprintf("pool %p\n", dp); 126 789 ahrens 127 789 ahrens ASSERT(tx->tx_threads == 0); 128 789 ahrens 129 6245 maybee tx->tx_threads = 2; 130 789 ahrens 131 789 ahrens tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread, 132 789 ahrens dp, 0, &p0, TS_RUN, minclsyspri); 133 789 ahrens 134 7046 ahrens /* 135 7046 ahrens * The sync thread can need a larger-than-default stack size on 136 7046 ahrens * 32-bit x86. This is due in part to nested pools and 137 7046 ahrens * scrub_visitbp() recursion. 138 7046 ahrens */ 139 7046 ahrens tx->tx_sync_thread = thread_create(NULL, 12<<10, txg_sync_thread, 140 789 ahrens dp, 0, &p0, TS_RUN, minclsyspri); 141 789 ahrens 142 789 ahrens mutex_exit(&tx->tx_sync_lock); 143 789 ahrens } 144 789 ahrens 145 789 ahrens static void 146 789 ahrens txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr) 147 789 ahrens { 148 789 ahrens CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG); 149 789 ahrens mutex_enter(&tx->tx_sync_lock); 150 789 ahrens } 151 789 ahrens 152 789 ahrens static void 153 789 ahrens txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp) 154 789 ahrens { 155 789 ahrens ASSERT(*tpp != NULL); 156 789 ahrens *tpp = NULL; 157 789 ahrens tx->tx_threads--; 158 789 ahrens cv_broadcast(&tx->tx_exit_cv); 159 789 ahrens CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */ 160 789 ahrens thread_exit(); 161 789 ahrens } 162 789 ahrens 163 789 ahrens static void 164 6245 maybee txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time) 165 789 ahrens { 166 789 ahrens CALLB_CPR_SAFE_BEGIN(cpr); 167 789 ahrens 168 6245 maybee if (time) 169 11066 rafael (void) cv_timedwait(cv, &tx->tx_sync_lock, 170 11066 rafael ddi_get_lbolt() + time); 171 789 ahrens else 172 789 ahrens cv_wait(cv, &tx->tx_sync_lock); 173 789 ahrens 174 789 ahrens CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock); 175 789 ahrens } 176 789 ahrens 177 789 ahrens /* 178 789 ahrens * Stop syncing transaction groups. 179 789 ahrens */ 180 789 ahrens void 181 789 ahrens txg_sync_stop(dsl_pool_t *dp) 182 789 ahrens { 183 789 ahrens tx_state_t *tx = &dp->dp_tx; 184 789 ahrens 185 789 ahrens dprintf("pool %p\n", dp); 186 789 ahrens /* 187 789 ahrens * Finish off any work in progress. 188 789 ahrens */ 189 6245 maybee ASSERT(tx->tx_threads == 2); 190 10921 Tim 191 10921 Tim /* 192 10921 Tim * We need to ensure that we've vacated the deferred space_maps. 193 10921 Tim */ 194 10921 Tim txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE); 195 789 ahrens 196 789 ahrens /* 197 6245 maybee * Wake all sync threads and wait for them to die. 198 789 ahrens */ 199 789 ahrens mutex_enter(&tx->tx_sync_lock); 200 789 ahrens 201 6245 maybee ASSERT(tx->tx_threads == 2); 202 789 ahrens 203 789 ahrens tx->tx_exiting = 1; 204 789 ahrens 205 789 ahrens cv_broadcast(&tx->tx_quiesce_more_cv); 206 789 ahrens cv_broadcast(&tx->tx_quiesce_done_cv); 207 789 ahrens cv_broadcast(&tx->tx_sync_more_cv); 208 789 ahrens 209 789 ahrens while (tx->tx_threads != 0) 210 789 ahrens cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock); 211 789 ahrens 212 789 ahrens tx->tx_exiting = 0; 213 789 ahrens 214 789 ahrens mutex_exit(&tx->tx_sync_lock); 215 789 ahrens } 216 789 ahrens 217 789 ahrens uint64_t 218 789 ahrens txg_hold_open(dsl_pool_t *dp, txg_handle_t *th) 219 789 ahrens { 220 789 ahrens tx_state_t *tx = &dp->dp_tx; 221 789 ahrens tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID]; 222 789 ahrens uint64_t txg; 223 789 ahrens 224 789 ahrens mutex_enter(&tc->tc_lock); 225 789 ahrens 226 789 ahrens txg = tx->tx_open_txg; 227 789 ahrens tc->tc_count[txg & TXG_MASK]++; 228 789 ahrens 229 789 ahrens th->th_cpu = tc; 230 789 ahrens th->th_txg = txg; 231 789 ahrens 232 789 ahrens return (txg); 233 789 ahrens } 234 789 ahrens 235 789 ahrens void 236 789 ahrens txg_rele_to_quiesce(txg_handle_t *th) 237 789 ahrens { 238 789 ahrens tx_cpu_t *tc = th->th_cpu; 239 789 ahrens 240 789 ahrens mutex_exit(&tc->tc_lock); 241 789 ahrens } 242 789 ahrens 243 789 ahrens void 244 10612 Ricardo txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks) 245 10612 Ricardo { 246 10612 Ricardo tx_cpu_t *tc = th->th_cpu; 247 10612 Ricardo int g = th->th_txg & TXG_MASK; 248 10612 Ricardo 249 10612 Ricardo mutex_enter(&tc->tc_lock); 250 10612 Ricardo list_move_tail(&tc->tc_callbacks[g], tx_callbacks); 251 10612 Ricardo mutex_exit(&tc->tc_lock); 252 10612 Ricardo } 253 10612 Ricardo 254 10612 Ricardo void 255 789 ahrens txg_rele_to_sync(txg_handle_t *th) 256 789 ahrens { 257 789 ahrens tx_cpu_t *tc = th->th_cpu; 258 789 ahrens int g = th->th_txg & TXG_MASK; 259 789 ahrens 260 789 ahrens mutex_enter(&tc->tc_lock); 261 789 ahrens ASSERT(tc->tc_count[g] != 0); 262 789 ahrens if (--tc->tc_count[g] == 0) 263 789 ahrens cv_broadcast(&tc->tc_cv[g]); 264 789 ahrens mutex_exit(&tc->tc_lock); 265 789 ahrens 266 789 ahrens th->th_cpu = NULL; /* defensive */ 267 789 ahrens } 268 789 ahrens 269 789 ahrens static void 270 789 ahrens txg_quiesce(dsl_pool_t *dp, uint64_t txg) 271 789 ahrens { 272 789 ahrens tx_state_t *tx = &dp->dp_tx; 273 789 ahrens int g = txg & TXG_MASK; 274 789 ahrens int c; 275 789 ahrens 276 789 ahrens /* 277 789 ahrens * Grab all tx_cpu locks so nobody else can get into this txg. 278 789 ahrens */ 279 789 ahrens for (c = 0; c < max_ncpus; c++) 280 789 ahrens mutex_enter(&tx->tx_cpu[c].tc_lock); 281 789 ahrens 282 789 ahrens ASSERT(txg == tx->tx_open_txg); 283 789 ahrens tx->tx_open_txg++; 284 789 ahrens 285 789 ahrens /* 286 789 ahrens * Now that we've incremented tx_open_txg, we can let threads 287 789 ahrens * enter the next transaction group. 288 789 ahrens */ 289 789 ahrens for (c = 0; c < max_ncpus; c++) 290 789 ahrens mutex_exit(&tx->tx_cpu[c].tc_lock); 291 789 ahrens 292 789 ahrens /* 293 789 ahrens * Quiesce the transaction group by waiting for everyone to txg_exit(). 294 789 ahrens */ 295 789 ahrens for (c = 0; c < max_ncpus; c++) { 296 789 ahrens tx_cpu_t *tc = &tx->tx_cpu[c]; 297 789 ahrens mutex_enter(&tc->tc_lock); 298 789 ahrens while (tc->tc_count[g] != 0) 299 789 ahrens cv_wait(&tc->tc_cv[g], &tc->tc_lock); 300 789 ahrens mutex_exit(&tc->tc_lock); 301 10612 Ricardo } 302 10612 Ricardo } 303 10612 Ricardo 304 10612 Ricardo static void 305 10612 Ricardo txg_do_callbacks(list_t *cb_list) 306 10612 Ricardo { 307 10612 Ricardo dmu_tx_do_callbacks(cb_list, 0); 308 10612 Ricardo 309 10612 Ricardo list_destroy(cb_list); 310 10612 Ricardo 311 10612 Ricardo kmem_free(cb_list, sizeof (list_t)); 312 10612 Ricardo } 313 10612 Ricardo 314 10612 Ricardo /* 315 10612 Ricardo * Dispatch the commit callbacks registered on this txg to worker threads. 316 10612 Ricardo */ 317 10612 Ricardo static void 318 10612 Ricardo txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg) 319 10612 Ricardo { 320 10612 Ricardo int c; 321 10612 Ricardo tx_state_t *tx = &dp->dp_tx; 322 10612 Ricardo list_t *cb_list; 323 10612 Ricardo 324 10612 Ricardo for (c = 0; c < max_ncpus; c++) { 325 10612 Ricardo tx_cpu_t *tc = &tx->tx_cpu[c]; 326 10612 Ricardo /* No need to lock tx_cpu_t at this point */ 327 10612 Ricardo 328 10612 Ricardo int g = txg & TXG_MASK; 329 10612 Ricardo 330 10612 Ricardo if (list_is_empty(&tc->tc_callbacks[g])) 331 10612 Ricardo continue; 332 10612 Ricardo 333 10612 Ricardo if (tx->tx_commit_cb_taskq == NULL) { 334 10612 Ricardo /* 335 10612 Ricardo * Commit callback taskq hasn't been created yet. 336 10612 Ricardo */ 337 10612 Ricardo tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb", 338 10612 Ricardo max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2, 339 10612 Ricardo TASKQ_PREPOPULATE); 340 10612 Ricardo } 341 10612 Ricardo 342 10612 Ricardo cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP); 343 10612 Ricardo list_create(cb_list, sizeof (dmu_tx_callback_t), 344 10612 Ricardo offsetof(dmu_tx_callback_t, dcb_node)); 345 10612 Ricardo 346 10612 Ricardo list_move_tail(&tc->tc_callbacks[g], cb_list); 347 10612 Ricardo 348 10612 Ricardo (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *) 349 10612 Ricardo txg_do_callbacks, cb_list, TQ_SLEEP); 350 789 ahrens } 351 789 ahrens } 352 789 ahrens 353 789 ahrens static void 354 789 ahrens txg_sync_thread(dsl_pool_t *dp) 355 789 ahrens { 356 11147 George spa_t *spa = dp->dp_spa; 357 789 ahrens tx_state_t *tx = &dp->dp_tx; 358 789 ahrens callb_cpr_t cpr; 359 7468 Mark uint64_t start, delta; 360 789 ahrens 361 789 ahrens txg_thread_enter(tx, &cpr); 362 789 ahrens 363 6245 maybee start = delta = 0; 364 789 ahrens for (;;) { 365 7468 Mark uint64_t timer, timeout = zfs_txg_timeout * hz; 366 7468 Mark uint64_t txg; 367 789 ahrens 368 789 ahrens /* 369 7837 Matthew * We sync when we're scrubbing, there's someone waiting 370 7837 Matthew * on us, or the quiesce thread has handed off a txg to 371 7837 Matthew * us, or we have reached our timeout. 372 789 ahrens */ 373 6245 maybee timer = (delta >= timeout ? 0 : timeout - delta); 374 7837 Matthew while ((dp->dp_scrub_func == SCRUB_FUNC_NONE || 375 11147 George spa_load_state(spa) != SPA_LOAD_NONE || 376 11147 George spa_shutting_down(spa)) && 377 7837 Matthew !tx->tx_exiting && timer > 0 && 378 789 ahrens tx->tx_synced_txg >= tx->tx_sync_txg_waiting && 379 789 ahrens tx->tx_quiesced_txg == 0) { 380 789 ahrens dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n", 381 789 ahrens tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 382 6245 maybee txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer); 383 11066 rafael delta = ddi_get_lbolt() - start; 384 6245 maybee timer = (delta > timeout ? 0 : timeout - delta); 385 789 ahrens } 386 789 ahrens 387 789 ahrens /* 388 789 ahrens * Wait until the quiesce thread hands off a txg to us, 389 789 ahrens * prompting it to do so if necessary. 390 789 ahrens */ 391 789 ahrens while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) { 392 789 ahrens if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1) 393 789 ahrens tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1; 394 789 ahrens cv_broadcast(&tx->tx_quiesce_more_cv); 395 789 ahrens txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0); 396 789 ahrens } 397 789 ahrens 398 789 ahrens if (tx->tx_exiting) 399 789 ahrens txg_thread_exit(tx, &cpr, &tx->tx_sync_thread); 400 789 ahrens 401 789 ahrens /* 402 789 ahrens * Consume the quiesced txg which has been handed off to 403 789 ahrens * us. This may cause the quiescing thread to now be 404 789 ahrens * able to quiesce another txg, so we must signal it. 405 789 ahrens */ 406 789 ahrens txg = tx->tx_quiesced_txg; 407 789 ahrens tx->tx_quiesced_txg = 0; 408 789 ahrens tx->tx_syncing_txg = txg; 409 789 ahrens cv_broadcast(&tx->tx_quiesce_more_cv); 410 789 ahrens 411 789 ahrens dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 412 5765 ek110237 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 413 789 ahrens mutex_exit(&tx->tx_sync_lock); 414 7468 Mark 415 11066 rafael start = ddi_get_lbolt(); 416 11147 George spa_sync(spa, txg); 417 11066 rafael delta = ddi_get_lbolt() - start; 418 6245 maybee 419 789 ahrens mutex_enter(&tx->tx_sync_lock); 420 789 ahrens tx->tx_synced_txg = txg; 421 789 ahrens tx->tx_syncing_txg = 0; 422 789 ahrens cv_broadcast(&tx->tx_sync_done_cv); 423 10612 Ricardo 424 10612 Ricardo /* 425 10612 Ricardo * Dispatch commit callbacks to worker threads. 426 10612 Ricardo */ 427 10612 Ricardo txg_dispatch_callbacks(dp, txg); 428 789 ahrens } 429 789 ahrens } 430 789 ahrens 431 789 ahrens static void 432 789 ahrens txg_quiesce_thread(dsl_pool_t *dp) 433 789 ahrens { 434 789 ahrens tx_state_t *tx = &dp->dp_tx; 435 789 ahrens callb_cpr_t cpr; 436 789 ahrens 437 789 ahrens txg_thread_enter(tx, &cpr); 438 789 ahrens 439 789 ahrens for (;;) { 440 789 ahrens uint64_t txg; 441 789 ahrens 442 789 ahrens /* 443 789 ahrens * We quiesce when there's someone waiting on us. 444 789 ahrens * However, we can only have one txg in "quiescing" or 445 789 ahrens * "quiesced, waiting to sync" state. So we wait until 446 789 ahrens * the "quiesced, waiting to sync" txg has been consumed 447 789 ahrens * by the sync thread. 448 789 ahrens */ 449 789 ahrens while (!tx->tx_exiting && 450 789 ahrens (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting || 451 789 ahrens tx->tx_quiesced_txg != 0)) 452 789 ahrens txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0); 453 789 ahrens 454 789 ahrens if (tx->tx_exiting) 455 789 ahrens txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread); 456 789 ahrens 457 789 ahrens txg = tx->tx_open_txg; 458 789 ahrens dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 459 789 ahrens txg, tx->tx_quiesce_txg_waiting, 460 789 ahrens tx->tx_sync_txg_waiting); 461 789 ahrens mutex_exit(&tx->tx_sync_lock); 462 789 ahrens txg_quiesce(dp, txg); 463 789 ahrens mutex_enter(&tx->tx_sync_lock); 464 789 ahrens 465 789 ahrens /* 466 789 ahrens * Hand this txg off to the sync thread. 467 789 ahrens */ 468 789 ahrens dprintf("quiesce done, handing off txg %llu\n", txg); 469 789 ahrens tx->tx_quiesced_txg = txg; 470 789 ahrens cv_broadcast(&tx->tx_sync_more_cv); 471 789 ahrens cv_broadcast(&tx->tx_quiesce_done_cv); 472 789 ahrens } 473 789 ahrens } 474 789 ahrens 475 6245 maybee /* 476 6245 maybee * Delay this thread by 'ticks' if we are still in the open transaction 477 6245 maybee * group and there is already a waiting txg quiesing or quiesced. Abort 478 6245 maybee * the delay if this txg stalls or enters the quiesing state. 479 6245 maybee */ 480 6245 maybee void 481 6245 maybee txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks) 482 6245 maybee { 483 6245 maybee tx_state_t *tx = &dp->dp_tx; 484 11066 rafael int timeout = ddi_get_lbolt() + ticks; 485 6245 maybee 486 6245 maybee /* don't delay if this txg could transition to quiesing immediately */ 487 6245 maybee if (tx->tx_open_txg > txg || 488 6245 maybee tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1) 489 6245 maybee return; 490 6245 maybee 491 6245 maybee mutex_enter(&tx->tx_sync_lock); 492 6245 maybee if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) { 493 6245 maybee mutex_exit(&tx->tx_sync_lock); 494 6245 maybee return; 495 6245 maybee } 496 6245 maybee 497 11066 rafael while (ddi_get_lbolt() < timeout && 498 6245 maybee tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) 499 6245 maybee (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock, 500 6245 maybee timeout); 501 6245 maybee 502 6245 maybee mutex_exit(&tx->tx_sync_lock); 503 6245 maybee } 504 6245 maybee 505 789 ahrens void 506 789 ahrens txg_wait_synced(dsl_pool_t *dp, uint64_t txg) 507 789 ahrens { 508 789 ahrens tx_state_t *tx = &dp->dp_tx; 509 789 ahrens 510 789 ahrens mutex_enter(&tx->tx_sync_lock); 511 6245 maybee ASSERT(tx->tx_threads == 2); 512 789 ahrens if (txg == 0) 513 10922 Jeff txg = tx->tx_open_txg + TXG_DEFER_SIZE; 514 789 ahrens if (tx->tx_sync_txg_waiting < txg) 515 789 ahrens tx->tx_sync_txg_waiting = txg; 516 789 ahrens dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 517 789 ahrens txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 518 789 ahrens while (tx->tx_synced_txg < txg) { 519 789 ahrens dprintf("broadcasting sync more " 520 789 ahrens "tx_synced=%llu waiting=%llu dp=%p\n", 521 789 ahrens tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp); 522 789 ahrens cv_broadcast(&tx->tx_sync_more_cv); 523 789 ahrens cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock); 524 789 ahrens } 525 789 ahrens mutex_exit(&tx->tx_sync_lock); 526 789 ahrens } 527 789 ahrens 528 789 ahrens void 529 789 ahrens txg_wait_open(dsl_pool_t *dp, uint64_t txg) 530 789 ahrens { 531 789 ahrens tx_state_t *tx = &dp->dp_tx; 532 789 ahrens 533 789 ahrens mutex_enter(&tx->tx_sync_lock); 534 6245 maybee ASSERT(tx->tx_threads == 2); 535 789 ahrens if (txg == 0) 536 789 ahrens txg = tx->tx_open_txg + 1; 537 789 ahrens if (tx->tx_quiesce_txg_waiting < txg) 538 789 ahrens tx->tx_quiesce_txg_waiting = txg; 539 789 ahrens dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", 540 789 ahrens txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting); 541 789 ahrens while (tx->tx_open_txg < txg) { 542 789 ahrens cv_broadcast(&tx->tx_quiesce_more_cv); 543 789 ahrens cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock); 544 789 ahrens } 545 789 ahrens mutex_exit(&tx->tx_sync_lock); 546 789 ahrens } 547 789 ahrens 548 7046 ahrens boolean_t 549 789 ahrens txg_stalled(dsl_pool_t *dp) 550 789 ahrens { 551 789 ahrens tx_state_t *tx = &dp->dp_tx; 552 789 ahrens return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg); 553 7046 ahrens } 554 7046 ahrens 555 7046 ahrens boolean_t 556 7046 ahrens txg_sync_waiting(dsl_pool_t *dp) 557 7046 ahrens { 558 7046 ahrens tx_state_t *tx = &dp->dp_tx; 559 7046 ahrens 560 7046 ahrens return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting || 561 7046 ahrens tx->tx_quiesced_txg != 0); 562 789 ahrens } 563 789 ahrens 564 789 ahrens /* 565 789 ahrens * Per-txg object lists. 566 789 ahrens */ 567 789 ahrens void 568 789 ahrens txg_list_create(txg_list_t *tl, size_t offset) 569 789 ahrens { 570 789 ahrens int t; 571 789 ahrens 572 789 ahrens mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL); 573 789 ahrens 574 789 ahrens tl->tl_offset = offset; 575 789 ahrens 576 789 ahrens for (t = 0; t < TXG_SIZE; t++) 577 789 ahrens tl->tl_head[t] = NULL; 578 789 ahrens } 579 789 ahrens 580 789 ahrens void 581 789 ahrens txg_list_destroy(txg_list_t *tl) 582 789 ahrens { 583 789 ahrens int t; 584 789 ahrens 585 789 ahrens for (t = 0; t < TXG_SIZE; t++) 586 789 ahrens ASSERT(txg_list_empty(tl, t)); 587 789 ahrens 588 789 ahrens mutex_destroy(&tl->tl_lock); 589 789 ahrens } 590 789 ahrens 591 789 ahrens int 592 789 ahrens txg_list_empty(txg_list_t *tl, uint64_t txg) 593 789 ahrens { 594 789 ahrens return (tl->tl_head[txg & TXG_MASK] == NULL); 595 789 ahrens } 596 789 ahrens 597 789 ahrens /* 598 789 ahrens * Add an entry to the list. 599 789 ahrens * Returns 0 if it's a new entry, 1 if it's already there. 600 789 ahrens */ 601 789 ahrens int 602 789 ahrens txg_list_add(txg_list_t *tl, void *p, uint64_t txg) 603 789 ahrens { 604 789 ahrens int t = txg & TXG_MASK; 605 789 ahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 606 789 ahrens int already_on_list; 607 789 ahrens 608 789 ahrens mutex_enter(&tl->tl_lock); 609 789 ahrens already_on_list = tn->tn_member[t]; 610 789 ahrens if (!already_on_list) { 611 789 ahrens tn->tn_member[t] = 1; 612 789 ahrens tn->tn_next[t] = tl->tl_head[t]; 613 789 ahrens tl->tl_head[t] = tn; 614 789 ahrens } 615 789 ahrens mutex_exit(&tl->tl_lock); 616 789 ahrens 617 789 ahrens return (already_on_list); 618 789 ahrens } 619 789 ahrens 620 789 ahrens /* 621 789 ahrens * Remove the head of the list and return it. 622 789 ahrens */ 623 789 ahrens void * 624 789 ahrens txg_list_remove(txg_list_t *tl, uint64_t txg) 625 789 ahrens { 626 789 ahrens int t = txg & TXG_MASK; 627 789 ahrens txg_node_t *tn; 628 789 ahrens void *p = NULL; 629 789 ahrens 630 789 ahrens mutex_enter(&tl->tl_lock); 631 789 ahrens if ((tn = tl->tl_head[t]) != NULL) { 632 789 ahrens p = (char *)tn - tl->tl_offset; 633 789 ahrens tl->tl_head[t] = tn->tn_next[t]; 634 789 ahrens tn->tn_next[t] = NULL; 635 789 ahrens tn->tn_member[t] = 0; 636 789 ahrens } 637 789 ahrens mutex_exit(&tl->tl_lock); 638 789 ahrens 639 789 ahrens return (p); 640 789 ahrens } 641 789 ahrens 642 789 ahrens /* 643 789 ahrens * Remove a specific item from the list and return it. 644 789 ahrens */ 645 789 ahrens void * 646 789 ahrens txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg) 647 789 ahrens { 648 789 ahrens int t = txg & TXG_MASK; 649 789 ahrens txg_node_t *tn, **tp; 650 789 ahrens 651 789 ahrens mutex_enter(&tl->tl_lock); 652 789 ahrens 653 789 ahrens for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) { 654 789 ahrens if ((char *)tn - tl->tl_offset == p) { 655 789 ahrens *tp = tn->tn_next[t]; 656 789 ahrens tn->tn_next[t] = NULL; 657 789 ahrens tn->tn_member[t] = 0; 658 789 ahrens mutex_exit(&tl->tl_lock); 659 789 ahrens return (p); 660 789 ahrens } 661 789 ahrens } 662 789 ahrens 663 789 ahrens mutex_exit(&tl->tl_lock); 664 789 ahrens 665 789 ahrens return (NULL); 666 789 ahrens } 667 789 ahrens 668 789 ahrens int 669 789 ahrens txg_list_member(txg_list_t *tl, void *p, uint64_t txg) 670 789 ahrens { 671 789 ahrens int t = txg & TXG_MASK; 672 789 ahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 673 789 ahrens 674 789 ahrens return (tn->tn_member[t]); 675 789 ahrens } 676 789 ahrens 677 789 ahrens /* 678 789 ahrens * Walk a txg list -- only safe if you know it's not changing. 679 789 ahrens */ 680 789 ahrens void * 681 789 ahrens txg_list_head(txg_list_t *tl, uint64_t txg) 682 789 ahrens { 683 789 ahrens int t = txg & TXG_MASK; 684 789 ahrens txg_node_t *tn = tl->tl_head[t]; 685 789 ahrens 686 789 ahrens return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 687 789 ahrens } 688 789 ahrens 689 789 ahrens void * 690 789 ahrens txg_list_next(txg_list_t *tl, void *p, uint64_t txg) 691 789 ahrens { 692 789 ahrens int t = txg & TXG_MASK; 693 789 ahrens txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset); 694 789 ahrens 695 789 ahrens tn = tn->tn_next[t]; 696 789 ahrens 697 789 ahrens return (tn == NULL ? NULL : (char *)tn - tl->tl_offset); 698 789 ahrens } 699