Home | History | Annotate | Download | only in patches
      1 Index: libedataserver/e-msgport.c
      2 ===================================================================
      3 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v
      4 retrieving revision 1.10
      5 diff -u -p -r1.10 e-msgport.c
      6 --- libedataserver/e-msgport.c	26 Jul 2006 18:12:08 -0000	1.10
      7 +++ libedataserver/e-msgport.c	21 Sep 2006 16:49:41 -0000
      8 @@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc)
      9  }
     10  
     11  struct _EMsgPort {
     12 -	EDList queue;
     13 -	int condwait;		/* how many waiting in condwait */
     14 -	union {
     15 -		int pipe[2];	/* On Win32 actually a pair of SOCKETs */
     16 -		struct {
     17 -			int read;
     18 -			int write;
     19 -		} fd;
     20 -	} pipe;
     21 +	GAsyncQueue *queue;
     22 +	EMsg *cache;
     23 +	gint pipe[2];  /* on Win32, actually a pair of SOCKETs */
     24  #ifdef HAVE_NSS
     25 -	union {
     26 -		PRFileDesc *pipe[2];
     27 -		struct {
     28 -			PRFileDesc *read;
     29 -			PRFileDesc *write;
     30 -		} fd;
     31 -	} prpipe;
     32 -#endif
     33 -	/* @#@$#$ glib stuff */
     34 -	GCond *cond;
     35 -	GMutex *lock;
     36 +	PRFileDesc *prpipe[2];
     37 +#endif
     38  };
     39  
     40 +/* message flags */
     41 +enum {
     42 +	MSG_FLAG_SYNC_WITH_PIPE    = 1 << 0,
     43 +	MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
     44 +};
     45  
     46  #ifdef HAVE_NSS
     47  static int
     48 @@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds)
     49  }
     50  #endif
     51  
     52 -EMsgPort *e_msgport_new(void)
     53 +static void
     54 +msgport_sync_with_pipe (gint fd)
     55 +{
     56 +	gchar buffer[1];
     57 +
     58 +	while (fd >= 0) {
     59 +		if (E_READ (fd, buffer, 1) > 0)
     60 +			break;
     61 +		else if (!E_IS_STATUS_INTR ()) {
     62 +			g_warning ("%s: Failed to read from pipe: %s",
     63 +				G_STRFUNC, g_strerror (errno));
     64 +			break;
     65 +		}
     66 +	}
     67 +}
     68 +
     69 +#ifdef HAVE_NSS
     70 +static void
     71 +msgport_sync_with_prpipe (PRFileDesc *prfd)
     72 +{
     73 +	gchar buffer[1];
     74 +
     75 +	while (prfd != NULL) {
     76 +		if (PR_Read (prfd, buffer, 1) > 0)
     77 +			break;
     78 +		else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
     79 +			gchar *text = g_alloca (PR_GetErrorTextLength ());
     80 +			PR_GetErrorText (text);
     81 +			g_warning ("%s: Failed to read from NSPR pipe: %s",
     82 +				G_STRFUNC, text);
     83 +			break;
     84 +		}
     85 +	}
     86 +}
     87 +#endif
     88 +
     89 +EMsgPort *
     90 +e_msgport_new (void)
     91  {
     92 -	EMsgPort *mp;
     93 +	EMsgPort *msgport;
     94  
     95 -	mp = g_malloc(sizeof(*mp));
     96 -	e_dlist_init(&mp->queue);
     97 -	mp->lock = g_mutex_new();
     98 -	mp->cond = g_cond_new();
     99 -	e_pipe (mp->pipe.pipe);
    100 +	msgport = g_slice_new (EMsgPort);
    101 +	msgport->queue = g_async_queue_new ();
    102 +	msgport->cache = NULL;
    103 +	msgport->pipe[0] = -1;
    104 +	msgport->pipe[1] = -1;
    105  #ifdef HAVE_NSS
    106 -	e_prpipe (mp->prpipe.pipe);
    107 +	msgport->prpipe[0] = NULL;
    108 +	msgport->prpipe[1] = NULL;
    109  #endif
    110 -	mp->condwait = 0;
    111  
    112 -	return mp;
    113 +	return msgport;
    114  }
    115  
    116 -void e_msgport_destroy(EMsgPort *mp)
    117 +void
    118 +e_msgport_destroy (EMsgPort *msgport)
    119  {
    120 -	g_mutex_free(mp->lock);
    121 -	g_cond_free(mp->cond);
    122 -	if (mp->pipe.fd.read != -1) {
    123 -		E_CLOSE(mp->pipe.fd.read);
    124 -		E_CLOSE(mp->pipe.fd.write);
    125 +	g_return_if_fail (msgport != NULL);
    126 +
    127 +	if (msgport->pipe[0] >= 0) {
    128 +		E_CLOSE (msgport->pipe[0]);
    129 +		E_CLOSE (msgport->pipe[1]);
    130  	}
    131  #ifdef HAVE_NSS
    132 -	if (mp->prpipe.fd.read) {
    133 -		PR_Close(mp->prpipe.fd.read);
    134 -		PR_Close(mp->prpipe.fd.write);
    135 +	if (msgport->prpipe[0] != NULL) {
    136 +		PR_Close (msgport->prpipe[0]);
    137 +		PR_Close (msgport->prpipe[1]);
    138  	}
    139  #endif
    140 -	g_free(mp);
    141 +
    142 +	g_async_queue_unref (msgport->queue);
    143 +	g_slice_free (EMsgPort, msgport);
    144  }
    145  
    146 -/* get a fd that can be used to wait on the port asynchronously */
    147 -int e_msgport_fd(EMsgPort *mp)
    148 +int
    149 +e_msgport_fd (EMsgPort *msgport)
    150  {
    151 -	return mp->pipe.fd.read;
    152 +	gint fd;
    153 +
    154 +	g_return_val_if_fail (msgport != NULL, -1);
    155 +
    156 +	g_async_queue_lock (msgport->queue);
    157 +	fd = msgport->pipe[0];
    158 +	if (fd < 0 && e_pipe (msgport->pipe) == 0)
    159 +		fd = msgport->pipe[0];
    160 +	g_async_queue_unlock (msgport->queue);
    161 +
    162 +	return fd;
    163  }
    164  
    165  #ifdef HAVE_NSS
    166 -PRFileDesc *e_msgport_prfd(EMsgPort *mp)
    167 +PRFileDesc *
    168 +e_msgport_prfd (EMsgPort *msgport)
    169  {
    170 -	return mp->prpipe.fd.read;
    171 +	PRFileDesc *prfd;
    172 +
    173 +	g_return_val_if_fail (msgport != NULL, NULL);
    174 +
    175 +	g_async_queue_lock (msgport->queue);
    176 +	prfd = msgport->prpipe[0];
    177 +	if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
    178 +		prfd = msgport->prpipe[0];
    179 +	g_async_queue_unlock (msgport->queue);
    180 +
    181 +	return prfd;
    182  }
    183  #endif
    184  
    185 -void e_msgport_put(EMsgPort *mp, EMsg *msg)
    186 +void
    187 +e_msgport_put (EMsgPort *msgport, EMsg *msg)
    188  {
    189 +	gint fd;
    190  #ifdef HAVE_NSS
    191  	PRFileDesc *prfd;
    192  #endif
    193 -	ssize_t w;
    194 -	int fd;
    195 -	
    196 -	m(printf("put:\n"));
    197 -	g_mutex_lock(mp->lock);
    198 -	e_dlist_addtail(&mp->queue, &msg->ln);
    199 -	if (mp->condwait > 0) {
    200 -		m(printf("put: condwait > 0, waking up\n"));
    201 -		g_cond_signal(mp->cond);
    202 +
    203 +	g_return_if_fail (msgport != NULL);
    204 +	g_return_if_fail (msg != NULL);
    205 +
    206 +	g_async_queue_lock (msgport->queue);
    207 +
    208 +	msg->flags = 0;
    209 +
    210 +	fd = msgport->pipe[1];
    211 +	while (fd >= 0) {
    212 +		if (E_WRITE (fd, "E", 1) > 0) {
    213 +			msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
    214 +			break;
    215 +		} else if (!E_IS_STATUS_INTR ()) {
    216 +			g_warning ("%s: Failed to write to pipe: %s",
    217 +				G_STRFUNC, g_strerror (errno));
    218 +			break;
    219 +		}
    220  	}
    221 -	
    222 -	fd = mp->pipe.fd.write;
    223 -#ifdef HAVE_NSS
    224 -	prfd = mp->prpipe.fd.write;
    225 -#endif
    226 -	g_mutex_unlock(mp->lock);
    227  
    228  #ifdef HAVE_NSS
    229 -	if (prfd != NULL) {
    230 -		m(printf("put: have pr pipe, writing notification to it\n"));
    231 -		do {
    232 -			w = PR_Write (prfd, "E", 1);
    233 -		} while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
    234 +	prfd = msgport->prpipe[1];
    235 +	while (prfd != NULL) {
    236 +		if (PR_Write (prfd, "E", 1) > 0) {
    237 +			msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
    238 +			break;
    239 +		} else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
    240 +			gchar *text = g_alloca (PR_GetErrorTextLength ());
    241 +			PR_GetErrorText (text);
    242 +			g_warning ("%s: Failed to write to NSPR pipe: %s",
    243 +				G_STRFUNC, text);
    244 +			break;
    245 +		}
    246  	}
    247  #endif
    248 -	if (fd != -1) {
    249 -		m(printf("put: have pipe, writing notification to it\n"));
    250 -		do {
    251 -			w = E_WRITE (fd, "E", 1);
    252 -		} while (w == -1 && E_IS_STATUS_INTR ());
    253 -	}
    254  
    255 -	m(printf("put: done\n"));
    256 +	g_async_queue_push_unlocked (msgport->queue, msg);
    257 +	g_async_queue_unlock (msgport->queue);
    258  }
    259  
    260 -static void
    261 -msgport_cleanlock(void *data)
    262 +EMsg *
    263 +e_msgport_wait (EMsgPort *msgport)
    264  {
    265 -	EMsgPort *mp = data;
    266 +	EMsg *msg;
    267  
    268 -	g_mutex_unlock(mp->lock);
    269 -}
    270 +	g_return_val_if_fail (msgport != NULL, NULL);
    271  
    272 -EMsg *e_msgport_wait(EMsgPort *mp)
    273 -{
    274 -	EMsg *msg;
    275 +	g_async_queue_lock (msgport->queue);
    276  
    277 -	m(printf("wait:\n"));
    278 -	g_mutex_lock(mp->lock);
    279 -	while (e_dlist_empty(&mp->queue)) {
    280 -		if (mp->pipe.fd.read != -1) {
    281 -			fd_set rfds;
    282 -			int retry;
    283 -
    284 -			m(printf("wait: waiting on pipe\n"));
    285 -			g_mutex_unlock(mp->lock);
    286 -			do {
    287 -				FD_ZERO(&rfds);
    288 -				FD_SET(mp->pipe.fd.read, &rfds);
    289 -				retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR();
    290 -				pthread_testcancel();
    291 -			} while (retry);
    292 -			g_mutex_lock(mp->lock);
    293 -			m(printf("wait: got pipe\n"));
    294 -#ifdef HAVE_NSS
    295 -		} else if (mp->prpipe.fd.read != NULL) {
    296 -			PRPollDesc rfds[1];
    297 -			int retry;
    298 -
    299 -			m(printf("wait: waitng on pr pipe\n"));
    300 -			g_mutex_unlock(mp->lock);
    301 -			do {
    302 -				rfds[0].fd = mp->prpipe.fd.read;
    303 -				rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR;
    304 -				retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR;
    305 -				pthread_testcancel();
    306 -			} while (retry);
    307 -			g_mutex_lock(mp->lock);
    308 -			m(printf("wait: got pr pipe\n"));
    309 -#endif /* HAVE_NSS */
    310 -		} else {
    311 -			m(printf("wait: waiting on condition\n"));
    312 -			mp->condwait++;
    313 -			/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
    314 -			pthread_cleanup_push(msgport_cleanlock, mp);
    315 -			g_cond_wait(mp->cond, mp->lock);
    316 -			pthread_cleanup_pop(0);
    317 -			m(printf("wait: got condition\n"));
    318 -			mp->condwait--;
    319 -		}
    320 +	/* check the cache first */
    321 +	if (msgport->cache != NULL) {
    322 +		msg = msgport->cache;
    323 +		/* don't clear the cache */
    324 +		g_async_queue_unlock (msgport->queue);
    325 +		return msg;
    326  	}
    327 -	msg = (EMsg *)mp->queue.head;
    328 -	m(printf("wait: message = %p\n", msg));
    329 -	g_mutex_unlock(mp->lock);
    330 -	m(printf("wait: done\n"));
    331 +
    332 +	msg = g_async_queue_pop_unlocked (msgport->queue);
    333 +
    334 +	g_assert (msg != NULL);
    335 +
    336 +	/* The message is not actually "removed" from the EMsgPort until
    337 + 	 * e_msgport_get() is called.  So we cache the popped message. */
    338 +	msgport->cache = msg;
    339 +
    340 +	if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
    341 +		msgport_sync_with_pipe (msgport->pipe[0]);
    342 +#ifdef HAVE_NSS
    343 +	if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
    344 +		msgport_sync_with_prpipe (msgport->prpipe[0]);
    345 +#endif
    346 +
    347 +	g_async_queue_unlock (msgport->queue);
    348 +
    349  	return msg;
    350  }
    351  
    352 -EMsg *e_msgport_get(EMsgPort *mp)
    353 +EMsg *
    354 +e_msgport_get (EMsgPort *msgport)
    355  {
    356  	EMsg *msg;
    357 -	char dummy[1];
    358 -	ssize_t n;
    359 -	
    360 -	g_mutex_lock(mp->lock);
    361 -	msg = (EMsg *)e_dlist_remhead(&mp->queue);
    362 -	if (msg) {
    363 -		if (mp->pipe.fd.read != -1) {
    364 -			do {
    365 -				n = E_READ (mp->pipe.fd.read, dummy, 1);
    366 -			} while (n == -1 && E_IS_STATUS_INTR ());
    367 -		}
    368 +
    369 +	g_return_val_if_fail (msgport != NULL, NULL);
    370 +
    371 +	g_async_queue_lock (msgport->queue);
    372 +
    373 +	/* check the cache first */
    374 +	if (msgport->cache != NULL) {
    375 +		msg = msgport->cache;
    376 +		msgport->cache = NULL;
    377 +		g_async_queue_unlock (msgport->queue);
    378 +		return msg;
    379 +	}
    380 +
    381 +	msg = g_async_queue_try_pop_unlocked (msgport->queue);
    382 +
    383 +	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
    384 +		msgport_sync_with_pipe (msgport->pipe[0]);
    385  #ifdef HAVE_NSS
    386 -		if (mp->prpipe.fd.read != NULL) {
    387 -			do {
    388 -				n = PR_Read (mp->prpipe.fd.read, dummy, 1);
    389 -			} while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
    390 -		}
    391 +	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
    392 +		msgport_sync_with_prpipe (msgport->prpipe[0]);
    393  #endif
    394 -	}
    395 -	m(printf("get: message = %p\n", msg));
    396 -	g_mutex_unlock(mp->lock);
    397 +
    398 +	g_async_queue_unlock (msgport->queue);
    399  
    400  	return msg;
    401  }
    402  
    403 -void e_msgport_reply(EMsg *msg)
    404 +void
    405 +e_msgport_reply (EMsg *msg)
    406  {
    407 -	if (msg->reply_port) {
    408 -		e_msgport_put(msg->reply_port, msg);
    409 -	}
    410 +	g_return_if_fail (msg != NULL);
    411 +
    412 +	if (msg->reply_port)
    413 +		e_msgport_put (msg->reply_port, msg);
    414 +
    415  	/* else lost? */
    416  }
    417  
    418 @@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg)
    419  	switch(e->type) {
    420  	case E_THREAD_QUEUE:
    421  		/* if the queue is full, lose this new addition */
    422 -		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
    423 +		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
    424  			e_msgport_put(e->server_port, msg);
    425  		} else {
    426  			printf("queue limit reached, dropping new message\n");
    427 @@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg)
    428  		break;
    429  	case E_THREAD_DROP:
    430  		/* if the queue is full, lose the oldest (unprocessed) message */
    431 -		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
    432 +		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
    433  			e_msgport_put(e->server_port, msg);
    434  		} else {
    435  			printf("queue limit reached, dropping old message\n");
    436 Index: libedataserver/e-msgport.h
    437 ===================================================================
    438 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v
    439 retrieving revision 1.3
    440 diff -u -p -r1.3 e-msgport.h
    441 --- libedataserver/e-msgport.h	3 Dec 2004 03:33:06 -0000	1.3
    442 +++ libedataserver/e-msgport.h	21 Sep 2006 16:49:41 -0000
    443 @@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort;
    444  typedef struct _EMsg {
    445  	EDListNode ln;
    446  	EMsgPort *reply_port;
    447 +	gint flags;
    448  } EMsg;
    449  
    450  EMsgPort *e_msgport_new(void);
    451 
    452