root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendmsg
  4. rpc_recvmsg
  5. rpc_select
  6. rpc_reserve
  7. rpc_release
  8. rpc_cwnd_adjust
  9. rpc_send_check
  10. rpc_send
  11. rpc_transmit
  12. rpc_grok
  13. rpc_recv
  14. rpc_doio
  15. rpc_call
  16. rpc_makesock
  17. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue
  10  *      (rpc_reserve).
  11  *  -   Then, the message is transmitted via rpc_send (exported by name of
  12  *      rpc_transmit).
  13  *  -   Finally, the process waits for the call to complete (rpc_doio):
  14  *      The first process on the receive queue waits for the next RPC packet,
  15  *      and peeks at the XID. If it finds a matching request, it receives
  16  *      the datagram on behalf of that process and wakes it up. Otherwise,
  17  *      the datagram is discarded.
  18  *  -   If the process having received the datagram was the first one on
  19  *      the receive queue, it wakes up the next one to listen for replies.
  20  *  -   It then removes itself from the request queue (rpc_release).
  21  *      If there are more callers waiting on the backlog queue, they are
  22  *      woken up, too.
  23  *
  24  * Mar 1996:
  25  *  -   Split up large functions into smaller chunks as per Linus' coding
  26  *      style. Found an interesting bug this way, too.
  27  *  -   Added entry points for nfsiod.
  28  *
  29  *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
  30  */
  31 
  32 #include <linux/types.h>
  33 #include <linux/malloc.h>
  34 #include <linux/sched.h>
  35 #include <linux/nfs_fs.h>
  36 #include <linux/errno.h>
  37 #include <linux/socket.h>
  38 #include <linux/fcntl.h>
  39 #include <linux/in.h>
  40 #include <linux/net.h>
  41 #include <linux/mm.h>
  42 #include <linux/rpcsock.h>
  43 
  44 #include <linux/udp.h>
  45 #include <net/sock.h>
  46 
  47 #include <asm/segment.h>
  48 
  49 #define msleep(sec)     { current->timeout = sec * HZ / 1000; \
  50                           current->state = TASK_INTERRUPTIBLE; \
  51                           schedule(); \
  52                         }
  53 
  54 #undef DEBUG_RPC
  55 #ifdef DEBUG_RPC                        
  56 #define dprintk(args...)        printk(## args)
  57 #else
  58 #define dprintk(args...)
  59 #endif
  60 
  61 
  62 /*
  63  * Insert new request into wait list. We make sure list is sorted by
  64  * increasing timeout value.
  65  */
  66 static inline void
  67 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  68 {
  69         struct rpc_wait *next = rsock->pending;
  70 
  71         slot->w_next = next;
  72         slot->w_prev = NULL;
  73         if (next)
  74                 next->w_prev = slot;
  75         rsock->pending = slot;
  76         slot->w_queued = 1;
  77 
  78         dprintk("RPC: inserted %p into queue\n", slot);
  79 }
  80 
  81 /*
  82  * Remove request from request queue
  83  */
  84 static inline void
  85 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  86 {
  87         struct rpc_wait *prev = slot->w_prev,
  88                         *next = slot->w_next;
  89 
  90         if (prev != NULL)
  91                 prev->w_next = next;
  92         else
  93                 rsock->pending = next;
  94         if (next != NULL)
  95                 next->w_prev = prev;
  96 
  97         slot->w_queued = 0;
  98         dprintk("RPC: removed %p from queue, head now %p.\n",
  99                         slot, rsock->pending);
 100 }
 101 
 102 /*
 103  * Write data to socket.
 104  */
 105 static inline int
 106 rpc_sendmsg(struct rpc_sock *rsock, struct iovec *iov, int nr, int len,
     /* [previous][next][first][last][top][bottom][index][help] */
 107                                 struct sockaddr *sap, int salen)
 108 {
 109         struct socket   *sock = rsock->sock;
 110         struct msghdr   msg;
 111         unsigned long   oldfs;
 112         int             result;
 113 
 114         msg.msg_iov     = iov;
 115         msg.msg_iovlen  = nr;
 116         msg.msg_name    = sap;
 117         msg.msg_namelen = salen;
 118         msg.msg_accrights = NULL;
 119 
 120         oldfs = get_fs();
 121         set_fs(get_ds());
 122         result = sock->ops->sendmsg(sock, &msg, len, 0, 0);
 123         set_fs(oldfs);
 124 
 125         dprintk("RPC: rpc_sendmsg(iov %p, len %d) = %d\n", iov, len, result);
 126         return result;
 127 }
 128 /*
 129  * Read data from socket
 130  */
 131 static inline int
 132 rpc_recvmsg(struct rpc_sock *rsock, struct iovec *iov,
     /* [previous][next][first][last][top][bottom][index][help] */
 133                         int nr, int len, int flags)
 134 {
 135         struct socket   *sock = rsock->sock;
 136         struct sockaddr sa;
 137         struct msghdr   msg;
 138         unsigned long   oldfs;
 139         int             result, alen;
 140 
 141         msg.msg_iov     = iov;
 142         msg.msg_iovlen  = nr;
 143         msg.msg_name    = &sa;
 144         msg.msg_namelen = sizeof(sa);
 145         msg.msg_accrights = NULL;
 146 
 147         oldfs = get_fs();
 148         set_fs(get_ds());
 149         result = sock->ops->recvmsg(sock, &msg, len, 1, flags, &alen);
 150         set_fs(oldfs);
 151 
 152         dprintk("RPC: rpc_recvmsg(iov %p, len %d) = %d\n", iov, len, result);
 153         return result;
 154 }
 155 
 156 /*
 157  * This code is slightly complicated. Since the networking code does not
 158  * honor the current->timeout value, we have to select on the socket.
 159  */
 160 static inline int
 161 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 162 {
 163         struct select_table_entry entry;
 164         struct file     *file = rsock->file;
 165         select_table    wait_table;
 166 
 167         dprintk("RPC: selecting on socket...\n");
 168         wait_table.nr = 0;
 169         wait_table.entry = &entry;
 170         current->state = TASK_INTERRUPTIBLE;
 171         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 172          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 173                 schedule();
 174                 remove_wait_queue(entry.wait_address, &entry.wait);
 175                 current->state = TASK_RUNNING;
 176                 if (current->signal & ~current->blocked)
 177                         return -ERESTARTSYS;
 178                 if (current->timeout == 0)
 179                         return -ETIMEDOUT;
 180         } else if (wait_table.nr)
 181                 remove_wait_queue(entry.wait_address, &entry.wait);
 182         current->state = TASK_RUNNING;
 183         dprintk("RPC: ...Okay, there appears to be some data.\n");
 184         return 0;
 185 }
 186 
 187 /*
 188  * Reserve an RPC call slot. nocwait determines whether we wait in case
 189  * of congestion or not.
 190  */
 191 int
 192 rpc_reserve(struct rpc_sock *rsock, struct rpc_ioreq *req, int nocwait)
     /* [previous][next][first][last][top][bottom][index][help] */
 193 {
 194         struct rpc_wait *slot;
 195 
 196         req->rq_slot = NULL;
 197 
 198         while (!(slot = rsock->free) || rsock->cong >= rsock->cwnd) {
 199                 if (nocwait) {
 200                         current->timeout = 0;
 201                         return -ENOBUFS;
 202                 }
 203                 dprintk("RPC: rpc_reserve waiting on backlog\n");
 204                 interruptible_sleep_on(&rsock->backlog);
 205                 if (current->timeout == 0)
 206                         return -ETIMEDOUT;
 207                 if (current->signal & ~current->blocked)
 208                         return -ERESTARTSYS;
 209                 if (rsock->shutdown)
 210                         return -EIO;
 211         }
 212 
 213         rsock->free = slot->w_next;
 214         rsock->cong += RPC_CWNDSCALE;   /* bump congestion value */
 215 
 216         slot->w_queued = 0;
 217         slot->w_gotit = 0;
 218         slot->w_req = req;
 219 
 220         dprintk("RPC: reserved slot %p\n", slot);
 221         req->rq_slot = slot;
 222         return 0;
 223 }
 224 
 225 /*
 226  * Release an RPC call slot
 227  */
 228 void
 229 rpc_release(struct rpc_sock *rsock, struct rpc_ioreq *req)
     /* [previous][next][first][last][top][bottom][index][help] */
 230 {
 231         struct rpc_wait *slot = req->rq_slot;
 232 
 233         if (slot != NULL) {
 234                 dprintk("RPC: release slot %p\n", slot);
 235 
 236                 /* Wake up the next receiver */
 237                 if (slot == rsock->pending && slot->w_next != NULL)
 238                         wake_up(&slot->w_next->w_wait);
 239 
 240                 /* remove slot from queue of pending */
 241                 if (slot->w_queued)
 242                         rpc_remque(rsock, slot);
 243                 slot->w_next = rsock->free;
 244                 rsock->free = slot;
 245 
 246                 /* decrease congestion value */
 247                 rsock->cong -= RPC_CWNDSCALE;
 248                 if (rsock->cong < rsock->cwnd && rsock->backlog)
 249                         wake_up(&rsock->backlog);
 250                 if (rsock->shutdown)
 251                         wake_up(&rsock->shutwait);
 252 
 253                 req->rq_slot = NULL;
 254         }
 255 }
 256 
 257 /*
 258  * Adjust RPC congestion window
 259  */
 260 static void
 261 rpc_cwnd_adjust(struct rpc_sock *rsock, int timeout)
     /* [previous][next][first][last][top][bottom][index][help] */
 262 {
 263         unsigned long   cwnd = rsock->cwnd;
 264 
 265         if (!timeout) {
 266                 if (rsock->cong >= cwnd) {
 267                         /* The (cwnd >> 1) term makes sure
 268                          * the result gets rounded properly. */
 269                         cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE +
 270                                         (cwnd >> 1)) / cwnd;
 271                         if (cwnd > RPC_MAXCWND)
 272                                 cwnd = RPC_MAXCWND;
 273                 }
 274         } else {
 275                 if ((cwnd >>= 1) < RPC_CWNDSCALE)
 276                         cwnd = RPC_CWNDSCALE;
 277                 dprintk("RPC: cwnd decrease %08lx\n", cwnd);
 278         }
 279         dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx\n",
 280                         rsock->cong, rsock->cwnd, cwnd);
 281 
 282         rsock->cwnd = cwnd;
 283 }
 284 
 285 static inline void
 286 rpc_send_check(char *where, u32 *ptr)
     /* [previous][next][first][last][top][bottom][index][help] */
 287 {
 288         if (ptr[1] != htonl(RPC_CALL) || ptr[2] != htonl(RPC_VERSION)) {
 289                 printk("RPC: %s sending evil packet:\n"
 290                        "     %08x %08x %08x %08x %08x %08x %08x %08x\n",
 291                        where,
 292                        ptr[0], ptr[1], ptr[2], ptr[3],
 293                        ptr[4], ptr[5], ptr[6], ptr[7]);
 294         }
 295 }
 296 
 297 /*
 298  * Place the actual RPC call.
 299  * We have to copy the iovec because sendmsg fiddles with its contents.
 300  */
 301 static inline int
 302 rpc_send(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
 303 {
 304         struct rpc_ioreq *req = slot->w_req;
 305         struct iovec    iov[MAX_IOVEC];
 306 
 307         if (rsock->shutdown)
 308                 return -EIO;
 309 
 310         memcpy(iov, req->rq_svec, req->rq_snr * sizeof(iov[0]));
 311         slot->w_xid = *(u32 *)(iov[0].iov_base);
 312         if (!slot->w_queued)
 313                 rpc_insque(rsock, slot);
 314 
 315         dprintk("rpc_send(%p, %x)\n", slot, slot->w_xid);
 316         rpc_send_check("rpc_send", (u32 *) req->rq_svec[0].iov_base);
 317         return rpc_sendmsg(rsock, iov, req->rq_snr, req->rq_slen,
 318                                 req->rq_addr, req->rq_alen);
 319 }
 320 
 321 /*
 322  * This is the same as rpc_send but for the functions exported to nfsiod
 323  */
 324 int
 325 rpc_transmit(struct rpc_sock *rsock, struct rpc_ioreq *req)
     /* [previous][next][first][last][top][bottom][index][help] */
 326 {
 327         rpc_send_check("rpc_transmit", (u32 *) req->rq_svec[0].iov_base);
 328         return rpc_send(rsock, req->rq_slot);
 329 }
 330 
 331 /*
 332  * Receive and dispatch a single reply
 333  */
 334 static inline int
 335 rpc_grok(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 336 {
 337         struct rpc_wait *rovr;
 338         struct rpc_ioreq *req;
 339         struct iovec    iov[MAX_IOVEC];
 340         u32             xid;
 341         int             safe, result;
 342 
 343         iov[0].iov_base = (void *) &xid;
 344         iov[0].iov_len  = sizeof(xid);
 345         result = rpc_recvmsg(rsock, iov, 1, sizeof(xid), MSG_PEEK);
 346 
 347         if (result < 0) {
 348                 switch (-result) {
 349                 case EAGAIN: case ECONNREFUSED:
 350                         return 0;
 351                 case ERESTARTSYS:
 352                         return result;
 353                 default:
 354                         dprintk("rpc_grok: recv error = %d\n", result);
 355                 }
 356         }
 357         if (result < 4) {
 358                 printk(KERN_WARNING "RPC: impossible RPC reply size %d\n",
 359                                                 result);
 360                 return 0;
 361         }
 362 
 363         dprintk("RPC: rpc_grok: got xid %08lx\n", (unsigned long) xid);
 364 
 365         /* Look for the caller */
 366         safe = 0;
 367         for (rovr = rsock->pending; rovr; rovr = rovr->w_next) {
 368                 if (rovr->w_xid == xid)
 369                         break;
 370                 if (safe++ > RPC_MAXREQS) {
 371                         printk(KERN_WARNING "RPC: loop in request Q!!\n");
 372                         rovr = NULL;
 373                         break;
 374                 }
 375         }
 376 
 377         if (!rovr || rovr->w_gotit) {
 378                 /* discard dgram */
 379                 dprintk("RPC: rpc_grok: %s.\n",
 380                         rovr? "duplicate reply" : "bad XID");
 381                 iov[0].iov_base = (void *) &xid;
 382                 iov[0].iov_len  = sizeof(xid);
 383                 rpc_recvmsg(rsock, iov, 1, sizeof(xid), 0);
 384                 return 0;
 385         }
 386         req = rovr->w_req;
 387 
 388         /* Now receive the reply... Copy the iovec first because of 
 389          * memcpy_fromiovec fiddling. */
 390         memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
 391         result = rpc_recvmsg(rsock, iov, req->rq_rnr, req->rq_rlen, 0);
 392         rovr->w_result = result;
 393         rovr->w_gotit = 1;
 394 
 395         /* ... and wake up the process */
 396         wake_up(&rovr->w_wait);
 397 
 398         return result;
 399 }
 400 
 401 /*
 402  * Wait for the reply to our call.
 403  */
 404 static int
 405 rpc_recv(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
 406 {
 407         int     result;
 408 
 409         do {
 410                 /* If we are not the receiver, wait on the sidelines */
 411                 dprintk("RPC: rpc_recv TP1\n");
 412                 while (rsock->pending != slot) {
 413                         if (!slot->w_gotit)
 414                                 interruptible_sleep_on(&slot->w_wait);
 415                         if (slot->w_gotit)
 416                                 return slot->w_result; /* quite important */
 417                         if (current->signal & ~current->blocked)
 418                                 return -ERESTARTSYS;
 419                         if (rsock->shutdown)
 420                                 return -EIO;
 421                         if (current->timeout == 0)
 422                                 return -ETIMEDOUT;
 423                 }
 424 
 425                 /* Wait for data to arrive */
 426                 if ((result = rpc_select(rsock)) < 0) {
 427                         dprintk("RPC: select error = %d\n", result);
 428                         return result;
 429                 }
 430 
 431                 /* Receive and dispatch */
 432                 if ((result = rpc_grok(rsock)) < 0)
 433                         return result;
 434         } while (current->timeout && !slot->w_gotit);
 435 
 436         return slot->w_gotit? slot->w_result : -ETIMEDOUT;
 437 }
 438 
 439 /*
 440  * Generic RPC call routine. This handles retries and timeouts etc pp.
 441  *
 442  * If sent is non-null, it assumes the called has already sent out the
 443  * message, so it won't need to do so unless a timeout occurs.
 444  */
 445 int
 446 rpc_doio(struct rpc_sock *rsock, struct rpc_ioreq *req,
     /* [previous][next][first][last][top][bottom][index][help] */
 447                         struct rpc_timeout *strategy, int sent)
 448 {
 449         struct rpc_wait *slot;
 450         int             result, retries;
 451         unsigned long   timeout;
 452 
 453         timeout = strategy->to_initval;
 454         retries = 0;
 455         slot = req->rq_slot;
 456 
 457         do {
 458                 dprintk("RPC: rpc_doio: TP1 (req %p)\n", req);
 459                 current->timeout = jiffies + timeout;
 460                 if (slot == NULL) {
 461                         result = rpc_reserve(rsock, req, 0);
 462                         if (result == -ETIMEDOUT)
 463                                 goto timedout;
 464                         if (result < 0)
 465                                 break;
 466                         slot = req->rq_slot;
 467                         rpc_send_check("rpc_doio",
 468                                 (u32 *) req->rq_svec[0].iov_base);
 469                         rpc_insque(rsock, slot);
 470                 }
 471 
 472                 /* This check is for loopback NFS. Sometimes replies come
 473                  * in before biod has called rpc_doio... */
 474                 if (slot->w_gotit) {
 475                         result = slot->w_result;
 476                         break;
 477                 }
 478 
 479                 dprintk("RPC: rpc_doio: TP2\n");
 480                 if (sent || (result = rpc_send(rsock, slot)) >= 0) {
 481                         result = rpc_recv(rsock, slot);
 482                         sent = 0;
 483                 }
 484 
 485                 if (result != -ETIMEDOUT) {
 486                         /* dprintk("RPC: rpc_recv returned %d\n", result); */
 487                         rpc_cwnd_adjust(rsock, 0);
 488                         break;
 489                 }
 490 
 491                 rpc_cwnd_adjust(rsock, 1);
 492 
 493 timedout:
 494                 dprintk("RPC: rpc_recv returned timeout.\n");
 495                 if (strategy->to_exponential)
 496                         timeout <<= 1;
 497                 else
 498                         timeout += strategy->to_increment;
 499                 if (strategy->to_maxval && timeout >= strategy->to_maxval)
 500                         timeout = strategy->to_maxval;
 501                 if (strategy->to_retries && ++retries >= strategy->to_retries)
 502                         break;
 503         } while (1);
 504 
 505         dprintk("RPC: rpc_doio: TP3\n");
 506         current->timeout = 0;
 507         return result;
 508 }
 509 
 510 /*
 511  */
 512 int
 513 rpc_call(struct rpc_sock *rsock, struct rpc_ioreq *req,
     /* [previous][next][first][last][top][bottom][index][help] */
 514                         struct rpc_timeout *strategy)
 515 {
 516         int     result;
 517 
 518         result = rpc_doio(rsock, req, strategy, 0);
 519         if (req->rq_slot == NULL)
 520                 printk(KERN_WARNING "RPC: bad: rq_slot == NULL\n");
 521         rpc_release(rsock, req);
 522         return result;
 523 }
 524 
 525 struct rpc_sock *
 526 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 527 {
 528         struct rpc_sock *rsock;
 529         struct socket   *sock;
 530         struct sock     *sk;
 531         struct rpc_wait *slot;
 532         int             i;
 533 
 534         dprintk("RPC: make RPC socket...\n");
 535         sock = &file->f_inode->u.socket_i;
 536         if (sock->type != SOCK_DGRAM || sock->ops->family != AF_INET) {
 537                 printk(KERN_WARNING "RPC: only UDP sockets supported\n");
 538                 return NULL;
 539         }
 540         sk = (struct sock *) sock->data;
 541 
 542         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 543                 return NULL;
 544         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 545 
 546         rsock->sock = sock;
 547         rsock->inet = sk;
 548         rsock->file = file;
 549         rsock->cwnd = RPC_INITCWND;
 550 
 551         dprintk("RPC: slots %p, %p, ...\n", rsock->waiting, rsock->waiting + 1);
 552         rsock->free = rsock->waiting;
 553         for (i = 0, slot = rsock->waiting; i < RPC_MAXREQS-1; i++, slot++)
 554                 slot->w_next = slot + 1;
 555         slot->w_next = NULL;
 556 
 557         dprintk("RPC: made socket %p\n", rsock);
 558         return rsock;
 559 }
 560 
 561 int
 562 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 563 {
 564         unsigned long   t0 = jiffies;
 565 
 566         rsock->shutdown = 1;
 567         while (rsock->pending || rsock->backlog) {
 568                 interruptible_sleep_on(&rsock->shutwait);
 569                 if (current->signal & ~current->blocked)
 570                         return -EINTR;
 571 #if 1
 572                 if (t0 && t0 - jiffies > 60 * HZ) {
 573                         printk(KERN_WARNING "RPC: hanging in rpc_closesock.\n");
 574                         t0 = 0;
 575                 }
 576 #endif
 577         }
 578 
 579         kfree(rsock);
 580         return 0;
 581 }

/* [previous][next][first][last][top][bottom][index][help] */