root/fs/nfs/rpcsock.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rpc_insque
  2. rpc_remque
  3. rpc_sendmsg
  4. rpc_select
  5. rpc_recvmsg
  6. rpc_call_one
  7. rpc_call
  8. rpc_makesock
  9. rpc_closesock

   1 /*
   2  *  linux/fs/nfs/rpcsock.c
   3  *
   4  *  This is a generic RPC call interface for datagram sockets that is able
   5  *  to place several concurrent RPC requests at the same time. It works like
   6  *  this:
   7  *
   8  *  -   When a process places a call, it allocates a request slot if
   9  *      one is available. Otherwise, it sleeps on the backlog queue.
  10  *  -   The first process on the receive queue waits for the next RPC reply,
  11  *      and peeks at the XID. If it finds a matching request, it receives
  12  *      the datagram on behalf of that process and wakes it up. Otherwise,
  13  *      the datagram is discarded.
  14  *  -   If the process having received the datagram was the first one on
  15  *      the receive queue, it wakes up the next one to listen for replies.
  16  *  -   It then removes itself from the request queue. If there are more
  17  *      callers waiting on the backlog queue, they are woken up, too.
  18  *
  19  *  Copyright (C) 1995, Olaf Kirch <okir@monad.swb.de>
  20  */
  21 
  22 #include <linux/types.h>
  23 #include <linux/malloc.h>
  24 #include <linux/sched.h>
  25 #include <linux/nfs_fs.h>
  26 #include <linux/errno.h>
  27 #include <linux/socket.h>
  28 #include <linux/fcntl.h>
  29 #include <linux/in.h>
  30 #include <linux/net.h>
  31 #include <linux/mm.h>
  32 #include <linux/rpcsock.h>
  33 
  34 #include <asm/segment.h>
  35 
  /*
   * Put the current task to sleep (interruptibly) for roughly `sec'
   * milliseconds; despite its name the argument is scaled by HZ / 1000.
   *
   * current->timeout is treated as an absolute jiffies value elsewhere in
   * this file (rpc_call() sets it to "jiffies + timeout"), so the deadline
   * is computed relative to the current jiffies here as well.
   *
   * The argument is parenthesized so that expressions such as
   * msleep(a + b) scale correctly, and the body is wrapped in
   * do { } while (0) so the macro behaves like a single statement
   * (e.g. in an unbraced if/else).
   */
#define msleep(sec)	do { \
		current->timeout = jiffies + (sec) * HZ / 1000; \
		current->state = TASK_INTERRUPTIBLE; \
		schedule(); \
	} while (0)
  40                         
  /*
   * Debug output helper.  Callers pass the complete printk argument list
   * wrapped in an extra pair of parentheses:
   *
   *	dprintk(("RPC: result = %d\n", result));
   *
   * The macro therefore has to expand to "printk x", which yields
   * printk("RPC: result = %d\n", result).  Expanding to "printk(x)" would
   * turn the argument list into a comma expression and hand only its last
   * operand to printk.
   *
   * Without DEBUG_NFS the call expands to nothing and its arguments are
   * not evaluated.
   */
#ifdef DEBUG_NFS
#define dprintk(x)		printk x
#else
#define dprintk(x)
#endif
  46 
  47 static inline void
  48 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  49 {
  50         struct rpc_wait *tmp;
  51 
  52         if ((tmp = rsock->tail) != NULL) {
  53                 tmp->next = slot;
  54         } else {
  55                 rsock->head = slot;
  56         }
  57         rsock->tail = slot;
  58         slot->prev = tmp;
  59         slot->next = NULL;
  60         dprintk(("RPC: inserted %08lx into queue.\n", (long)slot));
  61         dprintk(("RPC: head = %08lx, tail = %08lx.\n",
  62                         (long) rsock->head, (long) rsock->tail));
  63 }
  64 
  65 static inline void
  66 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
     /* [previous][next][first][last][top][bottom][index][help] */
  67 {
  68         struct rpc_wait *prev = slot->prev,
  69                         *next = slot->next;
  70 
  71         if (prev != NULL)
  72                 prev->next = next;
  73         else
  74                 rsock->head = next;
  75         if (next != NULL)
  76                 next->prev = prev;
  77         else
  78                 rsock->tail = prev;
  79         dprintk(("RPC: removed %08lx from queue.\n", (long)slot));
  80         dprintk(("RPC: head = %08lx, tail = %08lx.\n",
  81                         (long) rsock->head, (long) rsock->tail));
  82 }
  83 
  84 static inline int
  85 rpc_sendmsg(struct rpc_sock *rsock, struct msghdr *msg, int len)
     /* [previous][next][first][last][top][bottom][index][help] */
  86 {
  87         struct socket   *sock = rsock->sock;
  88         unsigned long   oldfs;
  89         int             result;
  90 
  91         dprintk(("RPC: sending %d bytes (buf %p)\n", len, msg->msg_iov[0].iov_base));
  92         oldfs = get_fs();
  93         set_fs(get_ds());
  94         result = sock->ops->sendmsg(sock, msg, len, 0, 0);
  95         set_fs(oldfs);
  96         dprintk(("RPC: result = %d\n", result));
  97 
  98         return result;
  99 }
 100 
 101 /*
 102  * This code is slightly complicated. Since the networking code does not
 103  * honor the current->timeout value, we have to select on the socket.
 104  */
 105 static inline int
 106 rpc_select(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 107 {
 108         struct select_table_entry entry;
 109         struct file     *file = rsock->file;
 110         select_table    wait_table;
 111 
 112         dprintk(("RPC: selecting on socket...\n"));
 113         wait_table.nr = 0;
 114         wait_table.entry = &entry;
 115         current->state = TASK_INTERRUPTIBLE;
 116         if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
 117          && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
 118                 schedule();
 119                 remove_wait_queue(entry.wait_address, &entry.wait);
 120                 current->state = TASK_RUNNING;
 121                 if (current->signal & ~current->blocked)
 122                         return -ERESTARTSYS;
 123                 if (current->timeout == 0)
 124                         return -ETIMEDOUT;
 125         } else if (wait_table.nr)
 126                 remove_wait_queue(entry.wait_address, &entry.wait);
 127         current->state = TASK_RUNNING;
 128         dprintk(("RPC: ...Okay, there appears to be some data.\n"));
 129         return 0;
 130 }
 131 
 132 static inline int
 133 rpc_recvmsg(struct rpc_sock *rsock, struct msghdr *msg, int len,int flags)
     /* [previous][next][first][last][top][bottom][index][help] */
 134 {
 135         struct socket   *sock = rsock->sock;
 136         struct sockaddr sa;
 137         int             alen = sizeof(sa);
 138         unsigned long   oldfs;
 139         int             result;
 140 
 141         dprintk(("RPC: receiving %d bytes max (buf %p)\n", len, msg->msg_iov[0].iov_base));
 142         oldfs = get_fs();
 143         set_fs(get_ds());
 144         result = sock->ops->recvmsg(sock, msg, len, 1, flags, &alen);
 145         set_fs(oldfs);
 146         dprintk(("RPC: result = %d\n", result));
 147 
 148 #if 0
 149         if (alen != salen || memcmp(&sa, sap, alen)) {
 150                 dprintk(("RPC: reply address mismatch... rejected.\n"));
 151                 result = -EAGAIN;
 152         }
 153 #endif
 154 
 155         return result;
 156 }
 157 
 /*
  * Place the actual RPC call: send the request once, then wait for the
  * reply belonging to `slot'.
  *
  * The task whose slot is at the head of the request queue acts as the
  * receiver for everybody: it waits for datagrams, peeks at the XID, and
  * receives each reply into the buffer of the slot with the matching XID,
  * waking that slot's owner.  All other tasks sleep on their slot's wait
  * queue until their reply has been received for them, their timeout
  * expires, a signal arrives, or they become the head themselves.
  *
  * rsock   - RPC socket the request is queued on
  * slot    - the caller's request slot (already on rsock's queue)
  * sap     - server address, salen its length
  * sndbuf  - request to send, slen its length in bytes
  * rcvbuf, rlen - only used for debug output here; the reply is stored
  *           through slot->buf / slot->len
  *
  * Returns a negative errno on failure (-ETIMEDOUT, -ERESTARTSYS, -EIO on
  * shutdown, or an error from the socket layer).  On success the value is
  * the result of the final receive when this task received its own reply;
  * when another task received the reply on our behalf, `result' still
  * holds the (non-negative) return value of the send.
  */
static int
rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
		struct sockaddr *sap, int salen,
		const int *sndbuf, int slen, int *rcvbuf, int rlen)
{
	struct rpc_wait *rovr = NULL;
	int		result;
	u32		xid;
	int		safe;
	struct msghdr	msg;
	struct iovec	iov;

	msg.msg_iov	=	&iov;
	msg.msg_iovlen	=	1;
	msg.msg_name	=	(void *)sap;
	msg.msg_namelen =	salen;
	msg.msg_accrights =	NULL;
	iov.iov_base	=	(void *)sndbuf;
	iov.iov_len	=	slen;

	dprintk(("RPC: placing one call, rsock = %08lx, slot = %08lx, "
		"sap = %08lx, salen = %d, "
		"sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
		(long) rsock, (long) slot, (long) sap,
		salen, (long) sndbuf, slen, (long) rcvbuf, rlen));

	result = rpc_sendmsg(rsock, &msg, slen);
	if (result < 0)
		return result;

	/*
	 * Note that `continue' inside this do/while jumps to the loop
	 * condition; rovr is either NULL or a foreign slot in those cases,
	 * so the loop keeps running.
	 */
	do {
		/* We are not the receiver. Wait on the side lines. */
		if (rsock->head != slot) {
			interruptible_sleep_on(&slot->wait);
			if (slot->gotit)
				break;
			/*
			 * A pending signal ends the sleep early.  Bail out
			 * the same way the receiver does in rpc_select()
			 * instead of going straight back to sleep until the
			 * timeout finally expires.  Leaving via `break'
			 * makes sure the next waiter is woken if we became
			 * the head of the queue in the meantime.
			 */
			if (current->signal & ~current->blocked) {
				result = -ERESTARTSYS;
				break;
			}
			if (current->timeout != 0)
				continue;
			if (rsock->shutdown) {
				printk("RPC: aborting call due to shutdown.\n");
				return -EIO;
			}
			return -ETIMEDOUT;
		}

		/* wait for data to arrive */
		result = rpc_select(rsock);
		if (result < 0) {
			dprintk(("RPC: select error = %d\n", result));
			break;
		}

		/* Peek at the XID without consuming the datagram. */
		iov.iov_base=(void *)&xid;
		iov.iov_len=sizeof(xid);

		result = rpc_recvmsg(rsock, &msg, sizeof(xid), MSG_PEEK);
		if (result < 0) {
			switch (-result) {
			case EAGAIN: case ECONNREFUSED:
				continue;
			default:
				dprintk(("rpc_call: recv error = %d\n", result));
				/* fallthrough */
			case ERESTARTSYS:
				return result;
			}
		}

		/* Look for the caller */
		safe = 0;
		for (rovr = rsock->head; rovr; rovr = rovr->next) {
			/* Guard against a corrupted (cyclic) queue. */
			if (safe++ > NRREQS) {
				printk("RPC: loop in request Q!!\n");
				rovr = NULL;
				break;
			}
			if (rovr->xid == xid)
				break;
		}

		if (!rovr || rovr->gotit) {
			/* bad XID or duplicate reply, discard dgram */
			dprintk(("RPC: bad XID or duplicate reply.\n"));
			iov.iov_base=(void *)&xid;
			iov.iov_len=sizeof(xid);
			rpc_recvmsg(rsock, &msg, sizeof(xid),0);
			continue;
		}
		rovr->gotit = 1;

		/* Now receive the reply */

		iov.iov_base=rovr->buf;
		iov.iov_len=rovr->len;

		result = rpc_recvmsg(rsock, &msg, rovr->len, 0);

		/* If this is not for ourselves, wake up the caller */
		if (rovr != slot)
			wake_up(&rovr->wait);
	} while (rovr != slot);

	/* This is somewhat tricky. We rely on the fact that we are able to
	 * remove ourselves from the queues before the next reader is scheduled,
	 * otherwise it would find that we're still at the head of the queue
	 * and go to sleep again.
	 */
	if (rsock->head == slot && slot->next != NULL)
		wake_up(&slot->next->wait);

	return result;
}
 272 
 273 /*
 274  * Generic RPC call routine. This handles retries and timeouts etc pp
 275  */
 276 int
 277 rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
     /* [previous][next][first][last][top][bottom][index][help] */
 278                 const int *sndbuf, int slen, int *rcvbuf, int rlen,
 279                 struct rpc_timeout *strategy, int flag)
 280 {
 281         struct rpc_wait         *slot;
 282         int                     result, retries;
 283         unsigned long           timeout;
 284 
 285         timeout = strategy->init_timeout;
 286         retries = 0;
 287         slot = NULL;
 288 
 289         do {
 290                 dprintk(("RPC call TP1\n"));
 291                 current->timeout = jiffies + timeout;
 292                 if (slot == NULL) {
 293                         while ((slot = rsock->free) == NULL) {
 294                                 if (!flag) {
 295                                         current->timeout = 0;
 296                                         return -ENOBUFS;
 297                                 }
 298                                 interruptible_sleep_on(&rsock->backlog);
 299                                 if (current->timeout == 0) {
 300                                         result = -ETIMEDOUT;
 301                                         goto timedout;
 302                                 }
 303                                 if (rsock->shutdown) {
 304                                         dprintk(("RPC: aborting call due to shutdown.\n"));
 305                                         current->timeout = 0;
 306                                         return -EIO;
 307                                 }
 308                         }
 309                         dprintk(("RPC call TP2\n"));
 310                         slot->gotit = 0;
 311                         slot->xid = *(u32 *)sndbuf;
 312                         slot->buf = rcvbuf;
 313                         slot->len = rlen;
 314                         rsock->free = slot->next;
 315                         rpc_insque(rsock, slot);
 316                 }
 317 
 318                 dprintk(("RPC call TP3\n"));
 319                 result = rpc_call_one(rsock, slot, sap, addrlen,
 320                                         sndbuf, slen, rcvbuf, rlen);
 321                 if (result != -ETIMEDOUT)
 322                         break;
 323 
 324 timedout:
 325                 dprintk(("RPC call TP4\n"));
 326                 dprintk(("RPC: rpc_call_one returned timeout.\n"));
 327                 if (strategy->exponential)
 328                         timeout <<= 1;
 329                 else
 330                         timeout += strategy->increment;
 331                 if (strategy->max_timeout && timeout >= strategy->max_timeout)
 332                         timeout = strategy->max_timeout;
 333                 if (strategy->retries && ++retries >= strategy->retries)
 334                         break;
 335         } while (1);
 336 
 337         dprintk(("RPC call TP5\n"));
 338         current->timeout = 0;
 339         if (slot != NULL) {
 340                 dprintk(("RPC call TP6\n"));
 341                 rpc_remque(rsock, slot);
 342                 slot->next = rsock->free;
 343                 rsock->free = slot;
 344 
 345                 /* wake up tasks that haven't sent anything yet. (Waking
 346                  * up the first one on the wait queue would be enough) */
 347                 if (rsock->backlog)
 348                         wake_up(&rsock->backlog);
 349         }
 350 
 351         if (rsock->shutdown)
 352                 wake_up(&rsock->shutwait);
 353 
 354         return result;
 355 }
 356 
 357 struct rpc_sock *
 358 rpc_makesock(struct file *file)
     /* [previous][next][first][last][top][bottom][index][help] */
 359 {
 360         struct rpc_sock *rsock;
 361         struct rpc_wait *slot;
 362         int             i;
 363 
 364         dprintk(("RPC: make RPC socket...\n"));
 365         if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
 366                 return NULL;
 367         memset(rsock, 0, sizeof(*rsock)); /* Nnnngh! */
 368 
 369         rsock->sock = &file->f_inode->u.socket_i;
 370         rsock->file = file;
 371 
 372         rsock->free = rsock->waiting;
 373         for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
 374                 slot->next = slot + 1;
 375         slot->next = NULL;
 376 
 377         /* --- taken care of by memset above ---
 378         rsock->backlog = NULL;
 379         rsock->head = rsock->tail = NULL;
 380 
 381         rsock->shutwait = NULL;
 382         rsock->shutdown = 0;
 383          */
 384 
 385         dprintk(("RPC: made socket %08lx", (long) rsock));
 386         return rsock;
 387 }
 388 
 389 int
 390 rpc_closesock(struct rpc_sock *rsock)
     /* [previous][next][first][last][top][bottom][index][help] */
 391 {
 392         unsigned long   t0 = jiffies;
 393 
 394         rsock->shutdown = 1;
 395         while (rsock->head || rsock->backlog) {
 396                 interruptible_sleep_on(&rsock->shutwait);
 397                 if (current->signal & ~current->blocked)
 398                         return -EINTR;
 399 #if 1
 400                 if (t0 && t0 - jiffies > 60 * HZ) {
 401                         printk("RPC: hanging in rpc_closesock.\n");
 402                         t0 = 0;
 403                 }
 404 #endif
 405         }
 406 
 407         kfree(rsock);
 408         return 0;
 409 }

/* [previous][next][first][last][top][bottom][index][help] */