root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendmsg
  4. rpc_recvmsg
  5. rpc_select
  6. rpc_reserve
  7. rpc_release
  8. rpc_cwnd_adjust
  9. rpc_send_check
  10. rpc_send
  11. rpc_transmit
  12. rpc_grok
  13. rpc_recv
  14. rpc_doio
  15. rpc_call
  16. rpc_makesock
  17. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue
  10  *      (rpc_reserve).
  11  *  -   Then, the message is transmitted via rpc_send (exported by name of
  12  *      rpc_transmit).
  13  *  -   Finally, the process waits for the call to complete (rpc_doio):
  14  *      The first process on the receive queue waits for the next RPC packet,
  15  *      and peeks at the XID. If it finds a matching request, it receives
  16  *      the datagram on behalf of that process and wakes it up. Otherwise,
  17  *      the datagram is discarded.
  18  *  -   If the process having received the datagram was the first one on
  19  *      the receive queue, it wakes up the next one to listen for replies.
  20  *  -   It then removes itself from the request queue (rpc_release).
  21  *      If there are more callers waiting on the backlog queue, they are
  22  *      woken up, too.
  23  *
  24  * Mar 1996:
  25  *  -   Split up large functions into smaller chunks as per Linus' coding
  26  *      style. Found an interesting bug this way, too.
  27  *  -   Added entry points for nfsiod.
  28  *
  29  *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
  30  */
  31 
  32 #include <linux/types.h>
  33 #include <linux/malloc.h>
  34 #include <linux/sched.h>
  35 #include <linux/nfs_fs.h>
  36 #include <linux/errno.h>
  37 #include <linux/socket.h>
  38 #include <linux/fcntl.h>
  39 #include <linux/in.h>
  40 #include <linux/net.h>
  41 #include <linux/mm.h>
  42 #include <linux/rpcsock.h>
  43 
  44 #include <linux/udp.h>
  45 #include <net/sock.h>
  46 
  47 #include <asm/segment.h>
  48 
  49 #define msleep(sec)     { current->timeout = sec * HZ / 1000; \
  50                           current->state = TASK_INTERRUPTIBLE; \
  51                           schedule(); \
  52                         }
  53 
  54 #undef DEBUG_RPC
  55 #ifdef DEBUG_RPC                        
  56 #define dprintk(args...)        printk(## args)
  57 #else
  58 #define dprintk(args...)
  59 #endif
  60 
  61 
  62 /*
  63  * Insert new request into wait list. We make sure list is sorted by
  64  * increasing timeout value.
  65  */
  66 static inline void
  67 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  68 {
  69         struct rpc_wait *next = rsock->pending;
  70 
  71         slot->w_next = next;
  72         slot->w_prev = NULL;
  73         if (next)
  74                 next->w_prev = slot;
  75         rsock->pending = slot;
  76         slot->w_queued = 1;
  77 
  78         dprintk("RPC: inserted %p into queue\n", slot);
  79 }
  80 
  81 /*
  82  * Remove request from request queue
  83  */
  84 static inline void
  85 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  86 {
  87         struct rpc_wait *prev = slot->w_prev,
  88                         *next = slot->w_next;
  89 
  90         if (prev != NULL)
  91                 prev->w_next = next;
  92         else
  93                 rsock->pending = next;
  94         if (next != NULL)
  95                 next->w_prev = prev;
  96 
  97         slot->w_queued = 0;
  98         dprintk("RPC: removed %p from queue, head now %p.\n",
  99                         slot, rsock->pending);
 100 }
 101 
 102 /*
 103  * Write data to socket.
 104  */
 105 static inline int
 106 rpc_sendmsg(struct rpc_sock *rsock, struct iovec *iov, int nr, int len,
     /* [previous][next][first][last][top][bottom][index][help] */
 107                                 struct sockaddr *sap, int salen)
 108 {
 109         struct socket   *sock = rsock->sock;
 110         struct msghdr   msg;
 111         unsigned long   oldfs;
 112         int             result;
 113 
 114         msg.msg_iov     = iov;
 115         msg.msg_iovlen  = nr;
 116         msg.msg_name    = sap;
 117         msg.msg_namelen = salen;
 118         msg.msg_accrights = NULL;
 119 
 120         oldfs = get_fs();
 121         set_fs(get_ds());
 122         result = sock->ops->sendmsg(sock, &msg, len, 0, 0);
 123         set_fs(oldfs);
 124 
 125         dprintk("RPC: rpc_sendmsg(iov %p, len %d) = %d\n", iov, len, result);
 126         return result;
 127 }
 128 /*
 129  * Read data from socket
 130  */
 131 static inline int
 132 rpc_recvmsg(struct rpc_sock *rsock, struct iovec *iov,
     /* [previous][next][first][last][top][bottom][index][help] */
 133                         int nr, int len, int flags)
 134 {
 135         struct socket   *sock = rsock->sock;
 136         struct sockaddr sa;
 137         struct msghdr   msg;
 138         unsigned long   oldfs;
 139         int             result, alen;
 140 
 141         msg.msg_iov     = iov;
 142         msg.msg_iovlen  = nr;
 143         msg.msg_name    = &sa;
 144         msg.msg_namelen = sizeof(sa);
 145         msg.msg_accrights = NULL;
 146 
 147         oldfs = get_fs();
 148         set_fs(get_ds());
 149         result = sock->ops->recvmsg(sock, &msg, len, 1, flags, &alen);
 150         set_fs(oldfs);
 151 
 152         dprintk("RPC: rpc_recvmsg(iov %p, len %d) = %d\n", iov, len, result);
 153         return result;
 154 }
 155 
 156 /*
 157  * This code is slightly complicated. Since the networking code does not
 158  * honor the current->timeout value, we have to select on the socket.
 159  */
 160 static inline int
 161 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 162 {
 163         struct select_table_entry entry;
 164         struct file     *file = rsock->file;
 165         select_table    wait_table;
 166 
 167         dprintk("RPC: selecting on socket...\n");
 168         wait_table.nr = 0;
 169         wait_table.entry = &entry;
 170         current->state = TASK_INTERRUPTIBLE;
 171         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 172          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 173                 schedule();
 174                 remove_wait_queue(entry.wait_address, &entry.wait);
 175                 current->state = TASK_RUNNING;
 176                 if (current->signal & ~current->blocked)
 177                         return -ERESTARTSYS;
 178                 if (current->timeout == 0)
 179                         return -ETIMEDOUT;
 180         } else if (wait_table.nr)
 181                 remove_wait_queue(entry.wait_address, &entry.wait);
 182         current->state = TASK_RUNNING;
 183         dprintk("RPC: ...Okay, there appears to be some data.\n");
 184         return 0;
 185 }
 186 
 187 /*
 188  * Reserve an RPC call slot. nocwait determines whether we wait in case
 189  * of congestion or not.
 190  */
 191 int
 192 rpc_reserve(struct rpc_sock *rsock, struct rpc_ioreq *req, int nocwait)
     /* [previous][next][first][last][top][bottom][index][help] */
 193 {
 194         struct rpc_wait *slot;
 195 
 196         req->rq_slot = NULL;
 197 
 198         while (!(slot = rsock->free) || rsock->cong >= rsock->cwnd) {
 199                 if (nocwait) {
 200                         current->timeout = 0;
 201                         return -ENOBUFS;
 202                 }
 203                 dprintk("RPC: rpc_reserve waiting on backlog\n");
 204                 interruptible_sleep_on(&rsock->backlog);
 205                 if (current->timeout == 0)
 206                         return -ETIMEDOUT;
 207                 if (current->signal & ~current->blocked)
 208                         return -ERESTARTSYS;
 209                 if (rsock->shutdown)
 210                         return -EIO;
 211         }
 212 
 213         rsock->free = slot->w_next;
 214         rsock->cong += RPC_CWNDSCALE;   /* bump congestion value */
 215 
 216         slot->w_queued = 0;
 217         slot->w_gotit = 0;
 218         slot->w_req = req;
 219 
 220         dprintk("RPC: reserved slot %p\n", slot);
 221         req->rq_slot = slot;
 222         return 0;
 223 }
 224 
 225 /*
 226  * Release an RPC call slot
 227  */
 228 void
 229 rpc_release(struct rpc_sock *rsock, struct rpc_ioreq *req)
     /* [previous][next][first][last][top][bottom][index][help] */
 230 {
 231         struct rpc_wait *slot = req->rq_slot;
 232 
 233         if (slot != NULL) {
 234                 dprintk("RPC: release slot %p\n", slot);
 235 
 236                 /* Wake up the next receiver */
 237                 if (slot == rsock->pending && slot->w_next != NULL)
 238                         wake_up(&slot->w_next->w_wait);
 239 
 240                 /* remove slot from queue of pending */
 241                 if (slot->w_queued)
 242                         rpc_remque(rsock, slot);
 243                 slot->w_next = rsock->free;
 244                 rsock->free = slot;
 245 
 246                 /* decrease congestion value */
 247                 rsock->cong -= RPC_CWNDSCALE;
 248                 if (rsock->cong < rsock->cwnd && rsock->backlog)
 249                         wake_up(&rsock->backlog);
 250                 if (rsock->shutdown)
 251                         wake_up(&rsock->shutwait);
 252 
 253                 req->rq_slot = NULL;
 254         }
 255 }
 256 
 257 /*
 258  * Adjust RPC congestion window
 259  */
 260 static void
 261 rpc_cwnd_adjust(struct rpc_sock *rsock, int timeout)
     /* [previous][next][first][last][top][bottom][index][help] */
 262 {
 263         unsigned long   cwnd = rsock->cwnd;
 264 
 265         if (!timeout) {
 266                 if (rsock->cong >= cwnd) {
 267                         /* The (cwnd >> 1) term makes sure
 268                          * the result gets rounded properly. */
 269                         cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE +
 270                                         (cwnd >> 1)) / cwnd;
 271                         if (cwnd > RPC_MAXCWND)
 272                                 cwnd = RPC_MAXCWND;
 273                 }
 274         } else {
 275                 if ((cwnd >>= 1) < RPC_CWNDSCALE)
 276                         cwnd = RPC_CWNDSCALE;
 277                 dprintk("RPC: cwnd decrease %08lx\n", cwnd);
 278         }
 279         dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx\n",
 280                         rsock->cong, rsock->cwnd, cwnd);
 281 
 282         rsock->cwnd = cwnd;
 283 }
 284 
 285 static inline void
 286 rpc_send_check(char *where, u32 *ptr)
     /* [previous][next][first][last][top][bottom][index][help] */
 287 {
 288         if (ptr[1] != 0 || ptr[2] != htonl(2) || ptr[3] != htonl(100003)) {
 289                 printk("RPC: %s sending evil packet:\n"
 290                        "     %08x %08x %08x %08x %08x %08x %08x %08x\n",
 291                        where,
 292                        ptr[0], ptr[1], ptr[2], ptr[3],
 293                        ptr[4], ptr[5], ptr[6], ptr[7]);
 294         }
 295 }
 296 
 297 /*
 298  * Place the actual RPC call.
 299  * We have to copy the iovec because sendmsg fiddles with its contents.
 300  */
 301 static inline int
 302 rpc_send(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
 303 {
 304         struct rpc_ioreq *req = slot->w_req;
 305         struct iovec    iov[MAX_IOVEC];
 306 
 307         if (rsock->shutdown)
 308                 return -EIO;
 309 
 310         memcpy(iov, req->rq_svec, req->rq_snr * sizeof(iov[0]));
 311         slot->w_xid = *(u32 *)(iov[0].iov_base);
 312         if (!slot->w_queued)
 313                 rpc_insque(rsock, slot);
 314 
 315         dprintk("rpc_send(%p, %x)\n", slot, slot->w_xid);
 316         rpc_send_check("rpc_send", (u32 *) req->rq_svec[0].iov_base);
 317         return rpc_sendmsg(rsock, iov, req->rq_snr, req->rq_slen,
 318                                 req->rq_addr, req->rq_alen);
 319 }
 320 
 321 /*
 322  * This is the same as rpc_send but for the functions exported to nfsiod
 323  */
 324 int
 325 rpc_transmit(struct rpc_sock *rsock, struct rpc_ioreq *req)
     /* [previous][next][first][last][top][bottom][index][help] */
 326 {
 327         rpc_send_check("rpc_transmit", (u32 *) req->rq_svec[0].iov_base);
 328         return rpc_send(rsock, req->rq_slot);
 329 }
 330 
 331 /*
 332  * Receive and dispatch a single reply
 333  */
 334 static inline int
 335 rpc_grok(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 336 {
 337         struct rpc_wait *rovr;
 338         struct rpc_ioreq *req;
 339         struct iovec    iov[MAX_IOVEC];
 340         u32             xid;
 341         int             safe, result;
 342 
 343         iov[0].iov_base = (void *) &xid;
 344         iov[0].iov_len  = sizeof(xid);
 345         result = rpc_recvmsg(rsock, iov, 1, sizeof(xid), MSG_PEEK);
 346 
 347         if (result < 0) {
 348                 switch (-result) {
 349                 case EAGAIN: case ECONNREFUSED:
 350                         return 0;
 351                 case ERESTARTSYS:
 352                         return result;
 353                 default:
 354                         dprintk("rpc_grok: recv error = %d\n", result);
 355                 }
 356         }
 357         if (result < 4) {
 358                 printk(KERN_WARNING "RPC: impossible RPC reply size %d\n",
 359                                                 result);
 360                 return 0;
 361         }
 362 
 363         dprintk("RPC: rpc_grok: got xid %08lx\n", (unsigned long) xid);
 364 
 365         /* Look for the caller */
 366         safe = 0;
 367         for (rovr = rsock->pending; rovr; rovr = rovr->w_next) {
 368                 if (rovr->w_xid == xid)
 369                         break;
 370                 if (safe++ > RPC_MAXREQS) {
 371                         printk(KERN_WARNING "RPC: loop in request Q!!\n");
 372                         rovr = NULL;
 373                         break;
 374                 }
 375         }
 376 
 377         if (!rovr || rovr->w_gotit) {
 378                 /* discard dgram */
 379                 dprintk("RPC: rpc_grok: %s.\n",
 380                         rovr? "duplicate reply" : "bad XID");
 381                 iov[0].iov_base = (void *) &xid;
 382                 iov[0].iov_len  = sizeof(xid);
 383                 rpc_recvmsg(rsock, iov, 1, sizeof(xid), 0);
 384                 return 0;
 385         }
 386         req = rovr->w_req;
 387 
 388         /* Now receive the reply... Copy the iovec first because of 
 389          * memcpy_fromiovec fiddling. */
 390         memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
 391         result = rpc_recvmsg(rsock, iov, req->rq_rnr, req->rq_rlen, 0);
 392         rovr->w_result = result;
 393         rovr->w_gotit = 1;
 394 
 395         /* ... and wake up the process */
 396         wake_up(&rovr->w_wait);
 397 
 398         return result;
 399 }
 400 
 401 /*
 402  * Wait for the reply to our call.
 403  */
 404 static int
 405 rpc_recv(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
 406 {
 407         int     result;
 408 
 409         do {
 410                 /* If we are not the receiver, wait on the sidelines */
 411                 dprintk("RPC: rpc_recv TP1\n");
 412                 while (rsock->pending != slot) {
 413                         if (!slot->w_gotit)
 414                                 interruptible_sleep_on(&slot->w_wait);
 415                         if (slot->w_gotit) {
 416                                 result = slot->w_result; /* quite important */
 417                                 return result;
 418                         }
 419                         if (current->signal & ~current->blocked)
 420                                 return -ERESTARTSYS;
 421                         if (rsock->shutdown)
 422                                 return -EIO;
 423                         if (current->timeout == 0)
 424                                 return -ETIMEDOUT;
 425                 }
 426 
 427                 /* Wait for data to arrive */
 428                 if ((result = rpc_select(rsock)) < 0) {
 429                         dprintk("RPC: select error = %d\n", result);
 430                         break;
 431                 }
 432 
 433                 /* Receive and dispatch */
 434                 if ((result = rpc_grok(rsock)) < 0)
 435                         break;
 436         } while (current->timeout && !slot->w_gotit);
 437 
 438         return slot->w_gotit? result : -ETIMEDOUT;
 439 }
 440 
 441 /*
 442  * Generic RPC call routine. This handles retries and timeouts etc pp.
 443  *
 444  * If sent is non-null, it assumes the called has already sent out the
 445  * message, so it won't need to do so unless a timeout occurs.
 446  */
 447 int
 448 rpc_doio(struct rpc_sock *rsock, struct rpc_ioreq *req,
     /* [previous][next][first][last][top][bottom][index][help] */
 449                         struct rpc_timeout *strategy, int sent)
 450 {
 451         struct rpc_wait *slot;
 452         int             result, retries;
 453         unsigned long   timeout;
 454 
 455         timeout = strategy->to_initval;
 456         retries = 0;
 457         slot = req->rq_slot;
 458 
 459         do {
 460                 dprintk("RPC: rpc_doio: TP1 (req %p)\n", req);
 461                 current->timeout = jiffies + timeout;
 462                 if (slot == NULL) {
 463                         result = rpc_reserve(rsock, req, 0);
 464                         if (result == -ETIMEDOUT)
 465                                 goto timedout;
 466                         if (result < 0)
 467                                 break;
 468                         slot = req->rq_slot;
 469                         rpc_send_check("rpc_doio",
 470                                 (u32 *) req->rq_svec[0].iov_base);
 471                         rpc_insque(rsock, slot);
 472                 }
 473 
 474                 /* This check is for loopback NFS. Sometimes replies come
 475                  * in before biod has called rpc_doio... */
 476                 if (slot->w_gotit) {
 477                         result = slot->w_result;
 478                         break;
 479                 }
 480 
 481                 dprintk("RPC: rpc_doio: TP2\n");
 482                 if (sent || (result = rpc_send(rsock, slot)) >= 0) {
 483                         result = rpc_recv(rsock, slot);
 484                         sent = 0;
 485                 }
 486 
 487                 if (result != -ETIMEDOUT) {
 488                         /* dprintk("RPC: rpc_recv returned %d\n", result); */
 489                         rpc_cwnd_adjust(rsock, 0);
 490                         break;
 491                 }
 492 
 493                 rpc_cwnd_adjust(rsock, 1);
 494 
 495 timedout:
 496                 dprintk("RPC: rpc_recv returned timeout.\n");
 497                 if (strategy->to_exponential)
 498                         timeout <<= 1;
 499                 else
 500                         timeout += strategy->to_increment;
 501                 if (strategy->to_maxval && timeout >= strategy->to_maxval)
 502                         timeout = strategy->to_maxval;
 503                 if (strategy->to_retries && ++retries >= strategy->to_retries)
 504                         break;
 505         } while (1);
 506 
 507         dprintk("RPC: rpc_doio: TP3\n");
 508         current->timeout = 0;
 509         return result;
 510 }
 511 
 512 /*
 513  */
 514 int
 515 rpc_call(struct rpc_sock *rsock, struct rpc_ioreq *req,
     /* [previous][next][first][last][top][bottom][index][help] */
 516                         struct rpc_timeout *strategy)
 517 {
 518         int     result;
 519 
 520         result = rpc_doio(rsock, req, strategy, 0);
 521         if (req->rq_slot == NULL)
 522                 printk(KERN_WARNING "RPC: bad: rq_slot == NULL\n");
 523         rpc_release(rsock, req);
 524         return result;
 525 }
 526 
 527 struct rpc_sock *
 528 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 529 {
 530         struct rpc_sock *rsock;
 531         struct socket   *sock;
 532         struct sock     *sk;
 533         struct rpc_wait *slot;
 534         int             i;
 535 
 536         dprintk("RPC: make RPC socket...\n");
 537         sock = &file->f_inode->u.socket_i;
 538         if (sock->type != SOCK_DGRAM || sock->ops->family != AF_INET) {
 539                 printk(KERN_WARNING "RPC: only UDP sockets supported\n");
 540                 return NULL;
 541         }
 542         sk = (struct sock *) sock->data;
 543 
 544         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 545                 return NULL;
 546         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 547 
 548         rsock->sock = sock;
 549         rsock->inet = sk;
 550         rsock->file = file;
 551         rsock->cwnd = RPC_INITCWND;
 552 
 553         dprintk("RPC: slots %p, %p, ...\n", rsock->waiting, rsock->waiting + 1);
 554         rsock->free = rsock->waiting;
 555         for (i = 0, slot = rsock->waiting; i < RPC_MAXREQS-1; i++, slot++)
 556                 slot->w_next = slot + 1;
 557         slot->w_next = NULL;
 558 
 559         dprintk("RPC: made socket %p\n", rsock);
 560         return rsock;
 561 }
 562 
 563 int
 564 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 565 {
 566         unsigned long   t0 = jiffies;
 567 
 568         rsock->shutdown = 1;
 569         while (rsock->pending || rsock->backlog) {
 570                 interruptible_sleep_on(&rsock->shutwait);
 571                 if (current->signal & ~current->blocked)
 572                         return -EINTR;
 573 #if 1
 574                 if (t0 && t0 - jiffies > 60 * HZ) {
 575                         printk(KERN_WARNING "RPC: hanging in rpc_closesock.\n");
 576                         t0 = 0;
 577                 }
 578 #endif
 579         }
 580 
 581         kfree(rsock);
 582         return 0;
 583 }

/* [previous][next][first][last][top][bottom][index][help] */