This source file includes the following definitions.
- rpc_insque
- rpc_remque
- rpc_sendmsg
- rpc_recvmsg
- rpc_select
- rpc_reserve
- rpc_release
- rpc_cwnd_adjust
- rpc_send_check
- rpc_send
- rpc_transmit
- rpc_grok
- rpc_recv
- rpc_doio
- rpc_call
- rpc_makesock
- rpc_closesock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 #include <linux/types.h>
33 #include <linux/malloc.h>
34 #include <linux/sched.h>
35 #include <linux/nfs_fs.h>
36 #include <linux/errno.h>
37 #include <linux/socket.h>
38 #include <linux/fcntl.h>
39 #include <linux/in.h>
40 #include <linux/net.h>
41 #include <linux/mm.h>
42 #include <linux/rpcsock.h>
43
44 #include <linux/udp.h>
45 #include <net/sock.h>
46
47 #include <asm/segment.h>
48
/*
 * Sleep interruptibly for about `msec' milliseconds.
 *
 * The argument is scaled by HZ / 1000, i.e. it is a millisecond count.
 * current->timeout is used as an absolute jiffies deadline elsewhere in
 * this file (rpc_doio sets it to jiffies + timeout), so the deadline is
 * computed relative to the current jiffies value here as well.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement
 * (safe in an unbraced if/else); the argument is parenthesised so that
 * expressions such as msleep(a + b) scale correctly.
 */
#define msleep(msec)	do { \
		current->timeout = jiffies + (msec) * HZ / 1000; \
		current->state = TASK_INTERRUPTIBLE; \
		schedule(); \
	} while (0)
53
/*
 * Debug tracing.  Change the #undef below to a #define to route
 * dprintk() to printk(); otherwise dprintk() expands to nothing and
 * its arguments are never evaluated.
 *
 * The named variadic parameter (args...) is a GNU C extension.  It is
 * forwarded as plain `args': the old `## args' spelling asks the
 * preprocessor to paste `(' onto the first argument, which current
 * compilers reject.
 */
#undef DEBUG_RPC
#ifdef DEBUG_RPC
#define dprintk(args...)	printk(args)
#else
#define dprintk(args...)
#endif
60
61
62
63
64
65
66 static inline void
67 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
68 {
69 struct rpc_wait *next = rsock->pending;
70
71 slot->w_next = next;
72 slot->w_prev = NULL;
73 if (next)
74 next->w_prev = slot;
75 rsock->pending = slot;
76 slot->w_queued = 1;
77
78 dprintk("RPC: inserted %p into queue\n", slot);
79 }
80
81
82
83
84 static inline void
85 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
86 {
87 struct rpc_wait *prev = slot->w_prev,
88 *next = slot->w_next;
89
90 if (prev != NULL)
91 prev->w_next = next;
92 else
93 rsock->pending = next;
94 if (next != NULL)
95 next->w_prev = prev;
96
97 slot->w_queued = 0;
98 dprintk("RPC: removed %p from queue, head now %p.\n",
99 slot, rsock->pending);
100 }
101
102
103
104
105 static inline int
106 rpc_sendmsg(struct rpc_sock *rsock, struct iovec *iov, int nr, int len,
107 struct sockaddr *sap, int salen)
108 {
109 struct socket *sock = rsock->sock;
110 struct msghdr msg;
111 unsigned long oldfs;
112 int result;
113
114 msg.msg_iov = iov;
115 msg.msg_iovlen = nr;
116 msg.msg_name = sap;
117 msg.msg_namelen = salen;
118 msg.msg_accrights = NULL;
119
120 oldfs = get_fs();
121 set_fs(get_ds());
122 result = sock->ops->sendmsg(sock, &msg, len, 0, 0);
123 set_fs(oldfs);
124
125 dprintk("RPC: rpc_sendmsg(iov %p, len %d) = %d\n", iov, len, result);
126 return result;
127 }
128
129
130
131 static inline int
132 rpc_recvmsg(struct rpc_sock *rsock, struct iovec *iov,
133 int nr, int len, int flags)
134 {
135 struct socket *sock = rsock->sock;
136 struct sockaddr sa;
137 struct msghdr msg;
138 unsigned long oldfs;
139 int result, alen;
140
141 msg.msg_iov = iov;
142 msg.msg_iovlen = nr;
143 msg.msg_name = &sa;
144 msg.msg_namelen = sizeof(sa);
145 msg.msg_accrights = NULL;
146
147 oldfs = get_fs();
148 set_fs(get_ds());
149 result = sock->ops->recvmsg(sock, &msg, len, 1, flags, &alen);
150 set_fs(oldfs);
151
152 dprintk("RPC: rpc_recvmsg(iov %p, len %d) = %d\n", iov, len, result);
153 return result;
154 }
155
156
157
158
159
160 static inline int
161 rpc_select(struct rpc_sock *rsock)
162 {
163 struct select_table_entry entry;
164 struct file *file = rsock->file;
165 select_table wait_table;
166
167 dprintk("RPC: selecting on socket...\n");
168 wait_table.nr = 0;
169 wait_table.entry = &entry;
170 current->state = TASK_INTERRUPTIBLE;
171 if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
172 && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
173 schedule();
174 remove_wait_queue(entry.wait_address, &entry.wait);
175 current->state = TASK_RUNNING;
176 if (current->signal & ~current->blocked)
177 return -ERESTARTSYS;
178 if (current->timeout == 0)
179 return -ETIMEDOUT;
180 } else if (wait_table.nr)
181 remove_wait_queue(entry.wait_address, &entry.wait);
182 current->state = TASK_RUNNING;
183 dprintk("RPC: ...Okay, there appears to be some data.\n");
184 return 0;
185 }
186
187
188
189
190
191 int
192 rpc_reserve(struct rpc_sock *rsock, struct rpc_ioreq *req, int nocwait)
193 {
194 struct rpc_wait *slot;
195
196 req->rq_slot = NULL;
197
198 while (!(slot = rsock->free) || rsock->cong >= rsock->cwnd) {
199 if (nocwait) {
200 current->timeout = 0;
201 return -ENOBUFS;
202 }
203 dprintk("RPC: rpc_reserve waiting on backlog\n");
204 interruptible_sleep_on(&rsock->backlog);
205 if (current->timeout == 0)
206 return -ETIMEDOUT;
207 if (current->signal & ~current->blocked)
208 return -ERESTARTSYS;
209 if (rsock->shutdown)
210 return -EIO;
211 }
212
213 rsock->free = slot->w_next;
214 rsock->cong += RPC_CWNDSCALE;
215
216 slot->w_queued = 0;
217 slot->w_gotit = 0;
218 slot->w_req = req;
219
220 dprintk("RPC: reserved slot %p\n", slot);
221 req->rq_slot = slot;
222 return 0;
223 }
224
225
226
227
228 void
229 rpc_release(struct rpc_sock *rsock, struct rpc_ioreq *req)
230 {
231 struct rpc_wait *slot = req->rq_slot;
232
233 if (slot != NULL) {
234 dprintk("RPC: release slot %p\n", slot);
235
236
237 if (slot == rsock->pending && slot->w_next != NULL)
238 wake_up(&slot->w_next->w_wait);
239
240
241 if (slot->w_queued)
242 rpc_remque(rsock, slot);
243 slot->w_next = rsock->free;
244 rsock->free = slot;
245
246
247 rsock->cong -= RPC_CWNDSCALE;
248 if (rsock->cong < rsock->cwnd && rsock->backlog)
249 wake_up(&rsock->backlog);
250 if (rsock->shutdown)
251 wake_up(&rsock->shutwait);
252
253 req->rq_slot = NULL;
254 }
255 }
256
257
258
259
260 static void
261 rpc_cwnd_adjust(struct rpc_sock *rsock, int timeout)
262 {
263 unsigned long cwnd = rsock->cwnd;
264
265 if (!timeout) {
266 if (rsock->cong >= cwnd) {
267
268
269 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE +
270 (cwnd >> 1)) / cwnd;
271 if (cwnd > RPC_MAXCWND)
272 cwnd = RPC_MAXCWND;
273 }
274 } else {
275 if ((cwnd >>= 1) < RPC_CWNDSCALE)
276 cwnd = RPC_CWNDSCALE;
277 dprintk("RPC: cwnd decrease %08lx\n", cwnd);
278 }
279 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx\n",
280 rsock->cong, rsock->cwnd, cwnd);
281
282 rsock->cwnd = cwnd;
283 }
284
285 static inline void
286 rpc_send_check(char *where, u32 *ptr)
287 {
288 if (ptr[1] != htonl(RPC_CALL) || ptr[2] != htonl(RPC_VERSION)) {
289 printk("RPC: %s sending evil packet:\n"
290 " %08x %08x %08x %08x %08x %08x %08x %08x\n",
291 where,
292 ptr[0], ptr[1], ptr[2], ptr[3],
293 ptr[4], ptr[5], ptr[6], ptr[7]);
294 }
295 }
296
297
298
299
300
301 static inline int
302 rpc_send(struct rpc_sock *rsock, struct rpc_wait *slot)
303 {
304 struct rpc_ioreq *req = slot->w_req;
305 struct iovec iov[MAX_IOVEC];
306
307 if (rsock->shutdown)
308 return -EIO;
309
310 memcpy(iov, req->rq_svec, req->rq_snr * sizeof(iov[0]));
311 slot->w_xid = *(u32 *)(iov[0].iov_base);
312 if (!slot->w_queued)
313 rpc_insque(rsock, slot);
314
315 dprintk("rpc_send(%p, %x)\n", slot, slot->w_xid);
316 rpc_send_check("rpc_send", (u32 *) req->rq_svec[0].iov_base);
317 return rpc_sendmsg(rsock, iov, req->rq_snr, req->rq_slen,
318 req->rq_addr, req->rq_alen);
319 }
320
321
322
323
324 int
325 rpc_transmit(struct rpc_sock *rsock, struct rpc_ioreq *req)
326 {
327 rpc_send_check("rpc_transmit", (u32 *) req->rq_svec[0].iov_base);
328 return rpc_send(rsock, req->rq_slot);
329 }
330
331
332
333
/*
 * Drop the datagram at the head of the receive queue by reading an
 * XID-sized prefix of it without MSG_PEEK.  The result of the read is
 * deliberately ignored.
 */
static inline void
rpc_grok_discard(struct rpc_sock *rsock)
{
	struct iovec	iov;
	u32		junk;

	iov.iov_base = (void *) &junk;
	iov.iov_len = sizeof(junk);
	rpc_recvmsg(rsock, &iov, 1, sizeof(junk), 0);
}

/*
 * Look at the next incoming reply and deliver it to its owner.
 *
 * The first 32-bit word (the XID) is peeked, the pending list is
 * searched for a slot with that XID, and -- if one is found that has
 * not yet got its reply -- the datagram is received into that
 * request's receive vector, the result stored in the slot, and the
 * slot's waiter woken.
 *
 * Returns:
 *   < 0  -ERESTARTSYS from the peek;
 *   0    nothing delivered: -EAGAIN / -ECONNREFUSED on the peek, any
 *        other receive error, a reply shorter than an XID, an unknown
 *        XID, or a duplicate reply;
 *   else the result of receiving the reply on behalf of its owner
 *        (which need not be the caller).
 *
 * Replies that cannot be delivered (runt, unknown XID, duplicate) are
 * removed from the socket.  The runt case matters: it was only peeked,
 * so leaving it queued would make every later call see the same
 * datagram again and no further reply could be processed.
 */
static inline int
rpc_grok(struct rpc_sock *rsock)
{
	struct rpc_wait	*rovr;
	struct rpc_ioreq *req;
	struct iovec	iov[MAX_IOVEC];
	u32		xid;
	int		safe, result;

	iov[0].iov_base = (void *) &xid;
	iov[0].iov_len = sizeof(xid);
	result = rpc_recvmsg(rsock, iov, 1, sizeof(xid), MSG_PEEK);

	if (result < 0) {
		switch (-result) {
		case EAGAIN: case ECONNREFUSED:
			return 0;
		case ERESTARTSYS:
			return result;
		default:
			dprintk("rpc_grok: recv error = %d\n", result);
		}
	}
	if (result < 4) {
		printk(KERN_WARNING "RPC: impossible RPC reply size %d\n",
					result);
		/*
		 * A non-negative result means a too-short datagram really
		 * is sitting at the head of the queue: get rid of it.  On
		 * a receive error nothing is discarded.
		 */
		if (result >= 0)
			rpc_grok_discard(rsock);
		return 0;
	}

	dprintk("RPC: rpc_grok: got xid %08lx\n", (unsigned long) xid);

	/* Find the owner; the counter guards against a corrupted, looping list */
	safe = 0;
	for (rovr = rsock->pending; rovr; rovr = rovr->w_next) {
		if (rovr->w_xid == xid)
			break;
		if (safe++ > RPC_MAXREQS) {
			printk(KERN_WARNING "RPC: loop in request Q!!\n");
			rovr = NULL;
			break;
		}
	}

	if (!rovr || rovr->w_gotit) {
		/* nobody wants this reply: drop it */
		dprintk("RPC: rpc_grok: %s.\n",
			rovr? "duplicate reply" : "bad XID");
		rpc_grok_discard(rsock);
		return 0;
	}
	req = rovr->w_req;

	/* Receive into a private copy of the owner's receive vector */
	memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
	result = rpc_recvmsg(rsock, iov, req->rq_rnr, req->rq_rlen, 0);
	rovr->w_result = result;
	rovr->w_gotit = 1;

	/* Let the owner know its reply has arrived */
	wake_up(&rovr->w_wait);

	return result;
}
400
401
402
403
404 static int
405 rpc_recv(struct rpc_sock *rsock, struct rpc_wait *slot)
406 {
407 int result;
408
409 do {
410
411 dprintk("RPC: rpc_recv TP1\n");
412 while (rsock->pending != slot) {
413 if (!slot->w_gotit)
414 interruptible_sleep_on(&slot->w_wait);
415 if (slot->w_gotit)
416 return slot->w_result;
417 if (current->signal & ~current->blocked)
418 return -ERESTARTSYS;
419 if (rsock->shutdown)
420 return -EIO;
421 if (current->timeout == 0)
422 return -ETIMEDOUT;
423 }
424
425
426 if ((result = rpc_select(rsock)) < 0) {
427 dprintk("RPC: select error = %d\n", result);
428 return result;
429 }
430
431
432 if ((result = rpc_grok(rsock)) < 0)
433 return result;
434 } while (current->timeout && !slot->w_gotit);
435
436 return slot->w_gotit? slot->w_result : -ETIMEDOUT;
437 }
438
439
440
441
442
443
444
445 int
446 rpc_doio(struct rpc_sock *rsock, struct rpc_ioreq *req,
447 struct rpc_timeout *strategy, int sent)
448 {
449 struct rpc_wait *slot;
450 int result, retries;
451 unsigned long timeout;
452
453 timeout = strategy->to_initval;
454 retries = 0;
455 slot = req->rq_slot;
456
457 do {
458 dprintk("RPC: rpc_doio: TP1 (req %p)\n", req);
459 current->timeout = jiffies + timeout;
460 if (slot == NULL) {
461 result = rpc_reserve(rsock, req, 0);
462 if (result == -ETIMEDOUT)
463 goto timedout;
464 if (result < 0)
465 break;
466 slot = req->rq_slot;
467 rpc_send_check("rpc_doio",
468 (u32 *) req->rq_svec[0].iov_base);
469 rpc_insque(rsock, slot);
470 }
471
472
473
474 if (slot->w_gotit) {
475 result = slot->w_result;
476 break;
477 }
478
479 dprintk("RPC: rpc_doio: TP2\n");
480 if (sent || (result = rpc_send(rsock, slot)) >= 0) {
481 result = rpc_recv(rsock, slot);
482 sent = 0;
483 }
484
485 if (result != -ETIMEDOUT) {
486
487 rpc_cwnd_adjust(rsock, 0);
488 break;
489 }
490
491 rpc_cwnd_adjust(rsock, 1);
492
493 timedout:
494 dprintk("RPC: rpc_recv returned timeout.\n");
495 if (strategy->to_exponential)
496 timeout <<= 1;
497 else
498 timeout += strategy->to_increment;
499 if (strategy->to_maxval && timeout >= strategy->to_maxval)
500 timeout = strategy->to_maxval;
501 if (strategy->to_retries && ++retries >= strategy->to_retries)
502 break;
503 } while (1);
504
505 dprintk("RPC: rpc_doio: TP3\n");
506 current->timeout = 0;
507 return result;
508 }
509
510
511
512 int
513 rpc_call(struct rpc_sock *rsock, struct rpc_ioreq *req,
514 struct rpc_timeout *strategy)
515 {
516 int result;
517
518 result = rpc_doio(rsock, req, strategy, 0);
519 if (req->rq_slot == NULL)
520 printk(KERN_WARNING "RPC: bad: rq_slot == NULL\n");
521 rpc_release(rsock, req);
522 return result;
523 }
524
525 struct rpc_sock *
526 rpc_makesock(struct file *file)
527 {
528 struct rpc_sock *rsock;
529 struct socket *sock;
530 struct sock *sk;
531 struct rpc_wait *slot;
532 int i;
533
534 dprintk("RPC: make RPC socket...\n");
535 sock = &file->f_inode->u.socket_i;
536 if (sock->type != SOCK_DGRAM || sock->ops->family != AF_INET) {
537 printk(KERN_WARNING "RPC: only UDP sockets supported\n");
538 return NULL;
539 }
540 sk = (struct sock *) sock->data;
541
542 if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
543 return NULL;
544 memset(rsock, 0, sizeof(*rsock));
545
546 rsock->sock = sock;
547 rsock->inet = sk;
548 rsock->file = file;
549 rsock->cwnd = RPC_INITCWND;
550
551 dprintk("RPC: slots %p, %p, ...\n", rsock->waiting, rsock->waiting + 1);
552 rsock->free = rsock->waiting;
553 for (i = 0, slot = rsock->waiting; i < RPC_MAXREQS-1; i++, slot++)
554 slot->w_next = slot + 1;
555 slot->w_next = NULL;
556
557 dprintk("RPC: made socket %p\n", rsock);
558 return rsock;
559 }
560
561 int
562 rpc_closesock(struct rpc_sock *rsock)
563 {
564 unsigned long t0 = jiffies;
565
566 rsock->shutdown = 1;
567 while (rsock->pending || rsock->backlog) {
568 interruptible_sleep_on(&rsock->shutwait);
569 if (current->signal & ~current->blocked)
570 return -EINTR;
571 #if 1
572 if (t0 && t0 - jiffies > 60 * HZ) {
573 printk(KERN_WARNING "RPC: hanging in rpc_closesock.\n");
574 t0 = 0;
575 }
576 #endif
577 }
578
579 kfree(rsock);
580 return 0;
581 }