root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendto
  4. rpc_select
  5. rpc_recvfrom
  6. rpc_call_one
  7. rpc_call
  8. rpc_makesock
  9. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue.
  10  *  -   The first process on the receive queue waits for the next RPC reply,
  11  *      and peeks at the XID. If it finds a matching request, it receives
  12  *      the datagram on behalf of that process and wakes it up. Otherwise,
  13  *      the datagram is discarded.
  14  *  -   If the process having received the datagram was the first one on
  15  *      the receive queue, it wakes up the next one to listen for replies.
  16  *  -   It then removes itself from the request queue. If there are more
  17  *      callers waiting on the backlog queue, they are woken up, too.
  18  *
  19  *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
  20  */
  21 
  22 #ifdef MODULE
  23 #include <linux/module.h>
  24 #endif
  25 
  26 #include <linux/types.h>
  27 #include <linux/malloc.h>
  28 #include <linux/sched.h>
  29 #include <linux/nfs_fs.h>
  30 #include <linux/errno.h>
  31 #include <linux/socket.h>
  32 #include <linux/fcntl.h>
  33 #include <asm/segment.h>
  34 #include <linux/in.h>
  35 #include <linux/net.h>
  36 #include <linux/mm.h>
  37 #include <linux/rpcsock.h>
  38 
  39 #define msleep(sec)     { current->timeout = sec * HZ / 1000; \
  40                           current->state = TASK_INTERRUPTIBLE; \
  41                           schedule(); \
  42                         }
  43 #define dprintk         if (0) printk
  44 
  45 static inline void
  46 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  47 {
  48         struct rpc_wait *tmp;
  49 
  50         if ((tmp = rsock->tail) != NULL) {
  51                 tmp->next = slot;
  52         } else {
  53                 rsock->head = slot;
  54         }
  55         rsock->tail = slot;
  56         slot->prev = tmp;
  57         slot->next = NULL;
  58         dprintk("RPC: inserted %08lx into queue.\n", (long)slot);
  59         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  60                         (long) rsock->head, (long) rsock->tail);
  61 }
  62 
  63 static inline void
  64 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  65 {
  66         struct rpc_wait *prev = slot->prev,
  67                         *next = slot->next;
  68 
  69         if (prev != NULL)
  70                 prev->next = next;
  71         else
  72                 rsock->head = next;
  73         if (next != NULL)
  74                 next->prev = prev;
  75         else
  76                 rsock->tail = prev;
  77         dprintk("RPC: removed %08lx from queue.\n", (long)slot);
  78         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  79                         (long) rsock->head, (long) rsock->tail);
  80 }
  81 
  82 static inline int
  83 rpc_sendto(struct rpc_sock *rsock, const int *buf, int len,
     /* [previous][next][first][last][top][bottom][index][help] */
  84                         struct sockaddr *sap, int salen)
  85 {
  86         struct socket   *sock = rsock->sock;
  87         unsigned long   oldfs;
  88         int             result;
  89 
  90         dprintk("RPC: sending %d bytes (buf %08lx)\n", len, (long) buf);
  91         oldfs = get_fs();
  92         set_fs(get_ds());
  93         result = sock->ops->sendto(sock, buf, len, 0, 0, sap, salen);
  94         set_fs(oldfs);
  95         dprintk("RPC: result = %d\n", result);
  96 
  97         return result;
  98 }
  99 
 100 /*
 101  * This code is slightly complicated. Since the networking code does not
 102  * honor the current->timeout value, we have to select on the socket.
 103  */
 104 static inline int
 105 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 106 {
 107         struct select_table_entry entry;
 108         struct file     *file = rsock->file;
 109         select_table    wait_table;
 110 
 111         dprintk("RPC: selecting on socket...\n");
 112         wait_table.nr = 0;
 113         wait_table.entry = &entry;
 114         current->state = TASK_INTERRUPTIBLE;
 115         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 116          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 117                 schedule();
 118                 remove_wait_queue(entry.wait_address, &entry.wait);
 119                 current->state = TASK_RUNNING;
 120                 if (current->signal & ~current->blocked)
 121                         return -ERESTARTSYS;
 122                 if (current->timeout == 0)
 123                         return -ETIMEDOUT;
 124         } else if (wait_table.nr)
 125                 remove_wait_queue(entry.wait_address, &entry.wait);
 126         current->state = TASK_RUNNING;
 127         dprintk("RPC: ...Okay, there appears to be some data.\n");
 128         return 0;
 129 }
 130 
 131 static inline int
 132 rpc_recvfrom(struct rpc_sock *rsock, int *buf, int len,
     /* [previous][next][first][last][top][bottom][index][help] */
 133                         struct sockaddr *sap, int salen, int flags)
 134 {
 135         struct socket   *sock = rsock->sock;
 136         struct sockaddr sa;
 137         int             alen = sizeof(sa);
 138         unsigned long   oldfs;
 139         int             result;
 140 
 141         dprintk("RPC: receiving %d bytes max (buf %08lx)\n", len, (long) buf);
 142         oldfs = get_fs();
 143         set_fs(get_ds());
 144         result = sock->ops->recvfrom(sock, buf, len, 1, flags, &sa, &alen);
 145         set_fs(oldfs);
 146         dprintk("RPC: result = %d\n", result);
 147 
 148 #if 0
 149         if (alen != salen || memcmp(&sa, sap, alen)) {
 150                 dprintk("RPC: reply address mismatch... rejected.\n");
 151                 result = -EAGAIN;
 152         }
 153 #endif
 154 
 155         return result;
 156 }
 157 
 158 /*
 159  * Place the actual RPC call.
 160  */
 161 static int
 162 rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
     /* [previous][next][first][last][top][bottom][index][help] */
 163                 struct sockaddr *sap, int salen,
 164                 const int *sndbuf, int slen, int *rcvbuf, int rlen)
 165 {
 166         struct rpc_wait *rovr = NULL;
 167         int             result;
 168         u32             xid;
 169         int             safe;
 170 
 171         dprintk("RPC: placing one call, rsock = %08lx, slot = %08lx, "
 172                 "sap = %08lx, salen = %d, "
 173                 "sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
 174                 (long) rsock, (long) slot, (long) sap, 
 175                 salen, (long) sndbuf, slen, (long) rcvbuf, rlen);
 176 
 177         result = rpc_sendto(rsock, sndbuf, slen, sap, salen);
 178         if (result < 0)
 179                 return result;
 180 
 181         do {
 182                 /* We are not the receiver. Wait on the side lines. */
 183                 if (rsock->head != slot) {
 184                         interruptible_sleep_on(&slot->wait);
 185                         if (slot->gotit)
 186                                 break;
 187                         if (current->timeout != 0)
 188                                 continue;
 189                         if (rsock->shutdown) {
 190                                 printk("RPC: aborting call due to shutdown.\n");
 191                                 return -EIO;
 192                         }
 193                         return -ETIMEDOUT;
 194                 }
 195                 
 196                 /* wait for data to arrive */
 197                 result = rpc_select(rsock);
 198                 if (result < 0) {
 199                         dprintk("RPC: select error = %d\n", result);
 200                         break;
 201                 }
 202 
 203                 result = rpc_recvfrom(rsock, (int *)&xid, sizeof(xid),
 204                                                 sap, salen, MSG_PEEK);
 205                 if (result < 0) {
 206                         switch (-result) {
 207                         case EAGAIN: case ECONNREFUSED:
 208                                 continue;
 209                         default:
 210                                 dprintk("rpc_call: recv error = %d\n", result);
 211                         case ERESTARTSYS:
 212                                 return result;
 213                         }
 214                 }
 215 
 216                 /* Look for the caller */
 217                 safe = 0;
 218                 for (rovr = rsock->head; rovr; rovr = rovr->next) {
 219                         if (safe++ > NRREQS) {
 220                                 printk("RPC: loop in request Q!!\n");
 221                                 rovr = NULL;
 222                                 break;
 223                         }
 224                         if (rovr->xid == xid)
 225                                 break;
 226                 }
 227 
 228                 if (!rovr || rovr->gotit) {
 229                         /* bad XID or duplicate reply, discard dgram */
 230                         dprintk("RPC: bad XID or duplicate reply.\n");
 231                         rpc_recvfrom(rsock, (int *)&xid, sizeof(xid),
 232                                                 sap, salen, 0);
 233                         continue;
 234                 }
 235                 rovr->gotit = 1;
 236 
 237                 /* Now receive the reply */
 238                 result = rpc_recvfrom(rsock, rovr->buf, rovr->len,
 239                                                 sap, salen, 0);
 240 
 241                 /* If this is not for ourselves, wake up the caller */
 242                 if (rovr != slot)
 243                         wake_up(&rovr->wait);
 244         } while (rovr != slot);
 245 
 246         /* This is somewhat tricky. We rely on the fact that we are able to
 247          * remove ourselves from the queues before the next reader is scheduled,
 248          * otherwise it would find that we're still at the head of the queue
 249          * and go to sleep again.
 250          */
 251         if (rsock->head == slot && slot->next != NULL)
 252                 wake_up(&slot->next->wait);
 253 
 254         return result;
 255 }
 256 
 257 /*
 258  * Generic RPC call routine. This handles retries and timeouts etc pp
 259  */
 260 int
 261 rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 262                 const int *sndbuf, int slen, int *rcvbuf, int rlen,
 263                 struct rpc_timeout *strategy, int flag)
 264 {
 265         struct rpc_wait         *slot;
 266         int                     result, retries;
 267         unsigned long           timeout;
 268 
 269         timeout = strategy->init_timeout;
 270         retries = 0;
 271         slot = NULL;
 272 
 273         do {
 274                 dprintk("RPC call TP1\n");
 275                 current->timeout = jiffies + timeout;
 276                 if (slot == NULL) {
 277                         while ((slot = rsock->free) == NULL) {
 278                                 if (!flag) {
 279                                         current->timeout = 0;
 280                                         return -ENOBUFS;
 281                                 }
 282                                 interruptible_sleep_on(&rsock->backlog);
 283                                 if (current->timeout == 0) {
 284                                         result = -ETIMEDOUT;
 285                                         goto timedout;
 286                                 }
 287                                 if (rsock->shutdown) {
 288                                         printk("RPC: aborting call due to shutdown.\n");
 289                                         current->timeout = 0;
 290                                         return -EIO;
 291                                 }
 292                         }
 293                         dprintk("RPC call TP2\n");
 294                         slot->gotit = 0;
 295                         slot->xid = *(u32 *)sndbuf;
 296                         slot->buf = rcvbuf;
 297                         slot->len = rlen;
 298                         rsock->free = slot->next;
 299                         rpc_insque(rsock, slot);
 300                 }
 301 
 302                 dprintk("RPC call TP3\n");
 303                 result = rpc_call_one(rsock, slot, sap, addrlen,
 304                                         sndbuf, slen, rcvbuf, rlen);
 305                 if (result != -ETIMEDOUT)
 306                         break;
 307 
 308 timedout:
 309                 dprintk("RPC call TP4\n");
 310                 dprintk("RPC: rpc_call_one returned timeout.\n");
 311                 if (strategy->exponential)
 312                         timeout <<= 1;
 313                 else
 314                         timeout += strategy->increment;
 315                 if (strategy->max_timeout && timeout >= strategy->max_timeout)
 316                         timeout = strategy->max_timeout;
 317                 if (strategy->retries && ++retries >= strategy->retries)
 318                         break;
 319         } while (1);
 320 
 321         dprintk("RPC call TP5\n");
 322         current->timeout = 0;
 323         if (slot != NULL) {
 324                 dprintk("RPC call TP6\n");
 325                 rpc_remque(rsock, slot);
 326                 slot->next = rsock->free;
 327                 rsock->free = slot;
 328 
 329                 /* wake up tasks that haven't sent anything yet. (Waking
 330                  * up the first one on the wait queue would be enough) */
 331                 if (rsock->backlog)
 332                         wake_up(&rsock->backlog);
 333         }
 334 
 335         if (rsock->shutdown)
 336                 wake_up(&rsock->shutwait);
 337 
 338         return result;
 339 }
 340 
 341 struct rpc_sock *
 342 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 343 {
 344         struct rpc_sock *rsock;
 345         struct rpc_wait *slot;
 346         int             i;
 347 
 348         dprintk("RPC: make RPC socket...\n");
 349         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 350                 return NULL;
 351         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 352 
 353         rsock->sock = &file->f_inode->u.socket_i;
 354         rsock->file = file;
 355 
 356         rsock->free = rsock->waiting;
 357         for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
 358                 slot->next = slot + 1;
 359         slot->next = NULL;
 360 
 361         /* --- taken care of by memset above ---
 362         rsock->backlog = NULL;
 363         rsock->head = rsock->tail = NULL;
 364 
 365         rsock->shutwait = NULL;
 366         rsock->shutdown = 0;
 367          */
 368 
 369         dprintk("RPC: made socket %08lx", (long) rsock);
 370         return rsock;
 371 }
 372 
 373 int
 374 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 375 {
 376         unsigned long   t0 = jiffies;
 377 
 378         rsock->shutdown = 1;
 379         while (rsock->head || rsock->backlog) {
 380                 interruptible_sleep_on(&rsock->shutwait);
 381                 if (current->signal & ~current->blocked)
 382                         return -EINTR;
 383 #if 1
 384                 if (t0 && t0 - jiffies > 60 * HZ) {
 385                         printk("RPC: hanging in rpc_closesock.\n");
 386                         t0 = 0;
 387                 }
 388 #endif
 389         }
 390 
 391         kfree(rsock);
 392         return 0;
 393 }

/* [previous][next][first][last][top][bottom][index][help] */