]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
storage: add readv_slots: get data from all shares
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 from twisted.trial import unittest
3
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 import time, os.path, stat
8 import itertools
9 from allmydata import interfaces
10 from allmydata.util import fileutil, hashutil, idlib
11 from allmydata.storage import BucketWriter, BucketReader, \
12      WriteBucketProxy, ReadBucketProxy, StorageServer
13 from allmydata.interfaces import BadWriteEnablerError
14
class Bucket(unittest.TestCase):
    """Exercise BucketWriter and BucketReader directly, without a server.

    The test case hands itself to BucketWriter as the first constructor
    argument, which is why it defines bucket_writer_closed() below.
    """

    def make_workdir(self, name):
        """Create the scratch directories for the test called `name`.

        Returns an (incoming, final) pair of share paths. Only the parent
        directories are created here, not the share files themselves.
        """
        root = os.path.join("storage", "Bucket", name)
        tmpdir = os.path.join(root, "tmp")
        for directory in (root, tmpdir):
            fileutil.make_dirs(directory)
        return os.path.join(tmpdir, "bucket"), os.path.join(root, "bucket")

    def bucket_writer_closed(self, bw, consumed):
        # Deliberately a no-op. Presumably BucketWriter calls this on the
        # object it was given as its first argument -- confirm against
        # allmydata.storage.
        pass

    def make_lease(self):
        """Return a fresh (owner_num, renew_secret, cancel_secret,
        expiration_time) tuple: owner 0, two random 32-byte secrets, and an
        expiration time 5000 seconds from now."""
        renew = os.urandom(32)
        cancel = os.urandom(32)
        return (0, renew, cancel, time.time() + 5000)

    def test_create(self):
        # Fill part of a 200-byte bucket with four back-to-back writes (the
        # last one short) and close it. Passing means nothing raised.
        incoming, final = self.make_workdir("test_create")
        writer = BucketWriter(self, incoming, final, 200, self.make_lease())
        for offset, chunk in [(0, "a"*25), (25, "b"*25),
                              (50, "c"*25), (75, "d"*7)]:
            writer.remote_write(offset, chunk)
        writer.remote_close()

    def test_readwrite(self):
        # Write three chunks, close the bucket, then read each chunk back
        # from the final location and compare.
        incoming, final = self.make_workdir("test_readwrite")
        # last block may be short
        chunks = [(0, "a"*25), (25, "b"*25), (50, "c"*7)]
        writer = BucketWriter(self, incoming, final, 200, self.make_lease())
        for offset, chunk in chunks:
            writer.remote_write(offset, chunk)
        writer.remote_close()

        # now read from it
        reader = BucketReader(final)
        for offset, chunk in chunks:
            self.failUnlessEqual(reader.remote_read(offset, len(chunk)),
                                 chunk)
56
class RemoteBucket:
    """Stand-in for a remote reference that dispatches to a local object.

    Callers assign the local object to the `target` attribute after
    constructing the instance.
    """

    def callRemote(self, methname, *args, **kwargs):
        """Invoke target.remote_<methname>(*args, **kwargs).

        Both the attribute lookup and the call happen inside
        defer.maybeDeferred; whatever maybeDeferred returns is handed back.
        """
        return defer.maybeDeferred(
            lambda: getattr(self.target, "remote_" + methname)(*args,
                                                               **kwargs))
64
class BucketProxy(unittest.TestCase):
    """Drive BucketWriter/BucketReader through WriteBucketProxy and
    ReadBucketProxy, with RemoteBucket standing in for a remote reference."""

    def make_bucket(self, name, size):
        """Build a BucketWriter of `size` bytes for the test called `name`.

        Returns (bw, rb, final): the writer, a RemoteBucket whose target is
        that writer, and the path given to the writer as its final location.
        """
        root = os.path.join("storage", "BucketProxy", name)
        tmpdir = os.path.join(root, "tmp")
        fileutil.make_dirs(root)
        fileutil.make_dirs(tmpdir)
        incoming = os.path.join(tmpdir, "bucket")
        final = os.path.join(root, "bucket")
        writer = BucketWriter(self, incoming, final, size, self.make_lease())
        remote = RemoteBucket()
        remote.target = writer
        return writer, remote, final

    def make_lease(self):
        """Return a fresh (owner_num, renew_secret, cancel_secret,
        expiration_time) tuple: owner 0, two random 32-byte secrets, and an
        expiration time 5000 seconds from now."""
        renew = os.urandom(32)
        cancel = os.urandom(32)
        return (0, renew, cancel, time.time() + 5000)

    def bucket_writer_closed(self, bw, consumed):
        # Deliberately a no-op. Presumably BucketWriter calls this on the
        # object it was given as its first argument -- confirm against
        # allmydata.storage.
        pass

    def test_create(self):
        # Building the proxy is all that happens here; the only check is
        # that it provides the writer interface.
        bw, rb, final = self.make_bucket("test_create", 500)
        proxy = WriteBucketProxy(rb,
                                 data_size=300,
                                 segment_size=10,
                                 num_segments=5,
                                 num_share_hashes=3,
                                 uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(proxy))

    def test_readwrite(self):
        # Let's pretend each share has up to 100 bytes of data, and that
        # there are 4 segments (25 bytes each), and 8 shares total. So the
        # three per-segment merkle trees (plaintext_hash_tree,
        # crypttext_hash_tree, block_hashes) will have 4 leaves and 7 nodes
        # each. The per-share merkle tree (share_hashes) has 8 leaves and 15
        # nodes, and we need 3 nodes. Furthermore, let's assume the
        # uri_extension is 500 bytes long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        #
        # The blocks actually written below total 95 bytes (the last one is
        # short), which is what data_size=95 reflects; the bucket itself is
        # still allocated with the 1414-byte figure.

        def _tagged_hashes(tag):
            return [hashutil.tagged_hash(tag, "bar%d" % i) for i in range(7)]

        plaintext_hashes = _tagged_hashes("plain")
        crypttext_hashes = _tagged_hashes("crypt")
        block_hashes = _tagged_hashes("block")
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"
        blocks = ["a"*25, "b"*25, "c"*25, "d"*20]

        bw, rb, final = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        d = bp.start()
        # Loop variables are bound as lambda defaults so each callback keeps
        # its own segment number and block.
        for segnum, block in enumerate(blocks):
            d.addCallback(lambda res, segnum=segnum, block=block:
                          bp.put_block(segnum, block))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(final)
            reader_ref = RemoteBucket()
            reader_ref.target = br
            rbp = ReadBucketProxy(reader_ref)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            def _expect(res, expected):
                return self.failUnlessEqual(res, expected)

            d1 = rbp.startIfNecessary()
            for segnum, block in enumerate(blocks):
                d1.addCallback(lambda res, segnum=segnum:
                               rbp.get_block(segnum))
                d1.addCallback(_expect, block)

            readback = [
                (rbp.get_plaintext_hashes, plaintext_hashes),
                (rbp.get_crypttext_hashes, crypttext_hashes),
                (rbp.get_block_hashes, block_hashes),
                (rbp.get_share_hashes, share_hashes),
                (rbp.get_uri_extension, uri_extension),
            ]
            for getter, expected in readback:
                d1.addCallback(lambda res, getter=getter: getter())
                d1.addCallback(_expect, expected)

            return d1

        d.addCallback(_start_reading)

        return d
175
176
177
class Server(unittest.TestCase):
    """Tests of StorageServer's bucket handling: allocation, size limits,
    incoming-directory cleanup and leases."""

    def setUp(self):
        self.sparent = service.MultiService()
        # counter from which each test derives its lease secrets
        self._secret = itertools.count()

    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the base directory used by the test called `name`."""
        return os.path.join("storage", "Server", name)

    def create(self, name, sizelimit=None):
        """Create a StorageServer in workdir(name), attach it to
        self.sparent and return it."""
        ss = StorageServer(self.workdir(name), sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def _new_secret(self):
        """Return the tagged hash of the next value of the self._secret
        counter; used for both renew and cancel secrets."""
        return hashutil.tagged_hash("blah", "%d" % self._secret.next())

    def test_create(self):
        # passing just means the server could be created and attached
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        """Call ss.remote_allocate_buckets() with freshly derived renew and
        cancel secrets and a new Referenceable as the canary, returning its
        result unchanged."""
        renew_secret = self._new_secret()
        cancel_secret = self._new_secret()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_remove_incoming(self):
        # once every writer has been closed, nothing should be left behind
        # in the incoming directory
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                                   "shares", "incoming")
        leftover_dirs = os.listdir(incomingdir)
        self.failUnlessEqual(leftover_dirs, [])

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned. (The test drops its own references with 'del' and then
        # expects ss._active_writers to shrink immediately.)
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        # the loop variable still refers to the last writer; drop it too
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        # kept only to spell out the arithmetic in the comment below
        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        # Check the file-mode behaviour the storage code relies on. Every
        # handle is closed in a 'finally' so a failed assertion or write
        # does not leak an open file.
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        try:
            f.write("start")
        finally:
            f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        try:
            f.seek(100)
            f.write("100")
        finally:
            f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        try:
            self.failUnlessEqual(f2.read(5), "start")
        finally:
            f2.close()


    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (self._new_secret(), self._new_secret())
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (self._new_secret(), self._new_secret())
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (self._new_secret(), self._new_secret())
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it. Only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (self._new_secret(), self._new_secret())
        rs4,cs4 = (self._new_secret(), self._new_secret())
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
441
442
443
444 class MutableServer(unittest.TestCase):
445
446     def setUp(self):
447         self.sparent = service.MultiService()
448         self._secret = itertools.count()
449     def tearDown(self):
450         return self.sparent.stopService()
451
452     def workdir(self, name):
453         basedir = os.path.join("storage", "MutableServer", name)
454         return basedir
455
456     def create(self, name, sizelimit=None):
457         workdir = self.workdir(name)
458         ss = StorageServer(workdir, sizelimit)
459         ss.setServiceParent(self.sparent)
460         ss.setNodeID("\x00" * 32)
461         return ss
462
463     def test_create(self):
464         ss = self.create("test_create")
465
466     def write_enabler(self, we_tag):
467         return hashutil.tagged_hash("we_blah", we_tag)
468
469     def renew_secret(self, tag):
470         return hashutil.tagged_hash("renew_blah", str(tag))
471
472     def cancel_secret(self, tag):
473         return hashutil.tagged_hash("cancel_blah", str(tag))
474
475     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
476         write_enabler = self.write_enabler(we_tag)
477         renew_secret = self.renew_secret(lease_tag)
478         cancel_secret = self.cancel_secret(lease_tag)
479         return ss.remote_allocate_mutable_slot(storage_index,
480                                                write_enabler,
481                                                renew_secret, cancel_secret,
482                                                sharenums, size)
483
484     def test_allocate(self):
485         ss = self.create("test_allocate")
486         shares = self.allocate(ss, "si1", "we1", self._secret.next(),
487                                set([0,1,2]), 100)
488         self.failUnlessEqual(len(shares), 3)
489         self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
490         shares2 = ss.remote_get_mutable_slot("si1")
491         self.failUnlessEqual(len(shares2), 3)
492         self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
493
494         s0 = shares[0]
495         self.failUnlessEqual(s0.remote_read(0, 10), "")
496         self.failUnlessEqual(s0.remote_read(100, 10), "")
497         # try writing to one
498         WE = self.write_enabler("we1")
499         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
500         answer = s0.remote_testv_and_writev(WE,
501                                             [],
502                                             [(0, data),],
503                                             new_length=None)
504         self.failUnlessEqual(answer, (True, []))
505
506         self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
507         self.failUnlessEqual(s0.remote_read(95, 10), "99999")
508         self.failUnlessEqual(s0.remote_get_length(), 100)
509
510         self.failUnlessRaises(BadWriteEnablerError,
511                               s0.remote_testv_and_writev,
512                               "bad write enabler",
513                               [], [], None)
514         # this testv should fail
515         answer = s0.remote_testv_and_writev(WE,
516                                             [(0, 12, "eq", "444444444444"),
517                                              (20, 5, "eq", "22222"),
518                                              ],
519                                             [(0, "x"*100)], None)
520         self.failUnlessEqual(answer, (False, ["000000000011",
521                                               "22222"]))
522         self.failUnlessEqual(s0.remote_read(0, 100), data)
523
524         # as should this one
525         answer = s0.remote_testv_and_writev(WE,
526                                             [(10, 5, "lt", "11111"),
527                                              ],
528                                             [(0, "x"*100)], None)
529         self.failUnlessEqual(answer, (False, ["11111"]))
530         self.failUnlessEqual(s0.remote_read(0, 100), data)
531
532
533     def test_operators(self):
534         # test operators, the data we're comparing is '11111' in all cases.
535         # test both fail+pass, reset data after each one.
536         ss = self.create("test_operators")
537         shares = self.allocate(ss, "si1", "we1", self._secret.next(),
538                                set([0,1,2]), 100)
539         s0 = shares[0]
540         WE = self.write_enabler("we1")
541         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
542         answer = s0.remote_testv_and_writev(WE,
543                                             [],
544                                             [(0, data),],
545                                             new_length=None)
546
547         #  nop
548         answer = s0.remote_testv_and_writev(WE,
549                                             [(10, 5, "nop", "11111"),
550                                              ],
551                                             [(0, "x"*100)], None)
552         self.failUnlessEqual(answer, (True, ["11111"]))
553         self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
554         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
555
556         #  lt
557         answer = s0.remote_testv_and_writev(WE,
558                                             [(10, 5, "lt", "11110"),
559                                              ],
560                                             [(0, "x"*100)], None)
561         self.failUnlessEqual(answer, (False, ["11111"]))
562         self.failUnlessEqual(s0.remote_read(0, 100), data)
563         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
564
565         answer = s0.remote_testv_and_writev(WE,
566                                             [(10, 5, "lt", "11111"),
567                                              ],
568                                             [(0, "x"*100)], None)
569         self.failUnlessEqual(answer, (False, ["11111"]))
570         self.failUnlessEqual(s0.remote_read(0, 100), data)
571         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
572
573         answer = s0.remote_testv_and_writev(WE,
574                                             [(10, 5, "lt", "11112"),
575                                              ],
576                                             [(0, "y"*100)], None)
577         self.failUnlessEqual(answer, (True, ["11111"]))
578         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
579         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
580
581         #  le
582         answer = s0.remote_testv_and_writev(WE,
583                                             [(10, 5, "le", "11110"),
584                                              ],
585                                             [(0, "x"*100)], None)
586         self.failUnlessEqual(answer, (False, ["11111"]))
587         self.failUnlessEqual(s0.remote_read(0, 100), data)
588         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
589
590         answer = s0.remote_testv_and_writev(WE,
591                                             [(10, 5, "le", "11111"),
592                                              ],
593                                             [(0, "y"*100)], None)
594         self.failUnlessEqual(answer, (True, ["11111"]))
595         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
596         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
597
598         answer = s0.remote_testv_and_writev(WE,
599                                             [(10, 5, "le", "11112"),
600                                              ],
601                                             [(0, "y"*100)], None)
602         self.failUnlessEqual(answer, (True, ["11111"]))
603         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
604         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
605
606         #  eq
607         answer = s0.remote_testv_and_writev(WE,
608                                             [(10, 5, "eq", "11112"),
609                                              ],
610                                             [(0, "x"*100)], None)
611         self.failUnlessEqual(answer, (False, ["11111"]))
612         self.failUnlessEqual(s0.remote_read(0, 100), data)
613         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
614
615         answer = s0.remote_testv_and_writev(WE,
616                                             [(10, 5, "eq", "11111"),
617                                              ],
618                                             [(0, "y"*100)], None)
619         self.failUnlessEqual(answer, (True, ["11111"]))
620         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
621         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
622
623         #  ne
624         answer = s0.remote_testv_and_writev(WE,
625                                             [(10, 5, "ne", "11111"),
626                                              ],
627                                             [(0, "x"*100)], None)
628         self.failUnlessEqual(answer, (False, ["11111"]))
629         self.failUnlessEqual(s0.remote_read(0, 100), data)
630         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
631
632         answer = s0.remote_testv_and_writev(WE,
633                                             [(10, 5, "ne", "11112"),
634                                              ],
635                                             [(0, "y"*100)], None)
636         self.failUnlessEqual(answer, (True, ["11111"]))
637         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
638         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
639
640         #  ge
641         answer = s0.remote_testv_and_writev(WE,
642                                             [(10, 5, "ge", "11110"),
643                                              ],
644                                             [(0, "y"*100)], None)
645         self.failUnlessEqual(answer, (True, ["11111"]))
646         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
647         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
648
649         answer = s0.remote_testv_and_writev(WE,
650                                             [(10, 5, "ge", "11111"),
651                                              ],
652                                             [(0, "y"*100)], None)
653         self.failUnlessEqual(answer, (True, ["11111"]))
654         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
655         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
656
657         answer = s0.remote_testv_and_writev(WE,
658                                             [(10, 5, "ge", "11112"),
659                                              ],
660                                             [(0, "y"*100)], None)
661         self.failUnlessEqual(answer, (False, ["11111"]))
662         self.failUnlessEqual(s0.remote_read(0, 100), data)
663         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
664
665         #  gt
666         answer = s0.remote_testv_and_writev(WE,
667                                             [(10, 5, "gt", "11110"),
668                                              ],
669                                             [(0, "y"*100)], None)
670         self.failUnlessEqual(answer, (True, ["11111"]))
671         self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
672         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
673
674         answer = s0.remote_testv_and_writev(WE,
675                                             [(10, 5, "gt", "11111"),
676                                              ],
677                                             [(0, "x"*100)], None)
678         self.failUnlessEqual(answer, (False, ["11111"]))
679         self.failUnlessEqual(s0.remote_read(0, 100), data)
680         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
681
682         answer = s0.remote_testv_and_writev(WE,
683                                             [(10, 5, "gt", "11112"),
684                                              ],
685                                             [(0, "x"*100)], None)
686         self.failUnlessEqual(answer, (False, ["11111"]))
687         self.failUnlessEqual(s0.remote_read(0, 100), data)
688         s0.remote_testv_and_writev(WE, [], [(0,data)], None)
689
690     def test_readv(self):
691         ss = self.create("test_allocate")
692         shares = self.allocate(ss, "si1", "we1", self._secret.next(),
693                                set([0,1,2]), 100)
694         WE = self.write_enabler("we1")
695         data = [("%d" % i) * 100 for i in range(3)]
696         for i in range(3):
697             rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])],
698                                                    new_length=None)
699             self.failUnlessEqual(rc, (True, []))
700         answer = ss.remote_readv_slots("si1", [(0, 10)])
701         self.failUnlessEqual(answer, {0: ["0"*10],
702                                       1: ["1"*10],
703                                       2: ["2"*10]})
704
705     def compare_leases_without_timestamps(self, a, b):
706         self.failUnlessEqual(len(a), len(b))
707         for i in range(len(a)):
708             (num_a, (ownerid_a, expiration_time_a,
709                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
710             (num_b, (ownerid_b, expiration_time_b,
711                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
712             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
713                                    cancel_secret_a, nodeid_a),
714                                   (num_b, ownerid_b, renew_secret_b,
715                                    cancel_secret_b, nodeid_b) )
716
717     def test_leases(self):
718         ss = self.create("test_leases")
719         secret = 14
720         shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
721         s0 = shares[0]
722         WE = self.write_enabler("we1")
723         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
724         answer = s0.remote_testv_and_writev(WE,
725                                             [],
726                                             [(0, data),],
727                                             new_length=None)
728
729         # create a random non-numeric file in the bucket directory, to
730         # exercise the code that's supposed to ignore those.
731         bucket_dir = os.path.join(self.workdir("test_leases"),
732                                   "shares", idlib.b2a("si1"))
733         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
734         f.write("you ought to be ignoring me\n")
735         f.close()
736
737         # re-allocate the slots and use the same secrets, that should update
738         # the lease
739         shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
740
741         # renew it directly
742         ss.remote_renew_lease("si1", self.renew_secret(secret))
743
744         # now allocate them with a bunch of different secrets, to trigger the
745         # extended lease code
746         shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100)
747         shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100)
748         shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100)
749         shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100)
750         shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100)
751         # cancel one of them
752         ss.remote_cancel_lease("si1", self.cancel_secret(secret+5))
753
754         all_leases = s0.debug_get_leases()
755         self.failUnlessEqual(len(all_leases), 5)
756
757         # and write enough data to expand the container, forcing the server
758         # to move the leases
759         answer = s0.remote_testv_and_writev(WE,
760                                             [],
761                                             [(0, data),],
762                                             new_length=200)
763
764         # read back the leases, make sure they're still intact.
765         self.compare_leases_without_timestamps(all_leases,
766                                                s0.debug_get_leases())
767
768         ss.remote_renew_lease("si1", self.renew_secret(secret))
769         ss.remote_renew_lease("si1", self.renew_secret(secret+1))
770         ss.remote_renew_lease("si1", self.renew_secret(secret+2))
771         ss.remote_renew_lease("si1", self.renew_secret(secret+3))
772         ss.remote_renew_lease("si1", self.renew_secret(secret+4))
773         self.compare_leases_without_timestamps(all_leases,
774                                                s0.debug_get_leases())
775         # get a new copy of the leases, with the current timestamps. Reading
776         # data and failing to renew/cancel leases should leave the timestamps
777         # alone.
778         all_leases = s0.debug_get_leases()
779         # renewing with a bogus token should prompt an error message
780
781         # TODO: examine the exception thus raised, make sure the old nodeid
782         # is present, to provide for share migration
783         self.failUnlessRaises(IndexError,
784                               ss.remote_renew_lease, "si1",
785                               self.renew_secret(secret+20))
786         # same for cancelling
787         self.failUnlessRaises(IndexError,
788                               ss.remote_cancel_lease, "si1",
789                               self.cancel_secret(secret+20))
790         self.failUnlessEqual(all_leases, s0.debug_get_leases())
791         s0.remote_read(0, 200)
792         self.failUnlessEqual(all_leases, s0.debug_get_leases())
793
794         answer = s0.remote_testv_and_writev(WE,
795                                             [],
796                                             [(200, "make me bigger"),],
797                                             new_length=None)
798         self.compare_leases_without_timestamps(all_leases,
799                                                s0.debug_get_leases())
800
801         answer = s0.remote_testv_and_writev(WE,
802                                             [],
803                                             [(500, "make me really bigger"),],
804                                             new_length=None)
805         self.compare_leases_without_timestamps(all_leases,
806                                                s0.debug_get_leases())
807
808         # now cancel them all
809         ss.remote_cancel_lease("si1", self.cancel_secret(secret))
810         ss.remote_cancel_lease("si1", self.cancel_secret(secret+1))
811         ss.remote_cancel_lease("si1", self.cancel_secret(secret+2))
812         ss.remote_cancel_lease("si1", self.cancel_secret(secret+3))
813         # the slot should still be there
814         shares3 = ss.remote_get_mutable_slot("si1")
815         self.failUnlessEqual(len(shares3), 3)
816         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
817
818         ss.remote_cancel_lease("si1", self.cancel_secret(secret+4))
819         # now the slot should be gone
820         self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {})
821