# Tahoe-LAFS: src/allmydata/test/test_storage.py
# commit: "mutable slots: finish up basic coding on server-side containers,
# add some tests."
1
2 from twisted.trial import unittest
3
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 import time, os.path, stat
8 import itertools
9 from allmydata import interfaces
10 from allmydata.util import fileutil, hashutil
11 from allmydata.storage import BucketWriter, BucketReader, \
12      WriteBucketProxy, ReadBucketProxy, StorageServer
13 from allmydata.interfaces import BadWriteEnablerError
14
15 class Bucket(unittest.TestCase):
16     def make_workdir(self, name):
17         basedir = os.path.join("storage", "Bucket", name)
18         incoming = os.path.join(basedir, "tmp", "bucket")
19         final = os.path.join(basedir, "bucket")
20         fileutil.make_dirs(basedir)
21         fileutil.make_dirs(os.path.join(basedir, "tmp"))
22         return incoming, final
23
24     def bucket_writer_closed(self, bw, consumed):
25         pass
26
27     def make_lease(self):
28         owner_num = 0
29         renew_secret = os.urandom(32)
30         cancel_secret = os.urandom(32)
31         expiration_time = time.time() + 5000
32         return (owner_num, renew_secret, cancel_secret, expiration_time)
33
34     def test_create(self):
35         incoming, final = self.make_workdir("test_create")
36         bw = BucketWriter(self, incoming, final, 200, self.make_lease())
37         bw.remote_write(0, "a"*25)
38         bw.remote_write(25, "b"*25)
39         bw.remote_write(50, "c"*25)
40         bw.remote_write(75, "d"*7)
41         bw.remote_close()
42
43     def test_readwrite(self):
44         incoming, final = self.make_workdir("test_readwrite")
45         bw = BucketWriter(self, incoming, final, 200, self.make_lease())
46         bw.remote_write(0, "a"*25)
47         bw.remote_write(25, "b"*25)
48         bw.remote_write(50, "c"*7) # last block may be short
49         bw.remote_close()
50
51         # now read from it
52         br = BucketReader(final)
53         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
54         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
55         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
56
class RemoteBucket:
    """Wrap a local bucket object behind a callRemote()-style interface.

    Mimics a foolscap RemoteReference: each call is dispatched to the
    matching remote_* method on self.target, with the result wrapped
    in a Deferred via maybeDeferred.
    """

    def callRemote(self, methname, *args, **kwargs):
        def _invoke():
            return getattr(self.target, "remote_" + methname)(*args, **kwargs)
        return defer.maybeDeferred(_invoke)
64
class BucketProxy(unittest.TestCase):
    """Exercise WriteBucketProxy/ReadBucketProxy over a RemoteBucket shim."""

    def make_bucket(self, name, size):
        # Build an on-disk BucketWriter and wrap it in a RemoteBucket so the
        # proxies can reach it through callRemote().
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        # lease tuple: (owner_num, renew_secret, cancel_secret, expiration)
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        # BucketWriter notifies its "server" (this test) on close; nothing
        # needs to be tracked here.
        pass

    def test_create(self):
        bw, rb, final = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, final = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        # write every field of the share through the proxy, in order
        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            # wrap a fresh BucketReader in its own RemoteBucket, and verify
            # every field round-trips through the ReadBucketProxy
            br = BucketReader(final)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d
175
176
177
class Server(unittest.TestCase):
    """Tests for the immutable-share side of StorageServer."""

    def setUp(self):
        self.sparent = service.MultiService()
        # counter used to derive a fresh pair of lease secrets per allocation
        self._secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the per-test storage directory (relative to the cwd)."""
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        """Create a StorageServer rooted in workdir(name) and start it."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        # instantiation and startup alone should not fail
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        """Allocate buckets with unique renew/cancel secrets.

        Returns the (already, writers) pair from remote_allocate_buckets.
        """
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_remove_incoming(self):
        # once all shares are closed, nothing should be left behind in the
        # shares/incoming directory
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                                   "shares", "incoming")
        leftover_dirs = os.listdir(incomingdir)
        self.failUnlessEqual(leftover_dirs, [])

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned. NOTE: this relies on CPython refcounting to reclaim the
        # abandoned BucketWriters promptly when the last reference is deleted.
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        # verify the seek-past-EOF-then-write behavior that share files
        # depend upon
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
        f2.close() # fix: this handle was previously leaked

    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        # closing the shares should have created one lease per share
        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it. Only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads: a second allocation while the first set
        # of shares is still open should report them as "already" present
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
441
442
443
444 class MutableServer(unittest.TestCase):
445
    def setUp(self):
        # service parent for the StorageServer, plus a counter used to
        # derive unique lease secrets for each allocation
        self.sparent = service.MultiService()
        self._secret = itertools.count()
    def tearDown(self):
        # stopping the parent service also shuts down the StorageServer
        return self.sparent.stopService()
451
452     def workdir(self, name):
453         basedir = os.path.join("storage", "MutableServer", name)
454         return basedir
455
456     def create(self, name, sizelimit=None):
457         workdir = self.workdir(name)
458         ss = StorageServer(workdir, sizelimit)
459         ss.setServiceParent(self.sparent)
460         ss.setNodeID("\x00" * 32)
461         return ss
462
    def test_create(self):
        # instantiation and startup alone should not fail
        ss = self.create("test_create")
465
    def write_enabler(self, we_tag):
        # derive a deterministic write-enabler secret from the tag, so
        # tests can re-derive the same secret later
        return hashutil.tagged_hash("we_blah", we_tag)
468
469     def allocate(self, ss, storage_index, we_tag, sharenums, size):
470         write_enabler = self.write_enabler(we_tag)
471         renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
472         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
473         return ss.remote_allocate_mutable_slot(storage_index,
474                                                write_enabler,
475                                                renew_secret, cancel_secret,
476                                                sharenums, size)
477
    def test_allocate(self):
        """Basic mutable-slot allocation, read, and test-and-set write."""
        ss = self.create("test_allocate")
        shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
        self.failUnlessEqual(len(shares), 3)
        self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
        # the slot should be retrievable again, with the same share numbers
        shares2 = ss.remote_get_mutable_slot("si1")
        self.failUnlessEqual(len(shares2), 3)
        self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))

        # reads on a brand-new (empty) share return ""
        s0 = shares[0]
        self.failUnlessEqual(s0.remote_read(0, 10), "")
        self.failUnlessEqual(s0.remote_read(100, 10), "")
        # try writing to one
        WE = self.write_enabler("we1")
        # data = ten '0's, ten '1's, ... ten '9's (100 bytes total)
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=None)
        self.failUnlessEqual(answer, (True, []))

        self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
        # reads past the end are truncated
        self.failUnlessEqual(s0.remote_read(95, 10), "99999")
        self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write-enabler must be rejected
        self.failUnlessRaises(BadWriteEnablerError,
                              s0.remote_testv_and_writev,
                              "bad write enabler",
                              [], [], None)
        # this testv should fail: the stored data does not match either
        # comparison vector, so the write must not happen
        answer = s0.remote_testv_and_writev(WE,
                                            [(0, 12, "eq", "444444444444"),
                                             (20, 5, "eq", "22222"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["000000000011",
                                              "22222"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)

        # as should this one: "11111" is not < "11111"
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
524
525
    def test_operators(self):
        """Exercise each testv comparison operator against stored '11111'.

        Each operator is driven with comparands below, equal to, and above
        the stored value, checking both the pass and fail paths; the slot
        is reset to the original pattern after every attempt.
        """
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")
        shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
        s0 = shares[0]
        WE = self.write_enabler("we1")
        # data = ten '0's, ten '1's, ... ten '9's; bytes 10-14 are "11111"
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=None)

        #  nop: always passes, so the write goes through
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "nop", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  lt: fails for comparand below and equal, passes above
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  le: fails below, passes for equal and above
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  eq: passes only for the exact stored value
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  ne: passes only when the comparand differs
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  ge: passes below and equal, fails above
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  gt: passes only below
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)