This source file includes the following definitions:
- rpc_insque
- rpc_remque
- rpc_sendto
- rpc_select
- rpc_recvfrom
- rpc_call_one
- rpc_call
- rpc_makesock
- rpc_closesock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 #ifdef MODULE
23 #include <linux/module.h>
24 #endif
25
26 #include <linux/types.h>
27 #include <linux/malloc.h>
28 #include <linux/sched.h>
29 #include <linux/nfs_fs.h>
30 #include <linux/errno.h>
31 #include <linux/socket.h>
32 #include <linux/fcntl.h>
33 #include <asm/segment.h>
34 #include <linux/in.h>
35 #include <linux/net.h>
36 #include <linux/mm.h>
37 #include <linux/rpcsock.h>
38
/*
 * msleep(msec): put the current task into interruptible sleep for roughly
 * `msec` milliseconds.
 *
 * current->timeout is stored as an absolute jiffies value, the same way
 * rpc_call() arms it (jiffies + timeout), so the relative delay has to be
 * added to jiffies.  The argument is parenthesized and the body is wrapped
 * in do { } while (0) so the macro behaves like a single statement, e.g.
 * inside an unbraced if/else.
 */
#define msleep(msec)	do { \
		current->timeout = jiffies + (msec) * HZ / 1000; \
		current->state = TASK_INTERRUPTIBLE; \
		schedule(); \
	} while (0)

/*
 * Debug printk, compiled out.  The "if (1) {} else" form keeps the call
 * type-checked but unreachable, and unlike a plain "if (0) printk" it
 * cannot capture an else that follows the dprintk statement.
 */
#define dprintk		if (1) {} else printk
44
45 static inline void
46 rpc_insque(struct rpc_sock *rsock, struct rpc_wait *slot)
47 {
48 struct rpc_wait *tmp;
49
50 if ((tmp = rsock->tail) != NULL) {
51 tmp->next = slot;
52 } else {
53 rsock->head = slot;
54 }
55 rsock->tail = slot;
56 slot->prev = tmp;
57 slot->next = NULL;
58 dprintk("RPC: inserted %08lx into queue.\n", (long)slot);
59 dprintk("RPC: head = %08lx, tail = %08lx.\n",
60 (long) rsock->head, (long) rsock->tail);
61 }
62
63 static inline void
64 rpc_remque(struct rpc_sock *rsock, struct rpc_wait *slot)
65 {
66 struct rpc_wait *prev = slot->prev,
67 *next = slot->next;
68
69 if (prev != NULL)
70 prev->next = next;
71 else
72 rsock->head = next;
73 if (next != NULL)
74 next->prev = prev;
75 else
76 rsock->tail = prev;
77 dprintk("RPC: removed %08lx from queue.\n", (long)slot);
78 dprintk("RPC: head = %08lx, tail = %08lx.\n",
79 (long) rsock->head, (long) rsock->tail);
80 }
81
82 static inline int
83 rpc_sendto(struct rpc_sock *rsock, const int *buf, int len,
84 struct sockaddr *sap, int salen)
85 {
86 struct socket *sock = rsock->sock;
87 unsigned long oldfs;
88 int result;
89
90 dprintk("RPC: sending %d bytes (buf %08lx)\n", len, (long) buf);
91 oldfs = get_fs();
92 set_fs(get_ds());
93 result = sock->ops->sendto(sock, buf, len, 0, 0, sap, salen);
94 set_fs(oldfs);
95 dprintk("RPC: result = %d\n", result);
96
97 return result;
98 }
99
100
101
102
103
104 static inline int
105 rpc_select(struct rpc_sock *rsock)
106 {
107 struct select_table_entry entry;
108 struct file *file = rsock->file;
109 select_table wait_table;
110
111 dprintk("RPC: selecting on socket...\n");
112 wait_table.nr = 0;
113 wait_table.entry = &entry;
114 current->state = TASK_INTERRUPTIBLE;
115 if (!file->f_op->select(file->f_inode, file, SEL_IN, &wait_table)
116 && !file->f_op->select(file->f_inode, file, SEL_IN, NULL)) {
117 schedule();
118 remove_wait_queue(entry.wait_address, &entry.wait);
119 current->state = TASK_RUNNING;
120 if (current->signal & ~current->blocked)
121 return -ERESTARTSYS;
122 if (current->timeout == 0)
123 return -ETIMEDOUT;
124 } else if (wait_table.nr)
125 remove_wait_queue(entry.wait_address, &entry.wait);
126 current->state = TASK_RUNNING;
127 dprintk("RPC: ...Okay, there appears to be some data.\n");
128 return 0;
129 }
130
131 static inline int
132 rpc_recvfrom(struct rpc_sock *rsock, int *buf, int len,
133 struct sockaddr *sap, int salen, int flags)
134 {
135 struct socket *sock = rsock->sock;
136 struct sockaddr sa;
137 int alen = sizeof(sa);
138 unsigned long oldfs;
139 int result;
140
141 dprintk("RPC: receiving %d bytes max (buf %08lx)\n", len, (long) buf);
142 oldfs = get_fs();
143 set_fs(get_ds());
144 result = sock->ops->recvfrom(sock, buf, len, 1, flags, &sa, &alen);
145 set_fs(oldfs);
146 dprintk("RPC: result = %d\n", result);
147
148 #if 0
149 if (alen != salen || memcmp(&sa, sap, alen)) {
150 dprintk("RPC: reply address mismatch... rejected.\n");
151 result = -EAGAIN;
152 }
153 #endif
154
155 return result;
156 }
157
158
159
160
/*
 * Send one request and wait for its reply (one transmission attempt).
 *
 * The slot at the head of rsock's queue does the receiving for everybody:
 * it peeks at the XID of each incoming datagram, looks up the slot waiting
 * for that XID, reads the reply into that slot's buffer and wakes its
 * owner.  All other callers sleep on their own slot until they are woken.
 *
 * Returns a negative errno on send/select/receive failure, -ETIMEDOUT when
 * a sleeping caller's timeout expires, -EIO when that happens while the
 * socket is being shut down, and otherwise the result of the last
 * operation performed by this caller.
 *
 * NOTE(review): a caller woken with slot->gotit set leaves the loop with
 * the value rpc_sendto() returned, not the length of the reply.
 */
static int
rpc_call_one(struct rpc_sock *rsock, struct rpc_wait *slot,
	     struct sockaddr *sap, int salen,
	     const int *sndbuf, int slen, int *rcvbuf, int rlen)
{
	struct rpc_wait	*match = NULL;
	int		status;
	int		hops;
	u32		reply_xid;

	dprintk("RPC: placing one call, rsock = %08lx, slot = %08lx, "
		"sap = %08lx, salen = %d, "
		"sndbuf = %08lx, slen = %d, rcvbuf = %08lx, rlen = %d\n",
		(long) rsock, (long) slot, (long) sap,
		salen, (long) sndbuf, slen, (long) rcvbuf, rlen);

	status = rpc_sendto(rsock, sndbuf, slen, sap, salen);
	if (status < 0)
		return status;

	/*
	 * Note: every `continue` below re-evaluates the loop condition
	 * (match != slot) before starting the next iteration.
	 */
	do {
		/* Not at the head of the queue: let the head receive for us. */
		if (rsock->head != slot) {
			slot->wait = NULL;
			interruptible_sleep_on(&slot->wait);
			if (slot->gotit)
				break;
			if (current->timeout == 0) {
				if (rsock->shutdown) {
					printk("RPC: aborting call due to shutdown.\n");
					return -EIO;
				}
				return -ETIMEDOUT;
			}
			continue;
		}

		/* We are the head: wait for data to show up on the socket. */
		status = rpc_select(rsock);
		if (status < 0) {
			dprintk("RPC: select error = %d\n", status);
			break;
		}

		/* Peek at the XID without consuming the datagram. */
		status = rpc_recvfrom(rsock, (int *)&reply_xid,
				      sizeof(reply_xid), sap, salen, MSG_PEEK);
		if (status < 0) {
			if (status == -EAGAIN || status == -ECONNREFUSED)
				continue;
			if (status != -ERESTARTSYS) {
				dprintk("rpc_call: recv error = %d\n", status);
			}
			return status;
		}

		/*
		 * Find the slot waiting for this XID.  The hop counter bounds
		 * the walk in case the queue has been corrupted into a cycle.
		 */
		hops = 0;
		match = rsock->head;
		while (match != NULL) {
			if (hops++ > NRREQS) {
				printk("RPC: loop in request Q!!\n");
				match = NULL;
				break;
			}
			if (match->xid == reply_xid)
				break;
			match = match->next;
		}

		/* Nobody wants it (or already served): drain and try again. */
		if (match == NULL || match->gotit) {
			dprintk("RPC: bad XID or duplicate reply.\n");
			rpc_recvfrom(rsock, (int *)&reply_xid,
				     sizeof(reply_xid), sap, salen, 0);
			continue;
		}
		match->gotit = 1;

		/* Read the whole reply straight into the owner's buffer. */
		status = rpc_recvfrom(rsock, match->buf, match->len,
				      sap, salen, 0);

		/* A reply for somebody else: wake its owner and keep going. */
		if (match != slot)
			wake_up(&match->wait);
	} while (match != slot);

	/*
	 * If we were acting as the receiver, wake the next slot in the
	 * queue so that it can take over once we are done.
	 */
	if (rsock->head == slot && slot->next != NULL)
		wake_up(&slot->next->wait);

	return status;
}
257
258
259
260
261 int
262 rpc_call(struct rpc_sock *rsock, struct sockaddr *sap, int addrlen,
263 const int *sndbuf, int slen, int *rcvbuf, int rlen,
264 struct rpc_timeout *strategy, int flag)
265 {
266 struct rpc_wait *slot;
267 int result, retries;
268 unsigned long timeout;
269
270 timeout = strategy->init_timeout;
271 retries = 0;
272 slot = NULL;
273
274 do {
275 dprintk("RPC call TP1\n");
276 current->timeout = jiffies + timeout;
277 if (slot == NULL) {
278 while ((slot = rsock->free) == NULL) {
279 if (!flag) {
280 current->timeout = 0;
281 return -ENOBUFS;
282 }
283 interruptible_sleep_on(&rsock->backlog);
284 if (current->timeout == 0) {
285 result = -ETIMEDOUT;
286 goto timedout;
287 }
288 if (rsock->shutdown) {
289 printk("RPC: aborting call due to shutdown.\n");
290 current->timeout = 0;
291 return -EIO;
292 }
293 }
294 dprintk("RPC call TP2\n");
295 slot->gotit = 0;
296 slot->xid = *(u32 *)sndbuf;
297 slot->buf = rcvbuf;
298 slot->len = rlen;
299 rsock->free = slot->next;
300 rpc_insque(rsock, slot);
301 }
302
303 dprintk("RPC call TP3\n");
304 result = rpc_call_one(rsock, slot, sap, addrlen,
305 sndbuf, slen, rcvbuf, rlen);
306 if (result != -ETIMEDOUT)
307 break;
308
309 timedout:
310 dprintk("RPC call TP4\n");
311 dprintk("RPC: rpc_call_one returned timeout.\n");
312 if (strategy->exponential)
313 timeout <<= 1;
314 else
315 timeout += strategy->increment;
316 if (strategy->max_timeout && timeout >= strategy->max_timeout)
317 timeout = strategy->max_timeout;
318 if (strategy->retries && ++retries >= strategy->retries)
319 break;
320 } while (1);
321
322 dprintk("RPC call TP5\n");
323 current->timeout = 0;
324 if (slot != NULL) {
325 dprintk("RPC call TP6\n");
326 rpc_remque(rsock, slot);
327 slot->next = rsock->free;
328 rsock->free = slot;
329
330
331
332 if (rsock->backlog)
333 wake_up(&rsock->backlog);
334 }
335
336 if (rsock->shutdown)
337 wake_up(&rsock->shutwait);
338
339 return result;
340 }
341
342 struct rpc_sock *
343 rpc_makesock(struct file *file)
344 {
345 struct rpc_sock *rsock;
346 struct rpc_wait *slot;
347 int i;
348
349 dprintk("RPC: make RPC socket...\n");
350 if ((rsock = kmalloc(sizeof(struct rpc_sock), GFP_KERNEL)) == NULL)
351 return NULL;
352
353 rsock->sock = &file->f_inode->u.socket_i;
354 rsock->file = file;
355
356 rsock->free = rsock->waiting;
357 for (i = 0, slot = rsock->waiting; i < NRREQS-1; i++, slot++)
358 slot->next = slot + 1;
359 slot->next = NULL;
360
361 rsock->backlog = NULL;
362 rsock->head = rsock->tail = NULL;
363
364 rsock->shutwait = NULL;
365 rsock->shutdown = 0;
366
367 dprintk("RPC: made socket %08lx", (long) rsock);
368 return rsock;
369 }
370
371 int
372 rpc_closesock(struct rpc_sock *rsock)
373 {
374 unsigned long t0 = jiffies;
375
376 rsock->shutdown = 1;
377 while (rsock->head) {
378 interruptible_sleep_on(&rsock->shutwait);
379 if (current->signal & ~current->blocked)
380 return -EINTR;
381 #if 1
382 if (t0 && t0 - jiffies > 60 * HZ) {
383 printk("RPC: hanging in rpc_closesock.\n");
384 t0 = 0;
385 }
386 #endif
387 }
388
389 kfree(rsock);
390 return 0;
391 }