
import time, os.path, stat, re

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap import eventual
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError
from allmydata.storage.lease import LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]
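
# FakeCanary stands in for the foolscap "canary" object (a RemoteReference)
# that real clients pass to remote_allocate_buckets(): the server calls
# notifyOnDisconnect() on it so it can abandon partially-written shares if
# the uploader goes away. A rough sketch of how the tests drive it (see
# Server.test_disconnect below):
#
#   canary = FakeCanary()
#   # ... allocate buckets, passing canary ...
#   for (f, args, kwargs) in canary.disconnectors.values():
#       f(*args, **kwargs)    # simulate the client disconnecting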

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
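
# RemoteBucket lets the bucket proxies be exercised without a real foolscap
# connection: callRemote("foo", ...) is dispatched to target.remote_foo(...)
# and the result is wrapped in a Deferred, mimicking a remote call. A
# minimal usage sketch (the Echo class is hypothetical, for illustration
# only):
#
#   class Echo:
#       def remote_echo(self, x):
#           return x
#   rb = RemoteBucket()
#   rb.target = Echo()
#   d = rb.callRemote("echo", 42)   # a Deferred that fires with 42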

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
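
        # Spot-checking the arithmetic above: the three 7-node merkle trees
        # contribute 3 * (7*32) = 672 bytes, the share-hash chain
        # 3*(2+32) = 102 bytes, and the length-prefixed URI extension
        # 4+500 = 504 bytes, so the v1 share (0x24-byte header) comes to
        # 36 + 100 + 672 + 102 + 504 = 1414 bytes, and the v2 share
        # (0x44-byte header) to 1446, matching the totals in the comment.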

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class FakeDiskStorageServer(StorageServer):
    def stat_disk(self, d):
        return self.DISKAVAIL
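    # stat_disk() normally measures the real filesystem (via os.statvfs on
    # platforms that have it); overriding it lets tests like
    # Server.test_reserved_space pin the "available disk" figure by setting
    # DISKAVAIL directly instead of consuming real disk space.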

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
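        # These constants appear to mirror the immutable share file layout:
        # a header of three 4-byte fields, plus one lease record made of a
        # 4-byte owner number, two 32-byte secrets, and a 4-byte expiration
        # time (72 bytes). If allmydata.storage.immutable changes its
        # layout, OVERHEAD and LEASE_SIZE must change with it.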
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually decrease DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

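    # For reference, each entry in the test-and-write dict passed to
    # remote_slot_testv_and_readv_and_writev has the shape
    #   sharenum: (testv, writev, new_length)
    # where testv is a list of (offset, length, operator, specimen) checks,
    # writev is a list of (offset, data) writes applied only if the checks
    # pass, and new_length (or None) optionally resizes the container. An
    # empty testv vacuously passes, which is how allocate() above creates
    # empty shares.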
    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            num_a, a = leases_a[i]
            num_b, b = leases_b[i]
            self.failUnlessEqual(num_a, num_b)
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(s0.debug_get_leases()), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(s0.debug_get_leases()), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(s0.debug_get_leases()), 5)

        all_leases = s0.debug_get_leases()
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefix = si[:2]
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir))
        self.failIf(os.path.exists(bucketdir))
1224
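# Hedged sketch of the two-level share layout that test_remove checks above:
# shares for a storage index SI appear to live under
# shares/<b32(SI)[:2]>/<b32(SI)>/ , and storage_index_to_dir() (imported at
# the top of this file) presumably encapsulates the "<prefix>/<si>" part.
# This helper is illustrative only.
def _sketch_bucketdir(storedir, si):
    sia = base32.b2a(si)
    return os.path.join(storedir, "shares", sia[:2], sia)
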
1225 class Stats(unittest.TestCase):
1226
1227     def setUp(self):
1228         self.sparent = LoggingServiceParent()
1229         self._lease_secret = itertools.count()
1230     def tearDown(self):
1231         return self.sparent.stopService()
1232
1233     def workdir(self, name):
1234         basedir = os.path.join("storage", "Server", name)
1235         return basedir
1236
1237     def create(self, name):
1238         workdir = self.workdir(name)
1239         ss = StorageServer(workdir, "\x00" * 20)
1240         ss.setServiceParent(self.sparent)
1241         return ss
1242
1243     def test_latencies(self):
1244         ss = self.create("test_latencies")
1245         for i in range(10000):
1246             ss.add_latency("allocate", 1.0 * i)
1247         for i in range(1000):
1248             ss.add_latency("renew", 1.0 * i)
1249         for i in range(10):
1250             ss.add_latency("cancel", 2.0 * i)
1251         ss.add_latency("get", 5.0)
1252
1253         output = ss.get_latencies()
1254
1255         self.failUnlessEqual(sorted(output.keys()),
1256                              sorted(["allocate", "renew", "cancel", "get"]))
1257         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1258         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1259         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1260         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1261         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1262         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1263         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1264         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1265         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1266
1267         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1268         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1269         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1270         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1271         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1272         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1273         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1274         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1275         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1276
1277         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1278         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1279         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1280         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1281         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1282         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1283         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1284         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1285         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1286
1287         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1288         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1289         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1290         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1291         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1292         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1293         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1294         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1295         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1296
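# Hedged reading of how the expected values in test_latencies arise: the
# server appears to retain at most the most recent 1000 samples per category,
# and a key like "01_0_percentile" appears to report the sorted sample at
# index int(n * 0.010). This helper is illustrative only; the real logic
# lives in StorageServer.get_latencies().
def _sketch_percentile(samples, fraction):
    samples = sorted(samples[-1000:]) # keep only the most recent 1000 samples
    return samples[int(len(samples) * fraction)]
# e.g. _sketch_percentile([1.0*i for i in range(10000)], 0.010) == 9010.0,
# matching the "allocate" category's "01_0_percentile" assertion above.
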
1297 def remove_tags(s):
1298     s = re.sub(r'<[^>]*>', ' ', s) # replace each HTML tag with a space
1299     s = re.sub(r'\s+', ' ', s) # collapse whitespace runs to single spaces
1300     return s
1301
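# A minimal added illustration of remove_tags() (not part of the original
# suite): each tag becomes one space, then whitespace runs collapse to one.
class RemoveTags(unittest.TestCase):
    def test_example(self):
        self.failUnlessEqual(remove_tags("<h1>Storage</h1><p>ok</p>"),
                             " Storage ok ")
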
1302 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1303
1304     def setUp(self):
1305         self.s = service.MultiService()
1306         self.s.startService()
1307     def tearDown(self):
1308         return self.s.stopService()
1309
1310     def test_bucket_counter(self):
1311         basedir = "storage/BucketCounter/bucket_counter"
1312         fileutil.make_dirs(basedir)
1313         ss = StorageServer(basedir, "\x00" * 20)
1314         # to make sure we capture the bucket-counting-crawler in the middle
1315         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1316         # also make it start sooner than usual.
1317         ss.bucket_counter.slow_start = 0
1318         orig_cpu_slice = ss.bucket_counter.cpu_slice
1319         ss.bucket_counter.cpu_slice = 0
1320         ss.setServiceParent(self.s)
1321
1322         w = StorageStatus(ss)
1323
1324         # this sample is before the crawler has started doing anything
1325         html = w.renderSynchronously()
1326         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1327         s = remove_tags(html)
1328         self.failUnless("Accepting new shares: Yes" in s, s)
1329         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1330         self.failUnless("Total buckets: Not computed yet" in s, s)
1331         self.failUnless("Next crawl in" in s, s)
1332
1333         # give the bucket-counting-crawler one tick to get started. The
1334         # cpu_slice=0 will force it to yield right after it processes the
1335         # first prefix
1336
1337         d = eventual.fireEventually()
1338         def _check(ignored):
1339             # are we really right after the first prefix?
1340             state = ss.bucket_counter.get_state()
1341             self.failUnlessEqual(state["last-complete-prefix"],
1342                                  ss.bucket_counter.prefixes[0])
1343             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1344             html = w.renderSynchronously()
1345             s = remove_tags(html)
1346             self.failUnless(" Current crawl " in s, s)
1347             self.failUnless(" (next work in " in s, s)
1348         d.addCallback(_check)
1349
1350         # now give it enough time to complete a full cycle
1351         def _watch():
1352             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1353         d.addCallback(lambda ignored: self.poll(_watch))
1354         def _check2(ignored):
1355             ss.bucket_counter.cpu_slice = orig_cpu_slice
1356             html = w.renderSynchronously()
1357             s = remove_tags(html)
1358             self.failUnless("Total buckets: 0 (the number of" in s, s)
1359         self.failUnless("Next crawl in 359" in s, s) # 3590-3599s: just under the 3600s cycle
1360         d.addCallback(_check2)
1361         return d
1362
1363     def test_bucket_counter_cleanup(self):
1364         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1365         fileutil.make_dirs(basedir)
1366         ss = StorageServer(basedir, "\x00" * 20)
1367         # to make sure we capture the bucket-counting-crawler in the middle
1368         # of a cycle, we reach in and reduce its maximum slice time to 0.
1369         ss.bucket_counter.slow_start = 0
1370         orig_cpu_slice = ss.bucket_counter.cpu_slice
1371         ss.bucket_counter.cpu_slice = 0
1372         ss.setServiceParent(self.s)
1373
1374         d = eventual.fireEventually()
1375
1376         def _after_first_prefix(ignored):
1377             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1378             # now sneak in and mess with its state, to make sure it cleans up
1379             # properly at the end of the cycle
1380             state = ss.bucket_counter.state
1381             self.failUnlessEqual(state["last-complete-prefix"],
1382                                  ss.bucket_counter.prefixes[0])
1383             state["bucket-counts"][-12] = {}
1384             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1385             ss.bucket_counter.save_state()
1386         d.addCallback(_after_first_prefix)
1387
1388         # now give it enough time to complete a cycle
1389         def _watch():
1390             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1391         d.addCallback(lambda ignored: self.poll(_watch))
1392         def _check2(ignored):
1393             ss.bucket_counter.cpu_slice = orig_cpu_slice
1394             s = ss.bucket_counter.get_state()
1395             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1396             self.failIf("bogusprefix!" in s["storage-index-samples"],
1397                         s["storage-index-samples"].keys())
1398         d.addCallback(_check2)
1399         return d
1400
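# Hedged sketch of the time-slicing knobs the two tests above manipulate: a
# crawler presumably walks prefix directories until cpu_slice seconds have
# elapsed and then yields, so cpu_slice=0 stops after a single prefix while a
# large cpu_slice completes the whole cycle in one pass. Illustrative only;
# the names below are ours, not the crawler's API.
def _sketch_crawler_slice(prefixes, process, cpu_slice):
    start = time.time()
    done = []
    for prefix in prefixes:
        process(prefix)
        done.append(prefix)
        if time.time() - start >= cpu_slice:
            break # yield to the reactor; a real crawler resumes here later
    return done
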
1401 class NoStatvfsServer(StorageServer):
     # simulates a platform without os.statvfs (e.g. Windows) by raising the
     # AttributeError that a missing os.statvfs lookup would produce
1402     def do_statvfs(self):
1403         raise AttributeError
1404
1405 class WebStatus(unittest.TestCase, pollmixin.PollMixin):
1406
1407     def setUp(self):
1408         self.s = service.MultiService()
1409         self.s.startService()
1410     def tearDown(self):
1411         return self.s.stopService()
1412
1413     def test_no_server(self):
1414         w = StorageStatus(None)
1415         html = w.renderSynchronously()
1416         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1417
1418     def test_status(self):
1419         basedir = "storage/WebStatus/status"
1420         fileutil.make_dirs(basedir)
1421         ss = StorageServer(basedir, "\x00" * 20)
1422         ss.setServiceParent(self.s)
1423         w = StorageStatus(ss)
1424         html = w.renderSynchronously()
1425         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1426         s = remove_tags(html)
1427         self.failUnless("Accepting new shares: Yes" in s, s)
1428         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1429
1430         # Windows has no os.statvfs. Make sure the code tolerates its
1431         # absence, even when running on unix.
1432         # unix.
1433         basedir = "storage/WebStatus/status_no_statvfs"
1434         fileutil.make_dirs(basedir)
1435         ss = NoStatvfsServer(basedir, "\x00" * 20)
1436         ss.setServiceParent(self.s)
1437         w = StorageStatus(ss)
1438         html = w.renderSynchronously()
1439         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1440         s = remove_tags(html)
1441         self.failUnless("Accepting new shares: Yes" in s, s)
1442         self.failUnless("Total disk space: ?" in s, s)
1443
1444     def test_readonly(self):
1445         basedir = "storage/WebStatus/readonly"
1446         fileutil.make_dirs(basedir)
1447         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
1448         ss.setServiceParent(self.s)
1449         w = StorageStatus(ss)
1450         html = w.renderSynchronously()
1451         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1452         s = remove_tags(html)
1453         self.failUnless("Accepting new shares: No" in s, s)
1454
1455     def test_reserved(self):
1456         basedir = "storage/WebStatus/reserved"
1457         fileutil.make_dirs(basedir)
1458         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1459         ss.setServiceParent(self.s)
1460         w = StorageStatus(ss)
1461         html = w.renderSynchronously()
1462         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1463         s = remove_tags(html)
1464         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
1465
1466     def test_huge_reserved(self):
1467         basedir = "storage/WebStatus/huge_reserved"
1468         fileutil.make_dirs(basedir)
1469         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e12)
1470         ss.setServiceParent(self.s)
1471         w = StorageStatus(ss)
1472         html = w.renderSynchronously()
1473         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1474         s = remove_tags(html)
1475         self.failUnless("Reserved space: - 15.00 TB (15000000000000)" in s, s)
1476
1477     def test_util(self):
1478         w = StorageStatus(None)
1479         self.failUnlessEqual(w.render_space(None, None), "?")
1480         self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
1481         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
1482         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
1483
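# Hedged sketch of the "10.00 MB (10000000)" rendering that test_util checks:
# None renders as "?", otherwise an abbreviated figure plus the exact byte
# count. This sketch only covers the megabyte case exercised above; the real
# formatting lives in StorageStatus.render_space.
def _sketch_render_space(size):
    if size is None:
        return "?"
    return "%.2f MB (%d)" % (size / 1e6, size)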