from twisted.trial import unittest

from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer
from allmydata.interfaces import BadWriteEnablerError

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(final)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
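    # This is an in-process stand-in for a remote reference: callRemote("foo",
    # ...) dispatches to self.target.remote_foo(...) and wraps the result in a
    # Deferred via maybeDeferred, so the bucket proxies below can be exercised
    # without a real foolscap connection.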

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, final = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
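        #
        # Worked out (reading the terms above: 0x24 is presumably the 36-byte
        # fixed header/offset table, each hash tree contributes 7 nodes of 32
        # bytes, each of the 3 share-hash entries is a 2-byte index plus a
        # 32-byte hash, and the uri_extension gets a 4-byte length prefix):
        #   36 + 100 + 3*(7*32) + 3*(2+32) + (4+500)
        # = 36 + 100 + 672 + 102 + 504 = 1414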

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, final = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(final)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d



class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                                   "shares", "incoming")
        leftover_dirs = os.listdir(incomingdir)
        self.failUnlessEqual(leftover_dirs, [])

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = Referenceable()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = Referenceable()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
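        # (these presumably mirror the immutable share-file layout: OVERHEAD is
        # three 4-byte header fields, and LEASE_SIZE is one lease record, i.e.
        # a 4-byte owner number, 32-byte renew secret, 32-byte cancel secret,
        # and 4-byte expiration time, the same four fields that make_lease()
        # builds in the Bucket tests above)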

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
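        # (i.e. 3915 // 100 = 39 of the 100 requested shares fit; as above,
        # only the requested share size counts toward the provisional limit)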
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = hashutil.tagged_hash("blah", str(lease_tag))
        cancel_secret = hashutil.tagged_hash("blah", str(lease_tag))
        return ss.remote_allocate_mutable_slot(storage_index,
                                               write_enabler,
                                               renew_secret, cancel_secret,
                                               sharenums, size)

    def test_allocate(self):
        ss = self.create("test_allocate")
        shares = self.allocate(ss, "si1", "we1", self._secret.next(),
                               set([0,1,2]), 100)
        self.failUnlessEqual(len(shares), 3)
        self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
        shares2 = ss.remote_get_mutable_slot("si1")
        self.failUnlessEqual(len(shares2), 3)
        self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))

        s0 = shares[0]
        self.failUnlessEqual(s0.remote_read(0, 10), "")
        self.failUnlessEqual(s0.remote_read(100, 10), "")
        # try writing to one
        WE = self.write_enabler("we1")
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=None)
        self.failUnlessEqual(answer, (True, []))

        self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
        self.failUnlessEqual(s0.remote_read(95, 10), "99999")
        self.failUnlessEqual(s0.remote_get_length(), 100)

        self.failUnlessRaises(BadWriteEnablerError,
                              s0.remote_testv_and_writev,
                              "bad write enabler",
                              [], [], None)
        # this testv should fail
        answer = s0.remote_testv_and_writev(WE,
                                            [(0, 12, "eq", "444444444444"),
                                             (20, 5, "eq", "22222"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["000000000011",
                                              "22222"]))
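        # (taken together with the (True, []) answer above, the return value is
        # a (tests_passed, data) tuple whose second element echoes the current
        # contents of each tested range -- here "000000000011" and "22222" --
        # so the caller can see why its test vector failed)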
        self.failUnlessEqual(s0.remote_read(0, 100), data)

        # as should this one
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)


    def test_operators(self):
        # test the comparison operators; the data we're comparing against is
        # '11111' in all cases. test both fail and pass, resetting the data
        # after each one.
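        # Each case below follows the same pattern: issue one test vector
        # comparing bytes 10..14 (currently "11111") against an operand,
        # attempt a write that should happen only if the test passes, check
        # the (passed, data) answer, verify whether the slot contents changed,
        # and then rewrite the original data so the next case starts clean.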
        ss = self.create("test_operators")
        shares = self.allocate(ss, "si1", "we1", self._secret.next(),
                               set([0,1,2]), 100)
        s0 = shares[0]
        WE = self.write_enabler("we1")
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=None)

        #  nop
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "nop", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  lt
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  le
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  eq
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  ne
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  ge
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        #  gt
        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)], None)
        self.failUnlessEqual(answer, (True, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

        answer = s0.remote_testv_and_writev(WE,
                                            [(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)], None)
        self.failUnlessEqual(answer, (False, ["11111"]))
        self.failUnlessEqual(s0.remote_read(0, 100), data)
        s0.remote_testv_and_writev(WE, [], [(0,data)], None)

    def test_leases(self):
        ss = self.create("test_leases")
        secret = 14
        shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
        s0 = shares[0]
        WE = self.write_enabler("we1")
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=None)

        # re-allocate the slots using the same secrets; that should update
        # the existing lease
        shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100)
        shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100)
        shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100)
        shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100)
        shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100)

        # and write enough data to expand the container, forcing the server
        # to move the leases
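        # (presumably the lease records live after the share data inside the
        # mutable container, so growing the slot from 100 to 200 bytes means
        # they must be rewritten at a new offset)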
        answer = s0.remote_testv_and_writev(WE,
                                            [],
                                            [(0, data),],
                                            new_length=200)

        # TODO: read back the leases, make sure they're still intact. We need
        # a renew_lease() call for this.