Home | History | Annotate | Download | only in patches
      1  9549  sh162551 Index: libedataserver/e-msgport.c
      2  9549  sh162551 ===================================================================
      3  9549  sh162551 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v
      4  9549  sh162551 retrieving revision 1.10
      5  9549  sh162551 diff -u -p -r1.10 e-msgport.c
      6  9549  sh162551 --- libedataserver/e-msgport.c	26 Jul 2006 18:12:08 -0000	1.10
      7  9549  sh162551 +++ libedataserver/e-msgport.c	21 Sep 2006 16:49:41 -0000
      8  9549  sh162551 @@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc)
      9  9549  sh162551  }
     10  9549  sh162551  
     11  9549  sh162551  struct _EMsgPort {
     12  9549  sh162551 -	EDList queue;
     13  9549  sh162551 -	int condwait;		/* how many waiting in condwait */
     14  9549  sh162551 -	union {
     15  9549  sh162551 -		int pipe[2];	/* On Win32 actually a pair of SOCKETs */
     16  9549  sh162551 -		struct {
     17  9549  sh162551 -			int read;
     18  9549  sh162551 -			int write;
     19  9549  sh162551 -		} fd;
     20  9549  sh162551 -	} pipe;
     21  9549  sh162551 +	GAsyncQueue *queue;
     22  9549  sh162551 +	EMsg *cache;
     23  9549  sh162551 +	gint pipe[2];  /* on Win32, actually a pair of SOCKETs */
     24  9549  sh162551  #ifdef HAVE_NSS
     25  9549  sh162551 -	union {
     26  9549  sh162551 -		PRFileDesc *pipe[2];
     27  9549  sh162551 -		struct {
     28  9549  sh162551 -			PRFileDesc *read;
     29  9549  sh162551 -			PRFileDesc *write;
     30  9549  sh162551 -		} fd;
     31  9549  sh162551 -	} prpipe;
     32  9549  sh162551 -#endif
     33  9549  sh162551 -	/* @#@$#$ glib stuff */
     34  9549  sh162551 -	GCond *cond;
     35  9549  sh162551 -	GMutex *lock;
     36  9549  sh162551 +	PRFileDesc *prpipe[2];
     37  9549  sh162551 +#endif
     38  9549  sh162551  };
     39  9549  sh162551  
     40  9549  sh162551 +/* message flags */
     41  9549  sh162551 +enum {
     42  9549  sh162551 +	MSG_FLAG_SYNC_WITH_PIPE    = 1 << 0,
     43  9549  sh162551 +	MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
     44  9549  sh162551 +};
     45  9549  sh162551  
     46  9549  sh162551  #ifdef HAVE_NSS
     47  9549  sh162551  static int
     48  9549  sh162551 @@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds)
     49  9549  sh162551  }
     50  9549  sh162551  #endif
     51  9549  sh162551  
     52  9549  sh162551 -EMsgPort *e_msgport_new(void)
     53  9549  sh162551 +static void
     54  9549  sh162551 +msgport_sync_with_pipe (gint fd)
     55  9549  sh162551 +{
     56  9549  sh162551 +	gchar buffer[1];
     57  9549  sh162551 +
     58  9549  sh162551 +	while (fd >= 0) {
     59  9549  sh162551 +		if (E_READ (fd, buffer, 1) > 0)
     60  9549  sh162551 +			break;
     61  9549  sh162551 +		else if (!E_IS_STATUS_INTR ()) {
     62  9549  sh162551 +			g_warning ("%s: Failed to read from pipe: %s",
     63  9549  sh162551 +				G_STRFUNC, g_strerror (errno));
     64  9549  sh162551 +			break;
     65  9549  sh162551 +		}
     66  9549  sh162551 +	}
     67  9549  sh162551 +}
     68  9549  sh162551 +
     69  9549  sh162551 +#ifdef HAVE_NSS
     70  9549  sh162551 +static void
     71  9549  sh162551 +msgport_sync_with_prpipe (PRFileDesc *prfd)
     72  9549  sh162551 +{
     73  9549  sh162551 +	gchar buffer[1];
     74  9549  sh162551 +
     75  9549  sh162551 +	while (prfd != NULL) {
     76  9549  sh162551 +		if (PR_Read (prfd, buffer, 1) > 0)
     77  9549  sh162551 +			break;
     78  9549  sh162551 +		else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
     79  9549  sh162551 +			gchar *text = g_alloca (PR_GetErrorTextLength ());
     80  9549  sh162551 +			PR_GetErrorText (text);
     81  9549  sh162551 +			g_warning ("%s: Failed to read from NSPR pipe: %s",
     82  9549  sh162551 +				G_STRFUNC, text);
     83  9549  sh162551 +			break;
     84  9549  sh162551 +		}
     85  9549  sh162551 +	}
     86  9549  sh162551 +}
     87  9549  sh162551 +#endif
     88  9549  sh162551 +
     89  9549  sh162551 +EMsgPort *
     90  9549  sh162551 +e_msgport_new (void)
     91  9549  sh162551  {
     92  9549  sh162551 -	EMsgPort *mp;
     93  9549  sh162551 +	EMsgPort *msgport;
     94  9549  sh162551  
     95  9549  sh162551 -	mp = g_malloc(sizeof(*mp));
     96  9549  sh162551 -	e_dlist_init(&mp->queue);
     97  9549  sh162551 -	mp->lock = g_mutex_new();
     98  9549  sh162551 -	mp->cond = g_cond_new();
     99  9549  sh162551 -	e_pipe (mp->pipe.pipe);
    100  9549  sh162551 +	msgport = g_slice_new (EMsgPort);
    101  9549  sh162551 +	msgport->queue = g_async_queue_new ();
    102  9549  sh162551 +	msgport->cache = NULL;
    103  9549  sh162551 +	msgport->pipe[0] = -1;
    104  9549  sh162551 +	msgport->pipe[1] = -1;
    105  9549  sh162551  #ifdef HAVE_NSS
    106  9549  sh162551 -	e_prpipe (mp->prpipe.pipe);
    107  9549  sh162551 +	msgport->prpipe[0] = NULL;
    108  9549  sh162551 +	msgport->prpipe[1] = NULL;
    109  9549  sh162551  #endif
    110  9549  sh162551 -	mp->condwait = 0;
    111  9549  sh162551  
    112  9549  sh162551 -	return mp;
    113  9549  sh162551 +	return msgport;
    114  9549  sh162551  }
    115  9549  sh162551  
    116  9549  sh162551 -void e_msgport_destroy(EMsgPort *mp)
    117  9549  sh162551 +void
    118  9549  sh162551 +e_msgport_destroy (EMsgPort *msgport)
    119  9549  sh162551  {
    120  9549  sh162551 -	g_mutex_free(mp->lock);
    121  9549  sh162551 -	g_cond_free(mp->cond);
    122  9549  sh162551 -	if (mp->pipe.fd.read != -1) {
    123  9549  sh162551 -		E_CLOSE(mp->pipe.fd.read);
    124  9549  sh162551 -		E_CLOSE(mp->pipe.fd.write);
    125  9549  sh162551 +	g_return_if_fail (msgport != NULL);
    126  9549  sh162551 +
    127  9549  sh162551 +	if (msgport->pipe[0] >= 0) {
    128  9549  sh162551 +		E_CLOSE (msgport->pipe[0]);
    129  9549  sh162551 +		E_CLOSE (msgport->pipe[1]);
    130  9549  sh162551  	}
    131  9549  sh162551  #ifdef HAVE_NSS
    132  9549  sh162551 -	if (mp->prpipe.fd.read) {
    133  9549  sh162551 -		PR_Close(mp->prpipe.fd.read);
    134  9549  sh162551 -		PR_Close(mp->prpipe.fd.write);
    135  9549  sh162551 +	if (msgport->prpipe[0] != NULL) {
    136  9549  sh162551 +		PR_Close (msgport->prpipe[0]);
    137  9549  sh162551 +		PR_Close (msgport->prpipe[1]);
    138  9549  sh162551  	}
    139  9549  sh162551  #endif
    140  9549  sh162551 -	g_free(mp);
    141  9549  sh162551 +
    142  9549  sh162551 +	g_async_queue_unref (msgport->queue);
    143  9549  sh162551 +	g_slice_free (EMsgPort, msgport);
    144  9549  sh162551  }
    145  9549  sh162551  
    146  9549  sh162551 -/* get a fd that can be used to wait on the port asynchronously */
    147  9549  sh162551 -int e_msgport_fd(EMsgPort *mp)
    148  9549  sh162551 +int
    149  9549  sh162551 +e_msgport_fd (EMsgPort *msgport)
    150  9549  sh162551  {
    151  9549  sh162551 -	return mp->pipe.fd.read;
    152  9549  sh162551 +	gint fd;
    153  9549  sh162551 +
    154  9549  sh162551 +	g_return_val_if_fail (msgport != NULL, -1);
    155  9549  sh162551 +
    156  9549  sh162551 +	g_async_queue_lock (msgport->queue);
    157  9549  sh162551 +	fd = msgport->pipe[0];
    158  9549  sh162551 +	if (fd < 0 && e_pipe (msgport->pipe) == 0)
    159  9549  sh162551 +		fd = msgport->pipe[0];
    160  9549  sh162551 +	g_async_queue_unlock (msgport->queue);
    161  9549  sh162551 +
    162  9549  sh162551 +	return fd;
    163  9549  sh162551  }
    164  9549  sh162551  
    165  9549  sh162551  #ifdef HAVE_NSS
    166  9549  sh162551 -PRFileDesc *e_msgport_prfd(EMsgPort *mp)
    167  9549  sh162551 +PRFileDesc *
    168  9549  sh162551 +e_msgport_prfd (EMsgPort *msgport)
    169  9549  sh162551  {
    170  9549  sh162551 -	return mp->prpipe.fd.read;
    171  9549  sh162551 +	PRFileDesc *prfd;
    172  9549  sh162551 +
    173  9549  sh162551 +	g_return_val_if_fail (msgport != NULL, NULL);
    174  9549  sh162551 +
    175  9549  sh162551 +	g_async_queue_lock (msgport->queue);
    176  9549  sh162551 +	prfd = msgport->prpipe[0];
    177  9549  sh162551 +	if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
    178  9549  sh162551 +		prfd = msgport->prpipe[0];
    179  9549  sh162551 +	g_async_queue_unlock (msgport->queue);
    180  9549  sh162551 +
    181  9549  sh162551 +	return prfd;
    182  9549  sh162551  }
    183  9549  sh162551  #endif
    184  9549  sh162551  
    185  9549  sh162551 -void e_msgport_put(EMsgPort *mp, EMsg *msg)
    186  9549  sh162551 +void
    187  9549  sh162551 +e_msgport_put (EMsgPort *msgport, EMsg *msg)
    188  9549  sh162551  {
    189  9549  sh162551 +	gint fd;
    190  9549  sh162551  #ifdef HAVE_NSS
    191  9549  sh162551  	PRFileDesc *prfd;
    192  9549  sh162551  #endif
    193  9549  sh162551 -	ssize_t w;
    194  9549  sh162551 -	int fd;
    195  9549  sh162551 -	
    196  9549  sh162551 -	m(printf("put:\n"));
    197  9549  sh162551 -	g_mutex_lock(mp->lock);
    198  9549  sh162551 -	e_dlist_addtail(&mp->queue, &msg->ln);
    199  9549  sh162551 -	if (mp->condwait > 0) {
    200  9549  sh162551 -		m(printf("put: condwait > 0, waking up\n"));
    201  9549  sh162551 -		g_cond_signal(mp->cond);
    202  9549  sh162551 +
    203  9549  sh162551 +	g_return_if_fail (msgport != NULL);
    204  9549  sh162551 +	g_return_if_fail (msg != NULL);
    205  9549  sh162551 +
    206  9549  sh162551 +	g_async_queue_lock (msgport->queue);
    207  9549  sh162551 +
    208  9549  sh162551 +	msg->flags = 0;
    209  9549  sh162551 +
    210  9549  sh162551 +	fd = msgport->pipe[1];
    211  9549  sh162551 +	while (fd >= 0) {
    212  9549  sh162551 +		if (E_WRITE (fd, "E", 1) > 0) {
    213  9549  sh162551 +			msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
    214  9549  sh162551 +			break;
    215  9549  sh162551 +		} else if (!E_IS_STATUS_INTR ()) {
    216  9549  sh162551 +			g_warning ("%s: Failed to write to pipe: %s",
    217  9549  sh162551 +				G_STRFUNC, g_strerror (errno));
    218  9549  sh162551 +			break;
    219  9549  sh162551 +		}
    220  9549  sh162551  	}
    221  9549  sh162551 -	
    222  9549  sh162551 -	fd = mp->pipe.fd.write;
    223  9549  sh162551 -#ifdef HAVE_NSS
    224  9549  sh162551 -	prfd = mp->prpipe.fd.write;
    225  9549  sh162551 -#endif
    226  9549  sh162551 -	g_mutex_unlock(mp->lock);
    227  9549  sh162551  
    228  9549  sh162551  #ifdef HAVE_NSS
    229  9549  sh162551 -	if (prfd != NULL) {
    230  9549  sh162551 -		m(printf("put: have pr pipe, writing notification to it\n"));
    231  9549  sh162551 -		do {
    232  9549  sh162551 -			w = PR_Write (prfd, "E", 1);
    233  9549  sh162551 -		} while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
    234  9549  sh162551 +	prfd = msgport->prpipe[1];
    235  9549  sh162551 +	while (prfd != NULL) {
    236  9549  sh162551 +		if (PR_Write (prfd, "E", 1) > 0) {
    237  9549  sh162551 +			msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
    238  9549  sh162551 +			break;
    239  9549  sh162551 +		} else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
    240  9549  sh162551 +			gchar *text = g_alloca (PR_GetErrorTextLength ());
    241  9549  sh162551 +			PR_GetErrorText (text);
    242  9549  sh162551 +			g_warning ("%s: Failed to write to NSPR pipe: %s",
    243  9549  sh162551 +				G_STRFUNC, text);
    244  9549  sh162551 +			break;
    245  9549  sh162551 +		}
    246  9549  sh162551  	}
    247  9549  sh162551  #endif
    248  9549  sh162551 -	if (fd != -1) {
    249  9549  sh162551 -		m(printf("put: have pipe, writing notification to it\n"));
    250  9549  sh162551 -		do {
    251  9549  sh162551 -			w = E_WRITE (fd, "E", 1);
    252  9549  sh162551 -		} while (w == -1 && E_IS_STATUS_INTR ());
    253  9549  sh162551 -	}
    254  9549  sh162551  
    255  9549  sh162551 -	m(printf("put: done\n"));
    256  9549  sh162551 +	g_async_queue_push_unlocked (msgport->queue, msg);
    257  9549  sh162551 +	g_async_queue_unlock (msgport->queue);
    258  9549  sh162551  }
    259  9549  sh162551  
    260  9549  sh162551 -static void
    261  9549  sh162551 -msgport_cleanlock(void *data)
    262  9549  sh162551 +EMsg *
    263  9549  sh162551 +e_msgport_wait (EMsgPort *msgport)
    264  9549  sh162551  {
    265  9549  sh162551 -	EMsgPort *mp = data;
    266  9549  sh162551 +	EMsg *msg;
    267  9549  sh162551  
    268  9549  sh162551 -	g_mutex_unlock(mp->lock);
    269  9549  sh162551 -}
    270  9549  sh162551 +	g_return_val_if_fail (msgport != NULL, NULL);
    271  9549  sh162551  
    272  9549  sh162551 -EMsg *e_msgport_wait(EMsgPort *mp)
    273  9549  sh162551 -{
    274  9549  sh162551 -	EMsg *msg;
    275  9549  sh162551 +	g_async_queue_lock (msgport->queue);
    276  9549  sh162551  
    277  9549  sh162551 -	m(printf("wait:\n"));
    278  9549  sh162551 -	g_mutex_lock(mp->lock);
    279  9549  sh162551 -	while (e_dlist_empty(&mp->queue)) {
    280  9549  sh162551 -		if (mp->pipe.fd.read != -1) {
    281  9549  sh162551 -			fd_set rfds;
    282  9549  sh162551 -			int retry;
    283  9549  sh162551 -
    284  9549  sh162551 -			m(printf("wait: waiting on pipe\n"));
    285  9549  sh162551 -			g_mutex_unlock(mp->lock);
    286  9549  sh162551 -			do {
    287  9549  sh162551 -				FD_ZERO(&rfds);
    288  9549  sh162551 -				FD_SET(mp->pipe.fd.read, &rfds);
    289  9549  sh162551 -				retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR();
    290  9549  sh162551 -				pthread_testcancel();
    291  9549  sh162551 -			} while (retry);
    292  9549  sh162551 -			g_mutex_lock(mp->lock);
    293  9549  sh162551 -			m(printf("wait: got pipe\n"));
    294  9549  sh162551 -#ifdef HAVE_NSS
    295  9549  sh162551 -		} else if (mp->prpipe.fd.read != NULL) {
    296  9549  sh162551 -			PRPollDesc rfds[1];
    297  9549  sh162551 -			int retry;
    298  9549  sh162551 -
    299  9549  sh162551 -			m(printf("wait: waitng on pr pipe\n"));
    300  9549  sh162551 -			g_mutex_unlock(mp->lock);
    301  9549  sh162551 -			do {
    302  9549  sh162551 -				rfds[0].fd = mp->prpipe.fd.read;
    303  9549  sh162551 -				rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR;
    304  9549  sh162551 -				retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR;
    305  9549  sh162551 -				pthread_testcancel();
    306  9549  sh162551 -			} while (retry);
    307  9549  sh162551 -			g_mutex_lock(mp->lock);
    308  9549  sh162551 -			m(printf("wait: got pr pipe\n"));
    309  9549  sh162551 -#endif /* HAVE_NSS */
    310  9549  sh162551 -		} else {
    311  9549  sh162551 -			m(printf("wait: waiting on condition\n"));
    312  9549  sh162551 -			mp->condwait++;
    313  9549  sh162551 -			/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
    314  9549  sh162551 -			pthread_cleanup_push(msgport_cleanlock, mp);
    315  9549  sh162551 -			g_cond_wait(mp->cond, mp->lock);
    316  9549  sh162551 -			pthread_cleanup_pop(0);
    317  9549  sh162551 -			m(printf("wait: got condition\n"));
    318  9549  sh162551 -			mp->condwait--;
    319  9549  sh162551 -		}
    320  9549  sh162551 +	/* check the cache first */
    321  9549  sh162551 +	if (msgport->cache != NULL) {
    322  9549  sh162551 +		msg = msgport->cache;
    323  9549  sh162551 +		/* don't clear the cache */
    324  9549  sh162551 +		g_async_queue_unlock (msgport->queue);
    325  9549  sh162551 +		return msg;
    326  9549  sh162551  	}
    327  9549  sh162551 -	msg = (EMsg *)mp->queue.head;
    328  9549  sh162551 -	m(printf("wait: message = %p\n", msg));
    329  9549  sh162551 -	g_mutex_unlock(mp->lock);
    330  9549  sh162551 -	m(printf("wait: done\n"));
    331  9549  sh162551 +
    332  9549  sh162551 +	msg = g_async_queue_pop_unlocked (msgport->queue);
    333  9549  sh162551 +
    334  9549  sh162551 +	g_assert (msg != NULL);
    335  9549  sh162551 +
    336  9549  sh162551 +	/* The message is not actually "removed" from the EMsgPort until
    337  9549  sh162551 + 	 * e_msgport_get() is called.  So we cache the popped message. */
    338  9549  sh162551 +	msgport->cache = msg;
    339  9549  sh162551 +
    340  9549  sh162551 +	if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
    341  9549  sh162551 +		msgport_sync_with_pipe (msgport->pipe[0]);
    342  9549  sh162551 +#ifdef HAVE_NSS
    343  9549  sh162551 +	if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
    344  9549  sh162551 +		msgport_sync_with_prpipe (msgport->prpipe[0]);
    345  9549  sh162551 +#endif
    346  9549  sh162551 +
    347  9549  sh162551 +	g_async_queue_unlock (msgport->queue);
    348  9549  sh162551 +
    349  9549  sh162551  	return msg;
    350  9549  sh162551  }
    351  9549  sh162551  
    352  9549  sh162551 -EMsg *e_msgport_get(EMsgPort *mp)
    353  9549  sh162551 +EMsg *
    354  9549  sh162551 +e_msgport_get (EMsgPort *msgport)
    355  9549  sh162551  {
    356  9549  sh162551  	EMsg *msg;
    357  9549  sh162551 -	char dummy[1];
    358  9549  sh162551 -	ssize_t n;
    359  9549  sh162551 -	
    360  9549  sh162551 -	g_mutex_lock(mp->lock);
    361  9549  sh162551 -	msg = (EMsg *)e_dlist_remhead(&mp->queue);
    362  9549  sh162551 -	if (msg) {
    363  9549  sh162551 -		if (mp->pipe.fd.read != -1) {
    364  9549  sh162551 -			do {
    365  9549  sh162551 -				n = E_READ (mp->pipe.fd.read, dummy, 1);
    366  9549  sh162551 -			} while (n == -1 && E_IS_STATUS_INTR ());
    367  9549  sh162551 -		}
    368  9549  sh162551 +
    369  9549  sh162551 +	g_return_val_if_fail (msgport != NULL, NULL);
    370  9549  sh162551 +
    371  9549  sh162551 +	g_async_queue_lock (msgport->queue);
    372  9549  sh162551 +
    373  9549  sh162551 +	/* check the cache first */
    374  9549  sh162551 +	if (msgport->cache != NULL) {
    375  9549  sh162551 +		msg = msgport->cache;
    376  9549  sh162551 +		msgport->cache = NULL;
    377  9549  sh162551 +		g_async_queue_unlock (msgport->queue);
    378  9549  sh162551 +		return msg;
    379  9549  sh162551 +	}
    380  9549  sh162551 +
    381  9549  sh162551 +	msg = g_async_queue_try_pop_unlocked (msgport->queue);
    382  9549  sh162551 +
    383  9549  sh162551 +	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
    384  9549  sh162551 +		msgport_sync_with_pipe (msgport->pipe[0]);
    385  9549  sh162551  #ifdef HAVE_NSS
    386  9549  sh162551 -		if (mp->prpipe.fd.read != NULL) {
    387  9549  sh162551 -			do {
    388  9549  sh162551 -				n = PR_Read (mp->prpipe.fd.read, dummy, 1);
    389  9549  sh162551 -			} while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
    390  9549  sh162551 -		}
    391  9549  sh162551 +	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
    392  9549  sh162551 +		msgport_sync_with_prpipe (msgport->prpipe[0]);
    393  9549  sh162551  #endif
    394  9549  sh162551 -	}
    395  9549  sh162551 -	m(printf("get: message = %p\n", msg));
    396  9549  sh162551 -	g_mutex_unlock(mp->lock);
    397  9549  sh162551 +
    398  9549  sh162551 +	g_async_queue_unlock (msgport->queue);
    399  9549  sh162551  
    400  9549  sh162551  	return msg;
    401  9549  sh162551  }
    402  9549  sh162551  
    403  9549  sh162551 -void e_msgport_reply(EMsg *msg)
    404  9549  sh162551 +void
    405  9549  sh162551 +e_msgport_reply (EMsg *msg)
    406  9549  sh162551  {
    407  9549  sh162551 -	if (msg->reply_port) {
    408  9549  sh162551 -		e_msgport_put(msg->reply_port, msg);
    409  9549  sh162551 -	}
    410  9549  sh162551 +	g_return_if_fail (msg != NULL);
    411  9549  sh162551 +
    412  9549  sh162551 +	if (msg->reply_port)
    413  9549  sh162551 +		e_msgport_put (msg->reply_port, msg);
    414  9549  sh162551 +
    415  9549  sh162551  	/* else lost? */
    416  9549  sh162551  }
    417  9549  sh162551  
    418  9549  sh162551 @@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg)
    419  9549  sh162551  	switch(e->type) {
    420  9549  sh162551  	case E_THREAD_QUEUE:
    421  9549  sh162551  		/* if the queue is full, lose this new addition */
    422  9549  sh162551 -		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
    423  9549  sh162551 +		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
    424  9549  sh162551  			e_msgport_put(e->server_port, msg);
    425  9549  sh162551  		} else {
    426  9549  sh162551  			printf("queue limit reached, dropping new message\n");
    427  9549  sh162551 @@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg)
    428  9549  sh162551  		break;
    429  9549  sh162551  	case E_THREAD_DROP:
    430  9549  sh162551  		/* if the queue is full, lose the oldest (unprocessed) message */
    431  9549  sh162551 -		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
    432  9549  sh162551 +		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
    433  9549  sh162551  			e_msgport_put(e->server_port, msg);
    434  9549  sh162551  		} else {
    435  9549  sh162551  			printf("queue limit reached, dropping old message\n");
    436  9549  sh162551 Index: libedataserver/e-msgport.h
    437  9549  sh162551 ===================================================================
    438  9549  sh162551 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v
    439  9549  sh162551 retrieving revision 1.3
    440  9549  sh162551 diff -u -p -r1.3 e-msgport.h
    441  9549  sh162551 --- libedataserver/e-msgport.h	3 Dec 2004 03:33:06 -0000	1.3
    442  9549  sh162551 +++ libedataserver/e-msgport.h	21 Sep 2006 16:49:41 -0000
    443  9549  sh162551 @@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort;
    444  9549  sh162551  typedef struct _EMsg {
    445  9549  sh162551  	EDListNode ln;
    446  9549  sh162551  	EMsgPort *reply_port;
    447  9549  sh162551 +	gint flags;
    448  9549  sh162551  } EMsg;
    449  9549  sh162551  
    450  9549  sh162551  EMsgPort *e_msgport_new(void);
    451  9549  sh162551 
    452