Home | History | Annotate | Download | only in rds
      1 /*
      2  * CDDL HEADER START
      3  *
      4  * The contents of this file are subject to the terms of the
      5  * Common Development and Distribution License (the "License").
      6  * You may not use this file except in compliance with the License.
      7  *
      8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
      9  * or http://www.opensolaris.org/os/licensing.
     10  * See the License for the specific language governing permissions
     11  * and limitations under the License.
     12  *
     13  * When distributing Covered Code, include this CDDL HEADER in each
     14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
     15  * If applicable, add the following below this CDDL HEADER, with the
     16  * fields enclosed by brackets "[]" replaced with your own identifying
     17  * information: Portions Copyright [yyyy] [name of copyright owner]
     18  *
     19  * CDDL HEADER END
     20  */
     21 /*
     22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
     23  * Use is subject to license terms.
     24  */
     25 /*
     26  * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
     27  *
     28  * This software is available to you under a choice of one of two
     29  * licenses.  You may choose to be licensed under the terms of the GNU
     30  * General Public License (GPL) Version 2, available from the file
     31  * COPYING in the main directory of this source tree, or the
     32  * OpenIB.org BSD license below:
     33  *
     34  *     Redistribution and use in source and binary forms, with or
     35  *     without modification, are permitted provided that the following
     36  *     conditions are met:
     37  *
     38  *	- Redistributions of source code must retain the above
     39  *	  copyright notice, this list of conditions and the following
     40  *	  disclaimer.
     41  *
     42  *	- Redistributions in binary form must reproduce the above
     43  *	  copyright notice, this list of conditions and the following
     44  *	  disclaimer in the documentation and/or other materials
     45  *	  provided with the distribution.
     46  *
     47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     54  * SOFTWARE.
     55  *
     56  */
     57 /*
     58  * Sun elects to include this software in Sun product
     59  * under the OpenIB BSD license.
     60  *
     61  *
     62  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
     63  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     64  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     65  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
     66  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     67  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
     68  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     69  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     70  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     71  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
     72  * POSSIBILITY OF SUCH DAMAGE.
     73  */
     74 
     75 #include <sys/ib/clients/rds/rdsib_cm.h>
     76 #include <sys/ib/clients/rds/rdsib_ib.h>
     77 #include <sys/ib/clients/rds/rdsib_buf.h>
     78 #include <sys/ib/clients/rds/rdsib_ep.h>
     79 #include <sys/ib/clients/rds/rds_kstat.h>
     80 
     81 /*
     82  * This File contains the buffer management code
     83  */
     84 
     85 #define	DUMP_USER_PARAMS()	\
     86 	RDS_DPRINTF3(LABEL, "MaxNodes = %d", MaxNodes); \
     87 	RDS_DPRINTF3(LABEL, "UserBufferSize = %d", UserBufferSize); \
     88 	RDS_DPRINTF3(LABEL, "RdsPktSize = %d", RdsPktSize); \
     89 	RDS_DPRINTF3(LABEL, "MaxDataSendBuffers = %d", MaxDataSendBuffers); \
     90 	RDS_DPRINTF3(LABEL, "MaxDataRecvBuffers = %d", MaxDataRecvBuffers); \
     91 	RDS_DPRINTF3(LABEL, "MaxCtrlSendBuffers = %d", MaxCtrlSendBuffers); \
     92 	RDS_DPRINTF3(LABEL, "MaxCtrlRecvBuffers = %d", MaxCtrlRecvBuffers); \
     93 	RDS_DPRINTF3(LABEL, "DataRecvBufferLWM = %d", DataRecvBufferLWM); \
     94 	RDS_DPRINTF3(LABEL, "PendingRxPktsHWM = %d", PendingRxPktsHWM); \
     95 	RDS_DPRINTF3(LABEL, "MinRnrRetry = %d", MinRnrRetry)
     96 
/*
 * Set by rds_init_recv_caches() to the smaller of MaxCtrlRecvBuffers and
 * MaxDataRecvBuffers.
 */
uint_t	rds_nbuffers_to_putback;
     98 
     99 static void
    100 rds_free_mblk(char *arg)
    101 {
    102 	rds_buf_t *bp = (rds_buf_t *)(uintptr_t)arg;
    103 
    104 	/* Free the recv buffer */
    105 	RDS_DPRINTF4("rds_free_mblk", "Enter: BP(%p)", bp);
    106 	ASSERT(bp->buf_state == RDS_RCVBUF_ONSOCKQ);
    107 	rds_free_recv_buf(bp, 1);
    108 	RDS_DECR_RXPKTS_PEND(1);
    109 	RDS_DPRINTF4("rds_free_mblk", "Return: BP(%p)", bp);
    110 }
    111 
    112 void
    113 rds_free_recv_caches(rds_state_t *statep)
    114 {
    115 	rds_hca_t	*hcap;
    116 	int		ret;
    117 
    118 	RDS_DPRINTF4("rds_free_recv_caches", "Enter");
    119 
    120 	mutex_enter(&rds_dpool.pool_lock);
    121 	if (rds_dpool.pool_memp == NULL) {
    122 		RDS_DPRINTF2("rds_free_recv_caches", "Caches are empty");
    123 		mutex_exit(&rds_dpool.pool_lock);
    124 		return;
    125 	}
    126 
    127 	/*
    128 	 * All buffers must have been freed as all sessions are closed
    129 	 * and destroyed
    130 	 */
    131 	ASSERT(rds_dpool.pool_nbusy == 0);
    132 	RDS_DPRINTF2("rds_free_recv_caches", "Data Pool has "
    133 	    "pending buffers: %d", rds_dpool.pool_nbusy);
    134 	while (rds_dpool.pool_nbusy != 0) {
    135 		mutex_exit(&rds_dpool.pool_lock);
    136 		delay(drv_usectohz(1000000));
    137 		mutex_enter(&rds_dpool.pool_lock);
    138 	}
    139 
    140 	hcap = statep->rds_hcalistp;
    141 	while (hcap != NULL) {
    142 		if (hcap->hca_mrhdl != NULL) {
    143 			ret = ibt_deregister_mr(hcap->hca_hdl,
    144 			    hcap->hca_mrhdl);
    145 			if (ret == IBT_SUCCESS) {
    146 				hcap->hca_mrhdl = NULL;
    147 				hcap->hca_lkey = 0;
    148 				hcap->hca_rkey = 0;
    149 			} else {
    150 				RDS_DPRINTF2(LABEL, "ibt_deregister_mr "
    151 				    "failed: %d, mrhdl: 0x%p", ret,
    152 				    hcap->hca_mrhdl);
    153 			}
    154 		}
    155 		hcap = hcap->hca_nextp;
    156 	}
    157 
    158 	kmem_free(rds_dpool.pool_bufmemp, (rds_dpool.pool_nbuffers +
    159 	    rds_cpool.pool_nbuffers) * sizeof (rds_buf_t));
    160 	rds_dpool.pool_bufmemp = NULL;
    161 
    162 	kmem_free(rds_dpool.pool_memp, rds_dpool.pool_memsize);
    163 	rds_dpool.pool_memp = NULL;
    164 
    165 	mutex_exit(&rds_dpool.pool_lock);
    166 
    167 	RDS_DPRINTF4("rds_free_recv_caches", "Return");
    168 }
    169 
    170 int
    171 rds_init_recv_caches(rds_state_t *statep)
    172 {
    173 	uint8_t		*mp;
    174 	rds_buf_t	*bp;
    175 	rds_hca_t	*hcap;
    176 	uint32_t	nsessions;
    177 	uint_t		ix;
    178 	uint_t		nctrlrx;
    179 	uint8_t		*memp;
    180 	uint_t		memsize, nbuf;
    181 	rds_buf_t	*bufmemp;
    182 	ibt_mr_attr_t	mem_attr;
    183 	ibt_mr_desc_t	mem_desc;
    184 	int		ret;
    185 
    186 	RDS_DPRINTF4("rds_init_recv_caches", "Enter");
    187 
    188 	DUMP_USER_PARAMS();
    189 
    190 	mutex_enter(&rds_dpool.pool_lock);
    191 	if (rds_dpool.pool_memp != NULL) {
    192 		RDS_DPRINTF2("rds_init_recv_caches", "Pools are already "
    193 		    "initialized");
    194 		mutex_exit(&rds_dpool.pool_lock);
    195 		return (0);
    196 	}
    197 
    198 	/*
    199 	 * High water mark for the receive buffers in the system. If the
    200 	 * number of buffers used crosses this mark then all sockets in
    201 	 * would be stalled. The port quota for the sockets is set based
    202 	 * on this limit.
    203 	 */
    204 	rds_rx_pkts_pending_hwm = (PendingRxPktsHWM * NDataRX)/100;
    205 
    206 	rds_nbuffers_to_putback = min(MaxCtrlRecvBuffers, MaxDataRecvBuffers);
    207 
    208 	/* nsessions can never be less than 1 */
    209 	nsessions = MaxNodes - 1;
    210 	nctrlrx = (nsessions + 1) * MaxCtrlRecvBuffers * 2;
    211 
    212 	RDS_DPRINTF3(LABEL, "Number of Possible Sessions: %d", nsessions);
    213 
    214 	/* Add the hdr */
    215 	RdsPktSize = UserBufferSize + RDS_DATA_HDR_SZ;
    216 
    217 	memsize = (NDataRX * RdsPktSize) + (nctrlrx * RDS_CTRLPKT_SIZE);
    218 	nbuf = NDataRX + nctrlrx;
    219 	RDS_DPRINTF3(LABEL, "RDS Buffer Pool Memory: %lld", memsize);
    220 	RDS_DPRINTF3(LABEL, "Total Buffers: %d", nbuf);
    221 
    222 	memp = (uint8_t *)kmem_zalloc(memsize, KM_NOSLEEP);
    223 	if (memp == NULL) {
    224 		RDS_DPRINTF1(LABEL, "RDS Memory allocation failed");
    225 		mutex_exit(&rds_dpool.pool_lock);
    226 		return (-1);
    227 	}
    228 
    229 	RDS_DPRINTF3(LABEL, "RDS Buffer Entries Memory: %lld",
    230 	    nbuf * sizeof (rds_buf_t));
    231 
    232 	/* allocate memory for buffer entries */
    233 	bufmemp = (rds_buf_t *)kmem_zalloc(nbuf * sizeof (rds_buf_t),
    234 	    KM_SLEEP);
    235 
    236 	/* register the memory with all HCAs */
    237 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)memp;
    238 	mem_attr.mr_len = memsize;
    239 	mem_attr.mr_as = NULL;
    240 	mem_attr.mr_flags = IBT_MR_ENABLE_LOCAL_WRITE;
    241 
    242 	rw_enter(&statep->rds_hca_lock, RW_WRITER);
    243 
    244 	hcap = statep->rds_hcalistp;
    245 	while (hcap != NULL) {
    246 		if (hcap->hca_state != RDS_HCA_STATE_OPEN) {
    247 			hcap = hcap->hca_nextp;
    248 			continue;
    249 		}
    250 
    251 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
    252 		    &mem_attr, &hcap->hca_mrhdl, &mem_desc);
    253 		if (ret != IBT_SUCCESS) {
    254 			RDS_DPRINTF2(LABEL, "ibt_register_mr failed: %d", ret);
    255 			hcap = statep->rds_hcalistp;
    256 			while ((hcap) && (hcap->hca_mrhdl != NULL)) {
    257 				ret = ibt_deregister_mr(hcap->hca_hdl,
    258 				    hcap->hca_mrhdl);
    259 				if (ret == IBT_SUCCESS) {
    260 					hcap->hca_mrhdl = NULL;
    261 					hcap->hca_lkey = 0;
    262 					hcap->hca_rkey = 0;
    263 				} else {
    264 					RDS_DPRINTF2(LABEL, "ibt_deregister_mr "
    265 					    "failed: %d, mrhdl: 0x%p", ret,
    266 					    hcap->hca_mrhdl);
    267 				}
    268 				hcap = hcap->hca_nextp;
    269 			}
    270 			kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
    271 			kmem_free(memp, memsize);
    272 			rw_exit(&statep->rds_hca_lock);
    273 			mutex_exit(&rds_dpool.pool_lock);
    274 			return (-1);
    275 		}
    276 
    277 		hcap->hca_state = RDS_HCA_STATE_MEM_REGISTERED;
    278 		hcap->hca_lkey = mem_desc.md_lkey;
    279 		hcap->hca_rkey = mem_desc.md_rkey;
    280 
    281 		hcap = hcap->hca_nextp;
    282 	}
    283 	rw_exit(&statep->rds_hca_lock);
    284 
    285 	/* Initialize data pool */
    286 	rds_dpool.pool_memp = memp;
    287 	rds_dpool.pool_memsize = memsize;
    288 	rds_dpool.pool_bufmemp = bufmemp;
    289 	rds_dpool.pool_nbuffers = NDataRX;
    290 	rds_dpool.pool_nbusy = 0;
    291 	rds_dpool.pool_nfree = NDataRX;
    292 
    293 	/* chain the buffers */
    294 	mp = memp;
    295 	bp = bufmemp;
    296 	for (ix = 0; ix < NDataRX; ix++) {
    297 		bp[ix].buf_nextp = &bp[ix + 1];
    298 		bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    299 		bp[ix].buf_state = RDS_RCVBUF_FREE;
    300 		bp[ix].buf_frtn.free_func = rds_free_mblk;
    301 		bp[ix].buf_frtn.free_arg = (char *)&bp[ix];
    302 		mp = mp + RdsPktSize;
    303 	}
    304 	bp[NDataRX - 1].buf_nextp = NULL;
    305 	rds_dpool.pool_headp = &bp[0];
    306 	rds_dpool.pool_tailp = &bp[NDataRX - 1];
    307 
    308 	/* Initialize ctrl pool */
    309 	rds_cpool.pool_nbuffers = nctrlrx;
    310 	rds_cpool.pool_nbusy = 0;
    311 	rds_cpool.pool_nfree = nctrlrx;
    312 
    313 	/* chain the buffers */
    314 	for (ix = NDataRX; ix < nbuf - 1; ix++) {
    315 		bp[ix].buf_nextp = &bp[ix + 1];
    316 		bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    317 		mp = mp + RDS_CTRLPKT_SIZE;
    318 	}
    319 	bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    320 	bp[nbuf - 1].buf_nextp = NULL;
    321 	rds_cpool.pool_headp = &bp[NDataRX];
    322 	rds_cpool.pool_tailp = &bp[nbuf - 1];
    323 
    324 	mutex_exit(&rds_dpool.pool_lock);
    325 
    326 	RDS_DPRINTF3(LABEL, "rdsmemp start: %p end: %p", memp, mp);
    327 	RDS_DPRINTF4("rds_init_recv_caches", "Return");
    328 	return (0);
    329 }
    330 
/* Forward declaration; rds_free_send_pool() below looks up its HCA with it */
rds_hca_t *rds_lkup_hca(ib_guid_t hca_guid);
    332 
    333 void
    334 rds_free_send_pool(rds_ep_t *ep)
    335 {
    336 	rds_bufpool_t   *pool;
    337 	rds_hca_t	*hcap;
    338 	int		ret;
    339 
    340 	pool = &ep->ep_sndpool;
    341 
    342 	mutex_enter(&pool->pool_lock);
    343 	if (pool->pool_memp == NULL) {
    344 		mutex_exit(&pool->pool_lock);
    345 		RDS_DPRINTF2("rds_free_send_pool",
    346 		    "EP(%p) DOUBLE Free on Send Pool", ep);
    347 		return;
    348 	}
    349 
    350 	/* get the hcap for the HCA hosting this channel */
    351 	hcap = rds_lkup_hca(ep->ep_hca_guid);
    352 	if (hcap == NULL) {
    353 		RDS_DPRINTF2("rds_free_send_pool", "HCA (0x%llx) not found",
    354 		    ep->ep_hca_guid);
    355 	} else {
    356 		ret = ibt_deregister_mr(hcap->hca_hdl, ep->ep_snd_mrhdl);
    357 		if (ret != IBT_SUCCESS) {
    358 			RDS_DPRINTF2(LABEL,
    359 			    "ibt_deregister_mr failed: %d, mrhdl: 0x%p",
    360 			    ret, ep->ep_snd_mrhdl);
    361 		}
    362 
    363 		if (ep->ep_ack_addr) {
    364 			ret = ibt_deregister_mr(hcap->hca_hdl, ep->ep_ackhdl);
    365 			if (ret != IBT_SUCCESS) {
    366 				RDS_DPRINTF2(LABEL,
    367 				    "ibt_deregister_mr ackhdl failed: %d, "
    368 				    "mrhdl: 0x%p", ret, ep->ep_ackhdl);
    369 			}
    370 
    371 			kmem_free((void *)ep->ep_ack_addr, sizeof (uintptr_t));
    372 			ep->ep_ack_addr = NULL;
    373 		}
    374 	}
    375 
    376 	kmem_free(pool->pool_memp, pool->pool_memsize);
    377 	kmem_free(pool->pool_bufmemp,
    378 	    pool->pool_nbuffers * sizeof (rds_buf_t));
    379 	pool->pool_memp = NULL;
    380 	pool->pool_bufmemp = NULL;
    381 	mutex_exit(&pool->pool_lock);
    382 }
    383 
    384 int
    385 rds_init_send_pool(rds_ep_t *ep, ib_guid_t hca_guid)
    386 {
    387 	uint8_t		*mp;
    388 	rds_buf_t	*bp;
    389 	rds_hca_t	*hcap;
    390 	uint_t		ix, rcv_len;
    391 	ibt_mr_attr_t   mem_attr;
    392 	ibt_mr_desc_t   mem_desc;
    393 	uint8_t		*memp;
    394 	rds_buf_t	*bufmemp;
    395 	uintptr_t	ack_addr = NULL;
    396 	uint_t		memsize;
    397 	uint_t		nbuf;
    398 	rds_bufpool_t   *spool;
    399 	rds_data_hdr_t	*pktp;
    400 	int		ret;
    401 
    402 	RDS_DPRINTF2("rds_init_send_pool", "Enter");
    403 
    404 	spool = &ep->ep_sndpool;
    405 
    406 	ASSERT(spool->pool_memp == NULL);
    407 	ASSERT(ep->ep_hca_guid == 0);
    408 
    409 	/* get the hcap for the HCA hosting this channel */
    410 	hcap = rds_get_hcap(rdsib_statep, hca_guid);
    411 	if (hcap == NULL) {
    412 		RDS_DPRINTF2("rds_init_send_pool", "HCA (0x%llx) not found",
    413 		    hca_guid);
    414 		return (-1);
    415 	}
    416 
    417 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    418 		spool->pool_nbuffers = MaxDataSendBuffers;
    419 		spool->pool_nbusy = 0;
    420 		spool->pool_nfree = MaxDataSendBuffers;
    421 		memsize = (MaxDataSendBuffers * RdsPktSize) +
    422 		    sizeof (uintptr_t);
    423 		rcv_len = RdsPktSize;
    424 	} else {
    425 		spool->pool_nbuffers = MaxCtrlSendBuffers;
    426 		spool->pool_nbusy = 0;
    427 		spool->pool_nfree = MaxCtrlSendBuffers;
    428 		memsize = MaxCtrlSendBuffers * RDS_CTRLPKT_SIZE;
    429 		rcv_len = RDS_CTRLPKT_SIZE;
    430 	}
    431 	nbuf = spool->pool_nbuffers;
    432 
    433 	RDS_DPRINTF3(LABEL, "RDS Send Pool Memory: %lld", memsize);
    434 
    435 	memp = (uint8_t *)kmem_zalloc(memsize, KM_NOSLEEP);
    436 	if (memp == NULL) {
    437 		RDS_DPRINTF1(LABEL, "RDS Send Memory allocation failed");
    438 		return (-1);
    439 	}
    440 
    441 	RDS_DPRINTF3(LABEL, "RDS Buffer Entries Memory: %lld",
    442 	    nbuf * sizeof (rds_buf_t));
    443 
    444 	/* allocate memory for buffer entries */
    445 	bufmemp = (rds_buf_t *)kmem_zalloc(nbuf * sizeof (rds_buf_t),
    446 	    KM_SLEEP);
    447 
    448 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    449 		ack_addr = (uintptr_t)kmem_zalloc(sizeof (uintptr_t), KM_SLEEP);
    450 
    451 		/* register the memory with the HCA for this channel */
    452 		mem_attr.mr_vaddr = (ib_vaddr_t)ack_addr;
    453 		mem_attr.mr_len = sizeof (uintptr_t);
    454 		mem_attr.mr_as = NULL;
    455 		mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE |
    456 		    IBT_MR_ENABLE_REMOTE_WRITE;
    457 
    458 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
    459 		    &mem_attr, &ep->ep_ackhdl, &mem_desc);
    460 		if (ret != IBT_SUCCESS) {
    461 			RDS_DPRINTF2("rds_init_send_pool",
    462 			    "EP(%p): ibt_register_mr for ack failed: %d",
    463 			    ep, ret);
    464 			kmem_free(memp, memsize);
    465 			kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
    466 			kmem_free((void *)ack_addr, sizeof (uintptr_t));
    467 			return (-1);
    468 		}
    469 		ep->ep_ack_rkey = mem_desc.md_rkey;
    470 		ep->ep_ack_addr = ack_addr;
    471 	}
    472 
    473 	/* register the memory with the HCA for this channel */
    474 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)memp;
    475 	mem_attr.mr_len = memsize;
    476 	mem_attr.mr_as = NULL;
    477 	mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE;
    478 
    479 	ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
    480 	    &mem_attr, &ep->ep_snd_mrhdl, &mem_desc);
    481 	if (ret != IBT_SUCCESS) {
    482 		RDS_DPRINTF2("rds_init_send_pool", "EP(%p): ibt_register_mr "
    483 		    "failed: %d", ep, ret);
    484 		kmem_free(memp, memsize);
    485 		kmem_free(bufmemp, nbuf * sizeof (rds_buf_t));
    486 		if (ack_addr != NULL)
    487 			kmem_free((void *)ack_addr, sizeof (uintptr_t));
    488 		return (-1);
    489 	}
    490 	ep->ep_snd_lkey = mem_desc.md_lkey;
    491 
    492 
    493 	/* Initialize the pool */
    494 	spool->pool_memp = memp;
    495 	spool->pool_memsize = memsize;
    496 	spool->pool_bufmemp = bufmemp;
    497 	spool->pool_sqpoll_pending = B_FALSE;
    498 
    499 	/* chain the buffers and initialize them */
    500 	mp = memp;
    501 	bp = bufmemp;
    502 
    503 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    504 		for (ix = 0; ix < nbuf - 1; ix++) {
    505 			bp[ix].buf_nextp = &bp[ix + 1];
    506 			bp[ix].buf_ep = ep;
    507 			bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    508 			bp[ix].buf_ds.ds_key = ep->ep_snd_lkey;
    509 			bp[ix].buf_state = RDS_SNDBUF_FREE;
    510 			pktp = (rds_data_hdr_t *)(uintptr_t)mp;
    511 			pktp->dh_bufid = (uintptr_t)&bp[ix];
    512 			mp = mp + rcv_len;
    513 		}
    514 		bp[nbuf - 1].buf_nextp = NULL;
    515 		bp[nbuf - 1].buf_ep = ep;
    516 		bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    517 		bp[nbuf - 1].buf_ds.ds_key = ep->ep_snd_lkey;
    518 		bp[nbuf - 1].buf_state = RDS_SNDBUF_FREE;
    519 		pktp = (rds_data_hdr_t *)(uintptr_t)mp;
    520 		pktp->dh_bufid = (uintptr_t)&bp[nbuf - 1];
    521 
    522 		spool->pool_headp = &bp[0];
    523 		spool->pool_tailp = &bp[nbuf - 1];
    524 
    525 		mp = mp + rcv_len;
    526 		ep->ep_ackds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    527 		ep->ep_ackds.ds_key = ep->ep_snd_lkey;
    528 		ep->ep_ackds.ds_len = sizeof (uintptr_t);
    529 
    530 		*(uintptr_t *)ep->ep_ack_addr = (uintptr_t)spool->pool_tailp;
    531 	} else {
    532 		/* control send pool */
    533 		for (ix = 0; ix < nbuf - 1; ix++) {
    534 			bp[ix].buf_nextp = &bp[ix + 1];
    535 			bp[ix].buf_ep = ep;
    536 			bp[ix].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    537 			bp[ix].buf_ds.ds_key = ep->ep_snd_lkey;
    538 			bp[ix].buf_state = RDS_SNDBUF_FREE;
    539 			mp = mp + rcv_len;
    540 		}
    541 		bp[nbuf - 1].buf_nextp = NULL;
    542 		bp[nbuf - 1].buf_ep = ep;
    543 		bp[nbuf - 1].buf_ds.ds_va = (ib_vaddr_t)(uintptr_t)mp;
    544 		bp[nbuf - 1].buf_ds.ds_key = ep->ep_snd_lkey;
    545 		bp[nbuf - 1].buf_state = RDS_SNDBUF_FREE;
    546 		spool->pool_headp = &bp[0];
    547 		spool->pool_tailp = &bp[nbuf - 1];
    548 	}
    549 
    550 	RDS_DPRINTF3(LABEL, "rdsmemp start: %p end: %p", memp, mp);
    551 	RDS_DPRINTF2("rds_init_send_pool", "Return");
    552 
    553 	return (0);
    554 }
    555 
    556 int
    557 rds_reinit_send_pool(rds_ep_t *ep, ib_guid_t hca_guid)
    558 {
    559 	rds_buf_t	*bp;
    560 	rds_hca_t	*hcap;
    561 	ibt_mr_attr_t   mem_attr;
    562 	ibt_mr_desc_t   mem_desc;
    563 	rds_bufpool_t   *spool;
    564 	int		ret;
    565 
    566 	RDS_DPRINTF2("rds_reinit_send_pool", "Enter: EP(%p)", ep);
    567 
    568 	spool = &ep->ep_sndpool;
    569 	ASSERT(spool->pool_memp != NULL);
    570 
    571 	/* deregister the send pool memory from the previous HCA */
    572 	hcap = rds_get_hcap(rdsib_statep, ep->ep_hca_guid);
    573 	if (hcap == NULL) {
    574 		RDS_DPRINTF2("rds_reinit_send_pool", "HCA (0x%llx) not found",
    575 		    ep->ep_hca_guid);
    576 	} else {
    577 		if (ep->ep_snd_mrhdl != NULL) {
    578 			(void) ibt_deregister_mr(hcap->hca_hdl,
    579 			    ep->ep_snd_mrhdl);
    580 			ep->ep_snd_mrhdl = NULL;
    581 			ep->ep_snd_lkey = 0;
    582 		}
    583 
    584 		if ((ep->ep_type == RDS_EP_TYPE_DATA) &&
    585 		    (ep->ep_ackhdl != NULL)) {
    586 			(void) ibt_deregister_mr(hcap->hca_hdl, ep->ep_ackhdl);
    587 			ep->ep_ackhdl = NULL;
    588 			ep->ep_ack_rkey = 0;
    589 		}
    590 
    591 		ep->ep_hca_guid = NULL;
    592 	}
    593 
    594 	/* get the hcap for the new HCA */
    595 	hcap = rds_get_hcap(rdsib_statep, hca_guid);
    596 	if (hcap == NULL) {
    597 		RDS_DPRINTF2("rds_reinit_send_pool", "HCA (0x%llx) not found",
    598 		    hca_guid);
    599 		return (-1);
    600 	}
    601 
    602 	/* register the send memory */
    603 	mem_attr.mr_vaddr = (ib_vaddr_t)(uintptr_t)spool->pool_memp;
    604 	mem_attr.mr_len = spool->pool_memsize;
    605 	mem_attr.mr_as = NULL;
    606 	mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE;
    607 
    608 	ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
    609 	    &mem_attr, &ep->ep_snd_mrhdl, &mem_desc);
    610 	if (ret != IBT_SUCCESS) {
    611 		RDS_DPRINTF2("rds_reinit_send_pool",
    612 		    "EP(%p): ibt_register_mr failed: %d", ep, ret);
    613 		return (-1);
    614 	}
    615 	ep->ep_snd_lkey = mem_desc.md_lkey;
    616 
    617 	/* register the acknowledgement space */
    618 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    619 		mem_attr.mr_vaddr = (ib_vaddr_t)ep->ep_ack_addr;
    620 		mem_attr.mr_len = sizeof (uintptr_t);
    621 		mem_attr.mr_as = NULL;
    622 		mem_attr.mr_flags = IBT_MR_SLEEP | IBT_MR_ENABLE_LOCAL_WRITE |
    623 		    IBT_MR_ENABLE_REMOTE_WRITE;
    624 
    625 		ret = ibt_register_mr(hcap->hca_hdl, hcap->hca_pdhdl,
    626 		    &mem_attr, &ep->ep_ackhdl, &mem_desc);
    627 		if (ret != IBT_SUCCESS) {
    628 			RDS_DPRINTF2("rds_reinit_send_pool",
    629 			    "EP(%p): ibt_register_mr for ack failed: %d",
    630 			    ep, ret);
    631 			(void) ibt_deregister_mr(hcap->hca_hdl,
    632 			    ep->ep_snd_mrhdl);
    633 			ep->ep_snd_mrhdl = NULL;
    634 			ep->ep_snd_lkey = 0;
    635 			return (-1);
    636 		}
    637 		ep->ep_ack_rkey = mem_desc.md_rkey;
    638 
    639 		/* update the LKEY in the acknowledgement WR */
    640 		ep->ep_ackds.ds_key = ep->ep_snd_lkey;
    641 	}
    642 
    643 	/* update the LKEY in each buffer */
    644 	bp = spool->pool_headp;
    645 	while (bp) {
    646 		bp->buf_ds.ds_key = ep->ep_snd_lkey;
    647 		bp = bp->buf_nextp;
    648 	}
    649 
    650 	ep->ep_hca_guid = hca_guid;
    651 
    652 	RDS_DPRINTF2("rds_reinit_send_pool", "Return: EP(%p)", ep);
    653 
    654 	return (0);
    655 }
    656 
    657 void
    658 rds_free_recv_pool(rds_ep_t *ep)
    659 {
    660 	rds_bufpool_t *pool;
    661 
    662 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    663 		pool = &rds_dpool;
    664 	} else {
    665 		pool = &rds_cpool;
    666 	}
    667 
    668 	mutex_enter(&ep->ep_rcvpool.pool_lock);
    669 	if (ep->ep_rcvpool.pool_nfree != 0) {
    670 		rds_free_buf(pool, ep->ep_rcvpool.pool_headp,
    671 		    ep->ep_rcvpool.pool_nfree);
    672 		ep->ep_rcvpool.pool_nfree = 0;
    673 		ep->ep_rcvpool.pool_headp = NULL;
    674 		ep->ep_rcvpool.pool_tailp = NULL;
    675 	}
    676 	mutex_exit(&ep->ep_rcvpool.pool_lock);
    677 }
    678 
    679 int
    680 rds_init_recv_pool(rds_ep_t *ep)
    681 {
    682 	rds_bufpool_t	*rpool;
    683 	rds_qp_t	*recvqp;
    684 
    685 	recvqp = &ep->ep_recvqp;
    686 	rpool = &ep->ep_rcvpool;
    687 	if (ep->ep_type == RDS_EP_TYPE_DATA) {
    688 		recvqp->qp_depth = MaxDataRecvBuffers;
    689 		recvqp->qp_level = 0;
    690 		recvqp->qp_lwm = (DataRecvBufferLWM * MaxDataRecvBuffers)/100;
    691 		recvqp->qp_taskqpending = B_FALSE;
    692 
    693 		rpool->pool_nbuffers = MaxDataRecvBuffers;
    694 		rpool->pool_nbusy = 0;
    695 		rpool->pool_nfree = 0;
    696 	} else {
    697 		recvqp->qp_depth = MaxCtrlRecvBuffers;
    698 		recvqp->qp_level = 0;
    699 		recvqp->qp_lwm = (CtrlRecvBufferLWM * MaxCtrlRecvBuffers)/100;
    700 		recvqp->qp_taskqpending = B_FALSE;
    701 
    702 		rpool->pool_nbuffers = MaxCtrlRecvBuffers;
    703 		rpool->pool_nbusy = 0;
    704 		rpool->pool_nfree = 0;
    705 	}
    706 
    707 	return (0);
    708 }
    709 
    710 /* Free buffers to the global pool, either cpool or dpool */
    711 void
    712 rds_free_buf(rds_bufpool_t *pool, rds_buf_t *bp, uint_t nbuf)
    713 {
    714 	uint_t		ix;
    715 
    716 	RDS_DPRINTF4("rds_free_buf", "Enter");
    717 
    718 	ASSERT(nbuf != 0);
    719 
    720 	mutex_enter(&pool->pool_lock);
    721 
    722 	if (pool->pool_nfree != 0) {
    723 		pool->pool_tailp->buf_nextp = bp;
    724 	} else {
    725 		pool->pool_headp = bp;
    726 	}
    727 
    728 	if (nbuf == 1) {
    729 		ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
    730 		bp->buf_ep = NULL;
    731 		bp->buf_nextp = NULL;
    732 		pool->pool_tailp = bp;
    733 	} else {
    734 		for (ix = 1; ix < nbuf; ix++) {
    735 			ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
    736 			bp->buf_ep = NULL;
    737 			bp = bp->buf_nextp;
    738 		}
    739 		ASSERT(bp->buf_state == RDS_RCVBUF_FREE);
    740 		bp->buf_ep = NULL;
    741 		bp->buf_nextp = NULL;
    742 		pool->pool_tailp = bp;
    743 	}
    744 	/* tail is always the last buffer */
    745 	pool->pool_tailp->buf_nextp = NULL;
    746 
    747 	pool->pool_nfree += nbuf;
    748 	pool->pool_nbusy -= nbuf;
    749 
    750 	mutex_exit(&pool->pool_lock);
    751 
    752 	RDS_DPRINTF4("rds_free_buf", "Return");
    753 }
    754 
    755 /* Get buffers from the global pools, either cpool or dpool */
    756 rds_buf_t *
    757 rds_get_buf(rds_bufpool_t *pool, uint_t nbuf, uint_t *nret)
    758 {
    759 	rds_buf_t	*bp = NULL, *bp1;
    760 	uint_t		ix;
    761 
    762 	RDS_DPRINTF4("rds_get_buf", "Enter");
    763 
    764 	mutex_enter(&pool->pool_lock);
    765 
    766 	RDS_DPRINTF3("rds_get_buf", "Available: %d Needed: %d",
    767 	    pool->pool_nfree, nbuf);
    768 
    769 	if (nbuf < pool->pool_nfree) {
    770 		*nret = nbuf;
    771 
    772 		bp1 = pool->pool_headp;
    773 		for (ix = 1; ix < nbuf; ix++) {
    774 			bp1 = bp1->buf_nextp;
    775 		}
    776 
    777 		bp = pool->pool_headp;
    778 		pool->pool_headp = bp1->buf_nextp;
    779 		bp1->buf_nextp = NULL;
    780 
    781 		pool->pool_nfree -= nbuf;
    782 		pool->pool_nbusy += nbuf;
    783 	} else if (nbuf >= pool->pool_nfree) {
    784 		*nret = pool->pool_nfree;
    785 
    786 		bp = pool->pool_headp;
    787 
    788 		pool->pool_headp = NULL;
    789 		pool->pool_tailp = NULL;
    790 
    791 		pool->pool_nbusy += pool->pool_nfree;
    792 		pool->pool_nfree = 0;
    793 	}
    794 
    795 	mutex_exit(&pool->pool_lock);
    796 
    797 	RDS_DPRINTF4("rds_get_buf", "Return");
    798 
    799 	return (bp);
    800 }
    801 
    802 boolean_t
    803 rds_is_recvq_empty(rds_ep_t *ep, boolean_t wait)
    804 {
    805 	rds_qp_t	*recvqp;
    806 	rds_bufpool_t	*rpool;
    807 	boolean_t ret = B_TRUE;
    808 
    809 	recvqp = &ep->ep_recvqp;
    810 	mutex_enter(&recvqp->qp_lock);
    811 	RDS_DPRINTF2("rds_is_recvq_empty", "EP(%p): QP has %d WRs",
    812 	    ep, recvqp->qp_level);
    813 	if (wait) {
    814 		/* wait until the RQ is empty */
    815 		while (recvqp->qp_level != 0) {
    816 			/* wait one second and try again */
    817 			mutex_exit(&recvqp->qp_lock);
    818 			delay(drv_usectohz(1000000));
    819 			mutex_enter(&recvqp->qp_lock);
    820 		}
    821 	} else if (recvqp->qp_level != 0) {
    822 			ret = B_FALSE;
    823 	}
    824 	mutex_exit(&recvqp->qp_lock);
    825 
    826 	rpool = &ep->ep_rcvpool;
    827 	mutex_enter(&rpool->pool_lock);
    828 
    829 	/*
    830 	 * During failovers/reconnects, the app may still have some buffers
    831 	 * on thier socket queues. Waiting here for those buffers may
    832 	 * cause a hang. It seems ok for those buffers to get freed later.
    833 	 */
    834 	if (rpool->pool_nbusy != 0) {
    835 		RDS_DPRINTF2("rds_is_recvq_empty", "EP(%p): "
    836 		    "There are %d pending buffers on sockqs", ep,
    837 		    rpool->pool_nbusy);
    838 		ret = B_FALSE;
    839 	}
    840 	mutex_exit(&rpool->pool_lock);
    841 
    842 	return (ret);
    843 }
    844 
    845 boolean_t
    846 rds_is_sendq_empty(rds_ep_t *ep, uint_t wait)
    847 {
    848 	rds_bufpool_t	*spool;
    849 	rds_buf_t	*bp;
    850 	boolean_t	ret1 = B_TRUE;
    851 
    852 	/* check if all the sends completed */
    853 	spool = &ep->ep_sndpool;
    854 	mutex_enter(&spool->pool_lock);
    855 	RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
    856 	    "Send Pool contains: %d", ep, spool->pool_nbusy);
    857 	if (wait) {
    858 		while (spool->pool_nbusy != 0) {
    859 			if (rds_no_interrupts) {
    860 				/* wait one second and try again */
    861 				delay(drv_usectohz(1000000));
    862 				rds_poll_send_completions(ep->ep_sendcq, ep,
    863 				    B_TRUE);
    864 			} else {
    865 				/* wait one second and try again */
    866 				mutex_exit(&spool->pool_lock);
    867 				delay(drv_usectohz(1000000));
    868 				mutex_enter(&spool->pool_lock);
    869 			}
    870 		}
    871 
    872 		if ((wait == 2) && (ep->ep_type == RDS_EP_TYPE_DATA)) {
    873 			rds_buf_t	*ackbp;
    874 			rds_buf_t	*prev_ackbp;
    875 
    876 			/*
    877 			 * If the last one is acknowledged then everything
    878 			 * is acknowledged
    879 			 */
    880 			bp = spool->pool_tailp;
    881 			ackbp = *(rds_buf_t **)ep->ep_ack_addr;
    882 			prev_ackbp = ackbp;
    883 			RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
    884 			    "Checking for acknowledgements", ep);
    885 			while (bp != ackbp) {
    886 				RDS_DPRINTF2("rds_is_sendq_empty",
    887 				    "EP(%p) BP(0x%p/0x%p) last "
    888 				    "sent/acknowledged", ep, bp, ackbp);
    889 				mutex_exit(&spool->pool_lock);
    890 				delay(drv_usectohz(1000000));
    891 				mutex_enter(&spool->pool_lock);
    892 
    893 				bp = spool->pool_tailp;
    894 				ackbp = *(rds_buf_t **)ep->ep_ack_addr;
    895 				if (ackbp == prev_ackbp) {
    896 					RDS_DPRINTF2("rds_is_sendq_empty",
    897 					    "There has been no progress,"
    898 					    "give up and proceed");
    899 					break;
    900 				}
    901 				prev_ackbp = ackbp;
    902 			}
    903 		}
    904 	} else if (spool->pool_nbusy != 0) {
    905 			ret1 = B_FALSE;
    906 	}
    907 	mutex_exit(&spool->pool_lock);
    908 
    909 	/* check if all the rdma acks completed */
    910 	mutex_enter(&ep->ep_lock);
    911 	RDS_DPRINTF2("rds_is_sendq_empty", "EP(%p): "
    912 	    "Outstanding RDMA Acks: %d", ep, ep->ep_rdmacnt);
    913 	if (wait) {
    914 		while (ep->ep_rdmacnt != 0) {
    915 			if (rds_no_interrupts) {
    916 				/* wait one second and try again */
    917 				delay(drv_usectohz(1000000));
    918 				rds_poll_send_completions(ep->ep_sendcq, ep,
    919 				    B_FALSE);
    920 			} else {
    921 				/* wait one second and try again */
    922 				mutex_exit(&ep->ep_lock);
    923 				delay(drv_usectohz(1000000));
    924 				mutex_enter(&ep->ep_lock);
    925 			}
    926 		}
    927 	} else if (ep->ep_rdmacnt != 0) {
    928 			ret1 = B_FALSE;
    929 	}
    930 	mutex_exit(&ep->ep_lock);
    931 
    932 	return (ret1);
    933 }
    934 
    935 /* Get buffers from the send pool */
    936 rds_buf_t *
    937 rds_get_send_buf(rds_ep_t *ep, uint_t nbuf)
    938 {
    939 	rds_buf_t	*bp = NULL, *bp1;
    940 	rds_bufpool_t	*spool;
    941 	uint_t		waittime = rds_waittime_ms * 1000;
    942 	uint_t		ix;
    943 	int		ret;
    944 
    945 	RDS_DPRINTF4("rds_get_send_buf", "Enter: EP(%p) Buffers requested: %d",
    946 	    ep, nbuf);
    947 
    948 	spool = &ep->ep_sndpool;
    949 	mutex_enter(&spool->pool_lock);
    950 
    951 	if (rds_no_interrupts) {
    952 		if ((spool->pool_sqpoll_pending == B_FALSE) &&
    953 		    (spool->pool_nbusy >
    954 		    (spool->pool_nbuffers * rds_poll_percent_full)/100)) {
    955 			spool->pool_sqpoll_pending = B_TRUE;
    956 			mutex_exit(&spool->pool_lock);
    957 			rds_poll_send_completions(ep->ep_sendcq, ep, B_FALSE);
    958 			mutex_enter(&spool->pool_lock);
    959 			spool->pool_sqpoll_pending = B_FALSE;
    960 		}
    961 	}
    962 
    963 	if (spool->pool_nfree < nbuf) {
    964 		/* wait for buffers to become available */
    965 		spool->pool_cv_count += nbuf;
    966 		ret = cv_reltimedwait_sig(&spool->pool_cv, &spool->pool_lock,
    967 		    drv_usectohz(waittime), TR_CLOCK_TICK);
    968 		/* ret = cv_wait_sig(&spool->pool_cv, &spool->pool_lock); */
    969 		if (ret == 0) {
    970 			/* signal pending */
    971 			spool->pool_cv_count -= nbuf;
    972 			mutex_exit(&spool->pool_lock);
    973 			return (NULL);
    974 		}
    975 
    976 		spool->pool_cv_count -= nbuf;
    977 	}
    978 
    979 	/* Have the number of buffers needed */
    980 	if (spool->pool_nfree > nbuf) {
    981 		bp = spool->pool_headp;
    982 
    983 		if (ep->ep_type == RDS_EP_TYPE_DATA) {
    984 			rds_buf_t *ackbp;
    985 			ackbp = *(rds_buf_t **)ep->ep_ack_addr;
    986 
    987 			/* check if all the needed buffers are acknowledged */
    988 			bp1 = bp;
    989 			for (ix = 0; ix < nbuf; ix++) {
    990 				if ((bp1 == ackbp) ||
    991 				    (bp1->buf_state != RDS_SNDBUF_FREE)) {
    992 					/*
    993 					 * The buffer is not yet signalled or
    994 					 * is not yet acknowledged
    995 					 */
    996 					RDS_DPRINTF5("rds_get_send_buf",
    997 					    "EP(%p) Buffer (%p) not yet "
    998 					    "acked/completed", ep, bp1);
    999 					mutex_exit(&spool->pool_lock);
   1000 					return (NULL);
   1001 				}
   1002 
   1003 				bp1 = bp1->buf_nextp;
   1004 			}
   1005 		}
   1006 
   1007 		/* mark the buffers as pending */
   1008 		bp1 = bp;
   1009 		for (ix = 1; ix < nbuf; ix++) {
   1010 			ASSERT(bp1->buf_state == RDS_SNDBUF_FREE);
   1011 			bp1->buf_state = RDS_SNDBUF_PENDING;
   1012 			bp1 = bp1->buf_nextp;
   1013 		}
   1014 		ASSERT(bp1->buf_state == RDS_SNDBUF_FREE);
   1015 		bp1->buf_state = RDS_SNDBUF_PENDING;
   1016 
   1017 		spool->pool_headp = bp1->buf_nextp;
   1018 		bp1->buf_nextp = NULL;
   1019 		if (spool->pool_headp == NULL)
   1020 			spool->pool_tailp = NULL;
   1021 		spool->pool_nfree -= nbuf;
   1022 		spool->pool_nbusy += nbuf;
   1023 	}
   1024 	mutex_exit(&spool->pool_lock);
   1025 
   1026 	RDS_DPRINTF4("rds_get_send_buf", "Return: EP(%p) Buffers requested: %d",
   1027 	    ep, nbuf);
   1028 
   1029 	return (bp);
   1030 }
   1031 
   1032 #define	RDS_MIN_BUF_TO_WAKE_THREADS	10
   1033 
   1034 void
   1035 rds_free_send_buf(rds_ep_t *ep, rds_buf_t *headp, rds_buf_t *tailp, uint_t nbuf,
   1036     boolean_t lock)
   1037 {
   1038 	rds_bufpool_t	*spool;
   1039 	rds_buf_t	*tmp;
   1040 
   1041 	RDS_DPRINTF4("rds_free_send_buf", "Enter");
   1042 
   1043 	ASSERT(nbuf != 0);
   1044 
   1045 	if (tailp == NULL) {
   1046 		if (nbuf > 1) {
   1047 			tmp = headp;
   1048 			while (tmp->buf_nextp) {
   1049 				tmp = tmp->buf_nextp;
   1050 			}
   1051 			tailp = tmp;
   1052 		} else {
   1053 			tailp = headp;
   1054 		}
   1055 	}
   1056 
   1057 	spool = &ep->ep_sndpool;
   1058 
   1059 	if (lock == B_FALSE) {
   1060 		/* lock is not held outside */
   1061 		mutex_enter(&spool->pool_lock);
   1062 	}
   1063 
   1064 	if (spool->pool_nfree) {
   1065 		spool->pool_tailp->buf_nextp = headp;
   1066 	} else {
   1067 		spool->pool_headp = headp;
   1068 	}
   1069 	spool->pool_tailp = tailp;
   1070 
   1071 	spool->pool_nfree += nbuf;
   1072 	spool->pool_nbusy -= nbuf;
   1073 
   1074 	if ((spool->pool_cv_count > 0) &&
   1075 	    (spool->pool_nfree > RDS_MIN_BUF_TO_WAKE_THREADS)) {
   1076 		if (spool->pool_nfree >= spool->pool_cv_count)
   1077 			cv_broadcast(&spool->pool_cv);
   1078 		else
   1079 			cv_signal(&spool->pool_cv);
   1080 	}
   1081 
   1082 	if (lock == B_FALSE) {
   1083 		mutex_exit(&spool->pool_lock);
   1084 	}
   1085 
   1086 	RDS_DPRINTF4("rds_free_send_buf", "Return");
   1087 }
   1088 
   1089 void
   1090 rds_free_recv_buf(rds_buf_t *bp, uint_t nbuf)
   1091 {
   1092 	rds_ep_t	*ep;
   1093 	rds_bufpool_t	*rpool;
   1094 	rds_buf_t	*bp1;
   1095 	uint_t		ix;
   1096 
   1097 	RDS_DPRINTF4("rds_free_recv_buf", "Enter");
   1098 
   1099 	ASSERT(nbuf != 0);
   1100 
   1101 	ep = bp->buf_ep;
   1102 	rpool = &ep->ep_rcvpool;
   1103 
   1104 	mutex_enter(&rpool->pool_lock);
   1105 
   1106 	/* Add the buffers to the local pool */
   1107 	if (rpool->pool_tailp == NULL) {
   1108 		ASSERT(rpool->pool_headp == NULL);
   1109 		ASSERT(rpool->pool_nfree == 0);
   1110 		rpool->pool_headp = bp;
   1111 		bp1 = bp;
   1112 		for (ix = 1; ix < nbuf; ix++) {
   1113 			if (bp1->buf_state == RDS_RCVBUF_ONSOCKQ) {
   1114 				rpool->pool_nbusy--;
   1115 			}
   1116 			bp1->buf_state = RDS_RCVBUF_FREE;
   1117 			bp1 = bp1->buf_nextp;
   1118 		}
   1119 		bp1->buf_nextp = NULL;
   1120 		if (bp->buf_state == RDS_RCVBUF_ONSOCKQ) {
   1121 			rpool->pool_nbusy--;
   1122 		}
   1123 		bp->buf_state = RDS_RCVBUF_FREE;
   1124 		rpool->pool_tailp = bp1;
   1125 		rpool->pool_nfree += nbuf;
   1126 	} else {
   1127 		bp1 = bp;
   1128 		for (ix = 1; ix < nbuf; ix++) {
   1129 			if (bp1->buf_state == RDS_RCVBUF_ONSOCKQ) {
   1130 				rpool->pool_nbusy--;
   1131 			}
   1132 			bp1->buf_state = RDS_RCVBUF_FREE;
   1133 			bp1 = bp1->buf_nextp;
   1134 		}
   1135 		bp1->buf_nextp = NULL;
   1136 		if (bp->buf_state == RDS_RCVBUF_ONSOCKQ) {
   1137 			rpool->pool_nbusy--;
   1138 		}
   1139 		bp->buf_state = RDS_RCVBUF_FREE;
   1140 		rpool->pool_tailp->buf_nextp = bp;
   1141 		rpool->pool_tailp = bp1;
   1142 		rpool->pool_nfree += nbuf;
   1143 	}
   1144 
   1145 	if (rpool->pool_nfree >= rds_nbuffers_to_putback) {
   1146 		bp = rpool->pool_headp;
   1147 		nbuf = rpool->pool_nfree;
   1148 		rpool->pool_headp = NULL;
   1149 		rpool->pool_tailp = NULL;
   1150 		rpool->pool_nfree = 0;
   1151 		mutex_exit(&rpool->pool_lock);
   1152 
   1153 		/* Free the buffers to the global pool */
   1154 		if (ep->ep_type == RDS_EP_TYPE_DATA) {
   1155 			rds_free_buf(&rds_dpool, bp, nbuf);
   1156 		} else {
   1157 			rds_free_buf(&rds_cpool, bp, nbuf);
   1158 		}
   1159 
   1160 		return;
   1161 	}
   1162 	mutex_exit(&rpool->pool_lock);
   1163 
   1164 	RDS_DPRINTF4("rds_free_recv_buf", "Return");
   1165 }
   1166