Home | History | Annotate | Download | only in configd
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
     27 
     28 /*
     29  * file_object.c - enter objects into and load them from the backend
     30  *
     31  * The primary entry points in this layer are object_create(),
     32  * object_create_pg(), object_delete(), and object_fill_children().  They each
     33  * take an rc_node_t and use the functions in the object_info_t info array for
     34  * the node's type.
     35  */
     36 
     37 #include <assert.h>
     38 #include <pthread.h>
     39 #include <stdio.h>
     40 #include <stdlib.h>
     41 #include <string.h>
     42 #include <strings.h>
     43 
     44 #include "configd.h"
     45 #include "repcache_protocol.h"
     46 
     47 typedef struct child_info {
     48 	rc_node_t	*ci_parent;
     49 	backend_tx_t	*ci_tx;			/* only for properties */
     50 	rc_node_lookup_t ci_base_nl;
     51 } child_info_t;
     52 
     53 typedef struct delete_ent delete_ent_t;
     54 typedef struct delete_stack delete_stack_t;
     55 typedef struct delete_info delete_info_t;
     56 
     57 typedef int	delete_cb_func(delete_info_t *, const delete_ent_t *);
     58 
     59 struct delete_ent {
     60 	delete_cb_func	*de_cb;		/* callback */
     61 	uint32_t	de_backend;
     62 	uint32_t	de_id;
     63 	uint32_t	de_gen;		/* only for property groups */
     64 };
     65 
     66 struct delete_stack {
     67 	struct delete_stack *ds_next;
     68 	uint32_t	ds_size;	/* number of elements */
     69 	uint32_t	ds_cur;		/* current offset */
     70 	delete_ent_t	ds_buf[1];	/* actually ds_size */
     71 };
     72 #define	DELETE_STACK_SIZE(x)	offsetof(delete_stack_t, ds_buf[(x)])
     73 
     74 struct delete_info {
     75 	backend_tx_t	*di_tx;
     76 	backend_tx_t	*di_np_tx;
     77 	delete_stack_t	*di_stack;
     78 	delete_stack_t	*di_free;
     79 };
     80 
     81 typedef struct object_info {
     82 	uint32_t	obj_type;
     83 	enum id_space	obj_id_space;
     84 
     85 	int (*obj_fill_children)(rc_node_t *);
     86 	int (*obj_setup_child_info)(rc_node_t *, uint32_t, child_info_t *);
     87 	int (*obj_query_child)(backend_query_t *, rc_node_lookup_t *,
     88 	    const char *);
     89 	int (*obj_insert_child)(backend_tx_t *, rc_node_lookup_t *,
     90 	    const char *);
     91 	int (*obj_insert_pg_child)(backend_tx_t *, rc_node_lookup_t *,
     92 	    const char *, const char *, uint32_t, uint32_t);
     93 	int (*obj_delete_start)(rc_node_t *, delete_info_t *);
     94 } object_info_t;
     95 
     96 static void
     97 string_to_id(const char *str, uint32_t *output, const char *fieldname)
     98 {
     99 	if (uu_strtouint(str, output, sizeof (*output), 0, 0, 0) == -1)
    100 		backend_panic("invalid integer \"%s\" in field \"%s\"",
    101 		    str, fieldname);
    102 }
    103 
    104 #define	NUM_NEEDED	50
    105 
    106 static int
    107 delete_stack_push(delete_info_t *dip, uint32_t be, delete_cb_func *cb,
    108     uint32_t id, uint32_t gen)
    109 {
    110 	delete_stack_t *cur = dip->di_stack;
    111 	delete_ent_t *ent;
    112 
    113 	if (cur == NULL || cur->ds_cur == cur->ds_size) {
    114 		delete_stack_t *new = dip->di_free;
    115 		dip->di_free = NULL;
    116 		if (new == NULL) {
    117 			new = uu_zalloc(DELETE_STACK_SIZE(NUM_NEEDED));
    118 			if (new == NULL)
    119 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
    120 			new->ds_size = NUM_NEEDED;
    121 		}
    122 		new->ds_cur = 0;
    123 		new->ds_next = dip->di_stack;
    124 		dip->di_stack = new;
    125 		cur = new;
    126 	}
    127 	assert(cur->ds_cur < cur->ds_size);
    128 	ent = &cur->ds_buf[cur->ds_cur++];
    129 
    130 	ent->de_backend = be;
    131 	ent->de_cb = cb;
    132 	ent->de_id = id;
    133 	ent->de_gen = gen;
    134 
    135 	return (REP_PROTOCOL_SUCCESS);
    136 }
    137 
    138 static int
    139 delete_stack_pop(delete_info_t *dip, delete_ent_t *out)
    140 {
    141 	delete_stack_t *cur = dip->di_stack;
    142 	delete_ent_t *ent;
    143 
    144 	if (cur == NULL)
    145 		return (0);
    146 	assert(cur->ds_cur > 0 && cur->ds_cur <= cur->ds_size);
    147 	ent = &cur->ds_buf[--cur->ds_cur];
    148 	if (cur->ds_cur == 0) {
    149 		dip->di_stack = cur->ds_next;
    150 		cur->ds_next = NULL;
    151 
    152 		if (dip->di_free != NULL)
    153 			uu_free(dip->di_free);
    154 		dip->di_free = cur;
    155 	}
    156 	if (ent == NULL)
    157 		return (0);
    158 
    159 	*out = *ent;
    160 	return (1);
    161 }
    162 
    163 static void
    164 delete_stack_cleanup(delete_info_t *dip)
    165 {
    166 	delete_stack_t *cur;
    167 	while ((cur = dip->di_stack) != NULL) {
    168 		dip->di_stack = cur->ds_next;
    169 
    170 		uu_free(cur);
    171 	}
    172 
    173 	if ((cur = dip->di_free) != NULL) {
    174 		assert(cur->ds_next == NULL);	/* should only be one */
    175 		uu_free(cur);
    176 		dip->di_free = NULL;
    177 	}
    178 }
    179 
    180 struct delete_cb_info {
    181 	delete_info_t	*dci_dip;
    182 	uint32_t	dci_be;
    183 	delete_cb_func	*dci_cb;
    184 	int		dci_result;
    185 };
    186 
    187 /*ARGSUSED*/
    188 static int
    189 push_delete_callback(void *data, int columns, char **vals, char **names)
    190 {
    191 	struct delete_cb_info *info = data;
    192 
    193 	const char *id_str = *vals++;
    194 	const char *gen_str = *vals++;
    195 
    196 	uint32_t id;
    197 	uint32_t gen;
    198 
    199 	assert(columns == 2);
    200 
    201 	string_to_id(id_str, &id, "id");
    202 	string_to_id(gen_str, &gen, "gen_id");
    203 
    204 	info->dci_result = delete_stack_push(info->dci_dip, info->dci_be,
    205 	    info->dci_cb, id, gen);
    206 
    207 	if (info->dci_result != REP_PROTOCOL_SUCCESS)
    208 		return (BACKEND_CALLBACK_ABORT);
    209 	return (BACKEND_CALLBACK_CONTINUE);
    210 }
    211 
    212 static int
    213 value_delete(delete_info_t *dip, const delete_ent_t *ent)
    214 {
    215 	uint32_t be = ent->de_backend;
    216 	int r;
    217 
    218 	backend_query_t *q;
    219 
    220 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    221 	    dip->di_np_tx;
    222 
    223 	q = backend_query_alloc();
    224 
    225 	backend_query_add(q,
    226 	    "SELECT 1 FROM prop_lnk_tbl WHERE (lnk_val_id = %d); "
    227 	    "DELETE FROM value_tbl WHERE (value_id = %d); ",
    228 	    ent->de_id, ent->de_id);
    229 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
    230 	backend_query_free(q);
    231 	if (r == REP_PROTOCOL_DONE)
    232 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
    233 	return (r);
    234 }
    235 
    236 static int
    237 pg_lnk_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
    238 {
    239 	struct delete_cb_info info;
    240 	uint32_t be = ent->de_backend;
    241 	int r;
    242 
    243 	backend_query_t *q;
    244 
    245 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    246 	    dip->di_np_tx;
    247 
    248 	/*
    249 	 * For non-persistent backends, we could only have one parent, and
    250 	 * he's already been deleted.
    251 	 *
    252 	 * For normal backends, we need to check to see if we're in
    253 	 * a snapshot or are the active generation for the property
    254 	 * group.  If we are, there's nothing to be done.
    255 	 */
    256 	if (be == BACKEND_TYPE_NORMAL) {
    257 		q = backend_query_alloc();
    258 		backend_query_add(q,
    259 		    "SELECT 1 "
    260 		    "FROM pg_tbl "
    261 		    "WHERE (pg_id = %d AND pg_gen_id = %d); "
    262 		    "SELECT 1 "
    263 		    "FROM snaplevel_lnk_tbl "
    264 		    "WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d);",
    265 		    ent->de_id, ent->de_gen,
    266 		    ent->de_id, ent->de_gen);
    267 		r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
    268 		backend_query_free(q);
    269 
    270 		if (r == REP_PROTOCOL_DONE)
    271 			return (REP_PROTOCOL_SUCCESS);	/* still in use */
    272 	}
    273 
    274 	info.dci_dip = dip;
    275 	info.dci_be =  be;
    276 	info.dci_cb = &value_delete;
    277 	info.dci_result = REP_PROTOCOL_SUCCESS;
    278 
    279 	q = backend_query_alloc();
    280 	backend_query_add(q,
    281 	    "SELECT DISTINCT lnk_val_id, 0 FROM prop_lnk_tbl "
    282 	    "WHERE "
    283 	    "    (lnk_pg_id = %d AND lnk_gen_id = %d AND lnk_val_id NOTNULL); "
    284 	    "DELETE FROM prop_lnk_tbl "
    285 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
    286 	    ent->de_id, ent->de_gen, ent->de_id, ent->de_gen);
    287 
    288 	r = backend_tx_run(tx, q, push_delete_callback, &info);
    289 	backend_query_free(q);
    290 
    291 	if (r == REP_PROTOCOL_DONE) {
    292 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    293 		return (info.dci_result);
    294 	}
    295 	return (r);
    296 }
    297 
    298 static int
    299 propertygrp_delete(delete_info_t *dip, const delete_ent_t *ent)
    300 {
    301 	uint32_t be = ent->de_backend;
    302 	backend_query_t *q;
    303 	uint32_t gen;
    304 
    305 	int r;
    306 
    307 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    308 	    dip->di_np_tx;
    309 
    310 	q = backend_query_alloc();
    311 	backend_query_add(q,
    312 	    "SELECT pg_gen_id FROM pg_tbl WHERE pg_id = %d; "
    313 	    "DELETE FROM pg_tbl WHERE pg_id = %d",
    314 	    ent->de_id, ent->de_id);
    315 	r = backend_tx_run_single_int(tx, q, &gen);
    316 	backend_query_free(q);
    317 
    318 	if (r != REP_PROTOCOL_SUCCESS)
    319 		return (r);
    320 
    321 	return (delete_stack_push(dip, be, &pg_lnk_tbl_delete,
    322 	    ent->de_id, gen));
    323 }
    324 
    325 static int
    326 snaplevel_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
    327 {
    328 	uint32_t be = ent->de_backend;
    329 	backend_query_t *q;
    330 	struct delete_cb_info info;
    331 
    332 	int r;
    333 
    334 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    335 	    dip->di_np_tx;
    336 
    337 	info.dci_dip = dip;
    338 	info.dci_be = be;
    339 	info.dci_cb = &pg_lnk_tbl_delete;
    340 	info.dci_result = REP_PROTOCOL_SUCCESS;
    341 
    342 	q = backend_query_alloc();
    343 	backend_query_add(q,
    344 	    "SELECT snaplvl_pg_id, snaplvl_gen_id "
    345 	    "    FROM snaplevel_lnk_tbl "
    346 	    "    WHERE snaplvl_level_id = %d; "
    347 	    "DELETE FROM snaplevel_lnk_tbl WHERE snaplvl_level_id = %d",
    348 	    ent->de_id, ent->de_id);
    349 	r = backend_tx_run(tx, q, push_delete_callback, &info);
    350 	backend_query_free(q);
    351 
    352 	if (r == REP_PROTOCOL_DONE) {
    353 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    354 		return (info.dci_result);
    355 	}
    356 	return (r);
    357 }
    358 
    359 static int
    360 snaplevel_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
    361 {
    362 	uint32_t be = ent->de_backend;
    363 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    364 	    dip->di_np_tx;
    365 
    366 	struct delete_cb_info info;
    367 	backend_query_t *q;
    368 	int r;
    369 
    370 	assert(be == BACKEND_TYPE_NORMAL);
    371 
    372 	q = backend_query_alloc();
    373 	backend_query_add(q,
    374 	    "SELECT 1 FROM snapshot_lnk_tbl WHERE lnk_snap_id = %d",
    375 	    ent->de_id);
    376 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
    377 	backend_query_free(q);
    378 
    379 	if (r == REP_PROTOCOL_DONE)
    380 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
    381 
    382 	info.dci_dip = dip;
    383 	info.dci_be = be;
    384 	info.dci_cb = &snaplevel_lnk_delete;
    385 	info.dci_result = REP_PROTOCOL_SUCCESS;
    386 
    387 	q = backend_query_alloc();
    388 	backend_query_add(q,
    389 	    "SELECT snap_level_id, 0 FROM snaplevel_tbl WHERE snap_id = %d;"
    390 	    "DELETE FROM snaplevel_tbl WHERE snap_id = %d",
    391 	    ent->de_id, ent->de_id);
    392 	r = backend_tx_run(tx, q, push_delete_callback, &info);
    393 	backend_query_free(q);
    394 
    395 	if (r == REP_PROTOCOL_DONE) {
    396 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    397 		return (info.dci_result);
    398 	}
    399 	return (r);
    400 }
    401 
    402 static int
    403 snapshot_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
    404 {
    405 	uint32_t be = ent->de_backend;
    406 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
    407 	    dip->di_np_tx;
    408 
    409 	backend_query_t *q;
    410 	uint32_t snapid;
    411 	int r;
    412 
    413 	assert(be == BACKEND_TYPE_NORMAL);
    414 
    415 	q = backend_query_alloc();
    416 	backend_query_add(q,
    417 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
    418 	    "DELETE FROM snapshot_lnk_tbl WHERE lnk_id = %d",
    419 	    ent->de_id, ent->de_id);
    420 	r = backend_tx_run_single_int(tx, q, &snapid);
    421 	backend_query_free(q);
    422 
    423 	if (r != REP_PROTOCOL_SUCCESS)
    424 		return (r);
    425 
    426 	return (delete_stack_push(dip, be, &snaplevel_tbl_delete, snapid, 0));
    427 }
    428 
    429 static int
    430 pgparent_delete_add_pgs(delete_info_t *dip, uint32_t parent_id)
    431 {
    432 	struct delete_cb_info info;
    433 	backend_query_t *q;
    434 	int r;
    435 
    436 	info.dci_dip = dip;
    437 	info.dci_be = BACKEND_TYPE_NORMAL;
    438 	info.dci_cb = &propertygrp_delete;
    439 	info.dci_result = REP_PROTOCOL_SUCCESS;
    440 
    441 	q = backend_query_alloc();
    442 	backend_query_add(q,
    443 	    "SELECT pg_id, 0 FROM pg_tbl WHERE pg_parent_id = %d",
    444 	    parent_id);
    445 
    446 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
    447 
    448 	if (r == REP_PROTOCOL_DONE) {
    449 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    450 		backend_query_free(q);
    451 		return (info.dci_result);
    452 	}
    453 	if (r != REP_PROTOCOL_SUCCESS) {
    454 		backend_query_free(q);
    455 		return (r);
    456 	}
    457 
    458 	if (dip->di_np_tx != NULL) {
    459 		info.dci_be = BACKEND_TYPE_NONPERSIST;
    460 
    461 		r = backend_tx_run(dip->di_np_tx, q, push_delete_callback,
    462 		    &info);
    463 
    464 		if (r == REP_PROTOCOL_DONE) {
    465 			assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    466 			backend_query_free(q);
    467 			return (info.dci_result);
    468 		}
    469 		if (r != REP_PROTOCOL_SUCCESS) {
    470 			backend_query_free(q);
    471 			return (r);
    472 		}
    473 	}
    474 	backend_query_free(q);
    475 	return (REP_PROTOCOL_SUCCESS);
    476 }
    477 
    478 static int
    479 service_delete(delete_info_t *dip, const delete_ent_t *ent)
    480 {
    481 	int r;
    482 
    483 	r = backend_tx_run_update_changed(dip->di_tx,
    484 	    "DELETE FROM service_tbl WHERE svc_id = %d", ent->de_id);
    485 	if (r != REP_PROTOCOL_SUCCESS)
    486 		return (r);
    487 
    488 	return (pgparent_delete_add_pgs(dip, ent->de_id));
    489 }
    490 
    491 static int
    492 instance_delete(delete_info_t *dip, const delete_ent_t *ent)
    493 {
    494 	struct delete_cb_info info;
    495 	int r;
    496 	backend_query_t *q;
    497 
    498 	r = backend_tx_run_update_changed(dip->di_tx,
    499 	    "DELETE FROM instance_tbl WHERE instance_id = %d", ent->de_id);
    500 	if (r != REP_PROTOCOL_SUCCESS)
    501 		return (r);
    502 
    503 	r = pgparent_delete_add_pgs(dip, ent->de_id);
    504 	if (r != REP_PROTOCOL_SUCCESS)
    505 		return (r);
    506 
    507 	info.dci_dip = dip;
    508 	info.dci_be = BACKEND_TYPE_NORMAL;
    509 	info.dci_cb = &snapshot_lnk_delete;
    510 	info.dci_result = REP_PROTOCOL_SUCCESS;
    511 
    512 	q = backend_query_alloc();
    513 	backend_query_add(q,
    514 	    "SELECT lnk_id, 0 FROM snapshot_lnk_tbl WHERE lnk_inst_id = %d",
    515 	    ent->de_id);
    516 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
    517 	backend_query_free(q);
    518 
    519 	if (r == REP_PROTOCOL_DONE) {
    520 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
    521 		return (info.dci_result);
    522 	}
    523 	return (r);
    524 }
    525 
    526 /*ARGSUSED*/
    527 static int
    528 fill_child_callback(void *data, int columns, char **vals, char **names)
    529 {
    530 	child_info_t *cp = data;
    531 	rc_node_t *np;
    532 	uint32_t main_id;
    533 	const char *name;
    534 	const char *cur;
    535 	rc_node_lookup_t *lp = &cp->ci_base_nl;
    536 
    537 	assert(columns == 2);
    538 
    539 	name = *vals++;
    540 	columns--;
    541 
    542 	cur = *vals++;
    543 	columns--;
    544 	string_to_id(cur, &main_id, "id");
    545 
    546 	lp->rl_main_id = main_id;
    547 
    548 	if ((np = rc_node_alloc()) == NULL)
    549 		return (BACKEND_CALLBACK_ABORT);
    550 
    551 	np = rc_node_setup(np, lp, name, cp->ci_parent);
    552 	rc_node_rele(np);
    553 
    554 	return (BACKEND_CALLBACK_CONTINUE);
    555 }
    556 
    557 /*ARGSUSED*/
    558 static int
    559 fill_snapshot_callback(void *data, int columns, char **vals, char **names)
    560 {
    561 	child_info_t *cp = data;
    562 	rc_node_t *np;
    563 	uint32_t main_id;
    564 	uint32_t snap_id;
    565 	const char *name;
    566 	const char *cur;
    567 	const char *snap;
    568 	rc_node_lookup_t *lp = &cp->ci_base_nl;
    569 
    570 	assert(columns == 3);
    571 
    572 	name = *vals++;
    573 	columns--;
    574 
    575 	cur = *vals++;
    576 	columns--;
    577 	snap = *vals++;
    578 	columns--;
    579 
    580 	string_to_id(cur, &main_id, "lnk_id");
    581 	string_to_id(snap, &snap_id, "lnk_snap_id");
    582 
    583 	lp->rl_main_id = main_id;
    584 
    585 	if ((np = rc_node_alloc()) == NULL)
    586 		return (BACKEND_CALLBACK_ABORT);
    587 
    588 	np = rc_node_setup_snapshot(np, lp, name, snap_id, cp->ci_parent);
    589 	rc_node_rele(np);
    590 
    591 	return (BACKEND_CALLBACK_CONTINUE);
    592 }
    593 
    594 /*ARGSUSED*/
    595 static int
    596 fill_pg_callback(void *data, int columns, char **vals, char **names)
    597 {
    598 	child_info_t *cip = data;
    599 	const char *name;
    600 	const char *type;
    601 	const char *cur;
    602 	uint32_t main_id;
    603 	uint32_t flags;
    604 	uint32_t gen_id;
    605 
    606 	rc_node_lookup_t *lp = &cip->ci_base_nl;
    607 	rc_node_t *newnode, *pg;
    608 
    609 	assert(columns == 5);
    610 
    611 	name = *vals++;		/* pg_name */
    612 	columns--;
    613 
    614 	cur = *vals++;		/* pg_id */
    615 	columns--;
    616 	string_to_id(cur, &main_id, "pg_id");
    617 
    618 	lp->rl_main_id = main_id;
    619 
    620 	cur = *vals++;		/* pg_gen_id */
    621 	columns--;
    622 	string_to_id(cur, &gen_id, "pg_gen_id");
    623 
    624 	type = *vals++;		/* pg_type */
    625 	columns--;
    626 
    627 	cur = *vals++;		/* pg_flags */
    628 	columns--;
    629 	string_to_id(cur, &flags, "pg_flags");
    630 
    631 	if ((newnode = rc_node_alloc()) == NULL)
    632 		return (BACKEND_CALLBACK_ABORT);
    633 
    634 	pg = rc_node_setup_pg(newnode, lp, name, type, flags, gen_id,
    635 	    cip->ci_parent);
    636 	if (pg == NULL) {
    637 		rc_node_destroy(newnode);
    638 		return (BACKEND_CALLBACK_ABORT);
    639 	}
    640 
    641 	rc_node_rele(pg);
    642 
    643 	return (BACKEND_CALLBACK_CONTINUE);
    644 }
    645 
    646 struct property_value_info {
    647 	char		*pvi_base;
    648 	size_t		pvi_pos;
    649 	size_t		pvi_size;
    650 	size_t		pvi_count;
    651 };
    652 
    653 /*ARGSUSED*/
    654 static int
    655 property_value_size_cb(void *data, int columns, char **vals, char **names)
    656 {
    657 	struct property_value_info *info = data;
    658 	assert(columns == 1);
    659 
    660 	info->pvi_size += strlen(vals[0]) + 1;		/* count the '\0' */
    661 
    662 	return (BACKEND_CALLBACK_CONTINUE);
    663 }
    664 
    665 /*ARGSUSED*/
    666 static int
    667 property_value_cb(void *data, int columns, char **vals, char **names)
    668 {
    669 	struct property_value_info *info = data;
    670 	size_t pos, left, len;
    671 
    672 	assert(columns == 1);
    673 	pos = info->pvi_pos;
    674 	left = info->pvi_size - pos;
    675 
    676 	pos = info->pvi_pos;
    677 	left = info->pvi_size - pos;
    678 
    679 	if ((len = strlcpy(&info->pvi_base[pos], vals[0], left)) >= left) {
    680 		/*
    681 		 * since we preallocated, above, this shouldn't happen
    682 		 */
    683 		backend_panic("unexpected database change");
    684 	}
    685 
    686 	len += 1;	/* count the '\0' */
    687 
    688 	info->pvi_pos += len;
    689 	info->pvi_count++;
    690 
    691 	return (BACKEND_CALLBACK_CONTINUE);
    692 }
    693 
    694 /*ARGSUSED*/
    695 void
    696 object_free_values(const char *vals, uint32_t type, size_t count, size_t size)
    697 {
    698 	if (vals != NULL)
    699 		uu_free((void *)vals);
    700 }
    701 
    702 /*ARGSUSED*/
    703 static int
    704 fill_property_callback(void *data, int columns, char **vals, char **names)
    705 {
    706 	child_info_t *cp = data;
    707 	backend_tx_t *tx = cp->ci_tx;
    708 	uint32_t main_id;
    709 	const char *name;
    710 	const char *cur;
    711 	rep_protocol_value_type_t type;
    712 	rc_node_lookup_t *lp = &cp->ci_base_nl;
    713 	struct property_value_info info;
    714 	int rc;
    715 
    716 	assert(columns == 4);
    717 	assert(tx != NULL);
    718 
    719 	info.pvi_base = NULL;
    720 	info.pvi_pos = 0;
    721 	info.pvi_size = 0;
    722 	info.pvi_count = 0;
    723 
    724 	name = *vals++;
    725 
    726 	cur = *vals++;
    727 	string_to_id(cur, &main_id, "lnk_prop_id");
    728 
    729 	cur = *vals++;
    730 	assert(('a' <= cur[0] && 'z' >= cur[0]) ||
    731 	    ('A' <= cur[0] && 'Z' >= cur[0]) &&
    732 	    (cur[1] == 0 || ('a' <= cur[1] && 'z' >= cur[1]) ||
    733 	    ('A' <= cur[1] && 'Z' >= cur[1])));
    734 	type = cur[0] | (cur[1] << 8);
    735 
    736 	lp->rl_main_id = main_id;
    737 
    738 	/*
    739 	 * fill in the values, if any
    740 	 */
    741 	if ((cur = *vals++) != NULL) {
    742 		rep_protocol_responseid_t r;
    743 		backend_query_t *q = backend_query_alloc();
    744 
    745 		/*
    746 		 * Ensure that select operation is reflective
    747 		 * of repository schema.  If the repository has
    748 		 * been upgraded,  make use of value ordering
    749 		 * by retrieving values in order using the
    750 		 * value_order column.  Otherwise, simply
    751 		 * run the select with no order specified.
    752 		 * The order-insensitive select is necessary
    753 		 * as on first reboot post-upgrade,  the repository
    754 		 * contents need to be read before the repository
    755 		 * backend is writable (and upgrade is possible).
    756 		 */
    757 		if (backend_is_upgraded(tx)) {
    758 			backend_query_add(q,
    759 			    "SELECT value_value FROM value_tbl "
    760 			    "WHERE (value_id = '%q') ORDER BY value_order",
    761 			    cur);
    762 		} else {
    763 			backend_query_add(q,
    764 			    "SELECT value_value FROM value_tbl "
    765 			    "WHERE (value_id = '%q')",
    766 			    cur);
    767 		}
    768 
    769 		switch (r = backend_tx_run(tx, q, property_value_size_cb,
    770 		    &info)) {
    771 		case REP_PROTOCOL_SUCCESS:
    772 			break;
    773 
    774 		case REP_PROTOCOL_FAIL_NO_RESOURCES:
    775 			backend_query_free(q);
    776 			return (BACKEND_CALLBACK_ABORT);
    777 
    778 		case REP_PROTOCOL_DONE:
    779 		default:
    780 			backend_panic("backend_tx_run() returned %d", r);
    781 		}
    782 		if (info.pvi_size > 0) {
    783 			info.pvi_base = uu_zalloc(info.pvi_size);
    784 			if (info.pvi_base == NULL) {
    785 				backend_query_free(q);
    786 				return (BACKEND_CALLBACK_ABORT);
    787 			}
    788 			switch (r = backend_tx_run(tx, q, property_value_cb,
    789 			    &info)) {
    790 			case REP_PROTOCOL_SUCCESS:
    791 				break;
    792 
    793 			case REP_PROTOCOL_FAIL_NO_RESOURCES:
    794 				uu_free(info.pvi_base);
    795 				backend_query_free(q);
    796 				return (BACKEND_CALLBACK_ABORT);
    797 
    798 			case REP_PROTOCOL_DONE:
    799 			default:
    800 				backend_panic("backend_tx_run() returned %d",
    801 				    r);
    802 			}
    803 		}
    804 		backend_query_free(q);
    805 	}
    806 
    807 	rc = rc_node_create_property(cp->ci_parent, lp, name, type,
    808 	    info.pvi_base, info.pvi_count, info.pvi_size);
    809 	if (rc != REP_PROTOCOL_SUCCESS) {
    810 		assert(rc == REP_PROTOCOL_FAIL_NO_RESOURCES);
    811 		return (BACKEND_CALLBACK_ABORT);
    812 	}
    813 
    814 	return (BACKEND_CALLBACK_CONTINUE);
    815 }
    816 
    817 /*
    818  * The *_setup_child_info() functions fill in a child_info_t structure with the
    819  * information for the children of np with type type.
    820  *
    821  * They fail with
    822  *   _TYPE_MISMATCH - object cannot have children of type type
    823  */
    824 
    825 static int
    826 scope_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
    827 {
    828 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
    829 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
    830 
    831 	bzero(cip, sizeof (*cip));
    832 	cip->ci_parent = np;
    833 	cip->ci_base_nl.rl_type = type;
    834 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
    835 	return (REP_PROTOCOL_SUCCESS);
    836 }
    837 
    838 static int
    839 service_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
    840 {
    841 	switch (type) {
    842 	case REP_PROTOCOL_ENTITY_INSTANCE:
    843 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
    844 		break;
    845 	default:
    846 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
    847 	}
    848 
    849 	bzero(cip, sizeof (*cip));
    850 	cip->ci_parent = np;
    851 	cip->ci_base_nl.rl_type = type;
    852 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
    853 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_main_id;
    854 
    855 	return (REP_PROTOCOL_SUCCESS);
    856 }
    857 
    858 static int
    859 instance_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
    860 {
    861 	switch (type) {
    862 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
    863 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
    864 		break;
    865 	default:
    866 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
    867 	}
    868 
    869 	bzero(cip, sizeof (*cip));
    870 	cip->ci_parent = np;
    871 	cip->ci_base_nl.rl_type = type;
    872 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
    873 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
    874 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_main_id;
    875 
    876 	return (REP_PROTOCOL_SUCCESS);
    877 }
    878 
    879 static int
    880 snaplevel_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
    881 {
    882 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
    883 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
    884 
    885 	bzero(cip, sizeof (*cip));
    886 	cip->ci_parent = np;
    887 	cip->ci_base_nl.rl_type = type;
    888 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
    889 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
    890 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
    891 	cip->ci_base_nl.rl_ids[ID_NAME] = np->rn_id.rl_ids[ID_NAME];
    892 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = np->rn_id.rl_ids[ID_SNAPSHOT];
    893 	cip->ci_base_nl.rl_ids[ID_LEVEL] = np->rn_id.rl_main_id;
    894 
    895 	return (REP_PROTOCOL_SUCCESS);
    896 }
    897 
    898 static int
    899 propertygrp_setup_child_info(rc_node_t *pg, uint32_t type, child_info_t *cip)
    900 {
    901 	if (type != REP_PROTOCOL_ENTITY_PROPERTY)
    902 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
    903 
    904 	bzero(cip, sizeof (*cip));
    905 	cip->ci_parent = pg;
    906 	cip->ci_base_nl.rl_type = type;
    907 	cip->ci_base_nl.rl_backend = pg->rn_id.rl_backend;
    908 	cip->ci_base_nl.rl_ids[ID_SERVICE] = pg->rn_id.rl_ids[ID_SERVICE];
    909 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = pg->rn_id.rl_ids[ID_INSTANCE];
    910 	cip->ci_base_nl.rl_ids[ID_PG] = pg->rn_id.rl_main_id;
    911 	cip->ci_base_nl.rl_ids[ID_GEN] = pg->rn_gen_id;
    912 	cip->ci_base_nl.rl_ids[ID_NAME] = pg->rn_id.rl_ids[ID_NAME];
    913 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = pg->rn_id.rl_ids[ID_SNAPSHOT];
    914 	cip->ci_base_nl.rl_ids[ID_LEVEL] = pg->rn_id.rl_ids[ID_LEVEL];
    915 
    916 	return (REP_PROTOCOL_SUCCESS);
    917 }
    918 
    919 /*
    920  * The *_fill_children() functions populate the children of the given rc_node_t
    921  * by querying the database and calling rc_node_setup_*() functions (usually
    922  * via a fill_*_callback()).
    923  *
    924  * They fail with
    925  *   _NO_RESOURCES
    926  */
    927 
    928 /*
    929  * Returns
    930  *   _NO_RESOURCES
    931  *   _SUCCESS
    932  */
    933 static int
    934 scope_fill_children(rc_node_t *np)
    935 {
    936 	backend_query_t *q;
    937 	child_info_t ci;
    938 	int res;
    939 
    940 	(void) scope_setup_child_info(np, REP_PROTOCOL_ENTITY_SERVICE, &ci);
    941 
    942 	q = backend_query_alloc();
    943 	backend_query_append(q, "SELECT svc_name, svc_id FROM service_tbl");
    944 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
    945 	backend_query_free(q);
    946 
    947 	if (res == REP_PROTOCOL_DONE)
    948 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
    949 	return (res);
    950 }
    951 
    952 /*
    953  * Returns
    954  *   _NO_RESOURCES
    955  *   _SUCCESS
    956  */
    957 static int
    958 service_fill_children(rc_node_t *np)
    959 {
    960 	backend_query_t *q;
    961 	child_info_t ci;
    962 	int res;
    963 
    964 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
    965 
    966 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_INSTANCE, &ci);
    967 
    968 	q = backend_query_alloc();
    969 	backend_query_add(q,
    970 	    "SELECT instance_name, instance_id FROM instance_tbl"
    971 	    "    WHERE (instance_svc = %d)",
    972 	    np->rn_id.rl_main_id);
    973 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
    974 	backend_query_free(q);
    975 
    976 	if (res == REP_PROTOCOL_DONE)
    977 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
    978 	if (res != REP_PROTOCOL_SUCCESS)
    979 		return (res);
    980 
    981 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
    982 	    &ci);
    983 
    984 	q = backend_query_alloc();
    985 	backend_query_add(q,
    986 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
    987 	    "    WHERE (pg_parent_id = %d)",
    988 	    np->rn_id.rl_main_id);
    989 
    990 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
    991 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
    992 	if (res == REP_PROTOCOL_SUCCESS) {
    993 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
    994 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
    995 		    fill_pg_callback, &ci);
    996 		/* nonpersistant database may not exist */
    997 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
    998 			res = REP_PROTOCOL_SUCCESS;
    999 	}
   1000 	if (res == REP_PROTOCOL_DONE)
   1001 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1002 	backend_query_free(q);
   1003 
   1004 	return (res);
   1005 }
   1006 
   1007 /*
   1008  * Returns
   1009  *   _NO_RESOURCES
   1010  *   _SUCCESS
   1011  */
   1012 static int
   1013 instance_fill_children(rc_node_t *np)
   1014 {
   1015 	backend_query_t *q;
   1016 	child_info_t ci;
   1017 	int res;
   1018 
   1019 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
   1020 
   1021 	/* Get child property groups */
   1022 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
   1023 	    &ci);
   1024 
   1025 	q = backend_query_alloc();
   1026 	backend_query_add(q,
   1027 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
   1028 	    "    WHERE (pg_parent_id = %d)",
   1029 	    np->rn_id.rl_main_id);
   1030 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
   1031 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
   1032 	if (res == REP_PROTOCOL_SUCCESS) {
   1033 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
   1034 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
   1035 		    fill_pg_callback, &ci);
   1036 		/* nonpersistant database may not exist */
   1037 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
   1038 			res = REP_PROTOCOL_SUCCESS;
   1039 	}
   1040 	if (res == REP_PROTOCOL_DONE)
   1041 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1042 	backend_query_free(q);
   1043 
   1044 	if (res != REP_PROTOCOL_SUCCESS)
   1045 		return (res);
   1046 
   1047 	/* Get child snapshots */
   1048 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_SNAPSHOT,
   1049 	    &ci);
   1050 
   1051 	q = backend_query_alloc();
   1052 	backend_query_add(q,
   1053 	    "SELECT lnk_snap_name, lnk_id, lnk_snap_id FROM snapshot_lnk_tbl"
   1054 	    "    WHERE (lnk_inst_id = %d)",
   1055 	    np->rn_id.rl_main_id);
   1056 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_callback, &ci);
   1057 	if (res == REP_PROTOCOL_DONE)
   1058 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1059 	backend_query_free(q);
   1060 
   1061 	return (res);
   1062 }
   1063 
   1064 /*
   1065  * Returns
   1066  *   _NO_RESOURCES
   1067  *   _SUCCESS
   1068  */
   1069 static int
   1070 snapshot_fill_children(rc_node_t *np)
   1071 {
   1072 	rc_node_t *nnp;
   1073 	rc_snapshot_t *sp, *oldsp;
   1074 	rc_snaplevel_t *lvl;
   1075 	rc_node_lookup_t nl;
   1076 	int r;
   1077 
   1078 	/* Get the rc_snapshot_t (& its rc_snaplevel_t's). */
   1079 	(void) pthread_mutex_lock(&np->rn_lock);
   1080 	sp = np->rn_snapshot;
   1081 	(void) pthread_mutex_unlock(&np->rn_lock);
   1082 	if (sp == NULL) {
   1083 		r = rc_snapshot_get(np->rn_snapshot_id, &sp);
   1084 		if (r != REP_PROTOCOL_SUCCESS) {
   1085 			assert(r == REP_PROTOCOL_FAIL_NO_RESOURCES);
   1086 			return (r);
   1087 		}
   1088 		(void) pthread_mutex_lock(&np->rn_lock);
   1089 		oldsp = np->rn_snapshot;
   1090 		assert(oldsp == NULL || oldsp == sp);
   1091 		np->rn_snapshot = sp;
   1092 		(void) pthread_mutex_unlock(&np->rn_lock);
   1093 		if (oldsp != NULL)
   1094 			rc_snapshot_rele(oldsp);
   1095 	}
   1096 
   1097 	bzero(&nl, sizeof (nl));
   1098 	nl.rl_type = REP_PROTOCOL_ENTITY_SNAPLEVEL;
   1099 	nl.rl_backend = np->rn_id.rl_backend;
   1100 	nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
   1101 	nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
   1102 	nl.rl_ids[ID_NAME] = np->rn_id.rl_main_id;
   1103 	nl.rl_ids[ID_SNAPSHOT] = np->rn_snapshot_id;
   1104 
   1105 	/* Create rc_node_t's for the snapshot's rc_snaplevel_t's. */
   1106 	for (lvl = sp->rs_levels; lvl != NULL; lvl = lvl->rsl_next) {
   1107 		nnp = rc_node_alloc();
   1108 		assert(nnp != NULL);
   1109 		nl.rl_main_id = lvl->rsl_level_id;
   1110 		nnp = rc_node_setup_snaplevel(nnp, &nl, lvl, np);
   1111 		rc_node_rele(nnp);
   1112 	}
   1113 
   1114 	return (REP_PROTOCOL_SUCCESS);
   1115 }
   1116 
   1117 /*
   1118  * Returns
   1119  *   _NO_RESOURCES
   1120  *   _SUCCESS
   1121  */
   1122 static int
   1123 snaplevel_fill_children(rc_node_t *np)
   1124 {
   1125 	rc_snaplevel_t *lvl = np->rn_snaplevel;
   1126 	child_info_t ci;
   1127 	int res;
   1128 	backend_query_t *q;
   1129 
   1130 	(void) snaplevel_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
   1131 	    &ci);
   1132 
   1133 	q = backend_query_alloc();
   1134 	backend_query_add(q,
   1135 	    "SELECT snaplvl_pg_name, snaplvl_pg_id, snaplvl_gen_id, "
   1136 	    "    snaplvl_pg_type, snaplvl_pg_flags "
   1137 	    "    FROM snaplevel_lnk_tbl "
   1138 	    "    WHERE (snaplvl_level_id = %d)",
   1139 	    lvl->rsl_level_id);
   1140 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
   1141 	if (res == REP_PROTOCOL_DONE)
   1142 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1143 	backend_query_free(q);
   1144 
   1145 	return (res);
   1146 }
   1147 
   1148 /*
   1149  * Returns
   1150  *   _NO_RESOURCES
   1151  *   _SUCCESS
   1152  */
   1153 static int
   1154 propertygrp_fill_children(rc_node_t *np)
   1155 {
   1156 	backend_query_t *q;
   1157 	child_info_t ci;
   1158 	int res;
   1159 	backend_tx_t *tx;
   1160 
   1161 	backend_type_t backend = np->rn_id.rl_backend;
   1162 
   1163 	(void) propertygrp_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTY,
   1164 	    &ci);
   1165 
   1166 	res = backend_tx_begin_ro(backend, &tx);
   1167 	if (res != REP_PROTOCOL_SUCCESS) {
   1168 		/*
   1169 		 * If the backend didn't exist, we wouldn't have got this
   1170 		 * property group.
   1171 		 */
   1172 		assert(res != REP_PROTOCOL_FAIL_BACKEND_ACCESS);
   1173 		return (res);
   1174 	}
   1175 
   1176 	ci.ci_tx = tx;
   1177 
   1178 	q = backend_query_alloc();
   1179 	backend_query_add(q,
   1180 	    "SELECT lnk_prop_name, lnk_prop_id, lnk_prop_type, lnk_val_id "
   1181 	    "FROM prop_lnk_tbl "
   1182 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
   1183 	    np->rn_id.rl_main_id, np->rn_gen_id);
   1184 	res = backend_tx_run(tx, q, fill_property_callback, &ci);
   1185 	if (res == REP_PROTOCOL_DONE)
   1186 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1187 	backend_query_free(q);
   1188 	backend_tx_end_ro(tx);
   1189 
   1190 	return (res);
   1191 }
   1192 
   1193 /*
   1194  * Fails with
   1195  *   _TYPE_MISMATCH - lp is not for a service
   1196  *   _INVALID_TYPE - lp has invalid type
   1197  *   _BAD_REQUEST - name is invalid
   1198  */
   1199 static int
   1200 scope_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
   1201 {
   1202 	uint32_t type = lp->rl_type;
   1203 	int rc;
   1204 
   1205 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
   1206 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
   1207 
   1208 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
   1209 		return (rc);
   1210 
   1211 	backend_query_add(q,
   1212 	    "SELECT svc_id FROM service_tbl "
   1213 	    "WHERE svc_name = '%q'",
   1214 	    name);
   1215 
   1216 	return (REP_PROTOCOL_SUCCESS);
   1217 }
   1218 
   1219 /*
   1220  * Fails with
   1221  *   _NO_RESOURCES - out of memory
   1222  */
   1223 static int
   1224 scope_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
   1225 {
   1226 	return (backend_tx_run_update(tx,
   1227 	    "INSERT INTO service_tbl (svc_id, svc_name) "
   1228 	    "VALUES (%d, '%q')",
   1229 	    lp->rl_main_id, name));
   1230 }
   1231 
   1232 /*
   1233  * Fails with
   1234  *   _TYPE_MISMATCH - lp is not for an instance or property group
   1235  *   _INVALID_TYPE - lp has invalid type
   1236  *   _BAD_REQUEST - name is invalid
   1237  */
   1238 static int
   1239 service_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
   1240 {
   1241 	uint32_t type = lp->rl_type;
   1242 	int rc;
   1243 
   1244 	if (type != REP_PROTOCOL_ENTITY_INSTANCE &&
   1245 	    type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
   1246 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
   1247 
   1248 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
   1249 		return (rc);
   1250 
   1251 	switch (type) {
   1252 	case REP_PROTOCOL_ENTITY_INSTANCE:
   1253 		backend_query_add(q,
   1254 		    "SELECT instance_id FROM instance_tbl "
   1255 		    "WHERE instance_name = '%q' AND instance_svc = %d",
   1256 		    name, lp->rl_ids[ID_SERVICE]);
   1257 		break;
   1258 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
   1259 		backend_query_add(q,
   1260 		    "SELECT pg_id FROM pg_tbl "
   1261 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
   1262 		    name, lp->rl_ids[ID_SERVICE]);
   1263 		break;
   1264 	default:
   1265 		assert(0);
   1266 		abort();
   1267 	}
   1268 
   1269 	return (REP_PROTOCOL_SUCCESS);
   1270 }
   1271 
   1272 /*
   1273  * Fails with
   1274  *   _NO_RESOURCES - out of memory
   1275  */
   1276 static int
   1277 service_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
   1278 {
   1279 	return (backend_tx_run_update(tx,
   1280 	    "INSERT INTO instance_tbl "
   1281 	    "    (instance_id, instance_name, instance_svc) "
   1282 	    "VALUES (%d, '%q', %d)",
   1283 	    lp->rl_main_id, name, lp->rl_ids[ID_SERVICE]));
   1284 }
   1285 
   1286 /*
   1287  * Fails with
   1288  *   _NO_RESOURCES - out of memory
   1289  */
   1290 static int
   1291 instance_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
   1292 {
   1293 	return (backend_tx_run_update(tx,
   1294 	    "INSERT INTO snapshot_lnk_tbl "
   1295 	    "    (lnk_id, lnk_inst_id, lnk_snap_name, lnk_snap_id) "
   1296 	    "VALUES (%d, %d, '%q', 0)",
   1297 	    lp->rl_main_id, lp->rl_ids[ID_INSTANCE], name));
   1298 }
   1299 
   1300 /*
   1301  * Fails with
   1302  *   _TYPE_MISMATCH - lp is not for a property group or snapshot
   1303  *   _INVALID_TYPE - lp has invalid type
   1304  *   _BAD_REQUEST - name is invalid
   1305  */
   1306 static int
   1307 instance_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
   1308 {
   1309 	uint32_t type = lp->rl_type;
   1310 	int rc;
   1311 
   1312 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP &&
   1313 	    type != REP_PROTOCOL_ENTITY_SNAPSHOT)
   1314 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
   1315 
   1316 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
   1317 		return (rc);
   1318 
   1319 	switch (type) {
   1320 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
   1321 		backend_query_add(q,
   1322 		    "SELECT pg_id FROM pg_tbl "
   1323 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
   1324 		    name, lp->rl_ids[ID_INSTANCE]);
   1325 		break;
   1326 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
   1327 		backend_query_add(q,
   1328 		    "SELECT lnk_id FROM snapshot_lnk_tbl "
   1329 		    "    WHERE lnk_snap_name = '%q' AND lnk_inst_id = %d",
   1330 		    name, lp->rl_ids[ID_INSTANCE]);
   1331 		break;
   1332 	default:
   1333 		assert(0);
   1334 		abort();
   1335 	}
   1336 
   1337 	return (REP_PROTOCOL_SUCCESS);
   1338 }
   1339 
   1340 static int
   1341 generic_insert_pg_child(backend_tx_t *tx, rc_node_lookup_t *lp,
   1342     const char *name, const char *pgtype, uint32_t flags, uint32_t gen)
   1343 {
   1344 	int parent_id = (lp->rl_ids[ID_INSTANCE] != 0)?
   1345 	    lp->rl_ids[ID_INSTANCE] : lp->rl_ids[ID_SERVICE];
   1346 	return (backend_tx_run_update(tx,
   1347 	    "INSERT INTO pg_tbl "
   1348 	    "    (pg_id, pg_name, pg_parent_id, pg_type, pg_flags, pg_gen_id) "
   1349 	    "VALUES (%d, '%q', %d, '%q', %d, %d)",
   1350 	    lp->rl_main_id, name, parent_id, pgtype, flags, gen));
   1351 }
   1352 
   1353 static int
   1354 service_delete_start(rc_node_t *np, delete_info_t *dip)
   1355 {
   1356 	int r;
   1357 	backend_query_t *q = backend_query_alloc();
   1358 
   1359 	/*
   1360 	 * Check for child instances, and refuse to delete if they exist.
   1361 	 */
   1362 	backend_query_add(q,
   1363 	    "SELECT 1 FROM instance_tbl WHERE instance_svc = %d",
   1364 	    np->rn_id.rl_main_id);
   1365 
   1366 	r = backend_tx_run(dip->di_tx, q, backend_fail_if_seen, NULL);
   1367 	backend_query_free(q);
   1368 
   1369 	if (r == REP_PROTOCOL_DONE)
   1370 		return (REP_PROTOCOL_FAIL_EXISTS);	/* instances exist */
   1371 
   1372 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &service_delete,
   1373 	    np->rn_id.rl_main_id, 0));
   1374 }
   1375 
   1376 static int
   1377 instance_delete_start(rc_node_t *np, delete_info_t *dip)
   1378 {
   1379 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &instance_delete,
   1380 	    np->rn_id.rl_main_id, 0));
   1381 }
   1382 
   1383 static int
   1384 snapshot_delete_start(rc_node_t *np, delete_info_t *dip)
   1385 {
   1386 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL,
   1387 	    &snapshot_lnk_delete, np->rn_id.rl_main_id, 0));
   1388 }
   1389 
   1390 static int
   1391 propertygrp_delete_start(rc_node_t *np, delete_info_t *dip)
   1392 {
   1393 	return (delete_stack_push(dip, np->rn_id.rl_backend,
   1394 	    &propertygrp_delete, np->rn_id.rl_main_id, 0));
   1395 }
   1396 
   1397 static object_info_t info[] = {
   1398 	{REP_PROTOCOL_ENTITY_NONE},
   1399 	{REP_PROTOCOL_ENTITY_SCOPE,
   1400 		BACKEND_ID_INVALID,
   1401 		scope_fill_children,
   1402 		scope_setup_child_info,
   1403 		scope_query_child,
   1404 		scope_insert_child,
   1405 		NULL,
   1406 		NULL,
   1407 	},
   1408 	{REP_PROTOCOL_ENTITY_SERVICE,
   1409 		BACKEND_ID_SERVICE_INSTANCE,
   1410 		service_fill_children,
   1411 		service_setup_child_info,
   1412 		service_query_child,
   1413 		service_insert_child,
   1414 		generic_insert_pg_child,
   1415 		service_delete_start,
   1416 	},
   1417 	{REP_PROTOCOL_ENTITY_INSTANCE,
   1418 		BACKEND_ID_SERVICE_INSTANCE,
   1419 		instance_fill_children,
   1420 		instance_setup_child_info,
   1421 		instance_query_child,
   1422 		instance_insert_child,
   1423 		generic_insert_pg_child,
   1424 		instance_delete_start,
   1425 	},
   1426 	{REP_PROTOCOL_ENTITY_SNAPSHOT,
   1427 		BACKEND_ID_SNAPNAME,
   1428 		snapshot_fill_children,
   1429 		NULL,
   1430 		NULL,
   1431 		NULL,
   1432 		NULL,
   1433 		snapshot_delete_start,
   1434 	},
   1435 	{REP_PROTOCOL_ENTITY_SNAPLEVEL,
   1436 		BACKEND_ID_SNAPLEVEL,
   1437 		snaplevel_fill_children,
   1438 		snaplevel_setup_child_info,
   1439 	},
   1440 	{REP_PROTOCOL_ENTITY_PROPERTYGRP,
   1441 		BACKEND_ID_PROPERTYGRP,
   1442 		propertygrp_fill_children,
   1443 		NULL,
   1444 		NULL,
   1445 		NULL,
   1446 		NULL,
   1447 		propertygrp_delete_start,
   1448 	},
   1449 	{REP_PROTOCOL_ENTITY_PROPERTY},
   1450 	{-1UL}
   1451 };
   1452 #define	NUM_INFO (sizeof (info) / sizeof (*info))
   1453 
   1454 /*
   1455  * object_fill_children() populates the child list of an rc_node_t by calling
   1456  * the appropriate <type>_fill_children() which runs backend queries that
   1457  * call an appropriate fill_*_callback() which takes a row of results,
   1458  * decodes them, and calls an rc_node_setup*() function in rc_node.c to create
   1459  * a child.
   1460  *
   1461  * Fails with
   1462  *   _NO_RESOURCES
   1463  */
   1464 int
   1465 object_fill_children(rc_node_t *pp)
   1466 {
   1467 	uint32_t type = pp->rn_id.rl_type;
   1468 	assert(type > 0 && type < NUM_INFO);
   1469 
   1470 	return ((*info[type].obj_fill_children)(pp));
   1471 }
   1472 
   1473 int
   1474 object_delete(rc_node_t *pp)
   1475 {
   1476 	int rc;
   1477 
   1478 	delete_info_t dip;
   1479 	delete_ent_t de;
   1480 
   1481 	uint32_t type = pp->rn_id.rl_type;
   1482 	assert(type > 0 && type < NUM_INFO);
   1483 
   1484 	if (info[type].obj_delete_start == NULL)
   1485 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
   1486 
   1487 	(void) memset(&dip, '\0', sizeof (dip));
   1488 	rc = backend_tx_begin(BACKEND_TYPE_NORMAL, &dip.di_tx);
   1489 	if (rc != REP_PROTOCOL_SUCCESS)
   1490 		return (rc);
   1491 
   1492 	rc = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &dip.di_np_tx);
   1493 	if (rc == REP_PROTOCOL_FAIL_BACKEND_ACCESS ||
   1494 	    rc == REP_PROTOCOL_FAIL_BACKEND_READONLY)
   1495 		dip.di_np_tx = NULL;
   1496 	else if (rc != REP_PROTOCOL_SUCCESS) {
   1497 		backend_tx_rollback(dip.di_tx);
   1498 		return (rc);
   1499 	}
   1500 
   1501 	if ((rc = (*info[type].obj_delete_start)(pp, &dip)) !=
   1502 	    REP_PROTOCOL_SUCCESS) {
   1503 		goto fail;
   1504 	}
   1505 
   1506 	while (delete_stack_pop(&dip, &de)) {
   1507 		rc = (*de.de_cb)(&dip, &de);
   1508 		if (rc != REP_PROTOCOL_SUCCESS)
   1509 			goto fail;
   1510 	}
   1511 
   1512 	rc = backend_tx_commit(dip.di_tx);
   1513 	if (rc != REP_PROTOCOL_SUCCESS)
   1514 		backend_tx_rollback(dip.di_np_tx);
   1515 	else if (dip.di_np_tx)
   1516 		(void) backend_tx_commit(dip.di_np_tx);
   1517 
   1518 	delete_stack_cleanup(&dip);
   1519 
   1520 	return (rc);
   1521 
   1522 fail:
   1523 	backend_tx_rollback(dip.di_tx);
   1524 	backend_tx_rollback(dip.di_np_tx);
   1525 	delete_stack_cleanup(&dip);
   1526 	return (rc);
   1527 }
   1528 
   1529 int
   1530 object_do_create(backend_tx_t *tx, child_info_t *cip, rc_node_t *pp,
   1531     uint32_t type, const char *name, rc_node_t **cpp)
   1532 {
   1533 	uint32_t ptype = pp->rn_id.rl_type;
   1534 
   1535 	backend_query_t *q;
   1536 	uint32_t id;
   1537 	rc_node_t *np = NULL;
   1538 	int rc;
   1539 	object_info_t *ip;
   1540 
   1541 	rc_node_lookup_t *lp = &cip->ci_base_nl;
   1542 
   1543 	assert(ptype > 0 && ptype < NUM_INFO);
   1544 
   1545 	ip = &info[ptype];
   1546 
   1547 	if (type == REP_PROTOCOL_ENTITY_PROPERTYGRP)
   1548 		return (REP_PROTOCOL_FAIL_NOT_APPLICABLE);
   1549 
   1550 	if (ip->obj_setup_child_info == NULL ||
   1551 	    ip->obj_query_child == NULL ||
   1552 	    ip->obj_insert_child == NULL)
   1553 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
   1554 
   1555 	if ((rc = (*ip->obj_setup_child_info)(pp, type, cip)) !=
   1556 	    REP_PROTOCOL_SUCCESS)
   1557 		return (rc);
   1558 
   1559 	q = backend_query_alloc();
   1560 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
   1561 	    REP_PROTOCOL_SUCCESS) {
   1562 		assert(rc == REP_PROTOCOL_FAIL_BAD_REQUEST);
   1563 		backend_query_free(q);
   1564 		return (rc);
   1565 	}
   1566 
   1567 	rc = backend_tx_run_single_int(tx, q, &id);
   1568 	backend_query_free(q);
   1569 
   1570 	if (rc == REP_PROTOCOL_SUCCESS)
   1571 		return (REP_PROTOCOL_FAIL_EXISTS);
   1572 	else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND)
   1573 		return (rc);
   1574 
   1575 	if ((lp->rl_main_id = backend_new_id(tx,
   1576 	    info[type].obj_id_space)) == 0) {
   1577 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
   1578 	}
   1579 
   1580 	if ((np = rc_node_alloc()) == NULL)
   1581 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
   1582 
   1583 	if ((rc = (*ip->obj_insert_child)(tx, lp, name)) !=
   1584 	    REP_PROTOCOL_SUCCESS) {
   1585 		rc_node_destroy(np);
   1586 		return (rc);
   1587 	}
   1588 
   1589 	*cpp = np;
   1590 	return (REP_PROTOCOL_SUCCESS);
   1591 }
   1592 
   1593 /*
   1594  * Fails with
   1595  *   _NOT_APPLICABLE - type is _PROPERTYGRP
   1596  *   _BAD_REQUEST - cannot create children for this type of node
   1597  *		    name is invalid
   1598  *   _TYPE_MISMATCH - object cannot have children of type type
   1599  *   _NO_RESOURCES - out of memory, or could not allocate new id
   1600  *   _BACKEND_READONLY
   1601  *   _BACKEND_ACCESS
   1602  *   _EXISTS - child already exists
   1603  */
   1604 int
   1605 object_create(rc_node_t *pp, uint32_t type, const char *name, rc_node_t **cpp)
   1606 {
   1607 	backend_tx_t *tx;
   1608 	rc_node_t *np = NULL;
   1609 	child_info_t ci;
   1610 	int rc;
   1611 
   1612 	if ((rc = backend_tx_begin(pp->rn_id.rl_backend, &tx)) !=
   1613 	    REP_PROTOCOL_SUCCESS) {
   1614 		return (rc);
   1615 	}
   1616 
   1617 	if ((rc = object_do_create(tx, &ci, pp, type, name, &np)) !=
   1618 	    REP_PROTOCOL_SUCCESS) {
   1619 		backend_tx_rollback(tx);
   1620 		return (rc);
   1621 	}
   1622 
   1623 	rc = backend_tx_commit(tx);
   1624 	if (rc != REP_PROTOCOL_SUCCESS) {
   1625 		rc_node_destroy(np);
   1626 		return (rc);
   1627 	}
   1628 
   1629 	*cpp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
   1630 
   1631 	return (REP_PROTOCOL_SUCCESS);
   1632 }
   1633 
   1634 /*ARGSUSED*/
   1635 int
   1636 object_create_pg(rc_node_t *pp, uint32_t type, const char *name,
   1637     const char *pgtype, uint32_t flags, rc_node_t **cpp)
   1638 {
   1639 	uint32_t ptype = pp->rn_id.rl_type;
   1640 	backend_tx_t *tx_ro, *tx_wr;
   1641 	backend_query_t *q;
   1642 	uint32_t id;
   1643 	uint32_t gen = 0;
   1644 	rc_node_t *np = NULL;
   1645 	int rc;
   1646 	int rc_wr;
   1647 	int rc_ro;
   1648 	object_info_t *ip;
   1649 
   1650 	int nonpersist = (flags & SCF_PG_FLAG_NONPERSISTENT);
   1651 
   1652 	child_info_t ci;
   1653 	rc_node_lookup_t *lp = &ci.ci_base_nl;
   1654 
   1655 	assert(ptype > 0 && ptype < NUM_INFO);
   1656 
   1657 	if (ptype != REP_PROTOCOL_ENTITY_SERVICE &&
   1658 	    ptype != REP_PROTOCOL_ENTITY_INSTANCE)
   1659 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
   1660 
   1661 	ip = &info[ptype];
   1662 
   1663 	assert(ip->obj_setup_child_info != NULL &&
   1664 	    ip->obj_query_child != NULL &&
   1665 	    ip->obj_insert_pg_child != NULL);
   1666 
   1667 	if ((rc = (*ip->obj_setup_child_info)(pp, type, &ci)) !=
   1668 	    REP_PROTOCOL_SUCCESS)
   1669 		return (rc);
   1670 
   1671 	q = backend_query_alloc();
   1672 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
   1673 	    REP_PROTOCOL_SUCCESS) {
   1674 		backend_query_free(q);
   1675 		return (rc);
   1676 	}
   1677 
   1678 	if (!nonpersist) {
   1679 		lp->rl_backend = BACKEND_TYPE_NORMAL;
   1680 		rc_wr = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx_wr);
   1681 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NONPERSIST, &tx_ro);
   1682 	} else {
   1683 		lp->rl_backend = BACKEND_TYPE_NONPERSIST;
   1684 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NORMAL, &tx_ro);
   1685 		rc_wr = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &tx_wr);
   1686 	}
   1687 
   1688 	if (rc_wr != REP_PROTOCOL_SUCCESS) {
   1689 		rc = rc_wr;
   1690 		goto fail;
   1691 	}
   1692 	if (rc_ro != REP_PROTOCOL_SUCCESS &&
   1693 	    rc_ro != REP_PROTOCOL_FAIL_BACKEND_ACCESS) {
   1694 		rc = rc_ro;
   1695 		goto fail;
   1696 	}
   1697 
   1698 	if (tx_ro != NULL) {
   1699 		rc = backend_tx_run_single_int(tx_ro, q, &id);
   1700 
   1701 		if (rc == REP_PROTOCOL_SUCCESS) {
   1702 			backend_query_free(q);
   1703 			rc = REP_PROTOCOL_FAIL_EXISTS;
   1704 			goto fail;
   1705 		} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
   1706 			backend_query_free(q);
   1707 			goto fail;
   1708 		}
   1709 	}
   1710 
   1711 	rc = backend_tx_run_single_int(tx_wr, q, &id);
   1712 	backend_query_free(q);
   1713 
   1714 	if (rc == REP_PROTOCOL_SUCCESS) {
   1715 		rc = REP_PROTOCOL_FAIL_EXISTS;
   1716 		goto fail;
   1717 	} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
   1718 		goto fail;
   1719 	}
   1720 
   1721 	if (tx_ro != NULL)
   1722 		backend_tx_end_ro(tx_ro);
   1723 	tx_ro = NULL;
   1724 
   1725 	if ((lp->rl_main_id = backend_new_id(tx_wr,
   1726 	    info[type].obj_id_space)) == 0) {
   1727 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1728 		goto fail;
   1729 	}
   1730 
   1731 	if ((np = rc_node_alloc()) == NULL) {
   1732 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1733 		goto fail;
   1734 	}
   1735 
   1736 	if ((rc = (*ip->obj_insert_pg_child)(tx_wr, lp, name, pgtype, flags,
   1737 	    gen)) != REP_PROTOCOL_SUCCESS) {
   1738 		rc_node_destroy(np);
   1739 		goto fail;
   1740 	}
   1741 
   1742 	rc = backend_tx_commit(tx_wr);
   1743 	if (rc != REP_PROTOCOL_SUCCESS) {
   1744 		rc_node_destroy(np);
   1745 		return (rc);
   1746 	}
   1747 
   1748 	*cpp = rc_node_setup_pg(np, lp, name, pgtype, flags, gen, ci.ci_parent);
   1749 
   1750 	return (REP_PROTOCOL_SUCCESS);
   1751 
   1752 fail:
   1753 	if (tx_ro != NULL)
   1754 		backend_tx_end_ro(tx_ro);
   1755 	if (tx_wr != NULL)
   1756 		backend_tx_rollback(tx_wr);
   1757 	return (rc);
   1758 }
   1759 
   1760 /*
   1761  * Given a row of snaplevel number, snaplevel id, service id, service name,
   1762  * instance id, & instance name, create a rc_snaplevel_t & prepend it onto the
   1763  * rs_levels list of the rc_snapshot_t passed in as data.
   1764  * Returns _CONTINUE on success or _ABORT if any allocations fail.
   1765  */
   1766 /*ARGSUSED*/
   1767 static int
   1768 fill_snapshot_cb(void *data, int columns, char **vals, char **names)
   1769 {
   1770 	rc_snapshot_t *sp = data;
   1771 	rc_snaplevel_t *lvl;
   1772 	char *num = vals[0];
   1773 	char *id = vals[1];
   1774 	char *service_id = vals[2];
   1775 	char *service = vals[3];
   1776 	char *instance_id = vals[4];
   1777 	char *instance = vals[5];
   1778 	assert(columns == 6);
   1779 
   1780 	lvl = uu_zalloc(sizeof (*lvl));
   1781 	if (lvl == NULL)
   1782 		return (BACKEND_CALLBACK_ABORT);
   1783 	lvl->rsl_parent = sp;
   1784 	lvl->rsl_next = sp->rs_levels;
   1785 	sp->rs_levels = lvl;
   1786 
   1787 	string_to_id(num, &lvl->rsl_level_num, "snap_level_num");
   1788 	string_to_id(id, &lvl->rsl_level_id, "snap_level_id");
   1789 	string_to_id(service_id, &lvl->rsl_service_id, "snap_level_service_id");
   1790 	if (instance_id != NULL)
   1791 		string_to_id(instance_id, &lvl->rsl_instance_id,
   1792 		    "snap_level_instance_id");
   1793 
   1794 	lvl->rsl_scope = (const char *)"localhost";
   1795 	lvl->rsl_service = strdup(service);
   1796 	if (lvl->rsl_service == NULL) {
   1797 		uu_free(lvl);
   1798 		return (BACKEND_CALLBACK_ABORT);
   1799 	}
   1800 	if (instance) {
   1801 		assert(lvl->rsl_instance_id != 0);
   1802 		lvl->rsl_instance = strdup(instance);
   1803 		if (lvl->rsl_instance == NULL) {
   1804 			free((void *)lvl->rsl_instance);
   1805 			uu_free(lvl);
   1806 			return (BACKEND_CALLBACK_ABORT);
   1807 		}
   1808 	} else {
   1809 		assert(lvl->rsl_instance_id == 0);
   1810 	}
   1811 
   1812 	return (BACKEND_CALLBACK_CONTINUE);
   1813 }
   1814 
   1815 /*
   1816  * Populate sp's rs_levels list from the snaplevel_tbl table.
   1817  * Fails with
   1818  *   _NO_RESOURCES
   1819  */
   1820 int
   1821 object_fill_snapshot(rc_snapshot_t *sp)
   1822 {
   1823 	backend_query_t *q;
   1824 	rc_snaplevel_t *sl;
   1825 	int result;
   1826 	int i;
   1827 
   1828 	q = backend_query_alloc();
   1829 	backend_query_add(q,
   1830 	    "SELECT snap_level_num, snap_level_id, "
   1831 	    "    snap_level_service_id, snap_level_service, "
   1832 	    "    snap_level_instance_id, snap_level_instance "
   1833 	    "FROM snaplevel_tbl "
   1834 	    "WHERE snap_id = %d "
   1835 	    "ORDER BY snap_level_id DESC",
   1836 	    sp->rs_snap_id);
   1837 
   1838 	result = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_cb, sp);
   1839 	if (result == REP_PROTOCOL_DONE)
   1840 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
   1841 	backend_query_free(q);
   1842 
   1843 	if (result == REP_PROTOCOL_SUCCESS) {
   1844 		i = 0;
   1845 		for (sl = sp->rs_levels; sl != NULL; sl = sl->rsl_next) {
   1846 			if (sl->rsl_level_num != ++i) {
   1847 				backend_panic("snaplevels corrupt; expected "
   1848 				    "level %d, got %d", i, sl->rsl_level_num);
   1849 			}
   1850 		}
   1851 	}
   1852 	return (result);
   1853 }
   1854 
   1855 /*
   1856  * This represents a property group in a snapshot.
   1857  */
   1858 typedef struct check_snapshot_elem {
   1859 	uint32_t cse_parent;
   1860 	uint32_t cse_pg_id;
   1861 	uint32_t cse_pg_gen;
   1862 	char	cse_seen;
   1863 } check_snapshot_elem_t;
   1864 
   1865 #define	CSI_MAX_PARENTS		COMPOSITION_DEPTH
   1866 typedef struct check_snapshot_info {
   1867 	size_t			csi_count;
   1868 	size_t			csi_array_size;
   1869 	check_snapshot_elem_t	*csi_array;
   1870 	size_t			csi_nparents;
   1871 	uint32_t		csi_parent_ids[CSI_MAX_PARENTS];
   1872 } check_snapshot_info_t;
   1873 
   1874 /*ARGSUSED*/
   1875 static int
   1876 check_snapshot_fill_cb(void *data, int columns, char **vals, char **names)
   1877 {
   1878 	check_snapshot_info_t *csip = data;
   1879 	check_snapshot_elem_t *cur;
   1880 	const char *parent;
   1881 	const char *pg_id;
   1882 	const char *pg_gen_id;
   1883 
   1884 	if (columns == 1) {
   1885 		uint32_t *target;
   1886 
   1887 		if (csip->csi_nparents >= CSI_MAX_PARENTS)
   1888 			backend_panic("snaplevel table has too many elements");
   1889 
   1890 		target = &csip->csi_parent_ids[csip->csi_nparents++];
   1891 		string_to_id(vals[0], target, "snap_level_*_id");
   1892 
   1893 		return (BACKEND_CALLBACK_CONTINUE);
   1894 	}
   1895 
   1896 	assert(columns == 3);
   1897 
   1898 	parent = vals[0];
   1899 	pg_id = vals[1];
   1900 	pg_gen_id = vals[2];
   1901 
   1902 	if (csip->csi_count == csip->csi_array_size) {
   1903 		size_t newsz = (csip->csi_array_size > 0) ?
   1904 		    csip->csi_array_size * 2 : 8;
   1905 		check_snapshot_elem_t *new = uu_zalloc(newsz * sizeof (*new));
   1906 
   1907 		if (new == NULL)
   1908 			return (BACKEND_CALLBACK_ABORT);
   1909 
   1910 		(void) memcpy(new, csip->csi_array,
   1911 		    sizeof (*new) * csip->csi_array_size);
   1912 		uu_free(csip->csi_array);
   1913 		csip->csi_array = new;
   1914 		csip->csi_array_size = newsz;
   1915 	}
   1916 
   1917 	cur = &csip->csi_array[csip->csi_count++];
   1918 
   1919 	string_to_id(parent, &cur->cse_parent, "snap_level_*_id");
   1920 	string_to_id(pg_id, &cur->cse_pg_id, "snaplvl_pg_id");
   1921 	string_to_id(pg_gen_id, &cur->cse_pg_gen, "snaplvl_gen_id");
   1922 	cur->cse_seen = 0;
   1923 
   1924 	return (BACKEND_CALLBACK_CONTINUE);
   1925 }
   1926 
   1927 static int
   1928 check_snapshot_elem_cmp(const void *lhs_arg, const void *rhs_arg)
   1929 {
   1930 	const check_snapshot_elem_t *lhs = lhs_arg;
   1931 	const check_snapshot_elem_t *rhs = rhs_arg;
   1932 
   1933 	if (lhs->cse_parent < rhs->cse_parent)
   1934 		return (-1);
   1935 	if (lhs->cse_parent > rhs->cse_parent)
   1936 		return (1);
   1937 
   1938 	if (lhs->cse_pg_id < rhs->cse_pg_id)
   1939 		return (-1);
   1940 	if (lhs->cse_pg_id > rhs->cse_pg_id)
   1941 		return (1);
   1942 
   1943 	if (lhs->cse_pg_gen < rhs->cse_pg_gen)
   1944 		return (-1);
   1945 	if (lhs->cse_pg_gen > rhs->cse_pg_gen)
   1946 		return (1);
   1947 
   1948 	return (0);
   1949 }
   1950 
   1951 /*ARGSUSED*/
   1952 static int
   1953 check_snapshot_check_cb(void *data, int columns, char **vals, char **names)
   1954 {
   1955 	check_snapshot_info_t *csip = data;
   1956 	check_snapshot_elem_t elem;
   1957 	check_snapshot_elem_t *cur;
   1958 
   1959 	const char *parent = vals[0];
   1960 	const char *pg_id = vals[1];
   1961 	const char *pg_gen_id = vals[2];
   1962 
   1963 	assert(columns == 3);
   1964 
   1965 	string_to_id(parent, &elem.cse_parent, "snap_level_*_id");
   1966 	string_to_id(pg_id, &elem.cse_pg_id, "snaplvl_pg_id");
   1967 	string_to_id(pg_gen_id, &elem.cse_pg_gen, "snaplvl_gen_id");
   1968 
   1969 	if ((cur = bsearch(&elem, csip->csi_array, csip->csi_count,
   1970 	    sizeof (*csip->csi_array), check_snapshot_elem_cmp)) == NULL)
   1971 		return (BACKEND_CALLBACK_ABORT);
   1972 
   1973 	if (cur->cse_seen)
   1974 		backend_panic("duplicate property group reported");
   1975 	cur->cse_seen = 1;
   1976 	return (BACKEND_CALLBACK_CONTINUE);
   1977 }
   1978 
   1979 /*
   1980  * Check that a snapshot matches up with the latest in the repository.
   1981  * Returns:
   1982  *	REP_PROTOCOL_SUCCESS		if it is up-to-date,
   1983  *	REP_PROTOCOL_DONE		if it is out-of-date, or
   1984  *	REP_PROTOCOL_FAIL_NO_RESOURCES	if we ran out of memory.
   1985  */
   1986 static int
   1987 object_check_snapshot(uint32_t snap_id)
   1988 {
   1989 	check_snapshot_info_t csi;
   1990 	backend_query_t *q;
   1991 	int result;
   1992 	size_t idx;
   1993 
   1994 	/* if the snapshot has never been taken, it must be out of date. */
   1995 	if (snap_id == 0)
   1996 		return (REP_PROTOCOL_DONE);
   1997 
   1998 	(void) memset(&csi, '\0', sizeof (csi));
   1999 
   2000 	q = backend_query_alloc();
   2001 	backend_query_add(q,
   2002 	    "SELECT\n"
   2003 	    "    CASE snap_level_instance_id\n"
   2004 	    "        WHEN 0 THEN snap_level_service_id\n"
   2005 	    "        ELSE snap_level_instance_id\n"
   2006 	    "    END\n"
   2007 	    "FROM snaplevel_tbl\n"
   2008 	    "WHERE snap_id = %d;\n"
   2009 	    "\n"
   2010 	    "SELECT\n"
   2011 	    "    CASE snap_level_instance_id\n"
   2012 	    "        WHEN 0 THEN snap_level_service_id\n"
   2013 	    "        ELSE snap_level_instance_id\n"
   2014 	    "    END,\n"
   2015 	    "    snaplvl_pg_id,\n"
   2016 	    "    snaplvl_gen_id\n"
   2017 	    "FROM snaplevel_tbl, snaplevel_lnk_tbl\n"
   2018 	    "WHERE\n"
   2019 	    "    (snaplvl_level_id = snap_level_id AND\n"
   2020 	    "    snap_id = %d);",
   2021 	    snap_id, snap_id);
   2022 
   2023 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_fill_cb,
   2024 	    &csi);
   2025 	if (result == REP_PROTOCOL_DONE)
   2026 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
   2027 	backend_query_free(q);
   2028 
   2029 	if (result != REP_PROTOCOL_SUCCESS)
   2030 		goto fail;
   2031 
   2032 	if (csi.csi_count > 0) {
   2033 		qsort(csi.csi_array, csi.csi_count, sizeof (*csi.csi_array),
   2034 		    check_snapshot_elem_cmp);
   2035 	}
   2036 
   2037 #if COMPOSITION_DEPTH == 2
   2038 	if (csi.csi_nparents != COMPOSITION_DEPTH) {
   2039 		result = REP_PROTOCOL_DONE;
   2040 		goto fail;
   2041 	}
   2042 
   2043 	q = backend_query_alloc();
   2044 	backend_query_add(q,
   2045 	    "SELECT "
   2046 	    "    pg_parent_id, pg_id, pg_gen_id "
   2047 	    "FROM "
   2048 	    "    pg_tbl "
   2049 	    "WHERE (pg_parent_id = %d OR pg_parent_id = %d)",
   2050 	    csi.csi_parent_ids[0], csi.csi_parent_ids[1]);
   2051 
   2052 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_check_cb,
   2053 	    &csi);
   2054 #else
   2055 #error This code must be updated
   2056 #endif
   2057 	/*
   2058 	 * To succeed, the callback must not have aborted, and we must have
   2059 	 * found all of the items.
   2060 	 */
   2061 	if (result == REP_PROTOCOL_SUCCESS) {
   2062 		for (idx = 0; idx < csi.csi_count; idx++) {
   2063 			if (csi.csi_array[idx].cse_seen == 0) {
   2064 				result = REP_PROTOCOL_DONE;
   2065 				goto fail;
   2066 			}
   2067 		}
   2068 	}
   2069 
   2070 fail:
   2071 	uu_free(csi.csi_array);
   2072 	return (result);
   2073 }
   2074 
   2075 /*ARGSUSED*/
   2076 static int
   2077 object_copy_string(void *data_arg, int columns, char **vals, char **names)
   2078 {
   2079 	char **data = data_arg;
   2080 
   2081 	assert(columns == 1);
   2082 
   2083 	if (*data != NULL)
   2084 		free(*data);
   2085 	*data = NULL;
   2086 
   2087 	if (vals[0] != NULL) {
   2088 		if ((*data = strdup(vals[0])) == NULL)
   2089 			return (BACKEND_CALLBACK_ABORT);
   2090 	}
   2091 
   2092 	return (BACKEND_CALLBACK_CONTINUE);
   2093 }
   2094 
   2095 struct snaplevel_add_info {
   2096 	backend_query_t *sai_q;
   2097 	uint32_t	sai_level_id;
   2098 	int		sai_used;		/* sai_q has been used */
   2099 };
   2100 
   2101 /*ARGSUSED*/
   2102 static int
   2103 object_snaplevel_process_pg(void *data_arg, int columns, char **vals,
   2104     char **names)
   2105 {
   2106 	struct snaplevel_add_info *data = data_arg;
   2107 
   2108 	assert(columns == 5);
   2109 
   2110 	backend_query_add(data->sai_q,
   2111 	    "INSERT INTO snaplevel_lnk_tbl "
   2112 	    "    (snaplvl_level_id, snaplvl_pg_id, snaplvl_pg_name, "
   2113 	    "    snaplvl_pg_type, snaplvl_pg_flags, snaplvl_gen_id)"
   2114 	    "VALUES (%d, %s, '%q', '%q', %s, %s);",
   2115 	    data->sai_level_id, vals[0], vals[1], vals[2], vals[3], vals[4]);
   2116 
   2117 	data->sai_used = 1;
   2118 
   2119 	return (BACKEND_CALLBACK_CONTINUE);
   2120 }
   2121 
   2122 /*ARGSUSED*/
   2123 static int
   2124 object_snapshot_add_level(backend_tx_t *tx, uint32_t snap_id,
   2125     uint32_t snap_level_num, uint32_t svc_id, const char *svc_name,
   2126     uint32_t inst_id, const char *inst_name)
   2127 {
   2128 	struct snaplevel_add_info data;
   2129 	backend_query_t *q;
   2130 	int result;
   2131 
   2132 	assert((snap_level_num == 1 && inst_name != NULL) ||
   2133 	    snap_level_num == 2 && inst_name == NULL);
   2134 
   2135 	data.sai_level_id = backend_new_id(tx, BACKEND_ID_SNAPLEVEL);
   2136 	if (data.sai_level_id == 0) {
   2137 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
   2138 	}
   2139 
   2140 	result = backend_tx_run_update(tx,
   2141 	    "INSERT INTO snaplevel_tbl "
   2142 	    "    (snap_id, snap_level_num, snap_level_id, "
   2143 	    "    snap_level_service_id, snap_level_service, "
   2144 	    "    snap_level_instance_id, snap_level_instance) "
   2145 	    "VALUES (%d, %d, %d, %d, %Q, %d, %Q);",
   2146 	    snap_id, snap_level_num, data.sai_level_id, svc_id, svc_name,
   2147 	    inst_id, inst_name);
   2148 
   2149 	q = backend_query_alloc();
   2150 	backend_query_add(q,
   2151 	    "SELECT pg_id, pg_name, pg_type, pg_flags, pg_gen_id FROM pg_tbl "
   2152 	    "WHERE (pg_parent_id = %d);",
   2153 	    (inst_name != NULL)? inst_id : svc_id);
   2154 
   2155 	data.sai_q = backend_query_alloc();
   2156 	data.sai_used = 0;
   2157 	result = backend_tx_run(tx, q, object_snaplevel_process_pg,
   2158 	    &data);
   2159 	backend_query_free(q);
   2160 
   2161 	if (result == REP_PROTOCOL_SUCCESS && data.sai_used != 0)
   2162 		result = backend_tx_run(tx, data.sai_q, NULL, NULL);
   2163 	backend_query_free(data.sai_q);
   2164 
   2165 	return (result);
   2166 }
   2167 
   2168 /*
   2169  * Fails with:
   2170  *	_NO_RESOURCES - no new id or out of disk space
   2171  *	_BACKEND_READONLY - persistent backend is read-only
   2172  */
   2173 static int
   2174 object_snapshot_do_take(uint32_t instid, const char *inst_name,
   2175     uint32_t svcid, const char *svc_name,
   2176     backend_tx_t **tx_out, uint32_t *snapid_out)
   2177 {
   2178 	backend_tx_t *tx;
   2179 	backend_query_t *q;
   2180 	int result;
   2181 
   2182 	char *svc_name_alloc = NULL;
   2183 	char *inst_name_alloc = NULL;
   2184 	uint32_t snapid;
   2185 
   2186 	result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
   2187 	if (result != REP_PROTOCOL_SUCCESS)
   2188 		return (result);
   2189 
   2190 	snapid = backend_new_id(tx, BACKEND_ID_SNAPSHOT);
   2191 	if (snapid == 0) {
   2192 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
   2193 		goto fail;
   2194 	}
   2195 
   2196 	if (svc_name == NULL) {
   2197 		q = backend_query_alloc();
   2198 		backend_query_add(q,
   2199 		    "SELECT svc_name FROM service_tbl "
   2200 		    "WHERE (svc_id = %d)", svcid);
   2201 		result = backend_tx_run(tx, q, object_copy_string,
   2202 		    &svc_name_alloc);
   2203 		backend_query_free(q);
   2204 
   2205 		svc_name = svc_name_alloc;
   2206 
   2207 		if (result == REP_PROTOCOL_DONE) {
   2208 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
   2209 			goto fail;
   2210 		}
   2211 		if (result == REP_PROTOCOL_SUCCESS && svc_name == NULL)
   2212 			backend_panic("unable to find name for svc id %d\n",
   2213 			    svcid);
   2214 
   2215 		if (result != REP_PROTOCOL_SUCCESS)
   2216 			goto fail;
   2217 	}
   2218 
   2219 	if (inst_name == NULL) {
   2220 		q = backend_query_alloc();
   2221 		backend_query_add(q,
   2222 		    "SELECT instance_name FROM instance_tbl "
   2223 		    "WHERE (instance_id = %d)", instid);
   2224 		result = backend_tx_run(tx, q, object_copy_string,
   2225 		    &inst_name_alloc);
   2226 		backend_query_free(q);
   2227 
   2228 		inst_name = inst_name_alloc;
   2229 
   2230 		if (result == REP_PROTOCOL_DONE) {
   2231 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
   2232 			goto fail;
   2233 		}
   2234 
   2235 		if (result == REP_PROTOCOL_SUCCESS && inst_name == NULL)
   2236 			backend_panic(
   2237 			    "unable to find name for instance id %d\n", instid);
   2238 
   2239 		if (result != REP_PROTOCOL_SUCCESS)
   2240 			goto fail;
   2241 	}
   2242 
   2243 	result = object_snapshot_add_level(tx, snapid, 1,
   2244 	    svcid, svc_name, instid, inst_name);
   2245 
   2246 	if (result != REP_PROTOCOL_SUCCESS)
   2247 		goto fail;
   2248 
   2249 	result = object_snapshot_add_level(tx, snapid, 2,
   2250 	    svcid, svc_name, 0, NULL);
   2251 
   2252 	if (result != REP_PROTOCOL_SUCCESS)
   2253 		goto fail;
   2254 
   2255 	*snapid_out = snapid;
   2256 	*tx_out = tx;
   2257 
   2258 	free(svc_name_alloc);
   2259 	free(inst_name_alloc);
   2260 
   2261 	return (REP_PROTOCOL_SUCCESS);
   2262 
   2263 fail:
   2264 	backend_tx_rollback(tx);
   2265 	free(svc_name_alloc);
   2266 	free(inst_name_alloc);
   2267 	return (result);
   2268 }
   2269 
   2270 /*
   2271  * Fails with:
   2272  *	_TYPE_MISMATCH - pp is not an instance
   2273  *	_NO_RESOURCES - no new id or out of disk space
   2274  *	_BACKEND_READONLY - persistent backend is read-only
   2275  */
   2276 int
   2277 object_snapshot_take_new(rc_node_t *pp,
   2278     const char *svc_name, const char *inst_name,
   2279     const char *name, rc_node_t **outp)
   2280 {
   2281 	rc_node_lookup_t *insti = &pp->rn_id;
   2282 
   2283 	uint32_t instid = insti->rl_main_id;
   2284 	uint32_t svcid = insti->rl_ids[ID_SERVICE];
   2285 	uint32_t snapid = 0;
   2286 	backend_tx_t *tx = NULL;
   2287 	child_info_t ci;
   2288 	rc_node_t *np;
   2289 	int result;
   2290 
   2291 	if (insti->rl_type != REP_PROTOCOL_ENTITY_INSTANCE)
   2292 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
   2293 
   2294 	result = object_snapshot_do_take(instid, inst_name, svcid, svc_name,
   2295 	    &tx, &snapid);
   2296 	if (result != REP_PROTOCOL_SUCCESS)
   2297 		return (result);
   2298 
   2299 	if ((result = object_do_create(tx, &ci, pp,
   2300 	    REP_PROTOCOL_ENTITY_SNAPSHOT, name, &np)) != REP_PROTOCOL_SUCCESS) {
   2301 		backend_tx_rollback(tx);
   2302 		return (result);
   2303 	}
   2304 
   2305 	/*
   2306 	 * link the new object to the new snapshot.
   2307 	 */
   2308 	np->rn_snapshot_id = snapid;
   2309 
   2310 	result = backend_tx_run_update(tx,
   2311 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
   2312 	    snapid, ci.ci_base_nl.rl_main_id);
   2313 	if (result != REP_PROTOCOL_SUCCESS) {
   2314 		backend_tx_rollback(tx);
   2315 		rc_node_destroy(np);
   2316 		return (result);
   2317 	}
   2318 	result = backend_tx_commit(tx);
   2319 	if (result != REP_PROTOCOL_SUCCESS) {
   2320 		rc_node_destroy(np);
   2321 		return (result);
   2322 	}
   2323 
   2324 	*outp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
   2325 	return (REP_PROTOCOL_SUCCESS);
   2326 }
   2327 
   2328 /*
   2329  * Fails with:
   2330  *	_TYPE_MISMATCH - pp is not an instance
   2331  *	_NO_RESOURCES - no new id or out of disk space
   2332  *	_BACKEND_READONLY - persistent backend is read-only
   2333  */
   2334 int
   2335 object_snapshot_attach(rc_node_lookup_t *snapi, uint32_t *snapid_ptr,
   2336     int takesnap)
   2337 {
   2338 	uint32_t svcid = snapi->rl_ids[ID_SERVICE];
   2339 	uint32_t instid = snapi->rl_ids[ID_INSTANCE];
   2340 	uint32_t snapid = *snapid_ptr;
   2341 	uint32_t oldsnapid = 0;
   2342 	backend_tx_t *tx = NULL;
   2343 	backend_query_t *q;
   2344 	int result;
   2345 
   2346 	delete_info_t dip;
   2347 	delete_ent_t de;
   2348 
   2349 	if (snapi->rl_type != REP_PROTOCOL_ENTITY_SNAPSHOT)
   2350 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
   2351 
   2352 	if (takesnap) {
   2353 		/* first, check that we're actually out of date */
   2354 		if (object_check_snapshot(snapid) == REP_PROTOCOL_SUCCESS)
   2355 			return (REP_PROTOCOL_SUCCESS);
   2356 
   2357 		result = object_snapshot_do_take(instid, NULL,
   2358 		    svcid, NULL, &tx, &snapid);
   2359 		if (result != REP_PROTOCOL_SUCCESS)
   2360 			return (result);
   2361 	} else {
   2362 		result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
   2363 		if (result != REP_PROTOCOL_SUCCESS)
   2364 			return (result);
   2365 	}
   2366 
   2367 	q = backend_query_alloc();
   2368 	backend_query_add(q,
   2369 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
   2370 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
   2371 	    snapi->rl_main_id, snapid, snapi->rl_main_id);
   2372 	result = backend_tx_run_single_int(tx, q, &oldsnapid);
   2373 	backend_query_free(q);
   2374 
   2375 	if (result == REP_PROTOCOL_FAIL_NOT_FOUND) {
   2376 		backend_tx_rollback(tx);
   2377 		backend_panic("unable to find snapshot id %d",
   2378 		    snapi->rl_main_id);
   2379 	}
   2380 	if (result != REP_PROTOCOL_SUCCESS)
   2381 		goto fail;
   2382 
   2383 	/*
   2384 	 * Now we use the delete stack to handle the possible unreferencing
   2385 	 * of oldsnapid.
   2386 	 */
   2387 	(void) memset(&dip, 0, sizeof (dip));
   2388 	dip.di_tx = tx;
   2389 	dip.di_np_tx = NULL;	/* no need for non-persistant backend */
   2390 
   2391 	if ((result = delete_stack_push(&dip, BACKEND_TYPE_NORMAL,
   2392 	    &snaplevel_tbl_delete, oldsnapid, 0)) != REP_PROTOCOL_SUCCESS)
   2393 		goto fail;
   2394 
   2395 	while (delete_stack_pop(&dip, &de)) {
   2396 		result = (*de.de_cb)(&dip, &de);
   2397 		if (result != REP_PROTOCOL_SUCCESS)
   2398 			goto fail;
   2399 	}
   2400 
   2401 	result = backend_tx_commit(tx);
   2402 	if (result != REP_PROTOCOL_SUCCESS)
   2403 		goto fail;
   2404 
   2405 	delete_stack_cleanup(&dip);
   2406 	*snapid_ptr = snapid;
   2407 	return (REP_PROTOCOL_SUCCESS);
   2408 
   2409 fail:
   2410 	backend_tx_rollback(tx);
   2411 	delete_stack_cleanup(&dip);
   2412 	return (result);
   2413 }
   2414