This source file includes the following definitions.
- rpc_insque
- rpc_remque
- rpc_sendmsg
- rpc_recvmsg
- rpc_select
- rpc_reserve
- rpc_release
- rpc_cwnd_adjust
- rpc_send_check
- rpc_send
- rpc_transmit
- rpc_grok
- rpc_recv
- rpc_doio
- rpc_call
- rpc_makesock
- rpc_closesock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 #include <linux/types.h>
33 #include <linux/malloc.h>
34 #include <linux/sched.h>
35 #include <linux/nfs_fs.h>
36 #include <linux/errno.h>
37 #include <linux/socket.h>
38 #include <linux/fcntl.h>
39 #include <linux/in.h>
40 #include <linux/net.h>
41 #include <linux/mm.h>
42 #include <linux/rpcsock.h>
43
44 #include <linux/udp.h>
45 #include <net/sock.h>
46
47 #include <asm/segment.h>
48
/*
 * Put the current task into an interruptible sleep for roughly `ms'
 * milliseconds.
 *
 * current->timeout holds an absolute jiffies value in this file (see
 * rpc_doio), so the delay is added to the current jiffies count.
 * Wrapped in do { } while (0) so the macro behaves as one statement.
 */
#define msleep(ms)	do { \
		current->timeout = jiffies + (ms) * HZ / 1000; \
		current->state = TASK_INTERRUPTIBLE; \
		schedule(); \
	} while (0)
53
/*
 * Debug tracing.  With DEBUG_RPC defined, dprintk() forwards its
 * arguments to printk(); otherwise it expands to nothing, so its
 * arguments are never evaluated.
 */
#undef DEBUG_RPC
#ifdef DEBUG_RPC
#define dprintk(args...)	printk(args)
#else
#define dprintk(args...)
#endif
60
61
62
63
64
65
66 static inline void
67 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
68 {
69 struct rpc_wait *next = rsock->pending;
70
71 slot->w_next = next;
72 slot->w_prev = NULL;
73 if (next)
74 next->w_prev = slot;
75 rsock->pending = slot;
76 slot->w_queued = 1;
77
78 dprintk("RPC: inserted %p into queue\n", slot);
79 }
80
81
82
83
84 static inline void
85 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
86 {
87 struct rpc_wait *prev = slot->w_prev,
88 *next = slot->w_next;
89
90 if (prev != NULL)
91 prev->w_next = next;
92 else
93 rsock->pending = next;
94 if (next != NULL)
95 next->w_prev = prev;
96
97 slot->w_queued = 0;
98 dprintk("RPC: removed %p from queue, head now %p.\n",
99 slot, rsock->pending);
100 }
101
102
103
104
105 static inline int
106 rpc_sendmsg(struct rpc_sock *rsock, struct iovec *iov, int nr, int len,
107 struct sockaddr *sap, int salen)
108 {
109 struct socket *sock = rsock->sock;
110 struct msghdr msg;
111 unsigned long oldfs;
112 int result;
113
114 msg.msg_iov = iov;
115 msg.msg_iovlen = nr;
116 msg.msg_name = sap;
117 msg.msg_namelen = salen;
118 msg.msg_accrights = NULL;
119
120 oldfs = get_fs();
121 set_fs(get_ds());
122 result = sock->ops->sendmsg(sock, &msg, len, 0, 0);
123 set_fs(oldfs);
124
125 dprintk("RPC: rpc_sendmsg(iov %p, len %d) = %d\n", iov, len, result);
126 return result;
127 }
128
129
130
131 static inline int
132 rpc_recvmsg(struct rpc_sock *rsock, struct iovec *iov,
133 int nr, int len, int flags)
134 {
135 struct socket *sock = rsock->sock;
136 struct sockaddr sa;
137 struct msghdr msg;
138 unsigned long oldfs;
139 int result, alen;
140
141 msg.msg_iov = iov;
142 msg.msg_iovlen = nr;
143 msg.msg_name = &sa;
144 msg.msg_namelen = sizeof(sa);
145 msg.msg_accrights = NULL;
146
147 oldfs = get_fs();
148 set_fs(get_ds());
149 result = sock->ops->recvmsg(sock, &msg, len, 1, flags, &alen);
150 set_fs(oldfs);
151
152 dprintk("RPC: rpc_recvmsg(iov %p, len %d) = %d\n", iov, len, result);
153 return result;
154 }
155
156
157
158
159
160 static inline int
161 rpc_select(struct rpc_sock *rsock)
162 {
163 struct select_table_entry entry;
164 struct file *file = rsock->file;
165 select_table wait_table;
166
167 dprintk("RPC: selecting on socket...\n");
168 wait_table.nr = 0;
169 wait_table.entry = &entry;
170 current->state = TASK_INTERRUPTIBLE;
171 if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
172 && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
173 schedule();
174 remove_wait_queue(entry.wait_address, &entry.wait);
175 current->state = TASK_RUNNING;
176 if (current->signal & ~current->blocked)
177 return -ERESTARTSYS;
178 if (current->timeout == 0)
179 return -ETIMEDOUT;
180 } else if (wait_table.nr)
181 remove_wait_queue(entry.wait_address, &entry.wait);
182 current->state = TASK_RUNNING;
183 dprintk("RPC: ...Okay, there appears to be some data.\n");
184 return 0;
185 }
186
187
188
189
190
191 int
192 rpc_reserve(struct rpc_sock *rsock, struct rpc_ioreq *req, int nocwait)
193 {
194 struct rpc_wait *slot;
195
196 req->rq_slot = NULL;
197
198 while (!(slot = rsock->free) || rsock->cong >= rsock->cwnd) {
199 if (nocwait) {
200 current->timeout = 0;
201 return -ENOBUFS;
202 }
203 dprintk("RPC: rpc_reserve waiting on backlog\n");
204 interruptible_sleep_on(&rsock->backlog);
205 if (current->timeout == 0)
206 return -ETIMEDOUT;
207 if (current->signal & ~current->blocked)
208 return -ERESTARTSYS;
209 if (rsock->shutdown)
210 return -EIO;
211 }
212
213 rsock->free = slot->w_next;
214 rsock->cong += RPC_CWNDSCALE;
215
216 slot->w_queued = 0;
217 slot->w_gotit = 0;
218 slot->w_req = req;
219
220 dprintk("RPC: reserved slot %p\n", slot);
221 req->rq_slot = slot;
222 return 0;
223 }
224
225
226
227
228 void
229 rpc_release(struct rpc_sock *rsock, struct rpc_ioreq *req)
230 {
231 struct rpc_wait *slot = req->rq_slot;
232
233 if (slot != NULL) {
234 dprintk("RPC: release slot %p\n", slot);
235
236
237 if (slot == rsock->pending && slot->w_next != NULL)
238 wake_up(&slot->w_next->w_wait);
239
240
241 if (slot->w_queued)
242 rpc_remque(rsock, slot);
243 slot->w_next = rsock->free;
244 rsock->free = slot;
245
246
247 rsock->cong -= RPC_CWNDSCALE;
248 if (rsock->cong < rsock->cwnd && rsock->backlog)
249 wake_up(&rsock->backlog);
250 if (rsock->shutdown)
251 wake_up(&rsock->shutwait);
252
253 req->rq_slot = NULL;
254 }
255 }
256
257
258
259
260 static void
261 rpc_cwnd_adjust(struct rpc_sock *rsock, int timeout)
262 {
263 unsigned long cwnd = rsock->cwnd;
264
265 if (!timeout) {
266 if (rsock->cong >= cwnd) {
267
268
269 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE +
270 (cwnd >> 1)) / cwnd;
271 if (cwnd > RPC_MAXCWND)
272 cwnd = RPC_MAXCWND;
273 }
274 } else {
275 if ((cwnd >>= 1) < RPC_CWNDSCALE)
276 cwnd = RPC_CWNDSCALE;
277 dprintk("RPC: cwnd decrease %08lx\n", cwnd);
278 }
279 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx\n",
280 rsock->cong, rsock->cwnd, cwnd);
281
282 rsock->cwnd = cwnd;
283 }
284
285 static inline void
286 rpc_send_check(char *where, u32 *ptr)
287 {
288 if (ptr[1] != 0 || ptr[2] != htonl(2) || ptr[3] != htonl(100003)) {
289 printk("RPC: %s sending evil packet:\n"
290 " %08x %08x %08x %08x %08x %08x %08x %08x\n",
291 where,
292 ptr[0], ptr[1], ptr[2], ptr[3],
293 ptr[4], ptr[5], ptr[6], ptr[7]);
294 }
295 }
296
297
298
299
300
301 static inline int
302 rpc_send(struct rpc_sock *rsock, struct rpc_wait *slot)
303 {
304 struct rpc_ioreq *req = slot->w_req;
305 struct iovec iov[MAX_IOVEC];
306
307 if (rsock->shutdown)
308 return -EIO;
309
310 memcpy(iov, req->rq_svec, req->rq_snr * sizeof(iov[0]));
311 slot->w_xid = *(u32 *)(iov[0].iov_base);
312 if (!slot->w_queued)
313 rpc_insque(rsock, slot);
314
315 dprintk("rpc_send(%p, %x)\n", slot, slot->w_xid);
316 rpc_send_check("rpc_send", (u32 *) req->rq_svec[0].iov_base);
317 return rpc_sendmsg(rsock, iov, req->rq_snr, req->rq_slen,
318 req->rq_addr, req->rq_alen);
319 }
320
321
322
323
324 int
325 rpc_transmit(struct rpc_sock *rsock, struct rpc_ioreq *req)
326 {
327 rpc_send_check("rpc_transmit", (u32 *) req->rq_svec[0].iov_base);
328 return rpc_send(rsock, req->rq_slot);
329 }
330
331
332
333
/*
 * Receive one datagram from the socket and hand it to its request.
 *
 * The first 4 bytes (the XID) are peeked at and matched against the
 * w_xid of each slot on the pending list.  On a match the datagram is
 * received into that request's rq_rvec, the result is stored in the
 * slot, w_gotit is set, and the slot's waiter is woken.  Datagrams
 * with an unknown XID, duplicate replies, and datagrams too short to
 * hold an XID are read off the socket and discarded.
 *
 * Returns -ERESTARTSYS if the peek was interrupted, 0 if nothing was
 * delivered, otherwise the result of receiving the reply.
 */
static inline int
rpc_grok(struct rpc_sock *rsock)
{
	struct rpc_wait	*rovr;
	struct rpc_ioreq *req;
	struct iovec	iov[MAX_IOVEC];
	u32		xid;
	int		safe, result;

	iov[0].iov_base = (void *) &xid;
	iov[0].iov_len  = sizeof(xid);
	result = rpc_recvmsg(rsock, iov, 1, sizeof(xid), MSG_PEEK);

	if (result < 0) {
		switch (-result) {
		case EAGAIN: case ECONNREFUSED:
			return 0;
		case ERESTARTSYS:
			return result;
		default:
			dprintk("rpc_grok: recv error = %d\n", result);
			break;
		}
	}
	if (result < 4) {
		printk(KERN_WARNING "RPC: impossible RPC reply size %d\n",
							result);
		if (result >= 0) {
			/*
			 * A runt datagram was only peeked at.  Dequeue it,
			 * or it stays at the head of the socket queue and
			 * is seen again on every later pass.
			 */
			iov[0].iov_base = (void *) &xid;
			iov[0].iov_len  = sizeof(xid);
			rpc_recvmsg(rsock, iov, 1, sizeof(xid), 0);
		}
		return 0;
	}

	dprintk("RPC: rpc_grok: got xid %08lx\n", (unsigned long) xid);

	/* Find the request this reply belongs to; `safe' bounds the walk. */
	safe = 0;
	for (rovr = rsock->pending; rovr; rovr = rovr->w_next) {
		if (rovr->w_xid == xid)
			break;
		if (safe++ > RPC_MAXREQS) {
			printk(KERN_WARNING "RPC: loop in request Q!!\n");
			rovr = NULL;
			break;
		}
	}

	if (!rovr || rovr->w_gotit) {
		/* Nobody wants this datagram: read it off the socket. */
		dprintk("RPC: rpc_grok: %s.\n",
			rovr? "duplicate reply" : "bad XID");
		iov[0].iov_base = (void *) &xid;
		iov[0].iov_len  = sizeof(xid);
		rpc_recvmsg(rsock, iov, 1, sizeof(xid), 0);
		return 0;
	}
	req = rovr->w_req;

	/* Receive into a copy of the request's receive iovec. */
	memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
	result = rpc_recvmsg(rsock, iov, req->rq_rnr, req->rq_rlen, 0);
	rovr->w_result = result;
	rovr->w_gotit = 1;

	/* Tell the owner of the slot that its reply has arrived. */
	wake_up(&rovr->w_wait);

	return result;
}
400
401
402
403
404 static int
405 rpc_recv(struct rpc_sock *rsock, struct rpc_wait *slot)
406 {
407 int result;
408
409 do {
410
411 dprintk("RPC: rpc_recv TP1\n");
412 while (rsock->pending != slot) {
413 if (!slot->w_gotit)
414 interruptible_sleep_on(&slot->w_wait);
415 if (slot->w_gotit) {
416 result = slot->w_result;
417 return result;
418 }
419 if (current->signal & ~current->blocked)
420 return -ERESTARTSYS;
421 if (rsock->shutdown)
422 return -EIO;
423 if (current->timeout == 0)
424 return -ETIMEDOUT;
425 }
426
427
428 if ((result = rpc_select(rsock)) < 0) {
429 dprintk("RPC: select error = %d\n", result);
430 break;
431 }
432
433
434 if ((result = rpc_grok(rsock)) < 0)
435 break;
436 } while (current->timeout && !slot->w_gotit);
437
438 return slot->w_gotit? result : -ETIMEDOUT;
439 }
440
441
442
443
444
445
446
447 int
448 rpc_doio(struct rpc_sock *rsock, struct rpc_ioreq *req,
449 struct rpc_timeout *strategy, int sent)
450 {
451 struct rpc_wait *slot;
452 int result, retries;
453 unsigned long timeout;
454
455 timeout = strategy->to_initval;
456 retries = 0;
457 slot = req->rq_slot;
458
459 do {
460 dprintk("RPC: rpc_doio: TP1 (req %p)\n", req);
461 current->timeout = jiffies + timeout;
462 if (slot == NULL) {
463 result = rpc_reserve(rsock, req, 0);
464 if (result == -ETIMEDOUT)
465 goto timedout;
466 if (result < 0)
467 break;
468 slot = req->rq_slot;
469 rpc_send_check("rpc_doio",
470 (u32 *) req->rq_svec[0].iov_base);
471 rpc_insque(rsock, slot);
472 }
473
474
475
476 if (slot->w_gotit) {
477 result = slot->w_result;
478 break;
479 }
480
481 dprintk("RPC: rpc_doio: TP2\n");
482 if (sent || (result = rpc_send(rsock, slot)) >= 0) {
483 result = rpc_recv(rsock, slot);
484 sent = 0;
485 }
486
487 if (result != -ETIMEDOUT) {
488
489 rpc_cwnd_adjust(rsock, 0);
490 break;
491 }
492
493 rpc_cwnd_adjust(rsock, 1);
494
495 timedout:
496 dprintk("RPC: rpc_recv returned timeout.\n");
497 if (strategy->to_exponential)
498 timeout <<= 1;
499 else
500 timeout += strategy->to_increment;
501 if (strategy->to_maxval && timeout >= strategy->to_maxval)
502 timeout = strategy->to_maxval;
503 if (strategy->to_retries && ++retries >= strategy->to_retries)
504 break;
505 } while (1);
506
507 dprintk("RPC: rpc_doio: TP3\n");
508 current->timeout = 0;
509 return result;
510 }
511
512
513
514 int
515 rpc_call(struct rpc_sock *rsock, struct rpc_ioreq *req,
516 struct rpc_timeout *strategy)
517 {
518 int result;
519
520 result = rpc_doio(rsock, req, strategy, 0);
521 if (req->rq_slot == NULL)
522 printk(KERN_WARNING "RPC: bad: rq_slot == NULL\n");
523 rpc_release(rsock, req);
524 return result;
525 }
526
527 struct rpc_sock *
528 rpc_makesock(struct file *file)
529 {
530 struct rpc_sock *rsock;
531 struct socket *sock;
532 struct sock *sk;
533 struct rpc_wait *slot;
534 int i;
535
536 dprintk("RPC: make RPC socket...\n");
537 sock = &file->f_inode->u.socket_i;
538 if (sock->type != SOCK_DGRAM || sock->ops->family != AF_INET) {
539 printk(KERN_WARNING "RPC: only UDP sockets supported\n");
540 return NULL;
541 }
542 sk = (struct sock *) sock->data;
543
544 if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
545 return NULL;
546 memset(rsock, 0, sizeof(*rsock));
547
548 rsock->sock = sock;
549 rsock->inet = sk;
550 rsock->file = file;
551 rsock->cwnd = RPC_INITCWND;
552
553 dprintk("RPC: slots %p, %p, ...\n", rsock->waiting, rsock->waiting + 1);
554 rsock->free = rsock->waiting;
555 for (i = 0, slot = rsock->waiting; i < RPC_MAXREQS-1; i++, slot++)
556 slot->w_next = slot + 1;
557 slot->w_next = NULL;
558
559 dprintk("RPC: made socket %p\n", rsock);
560 return rsock;
561 }
562
563 int
564 rpc_closesock(struct rpc_sock *rsock)
565 {
566 unsigned long t0 = jiffies;
567
568 rsock->shutdown = 1;
569 while (rsock->pending || rsock->backlog) {
570 interruptible_sleep_on(&rsock->shutwait);
571 if (current->signal & ~current->blocked)
572 return -EINTR;
573 #if 1
574 if (t0 && t0 - jiffies > 60 * HZ) {
575 printk(KERN_WARNING "RPC: hanging in rpc_closesock.\n");
576 t0 = 0;
577 }
578 #endif
579 }
580
581 kfree(rsock);
582 return 0;
583 }