]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
test_storage: add coverage for readonly_storage
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 from twisted.trial import unittest
3
4 from twisted.internet import defer
5 import time, os.path, stat
6 import itertools
7 from allmydata import interfaces
8 from allmydata.util import fileutil, hashutil
9 from allmydata.storage import BucketWriter, BucketReader, \
10      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
11      storage_index_to_dir
12 from allmydata.interfaces import BadWriteEnablerError
13 from allmydata.test.common import LoggingServiceParent
14
class FakeCanary:
    """Stand-in for a Foolscap canary: accepts disconnect-notification
    registration and deregistration calls and simply ignores them."""

    def notifyOnDisconnect(self, *args, **kwargs):
        # pretend to register a disconnect callback; nothing is recorded
        return None

    def dontNotifyOnDisconnect(self, marker):
        # pretend to deregister; the marker is ignored
        return None
20
21 class Bucket(unittest.TestCase):
22     def make_workdir(self, name):
23         basedir = os.path.join("storage", "Bucket", name)
24         incoming = os.path.join(basedir, "tmp", "bucket")
25         final = os.path.join(basedir, "bucket")
26         fileutil.make_dirs(basedir)
27         fileutil.make_dirs(os.path.join(basedir, "tmp"))
28         return incoming, final
29
30     def bucket_writer_closed(self, bw, consumed):
31         pass
32     def add_latency(self, category, latency):
33         pass
34     def count(self, name, delta=1):
35         pass
36
37     def make_lease(self):
38         owner_num = 0
39         renew_secret = os.urandom(32)
40         cancel_secret = os.urandom(32)
41         expiration_time = time.time() + 5000
42         return (owner_num, renew_secret, cancel_secret, expiration_time)
43
44     def test_create(self):
45         incoming, final = self.make_workdir("test_create")
46         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
47                           FakeCanary())
48         bw.remote_write(0, "a"*25)
49         bw.remote_write(25, "b"*25)
50         bw.remote_write(50, "c"*25)
51         bw.remote_write(75, "d"*7)
52         bw.remote_close()
53
54     def test_readwrite(self):
55         incoming, final = self.make_workdir("test_readwrite")
56         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
57                           FakeCanary())
58         bw.remote_write(0, "a"*25)
59         bw.remote_write(25, "b"*25)
60         bw.remote_write(50, "c"*7) # last block may be short
61         bw.remote_close()
62
63         # now read from it
64         br = BucketReader(self, bw.finalhome)
65         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
66         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
67         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
68
class RemoteBucket:
    """Wrap a local bucket object behind a callRemote-style interface.

    callRemote("name", ...) invokes remote_name(...) on self.target, with
    the result (or exception) delivered through a Deferred, mimicking how
    a real Foolscap RemoteReference behaves.
    """

    def callRemote(self, methname, *args, **kwargs):
        target_method = getattr(self.target, "remote_" + methname)
        return defer.maybeDeferred(lambda: target_method(*args, **kwargs))
76
class BucketProxy(unittest.TestCase):
    """Round-trip a share through WriteBucketProxy and ReadBucketProxy."""

    def make_bucket(self, name, size):
        """Create a BucketWriter of 'size' bytes wrapped in a RemoteBucket.

        Returns (BucketWriter, RemoteBucket, final share filename).
        """
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        # (owner, renew-secret, cancel-secret, expiration): fresh random
        # secrets, expiring well after the test finishes
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    # This TestCase stands in for the storage server: BucketWriter invokes
    # these callbacks, and the stats they report are ignored.
    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        # note: only 95 bytes of data are actually written below
        # (25+25+25+20: the last segment is short)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        # write every section of the share through the proxy, in order
        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d
193
194
195
class Server(unittest.TestCase):
    """Tests for the immutable-share side of StorageServer."""

    def setUp(self):
        # parent service that owns every StorageServer a test creates
        self.sparent = LoggingServiceParent()
        # counter used to mint unique lease secrets for each allocation
        self._lease_secret = itertools.count()
    def tearDown(self):
        # stopping the parent also shuts down any child StorageServer
        return self.sparent.stopService()

    def workdir(self, name):
        # per-test scratch directory
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        """Create a StorageServer in this test's workdir and start it."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        # simply constructing and starting a server should work
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        """Allocate buckets on 'ss', minting fresh lease secrets each call."""
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, FakeCanary())

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        # once the last share is closed, the incoming/ directory tree should
        # be removed
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        # exercise the seek-past-EOF-then-write behavior that share files
        # rely upon
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        try:
            self.failUnlessEqual(f2.read(5), "start")
        finally:
            # close the handle: the original version leaked it
            f2.close()

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        # upload si0 under one lease
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it; only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        # a second allocation while the first is still open gets nothing
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        # once closed, the second allocation adds its lease instead
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        # a server in readonly mode must refuse all allocations
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})
501
502
503
504 class MutableServer(unittest.TestCase):
505
    def setUp(self):
        # parent service that owns every StorageServer a test creates
        self.sparent = LoggingServiceParent()
        # counter used to mint unique lease secrets for each allocation
        self._lease_secret = itertools.count()
    def tearDown(self):
        # stopping the parent also shuts down any child StorageServer
        return self.sparent.stopService()
511
512     def workdir(self, name):
513         basedir = os.path.join("storage", "MutableServer", name)
514         return basedir
515
516     def create(self, name, sizelimit=None):
517         workdir = self.workdir(name)
518         ss = StorageServer(workdir, sizelimit)
519         ss.setServiceParent(self.sparent)
520         ss.setNodeID("\x00" * 32)
521         return ss
522
    def test_create(self):
        # simply constructing a mutable-capable StorageServer should work
        ss = self.create("test_create")
525
    def write_enabler(self, we_tag):
        # derive a deterministic write-enabler from a short tag
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        # derive a deterministic lease renew-secret from a tag
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        # derive a deterministic lease cancel-secret from a tag
        return hashutil.tagged_hash("cancel_blah", str(tag))
534
535     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
536         write_enabler = self.write_enabler(we_tag)
537         renew_secret = self.renew_secret(lease_tag)
538         cancel_secret = self.cancel_secret(lease_tag)
539         rstaraw = ss.remote_slot_testv_and_readv_and_writev
540         testandwritev = dict( [ (shnum, ([], [], None) )
541                          for shnum in sharenums ] )
542         readv = []
543         rc = rstaraw(storage_index,
544                      (write_enabler, renew_secret, cancel_secret),
545                      testandwritev,
546                      readv)
547         (did_write, readv_data) = rc
548         self.failUnless(did_write)
549         self.failUnless(isinstance(readv_data, dict))
550         self.failUnlessEqual(len(readv_data), 0)
551
    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                               set([0,1,2]), 100)

        read = ss.remote_slot_readv
        # freshly-created shares are empty, so every read returns ""
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # data = "0000000000111111111122222222..." (10 digits, 10 times each)
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        # a write with the wrong write-enabler must be rejected
        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        # a rejected write must leave the data untouched
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
620
621
622     def test_operators(self):
623         # test operators, the data we're comparing is '11111' in all cases.
624         # test both fail+pass, reset data after each one.
625         ss = self.create("test_operators")
626
627         secrets = ( self.write_enabler("we1"),
628                     self.renew_secret("we1"),
629                     self.cancel_secret("we1") )
630         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
631         write = ss.remote_slot_testv_and_readv_and_writev
632         read = ss.remote_slot_readv
633
634         def reset():
635             write("si1", secrets,
636                   {0: ([], [(0,data)], None)},
637                   [])
638
639         reset()
640
641         #  lt
642         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
643                                              ],
644                                             [(0, "x"*100)],
645                                             None,
646                                             )}, [(10,5)])
647         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
648         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
649         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
650         reset()
651
652         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
653                                              ],
654                                             [(0, "x"*100)],
655                                             None,
656                                             )}, [(10,5)])
657         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
658         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
659         reset()
660
661         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
662                                              ],
663                                             [(0, "y"*100)],
664                                             None,
665                                             )}, [(10,5)])
666         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
667         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
668         reset()
669
670         #  le
671         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
672                                              ],
673                                             [(0, "x"*100)],
674                                             None,
675                                             )}, [(10,5)])
676         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
677         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
678         reset()
679
680         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
681                                              ],
682                                             [(0, "y"*100)],
683                                             None,
684                                             )}, [(10,5)])
685         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
686         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
687         reset()
688
689         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
690                                              ],
691                                             [(0, "y"*100)],
692                                             None,
693                                             )}, [(10,5)])
694         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
695         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
696         reset()
697
698         #  eq
699         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
700                                              ],
701                                             [(0, "x"*100)],
702                                             None,
703                                             )}, [(10,5)])
704         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
705         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
706         reset()
707
708         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
709                                              ],
710                                             [(0, "y"*100)],
711                                             None,
712                                             )}, [(10,5)])
713         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
714         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
715         reset()
716
717         #  ne
718         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
719                                              ],
720                                             [(0, "x"*100)],
721                                             None,
722                                             )}, [(10,5)])
723         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
724         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
725         reset()
726
727         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
728                                              ],
729                                             [(0, "y"*100)],
730                                             None,
731                                             )}, [(10,5)])
732         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
733         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
734         reset()
735
736         #  ge
737         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
738                                              ],
739                                             [(0, "y"*100)],
740                                             None,
741                                             )}, [(10,5)])
742         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
743         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
744         reset()
745
746         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
747                                              ],
748                                             [(0, "y"*100)],
749                                             None,
750                                             )}, [(10,5)])
751         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
752         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
753         reset()
754
755         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
756                                              ],
757                                             [(0, "y"*100)],
758                                             None,
759                                             )}, [(10,5)])
760         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
761         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
762         reset()
763
764         #  gt
765         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
766                                              ],
767                                             [(0, "y"*100)],
768                                             None,
769                                             )}, [(10,5)])
770         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
771         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
772         reset()
773
774         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
775                                              ],
776                                             [(0, "x"*100)],
777                                             None,
778                                             )}, [(10,5)])
779         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
780         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
781         reset()
782
783         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
784                                              ],
785                                             [(0, "x"*100)],
786                                             None,
787                                             )}, [(10,5)])
788         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
789         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
790         reset()
791
792     def test_readv(self):
793         ss = self.create("test_readv")
794         secrets = ( self.write_enabler("we1"),
795                     self.renew_secret("we1"),
796                     self.cancel_secret("we1") )
797         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
798         write = ss.remote_slot_testv_and_readv_and_writev
799         read = ss.remote_slot_readv
800         data = [("%d" % i) * 100 for i in range(3)]
801         rc = write("si1", secrets,
802                    {0: ([], [(0,data[0])], None),
803                     1: ([], [(0,data[1])], None),
804                     2: ([], [(0,data[2])], None),
805                     }, [])
806         self.failUnlessEqual(rc, (True, {}))
807
808         answer = read("si1", [], [(0, 10)])
809         self.failUnlessEqual(answer, {0: ["0"*10],
810                                       1: ["1"*10],
811                                       2: ["2"*10]})
812
813     def compare_leases_without_timestamps(self, a, b):
814         self.failUnlessEqual(len(a), len(b))
815         for i in range(len(a)):
816             (num_a, (ownerid_a, expiration_time_a,
817                    renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
818             (num_b, (ownerid_b, expiration_time_b,
819                    renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
820             self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
821                                    cancel_secret_a, nodeid_a),
822                                   (num_b, ownerid_b, renew_secret_b,
823                                    cancel_secret_b, nodeid_b) )
824
    def test_leases(self):
        """Exercise lease handling on a mutable slot.

        Covers: creating leases via writes, renewing and cancelling them
        directly, preserving leases across container expansion, rejecting
        bogus secrets, and deleting the share once the last lease is gone.
        """
        ss = self.create("test_leases")
        def secrets(n):
            # one (write-enabler, renew-secret, cancel-secret) triple per
            # lease; the write-enabler is shared, the renew/cancel secrets
            # vary with n so each n yields a distinct lease
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        # six leases were created and one cancelled: five should remain
        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # renewing each lease updates only its timestamp; everything else
        # in the lease records must be unchanged
        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # ordinary writes (including ones that grow the share) must not
        # disturb the existing lease records apart from timestamps
        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})
930
931 class Stats(unittest.TestCase):
932
933     def setUp(self):
934         self.sparent = LoggingServiceParent()
935         self._lease_secret = itertools.count()
    def tearDown(self):
        # stopping the parent service shuts down every StorageServer
        # attached in create(); returning the Deferred makes trial wait
        # for the shutdown to complete
        return self.sparent.stopService()
938
939     def workdir(self, name):
940         basedir = os.path.join("storage", "Server", name)
941         return basedir
942
943     def create(self, name, sizelimit=None):
944         workdir = self.workdir(name)
945         ss = StorageServer(workdir, sizelimit)
946         ss.setServiceParent(self.sparent)
947         return ss
948
    def test_latencies(self):
        """Check get_latencies(): per-category mean and percentile stats.

        Also verifies the per-category sample buffer is bounded: after
        adding 10000 "allocate" samples only 1000 remain, so the reported
        stats reflect the most recent samples (values 9000..9999).
        """
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        # stats are reported for exactly the categories that saw samples
        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        # buffer cap: only 1000 of the 10000 samples were retained
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)

        # exactly 1000 samples (0..999): stats over the full range
        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["median"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)

        # few samples (10 values 0,2,..,18): high percentiles collapse to
        # the largest sample
        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)

        # a single sample: every statistic equals that sample
        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["median"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)