root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendmsg
  4. rpc_select
  5. rpc_recvmsg
  6. rpc_call_one
  7. rpc_call
  8. rpc_makesock
  9. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue.
  10  *  -   The first process on the receive queue waits for the next RPC reply,
  11  *      and peeks at the XID. If it finds a matching request, it receives
  12  *      the datagram on behalf of that process and wakes it up. Otherwise,
  13  *      the datagram is discarded.
  14  *  -   If the process having received the datagram was the first one on
  15  *      the receive queue, it wakes up the next one to listen for replies.
  16  *  -   It then removes itself from the request queue. If there are more
  17  *      callers waiting on the backlog queue, they are woken up, too.
  18  *
  19  *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
  20  */
  21 
  22 #ifdef MODULE
  23 #include <linux/module.h>
  24 #endif
  25 
  26 #include <linux/types.h>
  27 #include <linux/malloc.h>
  28 #include <linux/sched.h>
  29 #include <linux/nfs_fs.h>
  30 #include <linux/errno.h>
  31 #include <linux/socket.h>
  32 #include <linux/fcntl.h>
  33 #include <asm/segment.h>
  34 #include <linux/in.h>
  35 #include <linux/net.h>
  36 #include <linux/mm.h>
  37 #include <linux/rpcsock.h>
  38 
  /*
   * msleep(msec): put the current task to sleep for roughly `msec`
   * milliseconds (the value is scaled by HZ / 1000), or until it is woken
   * earlier; the task state is TASK_INTERRUPTIBLE.
   *
   * current->timeout is used as an absolute jiffies deadline elsewhere in
   * this file (rpc_call stores jiffies + timeout), so the deadline here is
   * computed relative to jiffies as well.  The argument is parenthesized
   * and the body is wrapped in do { } while (0) so that the macro expands
   * to exactly one statement and must be followed by a semicolon.
   *
   * NOTE(review): no user of msleep is visible in this file.
   */
#define msleep(msec)    do { \
                                current->timeout = jiffies + (msec) * HZ / 1000; \
                                current->state = TASK_INTERRUPTIBLE; \
                                schedule(); \
                        } while (0)

/*
 * Debug output, compiled out.  This expands to a bare `if (0)`, so never
 * use it as the unbraced body of an if that has an else branch.
 */
#define dprintk         if (0) printk
  44 
  45 static inline void
  46 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  47 {
  48         struct rpc_wait *tmp;
  49 
  50         if ((tmp = rsock->tail) != NULL) {
  51                 tmp->next = slot;
  52         } else {
  53                 rsock->head = slot;
  54         }
  55         rsock->tail = slot;
  56         slot->prev = tmp;
  57         slot->next = NULL;
  58         dprintk("RPC: inserted %08lx into queue.\n", (long)slot);
  59         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  60                         (long) rsock->head, (long) rsock->tail);
  61 }
  62 
  63 static inline void
  64 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  65 {
  66         struct rpc_wait *prev = slot->prev,
  67                         *next = slot->next;
  68 
  69         if (prev != NULL)
  70                 prev->next = next;
  71         else
  72                 rsock->head = next;
  73         if (next != NULL)
  74                 next->prev = prev;
  75         else
  76                 rsock->tail = prev;
  77         dprintk("RPC: removed %08lx from queue.\n", (long)slot);
  78         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  79                         (long) rsock->head, (long) rsock->tail);
  80 }
  81 
  82 static inline int
  83 rpc_sendmsg(struct rpc_sock *rsock, struct msghdr *msg, int len)
     /* [previous][next][first][last][top][bottom][index][help] */
  84 {
  85         struct socket   *sock = rsock->sock;
  86         unsigned long   oldfs;
  87         int             result;
  88 
  89         dprintk("RPC: sending %d bytes (buf %p)\n", len, msg->msg_iov[0].iov_base);
  90         oldfs = get_fs();
  91         set_fs(get_ds());
  92         result = sock->ops->sendmsg(sock, msg, len, 0, 0);
  93         set_fs(oldfs);
  94         dprintk("RPC: result = %d\n", result);
  95 
  96         return result;
  97 }
  98 
  99 /*
 100  * This code is slightly complicated. Since the networking code does not
 101  * honor the current->timeout value, we have to select on the socket.
 102  */
 103 static inline int
 104 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 105 {
 106         struct select_table_entry entry;
 107         struct file     *file = rsock->file;
 108         select_table    wait_table;
 109 
 110         dprintk("RPC: selecting on socket...\n");
 111         wait_table.nr = 0;
 112         wait_table.entry = &entry;
 113         current->state = TASK_INTERRUPTIBLE;
 114         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 115          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 116                 schedule();
 117                 remove_wait_queue(entry.wait_address, &entry.wait);
 118                 current->state = TASK_RUNNING;
 119                 if (current->signal & ~current->blocked)
 120                         return -ERESTARTSYS;
 121                 if (current->timeout == 0)
 122                         return -ETIMEDOUT;
 123         } else if (wait_table.nr)
 124                 remove_wait_queue(entry.wait_address, &entry.wait);
 125         current->state = TASK_RUNNING;
 126         dprintk("RPC: ...Okay, there appears to be some data.\n");
 127         return 0;
 128 }
 129 
 130 static inline int
 131 rpc_recvmsg(struct rpc_sock *rsock, struct msghdr *msg, int len,int flags)
     /* [previous][next][first][last][top][bottom][index][help] */
 132 {
 133         struct socket   *sock = rsock->sock;
 134         struct sockaddr sa;
 135         int             alen = sizeof(sa);
 136         unsigned long   oldfs;
 137         int             result;
 138 
 139         dprintk("RPC: receiving %d bytes max (buf %p)\n", len, msg->msg_iov[0].iov_base);
 140         oldfs = get_fs();
 141         set_fs(get_ds());
 142         result = sock->ops->recvmsg(sock, msg, len, 1, flags, &alen);
 143         set_fs(oldfs);
 144         dprintk("RPC: result = %d\n", result);
 145 
 146 #if 0
 147         if (alen != salen || memcmp(&sa, sap, alen)) {
 148                 dprintk("RPC: reply address mismatch... rejected.\n");
 149                 result = -EAGAIN;
 150         }
 151 #endif
 152 
 153         return result;
 154 }
 155 
 /*
  * Place the actual RPC call: send the request once and wait for its reply.
  *
  * rsock          RPC socket; `slot` must already be on its request queue.
  * slot           this caller's request slot (xid, reply buffer, wait queue).
  * sap, salen     destination address handed to sendmsg.
  * sndbuf, slen   request datagram.
  * rcvbuf, rlen   only used for debug output here; replies are received
  *                into the buf/len recorded in the matching slot.
  *
  * The caller whose slot is at the head of the queue acts as the receiver
  * for everybody: it waits for a datagram, peeks at the XID, receives the
  * reply into the matching slot's buffer, and wakes that slot's owner.
  * All other callers sleep on their slot until they are woken or time out.
  *
  * Returns a negative errno on failure (-ETIMEDOUT, -EIO on shutdown,
  * -ERESTARTSYS on a pending unblocked signal, or an error from the socket
  * layer).  On success the receiver returns the result of its final
  * rpc_recvmsg.  A caller whose reply was received on its behalf returns
  * the value rpc_sendmsg gave it, because `result` is not updated on that
  * path; do not treat a non-negative return as the reply length.
  *
  * Every exit taken after the send attempt passes through the hand-over
  * at the bottom, so a caller leaving while at the head of the queue
  * always wakes its successor.
  */
static int
rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
                struct sockaddr *sap, int salen,
                const int *sndbuf, int slen, int *rcvbuf, int rlen)
{
        struct rpc_wait *rovr = NULL;
        int             result;
        u32             xid;
        int             safe;
        struct msghdr   msg;
        struct iovec    iov;

        msg.msg_iov     =       &iov;
        msg.msg_iovlen  =       1;
        msg.msg_name    =       (void *)sap;
        msg.msg_namelen =       salen;
        msg.msg_accrights =     NULL;
        iov.iov_base    =       (void *)sndbuf;
        iov.iov_len     =       slen;

        dprintk("RPC: placing one call, rsock = %08lx, slot = %08lx, "
                "sap = %08lx, salen = %d, "
                "sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
                (long) rsock, (long) slot, (long) sap,
                salen, (long) sndbuf, slen, (long) rcvbuf, rlen);

        result = rpc_sendmsg(rsock, &msg, slen);
        if (result < 0)
                goto out;

        /*
         * Note: `continue` inside this do/while jumps to the loop
         * condition, so the loop also ends if rovr happens to equal slot
         * at that point.
         */
        do {
                /* We are not the receiver. Wait on the side lines. */
                if (rsock->head != slot) {
                        interruptible_sleep_on(&slot->wait);
                        if (slot->gotit)
                                break;
                        /*
                         * With an unblocked signal pending the sleep above
                         * returns at once; bail out instead of spinning
                         * until the timeout expires.  The receiver path
                         * reports the same condition via rpc_select.
                         */
                        if (current->signal & ~current->blocked) {
                                result = -ERESTARTSYS;
                                break;
                        }
                        if (current->timeout != 0)
                                continue;
                        if (rsock->shutdown) {
                                printk("RPC: aborting call due to shutdown.\n");
                                result = -EIO;
                        } else {
                                result = -ETIMEDOUT;
                        }
                        break;
                }

                /* wait for data to arrive */
                result = rpc_select(rsock);
                if (result < 0) {
                        dprintk("RPC: select error = %d\n", result);
                        break;
                }

                /* Peek at the first word of the datagram: the XID. */
                iov.iov_base=(void *)&xid;
                iov.iov_len=sizeof(xid);

                result = rpc_recvmsg(rsock, &msg, sizeof(xid), MSG_PEEK);
                if (result < 0) {
                        if (result == -EAGAIN || result == -ECONNREFUSED)
                                continue;
                        if (result != -ERESTARTSYS) {
                                dprintk("rpc_call: recv error = %d\n", result);
                        }
                        break;
                }

                /* Look for the caller */
                safe = 0;
                for (rovr = rsock->head; rovr; rovr = rovr->next) {
                        if (safe++ > NRREQS) {
                                printk("RPC: loop in request Q!!\n");
                                rovr = NULL;
                                break;
                        }
                        if (rovr->xid == xid)
                                break;
                }

                if (!rovr || rovr->gotit) {
                        /* bad XID or duplicate reply, discard dgram */
                        dprintk("RPC: bad XID or duplicate reply.\n");
                        iov.iov_base=(void *)&xid;
                        iov.iov_len=sizeof(xid);
                        rpc_recvmsg(rsock, &msg, sizeof(xid),0);
                        continue;
                }
                rovr->gotit = 1;

                /* Now receive the reply into the owner's buffer */
                iov.iov_base=rovr->buf;
                iov.iov_len=rovr->len;

                result = rpc_recvmsg(rsock, &msg, rovr->len, 0);

                /* If this is not for ourselves, wake up the caller */
                if (rovr != slot)
                        wake_up(&rovr->wait);
        } while (rovr != slot);

out:
        /* This is somewhat tricky. We rely on the fact that we are able to
         * remove ourselves from the queues before the next reader is scheduled,
         * otherwise it would find that we're still at the head of the queue
         * and go to sleep again.
         */
        if (rsock->head == slot && slot->next != NULL)
                wake_up(&slot->next->wait);

        return result;
}
 270 
 271 /*
 272  * Generic RPC call routine. This handles retries and timeouts etc pp
 273  */
 274 int
 275 rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 276                 const int *sndbuf, int slen, int *rcvbuf, int rlen,
 277                 struct rpc_timeout *strategy, int flag)
 278 {
 279         struct rpc_wait         *slot;
 280         int                     result, retries;
 281         unsigned long           timeout;
 282 
 283         timeout = strategy->init_timeout;
 284         retries = 0;
 285         slot = NULL;
 286 
 287         do {
 288                 dprintk("RPC call TP1\n");
 289                 current->timeout = jiffies + timeout;
 290                 if (slot == NULL) {
 291                         while ((slot = rsock->free) == NULL) {
 292                                 if (!flag) {
 293                                         current->timeout = 0;
 294                                         return -ENOBUFS;
 295                                 }
 296                                 interruptible_sleep_on(&rsock->backlog);
 297                                 if (current->timeout == 0) {
 298                                         result = -ETIMEDOUT;
 299                                         goto timedout;
 300                                 }
 301                                 if (rsock->shutdown) {
 302                                         printk("RPC: aborting call due to shutdown.\n");
 303                                         current->timeout = 0;
 304                                         return -EIO;
 305                                 }
 306                         }
 307                         dprintk("RPC call TP2\n");
 308                         slot->gotit = 0;
 309                         slot->xid = *(u32 *)sndbuf;
 310                         slot->buf = rcvbuf;
 311                         slot->len = rlen;
 312                         rsock->free = slot->next;
 313                         rpc_insque(rsock, slot);
 314                 }
 315 
 316                 dprintk("RPC call TP3\n");
 317                 result = rpc_call_one(rsock, slot, sap, addrlen,
 318                                         sndbuf, slen, rcvbuf, rlen);
 319                 if (result != -ETIMEDOUT)
 320                         break;
 321 
 322 timedout:
 323                 dprintk("RPC call TP4\n");
 324                 dprintk("RPC: rpc_call_one returned timeout.\n");
 325                 if (strategy->exponential)
 326                         timeout <<= 1;
 327                 else
 328                         timeout += strategy->increment;
 329                 if (strategy->max_timeout && timeout >= strategy->max_timeout)
 330                         timeout = strategy->max_timeout;
 331                 if (strategy->retries && ++retries >= strategy->retries)
 332                         break;
 333         } while (1);
 334 
 335         dprintk("RPC call TP5\n");
 336         current->timeout = 0;
 337         if (slot != NULL) {
 338                 dprintk("RPC call TP6\n");
 339                 rpc_remque(rsock, slot);
 340                 slot->next = rsock->free;
 341                 rsock->free = slot;
 342 
 343                 /* wake up tasks that haven't sent anything yet. (Waking
 344                  * up the first one on the wait queue would be enough) */
 345                 if (rsock->backlog)
 346                         wake_up(&rsock->backlog);
 347         }
 348 
 349         if (rsock->shutdown)
 350                 wake_up(&rsock->shutwait);
 351 
 352         return result;
 353 }
 354 
 355 struct rpc_sock *
 356 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 357 {
 358         struct rpc_sock *rsock;
 359         struct rpc_wait *slot;
 360         int             i;
 361 
 362         dprintk("RPC: make RPC socket...\n");
 363         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 364                 return NULL;
 365         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 366 
 367         rsock->sock = &file->f_inode->u.socket_i;
 368         rsock->file = file;
 369 
 370         rsock->free = rsock->waiting;
 371         for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
 372                 slot->next = slot + 1;
 373         slot->next = NULL;
 374 
 375         /* --- taken care of by memset above ---
 376         rsock->backlog = NULL;
 377         rsock->head = rsock->tail = NULL;
 378 
 379         rsock->shutwait = NULL;
 380         rsock->shutdown = 0;
 381          */
 382 
 383         dprintk("RPC: made socket %08lx", (long) rsock);
 384         return rsock;
 385 }
 386 
 387 int
 388 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 389 {
 390         unsigned long   t0 = jiffies;
 391 
 392         rsock->shutdown = 1;
 393         while (rsock->head || rsock->backlog) {
 394                 interruptible_sleep_on(&rsock->shutwait);
 395                 if (current->signal & ~current->blocked)
 396                         return -EINTR;
 397 #if 1
 398                 if (t0 && t0 - jiffies > 60 * HZ) {
 399                         printk("RPC: hanging in rpc_closesock.\n");
 400                         t0 = 0;
 401                 }
 402 #endif
 403         }
 404 
 405         kfree(rsock);
 406         return 0;
 407 }

/* [previous][next][first][last][top][bottom][index][help] */