Home | History | Annotate | Download | only in nfs
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 
     26 #include <sys/flock.h>
     27 #include <nfs/export.h>
     28 #include <sys/cmn_err.h>
     29 #include <sys/atomic.h>
     30 #include <nfs/nfs.h>
     31 #include <nfs/nfs4.h>
     32 #include <nfs/nfssys.h>
     33 #include <nfs/lm.h>
     34 #include <sys/pathname.h>
     35 #include <sys/sdt.h>
     36 #include <sys/nvpair.h>
     37 #include <sys/sdt.h>
     38 #include <sys/disp.h>
     39 #include <sys/id_space.h>
     40 
     41 extern u_longlong_t nfs4_srv_caller_id;
     42 
     43 #include <nfs/nfs_sstor_impl.h>
     44 #include <nfs/mds_state.h>
     45 #include <nfs/nfs41_sessions.h>
     46 
     47 #include <nfs/nfs41_filehandle.h>
     48 
     49 #include <nfs/spe_impl.h>
     50 
     51 static void mds_do_lorecall(mds_lorec_t *);
     52 static int  mds_lorecall_cmd(struct mds_reclo_args *, cred_t *);
     53 static int  mds_notify_device_cmd(struct mds_notifydev_args *, cred_t *);
     54 
     55 extern void mds_do_cb_recall(struct rfs4_deleg_state *, bool_t);
     56 
/*
 * XXX - slrc_slot_size will more than likely have to be
 *	 computed dynamically as the server adjusts the
 *	 sessions' slot replay cache size. This should be
 *	 good for proto.
 *
 * bc_slot_tab starts at 0 and is overwritten in mds_session_create()
 * with the client's csa_back_chan_attrs.ca_maxrequests whenever a
 * bidirectional (bdrpc) session is set up.
 */
slotid4 slrc_slot_size = MAXSLOTS;
slotid4	bc_slot_tab = 0;	/* backchan slots are set by client */
     65 
/*
 * The values below are rfs4_lease_time units.
 * DEBUG builds use a shorter session cache time than production builds.
 */

#ifdef DEBUG
#define	SESSION_CACHE_TIME 1
#else
#define	SESSION_CACHE_TIME 10
#endif

/* 64-bit value with every bit set */
#define	ONES_64	(0xFFFFFFFFFFFFFFFFuLL)
     75 
     76 /* Sessions */
     77 static void mds_session_destroy(rfs4_entry_t);
     78 static bool_t mds_session_expiry(rfs4_entry_t);
     79 static bool_t mds_session_create(rfs4_entry_t, void *);
     80 static uint32_t sessid_hash(void *);
     81 static bool_t sessid_compare(rfs4_entry_t, void *);
     82 static void *sessid_mkkey(rfs4_entry_t);
     83 
     84 /* function pointers for mdsadm */
     85 
     86 extern int (*mds_recall_lo)(struct mds_reclo_args *, cred_t *);
     87 extern int (*mds_notify_device)(struct mds_notifydev_args *, cred_t *);
     88 
     89 extern char *kstrdup(const char *);
     90 
     91 extern rfs4_client_t *findclient(nfs_server_instance_t *, nfs_client_id4 *,
     92     bool_t *, rfs4_client_t *);
     93 
     94 extern rfs4_client_t *findclient_by_id(nfs_server_instance_t *, clientid4);
     95 
     96 extern rfs4_openowner_t *findopenowner(nfs_server_instance_t *, open_owner4 *,
     97     bool_t *, seqid4);
     98 
     99 extern void v4prot_sstor_init(nfs_server_instance_t *);
    100 
    101 extern void rfs4_ss_retrieve_state(nfs_server_instance_t *);
    102 extern int nfs_doorfd;
    103 
/*
 * Table size constant: small under DEBUG, larger otherwise.
 * NOTE(review): presumably the initial hash size of the state tables;
 * the users of this macro are not visible here - confirm.
 */
#ifdef DEBUG
#define	MDS_TABSIZE 17
#else
#define	MDS_TABSIZE 2047
#endif
    109 
    110 #define	MDS_MAXTABSZ 1024*1024
    111 
    112 extern uint32_t clientid_hash(void *);
    113 
    114 /*
    115  * Returns the instances capabilities flag word
    116  * the form of:
    117  *
    118  *  EXCHGID4_FLAG_USE_NON_PNFS
    119  *  EXCHGID4_FLAG_USE_PNFS_MDS
    120  *  EXCHGID4_FLAG_USE_PNFS_DS
    121  *
    122  */
    123 uint32_t
    124 mds_get_capabilities(nfs_server_instance_t *instp)
    125 {
    126 	uint32_t my_abilities = 0;
    127 
    128 	if (instp)
    129 		my_abilities =
    130 		    instp->inst_flags & EXCHGID4_FLAG_MASK_PNFS;
    131 	return (my_abilities);
    132 }
    133 
    134 
    135 /*ARGSUSED*/
    136 static bool_t
    137 mds_do_not_expire(rfs4_entry_t u_entry)
    138 {
    139 	return (FALSE);
    140 }
    141 
    142 /*ARGSUSED*/
    143 static stateid_t
    144 mds_create_stateid(rfs4_dbe_t *dbe, stateid_type_t id_type)
    145 {
    146 	stateid_t id;
    147 
    148 	id.v41_bits.boottime = dbe_to_instp(dbe)->start_time;
    149 	id.v41_bits.state_ident = rfs4_dbe_getid(dbe);
    150 	id.v41_bits.chgseq = 0;
    151 	id.v41_bits.type = id_type;
    152 	id.v41_bits.pid = 0;
    153 
    154 	return (id);
    155 }
    156 
    157 
    158 rfs4_openowner_t *
    159 mds_findopenowner(nfs_server_instance_t *instp, open_owner4 *openowner,
    160     bool_t *create)
    161 {
    162 	rfs4_openowner_t *oo;
    163 	rfs4_openowner_t arg;
    164 
    165 	arg.ro_owner = *openowner;
    166 	arg.ro_open_seqid = 0;
    167 	oo = (rfs4_openowner_t *)rfs4_dbsearch(instp->openowner_idx,
    168 	    openowner, create, &arg, RFS4_DBS_VALID);
    169 	return (oo);
    170 }
    171 
    172 rfs4_lo_state_t *
    173 mds_findlo_state_by_owner(rfs4_lockowner_t *lo,
    174 			rfs4_state_t *sp, bool_t *create)
    175 {
    176 	rfs4_lo_state_t *lsp;
    177 	rfs4_lo_state_t arg;
    178 	nfs_server_instance_t *instp;
    179 
    180 	arg.rls_locker = lo;
    181 	arg.rls_state = sp;
    182 
    183 	instp = dbe_to_instp(lo->rl_dbe);
    184 
    185 	lsp = (rfs4_lo_state_t *)rfs4_dbsearch(instp->lo_state_owner_idx,
    186 	    &arg, create, &arg, RFS4_DBS_VALID);
    187 
    188 	return (lsp);
    189 }
    190 
/* XXX: well clearly this needs to be cleaned up.. */
/*
 * Overlay of a clientid4 (id4) with an implementation-private pair of
 * 32-bit fields (impl_id.start_time, impl_id.c_id).
 */
typedef union {
	struct {
		uint32_t start_time;
		uint32_t c_id;
	} impl_id;
	clientid4 id4;
} cid;
    199 
    200 int
    201 mds_check_stateid_seqid(rfs4_state_t *sp, stateid4 *stateid)
    202 {
    203 	stateid_t *id = (stateid_t *)stateid;
    204 
    205 	if (rfs4_lease_expired(sp->rs_owner->ro_client))
    206 		return (NFS4_CHECK_STATEID_EXPIRED);
    207 
    208 	/* Stateid is some time in the future - that's bad */
    209 	if (sp->rs_stateid.v41_bits.chgseq < id->v41_bits.chgseq)
    210 		return (NFS4_CHECK_STATEID_BAD);
    211 
    212 	if (sp->rs_closed == TRUE)
    213 		return (NFS4_CHECK_STATEID_CLOSED);
    214 
    215 	return (NFS4_CHECK_STATEID_OKAY);
    216 }
    217 
    218 int
    219 mds_fh_is_exi(struct exportinfo *exi, nfs41_fh_fmt_t *fhp)
    220 {
    221 	if (exi->exi_fid.fid_len != fhp->fh.v1.export_fid.len)
    222 		return (0);
    223 
    224 	if (bcmp(exi->exi_fid.fid_data, fhp->fh.v1.export_fid.val,
    225 	    fhp->fh.v1.export_fid.len) != 0)
    226 		return (0);
    227 
    228 	if (exi->exi_fsid.val[0] != fhp->fh.v1.export_fsid.val[0] ||
    229 	    exi->exi_fsid.val[1] != fhp->fh.v1.export_fsid.val[1])
    230 		return (0);
    231 
    232 	return (1);
    233 }
    234 
    235 /*
    236  * This function is used as a target for the rfs4_dbe_walk() call
    237  * below.  The purpose of this function is to see if the
    238  * lockowner_state refers to a file that resides within the exportinfo
    239  * export.  If so, then remove the lock_owner state (file locks and
    240  * share "locks") for this object since the intent is the server is
    241  * unexporting the specified directory.  Be sure to invalidate the
    242  * object after the state has been released
    243  */
    244 void
    245 mds_lo_state_walk_callout(rfs4_entry_t u_entry, void *e)
    246 {
    247 	rfs4_lo_state_t *lsp = (rfs4_lo_state_t *)u_entry;
    248 	struct exportinfo *exi = (struct exportinfo *)e;
    249 	nfs41_fh_fmt_t   *fhp;
    250 
    251 	fhp = (nfs41_fh_fmt_t *)
    252 	    lsp->rls_state->rs_finfo->rf_filehandle.nfs_fh4_val;
    253 
    254 	if (mds_fh_is_exi(exi, fhp)) {
    255 		rfs4_state_close(lsp->rls_state, FALSE, FALSE, CRED());
    256 		rfs4_dbe_invalidate(lsp->rls_dbe);
    257 		rfs4_dbe_invalidate(lsp->rls_state->rs_dbe);
    258 	}
    259 }
    260 
    261 /*
    262  * This function is used as a target for the rfs4_dbe_walk() call
    263  * below.  The purpose of this function is to see if the state refers
    264  * to a file that resides within the exportinfo export.  If so, then
    265  * remove the open state for this object since the intent is the
    266  * server is unexporting the specified directory.  The main result for
    267  * this type of entry is to invalidate it such it will not be found in
    268  * the future.
    269  */
    270 void
    271 mds_state_walk_callout(rfs4_entry_t u_entry, void *e)
    272 {
    273 	rfs4_state_t *sp = (rfs4_state_t *)u_entry;
    274 	struct exportinfo *exi = (struct exportinfo *)e;
    275 	nfs41_fh_fmt_t   *fhp;
    276 
    277 	fhp =
    278 	    (nfs41_fh_fmt_t *)sp->rs_finfo->rf_filehandle.nfs_fh4_val;
    279 
    280 	if (mds_fh_is_exi(exi, fhp)) {
    281 		rfs4_state_close(sp, TRUE, FALSE, CRED());
    282 		rfs4_dbe_invalidate(sp->rs_dbe);
    283 	}
    284 }
    285 
    286 /*
    287  * This function is used as a target for the rfs4_dbe_walk() call
    288  * below.  The purpose of this function is to see if the state refers
    289  * to a file that resides within the exportinfo export.  If so, then
    290  * remove the deleg state for this object since the intent is the
    291  * server is unexporting the specified directory.  The main result for
    292  * this type of entry is to invalidate it such it will not be found in
    293  * the future.
    294  */
    295 void
    296 mds_deleg_state_walk_callout(rfs4_entry_t u_entry, void *e)
    297 {
    298 	rfs4_deleg_state_t *dsp = (rfs4_deleg_state_t *)u_entry;
    299 	struct exportinfo *exi = (struct exportinfo *)e;
    300 	nfs41_fh_fmt_t   *fhp;
    301 
    302 	fhp =
    303 	    (nfs41_fh_fmt_t *)dsp->rds_finfo->rf_filehandle.nfs_fh4_val;
    304 
    305 	if (mds_fh_is_exi(exi, fhp)) {
    306 		rfs4_dbe_invalidate(dsp->rds_dbe);
    307 	}
    308 }
    309 
    310 /*
    311  * This function is used as a target for the rfs4_dbe_walk() call
    312  * below.  The purpose of this function is to see if the state refers
    313  * to a file that resides within the exportinfo export.  If so, then
    314  * release vnode hold for this object since the intent is the server
    315  * is unexporting the specified directory.  Invalidation will prevent
    316  * this struct from being found in the future.
    317  */
    318 void
    319 mds_file_walk_callout(rfs4_entry_t u_entry, void *e)
    320 {
    321 	rfs4_file_t *fp = (rfs4_file_t *)u_entry;
    322 	struct exportinfo *exi = (struct exportinfo *)e;
    323 	nfs41_fh_fmt_t   *fhp;
    324 	vnode_t *vp;
    325 	nfs_server_instance_t *instp;
    326 
    327 	fhp = (nfs41_fh_fmt_t *)fp->rf_filehandle.nfs_fh4_val;
    328 
    329 	if (mds_fh_is_exi(exi, fhp) == 0)
    330 		return;
    331 
    332 	if ((vp = fp->rf_vp) != NULL) {
    333 		instp = dbe_to_instp(fp->rf_dbe);
    334 		ASSERT(instp);
    335 
    336 		/*
    337 		 * don't leak monitors and remove the reference
    338 		 * put on the vnode when the delegation was granted.
    339 		 */
    340 		if (fp->rf_dinfo->rd_dtype == OPEN_DELEGATE_READ) {
    341 			(void) fem_uninstall(vp, instp->deleg_rdops,
    342 			    (void *)fp);
    343 			vn_open_downgrade(vp, FREAD);
    344 		} else if (fp->rf_dinfo->rd_dtype == OPEN_DELEGATE_WRITE) {
    345 			(void) fem_uninstall(vp, instp->deleg_wrops,
    346 			    (void *)fp);
    347 			vn_open_downgrade(vp, FREAD|FWRITE);
    348 		}
    349 
    350 		mutex_enter(&vp->v_lock);
    351 		(void) vsd_set(vp, instp->vkey, NULL);
    352 		mutex_exit(&vp->v_lock);
    353 		VN_RELE(vp);
    354 		fp->rf_vp = NULL;
    355 	}
    356 
    357 	rfs4_dbe_invalidate(fp->rf_dbe);
    358 }
    359 
    360 /*
    361  * --------------------------------------------------------
    362  * MDS - NFSv4.1  Sessions
    363  * --------------------------------------------------------
    364  */
    365 static uint32_t
    366 sessid_hash(void *key)
    367 {
    368 	sid *idp = key;
    369 
    370 	return (idp->impl_id.s_id);
    371 }
    372 
    373 static bool_t
    374 sessid_compare(rfs4_entry_t entry, void *key)
    375 {
    376 	mds_session_t	*sp = (mds_session_t *)entry;
    377 	sessionid4	*idp = (sessionid4 *)key;
    378 
    379 	return (bcmp(idp, &sp->sn_sessid, sizeof (sessionid4)) == 0);
    380 }
    381 
    382 static void *
    383 sessid_mkkey(rfs4_entry_t entry)
    384 {
    385 	mds_session_t *sp = (mds_session_t *)entry;
    386 
    387 	return (&sp->sn_sessid);
    388 }
    389 
    390 static bool_t
    391 sess_clid_compare(rfs4_entry_t entry, void *key)
    392 {
    393 	mds_session_t *sp = (mds_session_t *)entry;
    394 	clientid4 *idp = key;
    395 
    396 	return (*idp == sp->sn_clnt->rc_clientid);
    397 }
    398 
    399 static void *
    400 sess_clid_mkkey(rfs4_entry_t entry)
    401 {
    402 	return (&(((mds_session_t *)entry)->sn_clnt->rc_clientid));
    403 }
    404 
    405 void
    406 rfs41_session_rele(mds_session_t *sp)
    407 {
    408 	rfs4_dbe_rele(sp->sn_dbe);
    409 }
    410 
    411 mds_session_t *
    412 mds_findsession_by_id(nfs_server_instance_t *instp, sessionid4 sessid)
    413 {
    414 	mds_session_t	*sp;
    415 	rfs4_index_t	*idx = instp->mds_session_idx;
    416 	bool_t		 create = FALSE;
    417 
    418 	rw_enter(&instp->findsession_lock, RW_READER);
    419 	sp = (mds_session_t *)rfs4_dbsearch(idx, sessid, &create, NULL,
    420 	    RFS4_DBS_VALID);
    421 	rw_exit(&instp->findsession_lock);
    422 
    423 	return (sp);
    424 }
    425 
    426 mds_session_t *
    427 mds_findsession_by_clid(nfs_server_instance_t *instp, clientid4 clid)
    428 {
    429 	mds_session_t	*sp;
    430 	bool_t		 create = FALSE;
    431 
    432 	rw_enter(&instp->findsession_lock, RW_READER);
    433 	sp = (mds_session_t *)rfs4_dbsearch(instp->mds_sess_clientid_idx, &clid,
    434 	    &create, NULL, RFS4_DBS_VALID);
    435 	rw_exit(&instp->findsession_lock);
    436 
    437 	return (sp);
    438 }
    439 
    440 /*
    441  * A clientid can have multiple sessions associated with it. Hence,
    442  * performing a raw 'mds_findsession' (even for a create) might
    443  * yield a list of sessions associated with the clientid in question.
    444  * Instead of delving deep into the rfs4_dbsearch engine to correct
    445  * this now, we'll call our function directly and create an association
    446  * between the session table and both primary (sessionid) index and
    447  * secondary (clientid) index for the newly created session.
    448  */
    449 mds_session_t	*
    450 mds_createsession(nfs_server_instance_t *instp, session41_create_t *ap)
    451 {
    452 	mds_session_t	*sp = NULL;
    453 	rfs4_index_t	*idx = instp->mds_session_idx;
    454 
    455 	rw_enter(&instp->findsession_lock, RW_WRITER);
    456 	if ((sp = (mds_session_t *)rfs4_dbcreate(idx, (void *)ap)) == NULL) {
    457 		DTRACE_PROBE1(mds__srv__createsession__fail,
    458 		    session41_create_t *, ap);
    459 	}
    460 	rw_exit(&instp->findsession_lock);
    461 	return (sp);
    462 }
    463 
    464 /*
    465  * mds_session_inval invalidates the session so other
    466  * threads won't "find" the session to place additional
    467  * callbacks. Destroy session even if no backchannel has
    468  * been established.
    469  */
    470 nfsstat4
    471 mds_session_inval(mds_session_t	*sp)
    472 {
    473 	nfsstat4	status;
    474 
    475 	ASSERT(sp != NULL);
    476 	ASSERT(rfs4_dbe_islocked(sp->sn_dbe));
    477 
    478 	if (SN_CB_CHAN_EST(sp)) {
    479 		sess_channel_t	*bcp = sp->sn_back;
    480 		sess_bcsd_t	*bsdp;
    481 
    482 		rw_enter(&bcp->cn_lock, RW_READER);
    483 		if ((bsdp = CTOBSD(bcp)) == NULL)
    484 			cmn_err(CE_PANIC, "mds_session_inval: BCSD Not Set");
    485 
    486 		rw_enter(&bsdp->bsd_rwlock, RW_READER);
    487 		status = bsdp->bsd_stat = slot_cb_status(bsdp->bsd_stok);
    488 		rw_exit(&bsdp->bsd_rwlock);
    489 
    490 		rw_exit(&bcp->cn_lock);
    491 	} else {
    492 		cmn_err(CE_NOTE, "No back chan established");
    493 		status = NFS4_OK;
    494 	}
    495 
    496 	/* only invalidate sess if no bc traffic */
    497 	if (status == NFS4_OK)
    498 		rfs4_dbe_invalidate(sp->sn_dbe);
    499 
    500 	return (status);
    501 }
    502 
    503 /*
    504  * 1) Invalidate the session in the DB (so it can't be found anymore)
    505  * 2) Verify that there's no outstanding CB traffic. If so, return err.
    506  * 3) Eventually the session will be reaped by the reaper_thread
    507  */
    508 nfsstat4
    509 mds_destroysession(mds_session_t *sp)
    510 {
    511 	nfsstat4	cbs;
    512 
    513 	rfs4_dbe_lock(sp->sn_dbe);
    514 	cbs = mds_session_inval(sp);
    515 	rfs4_dbe_unlock(sp->sn_dbe);
    516 
    517 	/*
    518 	 * The reference/hold maintained from the session to the client
    519 	 * struct gets nuked when the DB calls rfs4_dbe_destroy, which
    520 	 * in turn calls mds_session_destroy.
    521 	 */
    522 	if (cbs == NFS4_OK)
    523 		rfs41_session_rele(sp);
    524 
    525 	return (cbs);
    526 }
    527 
    528 sn_chan_dir_t
    529 pd2cd(channel_dir_from_server4 dir)
    530 {
    531 	switch (dir) {
    532 	case CDFS4_FORE:
    533 		return (SN_CHAN_FORE);
    534 
    535 	case CDFS4_BACK:
    536 		return (SN_CHAN_BACK);
    537 
    538 	case CDFS4_BOTH:
    539 	default:
    540 		return (SN_CHAN_BOTH);
    541 	}
    542 	/* NOTREACHED */
    543 }
    544 
    545 /*
    546  * Delegation CB race detection support
    547  */
    548 void
    549 rfs41_deleg_rs_hold(rfs4_deleg_state_t *dsp)
    550 {
    551 	atomic_add_32(&dsp->rds_rs.refcnt, 1);
    552 }
    553 
    554 void
    555 rfs41_deleg_rs_rele(rfs4_deleg_state_t *dsp)
    556 {
    557 	ASSERT(dsp->rds_rs.refcnt > 0);
    558 	atomic_add_32(&dsp->rds_rs.refcnt, -1);
    559 	if (dsp->rds_rs.refcnt == 0) {
    560 		bzero(dsp->rds_rs.sessid, sizeof (sessionid4));
    561 		dsp->rds_rs.seqid = dsp->rds_rs.slotno = 0;
    562 	}
    563 }
    564 
    565 void
    566 rfs41_seq4_hold(void *data, uint32_t flag)
    567 {
    568 	bit_attr_t	*p = (bit_attr_t *)data;
    569 	uint32_t	 idx = log2(flag);
    570 
    571 	ASSERT(p[idx].ba_bit == flag);
    572 	atomic_add_32(&p[idx].ba_refcnt, 1);
    573 	p[idx].ba_trigger = gethrestime_sec();
    574 }
    575 
    576 void
    577 rfs41_seq4_rele(void *data, uint32_t flag)
    578 {
    579 	bit_attr_t	*p = (bit_attr_t *)data;
    580 	uint32_t	 idx = log2(flag);
    581 
    582 	ASSERT(p[idx].ba_bit == flag);
    583 	if (p[idx].ba_refcnt > 0)
    584 		atomic_add_32(&p[idx].ba_refcnt, -1);
    585 	p[idx].ba_trigger = gethrestime_sec();
    586 }
    587 
    588 sess_channel_t *
    589 rfs41_create_session_channel(channel_dir_from_server4 dir)
    590 {
    591 	sess_channel_t   *cp;
    592 	sess_bcsd_t	 *bp;
    593 
    594 	cp = (sess_channel_t *)kmem_zalloc(sizeof (sess_channel_t), KM_SLEEP);
    595 	rw_init(&cp->cn_lock, NULL, RW_DEFAULT, NULL);
    596 
    597 	switch (dir) {
    598 	case CDFS4_FORE:
    599 		break;
    600 
    601 	case CDFS4_BOTH:
    602 	case CDFS4_BACK:
    603 		/* BackChan Specific Data */
    604 		bp = (sess_bcsd_t *)kmem_zalloc(sizeof (sess_bcsd_t), KM_SLEEP);
    605 		rw_init(&bp->bsd_rwlock, NULL, RW_DEFAULT, NULL);
    606 		cp->cn_csd = (sess_bcsd_t *)bp;
    607 		break;
    608 	}
    609 	return (cp);
    610 }
    611 
    612 void
    613 rfs41_destroy_session_channel(mds_session_t *sp, channel_dir_from_server4 dir)
    614 {
    615 	sess_channel_t	*cp;
    616 	sess_bcsd_t	*bp;
    617 
    618 	if (sp == NULL)
    619 		return;
    620 	if (dir == CDFS4_FORE && sp->sn_fore == NULL)
    621 		return;
    622 	if (dir == CDFS4_BACK && sp->sn_back == NULL)
    623 		return;
    624 
    625 	if (sp->sn_bdrpc) {
    626 		ASSERT(sp->sn_fore == sp->sn_back);
    627 		sp->sn_fore = NULL;
    628 		goto back;
    629 	}
    630 
    631 	if (dir == CDFS4_FORE || dir == CDFS4_BOTH) {
    632 fore:
    633 		if (sp->sn_fore == NULL)
    634 			return;
    635 		cp = sp->sn_fore;
    636 
    637 		rw_destroy(&cp->cn_lock);
    638 		kmem_free(cp, sizeof (sess_channel_t));
    639 		sp->sn_fore = NULL;
    640 	}
    641 
    642 	if (dir == CDFS4_BACK || dir == CDFS4_BOTH) {
    643 back:
    644 		if (sp->sn_back == NULL)
    645 			return;
    646 		cp = sp->sn_back;
    647 
    648 		bp = (sess_bcsd_t *)cp->cn_csd;
    649 		rw_destroy(&bp->bsd_rwlock);
    650 		kmem_free(bp, sizeof (sess_bcsd_t));
    651 
    652 		rw_destroy(&cp->cn_lock);
    653 		kmem_free(cp, sizeof (sess_channel_t));
    654 		sp->sn_back = NULL;
    655 	}
    656 }
    657 
    658 /*
    659  * Create/Initialize the session for this rfs4_client_t. Also
    660  * create its slot replay cache as per the server's resource
    661  * constraints.
    662  */
    663 /* ARGSUSED */
    664 static bool_t
    665 mds_session_create(rfs4_entry_t u_entry, void *arg)
    666 {
    667 	mds_session_t		*sp = (mds_session_t *)u_entry;
    668 	session41_create_t	*ap = (session41_create_t *)arg;
    669 	sess_channel_t		*ocp = NULL;
    670 	sid			*sidp;
    671 	SVCMASTERXPRT		*mxprt;
    672 	uint32_t		 i;
    673 	int			 bdrpc;
    674 	rpcprog_t		 prog;
    675 	channel_dir_from_server4 dir;
    676 	sess_bcsd_t		*bsdp;
    677 	nfs_server_instance_t	*instp;
    678 	int			 max_slots;
    679 	nfsstat4		 sle;
    680 	struct svc_req		*req;
    681 
    682 	ASSERT(sp != NULL);
    683 	if (sp == NULL)
    684 		return (FALSE);
    685 
    686 	instp = dbe_to_instp(sp->sn_dbe);
    687 
    688 	/*
    689 	 * Back pointer/ref to parent data struct (rfs4_client_t)
    690 	 */
    691 	sp->sn_clnt = (rfs4_client_t *)ap->cs_client;
    692 	rfs4_dbe_hold(sp->sn_clnt->rc_dbe);
    693 	req = (struct svc_req *)ap->cs_req;
    694 	mxprt = (SVCMASTERXPRT *)req->rq_xprt->xp_master;
    695 
    696 	/*
    697 	 * Handcrafting the session id
    698 	 */
    699 	sidp = (sid *)&sp->sn_sessid;
    700 	sidp->impl_id.pad0 = 0x00000000;
    701 	sidp->impl_id.pad1 = 0xFFFFFFFF;
    702 	sidp->impl_id.start_time = instp->start_time;
    703 	sidp->impl_id.s_id = (uint32_t)rfs4_dbe_getid(sp->sn_dbe);
    704 
    705 	/*
    706 	 * Process csa_flags; note that CREATE_SESSION4_FLAG_CONN_BACK_CHAN
    707 	 * is processed below since it affects direction and setup of the
    708 	 * backchannel accordingly.
    709 	 */
    710 	sp->sn_csflags = 0;
    711 	if (ap->cs_aotw.csa_flags & CREATE_SESSION4_FLAG_PERSIST)
    712 		/* XXX - Worry about persistence later */
    713 		sp->sn_csflags &= ~CREATE_SESSION4_FLAG_PERSIST;
    714 
    715 	if (ap->cs_aotw.csa_flags & CREATE_SESSION4_FLAG_CONN_RDMA)
    716 		/* XXX - No RDMA for now */
    717 		sp->sn_csflags &= ~CREATE_SESSION4_FLAG_CONN_RDMA;
    718 
    719 	/*
    720 	 * Initialize some overall sessions values
    721 	 */
    722 	sp->sn_bc.progno = ap->cs_aotw.csa_cb_program;
    723 	sp->sn_laccess = gethrestime_sec();
    724 	sp->sn_flags = 0;
    725 
    726 	/*
    727 	 * Check if client has specified that the FORE channel should
    728 	 * also be used for call back traffic (ie. bidir RPC). If so,
    729 	 * let's try to accomodate the request.
    730 	 */
    731 	DTRACE_PROBE1(csa__flags, uint32_t, ap->cs_aotw.csa_flags);
    732 	bdrpc = ap->cs_aotw.csa_flags & CREATE_SESSION4_FLAG_CONN_BACK_CHAN;
    733 
    734 	if (bdrpc) {
    735 		SVCCB_ARGS cbargs;
    736 		prog = sp->sn_bc.progno;
    737 		cbargs.xprt = mxprt;
    738 		cbargs.prog = prog;
    739 		cbargs.vers = NFS_CB;
    740 		cbargs.family = AF_INET;
    741 		cbargs.tag = (void *)sp->sn_sessid;
    742 
    743 		if (SVC_CTL(req->rq_xprt, SVCCTL_SET_CBCONN, (void *)&cbargs)) {
    744 			/*
    745 			 * Couldn't create a bi-dir RPC connection. Reset
    746 			 * bdrpc so that the session's channel flags are
    747 			 * set appropriately and the client knows it needs
    748 			 * to do the BIND_CONN_TO_SESSION dance in order
    749 			 * to establish a callback path.
    750 			 */
    751 			bdrpc = 0;
    752 		}
    753 	}
    754 
    755 	/*
    756 	 * Session's channel flags depending on bdrpc
    757 	 */
    758 	sp->sn_bdrpc = bdrpc;
    759 	dir = sp->sn_bdrpc ? (CDFS4_FORE | CDFS4_BACK) : CDFS4_FORE;
    760 	ocp = rfs41_create_session_channel(dir);
    761 	ocp->cn_dir = dir;
    762 	sp->sn_fore = ocp;
    763 
    764 	/*
    765 	 * Check if channel attrs will be flexible enough for future
    766 	 * purposes. Channel attribute enforcement is done as part of
    767 	 * COMPOUND processing.
    768 	 */
    769 	ocp->cn_attrs = ap->cs_aotw.csa_fore_chan_attrs;
    770 	if (sle = sess_chan_limits(ocp)) {
    771 		ap->cs_error = sle;
    772 		return (FALSE);
    773 	}
    774 
    775 	/*
    776 	 * No need for locks/synchronization at this time,
    777 	 * since we're barely creating the session.
    778 	 */
    779 	if (sp->sn_bdrpc) {
    780 		/*
    781 		 * bcsd got built as part of the channel's construction.
    782 		 */
    783 		if ((bsdp = CTOBSD(ocp)) == NULL) {
    784 			cmn_err(CE_PANIC, "Back Chan Spec Data Not Set\t"
    785 			    "<Internal Inconsistency>");
    786 		}
    787 		bc_slot_tab = ap->cs_aotw.csa_back_chan_attrs.ca_maxrequests;
    788 		slrc_table_create(&bsdp->bsd_stok, bc_slot_tab);
    789 		sp->sn_csflags |= CREATE_SESSION4_FLAG_CONN_BACK_CHAN;
    790 		sp->sn_back = ocp;
    791 
    792 	} else {
    793 		/*
    794 		 * If not doing bdrpc, then we expect the client to perform
    795 		 * an explicit BIND_CONN_TO_SESSION if it wants callback
    796 		 * traffic. Subsequently, the cb channel should be set up
    797 		 * at that point along with its corresponding slot (see
    798 		 * rfs41_bc_setup).
    799 		 */
    800 		sp->sn_csflags &= ~CREATE_SESSION4_FLAG_CONN_BACK_CHAN;
    801 		sp->sn_back = NULL;
    802 		prog = 0;
    803 
    804 		/*
    805 		 * XXX 08/15/2008 (rick) - if the channel is not bidir when
    806 		 *	created in CREATE_SESSION, then we should save off
    807 		 *	the ap->cs_aotw.csa_back_chan_attrs in case later
    808 		 *	a bc2s is called to create the back channel.
    809 		 */
    810 	}
    811 
    812 	/*
    813 	 * We're just creating the session... there _shouldn't_ be any
    814 	 * other threads wanting to add connections to this sessions'
    815 	 * conn list, so we purposefully do _not_ take the ocp->cn_lock
    816 	 *
    817 	 * sn_bc fields are all initialized to 0 (via zalloc)
    818 	 */
    819 
    820 	SVC_CTL(req->rq_xprt, SVCCTL_SET_TAG, (void *)sp->sn_sessid);
    821 
    822 	if (sp->sn_bdrpc) {
    823 		atomic_add_32(&sp->sn_bc.pngcnt, 1);
    824 	}
    825 
    826 	/*
    827 	 * Now we allocate space for the slrc, initializing each slot's
    828 	 * sequenceid and slotid to zero and a (pre)cached result of
    829 	 * NFS4ERR_SEQ_MISORDERED. Note that we zero out the entries
    830 	 * by virtue of the z-alloc.
    831 	 */
    832 	max_slots = ocp->cn_attrs.ca_maxrequests;
    833 	slrc_table_create(&sp->sn_replay, max_slots);
    834 
    835 	/* only initialize bits relevant to session scope */
    836 	bzero(&sp->sn_seq4, sizeof (bit_attr_t) * BITS_PER_WORD);
    837 	for (i = 1; i <= SEQ4_HIGH_BIT && i != 0; i <<= 1) {
    838 		uint32_t idx = log2(i);
    839 
    840 		switch (i) {
    841 		case SEQ4_STATUS_CB_GSS_CONTEXTS_EXPIRING:
    842 		case SEQ4_STATUS_CB_GSS_CONTEXTS_EXPIRED:
    843 		case SEQ4_STATUS_CB_PATH_DOWN_SESSION:
    844 		case SEQ4_STATUS_BACKCHANNEL_FAULT:
    845 			sp->sn_seq4[idx].ba_bit = i;
    846 			break;
    847 		default:
    848 			/* already bzero'ed */
    849 			break;
    850 		}
    851 	}
    852 
    853 	if (sp->sn_bdrpc) {
    854 		/*
    855 		 * Recall that for CB_PATH_DOWN[_SESSION], the refcnt
    856 		 * indicates the number of active back channel conns
    857 		 */
    858 		rfs41_seq4_hold(&sp->sn_seq4, SEQ4_STATUS_CB_PATH_DOWN_SESSION);
    859 		rfs41_seq4_hold(&sp->sn_clnt->rc_seq4,
    860 		    SEQ4_STATUS_CB_PATH_DOWN);
    861 	}
    862 	return (TRUE);
    863 }
    864 
    865 /* ARGSUSED */
    866 static void
    867 mds_session_destroy(rfs4_entry_t u_entry)
    868 {
    869 	mds_session_t	*sp = (mds_session_t *)u_entry;
    870 	sess_bcsd_t	*bsdp;
    871 
    872 	if (SN_CB_CHAN_EST(sp) && ((bsdp = CTOBSD(sp->sn_back)) != NULL))
    873 		slrc_table_destroy(bsdp->bsd_stok);
    874 
    875 	/*
    876 	 * XXX - A session can have multiple BC clnt handles that need
    877 	 *	 to be discarded. mds_session_inval calls CLNT_DESTROY
    878 	 *	 which will remove the CB client handle from the global
    879 	 *	 list (cb_clnt_list) now. This will have to change once
    880 	 *	 we manage the BC clnt handles per session.
    881 	 */
    882 
    883 	/*
    884 	 * Remove the fore and back channels.
    885 	 */
    886 	rfs41_destroy_session_channel(sp, CDFS4_BOTH);
    887 
    888 	/*
    889 	 * Nuke slot replay cache for this session
    890 	 */
    891 	if (sp->sn_replay) {
    892 		slrc_table_destroy(sp->sn_replay);
    893 		sp->sn_replay = NULL;
    894 	}
    895 
    896 	/*
    897 	 * Remove reference to parent data struct
    898 	 */
    899 	if (sp->sn_clnt)
    900 		rfs4_client_rele(sp->sn_clnt);
    901 }
    902 
    903 static bool_t
    904 mds_session_expiry(rfs4_entry_t u_entry)
    905 {
    906 	mds_session_t	*sp = (mds_session_t *)u_entry;
    907 
    908 	if (sp == NULL || rfs4_dbe_is_invalid(sp->sn_dbe))
    909 		return (TRUE);
    910 
    911 	if (rfs4_lease_expired(sp->sn_clnt))
    912 		return (TRUE);
    913 
    914 	return (FALSE);
    915 }
    916 
    917 void
    918 mds_kill_session_callout(rfs4_entry_t u_entry, void *arg)
    919 {
    920 	rfs4_client_t *cp = (rfs4_client_t *)arg;
    921 	mds_session_t *sp = (mds_session_t *)u_entry;
    922 
    923 	if (sp->sn_clnt == cp && !(rfs4_dbe_is_invalid(sp->sn_dbe))) {
    924 		/*
    925 		 * client is going away; so no need to check for
    926 		 * CB channel traffic before destroying a session.
    927 		 */
    928 		rfs4_dbe_invalidate(sp->sn_dbe);
    929 	}
    930 }
    931 
    932 void
    933 mds_clean_up_sessions(rfs4_client_t *cp)
    934 {
    935 	nfs_server_instance_t *instp;
    936 
    937 	instp = dbe_to_instp(cp->rc_dbe);
    938 
    939 	if (instp->mds_session_tab != NULL)
    940 		rfs4_dbe_walk(instp->mds_session_tab,
    941 		    mds_kill_session_callout, cp);
    942 }
    943 
    944 /*
    945  * -----------------------------------------------
    946  * MDS: Layout tables.
    947  * -----------------------------------------------
    948  */
    949 static uint32_t
    950 mds_layout_hash(void *key)
    951 {
    952 	layout_core_t	*lc = (layout_core_t *)key;
    953 	int		i;
    954 	uint32_t	hash = 0;
    955 
    956 	if (lc->lc_stripe_count == 0)
    957 		return (0);
    958 
    959 	/*
    960 	 * Hash the first mds_sid
    961 	 */
    962 	for (i = 0; i < lc->lc_mds_sids[0].len; i++) {
    963 		hash <<= 1;
    964 		hash += (uint_t)lc->lc_mds_sids[0].val[i];
    965 	}
    966 
    967 	return (hash);
    968 }
    969 
    970 static bool_t
    971 mds_layout_compare(rfs4_entry_t entry, void *key)
    972 {
    973 	mds_layout_t	*lp = (mds_layout_t *)entry;
    974 	layout_core_t	*lc = (layout_core_t *)key;
    975 
    976 	int		i;
    977 
    978 	if (lc->lc_stripe_unit == lp->mlo_lc.lc_stripe_unit) {
    979 		if (lc->lc_stripe_count ==
    980 		    lp->mlo_lc.lc_stripe_count) {
    981 			for (i = 0; i < lc->lc_stripe_count; i++) {
    982 				if (lc->lc_mds_sids[i].len !=
    983 				    lp->mlo_lc.lc_mds_sids[i].len) {
    984 					return (0);
    985 				}
    986 
    987 				if (bcmp(lc->lc_mds_sids[i].val,
    988 				    lp->mlo_lc.lc_mds_sids[i].val,
    989 				    lc->lc_mds_sids[i].len)) {
    990 					return (0);
    991 				}
    992 			}
    993 
    994 			/*
    995 			 * Everything matches!
    996 			 */
    997 			return (1);
    998 		}
    999 	}
   1000 
   1001 	return (0);
   1002 }
   1003 
   1004 static void *
   1005 mds_layout_mkkey(rfs4_entry_t entry)
   1006 {
   1007 	mds_layout_t *lp = (mds_layout_t *)entry;
   1008 
   1009 	return ((void *)&lp->mlo_lc);
   1010 }
   1011 
   1012 static uint32_t
   1013 mds_layout_id_hash(void *key)
   1014 {
   1015 	return ((uint32_t)(uintptr_t)key);
   1016 }
   1017 
   1018 static bool_t
   1019 mds_layout_id_compare(rfs4_entry_t entry, void *key)
   1020 {
   1021 	mds_layout_t *lp = (mds_layout_t *)entry;
   1022 
   1023 	return (lp->mlo_id == (int)(uintptr_t)key);
   1024 }
   1025 
   1026 static void *
   1027 mds_layout_id_mkkey(rfs4_entry_t entry)
   1028 {
   1029 	mds_layout_t *lp = (mds_layout_t *)entry;
   1030 
   1031 	return ((void *)(uintptr_t)lp->mlo_id);
   1032 }
   1033 
/*
 * Argument bundle that mds_gen_mpd() builds and passes to
 * rfs4_dbcreate() on the mds_mpd_idx index.
 */
typedef struct {
	/* id taken from the instance's mds_mpd_id_space */
	uint32_t			id;
	/* device address description the new entry is created from */
	nfsv4_1_file_layout_ds_addr4	*ds_addr4;
} mds_addmpd_t;
   1038 
   1039 /*
   1040  * ================================================================
   1041  *	XXX: Both mds_gather_mds_sids and mds_gen_default_layout
   1042  *	have been left in to support installations with no
   1043  *	policies defined. In short, we do not force people to
   1044  *	set up a policy system. Whenever the SMF portion of the
   1045  *	code comes along, we will nuke these functions and
   1046  *	force a real default to exist.
   1047  *  ================================================================
   1048  */
   1049 
/*
 * Accumulator handed to mds_gather_mds_sids() while the ds_guid_info
 * table is walked.
 */
struct mds_gather_args {
	/* layout core being assembled; lc_stripe_count bounds the gather */
	layout_core_t	lc;
	/* number of lc_mds_sids slots filled in so far */
	int 		found;
};
   1054 
   1055 static void
   1056 mds_gather_mds_sids(rfs4_entry_t entry, void *arg)
   1057 {
   1058 	ds_guid_info_t		*pgi = (ds_guid_info_t *)entry;
   1059 	struct mds_gather_args	*gap = (struct mds_gather_args *)arg;
   1060 
   1061 	int i, j;
   1062 
   1063 	if (rfs4_dbe_skip_or_invalid(pgi->dbe))
   1064 		return;
   1065 
   1066 	if (gap->found < gap->lc.lc_stripe_count) {
   1067 		/*
   1068 		 * Insert in order.
   1069 		 */
   1070 		for (i = 0; i < gap->found; i++) {
   1071 			if ((pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_len <
   1072 			    gap->lc.lc_mds_sids[i].len) ||
   1073 			    (pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_len ==
   1074 			    gap->lc.lc_mds_sids[i].len &&
   1075 			    bcmp(pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_val,
   1076 			    gap->lc.lc_mds_sids[i].val,
   1077 			    gap->lc.lc_mds_sids[i].len) < 0)) {
   1078 				for (j = gap->found; j > i; j--) {
   1079 					gap->lc.lc_mds_sids[j].len =
   1080 					    gap->lc.lc_mds_sids[j - 1].len;
   1081 					gap->lc.lc_mds_sids[j - 1].val =
   1082 					    gap->lc.lc_mds_sids[j].val;
   1083 				}
   1084 
   1085 				break;
   1086 			}
   1087 		}
   1088 
   1089 		/*
   1090 		 * Either we found it and i is where it goes or we didn't
   1091 		 * find it and i is the tail. Either way, same thing happens!
   1092 		 */
   1093 		gap->lc.lc_mds_sids[i].len =
   1094 		    pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_len;
   1095 		gap->lc.lc_mds_sids[i].val =
   1096 		    kmem_alloc(gap->lc.lc_mds_sids[i].len, KM_SLEEP);
   1097 		bcopy(pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_val,
   1098 		    gap->lc.lc_mds_sids[i].val,
   1099 		    gap->lc.lc_mds_sids[i].len);
   1100 
   1101 		gap->found++;
   1102 	}
   1103 }
   1104 
/*
 * Default stripe size.  mds_gen_default_layout() multiplies this by
 * 1024 to obtain the lc_stripe_unit of the default layout.
 */
int mds_default_stripe = 32;
   1106 
   1107 mds_layout_t *
   1108 mds_gen_default_layout(nfs_server_instance_t *instp)
   1109 {
   1110 	struct mds_gather_args	gap;
   1111 	mds_layout_t		*lp;
   1112 
   1113 	int			i;
   1114 
   1115 	bzero(&gap, sizeof (gap));
   1116 
   1117 	gap.found = 0;
   1118 
   1119 	rw_enter(&instp->ds_guid_info_lock, RW_READER);
   1120 	gap.lc.lc_stripe_count = instp->ds_guid_info_count;
   1121 	rw_exit(&instp->ds_guid_info_lock);
   1122 
   1123 	gap.lc.lc_mds_sids = kmem_zalloc(gap.lc.lc_stripe_count *
   1124 	    sizeof (mds_sid), KM_SLEEP);
   1125 
   1126 	rw_enter(&instp->ds_guid_info_lock, RW_READER);
   1127 	rfs4_dbe_walk(instp->ds_guid_info_tab, mds_gather_mds_sids, &gap);
   1128 	rw_exit(&instp->ds_guid_info_lock);
   1129 
   1130 	/*
   1131 	 * If we didn't find any devices then we do no service
   1132 	 */
   1133 	if (gap.found == 0) {
   1134 		kmem_free(gap.lc.lc_mds_sids, gap.lc.lc_stripe_count *
   1135 		    sizeof (mds_sid));
   1136 		return (NULL);
   1137 	}
   1138 
   1139 	/*
   1140 	 * XXX: What if found != stripe_count ?
   1141 	 */
   1142 
   1143 	gap.lc.lc_stripe_unit = mds_default_stripe * 1024;
   1144 
   1145 	rw_enter(&instp->mds_layout_lock, RW_WRITER);
   1146 	lp = (mds_layout_t *)rfs4_dbcreate(instp->mds_layout_idx,
   1147 	    (void *)&gap.lc);
   1148 	if (lp) {
   1149 		instp->mds_layout_default_idx = lp->mlo_id;
   1150 	}
   1151 	rw_exit(&instp->mds_layout_lock);
   1152 
   1153 	for (i = 0; i < gap.lc.lc_stripe_count; i++) {
   1154 		kmem_free(gap.lc.lc_mds_sids[i].val,
   1155 		    gap.lc.lc_mds_sids[i].len);
   1156 	}
   1157 
   1158 	kmem_free(gap.lc.lc_mds_sids, gap.lc.lc_stripe_count *
   1159 	    sizeof (mds_sid));
   1160 	return (lp);
   1161 }
   1162 
   1163 /* ================================================================ */
   1164 
   1165 
   1166 /*
   1167  * Given a layout, which now is comprised of mds_dataset_ids, instead of
   1168  * devices, generate the list of devices...
   1169  */
   1170 static mds_mpd_t *
   1171 mds_gen_mpd(nfs_server_instance_t *instp, mds_layout_t *lp)
   1172 {
   1173 	nfsv4_1_file_layout_ds_addr4	ds_dev;
   1174 
   1175 	/*
   1176 	 * The key to understanding the way these data structures
   1177 	 * interact is that map points to ds_dev. And map is stuck
   1178 	 * into the mds_mpd_idx database.
   1179 	 */
   1180 	mds_addmpd_t	map = { .id = 0, .ds_addr4 = &ds_dev };
   1181 	mds_mpd_t	*mp = NULL;
   1182 	uint_t		len;
   1183 	int		 i, iLoaded = 0;
   1184 	uint32_t	*sivp;
   1185 	multipath_list4	*mplp;
   1186 
   1187 	ds_addrlist_t	**adp = NULL;
   1188 
   1189 	ASSERT(instp->mds_mpd_id_space != NULL);
   1190 	map.id = id_alloc(instp->mds_mpd_id_space);
   1191 
   1192 	/*
   1193 	 * build a nfsv4_1_file_layout_ds_addr4, encode it and
   1194 	 * cache it in state_store.
   1195 	 */
   1196 	len = lp->mlo_lc.lc_stripe_count;
   1197 
   1198 	/* allocate space for the indices */
   1199 	sivp = ds_dev.nflda_stripe_indices.nflda_stripe_indices_val =
   1200 	    kmem_zalloc(len * sizeof (uint32_t), KM_SLEEP);
   1201 
   1202 	ds_dev.nflda_stripe_indices.nflda_stripe_indices_len = len;
   1203 
   1204 	/* populate the stripe indices */
   1205 	for (i = 0; i < len; i++)
   1206 		sivp[i] = i;
   1207 
   1208 	/*
   1209 	 * allocate space for the multipath_list4 (for now we just
   1210 	 * have the one path)
   1211 	 */
   1212 	mplp = ds_dev.nflda_multipath_ds_list.nflda_multipath_ds_list_val =
   1213 	    kmem_zalloc(len * sizeof (multipath_list4), KM_SLEEP);
   1214 
   1215 	ds_dev.nflda_multipath_ds_list.nflda_multipath_ds_list_len = len;
   1216 
   1217 	adp = kmem_zalloc(len * sizeof (ds_addrlist_t *), KM_SLEEP);
   1218 
   1219 	/*
   1220 	 * Now populate the netaddrs using the stashed ds_addr
   1221 	 * pointers
   1222 	 */
   1223 	for (i = 0; i < len; i++) {
   1224 		ds_addrlist_t	*dp;
   1225 
   1226 		mplp[i].multipath_list4_len = 1;
   1227 		dp = mds_find_ds_addrlist_by_mds_sid(instp,
   1228 		    &lp->mlo_lc.lc_mds_sids[i]);
   1229 		if (!dp) {
   1230 			iLoaded = i;
   1231 			goto cleanup;
   1232 		}
   1233 
   1234 		mplp[i].multipath_list4_val = &dp->dev_addr;
   1235 		adp[i] = dp;
   1236 	}
   1237 
   1238 	iLoaded = len;
   1239 
   1240 	/*
   1241 	 * Add the multipath_list4, this will encode and cache
   1242 	 * the result.
   1243 	 */
   1244 	rw_enter(&instp->mds_mpd_lock, RW_WRITER);
   1245 
   1246 	/*
   1247 	 * XXX: Each layout has its own mpd.
   1248 	 *
   1249 	 * Note that we should fix this....
   1250 	 */
   1251 	mp = (mds_mpd_t *)rfs4_dbcreate(instp->mds_mpd_idx, (void *)&map);
   1252 	if (mp) {
   1253 		lp->mlo_mpd_id = mp->mpd_id;
   1254 
   1255 		/*
   1256 		 * Put the layout on the layouts list.
   1257 		 * Note that we don't decrement the refcnt
   1258 		 * here, we keep a hold on it for inserting
   1259 		 * this layout on it.
   1260 		 */
   1261 		list_insert_tail(&mp->mpd_layouts_list, lp);
   1262 	}
   1263 
   1264 	rw_exit(&instp->mds_mpd_lock);
   1265 
   1266 cleanup:
   1267 
   1268 	for (i = 0; i < iLoaded; i++) {
   1269 		rfs4_dbe_rele(adp[i]->dbe);
   1270 	}
   1271 
   1272 	kmem_free(adp, len * sizeof (ds_addrlist_t *));
   1273 	kmem_free(mplp, len * sizeof (multipath_list4));
   1274 	kmem_free(sivp, len * sizeof (uint32_t));
   1275 
   1276 	if (mp == NULL)
   1277 		id_free(instp->mds_mpd_id_space, map.id);
   1278 
   1279 	return (mp);
   1280 }
   1281 
   1282 void
   1283 mds_nuke_layout(nfs_server_instance_t *instp, uint32_t mlo_id)
   1284 {
   1285 	bool_t create = FALSE;
   1286 	rfs4_entry_t e;
   1287 
   1288 	rw_enter(&instp->mds_layout_lock, RW_WRITER);
   1289 	if ((e = rfs4_dbsearch(instp->mds_layout_ID_idx,
   1290 	    (void *)(uintptr_t)mlo_id,
   1291 	    &create,
   1292 	    NULL,
   1293 	    RFS4_DBS_VALID)) != NULL) {
   1294 		rfs4_dbe_invalidate(e->dbe);
   1295 		rfs4_dbe_rele(e->dbe);
   1296 	}
   1297 	rw_exit(&instp->mds_layout_lock);
   1298 }
   1299 
   1300 /*ARGSUSED*/
   1301 static bool_t
   1302 mds_layout_create(rfs4_entry_t u_entry, void *arg)
   1303 {
   1304 	mds_layout_t	*lp = (mds_layout_t *)u_entry;
   1305 	layout_core_t	*lc = (layout_core_t *)arg;
   1306 
   1307 	nfs_server_instance_t *instp;
   1308 	int i;
   1309 	bool_t rc = TRUE;
   1310 
   1311 	instp = dbe_to_instp(lp->mlo_dbe);
   1312 
   1313 	lp->mlo_id = rfs4_dbe_getid(lp->mlo_dbe);
   1314 
   1315 	lp->mlo_type = LAYOUT4_NFSV4_1_FILES;
   1316 	lp->mlo_lc.lc_stripe_unit = lc->lc_stripe_unit;
   1317 	lp->mlo_lc.lc_stripe_count = lc->lc_stripe_count;
   1318 
   1319 	lp->mlo_lc.lc_mds_sids = kmem_zalloc(lp->mlo_lc.lc_stripe_count *
   1320 	    sizeof (mds_sid), KM_SLEEP);
   1321 
   1322 	for (i = 0; i < lp->mlo_lc.lc_stripe_count; i++) {
   1323 		lp->mlo_lc.lc_mds_sids[i].len = lc->lc_mds_sids[i].len;
   1324 		lp->mlo_lc.lc_mds_sids[i].val =
   1325 		    kmem_alloc(lp->mlo_lc.lc_mds_sids[i].len, KM_SLEEP);
   1326 		bcopy(lc->lc_mds_sids[i].val, lp->mlo_lc.lc_mds_sids[i].val,
   1327 		    lp->mlo_lc.lc_mds_sids[i].len);
   1328 	}
   1329 
   1330 	/* Need to generate a device for this layout */
   1331 	lp->mlo_mpd = mds_gen_mpd(instp, lp);
   1332 	if (lp->mlo_mpd == NULL) {
   1333 		for (i = 0; i < lp->mlo_lc.lc_stripe_count; i++) {
   1334 			kmem_free(lp->mlo_lc.lc_mds_sids[i].val,
   1335 			    lp->mlo_lc.lc_mds_sids[i].len);
   1336 		}
   1337 
   1338 		kmem_free(lp->mlo_lc.lc_mds_sids, lp->mlo_lc.lc_stripe_count *
   1339 		    sizeof (mds_sid));
   1340 		lp->mlo_lc.lc_mds_sids = NULL;
   1341 		rc = FALSE;
   1342 	}
   1343 
   1344 	return (rc);
   1345 }
   1346 
   1347 /*ARGSUSED*/
   1348 static void
   1349 mds_layout_destroy(rfs4_entry_t u_entry)
   1350 {
   1351 	mds_layout_t		*lp = (mds_layout_t *)u_entry;
   1352 	nfs_server_instance_t	*instp;
   1353 	int			i;
   1354 
   1355 	instp = dbe_to_instp(u_entry->dbe);
   1356 
   1357 	rw_enter(&instp->mds_mpd_lock, RW_WRITER);
   1358 	if (lp->mlo_mpd != NULL) {
   1359 		list_remove(&lp->mlo_mpd->mpd_layouts_list, lp);
   1360 		rfs4_dbe_rele(lp->mlo_mpd->mpd_dbe);
   1361 		lp->mlo_mpd = NULL;
   1362 	}
   1363 	rw_exit(&instp->mds_mpd_lock);
   1364 
   1365 	if (lp->mlo_lc.lc_mds_sids != NULL) {
   1366 		for (i = 0; i < lp->mlo_lc.lc_stripe_count; i++) {
   1367 			kmem_free(lp->mlo_lc.lc_mds_sids[i].val,
   1368 			    lp->mlo_lc.lc_mds_sids[i].len);
   1369 		}
   1370 
   1371 		kmem_free(lp->mlo_lc.lc_mds_sids, lp->mlo_lc.lc_stripe_count *
   1372 		    sizeof (mds_sid));
   1373 		lp->mlo_lc.lc_mds_sids = NULL;
   1374 	}
   1375 }
   1376 
   1377 mds_layout_t *
   1378 mds_add_layout(layout_core_t *lc)
   1379 {
   1380 	bool_t create = FALSE;
   1381 	mds_layout_t *lp;
   1382 
   1383 	rw_enter(&mds_server->mds_layout_lock, RW_WRITER);
   1384 
   1385 	/*
   1386 	 * If it is already in memory, then we can just
   1387 	 * bump the refcnt.
   1388 	 */
   1389 	lp = (mds_layout_t *)rfs4_dbsearch(mds_server->mds_layout_idx,
   1390 	    (void *)lc, &create, NULL,
   1391 	    RFS4_DBS_VALID);
   1392 	if (lp != NULL) {
   1393 		rw_exit(&mds_server->mds_layout_lock);
   1394 		return (lp);
   1395 	}
   1396 
   1397 	lp = (mds_layout_t *)rfs4_dbcreate(mds_server->mds_layout_idx,
   1398 	    (void *)lc);
   1399 	rw_exit(&mds_server->mds_layout_lock);
   1400 
   1401 	if (lp == NULL) {
   1402 		printf("mds_add_layout: failed\n");
   1403 		(void) set_errno(EFAULT);
   1404 	}
   1405 
   1406 	return (lp);
   1407 }
   1408 
/*
 * Hash helper: treat the key as an unsigned long and discard its
 * three low-order bits.
 */
#define	ADDRHASH(key) ((unsigned long)(key) >> 3)
   1410 
   1411 /*
   1412  * -----------------------------------------------
   1413  * MDS: Layout Grant tables.
   1414  * -----------------------------------------------
   1415  *
   1416  */
   1417 static uint32_t
   1418 mds_layout_grant_hash(void *key)
   1419 {
   1420 	mds_layout_grant_t *lg = (mds_layout_grant_t *)key;
   1421 
   1422 	return (ADDRHASH(lg->lo_cp) ^ ADDRHASH(lg->lo_fp));
   1423 }
   1424 
   1425 static bool_t
   1426 mds_layout_grant_compare(rfs4_entry_t u_entry, void *key)
   1427 {
   1428 	mds_layout_grant_t *lg = (mds_layout_grant_t *)u_entry;
   1429 	mds_layout_grant_t *klg = (mds_layout_grant_t *)key;
   1430 
   1431 	return (lg->lo_cp == klg->lo_cp && lg->lo_fp == klg->lo_fp);
   1432 }
   1433 
/*
 * Key extractor for the layout grant index: the entry itself serves
 * as its own key.
 */
static void *
mds_layout_grant_mkkey(rfs4_entry_t entry)
{
	return (entry);
}
   1439 
   1440 #ifdef NOT_USED_NOW
   1441 static uint32_t
   1442 mds_layout_grant_id_hash(void *key)
   1443 {
   1444 	stateid_t *id = (stateid_t *)key;
   1445 
   1446 	return (id->v41_bits.state_ident);
   1447 }
   1448 
   1449 static bool_t
   1450 mds_layout_grant_id_compare(rfs4_entry_t entry, void *key)
   1451 {
   1452 	mds_layout_grant_t *lg = (mds_layout_grant_t *)entry;
   1453 	stateid_t *id = (stateid_t *)key;
   1454 	bool_t rc;
   1455 
   1456 	if (id->v41_bits.type != LAYOUTID)
   1457 		return (FALSE);
   1458 
   1459 	rc = (lg->lo_stateid.v41_bits.boottime == id->v41_bits.boottime &&
   1460 	    lg->lo_stateid.v41_bits.state_ident == id->v41_bits.state_ident);
   1461 
   1462 	return (rc);
   1463 }
   1464 
   1465 static void *
   1466 mds_layout_grant_id_mkkey(rfs4_entry_t entry)
   1467 {
   1468 	mds_layout_grant_t *lg = (mds_layout_grant_t *)entry;
   1469 
   1470 	return (&lg->lo_stateid);
   1471 }
   1472 #endif
   1473 
   1474 /*ARGSUSED*/
   1475 static bool_t
   1476 mds_layout_grant_create(rfs4_entry_t u_entry, void *arg)
   1477 {
   1478 	mds_layout_grant_t *lg = (mds_layout_grant_t *)u_entry;
   1479 	rfs4_file_t *fp = ((mds_layout_grant_t *)arg)->lo_fp;
   1480 	rfs4_client_t *cp = ((mds_layout_grant_t *)arg)->lo_cp;
   1481 
   1482 	/*
   1483 	 * We hold onto the rfs4_file_t until we are done with it.
   1484 	 */
   1485 	rfs4_dbe_hold(fp->rf_dbe);
   1486 
   1487 	lg->lo_status = LO_GRANTED;
   1488 	lg->lo_stateid = mds_create_stateid(lg->lo_dbe, LAYOUTID);
   1489 	lg->lo_fp = fp;
   1490 	lg->lo_cp = cp;
   1491 	lg->lor_seqid = lg->lor_reply = 0;
   1492 	mutex_init(&lg->lo_lock, NULL, MUTEX_DEFAULT, NULL);
   1493 
   1494 	/* Init layout grant lists for remque/insque */
   1495 	lg->lo_grant_list.next = lg->lo_grant_list.prev =
   1496 	    &lg->lo_grant_list;
   1497 	lg->lo_grant_list.lg = lg;
   1498 
   1499 	lg->lo_clientgrantlist.next = lg->lo_clientgrantlist.prev =
   1500 	    &lg->lo_clientgrantlist;
   1501 	lg->lo_clientgrantlist.lg = lg;
   1502 
   1503 	lg->lo_range = nfs_range_create();
   1504 
   1505 	return (TRUE);
   1506 }
   1507 
   1508 /*ARGSUSED*/
   1509 static void
   1510 mds_layout_grant_destroy(rfs4_entry_t entry)
   1511 {
   1512 	mds_layout_grant_t *lg = (mds_layout_grant_t *)entry;
   1513 
   1514 	/*
   1515 	 * The code which invalidated this node should have
   1516 	 * gone ahead and released the rfs4_file_t.
   1517 	 */
   1518 	ASSERT(lg->lo_fp == NULL);
   1519 
   1520 	mutex_destroy(&lg->lo_lock);
   1521 
   1522 	nfs_range_destroy(lg->lo_range);
   1523 	lg->lo_range = NULL;
   1524 }
   1525 
   1526 mds_layout_grant_t *
   1527 rfs41_findlogrant(struct compound_state *cs, rfs4_file_t *fp,
   1528     rfs4_client_t *cp, bool_t *create)
   1529 {
   1530 	mds_layout_grant_t args, *lg;
   1531 
   1532 	args.lo_cp = cp;
   1533 	args.lo_fp = fp;
   1534 
   1535 	lg = (mds_layout_grant_t *)rfs4_dbsearch(
   1536 	    cs->instp->mds_layout_grant_idx, &args, create,
   1537 	    &args, RFS4_DBS_VALID);
   1538 
   1539 	return (lg);
   1540 }
   1541 
/*
 * Take a hold on a layout grant by holding its database entry.
 */
void
rfs41_lo_grant_hold(mds_layout_grant_t *lg)
{
	rfs4_dbe_hold(lg->lo_dbe);
}
   1547 
/*
 * Release a layout grant by releasing its database entry.
 */
void
rfs41_lo_grant_rele(mds_layout_grant_t *lg)
{
	rfs4_dbe_rele(lg->lo_dbe);
}
   1553 
   1554 /*
   1555  * -----------------------------------------------
   1556  * MDS: Ever Grant tables.
   1557  * -----------------------------------------------
   1558  *
   1559  */
   1560 static uint32_t
   1561 mds_ever_grant_hash(void *key)
   1562 {
   1563 	mds_ever_grant_t *eg = (mds_ever_grant_t *)key;
   1564 
   1565 	return (ADDRHASH(eg->eg_cp) ^ ADDRHASH(eg->eg_key));
   1566 }
   1567 
   1568 static bool_t
   1569 mds_ever_grant_compare(rfs4_entry_t u_entry, void *key)
   1570 {
   1571 	mds_ever_grant_t *eg = (mds_ever_grant_t *)u_entry;
   1572 	mds_ever_grant_t *keg = (mds_ever_grant_t *)key;
   1573 
   1574 	return (eg->eg_cp == keg->eg_cp &&
   1575 	    eg->eg_fsid.val[0] == keg->eg_fsid.val[0] &&
   1576 	    eg->eg_fsid.val[1] == keg->eg_fsid.val[1]);
   1577 }
   1578 
/*
 * Key extractor for the ever-grant index: the entry itself serves as
 * its own key.
 */
static void *
mds_ever_grant_mkkey(rfs4_entry_t entry)
{
	return (entry);
}
   1584 
   1585 static bool_t
   1586 mds_ever_grant_fsid_compare(rfs4_entry_t entry, void *key)
   1587 {
   1588 	mds_ever_grant_t *eg = (mds_ever_grant_t *)entry;
   1589 	int64_t g_key = (int64_t)(uintptr_t)key;
   1590 
   1591 	return (eg->eg_key == g_key);
   1592 }
   1593 
   1594 #ifdef NOT_USED_NOW
   1595 static uint32_t
   1596 mds_ever_grant_fsid_hash(void *key)
   1597 {
   1598 	return ((uint32_t)(uintptr_t)key);
   1599 }
   1600 
   1601 static void *
   1602 mds_ever_grant_fsid_mkkey(rfs4_entry_t entry)
   1603 {
   1604 	mds_ever_grant_t *eg = (mds_ever_grant_t *)entry;
   1605 
   1606 	return ((void*)(uintptr_t)eg->eg_key);
   1607 }
   1608 #endif
   1609 
   1610 /*ARGSUSED*/
   1611 static bool_t
   1612 mds_ever_grant_create(rfs4_entry_t u_entry, void *arg)
   1613 {
   1614 	mds_ever_grant_t *eg = (mds_ever_grant_t *)u_entry;
   1615 	rfs4_client_t *cp = ((mds_ever_grant_t *)arg)->eg_cp;
   1616 
   1617 	eg->eg_cp = cp;
   1618 	eg->eg_fsid = ((mds_ever_grant_t *)arg)->eg_fsid;
   1619 
   1620 	return (TRUE);
   1621 }
   1622 
/*
 * Destroy callback for the ever-grant table.  Intentionally empty:
 * mds_ever_grant_create() allocates nothing that would need to be
 * torn down here.
 */
/*ARGSUSED*/
static void
mds_ever_grant_destroy(rfs4_entry_t foo)
{
}
   1628 
   1629 mds_ever_grant_t *
   1630 rfs41_findevergrant(rfs4_client_t *cp, vnode_t *vp, bool_t *create)
   1631 {
   1632 	nfs_server_instance_t *instp;
   1633 	mds_ever_grant_t args, *eg;
   1634 
   1635 	instp = dbe_to_instp(cp->rc_dbe);
   1636 	args.eg_cp = cp;
   1637 	args.eg_fsid = vp->v_vfsp->vfs_fsid;
   1638 
   1639 	eg = (mds_ever_grant_t *)rfs4_dbsearch(
   1640 	    instp->mds_ever_grant_idx, &args, create, &args,
   1641 	    RFS4_DBS_VALID);
   1642 
   1643 	return (eg);
   1644 }
   1645 
/*
 * Release an ever-grant entry by releasing its database entry.
 */
void
rfs41_ever_grant_rele(mds_ever_grant_t *eg)
{
	rfs4_dbe_rele(eg->eg_dbe);
}
   1651 
   1652 void
   1653 mds_kill_eg_callout(rfs4_entry_t u_entry, void *arg)
   1654 {
   1655 	mds_ever_grant_t *eg = (mds_ever_grant_t *)u_entry;
   1656 	rfs4_client_t *cp = (rfs4_client_t *)arg;
   1657 
   1658 	if (eg->eg_cp == cp) {
   1659 		eg->eg_cp = NULL;
   1660 		rfs4_dbe_invalidate(eg->eg_dbe);
   1661 		rfs4_dbe_rele_nolock(eg->eg_dbe);
   1662 	}
   1663 }
   1664 
   1665 void
   1666 mds_clean_up_grants(rfs4_client_t *cp)
   1667 {
   1668 	mds_layout_grant_t *lg;
   1669 	nfs_server_instance_t *instp;
   1670 
   1671 	rfs4_dbe_lock(cp->rc_dbe);
   1672 	while (cp->rc_clientgrantlist.next->lg != NULL) {
   1673 		lg = cp->rc_clientgrantlist.next->lg;
   1674 		remque(&lg->lo_clientgrantlist);
   1675 		lg->lo_clientgrantlist.next = lg->lo_clientgrantlist.prev =
   1676 		    &lg->lo_clientgrantlist;
   1677 		lg->lo_cp = NULL;
   1678 
   1679 		rfs4_dbe_lock(lg->lo_fp->rf_dbe);
   1680 		remque(&lg->lo_grant_list);
   1681 		rfs4_dbe_unlock(lg->lo_fp->rf_dbe);
   1682 
   1683 		lg->lo_grant_list.next = lg->lo_grant_list.prev =
   1684 		    &lg->lo_grant_list;
   1685 		rfs4_file_rele(lg->lo_fp);
   1686 
   1687 		lg->lo_fp = NULL;
   1688 		rfs4_dbe_invalidate(lg->lo_dbe);
   1689 		rfs41_lo_grant_rele(lg);
   1690 	}
   1691 
   1692 	instp = dbe_to_instp(cp->rc_dbe);
   1693 	rfs4_dbe_unlock(cp->rc_dbe);
   1694 
   1695 	rw_enter(&instp->mds_ever_grant_lock, RW_READER);
   1696 	rfs4_dbe_walk(instp->mds_ever_grant_tab, mds_kill_eg_callout, cp);
   1697 	rw_exit(&instp->mds_ever_grant_lock);
   1698 }
   1699 
/*
 * Argument passed to mds_rm_grant_callout() by
 * mds_clean_grants_by_fsid().
 */
struct grant_arg {
	/* only grants held by this client are removed */
	rfs4_client_t *cp;
	/* only grants on files sharing this vnode's v_vfsp are removed */
	vnode_t *vp;
};
   1704 
   1705 void
   1706 mds_rm_grant_callout(rfs4_entry_t u_entry, void *arg)
   1707 {
   1708 	mds_layout_grant_t	*lg = (mds_layout_grant_t *)u_entry;
   1709 	struct grant_arg	*ga = (struct grant_arg *)arg;
   1710 	vnode_t			*vp;
   1711 
   1712 	if (rfs4_dbe_skip_or_invalid(lg->lo_dbe)) {
   1713 		ASSERT(lg->lo_fp == NULL);
   1714 		return;
   1715 	}
   1716 
   1717 	ASSERT(lg->lo_fp != NULL);
   1718 	vp = lg->lo_fp->rf_vp;
   1719 
   1720 	if (ga->cp == lg->lo_cp && vp && ga->vp->v_vfsp == vp->v_vfsp) {
   1721 		rfs4_dbe_lock(lg->lo_cp->rc_dbe);
   1722 		remque(&lg->lo_clientgrantlist);
   1723 		rfs4_dbe_unlock(lg->lo_cp->rc_dbe);
   1724 
   1725 		lg->lo_clientgrantlist.next = lg->lo_clientgrantlist.prev =
   1726 		    &lg->lo_clientgrantlist;
   1727 		lg->lo_cp = NULL;
   1728 
   1729 		rfs4_dbe_lock(lg->lo_fp->rf_dbe);
   1730 		remque(&lg->lo_grant_list);
   1731 		rfs4_dbe_unlock(lg->lo_fp->rf_dbe);
   1732 
   1733 		lg->lo_grant_list.next = lg->lo_grant_list.prev =
   1734 		    &lg->lo_grant_list;
   1735 		rfs4_file_rele(lg->lo_fp);
   1736 
   1737 		lg->lo_fp = NULL;
   1738 		rfs4_dbe_invalidate(lg->lo_dbe);
   1739 		rfs4_dbe_rele_nolock(lg->lo_dbe);
   1740 	}
   1741 }
   1742 
   1743 void
   1744 mds_clean_grants_by_fsid(rfs4_client_t *cp, vnode_t *vp)
   1745 {
   1746 	struct grant_arg ga;
   1747 	nfs_server_instance_t *instp;
   1748 
   1749 	ga.cp = cp;
   1750 	ga.vp = vp;
   1751 	instp = dbe_to_instp(cp->rc_dbe);
   1752 
   1753 	rw_enter(&instp->mds_layout_grant_lock, RW_READER);
   1754 	rfs4_dbe_walk(instp->mds_layout_grant_tab, mds_rm_grant_callout, &ga);
   1755 	rw_exit(&instp->mds_layout_grant_lock);
   1756 }
   1757 
   1758 /*
   1759  * Conforms to Section 12.5.5.2.1.4 of draft-25
   1760  */
   1761 void
   1762 rfs41_lo_seqid(stateid_t *sp)
   1763 {
   1764 	if (sp == NULL)
   1765 		return;
   1766 
   1767 	if ((sp->v41_bits.chgseq + 1) & (uint32_t)~0)
   1768 		atomic_inc_32(&sp->v41_bits.chgseq);
   1769 	else
   1770 		(void) atomic_swap_32(&sp->v41_bits.chgseq, 1);
   1771 }
   1772 
   1773 bool_t
   1774 rfs41_lo_still_granted(mds_layout_grant_t *lg)
   1775 {
   1776 	bool_t	found = TRUE;
   1777 
   1778 	/*
   1779 	 * We currently have the layout grant, but is it still valid?
   1780 	 * If it has been returned, then the status will be updated as
   1781 	 * returned or recalled.  However, it is possible that the client
   1782 	 * has gone away while we are still holding this.  When the client
   1783 	 * is cleaned up, the pointer to the client and the file will be
   1784 	 * set to NULL and it will have been removed from all lists, waiting
   1785 	 * to be released and reaped.  In this case, the status may not
   1786 	 * have been updated.
   1787 	 */
   1788 	rfs4_dbe_lock(lg->lo_dbe);
   1789 	if (lg->lo_status == LO_RETURNED || lg->lo_status == LO_RECALLED ||
   1790 	    lg->lo_cp == NULL)
   1791 		found = FALSE;
   1792 	rfs4_dbe_unlock(lg->lo_dbe);
   1793 
   1794 	return (found);
   1795 }
   1796 
   1797 static void
   1798 rfs41_revoke_layout(mds_layout_grant_t *lg)
   1799 {
   1800 	cmn_err(CE_NOTE, "rfs41_revoke_layout: layout revoked");
   1801 	rfs41_seq4_hold(&lg->lo_cp->rc_seq4,
   1802 	    SEQ4_STATUS_RECALLABLE_STATE_REVOKED);
   1803 
   1804 	/* XXX - rest of this function TBD */
   1805 }
   1806 
   1807 static void
   1808 mds_do_lorecall(mds_lorec_t *lorec)
   1809 {
   1810 	CB_COMPOUND4args	 cb4_args;
   1811 	CB_COMPOUND4res		 cb4_res;
   1812 	CB_SEQUENCE4args	*cbsap;
   1813 	CB_LAYOUTRECALL4args	*cblrap;
   1814 	nfs_cb_argop4		*argops;
   1815 	struct timeval		 timeout;
   1816 	enum clnt_stat		 call_stat = RPC_FAILED;
   1817 	int			 zilch = 0;
   1818 	layoutrecall_file4	*lorf;
   1819 	CLIENT			*ch;
   1820 	int			 numops;
   1821 	int			 argsz;
   1822 	mds_session_t		*sp;
   1823 	slot_ent_t		*p;
   1824 	mds_layout_grant_t	*lg;
   1825 	uint32_t		 sc = 0;
   1826 	int			 retried = 0;
   1827 
   1828 	DTRACE_PROBE1(nfssrv__i__sess_lorecall_fh, mds_lorec_t *, lorec);
   1829 	if ((sp = lorec->lor_sess) == NULL) {
   1830 		kmem_free(lorec, sizeof (mds_lorec_t));
   1831 		return;
   1832 
   1833 	} else if (!SN_CB_CHAN_EST(sp)) {
   1834 		kmem_free(lorec, sizeof (mds_lorec_t));
   1835 		rfs41_session_rele(sp);
   1836 		return;
   1837 	}
   1838 
   1839 	/*
   1840 	 * Per-type pre-processing
   1841 	 */
   1842 	switch (lorec->lor_type) {
   1843 	case LAYOUTRECALL4_FILE:
   1844 		if (lorec->lor_lg == NULL)
   1845 			return;
   1846 		lg = lorec->lor_lg;
   1847 		break;
   1848 
   1849 	case LAYOUTRECALL4_FSID:
   1850 		sp->sn_clnt->rc_bulk_recall = LAYOUTRETURN4_FSID;
   1851 		break;
   1852 
   1853 	case LAYOUTRECALL4_ALL:
   1854 		sp->sn_clnt->rc_bulk_recall = LAYOUTRETURN4_ALL;
   1855 		break;
   1856 	default:
   1857 		break;
   1858 	}
   1859 
   1860 	/*
   1861 	 * set up the compound args
   1862 	 */
   1863 	numops = 2;	/* CB_SEQUENCE + CB_LAYOUTRECALL */
   1864 	argsz = numops * sizeof (nfs_cb_argop4);
   1865 	argops = kmem_zalloc(argsz, KM_SLEEP);
   1866 
   1867 	argops[0].argop = OP_CB_SEQUENCE;
   1868 	cbsap = &argops[0].nfs_cb_argop4_u.opcbsequence;
   1869 
   1870 	argops[1].argop = OP_CB_LAYOUTRECALL;
   1871 	cblrap = &argops[1].nfs_cb_argop4_u.opcblayoutrecall;
   1872 
   1873 	(void) str_to_utf8("cb_lo_recall", &cb4_args.tag);
   1874 	cb4_args.minorversion = CB4_MINOR_v1;
   1875 
   1876 	cb4_args.callback_ident = sp->sn_bc.progno;
   1877 	cb4_args.array_len = numops;
   1878 	cb4_args.array = argops;
   1879 
   1880 	cb4_res.tag.utf8string_val = NULL;
   1881 	cb4_res.array = NULL;
   1882 
   1883 	/*
   1884 	 * CB_SEQUENCE
   1885 	 */
   1886 	bcopy(sp->sn_sessid, cbsap->csa_sessionid, sizeof (sessionid4));
   1887 	p = svc_slot_alloc(sp);
   1888 	mutex_enter(&p->se_lock);
   1889 	cbsap->csa_slotid = p->se_sltno;
   1890 	cbsap->csa_sequenceid = p->se_seqid;
   1891 	cbsap->csa_highest_slotid = svc_slot_maxslot(sp);
   1892 	cbsap->csa_cachethis = FALSE;
   1893 
   1894 	/* no referring calling list for lo recall */
   1895 	cbsap->csa_rcall_llen = 0;
   1896 	cbsap->csa_rcall_lval = NULL;
   1897 	mutex_exit(&p->se_lock);
   1898 
   1899 	/*
   1900 	 * CB_LAYOUTRECALL
   1901 	 *
   1902 	 * clora_change:
   1903 	 *	1: server prefers that client write modified data through
   1904 	 *	   MDS when pushing modified data due to layout recall
   1905 	 *	0: server has no DS/MDS preference
   1906 	 */
   1907 	cblrap->clora_type = LAYOUT4_NFSV4_1_FILES;
   1908 	cblrap->clora_iomode = LAYOUTIOMODE4_ANY;
   1909 	cblrap->clora_changed = 0;
   1910 	cblrap->clora_recall.lor_recalltype = lorec->lor_type;
   1911 
   1912 	switch (lorec->lor_type) {
   1913 	case LAYOUTRECALL4_FILE:
   1914 		lorf = &cblrap->clora_recall.layoutrecall4_u.lor_layout;
   1915 		lorf->lor_offset = 0;
   1916 		lorf->lor_length = ONES_64;
   1917 		lorf->lor_fh.nfs_fh4_len = lorec->lor_fh.fh_len;
   1918 		lorf->lor_fh.nfs_fh4_val = (char *)&lorec->lor_fh.fh_buf;
   1919 		bcopy(&lorec->lor_stid, &lorf->lor_stateid, sizeof (stateid4));
   1920 		(void) atomic_swap_32(&lg->lor_reply, 0);
   1921 		break;
   1922 
   1923 	case LAYOUTRECALL4_FSID:
   1924 		cblrap->clora_recall.layoutrecall4_u.lor_fsid = lorec->lor_fsid;
   1925 		break;
   1926 
   1927 	case LAYOUTRECALL4_ALL:
   1928 	default:
   1929 		break;
   1930 	}
   1931 
   1932 	/*
   1933 	 * Set up the timeout for the callback and make the actual call.
   1934 	 * Timeout will be 80% of the lease period.
   1935 	 */
   1936 	timeout.tv_sec = (dbe_to_instp(sp->sn_dbe)->lease_period * 80) / 100;
   1937 	timeout.tv_usec = 0;
   1938 retry:
   1939 	ch = rfs41_cb_getch(sp);
   1940 	(void) CLNT_CONTROL(ch, CLSET_XID, (char *)&zilch);
   1941 	call_stat = clnt_call(ch, CB_COMPOUND,
   1942 	    xdr_CB_COMPOUND4args_srv, (caddr_t)&cb4_args,
   1943 	    xdr_CB_COMPOUND4res, (caddr_t)&cb4_res, timeout);
   1944 	rfs41_cb_freech(sp, ch);
   1945 
   1946 	if (call_stat != RPC_SUCCESS) {
   1947 		switch (lorec->lor_type) {
   1948 		case LAYOUTRECALL4_FILE:
   1949 			if (!retried)
   1950 				delay(SEC_TO_TICK(rfs4_lease_time));
   1951 
   1952 			if (rfs41_lo_still_granted(lg)) {
   1953 				if (!retried) {
   1954 					retried = 1;
   1955 					goto retry;
   1956 				}
   1957 
   1958 				/*
   1959 				 * We want to make sure that the layout is
   1960 				 * still granted lest we assert a SEQ4 flag
   1961 				 * that will never be turned off.
   1962 				 */
   1963 				rfs41_revoke_layout(lg);
   1964 			}
   1965 			sc = (call_stat == RPC_CANTSEND ||
   1966 			    call_stat == RPC_CANTRECV);
   1967 			rfs41_cb_path_down(sp, sc);
   1968 			goto done;
   1969 
   1970 		case LAYOUTRECALL4_FSID:
   1971 		case LAYOUTRECALL4_ALL:
   1972 			sp->sn_clnt->rc_bulk_recall = 0;
   1973 			/*
   1974 			 * XXX - how do we determine if layouts still
   1975 			 *	 outstanding for fsid/all cases ?
   1976 			 */
   1977 		default:
   1978 			break;
   1979 		}
   1980 
   1981 	} else {	/* RPC_SUCCESS */
   1982 
   1983 		/*
   1984 		 * Per-type results processing
   1985 		 */
   1986 		switch (lorec->lor_type) {
   1987 		case LAYOUTRECALL4_FILE:
   1988 			(void) atomic_swap_32(&lg->lor_reply, 1);
   1989 			break;
   1990 
   1991 		case LAYOUTRECALL4_FSID:
   1992 		case LAYOUTRECALL4_ALL:
   1993 		default:
   1994 			break;
   1995 		}
   1996 	}
   1997 
   1998 	if (cb4_res.status != NFS4_OK) {
   1999 		nfsstat4	s = cb4_res.status;
   2000 
   2001 		switch (s) {
   2002 		case NFS4ERR_BADHANDLE:
   2003 		case NFS4ERR_BADIOMODE:
   2004 		case NFS4ERR_BADXDR:
   2005 		case NFS4ERR_INVAL:
   2006 		case NFS4ERR_NOMATCHING_LAYOUT:
   2007 		case NFS4ERR_NOTSUPP:
   2008 		case NFS4ERR_OP_NOT_IN_SESSION:
   2009 		case NFS4ERR_REP_TOO_BIG:
   2010 		case NFS4ERR_REP_TOO_BIG_TO_CACHE:
   2011 		case NFS4ERR_REQ_TOO_BIG:
   2012 		case NFS4ERR_TOO_MANY_OPS:
   2013 		case NFS4ERR_UNKNOWN_LAYOUTTYPE:
   2014 		case NFS4ERR_WRONG_TYPE:
   2015 			/* What do we do when it's our own fault ? */
   2016 			cmn_err(CE_NOTE, "cb_lo_recall: %s", nfs41_strerror(s));
   2017 			break;
   2018 
   2019 		case NFS4ERR_DELAY:
   2020 			switch (lorec->lor_type) {
   2021 			case LAYOUTRECALL4_FILE:
   2022 				{
   2023 				bool_t	granted = FALSE;
   2024 
   2025 				if (!retried)
   2026 					delay(SEC_TO_TICK(rfs4_lease_time));
   2027 
   2028 				granted = rfs41_lo_still_granted(lg);
   2029 				if (!granted)
   2030 					break;
   2031 
   2032 				if (!retried) {
   2033 					retried = 1;
   2034 					goto retry;
   2035 				}
   2036 
   2037 				if (granted)
   2038 					rfs41_revoke_layout(lg);
   2039 				break;
   2040 				}
   2041 
   2042 			case LAYOUTRECALL4_FSID:
   2043 			case LAYOUTRECALL4_ALL:
   2044 			default:
   2045 				break;
   2046 			}
   2047 			break;
   2048 
   2049 		case NFS4ERR_BAD_STATEID:	/* XXX - retry BAD_STATEID ? */
   2050 		default:
   2051 			if (lorec->lor_type == LAYOUTRECALL4_FILE)
   2052 				if (rfs41_lo_still_granted(lg))
   2053 					rfs41_revoke_layout(lg);
   2054 			break;
   2055 		}
   2056 
   2057 	}
   2058 	svc_slot_cb_seqid(&cb4_res, p);
   2059 done:
   2060 	kmem_free(lorec, sizeof (mds_lorec_t));
   2061 	rfs4freeargres(&cb4_args, &cb4_res);
   2062 
   2063 	svc_slot_free(sp, p);
   2064 	rfs41_session_rele(sp);
   2065 
   2066 	/*
   2067 	 * Per-type post-processing
   2068 	 */
   2069 	switch (lorec->lor_type) {
   2070 	case LAYOUTRECALL4_FILE:
   2071 		rfs41_lo_grant_rele(lg);
   2072 		break;
   2073 
   2074 	case LAYOUTRECALL4_FSID:
   2075 	case LAYOUTRECALL4_ALL:
   2076 	default:
   2077 		break;
   2078 	}
   2079 }
   2080 
   2081 /*
   2082  * Bulk Layout Recall (ALL)
   2083  */
   2084 static void
   2085 all_lor(rfs4_entry_t entry, void *args)
   2086 {
   2087 	mds_session_t	*sp = (mds_session_t *)entry;
   2088 	mds_lorec_t	*lrp = (mds_lorec_t *)args;
   2089 	mds_lorec_t	*lorec;
   2090 
   2091 	if (sp == NULL || lrp == NULL)
   2092 		return;
   2093 
   2094 	ASSERT(rfs4_dbe_islocked(sp->sn_dbe));
   2095 	lorec = kmem_zalloc(sizeof (mds_lorec_t), KM_SLEEP);
   2096 	bcopy(args, lorec, sizeof (mds_lorec_t));
   2097 
   2098 	rfs4_dbe_hold(sp->sn_dbe);
   2099 	lorec->lor_sess = sp;
   2100 
   2101 	(void) thread_create(NULL, 0, mds_do_lorecall, lorec, 0, &p0, TS_RUN,
   2102 	    minclsyspri);
   2103 }
   2104 
   2105 /*
   2106  * Layout Recall by FSID
   2107  */
   2108 static void
   2109 fsid_lor(rfs4_entry_t u_entry, void *args)
   2110 {
   2111 	mds_lorec_t		*lrp = (mds_lorec_t *)args;
   2112 	mds_ever_grant_t	*eg = (mds_ever_grant_t *)u_entry;
   2113 	mds_ever_grant_t	key;
   2114 	vnode_t			*vp = NULL;
   2115 
   2116 	if (eg == NULL || lrp == NULL || rfs4_dbe_is_invalid(eg->eg_dbe))
   2117 		return;
   2118 
   2119 	ASSERT(rfs4_dbe_islocked(eg->eg_dbe));
   2120 	if ((vp = (vnode_t *)lrp->lor_vp) == NULL)
   2121 		return;
   2122 
   2123 	key.eg_fsid = vp->v_vfsp->vfs_fsid;
   2124 	if (mds_ever_grant_fsid_compare(u_entry,
   2125 	    (void *)(uintptr_t)key.eg_key)) {
   2126 		mds_lorec_t	*lorec;
   2127 		mds_session_t	*sp;
   2128 		nfs_server_instance_t	*instp;
   2129 
   2130 		instp = dbe_to_instp(u_entry->dbe);
   2131 
   2132 		lorec = kmem_zalloc(sizeof (mds_lorec_t), KM_SLEEP);
   2133 		bcopy(args, lorec, sizeof (mds_lorec_t));
   2134 
   2135 		ASSERT(eg->eg_cp != NULL);
   2136 		sp = mds_findsession_by_clid(instp, eg->eg_cp->rc_clientid);
   2137 		if (sp == NULL) {
   2138 			kmem_free(lorec, sizeof (mds_lorec_t));
   2139 			return;
   2140 		}
   2141 		lorec->lor_sess = sp;	/* hold courtesy of findsession */
   2142 
   2143 		(void) thread_create(NULL, 0, mds_do_lorecall, lorec, 0, &p0,
   2144 		    TS_RUN, minclsyspri);
   2145 	}
   2146 }
   2147 
   2148 /*
   2149  * Layout Recall by File
   2150  */
   2151 static void
   2152 file_lor(rfs4_entry_t entry, void *arg)
   2153 {
   2154 	mds_lorec_t *lorec;
   2155 
   2156 	lorec = kmem_alloc(sizeof (mds_lorec_t), KM_SLEEP);
   2157 	bcopy(arg, lorec, sizeof (mds_lorec_t));
   2158 	lorec->lor_sess = (mds_session_t *)entry;
   2159 
   2160 	(void) thread_create(NULL, 0, mds_do_lorecall, lorec, 0, &p0, TS_RUN,
   2161 	    minclsyspri);
   2162 }
   2163 
   2164 
   2165 /*
   2166  * Recall a layout:
   2167  *
   2168  *   Either all layouts
   2169  *
   2170  *   ... or
   2171  *
   2172  *   For a given pathname construct FH first (same thing we do
   2173  *   for nfs_sys(GETFH)) args have already been copied into kernel
   2174  *   adspace
   2175  */
   2176 static int
   2177 mds_lorecall_cmd(struct mds_reclo_args *args, cred_t *cr)
   2178 {
   2179 	int			 error;
   2180 	nfs_fh4			 fh4;
   2181 	struct exportinfo	*exi;
   2182 	mds_lorec_t		 lorec;
   2183 	vnode_t			*vp = NULL;
   2184 	vnode_t			*dvp = NULL;
   2185 	rfs4_file_t		*fp = NULL;
   2186 	rfs4_client_t		*cp = NULL;
   2187 	rfs41_grant_list_t	*glp = NULL;
   2188 	mds_session_t		*sp = NULL;
   2189 
   2190 	lorec.lor_type = args->lo_type;
   2191 	switch (args->lo_type) {
   2192 	case LAYOUTRECALL4_ALL:
   2193 		if (mds_server->mds_session_tab == NULL)
   2194 			return (ECANCELED);
   2195 
   2196 		rfs4_dbe_walk(mds_server->mds_session_tab, all_lor, &lorec);
   2197 		return (0);
   2198 
   2199 	case LAYOUTRECALL4_FILE:
   2200 	case LAYOUTRECALL4_FSID:
   2201 		break;
   2202 
   2203 	default:
   2204 		return (EINVAL);
   2205 	}
   2206 
   2207 	if (error = lookupname(args->lo_fname, UIO_SYSSPACE, FOLLOW, &dvp, &vp))
   2208 		return (error);
   2209 
   2210 	if (vp == NULL) {
   2211 		if (dvp != NULL)
   2212 			VN_RELE(dvp);
   2213 		return (ENOENT);
   2214 	}
   2215 
   2216 	/*
   2217 	 * 'vp' may be an AUTOFS node, so we perform a VOP_ACCESS()
   2218 	 * to trigger the mount of the intended filesystem, so we
   2219 	 * can share the intended filesystem instead of the AUTOFS
   2220 	 * filesystem.
   2221 	 */
   2222 	(void) VOP_ACCESS(vp, 0, 0, cr, NULL);
   2223 
   2224 	/*
   2225 	 * We're interested in the top most filesystem. This is
   2226 	 * specially important when uap->dname is a trigger AUTOFS
   2227 	 * node, since we're really interested in sharing the
   2228 	 * filesystem AUTOFS mounted as result of the VOP_ACCESS()
   2229 	 * call, not the AUTOFS node itself.
   2230 	 */
   2231 	if (vn_mountedvfs(vp) != NULL) {
   2232 		if (error = traverse(&vp))
   2233 			goto errout;
   2234 	}
   2235 
   2236 	/*
   2237 	 * The last arg for nfs_vptoexi says to create a v4 FH
   2238 	 * (instead of v3). This will need to be changed to
   2239 	 * select the new MDS FH format.
   2240 	 */
   2241 	rw_enter(&exported_lock, RW_READER);
   2242 	exi = nfs_vptoexi(dvp, vp, cr, NULL, &error, TRUE);
   2243 	rw_exit(&exported_lock);
   2244 
   2245 	/*
   2246 	 * file isn't shared.
   2247 	 */
   2248 	if (exi == NULL)
   2249 		goto errout;
   2250 
   2251 	fh4.nfs_fh4_val = lorec.lor_fh.fh_buf;
   2252 	error = mknfs41_fh(&fh4, vp, exi);
   2253 	lorec.lor_fh.fh_len = fh4.nfs_fh4_len;
   2254 	lorec.lor_sess = NULL;
   2255 
   2256 	switch (lorec.lor_type) {
   2257 	case LAYOUTRECALL4_FILE:
   2258 		mutex_enter(&vp->v_vsd_lock);
   2259 		fp = (rfs4_file_t *)vsd_get(vp, mds_server->vkey);
   2260 		mutex_exit(&vp->v_vsd_lock);
   2261 		if (fp == NULL) {
   2262 			error = EIO;
   2263 			goto errout;
   2264 		}
   2265 
   2266 		/*
   2267 		 * There may be a cleaner way to run the per-file lists,
   2268 		 * but this works for now. This sends a cb_lo_recall to
   2269 		 * the clients that have an active layout for the file,
   2270 		 * only. Stop the blasting !
   2271 		 */
   2272 		glp = fp->rf_lo_grant_list.next;
   2273 		for (; glp && glp->lg; glp = glp->next) {
   2274 
   2275 			if ((cp = glp->lg->lo_cp) == NULL)
   2276 				continue;	/* internal inconsistency ? */
   2277 
   2278 			rfs41_lo_grant_hold(glp->lg);
   2279 			sp = mds_findsession_by_clid(mds_server,
   2280 			    cp->rc_clientid);
   2281 			if (sp != NULL) {
   2282 				/*
   2283 				 * Recall in progress !
   2284 				 *
   2285 				 * As per spec rules, bump up the seqid (of
   2286 				 * the stateid) and make sure we store it in
   2287 				 * the layout grant info; this will eventually
   2288 				 * be used for layout race detection.
   2289 				 */
   2290 				rfs4_dbe_lock(glp->lg->lo_dbe);
   2291 
   2292 				glp->lg->lo_status = LO_RECALL_INPROG;
   2293 				rfs41_lo_seqid(&glp->lg->lo_stateid);
   2294 
   2295 				mutex_enter(&glp->lg->lo_lock);
   2296 				glp->lg->lor_seqid =
   2297 				    glp->lg->lo_stateid.v41_bits.chgseq;
   2298 				mutex_exit(&glp->lg->lo_lock);
   2299 
   2300 				bcopy(&glp->lg->lo_stateid.stateid,
   2301 				    &lorec.lor_stid, sizeof (stateid4));
   2302 				lorec.lor_lg = glp->lg;
   2303 				rfs41_lo_grant_hold(glp->lg);
   2304 
   2305 				rfs4_dbe_unlock(glp->lg->lo_dbe);
   2306 				file_lor((rfs4_entry_t)sp, (void *)&lorec);
   2307 			}
   2308 			rfs41_lo_grant_rele(glp->lg);
   2309 		}
   2310 		break;
   2311 
   2312 	case LAYOUTRECALL4_FSID:
   2313 		/*
   2314 		 * set fsid just like rfs4_fattr4_fsid()
   2315 		 */
   2316 		if (exi->exi_volatile_dev) {
   2317 			int *pmaj = (int *)&lorec.lor_fsid.major;
   2318 
   2319 			pmaj[0] = exi->exi_fsid.val[0];
   2320 			pmaj[1] = exi->exi_fsid.val[1];
   2321 			lorec.lor_fsid.minor = 0;
   2322 		} else {
   2323 			vattr_t va;
   2324 
   2325 			va.va_mask = AT_FSID | AT_TYPE;
   2326 			error = rfs4_vop_getattr(vp, &va, 0, cr);
   2327 
   2328 			if (error == 0 && va.va_type != VREG)
   2329 				error = EINVAL;
   2330 			if (error)
   2331 				goto errout;
   2332 
   2333 			lorec.lor_fsid.major = getmajor(va.va_fsid);
   2334 			lorec.lor_fsid.minor = getminor(va.va_fsid);
   2335 		}
   2336 
   2337 		if (mds_server->mds_ever_grant_tab == NULL) {
   2338 			error = ECANCELED;
   2339 			goto errout;
   2340 		}
   2341 
   2342 		lorec.lor_vp = vp;
   2343 		VN_HOLD(vp);
   2344 		rfs4_dbe_walk(mds_server->mds_ever_grant_tab, fsid_lor, &lorec);
   2345 		VN_RELE(vp);
   2346 		break;
   2347 
   2348 	default:
   2349 		break;
   2350 	}
   2351 
   2352 errout:
   2353 	VN_RELE(vp);
   2354 	if (dvp != NULL)
   2355 		VN_RELE(dvp);
   2356 	return (error);
   2357 }
   2358 
   2359 /* support for device notifications via mdsadm */
   2360 
   2361 typedef struct mds_notify_device {
   2362 	mds_session_t			*nd_sess;
   2363 	struct mds_notifydev_args	 nd_args;
   2364 
   2365 } mds_notify_device_t;
   2366 
   2367 static void
   2368 mds_do_notify_device(mds_notify_device_t *ndp)
   2369 {
   2370 	CB_COMPOUND4args	 cb4_args;
   2371 	CB_COMPOUND4res		 cb4_res;
   2372 	CB_SEQUENCE4args	*cbsap;
   2373 	CB_NOTIFY_DEVICEID4args *cbndap;
   2374 	nfs_cb_argop4		*argops;
   2375 	struct timeval		 timeout;
   2376 	enum clnt_stat		 call_stat = RPC_FAILED;
   2377 	int			 zilch = 0;
   2378 	CLIENT			*ch;
   2379 	int			 numops;
   2380 	int			 argsz;
   2381 	mds_session_t		*sp;
   2382 	slot_ent_t		*p;
   2383 	notify4			 no;
   2384 	char			*xdr_buf = NULL;
   2385 	int			 xdr_size;
   2386 	XDR			 xdr;
   2387 
   2388 	DTRACE_PROBE1(nfssrv__i__sess_notify_device, mds_notify_device_t *,
   2389 	    ndp);
   2390 
   2391 	if (ndp->nd_sess == NULL)
   2392 		return;
   2393 	sp = ndp->nd_sess;
   2394 
   2395 	/*
   2396 	 * XXX - until we fix blasting _all_ sessions for one notification,
   2397 	 *	make sure that the session in question at least has the
   2398 	 *	back chan established.
   2399 	 */
   2400 	if (!SN_CB_CHAN_EST(sp))
   2401 		return;
   2402 
   2403 	/*
   2404 	 * set up the compound args
   2405 	 */
   2406 	numops = 2;	/* CB_SEQUENCE + CB_NOTIFY_DEVICE */
   2407 	argsz = numops * sizeof (nfs_cb_argop4);
   2408 	argops = kmem_zalloc(argsz, KM_SLEEP);
   2409 
   2410 	argops[0].argop = OP_CB_SEQUENCE;
   2411 	cbsap = &argops[0].nfs_cb_argop4_u.opcbsequence;
   2412 
   2413 	argops[1].argop = OP_CB_NOTIFY_DEVICEID;
   2414 	cbndap = &argops[1].nfs_cb_argop4_u.opcbnotify_deviceid;
   2415 
   2416 	(void) str_to_utf8("cb_notify_device", &cb4_args.tag);
   2417 	cb4_args.minorversion = CB4_MINOR_v1;
   2418 
   2419 	cb4_args.callback_ident = sp->sn_bc.progno;
   2420 	cb4_args.array_len = numops;
   2421 	cb4_args.array = argops;
   2422 
   2423 	cb4_res.tag.utf8string_val = NULL;
   2424 	cb4_res.array = NULL;
   2425 
   2426 	/*
   2427 	 * CB_SEQUENCE
   2428 	 */
   2429 	bcopy(sp->sn_sessid, cbsap->csa_sessionid, sizeof (sessionid4));
   2430 	p = svc_slot_alloc(sp);
   2431 	mutex_enter(&p->se_lock);
   2432 	cbsap->csa_slotid = p->se_sltno;
   2433 	cbsap->csa_sequenceid = p->se_seqid;
   2434 	cbsap->csa_highest_slotid = svc_slot_maxslot(sp);
   2435 	cbsap->csa_cachethis = FALSE;
   2436 
   2437 	/* no referring calling list for device notifications */
   2438 	cbsap->csa_rcall_llen = 0;
   2439 	cbsap->csa_rcall_lval = NULL;
   2440 	mutex_exit(&p->se_lock);
   2441 
   2442 	/*
   2443 	 * CB_NOTIFY_DEVICEID (well, d'uh)
   2444 	 */
   2445 	cbndap->cnda_changes.cnda_changes_len = 1;
   2446 	cbndap->cnda_changes.cnda_changes_val = &no;
   2447 	if (ndp->nd_args.notify_how == NOTIFY_DEVICEID4_DELETE) {
   2448 		notify_deviceid_delete4 nodd;
   2449 
   2450 		no.notify_mask = NOTIFY_DEVICEID4_DELETE_MASK;
   2451 		nodd.ndd_layouttype = LAYOUT4_NFSV4_1_FILES;
   2452 		(void) memset(&nodd.ndd_deviceid, 0, sizeof (deviceid4));
   2453 		bcopy(&ndp->nd_args.dev_id, &nodd.ndd_deviceid,
   2454 		    sizeof (ndp->nd_args.dev_id));
   2455 
   2456 		/* encode the notification blob */
   2457 
   2458 		xdr_size = xdr_sizeof(xdr_notify_deviceid_delete4, &nodd);
   2459 		ASSERT(xdr_size);
   2460 		xdr_buf = kmem_alloc(xdr_size, KM_SLEEP);
   2461 		xdrmem_create(&xdr, xdr_buf, xdr_size, XDR_ENCODE);
   2462 
   2463 		if (xdr_notify_deviceid_delete4(&xdr, &nodd) == FALSE)
   2464 			goto done;
   2465 
   2466 		/*
   2467 		 * Once the blob is encoded, we no longer need
   2468 		 * nodd, which goes out of scope here.
   2469 		 */
   2470 
   2471 	} else {
   2472 		notify_deviceid_change4 nodc;
   2473 
   2474 		no.notify_mask = NOTIFY_DEVICEID4_CHANGE_MASK;
   2475 		nodc.ndc_layouttype = LAYOUT4_NFSV4_1_FILES;
   2476 		(void) memset(&nodc.ndc_deviceid, 0, sizeof (deviceid4));
   2477 		bcopy(&ndp->nd_args.dev_id, &nodc.ndc_deviceid,
   2478 		    sizeof (ndp->nd_args.dev_id));
   2479 
   2480 		xdr_size = xdr_sizeof(xdr_notify_deviceid_change4, &nodc);
   2481 		ASSERT(xdr_size);
   2482 		xdr_buf = kmem_alloc(xdr_size, KM_SLEEP);
   2483 		xdrmem_create(&xdr, xdr_buf, xdr_size, XDR_ENCODE);
   2484 
   2485 		if (xdr_notify_deviceid_change4(&xdr, &nodc) == FALSE) {
   2486 			kmem_free(xdr_buf, xdr_size);
   2487 			xdr_size = 0;
   2488 			xdr_buf = NULL;
   2489 		}
   2490 	}
   2491 
   2492 	no.notify_vals.notifylist4_len = xdr_size;
   2493 	no.notify_vals.notifylist4_val = xdr_buf;
   2494 
   2495 	/*
   2496 	 * Set up the timeout for the callback and make the actual call.
   2497 	 * Timeout will be 80% of the lease period.
   2498 	 */
   2499 	timeout.tv_sec =
   2500 	    (dbe_to_instp(sp->sn_dbe)->lease_period * 80) / 100;
   2501 	timeout.tv_usec = 0;
   2502 
   2503 	ch = rfs41_cb_getch(sp);
   2504 	(void) CLNT_CONTROL(ch, CLSET_XID, (char *)&zilch);
   2505 	call_stat = clnt_call(ch, CB_COMPOUND,
   2506 	    xdr_CB_COMPOUND4args_srv, (caddr_t)&cb4_args,
   2507 	    xdr_CB_COMPOUND4res, (caddr_t)&cb4_res, timeout);
   2508 	rfs41_cb_freech(sp, ch);
   2509 
   2510 	/*
   2511 	 * Errors from the client are harmless for now, since this
   2512 	 * is invoked by an administrative action for testing purposes.
   2513 	 * In the future, if this were part of the normal server action,
   2514 	 * these errors would need to be handled.
   2515 	 */
   2516 	if (call_stat != RPC_SUCCESS) {
   2517 		cmn_err(CE_NOTE, "mds_do_notify_device: RPC call failed %d",
   2518 		    call_stat);
   2519 		goto done;
   2520 
   2521 	} else if (cb4_res.status != NFS4_OK) {
   2522 		cmn_err(CE_NOTE, "mds_do_notify_device: compound failed %d",
   2523 		    cb4_res.status);
   2524 
   2525 	}
   2526 	svc_slot_cb_seqid(&cb4_res, p);
   2527 	xdr_free(xdr_CB_COMPOUND4res, (caddr_t)&cb4_res);
   2528 done:
   2529 	kmem_free(cb4_args.tag.utf8string_val, cb4_args.tag.utf8string_len);
   2530 	kmem_free(argops, argsz);
   2531 	kmem_free(ndp, sizeof (*ndp));
   2532 	if (xdr_buf)
   2533 		kmem_free(xdr_buf, xdr_size);
   2534 	svc_slot_free(sp, p);
   2535 }
   2536 
   2537 static void
   2538 mds_sess_notify_device_callout(rfs4_entry_t u_entry, void *arg)
   2539 {
   2540 	mds_notify_device_t *ndp;
   2541 
   2542 	ndp = kmem_alloc(sizeof (*ndp), KM_SLEEP);
   2543 	bcopy(arg, &ndp->nd_args, sizeof (ndp->nd_args));
   2544 	ndp->nd_sess = (mds_session_t *)u_entry;
   2545 
   2546 	(void) thread_create(NULL, 0, mds_do_notify_device, ndp, 0, &p0,
   2547 	    TS_RUN, minclsyspri);
   2548 }
   2549 
   2550 void
   2551 inst_notify_device(nfs_server_instance_t *instp, void *args)
   2552 {
   2553 	if (instp->mds_session_tab != NULL)
   2554 		rfs4_dbe_walk(instp->mds_session_tab,
   2555 		    mds_sess_notify_device_callout, args);
   2556 }
   2557 
   2558 /*ARGSUSED*/
   2559 static int
   2560 mds_notify_device_cmd(struct mds_notifydev_args *args, cred_t *cr)
   2561 {
   2562 	/*
   2563 	 * Walk the list of server instances, asking each
   2564 	 * to notify the specified device.
   2565 	 */
   2566 	nsi_walk(inst_notify_device, args);
   2567 	return (0);
   2568 }
   2569 
   2570 /*
   2571  * -----------------------------------------------
   2572  * MDS: DS_ADDR tables.
   2573  * -----------------------------------------------
   2574  *
   2575  */
   2576 
   2577 static uint32_t
   2578 ds_addrlist_hash(void *key)
   2579 {
   2580 	return ((uint32_t)(uintptr_t)key);
   2581 }
   2582 
   2583 static bool_t
   2584 ds_addrlist_compare(rfs4_entry_t u_entry, void *key)
   2585 {
   2586 	ds_addrlist_t *dp = (ds_addrlist_t *)u_entry;
   2587 
   2588 	return (rfs4_dbe_getid(dp->dbe) == (int)(uintptr_t)key);
   2589 }
   2590 
   2591 static void *
   2592 ds_addrlist_mkkey(rfs4_entry_t entry)
   2593 {
   2594 	ds_addrlist_t *dp = (ds_addrlist_t *)entry;
   2595 
   2596 	return ((void *)(uintptr_t)rfs4_dbe_getid(dp->dbe));
   2597 }
   2598 
   2599 /*ARGSUSED*/
   2600 static bool_t
   2601 ds_addrlist_create(rfs4_entry_t u_entry, void *arg)
   2602 {
   2603 	ds_addrlist_t *dp = (ds_addrlist_t *)u_entry;
   2604 	struct mds_adddev_args *u_dp = (struct mds_adddev_args *)arg;
   2605 
   2606 	dp->dev_addr.na_r_netid = kstrdup(u_dp->dev_netid);
   2607 	dp->dev_addr.na_r_addr = kstrdup(u_dp->dev_addr);
   2608 	dp->ds_owner = NULL;
   2609 	dp->dev_knc = NULL;
   2610 	dp->dev_nb = NULL;
   2611 	dp->ds_addr_key = 0;
   2612 	dp->ds_port_key = 0;
   2613 
   2614 	return (TRUE);
   2615 }
   2616 
   2617 /*ARGSUSED*/
   2618 static void
   2619 ds_addrlist_destroy(rfs4_entry_t u_entry)
   2620 {
   2621 	ds_addrlist_t *dp = (ds_addrlist_t *)u_entry;
   2622 	int	i;
   2623 	nfs_server_instance_t	*instp;
   2624 
   2625 	instp = dbe_to_instp(u_entry->dbe);
   2626 
   2627 	rw_enter(&instp->ds_addrlist_lock, RW_WRITER);
   2628 	if (dp->ds_owner != NULL) {
   2629 		list_remove(&dp->ds_owner->ds_addrlist_list, dp);
   2630 		rfs4_dbe_rele(dp->ds_owner->dbe);
   2631 		dp->ds_owner = NULL;
   2632 	}
   2633 	rw_exit(&instp->ds_addrlist_lock);
   2634 
   2635 	if (dp->dev_addr.na_r_netid) {
   2636 		i = strlen(dp->dev_addr.na_r_netid) + 1;
   2637 		kmem_free(dp->dev_addr.na_r_netid, i);
   2638 	}
   2639 
   2640 	if (dp->dev_addr.na_r_addr) {
   2641 		i = strlen(dp->dev_addr.na_r_addr) + 1;
   2642 		kmem_free(dp->dev_addr.na_r_addr, i);
   2643 	}
   2644 
   2645 	if (dp->dev_knc != NULL)
   2646 		kmem_free(dp->dev_knc, sizeof (struct knetconfig));
   2647 
   2648 	if (dp->dev_nb != NULL) {
   2649 		if (dp->dev_nb->buf)
   2650 			kmem_free(dp->dev_nb->buf, dp->dev_nb->maxlen);
   2651 		kmem_free(dp->dev_nb, sizeof (struct netbuf));
   2652 	}
   2653 }
   2654 
   2655 
   2656 /*
   2657  * Multipath devices.
   2658  */
   2659 static uint32_t
   2660 mds_mpd_hash(void *key)
   2661 {
   2662 	return ((uint32_t)(uintptr_t)key);
   2663 }
   2664 
   2665 static bool_t
   2666 mds_mpd_compare(rfs4_entry_t u_entry, void *key)
   2667 {
   2668 	mds_mpd_t *mp = (mds_mpd_t *)u_entry;
   2669 
   2670 	return (mp->mpd_id == (id_t)(uintptr_t)key);
   2671 }
   2672 
   2673 static void *
   2674 mds_mpd_mkkey(rfs4_entry_t u_entry)
   2675 {
   2676 	mds_mpd_t *mp = (mds_mpd_t *)u_entry;
   2677 
   2678 	return ((void*)(uintptr_t)mp->mpd_id);
   2679 }
   2680 
   2681 void
   2682 mds_mpd_encode(nfsv4_1_file_layout_ds_addr4 *ds_dev, uint_t *len, char **val)
   2683 {
   2684 	char *xdr_ds_dev;
   2685 	int  xdr_size = 0;
   2686 	XDR  xdr;
   2687 
   2688 	ASSERT(val);
   2689 
   2690 	xdr_size = xdr_sizeof(xdr_nfsv4_1_file_layout_ds_addr4, ds_dev);
   2691 
   2692 	ASSERT(xdr_size);
   2693 
   2694 	xdr_ds_dev = kmem_alloc(xdr_size, KM_SLEEP);
   2695 
   2696 	xdrmem_create(&xdr, xdr_ds_dev, xdr_size, XDR_ENCODE);
   2697 
   2698 	if (xdr_nfsv4_1_file_layout_ds_addr4(&xdr, ds_dev) == FALSE) {
   2699 		*len = 0;
   2700 		*val = NULL;
   2701 		kmem_free(xdr_ds_dev, xdr_size);
   2702 		return;
   2703 	}
   2704 
   2705 	*len = xdr_size;
   2706 	*val = xdr_ds_dev;
   2707 }
   2708 
   2709 /*ARGSUSED*/
   2710 static bool_t
   2711 mds_mpd_create(rfs4_entry_t u_entry, void *arg)
   2712 {
   2713 	mds_mpd_t *mp = (mds_mpd_t *)u_entry;
   2714 	mds_addmpd_t *maap = (mds_addmpd_t *)arg;
   2715 
   2716 	mp->mpd_id = maap->id;
   2717 	mds_mpd_encode(maap->ds_addr4, &(mp->mpd_encoded_len),
   2718 	    &(mp->mpd_encoded_val));
   2719 	list_create(&mp->mpd_layouts_list, sizeof (mds_layout_t),
   2720 	    offsetof(mds_layout_t, mpd_layouts_next));
   2721 
   2722 	return (TRUE);
   2723 }
   2724 
   2725 
   2726 /*ARGSUSED*/
   2727 static void
   2728 mds_mpd_destroy(rfs4_entry_t u_entry)
   2729 {
   2730 	mds_mpd_t		*mp = (mds_mpd_t *)u_entry;
   2731 	nfs_server_instance_t	*instp;
   2732 
   2733 	instp = dbe_to_instp(u_entry->dbe);
   2734 	ASSERT(instp->mds_mpd_id_space != NULL);
   2735 	id_free(instp->mds_mpd_id_space, mp->mpd_id);
   2736 
   2737 	kmem_free(mp->mpd_encoded_val, mp->mpd_encoded_len);
   2738 
   2739 #ifdef	DEBUG
   2740 	/*
   2741 	 * We should never get here as the layouts
   2742 	 * entries should be holding a reference against
   2743 	 * this mpd!
   2744 	 */
   2745 	rw_enter(&instp->mds_mpd_lock, RW_WRITER);
   2746 	ASSERT(list_is_empty(&mp->mpd_layouts_list));
   2747 	rw_exit(&instp->mds_mpd_lock);
   2748 #endif
   2749 	list_destroy(&mp->mpd_layouts_list);
   2750 }
   2751 
   2752 /*
   2753  * The OTW device id is 128bits in length, we however are
   2754  * still using a uint_32 internally.
   2755  */
   2756 mds_mpd_t *
   2757 mds_find_mpd(nfs_server_instance_t *instp, id_t id)
   2758 {
   2759 	mds_mpd_t *mp;
   2760 	bool_t create = FALSE;
   2761 
   2762 	mp = (mds_mpd_t *)rfs4_dbsearch(instp->mds_mpd_idx,
   2763 	    (void *)(uintptr_t)id, &create, NULL, RFS4_DBS_VALID);
   2764 	return (mp);
   2765 }
   2766 
   2767 /*
   2768  * Plop kernel deviceid into the 128bit OTW deviceid
   2769  */
   2770 void
   2771 mds_set_deviceid(id_t did, deviceid4 *otw_id)
   2772 {
   2773 	ba_devid_t d;
   2774 
   2775 	bzero(&d, sizeof (d));
   2776 	d.i.did = did;
   2777 	bcopy(&d, otw_id, sizeof (d));
   2778 }
   2779 
   2780 /*
   2781  * Used by the walker to populate the deviceid list.
   2782  */
   2783 void
   2784 mds_mpd_list(rfs4_entry_t entry, void *arg)
   2785 {
   2786 	mds_mpd_t		*mp = (mds_mpd_t *)entry;
   2787 	mds_device_list_t	*mdl = (mds_device_list_t *)arg;
   2788 
   2789 	deviceid4   *dlip;
   2790 
   2791 	/*
   2792 	 * If this entry is invalid or we should skip it
   2793 	 * go to the next one..
   2794 	 */
   2795 	if (rfs4_dbe_skip_or_invalid(mp->mpd_dbe))
   2796 		return;
   2797 
   2798 	dlip = &(mdl->mdl_dl[mdl->mdl_count]);
   2799 
   2800 	mds_set_deviceid(mp->mpd_id, dlip);
   2801 
   2802 	/*
   2803 	 * bump to the next devlist_item4
   2804 	 */
   2805 	mdl->mdl_count++;
   2806 }
   2807 
   2808 /* ARGSUSED */
   2809 ds_addrlist_t *
   2810 mds_find_ds_addrlist_by_mds_sid(nfs_server_instance_t *instp,
   2811     mds_sid *sid)
   2812 {
   2813 	ds_addrlist_t	*dp = NULL;
   2814 	ds_guid_info_t	*pgi;
   2815 	ds_owner_t	*dop;
   2816 	ds_guid_t	guid;
   2817 
   2818 	/*
   2819 	 * Warning, do not, do not ever, free this guid!
   2820 	 */
   2821 	guid.stor_type = ZFS;
   2822 	guid.ds_guid_u.zfsguid.zfsguid_len = sid->len;
   2823 	guid.ds_guid_u.zfsguid.zfsguid_val = sid->val;
   2824 
   2825 	/*
   2826 	 * First we need to find the ds_guid_info_t which
   2827 	 * corresponds to this mds_sid.
   2828 	 */
   2829 	pgi = mds_find_ds_guid_info_by_id(&guid);
   2830 	if (pgi == NULL)
   2831 		return (NULL);
   2832 
   2833 	dop = pgi->ds_owner;
   2834 	if (!dop)
   2835 		goto error;
   2836 
   2837 	/*
   2838 	 * XXX: If a ds_owner has multiple addresses, then just grab the first
   2839 	 * we find.
   2840 	 */
   2841 	dp = list_head(&dop->ds_addrlist_list);
   2842 	if (dp)
   2843 		rfs4_dbe_hold(dp->dbe);
   2844 
   2845 error:
   2846 
   2847 	rfs4_dbe_rele(pgi->dbe);
   2848 	return (dp);
   2849 }
   2850 
   2851 ds_addrlist_t *
   2852 mds_find_ds_addrlist(nfs_server_instance_t *instp, uint32_t id)
   2853 {
   2854 	ds_addrlist_t *dp;
   2855 	bool_t create = FALSE;
   2856 
   2857 	dp = (ds_addrlist_t *)rfs4_dbsearch(instp->ds_addrlist_idx,
   2858 	    (void *)(uintptr_t)id, &create, NULL, RFS4_DBS_VALID);
   2859 	return (dp);
   2860 }
   2861 
   2862 void
   2863 mds_ds_addrlist_rele(ds_addrlist_t *dp)
   2864 {
   2865 	rfs4_dbe_rele(dp->dbe);
   2866 }
   2867 
   2868 /*
   2869  */
   2870 static uint32_t
   2871 mds_str_hash(void *key)
   2872 {
   2873 	char *addr = (char *)key;
   2874 	int i;
   2875 	uint32_t hash = 0;
   2876 
   2877 	for (i = 0; addr[i]; i++) {
   2878 		hash <<= 1;
   2879 		hash += (uint_t)addr[i];
   2880 	}
   2881 
   2882 	return (hash);
   2883 }
   2884 
   2885 static uint32_t
   2886 mds_utf8string_hash(void *key)
   2887 {
   2888 	utf8string *obj = (utf8string *)key;
   2889 	int i;
   2890 	uint32_t hash = 0;
   2891 
   2892 	for (i = 0; i < obj->utf8string_len; i++) {
   2893 		hash <<= 1;
   2894 		hash += (uint_t)obj->utf8string_val[i];
   2895 	}
   2896 
   2897 	return (hash);
   2898 }
   2899 
   2900 static bool_t
   2901 rfs41_invalid_expiry(rfs4_entry_t entry)
   2902 {
   2903 	if (rfs4_dbe_is_invalid(entry->dbe))
   2904 		return (TRUE);
   2905 
   2906 	return (FALSE);
   2907 }
   2908 
   2909 static uint32_t
   2910 ds_addrlist_addrkey_hash(void *key)
   2911 {
   2912 	return ((uint32_t)(uintptr_t)key);
   2913 }
   2914 
   2915 static void *
   2916 ds_addrlist_addrkey_mkkey(rfs4_entry_t entry)
   2917 {
   2918 	ds_addrlist_t *dp = (ds_addrlist_t *)entry;
   2919 
   2920 	return (&dp->ds_addr_key);
   2921 }
   2922 
   2923 /*
   2924  * Only compare the address portion and not the
   2925  * port info. We do this because the DS may
   2926  * have rebooted and gotten a different port
   2927  * number.
   2928  *
   2929  * XXX: What happens if we have multiple DSes
   2930  * on one box? I.e., a valid case for the same
   2931  * IP, but different ports?
   2932  */
   2933 static int
   2934 ds_addrlist_addrkey_compare(rfs4_entry_t entry, void *key)
   2935 {
   2936 	ds_addrlist_t *dp = (ds_addrlist_t *)entry;
   2937 	uint64_t addr_key = *(uint64_t *)key;
   2938 
   2939 	return (addr_key == dp->ds_addr_key);
   2940 }
   2941 
   2942 /*
   2943  * Data-server information (ds_owner)  tables and indexes.
   2944  */
   2945 static uint32_t
   2946 ds_owner_hash(void *key)
   2947 {
   2948 	return ((uint32_t)(uintptr_t)key);
   2949 }
   2950 
   2951 static bool_t
   2952 ds_owner_compare(rfs4_entry_t entry, void *key)
   2953 {
   2954 	ds_owner_t *dop = (ds_owner_t *)entry;
   2955 
   2956 	return (dop->ds_id == (int)(uintptr_t)key);
   2957 
   2958 }
   2959 
   2960 static void *
   2961 ds_owner_mkkey(rfs4_entry_t entry)
   2962 {
   2963 	ds_owner_t *dop = (ds_owner_t *)entry;
   2964 
   2965 	return ((void *)(uintptr_t)dop->ds_id);
   2966 }
   2967 
   2968 static bool_t
   2969 ds_owner_inst_compare(rfs4_entry_t entry, void *key)
   2970 {
   2971 	ds_owner_t *dop = (ds_owner_t *)entry;
   2972 
   2973 	return (strcmp(dop->identity, key) == 0);
   2974 }
   2975 
   2976 static void *
   2977 ds_owner_inst_mkkey(rfs4_entry_t entry)
   2978 {
   2979 	ds_owner_t *dop = (ds_owner_t *)entry;
   2980 	return (dop->identity);
   2981 }
   2982 
   2983 /*ARGSUSED*/
   2984 static bool_t
   2985 ds_owner_create(rfs4_entry_t u_entry, void *arg)
   2986 {
   2987 	ds_owner_t *dop = (ds_owner_t *)u_entry;
   2988 	DS_EXIBIargs *drap = (DS_EXIBIargs *)arg;
   2989 
   2990 	dop->ds_id = rfs4_dbe_getid(dop->dbe);
   2991 	dop->verifier = drap->ds_ident.boot_verifier;
   2992 	dop->identity = kstrdup(drap->ds_ident.instance.instance_val);
   2993 	list_create(&dop->ds_addrlist_list, sizeof (ds_addrlist_t),
   2994 	    offsetof(ds_addrlist_t, ds_addrlist_next));
   2995 	list_create(&dop->ds_guid_list, sizeof (ds_guid_info_t),
   2996 	    offsetof(ds_guid_info_t, ds_guid_next));
   2997 	return (TRUE);
   2998 }
   2999 
   3000 ds_owner_t *
   3001 ds_owner_alloc(DS_EXIBIargs *drap)
   3002 {
   3003 	ds_owner_t *dop;
   3004 
   3005 	rw_enter(&mds_server->ds_owner_lock, RW_WRITER);
   3006 	/* Add the "new" entry */
   3007 	dop = (ds_owner_t *)rfs4_dbcreate(mds_server->ds_owner_inst_idx,
   3008 	    (void *)drap);
   3009 	rw_exit(&mds_server->ds_owner_lock);
   3010 	return (dop);
   3011 }
   3012 
   3013 static void
   3014 ds_owner_destroy(rfs4_entry_t u_entry)
   3015 {
   3016 	ds_owner_t *dop = (ds_owner_t *)u_entry;
   3017 
   3018 	int	i;
   3019 	nfs_server_instance_t	*instp;
   3020 
   3021 	instp = dbe_to_instp(u_entry->dbe);
   3022 
   3023 	i = strlen(dop->identity) + 1;
   3024 	kmem_free(dop->identity, i);
   3025 
   3026 #ifdef	DEBUG
   3027 	/*
   3028 	 * We should never get here as the ds_addrlist
   3029 	 * entries should be holding a reference against
   3030 	 * this owner!
   3031 	 */
   3032 	rw_enter(&instp->ds_addrlist_lock, RW_WRITER);
   3033 	ASSERT(list_is_empty(&dop->ds_addrlist_list));
   3034 	rw_exit(&instp->ds_addrlist_lock);
   3035 
   3036 	/*
   3037 	 * We should never get here as the ds_guid_info
   3038 	 * entries should be holding a reference against
   3039 	 * this owner!
   3040 	 */
   3041 	rw_enter(&instp->ds_guid_info_lock, RW_WRITER);
   3042 	ASSERT(list_is_empty(&dop->ds_guid_list));
   3043 	rw_exit(&instp->ds_guid_info_lock);
   3044 #endif
   3045 
   3046 	list_destroy(&dop->ds_guid_list);
   3047 	list_destroy(&dop->ds_addrlist_list);
   3048 }
   3049 
   3050 void
   3051 ds_guid_free(ds_guid_t *gp)
   3052 {
   3053 	if (gp == NULL)
   3054 		return;
   3055 
   3056 	/*
   3057 	 * Yes, overkill for one stor_type, but ready
   3058 	 * to go for more!
   3059 	 */
   3060 	switch (gp->stor_type) {
   3061 	case ZFS:
   3062 		kmem_free(gp->ds_guid_u.zfsguid.zfsguid_val,
   3063 		    gp->ds_guid_u.zfsguid.zfsguid_len);
   3064 		break;
   3065 	}
   3066 }
   3067 
   3068 /*
   3069  * Duplicate the src guid to dst.
   3070  *
   3071  * return 0 on success or 1 for failure.
   3072  */
   3073 int
   3074 ds_guid_dup(ds_guid_t *src, ds_guid_t *dst)
   3075 {
   3076 	dst = src;
   3077 
   3078 	switch (dst->stor_type) {
   3079 	case ZFS:
   3080 		dst->ds_guid_u.zfsguid.zfsguid_val
   3081 		    = kmem_alloc(dst->ds_guid_u.zfsguid.zfsguid_len, KM_SLEEP);
   3082 		bcopy(src->ds_guid_u.zfsguid.zfsguid_val,
   3083 		    dst->ds_guid_u.zfsguid.zfsguid_val,
   3084 		    dst->ds_guid_u.zfsguid.zfsguid_len);
   3085 		break;
   3086 	default:
   3087 		/* if it's unknown zero out the dst */
   3088 		bzero(dst, sizeof (ds_guid_t));
   3089 		return (1);
   3090 
   3091 	}
   3092 	return (0);
   3093 }
   3094 
   3095 /*
   3096  * compare ds_guids return 0 for not the same or
   3097  * 1 if they are equal..
   3098  */
   3099 int
   3100 ds_guid_compare(ds_guid_t *gp1, ds_guid_t *gp2)
   3101 {
   3102 	if (gp1->stor_type != gp2->stor_type)
   3103 		return (0);
   3104 
   3105 	switch (gp1->stor_type) {
   3106 	case ZFS:
   3107 		if (gp1->ds_guid_u.zfsguid.zfsguid_len !=
   3108 		    gp2->ds_guid_u.zfsguid.zfsguid_len)
   3109 			return (0);
   3110 		if (bcmp(gp1->ds_guid_u.zfsguid.zfsguid_val,
   3111 		    gp2->ds_guid_u.zfsguid.zfsguid_val,
   3112 		    gp2->ds_guid_u.zfsguid.zfsguid_len) != 0)
   3113 			return (0);
   3114 		break;
   3115 
   3116 	default:
   3117 		return (0);
   3118 	}
   3119 
   3120 	return (1);
   3121 }
   3122 
   3123 void
   3124 mds_free_zfsattr(ds_guid_info_t *dst)
   3125 {
   3126 	int i;
   3127 
   3128 	if (dst->ds_attr_len == 0)
   3129 		return;
   3130 
   3131 	for (i = 0; i < dst->ds_attr_len; i++) {
   3132 		UTF8STRING_FREE(dst->ds_attr_val[i].attrname);
   3133 		kmem_free(dst->ds_attr_val[i].attrvalue.attrvalue_val,
   3134 		    dst->ds_attr_val[i].attrvalue.attrvalue_len);
   3135 	}
   3136 }
   3137 
   3138 void
   3139 mds_dup_zfsattr(ds_zfsattr *src, ds_guid_info_t *dst)
   3140 {
   3141 	int i;
   3142 	int len;
   3143 
   3144 	for (i = 0; i < dst->ds_attr_len; i++) {
   3145 		len = dst->ds_attr_val[i].attrname.utf8string_len =
   3146 		    src[i].attrname.utf8string_len;
   3147 
   3148 		dst->ds_attr_val[i].attrname.utf8string_val =
   3149 		    kmem_alloc(len, KM_SLEEP);
   3150 
   3151 		bcopy(src[i].attrname.utf8string_val,
   3152 		    dst->ds_attr_val[i].attrname.utf8string_val, len);
   3153 
   3154 		len = dst->ds_attr_val[i].attrvalue.attrvalue_len =
   3155 		    src[i].attrvalue.attrvalue_len;
   3156 
   3157 		dst->ds_attr_val[i].attrvalue.attrvalue_val
   3158 		    = kmem_alloc(len, KM_SLEEP);
   3159 
   3160 		bcopy(src[i].attrvalue.attrvalue_val,
   3161 		    dst->ds_attr_val[i].attrvalue.attrvalue_val, len);
   3162 	}
   3163 }
   3164 
   3165 static bool_t
   3166 ds_guid_info_create(rfs4_entry_t u_entry, void *arg)
   3167 {
   3168 	ds_guid_info_t	*pgi = (ds_guid_info_t *)u_entry;
   3169 	pinfo_create_t	*pic = (pinfo_create_t *)arg;
   3170 
   3171 	ds_guid		*dest;
   3172 	ds_guid		*src;
   3173 
   3174 	ds_zfsinfo	*dz;
   3175 	char		*sz;
   3176 
   3177 	int		j;
   3178 	uint_t		len;
   3179 
   3180 	/*
   3181 	 * Get the dataset name.
   3182 	 * Note: We do this first to make the error handling
   3183 	 * dead simple, i.e., do nothing!
   3184 	 */
   3185 	pgi->ds_dataset_name.utf8string_val = NULL;
   3186 	pgi->ds_dataset_name.utf8string_len = 0;
   3187 	dz = &pic->si->ds_storinfo_u.zfs_info;
   3188 	for (j = 0; j < dz->attrs.attrs_len; j++) {
   3189 		ds_zfsattr	*attrs_val = &dz->attrs.attrs_val[j];
   3190 		int		cmp;
   3191 
   3192 		sz = utf8_to_str(&attrs_val->attrname, &len, NULL);
   3193 		cmp = strcmp(sz, "dataset");
   3194 		kmem_free(sz, len);
   3195 		if (cmp == 0) {
   3196 			(void) utf8_copy(
   3197 			    (utf8string *)&attrs_val->attrvalue,
   3198 			    &pgi->ds_dataset_name);
   3199 
   3200 			break;
   3201 		}
   3202 	}
   3203 
   3204 	/*
   3205 	 * As the dataset name is an index, it must exist!
   3206 	 */
   3207 	if (UTF8STRING_NULL(pgi->ds_dataset_name)) {
   3208 		return (FALSE);
   3209 	}
   3210 
   3211 	pgi->ds_owner = pic->ds_owner;
   3212 	rfs4_dbe_hold(pgi->ds_owner->dbe);
   3213 
   3214 	list_insert_tail(&pgi->ds_owner->ds_guid_list, pgi);
   3215 	rfs4_dbe_hold(pgi->dbe);
   3216 
   3217 	/* Only supported type is ZFS */
   3218 	ASSERT(pic->si->type == ZFS);
   3219 
   3220 	src = &(pic->si->ds_storinfo_u.zfs_info.guid_map.ds_guid);
   3221 	dest = &pgi->ds_guid;
   3222 	dest->stor_type = src->stor_type;
   3223 
   3224 	/*
   3225 	 * Copy ds_guid
   3226 	 */
   3227 	dest->ds_guid_u.zfsguid.zfsguid_len =
   3228 	    src->ds_guid_u.zfsguid.zfsguid_len;
   3229 	dest->ds_guid_u.zfsguid.zfsguid_val =
   3230 	    kmem_zalloc(dest->ds_guid_u.zfsguid.zfsguid_len,
   3231 	    KM_SLEEP);
   3232 	bcopy(src->ds_guid_u.zfsguid.zfsguid_val,
   3233 	    dest->ds_guid_u.zfsguid.zfsguid_val,
   3234 	    dest->ds_guid_u.zfsguid.zfsguid_len);
   3235 
   3236 	/*
   3237 	 * Copy zfs attrs
   3238 	 */
   3239 	pgi->ds_attr_len = pic->si->ds_storinfo_u.zfs_info.attrs.attrs_len;
   3240 	pgi->ds_attr_val = kmem_alloc(
   3241 	    sizeof (ds_zfsattr) * pgi->ds_attr_len, KM_SLEEP);
   3242 	mds_dup_zfsattr(pic->si->ds_storinfo_u.zfs_info.attrs.attrs_val,
   3243 	    pgi);
   3244 
   3245 	return (TRUE);
   3246 }
   3247 
   3248 static void *
   3249 ds_guid_info_mkkey(rfs4_entry_t u_entry)
   3250 {
   3251 	ds_guid_info_t *pgi = (ds_guid_info_t *)u_entry;
   3252 
   3253 	return ((void *)(uintptr_t)&pgi->ds_guid);
   3254 }
   3255 
   3256 static bool_t
   3257 ds_guid_info_compare(rfs4_entry_t u_entry, void *key)
   3258 {
   3259 	ds_guid_info_t *pgi = (ds_guid_info_t *)u_entry;
   3260 	ds_guid_t *guid = (ds_guid_t *)key;
   3261 
   3262 	return (ds_guid_compare(&pgi->ds_guid, guid));
   3263 }
   3264 
   3265 static uint32_t
   3266 ds_guid_info_hash(void *key)
   3267 {
   3268 	ds_guid_t	*pg = (ds_guid_t *)key;
   3269 	int		i;
   3270 	uint32_t	hash = 0;
   3271 
   3272 	for (i = 0; i < pg->ds_guid_u.zfsguid.zfsguid_len; i++) {
   3273 		hash <<= 1;
   3274 		hash += (uint_t)pg->ds_guid_u.zfsguid.zfsguid_val[i];
   3275 	}
   3276 
   3277 	return (hash);
   3278 }
   3279 
   3280 static void *
   3281 ds_guid_info_dataset_name_mkkey(rfs4_entry_t u_entry)
   3282 {
   3283 	ds_guid_info_t *pgi = (ds_guid_info_t *)u_entry;
   3284 
   3285 	return ((void *)&pgi->ds_dataset_name);
   3286 }
   3287 
   3288 static bool_t
   3289 ds_guid_info_dataset_name_compare(rfs4_entry_t u_entry, void *key)
   3290 {
   3291 	ds_guid_info_t *pgi = (ds_guid_info_t *)u_entry;
   3292 
   3293 	return (utf8_compare((utf8string *)key,
   3294 	    &pgi->ds_dataset_name) == 0);
   3295 }
   3296 
   3297 /*ARGSUSED*/
   3298 static void
   3299 ds_guid_info_destroy(rfs4_entry_t u_entry)
   3300 {
   3301 	ds_guid_info_t *pgi = (ds_guid_info_t *)u_entry;
   3302 	nfs_server_instance_t	*instp;
   3303 
   3304 	instp = dbe_to_instp(u_entry->dbe);
   3305 
   3306 	rw_enter(&instp->ds_guid_info_lock, RW_WRITER);
   3307 	if (pgi->ds_owner) {
   3308 		list_remove(&pgi->ds_owner->ds_guid_list, pgi);
   3309 		rfs4_dbe_rele(pgi->ds_owner->dbe);
   3310 	}
   3311 	rw_exit(&instp->ds_guid_info_lock);
   3312 
   3313 	ds_guid_free(&pgi->ds_guid);
   3314 	mds_free_zfsattr(pgi);
   3315 
   3316 	UTF8STRING_FREE(pgi->ds_dataset_name);
   3317 }
   3318 
   3319 ds_guid_info_t *
   3320 mds_find_ds_guid_info_by_id(ds_guid_t *guid)
   3321 {
   3322 	ds_guid_info_t	*pgi;
   3323 	bool_t		create = FALSE;
   3324 
   3325 	rw_enter(&mds_server->ds_guid_info_lock, RW_READER);
   3326 	pgi = (ds_guid_info_t *)rfs4_dbsearch(mds_server->ds_guid_info_idx,
   3327 	    (void *)guid, &create, NULL, RFS4_DBS_VALID);
   3328 	rw_exit(&mds_server->ds_guid_info_lock);
   3329 
   3330 	return (pgi);
   3331 }
   3332 
   3333 int
   3334 mds_ds_path_to_mds_sid(utf8string *dataset_name, mds_sid *sid)
   3335 {
   3336 	ds_guid_info_t	*pgi;
   3337 	bool_t		create = FALSE;
   3338 
   3339 	rw_enter(&mds_server->ds_guid_info_lock, RW_READER);
   3340 	pgi = (ds_guid_info_t *)rfs4_dbsearch(
   3341 	    mds_server->ds_guid_info_dataset_name_idx,
   3342 	    (void *)dataset_name, &create, NULL, RFS4_DBS_VALID);
   3343 	rw_exit(&mds_server->ds_guid_info_lock);
   3344 
   3345 	if (pgi == NULL)
   3346 		return (1);
   3347 
   3348 	sid->len = pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_len;
   3349 	sid->val = kmem_alloc(sid->len, KM_SLEEP);
   3350 	bcopy(pgi->ds_guid.ds_guid_u.zfsguid.zfsguid_val,
   3351 	    sid->val, sid->len);
   3352 
   3353 	rfs4_dbe_rele(pgi->dbe);
   3354 
   3355 	return (0);
   3356 }
   3357 
   3358 /*
   3359  * XXX this should be populated during startup. we
   3360  * XXX should get the data from stable store. For now
   3361  * XXX we are just going to keep the map that the DS
   3362  * XXX provides us..
   3363  */
   3364 /*ARGSUSED*/
   3365 static bool_t
   3366 mds_mapzap_create(nfs_server_instance_t *instp,
   3367 		rfs4_entry_t e, void *arg)
   3368 {
   3369 	mds_mapzap_t *mzp = (mds_mapzap_t *)e;
   3370 
   3371 	mzp->ds_map = *(ds_guid_map_t *)arg;
   3372 	/* write to disk */
   3373 	return (TRUE);
   3374 }
   3375 
   3376 static void *
   3377 mds_mapzap_mkkey(rfs4_entry_t e)
   3378 {
   3379 	mds_mapzap_t *mzp = (mds_mapzap_t *)e;
   3380 
   3381 	return ((void *)(uintptr_t)&mzp->ds_map.ds_guid);
   3382 }
   3383 
   3384 
   3385 static bool_t
   3386 mds_mapzap_compare(rfs4_entry_t e, void *key)
   3387 {
   3388 	mds_mapzap_t *mzp = (mds_mapzap_t *)e;
   3389 	ds_guid_t   *gp = (ds_guid_t *)key;
   3390 
   3391 	return ((bool_t)ds_guid_compare(&mzp->ds_map.ds_guid, gp));
   3392 
   3393 }
   3394 
   3395 static uint32_t
   3396 mds_mapzap_hash(void *key)
   3397 {
   3398 	return ((uint32_t)(uintptr_t)key);
   3399 }
   3400 
   3401 /*ARGSUSED*/
   3402 static void
   3403 mds_mapzap_destroy(rfs4_entry_t foo)
   3404 {
   3405 }
   3406 
   3407 /*
   3408  * Used to initialize the NFSv4.1 server's state.
   3409  * All of the tables are created and timers are set.
   3410  *
   3411  * Upon success, the state_lock is held.
   3412  */
   3413 int
   3414 sstor_init(nfs_server_instance_t *instp, int def_reap)
   3415 {
   3416 	/*
   3417 	 * If the server state store has already been initialized,
   3418 	 * skip it
   3419 	 */
   3420 	mutex_enter(&instp->state_lock);
   3421 	if (instp->state_store != NULL) {
   3422 		mutex_exit(&instp->state_lock);
   3423 		return (0);
   3424 	}
   3425 
   3426 	/*
   3427 	 * Set the boot time.  If the server has been restarted quickly
   3428 	 * and has had the opportunity to service clients, then the start_time
   3429 	 * needs to be bumped regardless.  A small window but it exists...
   3430 	 */
   3431 	if (instp->start_time != gethrestime_sec())
   3432 		instp->start_time = gethrestime_sec();
   3433 	else
   3434 		instp->start_time++;
   3435 
   3436 	/*
   3437 	 * If a table does not have a specific reap time,
   3438 	 * this value is used.
   3439 	 */
   3440 	instp->reap_time = def_reap * rfs4_lease_time;
   3441 
   3442 	instp->state_store = rfs4_database_create();
   3443 	instp->state_store->db_instp = instp;
   3444 
   3445 	/* reset the "first NFSv4 request" status */
   3446 	instp->seen_first_compound = 0;
   3447 	instp->exi_clean_func = NULL;
   3448 
   3449 	return (1);
   3450 }
   3451 
   3452 /*
   3453  * Create/init just the session stateStore tables.
   3454  * used for data-server
   3455  *
   3456  * NOTE: This code should be very suspect, it has never
   3457  * been called. The DS actually uses the MDS tables!
   3458  */
   3459 void
   3460 ds_sstor_init(nfs_server_instance_t *instp)
   3461 {
   3462 	/*
   3463 	 * Client table.
   3464 	 */
   3465 	rw_init(&instp->findclient_lock, NULL, RW_DEFAULT, NULL);
   3466 
   3467 	instp->client_tab = rfs4_table_create(
   3468 	    instp, "Client", instp->client_cache_time, 2,
   3469 	    rfs4_client_create, rfs4_client_destroy, rfs4_client_expiry,
   3470 	    sizeof (rfs4_client_t), TABSIZE, MAXTABSZ/8, 100);
   3471 
   3472 	instp->nfsclnt_idx = rfs4_index_create(instp->client_tab,
   3473 	    "nfs_client_id4", nfsclnt_hash, nfsclnt_compare, nfsclnt_mkkey,
   3474 	    TRUE);
   3475 
   3476 	instp->clientid_idx = rfs4_index_create(instp->client_tab,
   3477 	    "client_id", clientid_hash, clientid_compare, clientid_mkkey,
   3478 	    FALSE);
   3479 
   3480 	/*
   3481 	 * Session table.
   3482 	 */
   3483 	rw_init(&instp->findsession_lock, NULL, RW_DEFAULT, NULL);
   3484 
   3485 	instp->mds_session_tab = rfs4_table_create(instp,
   3486 	    "Session", instp->reap_time, 2, mds_session_create,
   3487 	    mds_session_destroy, mds_do_not_expire, sizeof (mds_session_t),
   3488 	    MDS_TABSIZE, MDS_MAXTABSZ/8, 100);
   3489 
   3490 	instp->mds_session_idx = rfs4_index_create(instp->mds_session_tab,
   3491 	    "session_idx", sessid_hash, sessid_compare, sessid_mkkey, TRUE);
   3492 
   3493 	instp->mds_sess_clientid_idx = rfs4_index_create(instp->mds_session_tab,
   3494 	    "sess_clnt_idx", clientid_hash, sess_clid_compare, sess_clid_mkkey,
   3495 	    FALSE);
   3496 
   3497 	/*
   3498 	 * Mark it as fully initialized
   3499 	 */
   3500 	instp->inst_flags |= NFS_INST_STORE_INIT | NFS_INST_DS;
   3501 
   3502 	/*
   3503 	 * In case we are ever able to re-init the state,
   3504 	 * make sure we clean-up the termination!
   3505 	 */
   3506 	instp->inst_flags &= ~NFS_INST_TERMINUS;
   3507 }
   3508 
/*
 * Used to initialize the NFSv4.1 server's state.
 * All of the tables are created and timers are set.
 *
 * If sstor_init() reports that the state store already exists, this
 * returns immediately.  Otherwise sstor_init() returns with the
 * instance's state_lock held; the tables and indexes are created
 * under that lock and it is released at the very end.
 */
void
mds_sstor_init(nfs_server_instance_t *instp)
{
	extern rfs4_cbstate_t mds_cbcheck(rfs4_state_t *);
	int  need_sstor_init;

	/*
	 * Create the state store and set the
	 * start-up time.
	 *
	 * Upon success, the state_lock is held!
	 */
	need_sstor_init = sstor_init(instp, 60);
	if (need_sstor_init == 0)
		return;

	/* Delegation recall/check callbacks for this instance. */
	instp->deleg_cbrecall = mds_do_cb_recall;
	instp->deleg_cbcheck  = mds_cbcheck;

	/*
	 * Make the NFSv4.1 kspe policies.
	 */
	nfs41_spe_init();

	/*
	 * Now create the common tables and indexes
	 */
	v4prot_sstor_init(instp);

	rw_init(&instp->mds_mpd_lock, NULL, RW_DEFAULT, NULL);
	rw_init(&instp->ds_addrlist_lock, NULL, RW_DEFAULT, NULL);
	rw_init(&instp->ds_guid_info_lock, NULL, RW_DEFAULT, NULL);
	instp->ds_guid_info_count = 0;

	/*
	 * Session table.
	 */
	rw_init(&instp->findsession_lock, NULL, RW_DEFAULT, NULL);

	instp->mds_session_tab = rfs4_table_create(instp,
	    "Session", instp->reap_time, 2, mds_session_create,
	    mds_session_destroy, mds_session_expiry, sizeof (mds_session_t),
	    MDS_TABSIZE, MDS_MAXTABSZ/8, 100);

	instp->mds_session_idx = rfs4_index_create(instp->mds_session_tab,
	    "session_idx", sessid_hash, sessid_compare, sessid_mkkey, TRUE);

	instp->mds_sess_clientid_idx = rfs4_index_create(instp->mds_session_tab,
	    "sess_clnt_idx", clientid_hash, sess_clid_compare, sess_clid_mkkey,
	    FALSE);

	/*
	 * pNFS layout table.
	 */
	rw_init(&instp->mds_layout_lock, NULL, RW_DEFAULT, NULL);

	/*
	 * A layout might be in use by many files. So, when one
	 * file is done with a layout, it can not invalidate the
	 * state. Also, as a layout is created, it is immediately
	 * assigned to a file, and thus the refcnt will stay at
	 * 2. Thus, if the refcnt is ever 1, that means no file
	 * has a reference and as such, the entry can be reclaimed.
	 */
	instp->mds_layout_tab = rfs4_table_create(instp,
	    "Layout", instp->reap_time, 2, mds_layout_create,
	    mds_layout_destroy, NULL, sizeof (mds_layout_t),
	    MDS_TABSIZE, MDS_MAXTABSZ, 100);

	instp->mds_layout_idx = rfs4_index_create(instp->mds_layout_tab,
	    "layout-idx", mds_layout_hash, mds_layout_compare, mds_layout_mkkey,
	    TRUE);

	instp->mds_layout_ID_idx =
	    rfs4_index_create(instp->mds_layout_tab,
	    "layout-ID-idx", mds_layout_id_hash,
	    mds_layout_id_compare, mds_layout_id_mkkey, FALSE);

	instp->mds_layout_default_idx = 0;

	/*
	 * Create the layout_grant table.
	 *
	 * This table tracks the layout segments that have been granted
	 * to clients. It is indexed by the layout state_id and also by client.
	 */
	instp->mds_layout_grant_tab = rfs4_table_create(instp,
	    "Layout_grant", instp->reap_time, 1, mds_layout_grant_create,
	    mds_layout_grant_destroy, NULL,
	    sizeof (mds_layout_grant_t), MDS_TABSIZE, MDS_MAXTABSZ, 100);

	instp->mds_layout_grant_idx =
	    rfs4_index_create(instp->mds_layout_grant_tab,
	    "layout-grant-idx", mds_layout_grant_hash, mds_layout_grant_compare,
	    mds_layout_grant_mkkey, TRUE);

#ifdef NOT_USED_NOW
	instp->mds_layout_grant_ID_idx =
	    rfs4_index_create(instp->mds_layout_grant_tab,
	    "layout-grant-ID-idx", mds_layout_grant_id_hash,
	    mds_layout_grant_id_compare, mds_layout_grant_id_mkkey, FALSE);
#endif

	/*
	 * Create the ever_grant table.
	 *
	 * This table tracks layouts that have been granted to clients that
	 * belong to an FSID. It is indexed by the FSID and also by client.
	 */
	instp->mds_ever_grant_tab = rfs4_table_create(instp,
	    "Ever_grant", instp->reap_time, 1, mds_ever_grant_create,
	    mds_ever_grant_destroy, NULL,
	    sizeof (mds_ever_grant_t), MDS_TABSIZE, MDS_MAXTABSZ, 100);

	instp->mds_ever_grant_idx =
	    rfs4_index_create(instp->mds_ever_grant_tab,
	    "ever-grant-idx", mds_ever_grant_hash, mds_ever_grant_compare,
	    mds_ever_grant_mkkey, TRUE);

#ifdef NOT_USED_NOW
	instp->mds_ever_grant_fsid_idx =
	    rfs4_index_create(instp->mds_ever_grant_tab,
	    "ever-grant-fsid-idx", mds_ever_grant_fsid_hash,
	    mds_ever_grant_fsid_compare, mds_ever_grant_fsid_mkkey, FALSE);
#endif

	/*
	 * Data server addresses.
	 */
	instp->ds_addrlist_tab = rfs4_table_create(instp,
	    "DSaddrlist", instp->reap_time, 2, ds_addrlist_create,
	    ds_addrlist_destroy, rfs41_invalid_expiry, sizeof (ds_addrlist_t),
	    MDS_TABSIZE, MDS_MAXTABSZ, 200);

	instp->ds_addrlist_idx = rfs4_index_create(instp->ds_addrlist_tab,
	    "dsaddrlist-idx", ds_addrlist_hash, ds_addrlist_compare,
	    ds_addrlist_mkkey, TRUE);

	instp->ds_addrlist_addrkey_idx =
	    rfs4_index_create(instp->ds_addrlist_tab,
	    "dsaddrlist-addrkey-idx", ds_addrlist_addrkey_hash,
	    ds_addrlist_addrkey_compare, ds_addrlist_addrkey_mkkey, FALSE);

	/*
	 * Multipath Device table.
	 */
	{
		uint32_t	maxentries = MDS_MAXTABSZ;
		id_t		start = 200;

		/*
		 * A mpd might be in use by many layouts. So, when one
		 * layout is done with a mpd, it can not invalidate the
		 * state. Also, as a mpd is created, it is immediately
		 * assigned to a layout, and thus the refcnt will stay at
		 * 2. Thus, if the refcnt is ever 1, that means no layout
		 * has a reference and as such, the entry can be reclaimed.
		 */
		instp->mds_mpd_tab = rfs4_table_create(instp,
		    "mpd", instp->reap_time, 1, mds_mpd_create,
		    mds_mpd_destroy, NULL,
		    sizeof (mds_mpd_t), MDS_TABSIZE, maxentries, start);

		instp->mds_mpd_idx = rfs4_index_create(instp->mds_mpd_tab,
		    "mpd-idx", mds_mpd_hash, mds_mpd_compare,
		    mds_mpd_mkkey, TRUE);

		/*
		 * Clamp the entry count so that start + maxentries does
		 * not exceed INT32_MAX.  The clamp is applied after the
		 * table above was created, so it only affects the bound
		 * handed to id_space_create() below.
		 */
		if (MDS_MAXTABSZ + (uint32_t)start > (uint32_t)INT32_MAX)
			maxentries = INT32_MAX - start;

		instp->mds_mpd_id_space =
		    id_space_create("mds_mpd_id_space", start,
		    maxentries + start);
	}

	/*
	 * data-server information tables.
	 */
	instp->ds_owner_tab = rfs4_table_create(instp,
	    "DS_owner", instp->reap_time, 2, ds_owner_create,
	    ds_owner_destroy, mds_do_not_expire,
	    sizeof (ds_owner_t), MDS_TABSIZE, MDS_MAXTABSZ, 100);

	instp->ds_owner_inst_idx = rfs4_index_create(instp->ds_owner_tab,
	    "DS_owner-inst-idx", mds_str_hash, ds_owner_inst_compare,
	    ds_owner_inst_mkkey, TRUE);

	instp->ds_owner_idx = rfs4_index_create(instp->ds_owner_tab,
	    "DS_owner-idx", ds_owner_hash, ds_owner_compare,
	    ds_owner_mkkey, FALSE);

	/*
	 * data-server guid information table.
	 */
	instp->ds_guid_info_tab = rfs4_table_create(instp,
	    "DS_guid", instp->reap_time, 2, ds_guid_info_create,
	    ds_guid_info_destroy, rfs41_invalid_expiry,
	    sizeof (ds_guid_info_t), MDS_TABSIZE, MDS_MAXTABSZ, 100);

	instp->ds_guid_info_idx = rfs4_index_create(instp->ds_guid_info_tab,
	    "DS_guid-idx", ds_guid_info_hash, ds_guid_info_compare,
	    ds_guid_info_mkkey, TRUE);

	instp->ds_guid_info_dataset_name_idx =
	    rfs4_index_create(instp->ds_guid_info_tab,
	    "DS_guid-dataset-name-idx", mds_utf8string_hash,
	    ds_guid_info_dataset_name_compare, ds_guid_info_dataset_name_mkkey,
	    FALSE);

	instp->attrvers = 1;

	/*
	 * Mark it as fully initialized
	 */
	instp->inst_flags |= NFS_INST_STORE_INIT | NFS_INST_v41;

	/*
	 * In case we are ever able to re-init the state,
	 * make sure we clean-up the termination!
	 */
	instp->inst_flags &= ~NFS_INST_TERMINUS;

	/* Drop the lock that sstor_init() left held. */
	mutex_exit(&instp->state_lock);
}
   3737 
   3738 /*
   3739  * Module load initialization
   3740  */
   3741 void
   3742 mds_srvrinit(void)
   3743 {
   3744 	mds_recall_lo = mds_lorecall_cmd;
   3745 	mds_notify_device = mds_notify_device_cmd;
   3746 }
   3747 
   3748 void
   3749 rfs41_srvrinit(void)
   3750 {
   3751 	rfs41_dispatch_init();
   3752 }
   3753 
   3754 static char *
   3755 mds_read_odl(char *path, int *size)
   3756 {
   3757 	struct uio uio;
   3758 	struct iovec iov;
   3759 
   3760 	char *odlp;
   3761 	vnode_t *vp;
   3762 	vattr_t va;
   3763 	int sz, err, bad_file;
   3764 
   3765 	*size = 0;
   3766 	if (path == NULL)
   3767 		return (NULL);
   3768 
   3769 	/*
   3770 	 * open the layout file.
   3771 	 */
   3772 	if ((err = vn_open(path, UIO_SYSSPACE, FREAD, 0, &vp, 0, 0)) != 0) {
   3773 		return (NULL);
   3774 	}
   3775 
   3776 	if (vp->v_type != VREG) {
   3777 		(void) VOP_CLOSE(vp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3778 		VN_RELE(vp);
   3779 		return (NULL);
   3780 	}
   3781 
   3782 	(void) VOP_RWLOCK(vp, V_WRITELOCK_FALSE, NULL);
   3783 
   3784 	/*
   3785 	 * get the file size.
   3786 	 */
   3787 	va.va_mask = AT_SIZE;
   3788 	err = VOP_GETATTR(vp, &va, 0, CRED(), NULL);
   3789 
   3790 	sz = va.va_size;
   3791 	bad_file = (sz == 0 || sz < sizeof (odl_t));
   3792 
   3793 	if (err || bad_file) {
   3794 		VOP_RWUNLOCK(vp, V_WRITELOCK_FALSE, NULL);
   3795 		(void) VOP_CLOSE(vp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3796 		VN_RELE(vp);
   3797 		return (NULL);
   3798 	}
   3799 
   3800 	odlp = kmem_alloc(sz, KM_SLEEP);
   3801 
   3802 	/*
   3803 	 * build iovec to read in the file.
   3804 	 */
   3805 	iov.iov_base = (caddr_t)odlp;
   3806 	iov.iov_len = sz;
   3807 
   3808 	uio.uio_iov = &iov;
   3809 	uio.uio_iovcnt = 1;
   3810 	uio.uio_segflg = UIO_SYSSPACE;
   3811 	uio.uio_loffset = 0;
   3812 	uio.uio_resid = iov.iov_len;
   3813 
   3814 	if (err = VOP_READ(vp, &uio, FREAD, CRED(), NULL)) {
   3815 		VOP_RWUNLOCK(vp, V_WRITELOCK_FALSE, NULL);
   3816 		(void) VOP_CLOSE(vp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3817 		VN_RELE(vp);
   3818 		kmem_free(odlp, sz);
   3819 		return (NULL);
   3820 	}
   3821 
   3822 	VOP_RWUNLOCK(vp, V_WRITELOCK_FALSE, NULL);
   3823 	(void) VOP_CLOSE(vp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3824 	VN_RELE(vp);
   3825 	*size = sz;
   3826 
   3827 	return (odlp);
   3828 }
   3829 
   3830 /*
   3831  * blah
   3832  */
   3833 static int
   3834 mds_write_odl(char *path, char *odlp, int size)
   3835 {
   3836 	int ioflag, err;
   3837 	struct uio uio;
   3838 	struct iovec iov;
   3839 	vnode_t *vp;
   3840 
   3841 	if (path == NULL)
   3842 		return (-1);
   3843 
   3844 	if (vn_open(path, UIO_SYSSPACE, FCREAT|FWRITE|FTRUNC, 0600, &vp,
   3845 	    CRCREAT, 0)) {
   3846 		return (-1);
   3847 	}
   3848 
   3849 	iov.iov_base = (caddr_t)odlp;
   3850 	iov.iov_len = size;
   3851 
   3852 	uio.uio_iov = &iov;
   3853 	uio.uio_iovcnt = 1;
   3854 	uio.uio_loffset = 0;
   3855 	uio.uio_segflg = UIO_SYSSPACE;
   3856 	uio.uio_llimit = (rlim64_t)MAXOFFSET_T;
   3857 	uio.uio_resid = size;
   3858 
   3859 	ioflag = uio.uio_fmode = (FWRITE|FSYNC);
   3860 	uio.uio_extflg = UIO_COPY_DEFAULT;
   3861 
   3862 	(void) VOP_RWLOCK(vp, V_WRITELOCK_TRUE, NULL);
   3863 	err = VOP_WRITE(vp, &uio, ioflag, CRED(), NULL);
   3864 	VOP_RWUNLOCK(vp, V_WRITELOCK_TRUE, NULL);
   3865 
   3866 	(void) VOP_CLOSE(vp, FWRITE, 1, (offset_t)0, CRED(), NULL);
   3867 	VN_RELE(vp);
   3868 
   3869 	return (err);
   3870 }
   3871 
   3872 static void
   3873 mds_remove_odl(char *path)
   3874 {
   3875 	(void) vn_remove(path, UIO_SYSSPACE, RMFILE);
   3876 }
   3877 
   3878 #define	ODL_DIR	"/var/nfs/v4_state/layouts"
   3879 
   3880 int
   3881 mds_mkdir(char *parent, char *dirnm)
   3882 {
   3883 	int err;
   3884 	vnode_t *dvp, *vp;
   3885 	struct vattr vap;
   3886 	cred_t *cr = CRED();
   3887 
   3888 /*
   3889  *	if (err = lookupname(parent, UIO_SYSSPACE, NO_FOLLOW, NULLVPP, &dvp))
   3890  */
   3891 	if ((err = vn_open(parent, UIO_SYSSPACE, FREAD, 0, &dvp, 0, 0)))
   3892 		return (1);
   3893 
   3894 	vap.va_mask = AT_UID|AT_GID|AT_TYPE|AT_MODE;
   3895 	vap.va_uid = crgetuid(cr);
   3896 	vap.va_gid = crgetgid(cr);
   3897 	vap.va_type = VDIR;
   3898 	vap.va_mode = 0755;
   3899 	err = VOP_MKDIR(dvp, dirnm, &vap, &vp, cr, NULL, 0, NULL);
   3900 
   3901 	(void) VOP_CLOSE(dvp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3902 	VN_RELE(dvp);
   3903 
   3904 	if (err)
   3905 		return (1);
   3906 
   3907 	VN_RELE(vp);
   3908 
   3909 	return (0);
   3910 }
   3911 
   3912 /*
   3913  * Pathname will be /var/nfs/v4_state/layouts/<fsid>/<fid>
   3914  */
   3915 char *
   3916 mds_create_name(vnode_t *vp, int *len)
   3917 {
   3918 	static int parent_created = 0;
   3919 	int plen, err;
   3920 	fid_t fid;
   3921 	statvfs64_t svfs;
   3922 	vnode_t *dvp = NULL;
   3923 	uint64_t name = 0;
   3924 	char *pname;
   3925 	char dir[65];
   3926 
   3927 	*len = 0;
   3928 	if (!parent_created) {
   3929 		if (vn_open(ODL_DIR, UIO_SYSSPACE, FREAD, 0, &dvp, 0, 0)) {
   3930 			err = mds_mkdir("/var/nfs/v4_state", "layouts");
   3931 			if (err)
   3932 				return (NULL);
   3933 		} else {
   3934 			(void) VOP_CLOSE(dvp, FREAD, 1, (offset_t)0,
   3935 			    CRED(), NULL);
   3936 			VN_RELE(dvp);
   3937 		}
   3938 		parent_created = 1;
   3939 	}
   3940 
   3941 	/*
   3942 	 * fsid = vp->v_vfsp->vfs_fsid;
   3943 	 * zfs changes vfs_fsid on reboot, so we can't use it.
   3944 	 */
   3945 	err = VFS_STATVFS(vp->v_vfsp, &svfs);
   3946 	if (err) {
   3947 		return (NULL);
   3948 	}
   3949 
   3950 	(void) snprintf(dir, 65, "%llx", (long long)svfs.f_fsid);
   3951 
   3952 	plen = MAXPATHLEN;
   3953 	pname = kmem_alloc(plen, KM_SLEEP);
   3954 	(void) snprintf(pname, plen, "%s/%s", ODL_DIR, dir);
   3955 
   3956 	/* does this dir already exist */
   3957 	if (vn_open(pname, UIO_SYSSPACE, FREAD, 0, &dvp, 0, 0)) {
   3958 		err = mds_mkdir(ODL_DIR, dir);
   3959 		if (err) {
   3960 			kmem_free(pname, plen);
   3961 			return (NULL);
   3962 		}
   3963 	} else {
   3964 		(void) VOP_CLOSE(dvp, FREAD, 1, (offset_t)0, CRED(), NULL);
   3965 		VN_RELE(dvp);
   3966 	}
   3967 
   3968 	bzero(&fid, sizeof (fid));
   3969 	fid.fid_len = MAXFIDSZ;
   3970 	err = VOP_FID(vp, &fid, NULL);
   3971 	if (err || fid.fid_len == 0) {
   3972 		kmem_free(pname, plen);
   3973 		return (NULL);
   3974 	}
   3975 
   3976 	bcopy(fid.fid_data, &name, fid.fid_len);
   3977 
   3978 	(void) snprintf(pname, plen, "%s/%s/%llx", ODL_DIR, dir,
   3979 	    (long long)name);
   3980 
   3981 	*len = plen;
   3982 	return (pname);
   3983 }
   3984 
   3985 /* xdr encode a mds_layout to the on-disk layout */
   3986 static char *
   3987 xdr_convert_layout(mds_layout_t *lp, int *size)
   3988 {
   3989 	int xdr_size;
   3990 	char *xdr_buf;
   3991 	XDR xdr;
   3992 	odl on_disk;
   3993 	odl_t odlt;
   3994 
   3995 	/* otw_flo.nfl_first_stripe_index hard coded to 0 */
   3996 	odlt.start_idx = 0;
   3997 	odlt.unit_size = lp->mlo_lc.lc_stripe_unit;
   3998 
   3999 	/* offset and length are currently hard coded, as well */
   4000 	odlt.offset = 0;
   4001 	odlt.length = -1;
   4002 
   4003 	odlt.sid.sid_len = lp->mlo_lc.lc_stripe_count;
   4004 	odlt.sid.sid_val = lp->mlo_lc.lc_mds_sids;
   4005 
   4006 	on_disk.odl_type = PNFS;
   4007 	on_disk.odl_u.odl_pnfs.odl_vers = VERS_1;
   4008 	on_disk.odl_u.odl_pnfs.odl_lo_u.odl_content.odl_content_len = 1;
   4009 	on_disk.odl_u.odl_pnfs.odl_lo_u.odl_content.odl_content_val = &odlt;
   4010 
   4011 	xdr_size = xdr_sizeof(xdr_odl, (char *)&on_disk);
   4012 	xdr_buf = kmem_zalloc(xdr_size, KM_SLEEP);
   4013 
   4014 	xdrmem_create(&xdr, xdr_buf, xdr_size, XDR_ENCODE);
   4015 
   4016 	if (xdr_odl(&xdr, &on_disk) == FALSE) {
   4017 		*size = 0;
   4018 		kmem_free(xdr_buf, xdr_size);
   4019 		return (NULL);
   4020 	}
   4021 
   4022 	*size = xdr_size;
   4023 	return (xdr_buf);
   4024 }
   4025 
   4026 /* xdr decode an on-disk layout to an odl struct */
   4027 /*ARGSUSED*/
   4028 static odl *
   4029 xdr_convert_odl(char *odlp, int size)
   4030 {
   4031 	int sz;
   4032 	char *unxdr_buf;
   4033 	XDR xdr;
   4034 
   4035 	sz = sizeof (odl);
   4036 	unxdr_buf = kmem_zalloc(sz, KM_SLEEP);
   4037 
   4038 	xdrmem_create(&xdr, odlp, size, XDR_DECODE);
   4039 
   4040 	if (xdr_odl(&xdr, (odl *)unxdr_buf) == FALSE) {
   4041 		kmem_free(unxdr_buf, sz);
   4042 		return (NULL);
   4043 	}
   4044 
   4045 	return ((odl *)unxdr_buf);
   4046 }
   4047 
   4048 int
   4049 odl_already_written(char *name)
   4050 {
   4051 	vnode_t	*vp;
   4052 
   4053 	ASSERT(name != NULL);
   4054 
   4055 	if (vn_open(name, UIO_SYSSPACE, FREAD, 0, &vp, 0, 0))
   4056 		return (0);	/* does not exist */
   4057 
   4058 	(void) VOP_CLOSE(vp, FREAD, 1, (offset_t)0, CRED(), NULL);
   4059 	VN_RELE(vp);
   4060 	return (1);	/* has already been written */
   4061 }
   4062 
   4063 int
   4064 mds_put_layout(mds_layout_t *lp, vnode_t *vp)
   4065 {
   4066 	char *odlp;
   4067 	char *name;
   4068 	int len, size, err;
   4069 
   4070 	if (lp == NULL) {
   4071 		return (-2);
   4072 	}
   4073 
   4074 	name = mds_create_name(vp, &len);
   4075 	if (name == NULL) {
   4076 		return (-1);
   4077 	}
   4078 
   4079 	if (odl_already_written(name)) {
   4080 		kmem_free(name, len);
   4081 		return (0);
   4082 	}
   4083 
   4084 	/* mythical xdr encode routine */
   4085 	odlp = xdr_convert_layout(lp, &size);
   4086 	if (odlp == NULL) {
   4087 		kmem_free(name, len);
   4088 		return (-1);
   4089 	}
   4090 
   4091 	err = mds_write_odl(name, odlp, size);
   4092 
   4093 	kmem_free(name, len);
   4094 	kmem_free(odlp, size);
   4095 
   4096 	return (err);
   4097 }
   4098 
   4099 int
   4100 mds_get_odl(vnode_t *vp, mds_layout_t **plp)
   4101 {
   4102 	char	*odlp;
   4103 	int	len, size;
   4104 	int	i;
   4105 	char	*name;
   4106 
   4107 	mds_layout_t	*lp;
   4108 	layout_core_t	lc;
   4109 
   4110 	odl	*on_disk;
   4111 	odl_t	*odlt;
   4112 
   4113 	ASSERT(plp != NULL);
   4114 
   4115 	name = mds_create_name(vp, &len);
   4116 	if (name == NULL)
   4117 		return (NFS4ERR_LAYOUTTRYLATER);
   4118 
   4119 	odlp = mds_read_odl(name, &size);
   4120 	if (odlp == NULL) {
   4121 		kmem_free(name, len);
   4122 		return (NFS4ERR_LAYOUTTRYLATER);
   4123 	}
   4124 
   4125 	/* the magic xdr decode routine */
   4126 	on_disk = xdr_convert_odl(odlp, size);
   4127 
   4128 	kmem_free(name, len);
   4129 	kmem_free(odlp, size);
   4130 
   4131 	if (on_disk == NULL)
   4132 		return (NFS4ERR_LAYOUTTRYLATER);
   4133 
   4134 	odlt = on_disk->odl_u.odl_pnfs.odl_lo_u.odl_content.odl_content_val;
   4135 
   4136 	lc.lc_stripe_unit = odlt->unit_size;
   4137 	lc.lc_stripe_count = odlt->sid.sid_len;
   4138 	lc.lc_mds_sids = odlt->sid.sid_val;
   4139 
   4140 	lp = mds_add_layout(&lc);
   4141 
   4142 	/* these were allocated by the xdr decode process */
   4143 
   4144 	for (i = 0; i < odlt->sid.sid_len; i++) {
   4145 		kmem_free(odlt->sid.sid_val[i].val, odlt->sid.sid_val[i].len);
   4146 	}
   4147 
   4148 	kmem_free(odlt->sid.sid_val, (odlt->sid.sid_len * sizeof (mds_sid)));
   4149 	kmem_free(odlt, sizeof (odl_t));
   4150 	kmem_free(on_disk, sizeof (odl));
   4151 
   4152 	if (lp == NULL)
   4153 		return (NFS4ERR_LAYOUTTRYLATER);
   4154 
   4155 	*plp = lp;
   4156 
   4157 	return (NFS4_OK);
   4158 }
   4159 
   4160 void
   4161 mds_delete_layout(vnode_t *vp)
   4162 {
   4163 	int len;
   4164 	char *name;
   4165 
   4166 	name = mds_create_name(vp, &len);
   4167 	if (name == NULL) {
   4168 		return;
   4169 	}
   4170 
   4171 	mds_remove_odl(name);
   4172 
   4173 	kmem_free(name, len);
   4174 }
   4175