root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendmsg
  4. rpc_select
  5. rpc_recvmsg
  6. rpc_call_one
  7. rpc_call
  8. rpc_makesock
  9. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue.
  10  *  -   The first process on the receive queue waits for the next RPC reply,
  11  *      and peeks at the XID. If it finds a matching request, it receives
  12  *      the datagram on behalf of that process and wakes it up. Otherwise,
  13  *      the datagram is discarded.
  14  *  -   If the process having received the datagram was the first one on
  15  *      the receive queue, it wakes up the next one to listen for replies.
  16  *  -   It then removes itself from the request queue. If there are more
  17  *      callers waiting on the backlog queue, they are woken up, too.
  18  *
  19  *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
  20  */
  21 
  22 #include <linux/types.h>
  23 #include <linux/malloc.h>
  24 #include <linux/sched.h>
  25 #include <linux/nfs_fs.h>
  26 #include <linux/errno.h>
  27 #include <linux/socket.h>
  28 #include <linux/fcntl.h>
  29 #include <linux/in.h>
  30 #include <linux/net.h>
  31 #include <linux/mm.h>
  32 #include <linux/rpcsock.h>
  33 
  34 #include <asm/segment.h>
  35 
  36 #define msleep(sec)     { current->timeout = sec * HZ / 1000; \
  37                           current->state = TASK_INTERRUPTIBLE; \
  38                           schedule(); \
  39                         }
  40 #define dprintk         if (0) printk
  41 
  42 static inline void
  43 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  44 {
  45         struct rpc_wait *tmp;
  46 
  47         if ((tmp = rsock->tail) != NULL) {
  48                 tmp->next = slot;
  49         } else {
  50                 rsock->head = slot;
  51         }
  52         rsock->tail = slot;
  53         slot->prev = tmp;
  54         slot->next = NULL;
  55         dprintk("RPC: inserted %08lx into queue.\n", (long)slot);
  56         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  57                         (long) rsock->head, (long) rsock->tail);
  58 }
  59 
  60 static inline void
  61 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  62 {
  63         struct rpc_wait *prev = slot->prev,
  64                         *next = slot->next;
  65 
  66         if (prev != NULL)
  67                 prev->next = next;
  68         else
  69                 rsock->head = next;
  70         if (next != NULL)
  71                 next->prev = prev;
  72         else
  73                 rsock->tail = prev;
  74         dprintk("RPC: removed %08lx from queue.\n", (long)slot);
  75         dprintk("RPC: head = %08lx, tail = %08lx.\n",
  76                         (long) rsock->head, (long) rsock->tail);
  77 }
  78 
  79 static inline int
  80 rpc_sendmsg(struct rpc_sock *rsock, struct msghdr *msg, int len)
     /* [previous][next][first][last][top][bottom][index][help] */
  81 {
  82         struct socket   *sock = rsock->sock;
  83         unsigned long   oldfs;
  84         int             result;
  85 
  86         dprintk("RPC: sending %d bytes (buf %p)\n", len, msg->msg_iov[0].iov_base);
  87         oldfs = get_fs();
  88         set_fs(get_ds());
  89         result = sock->ops->sendmsg(sock, msg, len, 0, 0);
  90         set_fs(oldfs);
  91         dprintk("RPC: result = %d\n", result);
  92 
  93         return result;
  94 }
  95 
  96 /*
  97  * This code is slightly complicated. Since the networking code does not
  98  * honor the current->timeout value, we have to select on the socket.
  99  */
 100 static inline int
 101 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 102 {
 103         struct select_table_entry entry;
 104         struct file     *file = rsock->file;
 105         select_table    wait_table;
 106 
 107         dprintk("RPC: selecting on socket...\n");
 108         wait_table.nr = 0;
 109         wait_table.entry = &entry;
 110         current->state = TASK_INTERRUPTIBLE;
 111         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 112          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 113                 schedule();
 114                 remove_wait_queue(entry.wait_address, &entry.wait);
 115                 current->state = TASK_RUNNING;
 116                 if (current->signal & ~current->blocked)
 117                         return -ERESTARTSYS;
 118                 if (current->timeout == 0)
 119                         return -ETIMEDOUT;
 120         } else if (wait_table.nr)
 121                 remove_wait_queue(entry.wait_address, &entry.wait);
 122         current->state = TASK_RUNNING;
 123         dprintk("RPC: ...Okay, there appears to be some data.\n");
 124         return 0;
 125 }
 126 
 127 static inline int
 128 rpc_recvmsg(struct rpc_sock *rsock, struct msghdr *msg, int len,int flags)
     /* [previous][next][first][last][top][bottom][index][help] */
 129 {
 130         struct socket   *sock = rsock->sock;
 131         struct sockaddr sa;
 132         int             alen = sizeof(sa);
 133         unsigned long   oldfs;
 134         int             result;
 135 
 136         dprintk("RPC: receiving %d bytes max (buf %p)\n", len, msg->msg_iov[0].iov_base);
 137         oldfs = get_fs();
 138         set_fs(get_ds());
 139         result = sock->ops->recvmsg(sock, msg, len, 1, flags, &alen);
 140         set_fs(oldfs);
 141         dprintk("RPC: result = %d\n", result);
 142 
 143 #if 0
 144         if (alen != salen || memcmp(&sa, sap, alen)) {
 145                 dprintk("RPC: reply address mismatch... rejected.\n");
 146                 result = -EAGAIN;
 147         }
 148 #endif
 149 
 150         return result;
 151 }
 152 
 153 /*
 154  * Place the actual RPC call.
 155  */
 156 static int
 157 rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
     /* [previous][next][first][last][top][bottom][index][help] */
 158                 struct sockaddr *sap, int salen,
 159                 const int *sndbuf, int slen, int *rcvbuf, int rlen)
 160 {
 161         struct rpc_wait *rovr = NULL;
 162         int             result;
 163         u32             xid;
 164         int             safe;
 165         struct msghdr   msg;
 166         struct iovec    iov;
 167         
 168         msg.msg_iov     =       &iov;
 169         msg.msg_iovlen  =       1;
 170         msg.msg_name    =       (void *)sap;
 171         msg.msg_namelen =       salen;
 172         msg.msg_accrights =     NULL;
 173         iov.iov_base    =       (void *)sndbuf;
 174         iov.iov_len     =       slen;
 175 
 176         dprintk("RPC: placing one call, rsock = %08lx, slot = %08lx, "
 177                 "sap = %08lx, salen = %d, "
 178                 "sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
 179                 (long) rsock, (long) slot, (long) sap, 
 180                 salen, (long) sndbuf, slen, (long) rcvbuf, rlen);
 181 
 182         result = rpc_sendmsg(rsock, &msg, slen);
 183         if (result < 0)
 184                 return result;
 185 
 186         do {
 187                 /* We are not the receiver. Wait on the side lines. */
 188                 if (rsock->head != slot) {
 189                         interruptible_sleep_on(&slot->wait);
 190                         if (slot->gotit)
 191                                 break;
 192                         if (current->timeout != 0)
 193                                 continue;
 194                         if (rsock->shutdown) {
 195                                 printk("RPC: aborting call due to shutdown.\n");
 196                                 return -EIO;
 197                         }
 198                         return -ETIMEDOUT;
 199                 }
 200                 
 201                 /* wait for data to arrive */
 202                 result = rpc_select(rsock);
 203                 if (result < 0) {
 204                         dprintk("RPC: select error = %d\n", result);
 205                         break;
 206                 }
 207 
 208                 iov.iov_base=(void *)&xid;
 209                 iov.iov_len=sizeof(xid);
 210                 
 211                 result = rpc_recvmsg(rsock, &msg, sizeof(xid), MSG_PEEK);
 212                 if (result < 0) {
 213                         switch (-result) {
 214                         case EAGAIN: case ECONNREFUSED:
 215                                 continue;
 216                         default:
 217                                 dprintk("rpc_call: recv error = %d\n", result);
 218                         case ERESTARTSYS:
 219                                 return result;
 220                         }
 221                 }
 222 
 223                 /* Look for the caller */
 224                 safe = 0;
 225                 for (rovr = rsock->head; rovr; rovr = rovr->next) {
 226                         if (safe++ > NRREQS) {
 227                                 printk("RPC: loop in request Q!!\n");
 228                                 rovr = NULL;
 229                                 break;
 230                         }
 231                         if (rovr->xid == xid)
 232                                 break;
 233                 }
 234 
 235                 if (!rovr || rovr->gotit) {
 236                         /* bad XID or duplicate reply, discard dgram */
 237                         dprintk("RPC: bad XID or duplicate reply.\n");
 238                         iov.iov_base=(void *)&xid;
 239                         iov.iov_len=sizeof(xid);
 240                         rpc_recvmsg(rsock, &msg, sizeof(xid),0);
 241                         continue;
 242                 }
 243                 rovr->gotit = 1;
 244 
 245                 /* Now receive the reply */
 246                 
 247                 iov.iov_base=rovr->buf;
 248                 iov.iov_len=rovr->len;
 249                 
 250                 result = rpc_recvmsg(rsock, &msg, rovr->len, 0);
 251 
 252                 /* If this is not for ourselves, wake up the caller */
 253                 if (rovr != slot)
 254                         wake_up(&rovr->wait);
 255         } while (rovr != slot);
 256 
 257         /* This is somewhat tricky. We rely on the fact that we are able to
 258          * remove ourselves from the queues before the next reader is scheduled,
 259          * otherwise it would find that we're still at the head of the queue
 260          * and go to sleep again.
 261          */
 262         if (rsock->head == slot && slot->next != NULL)
 263                 wake_up(&slot->next->wait);
 264 
 265         return result;
 266 }
 267 
 268 /*
 269  * Generic RPC call routine. This handles retries and timeouts etc pp
 270  */
 271 int
 272 rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 273                 const int *sndbuf, int slen, int *rcvbuf, int rlen,
 274                 struct rpc_timeout *strategy, int flag)
 275 {
 276         struct rpc_wait         *slot;
 277         int                     result, retries;
 278         unsigned long           timeout;
 279 
 280         timeout = strategy->init_timeout;
 281         retries = 0;
 282         slot = NULL;
 283 
 284         do {
 285                 dprintk("RPC call TP1\n");
 286                 current->timeout = jiffies + timeout;
 287                 if (slot == NULL) {
 288                         while ((slot = rsock->free) == NULL) {
 289                                 if (!flag) {
 290                                         current->timeout = 0;
 291                                         return -ENOBUFS;
 292                                 }
 293                                 interruptible_sleep_on(&rsock->backlog);
 294                                 if (current->timeout == 0) {
 295                                         result = -ETIMEDOUT;
 296                                         goto timedout;
 297                                 }
 298                                 if (rsock->shutdown) {
 299                                         printk("RPC: aborting call due to shutdown.\n");
 300                                         current->timeout = 0;
 301                                         return -EIO;
 302                                 }
 303                         }
 304                         dprintk("RPC call TP2\n");
 305                         slot->gotit = 0;
 306                         slot->xid = *(u32 *)sndbuf;
 307                         slot->buf = rcvbuf;
 308                         slot->len = rlen;
 309                         rsock->free = slot->next;
 310                         rpc_insque(rsock, slot);
 311                 }
 312 
 313                 dprintk("RPC call TP3\n");
 314                 result = rpc_call_one(rsock, slot, sap, addrlen,
 315                                         sndbuf, slen, rcvbuf, rlen);
 316                 if (result != -ETIMEDOUT)
 317                         break;
 318 
 319 timedout:
 320                 dprintk("RPC call TP4\n");
 321                 dprintk("RPC: rpc_call_one returned timeout.\n");
 322                 if (strategy->exponential)
 323                         timeout <<= 1;
 324                 else
 325                         timeout += strategy->increment;
 326                 if (strategy->max_timeout && timeout >= strategy->max_timeout)
 327                         timeout = strategy->max_timeout;
 328                 if (strategy->retries && ++retries >= strategy->retries)
 329                         break;
 330         } while (1);
 331 
 332         dprintk("RPC call TP5\n");
 333         current->timeout = 0;
 334         if (slot != NULL) {
 335                 dprintk("RPC call TP6\n");
 336                 rpc_remque(rsock, slot);
 337                 slot->next = rsock->free;
 338                 rsock->free = slot;
 339 
 340                 /* wake up tasks that haven't sent anything yet. (Waking
 341                  * up the first one on the wait queue would be enough) */
 342                 if (rsock->backlog)
 343                         wake_up(&rsock->backlog);
 344         }
 345 
 346         if (rsock->shutdown)
 347                 wake_up(&rsock->shutwait);
 348 
 349         return result;
 350 }
 351 
 352 struct rpc_sock *
 353 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 354 {
 355         struct rpc_sock *rsock;
 356         struct rpc_wait *slot;
 357         int             i;
 358 
 359         dprintk("RPC: make RPC socket...\n");
 360         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 361                 return NULL;
 362         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 363 
 364         rsock->sock = &file->f_inode->u.socket_i;
 365         rsock->file = file;
 366 
 367         rsock->free = rsock->waiting;
 368         for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
 369                 slot->next = slot + 1;
 370         slot->next = NULL;
 371 
 372         /* --- taken care of by memset above ---
 373         rsock->backlog = NULL;
 374         rsock->head = rsock->tail = NULL;
 375 
 376         rsock->shutwait = NULL;
 377         rsock->shutdown = 0;
 378          */
 379 
 380         dprintk("RPC: made socket %08lx", (long) rsock);
 381         return rsock;
 382 }
 383 
 384 int
 385 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 386 {
 387         unsigned long   t0 = jiffies;
 388 
 389         rsock->shutdown = 1;
 390         while (rsock->head || rsock->backlog) {
 391                 interruptible_sleep_on(&rsock->shutwait);
 392                 if (current->signal & ~current->blocked)
 393                         return -EINTR;
 394 #if 1
 395                 if (t0 && t0 - jiffies > 60 * HZ) {
 396                         printk("RPC: hanging in rpc_closesock.\n");
 397                         t0 = 0;
 398                 }
 399 #endif
 400         }
 401 
 402         kfree(rsock);
 403         return 0;
 404 }

/* [previous][next][first][last][top][bottom][index][help] */