1 Index: libedataserver/e-msgport.c 2 =================================================================== 3 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v 4 retrieving revision 1.10 5 diff -u -p -r1.10 e-msgport.c 6 --- libedataserver/e-msgport.c 26 Jul 2006 18:12:08 -0000 1.10 7 +++ libedataserver/e-msgport.c 21 Sep 2006 16:49:41 -0000 8 @@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc) 9 } 10 11 struct _EMsgPort { 12 - EDList queue; 13 - int condwait; /* how many waiting in condwait */ 14 - union { 15 - int pipe[2]; /* On Win32 actually a pair of SOCKETs */ 16 - struct { 17 - int read; 18 - int write; 19 - } fd; 20 - } pipe; 21 + GAsyncQueue *queue; 22 + EMsg *cache; 23 + gint pipe[2]; /* on Win32, actually a pair of SOCKETs */ 24 #ifdef HAVE_NSS 25 - union { 26 - PRFileDesc *pipe[2]; 27 - struct { 28 - PRFileDesc *read; 29 - PRFileDesc *write; 30 - } fd; 31 - } prpipe; 32 -#endif 33 - /* @#@$#$ glib stuff */ 34 - GCond *cond; 35 - GMutex *lock; 36 + PRFileDesc *prpipe[2]; 37 +#endif 38 }; 39 40 +/* message flags */ 41 +enum { 42 + MSG_FLAG_SYNC_WITH_PIPE = 1 << 0, 43 + MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1 44 +}; 45 46 #ifdef HAVE_NSS 47 static int 48 @@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds) 49 } 50 #endif 51 52 -EMsgPort *e_msgport_new(void) 53 +static void 54 +msgport_sync_with_pipe (gint fd) 55 +{ 56 + gchar buffer[1]; 57 + 58 + while (fd >= 0) { 59 + if (E_READ (fd, buffer, 1) > 0) 60 + break; 61 + else if (!E_IS_STATUS_INTR ()) { 62 + g_warning ("%s: Failed to read from pipe: %s", 63 + G_STRFUNC, g_strerror (errno)); 64 + break; 65 + } 66 + } 67 +} 68 + 69 +#ifdef HAVE_NSS 70 +static void 71 +msgport_sync_with_prpipe (PRFileDesc *prfd) 72 +{ 73 + gchar buffer[1]; 74 + 75 + while (prfd != NULL) { 76 + if (PR_Read (prfd, buffer, 1) > 0) 77 + break; 78 + else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { 79 + gchar *text = g_alloca (PR_GetErrorTextLength ()); 80 + PR_GetErrorText (text); 81 + g_warning ("%s: Failed to read from NSPR pipe: %s", 82 + G_STRFUNC, text); 83 + break; 84 + } 85 + } 86 +} 87 +#endif 88 + 89 +EMsgPort * 90 +e_msgport_new (void) 91 { 92 - EMsgPort *mp; 93 + EMsgPort *msgport; 94 95 - mp = g_malloc(sizeof(*mp)); 96 - e_dlist_init(&mp->queue); 97 - mp->lock = g_mutex_new(); 98 - mp->cond = g_cond_new(); 99 - e_pipe (mp->pipe.pipe); 100 + msgport = g_slice_new (EMsgPort); 101 + msgport->queue = g_async_queue_new (); 102 + msgport->cache = NULL; 103 + msgport->pipe[0] = -1; 104 + msgport->pipe[1] = -1; 105 #ifdef HAVE_NSS 106 - e_prpipe (mp->prpipe.pipe); 107 + msgport->prpipe[0] = NULL; 108 + msgport->prpipe[1] = NULL; 109 #endif 110 - mp->condwait = 0; 111 112 - return mp; 113 + return msgport; 114 } 115 116 -void e_msgport_destroy(EMsgPort *mp) 117 +void 118 +e_msgport_destroy (EMsgPort *msgport) 119 { 120 - g_mutex_free(mp->lock); 121 - g_cond_free(mp->cond); 122 - if (mp->pipe.fd.read != -1) { 123 - E_CLOSE(mp->pipe.fd.read); 124 - E_CLOSE(mp->pipe.fd.write); 125 + g_return_if_fail (msgport != NULL); 126 + 127 + if (msgport->pipe[0] >= 0) { 128 + E_CLOSE (msgport->pipe[0]); 129 + E_CLOSE (msgport->pipe[1]); 130 } 131 #ifdef HAVE_NSS 132 - if (mp->prpipe.fd.read) { 133 - PR_Close(mp->prpipe.fd.read); 134 - PR_Close(mp->prpipe.fd.write); 135 + if (msgport->prpipe[0] != NULL) { 136 + PR_Close (msgport->prpipe[0]); 137 + PR_Close (msgport->prpipe[1]); 138 } 139 #endif 140 - g_free(mp); 141 + 142 + g_async_queue_unref (msgport->queue); 143 + g_slice_free (EMsgPort, msgport); 144 } 145 146 -/* get a fd that can be used to wait on the port asynchronously */ 147 -int e_msgport_fd(EMsgPort *mp) 148 +int 149 +e_msgport_fd (EMsgPort *msgport) 150 { 151 - return mp->pipe.fd.read; 152 + gint fd; 153 + 154 + g_return_val_if_fail (msgport != NULL, -1); 155 + 156 + g_async_queue_lock (msgport->queue); 157 + fd = msgport->pipe[0]; 158 + if (fd < 0 && e_pipe (msgport->pipe) == 0) 159 + fd = msgport->pipe[0]; 160 + g_async_queue_unlock (msgport->queue); 161 + 162 + return fd; 163 } 164 165 #ifdef HAVE_NSS 166 -PRFileDesc *e_msgport_prfd(EMsgPort *mp) 167 +PRFileDesc * 168 +e_msgport_prfd (EMsgPort *msgport) 169 { 170 - return mp->prpipe.fd.read; 171 + PRFileDesc *prfd; 172 + 173 + g_return_val_if_fail (msgport != NULL, NULL); 174 + 175 + g_async_queue_lock (msgport->queue); 176 + prfd = msgport->prpipe[0]; 177 + if (prfd == NULL && e_prpipe (msgport->prpipe) == 0) 178 + prfd = msgport->prpipe[0]; 179 + g_async_queue_unlock (msgport->queue); 180 + 181 + return prfd; 182 } 183 #endif 184 185 -void e_msgport_put(EMsgPort *mp, EMsg *msg) 186 +void 187 +e_msgport_put (EMsgPort *msgport, EMsg *msg) 188 { 189 + gint fd; 190 #ifdef HAVE_NSS 191 PRFileDesc *prfd; 192 #endif 193 - ssize_t w; 194 - int fd; 195 - 196 - m(printf("put:\n")); 197 - g_mutex_lock(mp->lock); 198 - e_dlist_addtail(&mp->queue, &msg->ln); 199 - if (mp->condwait > 0) { 200 - m(printf("put: condwait > 0, waking up\n")); 201 - g_cond_signal(mp->cond); 202 + 203 + g_return_if_fail (msgport != NULL); 204 + g_return_if_fail (msg != NULL); 205 + 206 + g_async_queue_lock (msgport->queue); 207 + 208 + msg->flags = 0; 209 + 210 + fd = msgport->pipe[1]; 211 + while (fd >= 0) { 212 + if (E_WRITE (fd, "E", 1) > 0) { 213 + msg->flags |= MSG_FLAG_SYNC_WITH_PIPE; 214 + break; 215 + } else if (!E_IS_STATUS_INTR ()) { 216 + g_warning ("%s: Failed to write to pipe: %s", 217 + G_STRFUNC, g_strerror (errno)); 218 + break; 219 + } 220 } 221 - 222 - fd = mp->pipe.fd.write; 223 -#ifdef HAVE_NSS 224 - prfd = mp->prpipe.fd.write; 225 -#endif 226 - g_mutex_unlock(mp->lock); 227 228 #ifdef HAVE_NSS 229 - if (prfd != NULL) { 230 - m(printf("put: have pr pipe, writing notification to it\n")); 231 - do { 232 - w = PR_Write (prfd, "E", 1); 233 - } while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); 234 + prfd = msgport->prpipe[1]; 235 + while (prfd != NULL) { 236 + if (PR_Write (prfd, "E", 1) > 0) { 237 + msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE; 238 + break; 239 + } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { 240 + gchar *text = g_alloca (PR_GetErrorTextLength ()); 241 + PR_GetErrorText (text); 242 + g_warning ("%s: Failed to write to NSPR pipe: %s", 243 + G_STRFUNC, text); 244 + break; 245 + } 246 } 247 #endif 248 - if (fd != -1) { 249 - m(printf("put: have pipe, writing notification to it\n")); 250 - do { 251 - w = E_WRITE (fd, "E", 1); 252 - } while (w == -1 && E_IS_STATUS_INTR ()); 253 - } 254 255 - m(printf("put: done\n")); 256 + g_async_queue_push_unlocked (msgport->queue, msg); 257 + g_async_queue_unlock (msgport->queue); 258 } 259 260 -static void 261 -msgport_cleanlock(void *data) 262 +EMsg * 263 +e_msgport_wait (EMsgPort *msgport) 264 { 265 - EMsgPort *mp = data; 266 + EMsg *msg; 267 268 - g_mutex_unlock(mp->lock); 269 -} 270 + g_return_val_if_fail (msgport != NULL, NULL); 271 272 -EMsg *e_msgport_wait(EMsgPort *mp) 273 -{ 274 - EMsg *msg; 275 + g_async_queue_lock (msgport->queue); 276 277 - m(printf("wait:\n")); 278 - g_mutex_lock(mp->lock); 279 - while (e_dlist_empty(&mp->queue)) { 280 - if (mp->pipe.fd.read != -1) { 281 - fd_set rfds; 282 - int retry; 283 - 284 - m(printf("wait: waiting on pipe\n")); 285 - g_mutex_unlock(mp->lock); 286 - do { 287 - FD_ZERO(&rfds); 288 - FD_SET(mp->pipe.fd.read, &rfds); 289 - retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR(); 290 - pthread_testcancel(); 291 - } while (retry); 292 - g_mutex_lock(mp->lock); 293 - m(printf("wait: got pipe\n")); 294 -#ifdef HAVE_NSS 295 - } else if (mp->prpipe.fd.read != NULL) { 296 - PRPollDesc rfds[1]; 297 - int retry; 298 - 299 - m(printf("wait: waitng on pr pipe\n")); 300 - g_mutex_unlock(mp->lock); 301 - do { 302 - rfds[0].fd = mp->prpipe.fd.read; 303 - rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR; 304 - retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR; 305 - pthread_testcancel(); 306 - } while (retry); 307 - g_mutex_lock(mp->lock); 308 - m(printf("wait: got pr pipe\n")); 309 -#endif /* HAVE_NSS */ 310 - } else { 311 - m(printf("wait: waiting on condition\n")); 312 - mp->condwait++; 313 - /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */ 314 - pthread_cleanup_push(msgport_cleanlock, mp); 315 - g_cond_wait(mp->cond, mp->lock); 316 - pthread_cleanup_pop(0); 317 - m(printf("wait: got condition\n")); 318 - mp->condwait--; 319 - } 320 + /* check the cache first */ 321 + if (msgport->cache != NULL) { 322 + msg = msgport->cache; 323 + /* don't clear the cache */ 324 + g_async_queue_unlock (msgport->queue); 325 + return msg; 326 } 327 - msg = (EMsg *)mp->queue.head; 328 - m(printf("wait: message = %p\n", msg)); 329 - g_mutex_unlock(mp->lock); 330 - m(printf("wait: done\n")); 331 + 332 + msg = g_async_queue_pop_unlocked (msgport->queue); 333 + 334 + g_assert (msg != NULL); 335 + 336 + /* The message is not actually "removed" from the EMsgPort until 337 + * e_msgport_get() is called. So we cache the popped message. */ 338 + msgport->cache = msg; 339 + 340 + if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE) 341 + msgport_sync_with_pipe (msgport->pipe[0]); 342 +#ifdef HAVE_NSS 343 + if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) 344 + msgport_sync_with_prpipe (msgport->prpipe[0]); 345 +#endif 346 + 347 + g_async_queue_unlock (msgport->queue); 348 + 349 return msg; 350 } 351 352 -EMsg *e_msgport_get(EMsgPort *mp) 353 +EMsg * 354 +e_msgport_get (EMsgPort *msgport) 355 { 356 EMsg *msg; 357 - char dummy[1]; 358 - ssize_t n; 359 - 360 - g_mutex_lock(mp->lock); 361 - msg = (EMsg *)e_dlist_remhead(&mp->queue); 362 - if (msg) { 363 - if (mp->pipe.fd.read != -1) { 364 - do { 365 - n = E_READ (mp->pipe.fd.read, dummy, 1); 366 - } while (n == -1 && E_IS_STATUS_INTR ()); 367 - } 368 + 369 + g_return_val_if_fail (msgport != NULL, NULL); 370 + 371 + g_async_queue_lock (msgport->queue); 372 + 373 + /* check the cache first */ 374 + if (msgport->cache != NULL) { 375 + msg = msgport->cache; 376 + msgport->cache = NULL; 377 + g_async_queue_unlock (msgport->queue); 378 + return msg; 379 + } 380 + 381 + msg = g_async_queue_try_pop_unlocked (msgport->queue); 382 + 383 + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE) 384 + msgport_sync_with_pipe (msgport->pipe[0]); 385 #ifdef HAVE_NSS 386 - if (mp->prpipe.fd.read != NULL) { 387 - do { 388 - n = PR_Read (mp->prpipe.fd.read, dummy, 1); 389 - } while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); 390 - } 391 + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) 392 + msgport_sync_with_prpipe (msgport->prpipe[0]); 393 #endif 394 - } 395 - m(printf("get: message = %p\n", msg)); 396 - g_mutex_unlock(mp->lock); 397 + 398 + g_async_queue_unlock (msgport->queue); 399 400 return msg; 401 } 402 403 -void e_msgport_reply(EMsg *msg) 404 +void 405 +e_msgport_reply (EMsg *msg) 406 { 407 - if (msg->reply_port) { 408 - e_msgport_put(msg->reply_port, msg); 409 - } 410 + g_return_if_fail (msg != NULL); 411 + 412 + if (msg->reply_port) 413 + e_msgport_put (msg->reply_port, msg); 414 + 415 /* else lost? */ 416 } 417 418 @@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg) 419 switch(e->type) { 420 case E_THREAD_QUEUE: 421 /* if the queue is full, lose this new addition */ 422 - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { 423 + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { 424 e_msgport_put(e->server_port, msg); 425 } else { 426 printf("queue limit reached, dropping new message\n"); 427 @@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg) 428 break; 429 case E_THREAD_DROP: 430 /* if the queue is full, lose the oldest (unprocessed) message */ 431 - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { 432 + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { 433 e_msgport_put(e->server_port, msg); 434 } else { 435 printf("queue limit reached, dropping old message\n"); 436 Index: libedataserver/e-msgport.h 437 =================================================================== 438 RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v 439 retrieving revision 1.3 440 diff -u -p -r1.3 e-msgport.h 441 --- libedataserver/e-msgport.h 3 Dec 2004 03:33:06 -0000 1.3 442 +++ libedataserver/e-msgport.h 21 Sep 2006 16:49:41 -0000 443 @@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort; 444 typedef struct _EMsg { 445 EDListNode ln; 446 EMsgPort *reply_port; 447 + gint flags; 448 } EMsg; 449 450 EMsgPort *e_msgport_new(void); 451 452