# src/allmydata/test/test_storage.py

from twisted.trial import unittest

from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, idlib
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
from allmydata.interfaces import BadWriteEnablerError

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(final)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

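    # A minimal loopback stand-in for a remote reference: callRemote("foo", ...)
    # is forwarded to self.target.remote_foo(...) and the result is wrapped in
    # a Deferred, so the bucket proxies can be exercised without any network
    # or foolscap connection.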
    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, final = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
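        #
        # Worked out term by term (the labels for the fixed header and the
        # length field are assumptions about the share layout, not asserted
        # by this test):
        #   0x24     =   36  fixed header / offset table
        #   100      =  100  share data (4 segments of 25 bytes)
        #   7*32     =  224  plaintext_hash_tree
        #   7*32     =  224  crypttext_hash_tree
        #   7*32     =  224  block_hashes
        #   3*(2+32) =  102  share_hashes (2-byte index + 32-byte hash each)
        #   4+500    =  504  length field + uri_extension
        #   total    = 1414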

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, final = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension))

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(final)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d



class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
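        # remote_allocate_buckets returns (alreadygot, writers): the set of
        # share numbers the server already holds for this storage index, and
        # a dict mapping each newly allocated share number to a bucket writer.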
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, Referenceable())

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                                   "shares", "incoming")
        leftover_dirs = os.listdir(incomingdir)
        self.failUnlessEqual(leftover_dirs, [])

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = Referenceable()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try to
        # upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = Referenceable()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
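        # (Assumption: the 3*4 overhead corresponds to three 4-byte fields in
        # the share-file header, and LEASE_SIZE matches the lease tuple built
        # by make_lease() earlier in this file: 4-byte owner number, 32-byte
        # renew secret, 32-byte cancel secret, 4-byte expiration timestamp.)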

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation and grow to include the overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = Referenceable()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
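        # Each share number maps to a (testv, writev, new_length) tuple:
        # testv is a list of (offset, length, operator, specimen) checks,
        # writev is a list of (offset, data) writes, and new_length (when not
        # None) apparently sets the new size of the share data. Here we send
        # empty test and write vectors just to create the slot.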
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
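        # data is 100 bytes: ten '0's, then ten '1's, ... then ten '9's. Note
        # that the read at offset 95 below asks for 10 bytes but gets only the
        # last five ("99999"), so reads are apparently truncated at the end of
        # the share data.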
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        self.failUnlessRaises(BadWriteEnablerError,
                              write, "si1", bad_secrets,
                              {}, [])

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
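        # (A test vector passes when <existing data> OP <specimen>: for
        #  example "11111" lt "11112" passes, while "11111" lt "11110" and
        #  "11111" lt "11111" fail, as the cases below demonstrate.)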
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, a, b):
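        # Each lease is (leasenum, (ownerid, expiration_time, renew_secret,
        # cancel_secret, nodeid)). Compare everything except the expiration
        # timestamp, which renewals are expected to change.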
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            (num_a, (ownerid_a, expiration_time_a,
                     renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
            (num_b, (ownerid_b, expiration_time_b,
                     renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
                                   cancel_secret_a, nodeid_a),
                                  (num_b, ownerid_b, renew_secret_b,
                                   cancel_secret_b, nodeid_b) )

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", idlib.b2a("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slot with the same secrets; that should update the
        # existing lease rather than add a new one
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})
