# src/allmydata/test/test_storage.py (tahoe-lafs)
# Exported from gitweb; commit: "storage: ignore shares in incoming/, to make
# clients use other servers during simultaneous uploads"
1
2 from twisted.trial import unittest
3
4 from twisted.internet import defer
5 import time, os.path, stat
6 import itertools
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11      storage_index_to_dir
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
14
class FakeCanary:
    """Minimal stand-in for a Foolscap canary.

    Storage-server code registers disconnect callbacks on the canary it is
    handed; this fake accepts and silently discards those registrations.
    """

    def notifyOnDisconnect(self, *args, **kwargs):
        # Accept any callback registration and ignore it.
        return None

    def dontNotifyOnDisconnect(self, marker):
        # Nothing was ever registered, so there is nothing to remove.
        return None
20
21 class Bucket(unittest.TestCase):
22     def make_workdir(self, name):
23         basedir = os.path.join("storage", "Bucket", name)
24         incoming = os.path.join(basedir, "tmp", "bucket")
25         final = os.path.join(basedir, "bucket")
26         fileutil.make_dirs(basedir)
27         fileutil.make_dirs(os.path.join(basedir, "tmp"))
28         return incoming, final
29
30     def bucket_writer_closed(self, bw, consumed):
31         pass
32
33     def make_lease(self):
34         owner_num = 0
35         renew_secret = os.urandom(32)
36         cancel_secret = os.urandom(32)
37         expiration_time = time.time() + 5000
38         return (owner_num, renew_secret, cancel_secret, expiration_time)
39
40     def test_create(self):
41         incoming, final = self.make_workdir("test_create")
42         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
43                           FakeCanary())
44         bw.remote_write(0, "a"*25)
45         bw.remote_write(25, "b"*25)
46         bw.remote_write(50, "c"*25)
47         bw.remote_write(75, "d"*7)
48         bw.remote_close()
49
50     def test_readwrite(self):
51         incoming, final = self.make_workdir("test_readwrite")
52         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
53                           FakeCanary())
54         bw.remote_write(0, "a"*25)
55         bw.remote_write(25, "b"*25)
56         bw.remote_write(50, "c"*7) # last block may be short
57         bw.remote_close()
58
59         # now read from it
60         br = BucketReader(bw.finalhome)
61         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
62         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
63         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
64
class RemoteBucket:
    """Wrap a local bucket object behind a callRemote-style interface.

    The creator assigns `self.target` after construction. A call to
    callRemote("foo", ...) invokes target.remote_foo(...) and wraps the
    result in a Deferred, mimicking a Foolscap RemoteReference.
    """

    def callRemote(self, methname, *args, **kwargs):
        def _invoke():
            method = getattr(self.target, "remote_" + methname)
            return method(*args, **kwargs)
        return defer.maybeDeferred(_invoke)
72
class BucketProxy(unittest.TestCase):
    """Exercise WriteBucketProxy/ReadBucketProxy over a fake remote bucket."""

    def make_bucket(self, name, size):
        """Create a BucketWriter of *size* bytes plus a RemoteBucket wrapper.

        Returns (bucket_writer, remote_bucket, final_share_path).
        """
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        """Return a lease 4-tuple with random secrets, expiring 5000s out."""
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        # BucketWriter notifies its "server" (this test case) when closed;
        # nothing to track here.
        pass

    def test_create(self):
        """A freshly built WriteBucketProxy provides IStorageBucketWriter."""
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        """Write a complete share through the proxy, then read it all back."""
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        # chain the writes in the order a real uploader would issue them
        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d
185
186
187
188 class Server(unittest.TestCase):
189
190     def setUp(self):
191         self.sparent = LoggingServiceParent()
192         self._lease_secret = itertools.count()
193     def tearDown(self):
194         return self.sparent.stopService()
195
196     def workdir(self, name):
197         basedir = os.path.join("storage", "Server", name)
198         return basedir
199
200     def create(self, name, sizelimit=None):
201         workdir = self.workdir(name)
202         ss = StorageServer(workdir, sizelimit)
203         ss.setServiceParent(self.sparent)
204         return ss
205
206     def test_create(self):
207         ss = self.create("test_create")
208
209     def allocate(self, ss, storage_index, sharenums, size):
210         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
211         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
212         return ss.remote_allocate_buckets(storage_index,
213                                           renew_secret, cancel_secret,
214                                           sharenums, size, FakeCanary())
215
216     def test_dont_overfill_dirs(self):
217         """
218         This test asserts that if you add a second share whose storage index
219         share lots of leading bits with an extant share (but isn't the exact
220         same storage index), this won't add an entry to the share directory.
221         """
222         ss = self.create("test_dont_overfill_dirs")
223         already, writers = self.allocate(ss, "storageindex", [0], 10)
224         for i, wb in writers.items():
225             wb.remote_write(0, "%10d" % i)
226             wb.remote_close()
227         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
228                                 "shares")
229         children_of_storedir = set(os.listdir(storedir))
230
231         # Now store another one under another storageindex that has leading
232         # chars the same as the first storageindex.
233         already, writers = self.allocate(ss, "storageindey", [0], 10)
234         for i, wb in writers.items():
235             wb.remote_write(0, "%10d" % i)
236             wb.remote_close()
237         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
238                                 "shares")
239         new_children_of_storedir = set(os.listdir(storedir))
240         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
241
242     def test_remove_incoming(self):
243         ss = self.create("test_remove_incoming")
244         already, writers = self.allocate(ss, "vid", range(3), 10)
245         for i,wb in writers.items():
246             wb.remote_write(0, "%10d" % i)
247             wb.remote_close()
248         incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
249         self.failIf(os.path.exists(incomingdir))
250
251     def test_allocate(self):
252         ss = self.create("test_allocate")
253
254         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
255
256         canary = FakeCanary()
257         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
258         self.failUnlessEqual(already, set())
259         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
260
261         # while the buckets are open, they should not count as readable
262         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
263
264         # close the buckets
265         for i,wb in writers.items():
266             wb.remote_write(0, "%25d" % i)
267             wb.remote_close()
268
269         # now they should be readable
270         b = ss.remote_get_buckets("vid")
271         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
272         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
273
274         # now if we about writing again, the server should offer those three
275         # buckets as already present. It should offer them even if we don't
276         # ask about those specific ones.
277         already,writers = self.allocate(ss, "vid", [2,3,4], 75)
278         self.failUnlessEqual(already, set([0,1,2]))
279         self.failUnlessEqual(set(writers.keys()), set([3,4]))
280
281         # while those two buckets are open for writing, the server should
282         # refuse to offer them to uploaders
283
284         already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
285         self.failUnlessEqual(already, set([0,1,2]))
286         self.failUnlessEqual(set(writers.keys()), set([5]))
287
288     def test_sizelimits(self):
289         ss = self.create("test_sizelimits", 5000)
290         canary = FakeCanary()
291         # a newly created and filled share incurs this much overhead, beyond
292         # the size we request.
293         OVERHEAD = 3*4
294         LEASE_SIZE = 4+32+32+4
295
296         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
297         self.failUnlessEqual(len(writers), 3)
298         # now the StorageServer should have 3000 bytes provisionally
299         # allocated, allowing only 2000 more to be claimed
300         self.failUnlessEqual(len(ss._active_writers), 3)
301
302         # allocating 1001-byte shares only leaves room for one
303         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
304         self.failUnlessEqual(len(writers2), 1)
305         self.failUnlessEqual(len(ss._active_writers), 4)
306
307         # we abandon the first set, so their provisional allocation should be
308         # returned
309         del already
310         del writers
311         self.failUnlessEqual(len(ss._active_writers), 1)
312         # now we have a provisional allocation of 1001 bytes
313
314         # and we close the second set, so their provisional allocation should
315         # become real, long-term allocation, and grows to include the
316         # overhead.
317         for bw in writers2.values():
318             bw.remote_write(0, "a"*25)
319             bw.remote_close()
320         del already2
321         del writers2
322         del bw
323         self.failUnlessEqual(len(ss._active_writers), 0)
324
325         allocated = 1001 + OVERHEAD + LEASE_SIZE
326         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
327         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
328         already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
329         self.failUnlessEqual(len(writers3), 39)
330         self.failUnlessEqual(len(ss._active_writers), 39)
331
332         del already3
333         del writers3
334         self.failUnlessEqual(len(ss._active_writers), 0)
335         ss.disownServiceParent()
336         del ss
337
338         # creating a new StorageServer in the same directory should see the
339         # same usage.
340
341         # metadata that goes into the share file is counted upon share close,
342         # as well as at startup. metadata that goes into other files will not
343         # be counted until the next startup, so if we were creating any
344         # extra-file metadata, the allocation would be more than 'allocated'
345         # and this test would need to be changed.
346         ss = self.create("test_sizelimits", 5000)
347         already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
348         self.failUnlessEqual(len(writers4), 39)
349         self.failUnlessEqual(len(ss._active_writers), 39)
350
351     def test_seek(self):
352         basedir = self.workdir("test_seek_behavior")
353         fileutil.make_dirs(basedir)
354         filename = os.path.join(basedir, "testfile")
355         f = open(filename, "wb")
356         f.write("start")
357         f.close()
358         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
359         # files. mode="a" preserves previous contents but does not allow
360         # seeking-to-create-holes. mode="r+" allows both.
361         f = open(filename, "rb+")
362         f.seek(100)
363         f.write("100")
364         f.close()
365         filelen = os.stat(filename)[stat.ST_SIZE]
366         self.failUnlessEqual(filelen, 100+3)
367         f2 = open(filename, "rb")
368         self.failUnlessEqual(f2.read(5), "start")
369
370
371     def test_leases(self):
372         ss = self.create("test_leases")
373         canary = FakeCanary()
374         sharenums = range(5)
375         size = 100
376
377         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
378                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
379         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
380                                                      sharenums, size, canary)
381         self.failUnlessEqual(len(already), 0)
382         self.failUnlessEqual(len(writers), 5)
383         for wb in writers.values():
384             wb.remote_close()
385
386         leases = list(ss.get_leases("si0"))
387         self.failUnlessEqual(len(leases), 1)
388         self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
389
390         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
391                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
392         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
393                                                      sharenums, size, canary)
394         for wb in writers.values():
395             wb.remote_close()
396
397         # take out a second lease on si1
398         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
399                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
400         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
401                                                      sharenums, size, canary)
402         self.failUnlessEqual(len(already), 5)
403         self.failUnlessEqual(len(writers), 0)
404
405         leases = list(ss.get_leases("si1"))
406         self.failUnlessEqual(len(leases), 2)
407         self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
408
409         # check that si0 is readable
410         readers = ss.remote_get_buckets("si0")
411         self.failUnlessEqual(len(readers), 5)
412
413         # renew the first lease. Only the proper renew_secret should work
414         ss.remote_renew_lease("si0", rs0)
415         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
416         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
417
418         # check that si0 is still readable
419         readers = ss.remote_get_buckets("si0")
420         self.failUnlessEqual(len(readers), 5)
421
422         # now cancel it
423         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
424         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
425         ss.remote_cancel_lease("si0", cs0)
426
427         # si0 should now be gone
428         readers = ss.remote_get_buckets("si0")
429         self.failUnlessEqual(len(readers), 0)
430         # and the renew should no longer work
431         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
432
433
434         # cancel the first lease on si1, leaving the second in place
435         ss.remote_cancel_lease("si1", cs1)
436         readers = ss.remote_get_buckets("si1")
437         self.failUnlessEqual(len(readers), 5)
438         # the corresponding renew should no longer work
439         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
440
441         leases = list(ss.get_leases("si1"))
442         self.failUnlessEqual(len(leases), 1)
443         self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
444
445         ss.remote_renew_lease("si1", rs2)
446         # cancelling the second should make it go away
447         ss.remote_cancel_lease("si1", cs2)
448         readers = ss.remote_get_buckets("si1")
449         self.failUnlessEqual(len(readers), 0)
450         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
451         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
452
453         leases = list(ss.get_leases("si1"))
454         self.failUnlessEqual(len(leases), 0)
455
456
457         # test overlapping uploads
458         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
459                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
460         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
461                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
462         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
463                                                      sharenums, size, canary)
464         self.failUnlessEqual(len(already), 0)
465         self.failUnlessEqual(len(writers), 5)
466         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
467                                                        sharenums, size, canary)
468         self.failUnlessEqual(len(already2), 0)
469         self.failUnlessEqual(len(writers2), 0)
470         for wb in writers.values():
471             wb.remote_close()
472
473         leases = list(ss.get_leases("si3"))
474         self.failUnlessEqual(len(leases), 1)
475
476         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
477                                                        sharenums, size, canary)
478         self.failUnlessEqual(len(already3), 5)
479         self.failUnlessEqual(len(writers3), 0)
480
481         leases = list(ss.get_leases("si3"))
482         self.failUnlessEqual(len(leases), 2)
483
484
485
486 class MutableServer(unittest.TestCase):
487
488     def setUp(self):
489         self.sparent = LoggingServiceParent()
490         self._lease_secret = itertools.count()
491     def tearDown(self):
492         return self.sparent.stopService()
493
494     def workdir(self, name):
495         basedir = os.path.join("storage", "MutableServer", name)
496         return basedir
497
498     def create(self, name, sizelimit=None):
499         workdir = self.workdir(name)
500         ss = StorageServer(workdir, sizelimit)
501         ss.setServiceParent(self.sparent)
502         ss.setNodeID("\x00" * 32)
503         return ss
504
505     def test_create(self):
506         ss = self.create("test_create")
507
508     def write_enabler(self, we_tag):
509         return hashutil.tagged_hash("we_blah", we_tag)
510
511     def renew_secret(self, tag):
512         return hashutil.tagged_hash("renew_blah", str(tag))
513
514     def cancel_secret(self, tag):
515         return hashutil.tagged_hash("cancel_blah", str(tag))
516
517     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
518         write_enabler = self.write_enabler(we_tag)
519         renew_secret = self.renew_secret(lease_tag)
520         cancel_secret = self.cancel_secret(lease_tag)
521         rstaraw = ss.remote_slot_testv_and_readv_and_writev
522         testandwritev = dict( [ (shnum, ([], [], None) )
523                          for shnum in sharenums ] )
524         readv = []
525         rc = rstaraw(storage_index,
526                      (write_enabler, renew_secret, cancel_secret),
527                      testandwritev,
528                      readv)
529         (did_write, readv_data) = rc
530         self.failUnless(did_write)
531         self.failUnless(isinstance(readv_data, dict))
532         self.failUnlessEqual(len(readv_data), 0)
533
    def test_allocate(self):
        """Create mutable slots, then exercise reads, writes, the write
        enabler, and failing test vectors."""
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                               set([0,1,2]), 100)

        read = ss.remote_slot_readv
        # freshly created shares are empty: all reads return ""
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        # an empty sharenum list means "all shares"
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # "0000000000111111111122..." -- 100 bytes, ten of each digit
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        # reads that extend past the 100-byte share are truncated
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a wrong write enabler must be rejected; the error message names
        # the nodeid (0x00*32 from create(), base32 "a"*32) that recorded it
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        # on failure: (False, current data at the requested read vectors)
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        # and the write must not have been applied
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
602
603
604     def test_operators(self):
605         # test operators, the data we're comparing is '11111' in all cases.
606         # test both fail+pass, reset data after each one.
607         ss = self.create("test_operators")
608
609         secrets = ( self.write_enabler("we1"),
610                     self.renew_secret("we1"),
611                     self.cancel_secret("we1") )
612         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
613         write = ss.remote_slot_testv_and_readv_and_writev
614         read = ss.remote_slot_readv
615
616         def reset():
617             write("si1", secrets,
618                   {0: ([], [(0,data)], None)},
619                   [])
620
621         reset()
622
623         #  lt
624         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
625                                              ],
626                                             [(0, "x"*100)],
627                                             None,
628                                             )}, [(10,5)])
629         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
630         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
631         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
632         reset()
633
634         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
635                                              ],
636                                             [(0, "x"*100)],
637                                             None,
638                                             )}, [(10,5)])
639         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
640         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
641         reset()
642
643         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
644                                              ],
645                                             [(0, "y"*100)],
646                                             None,
647                                             )}, [(10,5)])
648         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
649         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
650         reset()
651
652         #  le
653         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
654                                              ],
655                                             [(0, "x"*100)],
656                                             None,
657                                             )}, [(10,5)])
658         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
659         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
660         reset()
661
662         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
663                                              ],
664                                             [(0, "y"*100)],
665                                             None,
666                                             )}, [(10,5)])
667         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
668         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
669         reset()
670
671         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
672                                              ],
673                                             [(0, "y"*100)],
674                                             None,
675                                             )}, [(10,5)])
676         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
677         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
678         reset()
679
680         #  eq
681         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
682                                              ],
683                                             [(0, "x"*100)],
684                                             None,
685                                             )}, [(10,5)])
686         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
687         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
688         reset()
689
690         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
691                                              ],
692                                             [(0, "y"*100)],
693                                             None,
694                                             )}, [(10,5)])
695         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
696         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
697         reset()
698
699         #  ne
700         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
701                                              ],
702                                             [(0, "x"*100)],
703                                             None,
704                                             )}, [(10,5)])
705         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
706         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
707         reset()
708
709         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
710                                              ],
711                                             [(0, "y"*100)],
712                                             None,
713                                             )}, [(10,5)])
714         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
715         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
716         reset()
717
718         #  ge
719         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
720                                              ],
721                                             [(0, "y"*100)],
722                                             None,
723                                             )}, [(10,5)])
724         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
725         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
726         reset()
727
728         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
729                                              ],
730                                             [(0, "y"*100)],
731                                             None,
732                                             )}, [(10,5)])
733         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
734         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
735         reset()
736
737         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
738                                              ],
739                                             [(0, "y"*100)],
740                                             None,
741                                             )}, [(10,5)])
742         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
743         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
744         reset()
745
746         #  gt
747         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
748                                              ],
749                                             [(0, "y"*100)],
750                                             None,
751                                             )}, [(10,5)])
752         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
753         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
754         reset()
755
756         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
757                                              ],
758                                             [(0, "x"*100)],
759                                             None,
760                                             )}, [(10,5)])
761         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
762         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
763         reset()
764
765         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
766                                              ],
767                                             [(0, "x"*100)],
768                                             None,
769                                             )}, [(10,5)])
770         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
771         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
772         reset()
773
774     def test_readv(self):
775         ss = self.create("test_readv")
776         secrets = ( self.write_enabler("we1"),
777                     self.renew_secret("we1"),
778                     self.cancel_secret("we1") )
779         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
780         write = ss.remote_slot_testv_and_readv_and_writev
781         read = ss.remote_slot_readv
782         data = [("%d" % i) * 100 for i in range(3)]
783         rc = write("si1", secrets,
784                    {0: ([], [(0,data[0])], None),
785                     1: ([], [(0,data[1])], None),
786                     2: ([], [(0,data[2])], None),
787                     }, [])
788         self.failUnlessEqual(rc, (True, {}))
789
790         answer = read("si1", [], [(0, 10)])
791         self.failUnlessEqual(answer, {0: ["0"*10],
792                                       1: ["1"*10],
793                                       2: ["2"*10]})
794
795     def compare_leases_without_timestamps(self, a, b):
796         self.failUnlessEqual(len(a), len(b))
797         for i in range(len(a)):
798             (num_a, (ownerid_a, expiration_time_a,
799                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
800             (num_b, (ownerid_b, expiration_time_b,
801                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
802             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
803                                    cancel_secret_a, nodeid_a),
804                                   (num_b, ownerid_b, renew_secret_b,
805                                    cancel_secret_b, nodeid_b) )
806
807     def test_leases(self):
808         ss = self.create("test_leases")
809         def secrets(n):
810             return ( self.write_enabler("we1"),
811                      self.renew_secret("we1-%d" % n),
812                      self.cancel_secret("we1-%d" % n) )
813         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
814         write = ss.remote_slot_testv_and_readv_and_writev
815         read = ss.remote_slot_readv
816         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
817         self.failUnlessEqual(rc, (True, {}))
818
819         # create a random non-numeric file in the bucket directory, to
820         # exercise the code that's supposed to ignore those.
821         bucket_dir = os.path.join(self.workdir("test_leases"),
822                                   "shares", storage_index_to_dir("si1"))
823         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
824         f.write("you ought to be ignoring me\n")
825         f.close()
826
827         # re-allocate the slots and use the same secrets, that should update
828         # the lease
829         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
830
831         # renew it directly
832         ss.remote_renew_lease("si1", secrets(0)[1])
833
834         # now allocate them with a bunch of different secrets, to trigger the
835         # extended lease code
836         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
837         write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
838         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
839         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
840         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
841
842         # cancel one of them
843         ss.remote_cancel_lease("si1", secrets(5)[2])
844
845         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
846         all_leases = s0.debug_get_leases()
847         self.failUnlessEqual(len(all_leases), 5)
848
849         # and write enough data to expand the container, forcing the server
850         # to move the leases
851         write("si1", secrets(0),
852               {0: ([], [(0,data)], 200), },
853               [])
854
855         # read back the leases, make sure they're still intact.
856         self.compare_leases_without_timestamps(all_leases,
857                                                s0.debug_get_leases())
858
859         ss.remote_renew_lease("si1", secrets(0)[1])
860         ss.remote_renew_lease("si1", secrets(1)[1])
861         ss.remote_renew_lease("si1", secrets(2)[1])
862         ss.remote_renew_lease("si1", secrets(3)[1])
863         ss.remote_renew_lease("si1", secrets(4)[1])
864         self.compare_leases_without_timestamps(all_leases,
865                                                s0.debug_get_leases())
866         # get a new copy of the leases, with the current timestamps. Reading
867         # data and failing to renew/cancel leases should leave the timestamps
868         # alone.
869         all_leases = s0.debug_get_leases()
870         # renewing with a bogus token should prompt an error message
871
872         # TODO: examine the exception thus raised, make sure the old nodeid
873         # is present, to provide for share migration
874         self.failUnlessRaises(IndexError,
875                               ss.remote_renew_lease, "si1",
876                               secrets(20)[1])
877         # same for cancelling
878         self.failUnlessRaises(IndexError,
879                               ss.remote_cancel_lease, "si1",
880                               secrets(20)[2])
881         self.failUnlessEqual(all_leases, s0.debug_get_leases())
882
883         # reading shares should not modify the timestamp
884         read("si1", [], [(0,200)])
885         self.failUnlessEqual(all_leases, s0.debug_get_leases())
886
887         write("si1", secrets(0),
888               {0: ([], [(200, "make me bigger")], None)}, [])
889         self.compare_leases_without_timestamps(all_leases,
890                                                s0.debug_get_leases())
891
892         write("si1", secrets(0),
893               {0: ([], [(500, "make me really bigger")], None)}, [])
894         self.compare_leases_without_timestamps(all_leases,
895                                                s0.debug_get_leases())
896
897         # now cancel them all
898         ss.remote_cancel_lease("si1", secrets(0)[2])
899         ss.remote_cancel_lease("si1", secrets(1)[2])
900         ss.remote_cancel_lease("si1", secrets(2)[2])
901         ss.remote_cancel_lease("si1", secrets(3)[2])
902
903         # the slot should still be there
904         remaining_shares = read("si1", [], [(0,10)])
905         self.failUnlessEqual(len(remaining_shares), 1)
906         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
907
908         ss.remote_cancel_lease("si1", secrets(4)[2])
909         # now the slot should be gone
910         no_shares = read("si1", [], [(0,10)])
911         self.failUnlessEqual(no_shares, {})
912