1 9549 sh162551 Index: libedataserver/e-msgport.c 2 9549 sh162551 =================================================================== 3 9549 sh162551 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v 4 9549 sh162551 retrieving revision 1.10 5 9549 sh162551 diff -u -p -r1.10 e-msgport.c 6 9549 sh162551 --- libedataserver/e-msgport.c 26 Jul 2006 18:12:08 -0000 1.10 7 9549 sh162551 +++ libedataserver/e-msgport.c 21 Sep 2006 16:49:41 -0000 8 9549 sh162551 @@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc) 9 9549 sh162551 } 10 9549 sh162551 11 9549 sh162551 struct _EMsgPort { 12 9549 sh162551 - EDList queue; 13 9549 sh162551 - int condwait; /* how many waiting in condwait */ 14 9549 sh162551 - union { 15 9549 sh162551 - int pipe[2]; /* On Win32 actually a pair of SOCKETs */ 16 9549 sh162551 - struct { 17 9549 sh162551 - int read; 18 9549 sh162551 - int write; 19 9549 sh162551 - } fd; 20 9549 sh162551 - } pipe; 21 9549 sh162551 + GAsyncQueue *queue; 22 9549 sh162551 + EMsg *cache; 23 9549 sh162551 + gint pipe[2]; /* on Win32, actually a pair of SOCKETs */ 24 9549 sh162551 #ifdef HAVE_NSS 25 9549 sh162551 - union { 26 9549 sh162551 - PRFileDesc *pipe[2]; 27 9549 sh162551 - struct { 28 9549 sh162551 - PRFileDesc *read; 29 9549 sh162551 - PRFileDesc *write; 30 9549 sh162551 - } fd; 31 9549 sh162551 - } prpipe; 32 9549 sh162551 -#endif 33 9549 sh162551 - /* @#@$#$ glib stuff */ 34 9549 sh162551 - GCond *cond; 35 9549 sh162551 - GMutex *lock; 36 9549 sh162551 + PRFileDesc *prpipe[2]; 37 9549 sh162551 +#endif 38 9549 sh162551 }; 39 9549 sh162551 40 9549 sh162551 +/* message flags */ 41 9549 sh162551 +enum { 42 9549 sh162551 + MSG_FLAG_SYNC_WITH_PIPE = 1 << 0, 43 9549 sh162551 + MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1 44 9549 sh162551 +}; 45 9549 sh162551 46 9549 sh162551 #ifdef HAVE_NSS 47 9549 sh162551 static int 48 9549 sh162551 @@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds) 49 9549 sh162551 } 50 9549 sh162551 #endif 51 9549 sh162551 52 9549 sh162551 -EMsgPort *e_msgport_new(void) 53 9549 sh162551 +static void 54 9549 sh162551 +msgport_sync_with_pipe (gint fd) 55 9549 sh162551 +{ 56 9549 sh162551 + gchar buffer[1]; 57 9549 sh162551 + 58 9549 sh162551 + while (fd >= 0) { 59 9549 sh162551 + if (E_READ (fd, buffer, 1) > 0) 60 9549 sh162551 + break; 61 9549 sh162551 + else if (!E_IS_STATUS_INTR ()) { 62 9549 sh162551 + g_warning ("%s: Failed to read from pipe: %s", 63 9549 sh162551 + G_STRFUNC, g_strerror (errno)); 64 9549 sh162551 + break; 65 9549 sh162551 + } 66 9549 sh162551 + } 67 9549 sh162551 +} 68 9549 sh162551 + 69 9549 sh162551 +#ifdef HAVE_NSS 70 9549 sh162551 +static void 71 9549 sh162551 +msgport_sync_with_prpipe (PRFileDesc *prfd) 72 9549 sh162551 +{ 73 9549 sh162551 + gchar buffer[1]; 74 9549 sh162551 + 75 9549 sh162551 + while (prfd != NULL) { 76 9549 sh162551 + if (PR_Read (prfd, buffer, 1) > 0) 77 9549 sh162551 + break; 78 9549 sh162551 + else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { 79 9549 sh162551 + gchar *text = g_alloca (PR_GetErrorTextLength ()); 80 9549 sh162551 + PR_GetErrorText (text); 81 9549 sh162551 + g_warning ("%s: Failed to read from NSPR pipe: %s", 82 9549 sh162551 + G_STRFUNC, text); 83 9549 sh162551 + break; 84 9549 sh162551 + } 85 9549 sh162551 + } 86 9549 sh162551 +} 87 9549 sh162551 +#endif 88 9549 sh162551 + 89 9549 sh162551 +EMsgPort * 90 9549 sh162551 +e_msgport_new (void) 91 9549 sh162551 { 92 9549 sh162551 - EMsgPort *mp; 93 9549 sh162551 + EMsgPort *msgport; 94 9549 sh162551 95 9549 sh162551 - mp = g_malloc(sizeof(*mp)); 96 9549 sh162551 - e_dlist_init(&mp->queue); 97 9549 sh162551 - mp->lock = g_mutex_new(); 98 9549 sh162551 - mp->cond = g_cond_new(); 99 9549 sh162551 - e_pipe (mp->pipe.pipe); 100 9549 sh162551 + msgport = g_slice_new (EMsgPort); 101 9549 sh162551 + msgport->queue = g_async_queue_new (); 102 9549 sh162551 + msgport->cache = NULL; 103 9549 sh162551 + msgport->pipe[0] = -1; 104 9549 sh162551 + msgport->pipe[1] = -1; 105 9549 sh162551 #ifdef HAVE_NSS 106 9549 sh162551 - e_prpipe (mp->prpipe.pipe); 107 9549 sh162551 + msgport->prpipe[0] = NULL; 108 9549 sh162551 + msgport->prpipe[1] = NULL; 109 9549 sh162551 #endif 110 9549 sh162551 - mp->condwait = 0; 111 9549 sh162551 112 9549 sh162551 - return mp; 113 9549 sh162551 + return msgport; 114 9549 sh162551 } 115 9549 sh162551 116 9549 sh162551 -void e_msgport_destroy(EMsgPort *mp) 117 9549 sh162551 +void 118 9549 sh162551 +e_msgport_destroy (EMsgPort *msgport) 119 9549 sh162551 { 120 9549 sh162551 - g_mutex_free(mp->lock); 121 9549 sh162551 - g_cond_free(mp->cond); 122 9549 sh162551 - if (mp->pipe.fd.read != -1) { 123 9549 sh162551 - E_CLOSE(mp->pipe.fd.read); 124 9549 sh162551 - E_CLOSE(mp->pipe.fd.write); 125 9549 sh162551 + g_return_if_fail (msgport != NULL); 126 9549 sh162551 + 127 9549 sh162551 + if (msgport->pipe[0] >= 0) { 128 9549 sh162551 + E_CLOSE (msgport->pipe[0]); 129 9549 sh162551 + E_CLOSE (msgport->pipe[1]); 130 9549 sh162551 } 131 9549 sh162551 #ifdef HAVE_NSS 132 9549 sh162551 - if (mp->prpipe.fd.read) { 133 9549 sh162551 - PR_Close(mp->prpipe.fd.read); 134 9549 sh162551 - PR_Close(mp->prpipe.fd.write); 135 9549 sh162551 + if (msgport->prpipe[0] != NULL) { 136 9549 sh162551 + PR_Close (msgport->prpipe[0]); 137 9549 sh162551 + PR_Close (msgport->prpipe[1]); 138 9549 sh162551 } 139 9549 sh162551 #endif 140 9549 sh162551 - g_free(mp); 141 9549 sh162551 + 142 9549 sh162551 + g_async_queue_unref (msgport->queue); 143 9549 sh162551 + g_slice_free (EMsgPort, msgport); 144 9549 sh162551 } 145 9549 sh162551 146 9549 sh162551 -/* get a fd that can be used to wait on the port asynchronously */ 147 9549 sh162551 -int e_msgport_fd(EMsgPort *mp) 148 9549 sh162551 +int 149 9549 sh162551 +e_msgport_fd (EMsgPort *msgport) 150 9549 sh162551 { 151 9549 sh162551 - return mp->pipe.fd.read; 152 9549 sh162551 + gint fd; 153 9549 sh162551 + 154 9549 sh162551 + g_return_val_if_fail (msgport != NULL, -1); 155 9549 sh162551 + 156 9549 sh162551 + g_async_queue_lock (msgport->queue); 157 9549 sh162551 + fd = msgport->pipe[0]; 158 9549 sh162551 + if (fd < 0 && e_pipe (msgport->pipe) == 0) 159 9549 sh162551 + fd = msgport->pipe[0]; 160 9549 sh162551 + g_async_queue_unlock (msgport->queue); 161 9549 sh162551 + 162 9549 sh162551 + return fd; 163 9549 sh162551 } 164 9549 sh162551 165 9549 sh162551 #ifdef HAVE_NSS 166 9549 sh162551 -PRFileDesc *e_msgport_prfd(EMsgPort *mp) 167 9549 sh162551 +PRFileDesc * 168 9549 sh162551 +e_msgport_prfd (EMsgPort *msgport) 169 9549 sh162551 { 170 9549 sh162551 - return mp->prpipe.fd.read; 171 9549 sh162551 + PRFileDesc *prfd; 172 9549 sh162551 + 173 9549 sh162551 + g_return_val_if_fail (msgport != NULL, NULL); 174 9549 sh162551 + 175 9549 sh162551 + g_async_queue_lock (msgport->queue); 176 9549 sh162551 + prfd = msgport->prpipe[0]; 177 9549 sh162551 + if (prfd == NULL && e_prpipe (msgport->prpipe) == 0) 178 9549 sh162551 + prfd = msgport->prpipe[0]; 179 9549 sh162551 + g_async_queue_unlock (msgport->queue); 180 9549 sh162551 + 181 9549 sh162551 + return prfd; 182 9549 sh162551 } 183 9549 sh162551 #endif 184 9549 sh162551 185 9549 sh162551 -void e_msgport_put(EMsgPort *mp, EMsg *msg) 186 9549 sh162551 +void 187 9549 sh162551 +e_msgport_put (EMsgPort *msgport, EMsg *msg) 188 9549 sh162551 { 189 9549 sh162551 + gint fd; 190 9549 sh162551 #ifdef HAVE_NSS 191 9549 sh162551 PRFileDesc *prfd; 192 9549 sh162551 #endif 193 9549 sh162551 - ssize_t w; 194 9549 sh162551 - int fd; 195 9549 sh162551 - 196 9549 sh162551 - m(printf("put:\n")); 197 9549 sh162551 - g_mutex_lock(mp->lock); 198 9549 sh162551 - e_dlist_addtail(&mp->queue, &msg->ln); 199 9549 sh162551 - if (mp->condwait > 0) { 200 9549 sh162551 - m(printf("put: condwait > 0, waking up\n")); 201 9549 sh162551 - g_cond_signal(mp->cond); 202 9549 sh162551 + 203 9549 sh162551 + g_return_if_fail (msgport != NULL); 204 9549 sh162551 + g_return_if_fail (msg != NULL); 205 9549 sh162551 + 206 9549 sh162551 + g_async_queue_lock (msgport->queue); 207 9549 sh162551 + 208 9549 sh162551 + msg->flags = 0; 209 9549 sh162551 + 210 9549 sh162551 + fd = msgport->pipe[1]; 211 9549 sh162551 + while (fd >= 0) { 212 9549 sh162551 + if (E_WRITE (fd, "E", 1) > 0) { 213 9549 sh162551 + msg->flags |= MSG_FLAG_SYNC_WITH_PIPE; 214 9549 sh162551 + break; 215 9549 sh162551 + } else if (!E_IS_STATUS_INTR ()) { 216 9549 sh162551 + g_warning ("%s: Failed to write to pipe: %s", 217 9549 sh162551 + G_STRFUNC, g_strerror (errno)); 218 9549 sh162551 + break; 219 9549 sh162551 + } 220 9549 sh162551 } 221 9549 sh162551 - 222 9549 sh162551 - fd = mp->pipe.fd.write; 223 9549 sh162551 -#ifdef HAVE_NSS 224 9549 sh162551 - prfd = mp->prpipe.fd.write; 225 9549 sh162551 -#endif 226 9549 sh162551 - g_mutex_unlock(mp->lock); 227 9549 sh162551 228 9549 sh162551 #ifdef HAVE_NSS 229 9549 sh162551 - if (prfd != NULL) { 230 9549 sh162551 - m(printf("put: have pr pipe, writing notification to it\n")); 231 9549 sh162551 - do { 232 9549 sh162551 - w = PR_Write (prfd, "E", 1); 233 9549 sh162551 - } while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); 234 9549 sh162551 + prfd = msgport->prpipe[1]; 235 9549 sh162551 + while (prfd != NULL) { 236 9549 sh162551 + if (PR_Write (prfd, "E", 1) > 0) { 237 9549 sh162551 + msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE; 238 9549 sh162551 + break; 239 9549 sh162551 + } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { 240 9549 sh162551 + gchar *text = g_alloca (PR_GetErrorTextLength ()); 241 9549 sh162551 + PR_GetErrorText (text); 242 9549 sh162551 + g_warning ("%s: Failed to write to NSPR pipe: %s", 243 9549 sh162551 + G_STRFUNC, text); 244 9549 sh162551 + break; 245 9549 sh162551 + } 246 9549 sh162551 } 247 9549 sh162551 #endif 248 9549 sh162551 - if (fd != -1) { 249 9549 sh162551 - m(printf("put: have pipe, writing notification to it\n")); 250 9549 sh162551 - do { 251 9549 sh162551 - w = E_WRITE (fd, "E", 1); 252 9549 sh162551 - } while (w == -1 && E_IS_STATUS_INTR ()); 253 9549 sh162551 - } 254 9549 sh162551 255 9549 sh162551 - m(printf("put: done\n")); 256 9549 sh162551 + g_async_queue_push_unlocked (msgport->queue, msg); 257 9549 sh162551 + g_async_queue_unlock (msgport->queue); 258 9549 sh162551 } 259 9549 sh162551 260 9549 sh162551 -static void 261 9549 sh162551 -msgport_cleanlock(void *data) 262 9549 sh162551 +EMsg * 263 9549 sh162551 +e_msgport_wait (EMsgPort *msgport) 264 9549 sh162551 { 265 9549 sh162551 - EMsgPort *mp = data; 266 9549 sh162551 + EMsg *msg; 267 9549 sh162551 268 9549 sh162551 - g_mutex_unlock(mp->lock); 269 9549 sh162551 -} 270 9549 sh162551 + g_return_val_if_fail (msgport != NULL, NULL); 271 9549 sh162551 272 9549 sh162551 -EMsg *e_msgport_wait(EMsgPort *mp) 273 9549 sh162551 -{ 274 9549 sh162551 - EMsg *msg; 275 9549 sh162551 + g_async_queue_lock (msgport->queue); 276 9549 sh162551 277 9549 sh162551 - m(printf("wait:\n")); 278 9549 sh162551 - g_mutex_lock(mp->lock); 279 9549 sh162551 - while (e_dlist_empty(&mp->queue)) { 280 9549 sh162551 - if (mp->pipe.fd.read != -1) { 281 9549 sh162551 - fd_set rfds; 282 9549 sh162551 - int retry; 283 9549 sh162551 - 284 9549 sh162551 - m(printf("wait: waiting on pipe\n")); 285 9549 sh162551 - g_mutex_unlock(mp->lock); 286 9549 sh162551 - do { 287 9549 sh162551 - FD_ZERO(&rfds); 288 9549 sh162551 - FD_SET(mp->pipe.fd.read, &rfds); 289 9549 sh162551 - retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR(); 290 9549 sh162551 - pthread_testcancel(); 291 9549 sh162551 - } while (retry); 292 9549 sh162551 - g_mutex_lock(mp->lock); 293 9549 sh162551 - m(printf("wait: got pipe\n")); 294 9549 sh162551 -#ifdef HAVE_NSS 295 9549 sh162551 - } else if (mp->prpipe.fd.read != NULL) { 296 9549 sh162551 - PRPollDesc rfds[1]; 297 9549 sh162551 - int retry; 298 9549 sh162551 - 299 9549 sh162551 - m(printf("wait: waitng on pr pipe\n")); 300 9549 sh162551 - g_mutex_unlock(mp->lock); 301 9549 sh162551 - do { 302 9549 sh162551 - rfds[0].fd = mp->prpipe.fd.read; 303 9549 sh162551 - rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR; 304 9549 sh162551 - retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR; 305 9549 sh162551 - pthread_testcancel(); 306 9549 sh162551 - } while (retry); 307 9549 sh162551 - g_mutex_lock(mp->lock); 308 9549 sh162551 - m(printf("wait: got pr pipe\n")); 309 9549 sh162551 -#endif /* HAVE_NSS */ 310 9549 sh162551 - } else { 311 9549 sh162551 - m(printf("wait: waiting on condition\n")); 312 9549 sh162551 - mp->condwait++; 313 9549 sh162551 - /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */ 314 9549 sh162551 - pthread_cleanup_push(msgport_cleanlock, mp); 315 9549 sh162551 - g_cond_wait(mp->cond, mp->lock); 316 9549 sh162551 - pthread_cleanup_pop(0); 317 9549 sh162551 - m(printf("wait: got condition\n")); 318 9549 sh162551 - mp->condwait--; 319 9549 sh162551 - } 320 9549 sh162551 + /* check the cache first */ 321 9549 sh162551 + if (msgport->cache != NULL) { 322 9549 sh162551 + msg = msgport->cache; 323 9549 sh162551 + /* don't clear the cache */ 324 9549 sh162551 + g_async_queue_unlock (msgport->queue); 325 9549 sh162551 + return msg; 326 9549 sh162551 } 327 9549 sh162551 - msg = (EMsg *)mp->queue.head; 328 9549 sh162551 - m(printf("wait: message = %p\n", msg)); 329 9549 sh162551 - g_mutex_unlock(mp->lock); 330 9549 sh162551 - m(printf("wait: done\n")); 331 9549 sh162551 + 332 9549 sh162551 + msg = g_async_queue_pop_unlocked (msgport->queue); 333 9549 sh162551 + 334 9549 sh162551 + g_assert (msg != NULL); 335 9549 sh162551 + 336 9549 sh162551 + /* The message is not actually "removed" from the EMsgPort until 337 9549 sh162551 + * e_msgport_get() is called. So we cache the popped message. */ 338 9549 sh162551 + msgport->cache = msg; 339 9549 sh162551 + 340 9549 sh162551 + if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE) 341 9549 sh162551 + msgport_sync_with_pipe (msgport->pipe[0]); 342 9549 sh162551 +#ifdef HAVE_NSS 343 9549 sh162551 + if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) 344 9549 sh162551 + msgport_sync_with_prpipe (msgport->prpipe[0]); 345 9549 sh162551 +#endif 346 9549 sh162551 + 347 9549 sh162551 + g_async_queue_unlock (msgport->queue); 348 9549 sh162551 + 349 9549 sh162551 return msg; 350 9549 sh162551 } 351 9549 sh162551 352 9549 sh162551 -EMsg *e_msgport_get(EMsgPort *mp) 353 9549 sh162551 +EMsg * 354 9549 sh162551 +e_msgport_get (EMsgPort *msgport) 355 9549 sh162551 { 356 9549 sh162551 EMsg *msg; 357 9549 sh162551 - char dummy[1]; 358 9549 sh162551 - ssize_t n; 359 9549 sh162551 - 360 9549 sh162551 - g_mutex_lock(mp->lock); 361 9549 sh162551 - msg = (EMsg *)e_dlist_remhead(&mp->queue); 362 9549 sh162551 - if (msg) { 363 9549 sh162551 - if (mp->pipe.fd.read != -1) { 364 9549 sh162551 - do { 365 9549 sh162551 - n = E_READ (mp->pipe.fd.read, dummy, 1); 366 9549 sh162551 - } while (n == -1 && E_IS_STATUS_INTR ()); 367 9549 sh162551 - } 368 9549 sh162551 + 369 9549 sh162551 + g_return_val_if_fail (msgport != NULL, NULL); 370 9549 sh162551 + 371 9549 sh162551 + g_async_queue_lock (msgport->queue); 372 9549 sh162551 + 373 9549 sh162551 + /* check the cache first */ 374 9549 sh162551 + if (msgport->cache != NULL) { 375 9549 sh162551 + msg = msgport->cache; 376 9549 sh162551 + msgport->cache = NULL; 377 9549 sh162551 + g_async_queue_unlock (msgport->queue); 378 9549 sh162551 + return msg; 379 9549 sh162551 + } 380 9549 sh162551 + 381 9549 sh162551 + msg = g_async_queue_try_pop_unlocked (msgport->queue); 382 9549 sh162551 + 383 9549 sh162551 + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE) 384 9549 sh162551 + msgport_sync_with_pipe (msgport->pipe[0]); 385 9549 sh162551 #ifdef HAVE_NSS 386 9549 sh162551 - if (mp->prpipe.fd.read != NULL) { 387 9549 sh162551 - do { 388 9549 sh162551 - n = PR_Read (mp->prpipe.fd.read, dummy, 1); 389 9549 sh162551 - } while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); 390 9549 sh162551 - } 391 9549 sh162551 + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) 392 9549 sh162551 + msgport_sync_with_prpipe (msgport->prpipe[0]); 393 9549 sh162551 #endif 394 9549 sh162551 - } 395 9549 sh162551 - m(printf("get: message = %p\n", msg)); 396 9549 sh162551 - g_mutex_unlock(mp->lock); 397 9549 sh162551 + 398 9549 sh162551 + g_async_queue_unlock (msgport->queue); 399 9549 sh162551 400 9549 sh162551 return msg; 401 9549 sh162551 } 402 9549 sh162551 403 9549 sh162551 -void e_msgport_reply(EMsg *msg) 404 9549 sh162551 +void 405 9549 sh162551 +e_msgport_reply (EMsg *msg) 406 9549 sh162551 { 407 9549 sh162551 - if (msg->reply_port) { 408 9549 sh162551 - e_msgport_put(msg->reply_port, msg); 409 9549 sh162551 - } 410 9549 sh162551 + g_return_if_fail (msg != NULL); 411 9549 sh162551 + 412 9549 sh162551 + if (msg->reply_port) 413 9549 sh162551 + e_msgport_put (msg->reply_port, msg); 414 9549 sh162551 + 415 9549 sh162551 /* else lost? */ 416 9549 sh162551 } 417 9549 sh162551 418 9549 sh162551 @@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg) 419 9549 sh162551 switch(e->type) { 420 9549 sh162551 case E_THREAD_QUEUE: 421 9549 sh162551 /* if the queue is full, lose this new addition */ 422 9549 sh162551 - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { 423 9549 sh162551 + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { 424 9549 sh162551 e_msgport_put(e->server_port, msg); 425 9549 sh162551 } else { 426 9549 sh162551 printf("queue limit reached, dropping new message\n"); 427 9549 sh162551 @@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg) 428 9549 sh162551 break; 429 9549 sh162551 case E_THREAD_DROP: 430 9549 sh162551 /* if the queue is full, lose the oldest (unprocessed) message */ 431 9549 sh162551 - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { 432 9549 sh162551 + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { 433 9549 sh162551 e_msgport_put(e->server_port, msg); 434 9549 sh162551 } else { 435 9549 sh162551 printf("queue limit reached, dropping old message\n"); 436 9549 sh162551 Index: libedataserver/e-msgport.h 437 9549 sh162551 =================================================================== 438 9549 sh162551 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v 439 9549 sh162551 retrieving revision 1.3 440 9549 sh162551 diff -u -p -r1.3 e-msgport.h 441 9549 sh162551 --- libedataserver/e-msgport.h 3 Dec 2004 03:33:06 -0000 1.3 442 9549 sh162551 +++ libedataserver/e-msgport.h 21 Sep 2006 16:49:41 -0000 443 9549 sh162551 @@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort; 444 9549 sh162551 typedef struct _EMsg { 445 9549 sh162551 EDListNode ln; 446 9549 sh162551 EMsgPort *reply_port; 447 9549 sh162551 + gint flags; 448 9549 sh162551 } EMsg; 449 9549 sh162551 450 9549 sh162551 EMsgPort *e_msgport_new(void); 451 9549 sh162551 452